# Setup Notebook

In [None]:
#@markdown import libraries
import os
import sys
import json
import logging

import ipywidgets as widgets
from IPython.display import display
import matplotlib.pyplot as plt
import matplotlib.image as mpimg

import sqlalchemy as sqla
from sqlalchemy import create_engine
from sqlalchemy.orm import Session

# Setup Cloud Database

In [None]:
from google.colab import auth

# Sign the current Colab user in to Google; the gcloud and Cloud Storage cells
# below presumably rely on the credentials obtained here.
auth.authenticate_user()

# Setup Google Cloud Project

In [None]:
#@markdown Please enter your GCP Project ID
project_id = "locust-early-warning" #@param {type:"string"}
assert project_id, "Please enter your Google Project ID to continue"
# Make this project the default for the gcloud commands issued in later cells.
!gcloud config set project {project_id}

Updated property [core/project].


In [None]:
#@markdown Set IAM policy binding
user_account = !gcloud auth list --filter=status:ACTIVE --format="value(account)"
print("Active User Account: ", user_account[0])

!gcloud projects add-iam-policy-binding {project_id} \
  --member=user:{user_account[0]} \
  --role="roles/cloudsql.client"

Active User Account:  k.panford-quainoo@instadeep.com
bindings:
- members:
- members:
  - serviceAccount:service-835848499535@gcp-sa-aiplatform-cc.iam.gserviceaccount.com
  role: roles/aiplatform.customCodeServiceAgent
- members:
  - serviceAccount:service-835848499535@gcp-sa-aiplatform.iam.gserviceaccount.com
  role: roles/aiplatform.serviceAgent
- members:
  - user:a.pretorius@instadeep.com
  - user:ababiker@google.com
  - user:hectoramoah@google.com
  - user:i.yusuf@instadeep.com
  - user:jyh@google.com
  - user:m.moussa@instadeep.com
  - user:perrynelson@google.com
  - user:s.morjane@instadeep.com
  - user:t.tumiel@instadeep.com
  role: roles/aiplatform.tensorboardWebAppUser
- members:
  role: roles/aiplatform.user
- members:
  - serviceAccount:service-835848499535@gcp-sa-artifactregistry.iam.gserviceaccount.com
  role: roles/artifactregistry.serviceAgent
- members:
  - serviceAccount:835848499535@cloudbuild.gserviceaccount.com
  role: roles/cloudbuild.builds.builder
- members:
  -

In [None]:
# Enable the Cloud SQL Admin API (sqladmin.googleapis.com) on the active project;
# the `gcloud sql ...` commands in the following cells presumably require it.
!gcloud services enable sqladmin.googleapis.com

In [None]:
#@title Create Cloud SQL Instance

#@markdown Please fill in the both the Google Cloud region and name of your Cloud SQL instance. Once filled in, run the cell.

# Please fill in these values.
region = "us-central1" #@param {type:"string"}
instance_name = "my-test-db" #@param {type:"string"}

assert region, "Please enter a Google Cloud region"
assert instance_name, "Please enter the name of your instance"

# check if Cloud SQL instance exists in the provided region
database_version = !gcloud sql instances describe {instance_name} --format="value(databaseVersion)"
if database_version[0].startswith("POSTGRES"):
  print("Found existing Postgres Cloud SQL Instance!")
else:
  print("Creating new Cloud SQL instance...")
  password = input("Please provide a password to be used for 'postgres' database user: ")
  !gcloud sql instances create {instance_name} --database-version=POSTGRES_15 \
    --region={region} --cpu=1 --memory=4GB --root-password={password} \
    --database-flags=cloudsql.iam_authentication=On


Found existing Postgres Cloud SQL Instance!


In [None]:
# Connection name in "<project>:<region>:<instance>" form; it is handed to the
# database manager through `auth_configs` in the "Init Task Manager" cell.
instance_connection_name = f"{project_id}:{region}:{instance_name}"
print("Instance Connection Name: ", instance_connection_name)



# Create Database

In [None]:
#@markdown Please Enter database name to create
database_name = "ann-otate" #@param {type: "string"}
assert database_name, "Please enter a name for the database to be created"

