In [None]:
# Notebook configuration (rendered as a Colab form through the #@param markers).

# Google Cloud project used for `gcloud config`, vertexai.init() and the BigQuery client.
PROJECT_ID = "project-id" #@param  {allow-input: true}
# Cloud Storage bucket that receives the uploaded images and is used to build the gs:// URIs.
BUCKET = "bucket-name" #@param  {allow-input: true}
# Local folder holding the source images. The filename is appended by plain string
# concatenation, so the trailing slash is required.
# NOTE(review): relative path; presumably resolved from /content, where Drive is mounted - confirm.
SRC_PATH = "gdrive/My Drive/folder/images-folder/" #@param  {allow-input: true}
# Object-name prefix inside BUCKET; the filename is appended by plain string concatenation.
DEST_PATH = "images/" #@param  {allow-input: true}
# Table that receives one row per processed classification and is queried for
# the classification ids that are already stored.
BIGQUERY_TABLE = "dataset.table" #@param  {allow-input: true}
# CSV file loaded into `df` with pandas.read_csv.
CSV_PATH = 'gdrive/My Drive/folder/data.csv' #@param  {allow-input: true}
# Model name passed to GenerativeModel for both the image analysis and the evaluation.
MODEL= "gemini-1.5-flash-002" #@param  {allow-input: true}

In [None]:
from google.colab import drive
# Mount Google Drive at /content/gdrive.
# NOTE(review): SRC_PATH and CSV_PATH start with "gdrive/", so they presumably
# rely on this mount and on the working directory being /content - confirm.
drive.mount('/content/gdrive')

In [None]:
import pandas as pd
import json

# Load the CSV at CSV_PATH. get_data() later reads the "classification_id",
# "metadata", "annotations" and "subject_data" columns of individual rows.
df=pd.read_csv(CSV_PATH)
# Preview the first rows as the cell output.
df.head()

In [3]:

# Maps each question label to the two annotation field names that get_data()
# inspects: "k" is the field expected to contain the label text itself, and "v"
# is the field whose value is copied out as the answer for that label.
# The keys are also sent to the model as the list of required fields.
lookup = {
    label: {"k": label_field, "v": value_field}
    for label, label_field, value_field in (
        ("Country", "select_label", "label"),
        ("What language is this label written in?", "task_label", "value"),
        ("County", "select_label", "value"),
        ("DAO Accession Number\n", "task_label", "value"),
        ("Scientific Name", "task_label", "value"),
        ("Collected By", "task_label", "value"),
        ("Verbatim Date", "task_label", "value"),
        ("Are there geographic coordinates present?", "task_label", "value"),
        ("Collection Date (year)", "select_label", "label"),
        ("Collection Date (month)", "select_label", "label"),
        ("Collection Date (day)", "select_label", "label"),
    )
}


def iterate_json_tree(data, path=""):
    """Walk a decoded JSON value depth-first and yield ``(path, leaf)`` tuples.

    Dict keys are joined to the parent path with "/"; list positions are
    appended as "[i]" with no separator. Any value that is neither a dict nor
    a list is treated as a leaf. Empty dicts and lists produce no output.
    """
    if isinstance(data, dict):
        for name, child in data.items():
            child_path = path + "/" + name if path else name
            yield from iterate_json_tree(child, child_path)
        return

    if isinstance(data, list):
        for position, child in enumerate(data):
            suffix = f"[{position}]"
            child_path = path + suffix if path else suffix
            yield from iterate_json_tree(child, child_path)
        return

    # Leaf value: report it together with the path accumulated so far.
    yield (path, data)


def get_data(row):
  """Decode one CSV row and extract the answers for the labels in `lookup`.

  The "metadata", "annotations" and "subject_data" cells are parsed as JSON.
  The filename is taken from the 'Filename' entry of the first item in
  subject_data. The annotation tree is flattened with iterate_json_tree and
  its leaves are grouped by parent path; a group contributes an answer when it
  contains both fields named by a `lookup` entry and its label field equals
  that entry's label.

  Returns a dict with the keys "classification_id", "filename", "data",
  "metadata", "subject_data" and "annotations". The keys of "data" are the
  lookup labels with newlines and parentheses removed.
  """
  metadata = json.loads(row["metadata"])
  annotations = json.loads(row["annotations"])
  subject_data = json.loads(row["subject_data"])

  # Only the first entry of subject_data is consulted for the filename.
  first_subject = subject_data[next(iter(subject_data))]
  filename = first_subject['Filename']

  # Group leaf values by the path of their parent node.
  groups = {}
  for leaf_path, leaf_value in iterate_json_tree(annotations):
    parent_path, _, leaf_name = leaf_path.rpartition("/")
    groups.setdefault(parent_path, {})[leaf_name] = leaf_value

  data = {}
  for group in groups.values():
    for label, keys in lookup.items():
      label_field = keys["k"]
      value_field = keys["v"]
      if label_field not in group or value_field not in group:
        continue
      if group[label_field] != label:
        continue
      clean_label = label.replace("\n","").replace("(","").replace(")","")
      data[clean_label] = group[value_field]

  result = {
      "classification_id": row["classification_id"],
      "filename": filename,
      "data": data,
      "metadata": metadata,
      "subject_data": subject_data,
      "annotations": annotations,
  }
  return result


