In [None]:
# -------------------------------
# Imports & Setup
# -------------------------------
!pip install dtlpy pandas requests beautifulsoup4 google-cloud-bigquery

import dtlpy as dl
from datetime import date, timedelta
import requests
from bs4 import BeautifulSoup
import pandas as pd
import json
from google.cloud import bigquery


In [None]:
# -------------------------------
# Date Utilities
# -------------------------------
def get_yesterday_and_today():
    """Return yesterday's and today's local dates as ``YYYY-MM-DD`` strings.

    Returns:
        tuple[str, str]: ``(yesterday, today)``, in that order.
    """
    day_format = "%Y-%m-%d"
    current_day = date.today()
    previous_day = current_day - timedelta(days=1)
    return tuple(day.strftime(day_format) for day in (previous_day, current_day))

def get_last_date():
    """Return the latest ``date`` stored in ``patches_quality_report_all``.

    Runs ``SELECT MAX(date)`` through a BigQuery client and stringifies the
    single cell of the resulting DataFrame.

    Returns:
        str: ``str()`` of the maximum ``date`` value.

    NOTE(review): if the table has no rows the aggregate is presumably NULL and
    this returns the string form of that null value - confirm callers cope.
    """
    query = "SELECT MAX(date) FROM `patches_quality_report_all`"
    client = bigquery.Client(project='project_id')
    results = client.query(query).to_dataframe()
    # The aggregate column is not aliased in the query, so address the cell
    # purely by position. `.iloc[0, 0]` does that in one step; the previous
    # `.iloc[0][0]` indexed a label-indexed Series with an integer key, which
    # relies on a positional fallback that pandas has deprecated.
    return str(results.iloc[0, 0])


In [None]:
# -------------------------------
# Fetch Patches Report (Mock BigQuery)
# -------------------------------
def get_patches_report(start_date, finish_date):
    """Fetch incorrectly answered patches dated strictly between two bounds.

    Args:
        start_date: Exclusive lower bound; formatted into the SQL text as a
            quoted literal.
        finish_date: Exclusive upper bound; formatted the same way.

    Returns:
        The query result converted with ``to_dataframe()``.
    """
    # The bounds are interpolated into the statement unescaped, so only pass
    # trusted, well-formed date strings.
    # NOTE(review): both comparisons are strict. If `date` is a plain DATE
    # column, a yesterday/today window would match nothing - confirm the
    # column type and the intended inclusivity.
    sql = f"""
    SELECT * FROM `human_patches_clear_data`
    WHERE is_correct_answer = False
      AND date > '{start_date}' 
      AND date < '{finish_date}'
    """
    bq_client = bigquery.Client(project='project_name')
    return bq_client.query(sql).to_dataframe()

# Example usage: query the report for the window returned by
# get_yesterday_and_today(). Note that this issues a live BigQuery query
# every time the cell is executed.
start_date, finish_date = get_yesterday_and_today()
results = get_patches_report(start_date, finish_date)


In [None]:
# -------------------------------
# Extract Patch Image Links
# -------------------------------
def _first_image_src(html):
    """Return the ``src`` of the first img tag that carries one, else ``None``."""
    soup = BeautifulSoup(html, "html.parser")
    # `src=True` only matches tags that actually have the attribute, so a bare
    # img tag can no longer raise KeyError and an image-less page yields None
    # instead of IndexError.
    tag = soup.find("img", src=True)
    return tag["src"] if tag is not None else None


def build_patch_report(results, timeout=30):
    """Attach the first image link of every patch page to its report row.

    For each row of ``results`` the page at ``patch_url`` is downloaded and the
    ``src`` of its first image is stored under ``link``; ``tagger`` is added as
    an empty placeholder column.

    Processing is best effort: a row is skipped (with a printed message) when
    it has no usable URL, the request raises, or the response status is not
    200. A page that loads but contains no image is kept with ``link=None``.

    Args:
        results: DataFrame with at least a ``patch_url`` column.
        timeout: Seconds passed to ``requests.get`` so a stalled server cannot
            hang the notebook. Defaults to 30.

    Returns:
        pandas.DataFrame: the retained rows with all columns of ``results``
        plus ``link`` and ``tagger``. The columns are present even when no row
        is retained, so downstream column access keeps working.
    """
    report_rows = []

    # to_dict("records") converts column by column, avoiding the dtype
    # upcasting that materialising each row as a Series can cause.
    for row in results.to_dict("records"):
        url = row.get("patch_url")
        if not isinstance(url, str) or not url:
            print(f"Skipping row without a usable patch_url: {url!r}")
            continue

        try:
            response = requests.get(url, timeout=timeout)
        except requests.RequestException as exc:
            print(f"Failed to retrieve images. Request error for {url}: {exc}")
            continue

        if response.status_code != 200:
            print(f"Failed to retrieve images. Status code: {response.status_code}")
            continue

        image_url = _first_image_src(response.content)
        if image_url is None:
            print(f"No img tag with a src attribute found at {url}")

        report_rows.append({**row, "link": image_url, "tagger": None})

    added_columns = [name for name in ("link", "tagger") if name not in results.columns]
    return pd.DataFrame(report_rows, columns=[*results.columns, *added_columns])