!gcloud sql databases create {database_name} --instance={instance_name}

[1;31mERROR:[0m (gcloud.sql.databases.create) HTTPError 400: Invalid request: failed to create database ann-otate. Detail: pq: database "ann-otate" already exists.


In [None]:
#@title Install Cloud SQL connector
!{sys.executable} -m pip install -q cloud-sql-python-connector["pg8000"]

[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m54.2/54.2 kB[0m [31m3.2 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m105.0/105.0 kB[0m [31m8.4 MB/s[0m eta [36m0:00:00[0m
[?25h

# Database

In [None]:
#@markdown Run this cell
import reprlib
from datetime import datetime
from typing import List
import sqlalchemy as sqla
from sqlalchemy.orm import mapped_column
from sqlalchemy.orm import Mapped
from sqlalchemy.orm import relationship
from sqlalchemy.orm import declarative_base

# Shared declarative base: every ORM model below subclasses it, and
# DatabaseManager.setup() creates the tables from Base.metadata.
Base = declarative_base()

class ProjectConfigurations(Base):
  """ORM model for the `project_configurations` table (settings of an annotation project)."""
  __tablename__ = 'project_configurations'

  # Auto-incremented surrogate primary key.
  project_id: Mapped[int] = mapped_column(sqla.Integer, autoincrement=True, primary_key=True)
  project_title: Mapped[str] = mapped_column(sqla.String(255))
  # Bucket name and prefix are stored as two separate string columns.
  cloud_bucket_name: Mapped[str] = mapped_column(sqla.String(255))
  cloud_bucket_prefix: Mapped[str] = mapped_column(sqla.String(255))
  # The labels are kept as one comma-separated string.
  comma_separated_labels: Mapped[str] = mapped_column(sqla.String(255))
  max_annotation_per_example: Mapped[int] = mapped_column(sqla.Integer)
  # Both timestamps fall back to datetime.utcnow when no value is supplied.
  completion_deadline = mapped_column(sqla.TIMESTAMP, default=datetime.utcnow)
  created_at = mapped_column(sqla.TIMESTAMP, default=datetime.utcnow)

  def __repr__(self) -> str:
    """Return a multi-line, human-readable dump of the settings (project_id is not shown)."""
    banner = f'Project Configurations\n{"*" * 26} \n'
    field_lines = (
        f'project title={self.project_title!r}',
        f'cloud_bucket_name={self.cloud_bucket_name!r}',
        f'cloud_bucket_prefix={self.cloud_bucket_prefix!r}',
        f'comma_separated_labels={self.comma_separated_labels!r}',
        f'max_annotation_per_example={self.max_annotation_per_example!r}',
        f'completion_deadline={self.completion_deadline!r}',
        f'project creation date={self.created_at!r}',
    )
    return banner + '\n'.join(field_lines)


class Annotator(Base):
  """ORM model for the `annotators` table: a person who labels examples."""
  __tablename__ = 'annotators'

  # Auto-incremented surrogate primary key.
  annotator_id: Mapped[int] = mapped_column(sqla.Integer, primary_key=True,
                                            nullable=False, autoincrement=True)
  username: Mapped[str] = mapped_column(sqla.String(255))
  # Optional: the column is declared nullable.
  email: Mapped[str] = mapped_column(sqla.String(255), nullable=True)

  # Reverse sides of Annotation.annotator and AssignedAnnotator.annotator.
  annotations = relationship("Annotation", back_populates="annotator")
  assigned_annotators = relationship("AssignedAnnotator", back_populates="annotator")

  def __repr__(self)-> str:
    """Return a one-line debug representation."""
    # The closing parenthesis was missing, which produced unbalanced reprs.
    return (f'Annotator(annotator_id={self.annotator_id!r},' +
            f'username={self.username!r}, ' +
            f'email={self.email!r})')


class Annotation(Base):
  """ORM model for the `annotations` table: one label given to one example by one annotator."""
  __tablename__ = 'annotations'

  # Auto-incremented surrogate primary key.
  annotation_id: Mapped[int] = mapped_column(sqla.Integer, primary_key=True, autoincrement=True)
  label: Mapped[str] = mapped_column(sqla.String(60), nullable=False)
  # Foreign key to the labelled example, plus the ORM-level link to it.
  example_id: Mapped[str] = mapped_column(sqla.String(255), sqla.ForeignKey('examples.example_id'), nullable=False)
  example: Mapped['Example'] = relationship("Example", back_populates='annotations')

  # Foreign key to the annotator who produced the label, plus the ORM-level link.
  annotator_id: Mapped[int] = mapped_column(sqla.Integer, sqla.ForeignKey('annotators.annotator_id'), nullable=False)
  annotator: Mapped['Annotator'] = relationship("Annotator", back_populates='annotations')

  def __repr__(self)-> str:
    """Return a one-line debug representation; the annotator is abbreviated with reprlib."""
    # The closing parenthesis was missing, which produced unbalanced reprs.
    return (f'Annotation(annotation_id={self.annotation_id!r},' +
            f'label={self.label!r}, ' +
            f'annotator={reprlib.repr(self.annotator)})')


class Example(Base):
  """ORM model for the `examples` table: one image to be annotated."""
  __tablename__ = 'examples'

  # String primary key supplied by the caller (not auto-generated).
  example_id: Mapped[str] = mapped_column(sqla.String(255), nullable=False, primary_key=True)
  image_path: Mapped[str] = mapped_column(sqla.String(255), nullable=False)

  # Reverse sides of Annotation.example and AssignedAnnotator.example.
  annotations: Mapped[List['Annotation']] = relationship("Annotation", back_populates="example")
  assigned_annotators: Mapped[List['AssignedAnnotator']] = relationship("AssignedAnnotator", back_populates="example")

  def __repr__(self)-> str:
    """Return a one-line debug representation; the annotation list is abbreviated with reprlib."""
    # The closing parenthesis was missing, which produced unbalanced reprs
    # (visible when a list of examples is displayed).
    return (f'Example(example_id={self.example_id!r},' +
            f'image_path={self.image_path!r}, ' +
            f'annotations={reprlib.repr(self.annotations)})')


class AssignedAnnotator(Base):
  """ORM model for the `assigned_annotators` table: links one example to one annotator."""
  __tablename__ = 'assigned_annotators'

  # Auto-incremented surrogate primary key.
  assignment_id: Mapped[int] = mapped_column(sqla.Integer, primary_key=True, autoincrement=True)
  # Foreign keys to the two sides of the assignment.
  example_id: Mapped[str] = mapped_column(sqla.String(255), sqla.ForeignKey('examples.example_id'), nullable=False)
  annotator_id: Mapped[int] = mapped_column(sqla.Integer, sqla.ForeignKey('annotators.annotator_id'), nullable=False)

  # ORM-level links matching Example.assigned_annotators / Annotator.assigned_annotators.
  example: Mapped['Example'] = relationship("Example", back_populates="assigned_annotators")
  annotator: Mapped['Annotator'] = relationship("Annotator", back_populates="assigned_annotators")

  def __repr__(self)-> str:
    """Return a one-line debug representation."""
    # The closing parenthesis was missing; annotator_id now uses !r like the
    # other fields (identical output for integers and None).
    return (f'AssignedAnnotator(assignment_id={self.assignment_id!r}, ' +
            f'example_id={self.example_id!r}, ' +
            f'annotator_id={self.annotator_id!r})')

In [None]:
#@title Task Manager

#@markdown Setup_lib.py
import pandas as pd
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.orm import scoped_session
from sqlalchemy import exc
from sqlalchemy.sql import text
from google.cloud.sql.connector import Connector

class DatabaseManager:
  """Owns the Cloud SQL connector, the SQLAlchemy engine and the scoped session.

  Call `setup()` once before `get_session()`; call `close()` when the database
  is no longer needed to release the connector and pooled connections.
  """

  def __init__(
      self,
      username: str,
      password: str,
      db_name: str,
      instance_connection_name: str,
      **kwargs):
    """Store the connection settings; no connection is opened here.

    Args:
      username: database user to connect as.
      password: password of that user.
      db_name: name of the database to connect to.
      instance_connection_name: Cloud SQL instance connection name.
      **kwargs: extra settings, stored on `self.kwargs` and otherwise unused here.
    """
    self.username = username
    self.password = password
    self.db_name = db_name
    self.instance_connection_name = instance_connection_name

    self.kwargs = kwargs
    self._connector = None
    self._engine = None
    self._session = None

  def setup(self):
    """Create the engine, create all tables of `Base` and build the scoped session.

    Raises:
      Any SQLAlchemy/driver error raised while creating the tables.
    """
    # Release resources from a previous setup() so re-running does not leak them.
    self.close()

    # The connector is kept on the instance so close() can shut it down.
    self._connector = Connector()

    def getconn():
      # Called by the engine every time it needs a new DB-API connection.
      return self._connector.connect(
          self.instance_connection_name,
          "pg8000",
          user=self.username,
          password=self.password,
          db=self.db_name
      )

    self._engine = sqla.create_engine(
        "postgresql+pg8000://",
        creator=getconn,
    )
    Base.metadata.bind = self._engine
    # Errors propagate unchanged (the former try/except only re-raised).
    Base.metadata.create_all(self._engine)

    session_factory = sessionmaker(autocommit=False, autoflush=False, bind=self._engine)
    self._session = scoped_session(session_factory)

    Base.query = self._session.query_property()
    logging.info("Database Manager Initialized")

  def get_session(self):
    """Return the scoped session created by `setup()`.

    Raises:
      RuntimeError: if `setup()` has not been called yet (previously this
        returned None and callers failed later with an AttributeError).
    """
    if self._session is None:
      raise RuntimeError("DatabaseManager.setup() must be called before get_session()")
    return self._session

  def close(self):
    """Release the session, the engine's pooled connections and the connector.

    Safe to call when `setup()` was never run or when already closed.
    """
    if self._session is not None:
      self._session.remove()
      self._session = None
    if self._engine is not None:
      self._engine.dispose()
      self._engine = None
    if self._connector is not None:
      self._connector.close()
      self._connector = None


class TaskManager:
  """High-level operations on the annotation database.

  Covers project settings, examples (tasks), annotators and the assignment of
  examples to annotators. Call `init()` before any other method.
  """

  def __init__(self, db_configs) -> None:
    """Store the keyword arguments later passed to `DatabaseManager`."""
    self.db_configs = db_configs
    self._database_manager = None
    self._task_configs = None

  def init(self):
    """Create the `DatabaseManager` from `db_configs` and set it up."""
    self._database_manager = DatabaseManager(**self.db_configs)
    self._database_manager.setup()

  @staticmethod
  def _quote_identifier(identifier) -> str:
    """Return `identifier` as a double-quoted SQL identifier (embedded quotes doubled)."""
    return '"' + str(identifier).replace('"', '""') + '"'

  @staticmethod
  def _coerce_deadline(completion_deadline):
    """Turn an ISO date string into a datetime; pass other values through.

    Empty values (None, "") become None so callers can treat them as "not given".
    """
    if not completion_deadline:
      return None
    if isinstance(completion_deadline, str):
      return datetime.fromisoformat(completion_deadline)
    return completion_deadline

  def add_tasks_from_csv(self, csv_path: str):
    '''
    Adding Tasks from CSV metadata.

    Reads the `example_id` and `image` columns and stores one Example per new
    `example_id`. Ids that are already stored (or repeated in the CSV) are
    skipped. All new rows are committed together.

    Args:
      csv_path: path to CSV file with metadata
    Return:
      None
    '''
    tasks_df = pd.read_csv(csv_path)
    session = self._database_manager.get_session()

    # One query for the known ids instead of one query per CSV row.
    known_ids = {row[0] for row in session.query(Example.example_id).all()}
    new_tasks = []
    for raw_id, image_path in zip(tasks_df['example_id'], tasks_df['image']):
      # The column is a string; CSV ids may be parsed as numbers, so normalise
      # once and use the same value for the lookup and the insert.
      example_id = str(raw_id)
      if example_id in known_ids:
        logging.info(f"Task with example_id '{example_id}' already exists. Skipping.")
        continue
      known_ids.add(example_id)
      new_tasks.append(Example(example_id=example_id, image_path=image_path))

    try:
      session.add_all(new_tasks)
      session.commit()
    except Exception:
      # Leave the session usable for the next call.
      session.rollback()
      raise
    logging.info(f"Added {len(new_tasks)} new task(s)")

  def set_project_configs(
      self,
      project_title: str = None,
      cloud_bucket_name: str = None,
      cloud_bucket_prefix: str = None,
      comma_separated_labels: str = None,
      max_annotators_per_example: int = None,
      completion_deadline: datetime = None
      )-> None:
    """Create or update the project settings row (project_id 1).

    Args:
      project_title: required project title.
      cloud_bucket_name: required source bucket name.
      cloud_bucket_prefix: optional prefix inside the bucket.
      comma_separated_labels: required labels as one comma-separated string.
      max_annotators_per_example: optional maximum annotators per example.
      completion_deadline: optional deadline, as a datetime or an ISO date
        string such as "2024-04-30".

    Raises:
      AssertionError: if a required value is missing.
      ValueError: if `completion_deadline` is a string that is not ISO formatted.
    """
    assert project_title, 'Please Enter project title'
    assert cloud_bucket_name, 'Please Enter source bucket name'
    assert comma_separated_labels, 'Please Enter task labels'

    candidate_values = {
      'project_title': project_title,
      'cloud_bucket_name': cloud_bucket_name,
      'cloud_bucket_prefix': cloud_bucket_prefix,
      'comma_separated_labels': comma_separated_labels,
      'max_annotation_per_example': max_annotators_per_example,
      'completion_deadline': self._coerce_deadline(completion_deadline),
    }
    # Only values that were actually supplied are written.
    provided_values = {name: value for name, value in candidate_values.items() if value}

    session = self._database_manager.get_session()
    existing_config = session.query(ProjectConfigurations).filter_by(project_id=1).first()

    if existing_config:
      for name, value in provided_values.items():
        setattr(existing_config, name, value)
    else:
      # The deadline used to be dropped here, so a new project always got the
      # column default instead of the requested date.
      session.add(ProjectConfigurations(**provided_values))
    session.commit()

  def assign_tasks(self, max_annotators_per_example):
    """Assign every example to annotators in round-robin order and persist the result.

    Each example gets `max_annotators_per_example` distinct annotators, capped
    at the number of annotators available.

    Args:
      max_annotators_per_example: number of annotators wanted per example.
    Returns:
      The list of newly created AssignedAnnotator objects (empty when there
      are no annotators or no examples).
    """
    session = self._database_manager.get_session()

    annotators = session.query(Annotator).all()
    examples = session.query(Example).all()
    num_annotators = len(annotators)

    if num_annotators == 0 or not examples:
      # Previously an empty annotator table ended in a ZeroDivisionError.
      logging.warning("No annotators or no examples available; nothing to assign")
      return []

    # More slots than annotators would give the same annotator the same example twice.
    per_example = min(max_annotators_per_example, num_annotators)

    # Round Robin Assignment Algorithm
    assignments = [
      AssignedAnnotator(
        annotator_id=annotators[(i * per_example + j) % num_annotators].annotator_id,
        example_id=example.example_id)
      for i, example in enumerate(examples)
      for j in range(per_example)
    ]
    session.add_all(assignments)
    session.commit()
    return assignments

  def show_assignments(self, limit=None):
    """Print stored assignments, at most `limit` of them (all when `limit` is falsy)."""
    session = self._database_manager.get_session()
    query = session.query(AssignedAnnotator).order_by(AssignedAnnotator.assignment_id)
    if limit:
      query = query.limit(limit)

    for assigned_annotator in query.all():
      print(f'Example ID: {assigned_annotator.example_id}, assigned to Annotator: {assigned_annotator.annotator.username}')

  def get_task_assignments(self):
    """Return (example_id, image_path, username) rows, one per stored assignment."""
    session = self._database_manager.get_session()
    # Example and Annotator are only related through assigned_annotators, and
    # the statement has to be run through the session to yield rows.
    return (
      session.query(Example.example_id, Example.image_path, Annotator.username)
      .join(AssignedAnnotator, Example.example_id == AssignedAnnotator.example_id)
      .join(Annotator, AssignedAnnotator.annotator_id == Annotator.annotator_id)
      .all()
    )

  def get_partially_assigned_tasks(self, max_annotators_per_example=None):
    '''Get tasks that have partially been assigned.

    Args:
      max_annotators_per_example: when given, only examples with fewer
        assignments than this number are returned. When None (the default) no
        filter is applied and every example is returned, as before.
    Returns:
      List of Example objects.
    '''
    session = self._database_manager.get_session()
    query = (
      session.query(Example)
      .outerjoin(Example.assigned_annotators)
      .group_by(Example.example_id)
    )
    if max_annotators_per_example is not None:
      # The outer join keeps unassigned examples; they count as 0 assignments.
      query = query.having(
        sqla.func.count(AssignedAnnotator.assignment_id) < max_annotators_per_example)
    return query.all()

  def add_annotators(self, annotators):
    """Insert annotators that are not stored yet.

    Args:
      annotators: iterable of dicts with a 'username' key and an optional
        'email' key.

    Existing usernames are skipped with a warning. A failing insert is rolled
    back and logged; the remaining annotators are still processed.
    """
    session = self._database_manager.get_session()

    for annotator_data in annotators:
      username = annotator_data.get('username')
      email = annotator_data.get('email')

      existing_annotator = session.query(Annotator).filter_by(username=username).first()

      if existing_annotator is None:
        annotator = Annotator(username=username, email=email)
        try:
          session.add(annotator)
          session.commit()
          logging.info(f"Added new annotator '{username}'")
        except Exception as e:
          # Best effort by design: report the failure and carry on.
          session.rollback()
          logging.error(f"Error adding annotator '{username}': {e}")
      else:
        logging.warning(f"Annotator with username '{username}' already exists")

  def get_assigned_tasks(self, username):
    """Return the Example objects assigned to the annotator called `username`."""
    session = self._database_manager.get_session()

    results = session.query(Example).\
      join(AssignedAnnotator, Example.example_id == AssignedAnnotator.example_id).\
      join(Annotator, AssignedAnnotator.annotator_id == Annotator.annotator_id).\
      filter(Annotator.username == username).all()

    return results

  def list_tasks(self, limit=None):
    """Return stored examples, at most `limit` of them (all when `limit` is falsy)."""
    session = self._database_manager.get_session()
    query = session.query(Example)
    if limit:
      # Let the database apply the limit instead of loading every row first.
      query = query.limit(limit)
    return query.all()

  def list_annotators(self):
    """Return every stored Annotator."""
    session = self._database_manager.get_session()
    return session.query(Annotator).all()

  def get_project_configs(self):
    """Return every stored ProjectConfigurations row as a list."""
    session = self._database_manager.get_session()
    return session.query(ProjectConfigurations).all()

  def grant_user_access(self, user):
    """Grant the database role `user` SELECT on the project tables.

    Args:
      user: name of an existing database role.
    """
    session = self._database_manager.get_session()
    tables = ['project_configurations',
              'examples',
              'annotations',
              'annotators',
              'assigned_annotators']
    # Role names cannot be sent as bind parameters, so the name is quoted as
    # an identifier before it is placed into the statement.
    role = self._quote_identifier(user)
    try:
      for table in tables:
        session.execute(text(f"GRANT SELECT ON {table} TO {role};"))
      # The session does not autocommit; without this the grants were never persisted.
      session.commit()
    except Exception:
      session.rollback()
      raise


In [None]:
#@title Init Task Manager
from getpass import getpass

# The database password is prompted for at run time so it is never stored in
# the notebook source or its saved output.
auth_configs = {'username': 'postgres',
                'password': getpass("Password for the 'postgres' database user: "),
                'db_name': database_name,
                'instance_connection_name': instance_connection_name
                }

task_manager = TaskManager(db_configs=auth_configs)
task_manager.init()

In [None]:
#@title Get Project Configs
# The values collected here are passed to task_manager.set_project_configs in the next cell.
#@markdown Enter Title for Project
project_title = "SKAI Data Annotation" #@param {type: "string"}
assert project_title, "Please enter a project title"

#@markdown Specify deadline to complete annotation project
# Note: this is a plain 'YYYY-MM-DD' string, not a datetime object.
completion_deadline = "2024-04-30" #@param {type:"date"}

#@markdown Enter Data Source ie. GCP cloud bucket name
bucket_name = "skai-project" #@param {type: "string"}
assert bucket_name, "Please enter data source for project"

#@markdown Enter Data Source Prefix
# Optional: unlike the other string fields, no assert guards this one.
bucket_prefix = "skai-source_storage" #@param {type: "string"}

#@markdown Enter list of possible labels for data with comma separation eg. cat, dog
comma_separated_labels = "no_damage, destroyed, major_damage, minor_damage, None of the above" #@param {type: "string"}
assert comma_separated_labels, "Please enter labels as comma-separated string"

#@markdown Enter the maximum number of annotators to assign to each example
# Also reused by the "Assign Tasks to Annotators" cell.
max_annotators_per_example = 3 # @param {type:"integer"}

In [None]:
#@title Set Project Configs
# Store the form values; keyword arguments make the mapping to the stored
# settings explicit.
task_manager.set_project_configs(
    project_title=project_title,
    cloud_bucket_name=bucket_name,
    cloud_bucket_prefix=bucket_prefix,
    comma_separated_labels=comma_separated_labels,
    max_annotators_per_example=max_annotators_per_example,
    completion_deadline=completion_deadline,
)

# Show the first stored configuration as the cell output.
stored_configs = task_manager.get_project_configs()
stored_configs[0]

Project Configurations
************************** 
project title='SKAI Data Annotation'
cloud_bucket_name='skai-project'
cloud_bucket_prefix='skai-source_storage'
comma_separated_labels='no_damage, destroyed, major_damage, minor_damage, None of the above'
max_annotation_per_example=3
completion_deadline=datetime.datetime(2024, 4, 30, 0, 0)
project creation date=datetime.datetime(2024, 4, 15, 14, 26, 8, 745105)

# Add Users to Database

In [None]:
#@markdown Please Enter Username and password and click to add
from collections import defaultdict

user_dict = defaultdict()

def add_user_to_db(input):
  !gcloud sql users create {USERNAME.value} \
    --instance={instance_name} \
    --password={PASSWORD.value}
  user_dict[USERNAME.value] = EMAIL.value

def remove_user_from_db(input):
  !gcloud sql users delete {USERNAME.value} \
    --instance={instance_name}
  del user_dict[USERNAME.value]

USERNAME = widgets.Text(
    value=None,
    placeholder='username',
    description='USER:',
    disabled=False
)

PASSWORD = widgets.Password(
    value=None,
    placeholder='password',
    description='PASSWORD:',
    disabled=False
)

EMAIL = widgets.Text(
    value=None,
    placeholder='Enter email',
    description='EMAIL:',
    disabled=False
)

add_user = widgets.Button(
    value=False,
    description='Add User',
    disabled=False,
    button_style='success',
    tooltip='Description',
    icon='check',
)

remove_user = widgets.Button(
    value=False,
    description='Remove User',
    disabled=False,
    button_style='danger',
    tooltip='Description',
    icon='check',
)

add_user.on_click(add_user_to_db)
remove_user.on_click(remove_user_from_db)

# TODO: Add User button on_click
data_box1 = widgets.HBox([USERNAME, PASSWORD, EMAIL])
data_box2 = widgets.HBox([add_user, remove_user])
display(widgets.VBox([data_box1, data_box2]))

VBox(children=(HBox(children=(Text(value='', description='USER:', placeholder='username'), Password(descriptio…

In [None]:
#@markdown # View list of users added to SQL Instance
# `users` holds the captured gcloud output, one user name per line; a later
# cell builds the annotator list from it.
users = !gcloud sql users list --instance={instance_name} --format="value(name)"
print("List of Added users: ", users)

List of Added users:  ['kbb', 'kobby', 'panford', 'postgres']


In [None]:
# Issue GRANT SELECT on the project tables for the database user 'kobby'.
task_manager.grant_user_access('kobby')

In [None]:
# !grant select on ProjectConfigurations to "kobby";

/bin/bash: line 1: grant: command not found


In [None]:
# Build the annotator records from the SQL users. The e-mail typed into the
# "Add User" form is carried over when there is one; without it every
# annotator was inserted with a NULL e-mail.
annotators = [
    {'username': username, 'email': user_dict.get(username)}
    for username in users
]
annotators

[{'username': 'kbb'},
 {'username': 'kobby'},
 {'username': 'panford'},
 {'username': 'postgres'}]

In [None]:
# Print the username / e-mail pair of every annotator record.
for annotator_data in annotators:
  username, email = annotator_data.get('username'), annotator_data.get('email')
  print("username: ", username)
  print("email: ", email)


username:  kbb
email:  None
username:  kobby
email:  None
username:  panford
email:  None
username:  postgres
email:  None


In [None]:
# Store every annotator whose username is not in the database yet; failures
# are logged per user instead of being raised.
task_manager.add_annotators(annotators)

ERROR:root:Error adding annotator 'kbb': (pg8000.exceptions.DatabaseError) {'S': 'ERROR', 'V': 'ERROR', 'C': '23502', 'M': 'null value in column "email" of relation "annotators" violates not-null constraint', 'D': 'Failing row contains (8, null, kbb).', 's': 'public', 't': 'annotators', 'c': 'email', 'F': 'execMain.c', 'L': '1974', 'R': 'ExecConstraints'}
[SQL: INSERT INTO annotators (username, email) VALUES (%s::VARCHAR, %s::VARCHAR) RETURNING annotators.annotator_id]
[parameters: ('kbb', None)]
(Background on this error at: https://sqlalche.me/e/20/4xp6)
ERROR:root:Error adding annotator 'kobby': (pg8000.exceptions.DatabaseError) {'S': 'ERROR', 'V': 'ERROR', 'C': '23502', 'M': 'null value in column "email" of relation "annotators" violates not-null constraint', 'D': 'Failing row contains (9, null, kobby).', 's': 'public', 't': 'annotators', 'c': 'email', 'F': 'execMain.c', 'L': '1974', 'R': 'ExecConstraints'}
[SQL: INSERT INTO annotators (username, email) VALUES (%s::VARCHAR, %s::VAR

In [None]:
# Show the annotators currently stored in the database.
task_manager.list_annotators()

[]

# Upload Data

In [None]:
# The tasks are loaded from a CSV file, so fetch that file from the project
# bucket first.
import google.cloud.storage as storage

filename = 'image_metadata_copy.csv'
# Object name inside the bucket: <prefix>/<filename>.
blob_name = os.path.join(bucket_prefix, filename)

client = storage.Client()
bucket = client.bucket(bucket_name=bucket_name)
blob = bucket.blob(blob_name)

# Save a local copy under the name the next cell reads.
blob.download_to_filename('metadata.csv')

In [None]:
# Store one Example per row of the CSV downloaded above; example_ids that are
# already in the database are skipped.
task_manager.add_tasks_from_csv('metadata.csv')

In [None]:
# Peek at two of the stored examples.
task_manager.list_tasks(limit=2)

[Example(example_id='4ec2f03040470970a07931e1e0167225',image_path='gs://skai-project/skai-source_storage/4ec2f03040470970a07931e1e0167225.png', annotations=[],
 Example(example_id='dce01ef4824ed6098cd2936e872d4297',image_path='gs://skai-project/skai-source_storage/dce01ef4824ed6098cd2936e872d4297.png', annotations=[]]

In [None]:
# Check which annotators are stored before assigning tasks to them.
task_manager.list_annotators()

[]

# Assign Tasks to Annotators

In [None]:
# Assign each stored example to annotators in round-robin order and persist
# the assignments; the per-example count comes from the project config form.
task_manager.assign_tasks(max_annotators_per_example=max_annotators_per_example)

# Inspect Task Assignments

In [None]:
# Print every stored assignment. The method is `show_assignments`; the former
# call to `show_assignment` raised an AttributeError.
task_manager.show_assignments()

# Export Annotations

# Delete Data / Database

In [None]:
!gcloud sql databases delete {DATABASE_NAME.value} \
--instance=INSTANCE_NAME
!gcloud sql instances delete {instance_name}