def upload_file(bucket_name, filename):
    """Copy one local file into a Cloud Storage bucket unless the object exists.

    The local source is SRC_PATH + filename and the object name is
    DEST_PATH + filename. A message is printed for either outcome and nothing
    is returned.
    """
    destination_blob_name = DEST_PATH + filename
    source_file_name = SRC_PATH + filename

    blob = storage.Client().bucket(bucket_name).blob(destination_blob_name)

    if not blob.exists():
        blob.upload_from_filename(source_file_name)
        print(f"File {source_file_name} uploaded to gs://{bucket_name}/{destination_blob_name}.")
        return

    print(f"File {destination_blob_name} already exists in bucket {bucket_name}. Skipping upload.")


def get_ai_generation(image_uri):
  """Ask the configured model for a structured JSON description of one image.

  The image at `image_uri` is sent as an image/jpeg part together with a JSON
  encoded prompt, and the model is constrained to the response schema built
  below. Returns the response text decoded with json.loads.
  """
  model = GenerativeModel(MODEL)

  # Nested "location" object of the response.
  location_schema = {
      "type": "object",
      "properties": {
          "Location Verbatim": {"type": "string"},
          "Location Verbatim English": {"type": "string"},
          "country": {"type": "string"},
          "county": {"type": "string"},
          "Are there geographic coordinates present?": {"type": "boolean"},
          "Lat Long": {"type": "string"},
      },
  }

  # Nested "date" object of the response.
  date_schema = {
      "type": "object",
      "properties": {
          "Verbatim Date": {"type": "string"},
          "Year": {"type": "integer"},
      },
  }

  response_schema = {
      "type": "object",
      "properties": {
          "content": {"type": "string"},
          "Scientific Name": {"type": "string"},
          "label language": {"type": "string"},
          "location": location_schema,
          "date": date_schema,
          "Identification Numbers": {
              "type": "array",
              "items": {"type": "string"},
          },
          "Collected by": {"type": "string"},
      },
      # Only these fields are marked as required in the schema.
      "required": ["content", "label language", "Identification Numbers", "location"],
  }

  prompt = {
      "Instructions": "Help describe what is in this image. ",
      "Important Notes": [
        "there may be more than one identification numbers",
        "any country or label language format should be uppercase 2 character ISO country code"
      ],
  }

  image_part = Part.from_uri(
      image_uri,
      mime_type="image/jpeg",
  )
  generation_config = GenerationConfig(
      response_mime_type="application/json", response_schema=response_schema
  )
  response = model.generate_content(
      [image_part, json.dumps(prompt)],
      generation_config=generation_config,
  )

  print("AI analysis done. ")

  return json.loads(response.text)



def evaluate_classification(image_uri, ai_generation, subject):
  """Ask the configured model to grade a human classification against the AI output.

  Args:
    image_uri: URI of the image. Currently unused, because the image part of
      the request is commented out below; kept so callers need not change.
    ai_generation: JSON-serialisable result of the AI analysis of the image.
    subject: dict whose "data" entry holds the parsed human classification.

  Returns:
    The response text decoded with json.loads. The schema requires
    "evaluation" and "score"; "differences" and "missing fields" are optional.
  """
  model = GenerativeModel(MODEL)

  response_schema = {
        "type": "object",
        "properties": {
            "evaluation": {
                "type": "string",
            },
            "differences": {
                "type": "array",
                "items": {
                    "type": "string",
                }
            },
            "missing fields": {
                "type": "array",
                "items": {
                    "type": "string"
                }
            },
            "score": {
                "type": "integer"
            }

        },
        "required": ["evaluation", "score"],

  }

  prompt = {
      "Instructions": [
          "Please evaluate the human classification against AI generation",
          "Provide a summary and enumerate the differences.",
          # The comma after the next string matters: without it Python joins the
          # two adjacent literals into a single run-together instruction.
          "Provide a quality score of the human classification from 0 to 5 based on accuracy and completeness",
          "Identify missing fields in human classification based on required fields",
      ],
      "Important Notes": [
        "The label language is provided as the 2-character ISO country code",
      ],
      "AI Generation": ai_generation,
      "Human Classification": subject["data"],
      # NOTE(review): these are the raw lookup labels, while the keys of
      # subject["data"] have newlines and parentheses stripped - confirm the
      # mismatch does not cause false "missing field" reports.
      "Required fields": list(lookup.keys()),
  }


  response = model.generate_content(
      [
          # The image part is disabled; only the JSON prompt is sent.
          # Part.from_uri(
          #     image_uri,
          #     mime_type="image/jpeg",
          # ),
          json.dumps(prompt),
      ],
      generation_config=GenerationConfig(
          response_mime_type="application/json", response_schema=response_schema
      ),
  )

  print("Evaluation done. ")

  response_json = json.loads(response.text)
  return response_json