In [None]:
# -------------------------------
# Mock Dataloop API
# -------------------------------
def get_creator_metadata(item_id):
    """Map an item's annotation id to the creators found in its task status log.

    Args:
        item_id: Despite the name, this is the item *object* (get_tagger passes
            the items returned by the dataset query), not an id. It must expose
            a ``metadata`` mapping containing ``annotation_item_id``. The
            parameter name is kept so existing callers keep working.

    Returns:
        dict: ``{annotation_item_id: [(creator, timestamp), ...]}`` with one
        tuple per log entry whose ``action`` is ``'created'``, for example
        ``{item_id: [("Tagger_1", "2025-09-07T10:00:00")]}``. The list is empty
        when the item has no task status log.
    """
    # The body previously read a name `item` that this function never defined,
    # so it raised NameError unless a global `item` happened to exist.
    metadata = item_id.metadata
    # Tolerate items without a status log instead of aborting the whole run.
    status_log = metadata.get('system', {}).get('taskStatusLog', [])
    creators = [
        (entry['status']['creator'], entry['status']['timestamp'])
        for entry in status_log
        if entry['action'] == 'created'
    ]
    return {metadata['annotation_item_id']: creators}

# Module-level Dataloop project handle, fetched once when this cell runs and
# read by get_tagger() to look up datasets.
# NOTE(review): "project_id" looks like a placeholder - supply the real id.
dataloop_project = dl.projects.get(project_id="project_id")

def _dataset_id_from_link(link):
    """Return the text between ``datasets/`` and ``/items`` in *link*, or ``None``."""
    # Explicit checks replace the former bare `except:`, which also swallowed
    # KeyboardInterrupt and any unrelated programming error.
    if not isinstance(link, str) or 'datasets/' not in link:
        return None
    return link.split('datasets/')[1].rsplit('/items')[0] or None


def get_tagger(row):
    """Look up who created the annotation items referenced by a report row.

    Args:
        row: Mapping-like report row (a pandas Series when used with
            ``DataFrame.apply(axis=1)``). ``dataloop_link`` supplies the
            dataset id and ``annotation_item_ids`` is a JSON-encoded list of
            annotation item ids.

    Returns:
        list[dict] | None: one ``get_creator_metadata`` result per matching
        item, or ``None`` when the row has no annotation ids or no dataset id
        can be parsed from its link.
    """
    dataset_id = _dataset_id_from_link(row.get('dataloop_link'))
    annotation_item_ids = row.get('annotation_item_ids')
    if pd.isnull(annotation_item_ids) or dataset_id is None:
        return None

    dataset = dataloop_project.datasets.get(dataset_id=dataset_id)
    # Select the items whose metadata.annotation_item_id is one of the row's ids.
    filters = dl.Filters()
    filters.add(field="metadata.annotation_item_id",
                values=json.loads(annotation_item_ids),
                operator=dl.FiltersOperations.IN)
    matching_items = dataset.items.get_all_items(filters=filters)
    return [get_creator_metadata(item) for item in matching_items]


In [None]:
# -------------------------------
# Run Pipeline
# -------------------------------

# Build the report from the BigQuery rows fetched earlier. This step was
# missing, so `patch_report_list` was undefined on a fresh kernel
# (Restart & Run All raised NameError).
patch_report_raw = build_patch_report(results)

# Clean dataset for portfolio: one row per patch, integer patch ids.
# De-duplicating before the tagger lookup keeps the same rows (first
# occurrence per patch_id) and skips Dataloop calls for rows that would be
# dropped anyway.
patch_report_unique = (
    patch_report_raw
    .drop_duplicates(subset=['patch_id'])
    .astype({'patch_id': int})
)

# Assign taggers. `assign` returns a new frame, so this cell never mutates its
# own input and can be re-run safely.
patch_report_tagged = patch_report_unique.assign(
    tagger=patch_report_unique.apply(get_tagger, axis=1)
)

# Keep only relevant columns
patch_report_list = patch_report_tagged[['image_id', 'annotation_item_ids', 'dataloop_link', 'tagger', 'link']]


In [None]:
# -------------------------------
# Upload to BigQuery (Portfolio-safe)
# -------------------------------
# Client for the destination project; `table_id` names the target as
# "dataset.table".
client = bigquery.Client(project='portfolio-project')
table_id = "portfolio_dataset.patch_reports"

# WRITE_APPEND adds the rows to the existing table, so re-running this cell
# uploads the same report again.
job_config = bigquery.LoadJobConfig(write_disposition="WRITE_APPEND")
job = client.load_table_from_dataframe(patch_report_list, table_id, job_config=job_config)
# Wait for the load job before reporting success.
job.result()
print(f"Table {table_id} uploaded successfully.")