def add_to_bigquery(record):
  """Insert one record into BIGQUERY_TABLE through the streaming insert API.

  Args:
    record: JSON-serialisable dict whose keys match the table's columns.

  Raises:
    RuntimeError: if BigQuery reports errors for the row. insert_rows_json
      returns row-level failures as a list instead of raising, so the return
      value has to be checked before reporting success.
  """
  errors = bigquery_client.insert_rows_json(BIGQUERY_TABLE, [record])
  if errors:
    raise RuntimeError(f"BigQuery insert into {BIGQUERY_TABLE} failed: {errors}")
  print("Record added to BigQuery.")

def get_completed_classifications():
  """Return every classification_id stored in BIGQUERY_TABLE as a pandas Series.

  The Series is intended for use with DataFrame.isin() to filter out rows that
  have already been processed.
  """
  query = f"""
    SELECT classification_id
    FROM `{BIGQUERY_TABLE}`
  """
  # Wait for the query once and iterate the rows it returns.
  rows = bigquery_client.query(query).result()
  existing_ids = [row.classification_id for row in rows]

  # Convert the list to a Pandas Series for easier comparison
  return pd.Series(existing_ids)

In [None]:
from google.colab import auth

# Authenticate the current Colab user.
auth.authenticate_user()

# `!` runs a shell command; {PROJECT_ID} is interpolated from the Python variable.
! gcloud config set project {PROJECT_ID}
# Log in for application-default credentials.
# NOTE(review): -q is presumably gcloud's --quiet flag (suppresses prompts) - confirm.
! gcloud auth application-default login -q

In [6]:


# The names imported and created in this cell (storage, GenerativeModel, Part,
# GenerationConfig, bigquery_client) are used by the helper functions defined
# in an earlier cell, so this cell must run before any of them is called.
from google.cloud import storage
import vertexai
from vertexai.generative_models import GenerativeModel, Part, GenerationConfig
# Initialise Vertex AI for the configured project; the location is hard-coded.
vertexai.init(project=PROJECT_ID, location="us-central1")

from google.cloud import bigquery
# Shared client used by add_to_bigquery() and get_completed_classifications().
bigquery_client = bigquery.Client(project=PROJECT_ID)


In [None]:

# Skip classifications that already have a row in BigQuery.
existing_ids_series = get_completed_classifications()
filtered_df = df[~df['classification_id'].isin(existing_ids_series)]

# Upper bound on the number of classifications processed per run of this cell.
MAX_RECORDS_PER_RUN = 50

# Cap the batch at the number of remaining rows so that .iloc never indexes
# past the end when fewer than MAX_RECORDS_PER_RUN rows are left.
for i in range(min(MAX_RECORDS_PER_RUN, len(filtered_df))):
  subject = get_data(filtered_df.iloc[i])

  filename = subject["filename"]

  print(f"({i}) Analyzing {subject['classification_id']} {filename} ")

  upload_file(BUCKET, filename)

  # Must match the object name used by upload_file (DEST_PATH + filename).
  image_uri = "gs://"+BUCKET+"/"+DEST_PATH+filename

  ai_generation = get_ai_generation(image_uri)
  evaluation = evaluate_classification(image_uri, ai_generation, subject)

  record = {
      # NOTE(review): the id is stored as a float; presumably this matches the
      # table's column type - confirm.
      "classification_id": float(subject["classification_id"]),
      "uri": image_uri,
      "filename": filename,
      # "Scientific Name" is not a required field of the response schema, so
      # it may be absent; store None instead of raising KeyError.
      "scientific_name": ai_generation.get("Scientific Name"),
      "parsed_annotations": json.dumps(subject["data"]),
      "ai_generation": json.dumps(ai_generation),
      "human_evaluation": json.dumps(evaluation),
  }

  add_to_bigquery(record)

