In [1]:
import time
import datetime

from experiment.utils import dbutils, transformation
from experiment.utils.logging import logger
from experiment.utils.tables.upload_tasks_table import UploadTasksTable
from experiment.api import label_studio

import openai



In [2]:
# Single database helper shared by the cells below: it runs the dbt models,
# executes the SQL queries and upserts the prompted reports.
db = dbutils.DatabaseUtils()

In [3]:
# Upper bound on how many reports are selected for prompting in one run
# (applied with .head() when the candidate reports are filtered).
# NOTE(review): matches the "200 RPD" limit mentioned in the prompting cell — confirm it is meant to track it.
PROMPT_N_MORE_REPORTS = 200
# Instruction passed as `prompt` to transformation.prompt_report for every selected report.
PROMPT = "Perform the following transformation on the report: Translate into English" 
# Key into PRIORITIZE_BY_VALUES, which picks the sort direction of the selection query.
PRIORITIZE_BY = "normal"  # "normal" or "emergency"

In [4]:
# `reports_raw` is the table object later handed to db.upsert_values as the upsert target.
# `Base` is only referenced by the (commented-out) table-creation line in the next cell
# — presumably the declarative base holding the table metadata; confirm in UploadTasksTable.
reports_raw, Base = UploadTasksTable()

In [5]:
# generate annotation tables
# One-time setup, kept for reference: uncomment to create the tables registered on `Base`.
# Base.metadata.create_all(db.engine)

# Run the dbt project before querying; the captured output below shows dbt executing
# a snapshot and then a second invocation — presumably the models themselves (confirm in dbutils).
db.run_dbt_model('all')

[0m22:15:53  Running with dbt=1.6.1
[0m22:15:53  Registered adapter: postgres=1.6.1
[0m22:15:54  Found 9 models, 1 snapshot, 3 sources, 0 exposures, 0 metrics, 689 macros, 0 groups, 0 semantic models
[0m22:15:54  
[0m22:15:56  Concurrency: 5 threads (target='prod')
[0m22:15:56  
[0m22:15:56  1 of 1 START snapshot snapshot.report_classifications_snapshot ................. [RUN]
[0m22:15:58  1 of 1 OK snapshotted snapshot.report_classifications_snapshot ................. [[32msuccess[0m in 1.60s]
[0m22:15:58  
[0m22:15:58  Finished running 1 snapshot in 0 hours 0 minutes and 4.78 seconds (4.78s).
[0m22:15:58  
[0m22:15:58  [32mCompleted successfully[0m
[0m22:15:58  
[0m22:15:58  Done. PASS=1 WARN=0 ERROR=0 SKIP=0 TOTAL=1
[0m22:16:00  Running with dbt=1.6.1
[0m22:16:00  Registered adapter: postgres=1.6.1
[0m22:16:01  Found 9 models, 1 snapshot, 3 sources, 0 exposures, 0 metrics, 689 macros, 0 groups, 0 semantic models
[0m22:16:01  
[0m22:16:03  Concurrency: 5 thread

### 1) Select Reports

In [6]:
# Translates each PRIORITIZE_BY option into the SQL sort direction that is
# interpolated into the ORDER BY clause of the report-selection query.
PRIORITIZE_BY_VALUES = dict(normal="ASC", emergency="DESC")

In [7]:
# get reports directly from database
# Selects every row of annotation.upload_tasks whose report_id is not yet present in
# public.task, ordered so that the first rows are the ones to prompt first.

# Fail with a readable message instead of a bare KeyError raised from inside the f-string.
if PRIORITIZE_BY not in PRIORITIZE_BY_VALUES:
    raise ValueError(
        f"PRIORITIZE_BY must be one of {sorted(PRIORITIZE_BY_VALUES)}, got {PRIORITIZE_BY!r}"
    )
# The direction comes from a fixed whitelist, so interpolating it into the SQL is safe.
sort_direction = PRIORITIZE_BY_VALUES[PRIORITIZE_BY]

# The subquery must not return NULL: `x NOT IN (..., NULL)` is never true in SQL, so a
# single task without a 'report_id' key would make this query return no rows at all.
query = f"""
            SELECT * FROM annotation.upload_tasks ut
            WHERE
                report_id NOT IN (
                SELECT
                    (DATA ->> 'report_id')::INT AS report_id
                FROM
                    public.task
                WHERE
                    (DATA ->> 'report_id') IS NOT NULL)
                ORDER BY patient_report_count {sort_direction}, report_length {sort_direction}
        """

# get values from the database
df_reports = db.read_sql_query(query)
df_reports.head()

Unnamed: 0,report_id,patient_no,protocol_no,report_original,report_length,report_prompted,patient_report_count
0,4211,2005095794,22813555,RAPOR TARİHİ:16/07/2022 FİLM NO:\n\n Beyin B...,47,,1
1,2844,2004946777,21894588,RAPOR TARİHİ: 07/12/2021 FİLM NO: 12405087\n...,47,,1
2,915,2004336082,23924988,RAPOR TARİHİ:14/05/2023 FİLM NO:\n\nKontrast...,47,,1
3,3301,2005040837,22665007,RAPOR TARİHİ:27/06/2022 FİLM NO:\n\nBeyin ...,47,,1
4,4923,2005145142,22865336,RAPOR TARİHİ:30/07/2022 FİLM NO:\n\nBeyin BT...,47,,1


In [8]:
# get annotated reports
# Collects the distinct patient numbers that already have at least one labeled task.
# `data ->> 'patient_no'` extracts the JSON field as text, so the resulting list
# holds strings (and None for tasks without that key).
query = """
            SELECT 
                DISTINCT data ->> 'patient_no' as patient_no
            FROM task
            WHERE is_labeled = TRUE
        """

# get values from the database
annotated_patient_nos = db.read_sql_query(query)["patient_no"].to_list()

In [9]:
# get tasks that have been prompted
# Collects the report ids whose `report_prompted` text is non-empty. Rows where the
# column is NULL are excluded as well, because `NULL != ''` does not evaluate to true.
query = """
            SELECT 
                report_id
            FROM annotation.upload_tasks
            WHERE report_prompted != '' 
        """

# get values from the database
upload_tasks_prompted = db.read_sql_query(query)["report_id"].to_list()

In [10]:
# use only non-prompted reports & non-annotated patients
# `annotated_patient_nos` comes from a JSON `->>` extraction and therefore holds text,
# while `df_reports["patient_no"]` may be numeric. Comparing both sides as strings keeps
# the exclusion working regardless of the column dtype (isin never matches int vs str).
annotated_patient_keys = {
    str(patient_no) for patient_no in annotated_patient_nos if patient_no is not None
}
is_annotated_patient = df_reports["patient_no"].astype(str).isin(annotated_patient_keys)
is_already_prompted = df_reports["report_id"].isin(upload_tasks_prompted)

# One combined mask built on df_reports itself, instead of chaining .loc calls with
# masks indexed on the unfiltered frame. Row order (the priority order) is preserved.
df_upload_tasks = df_reports.loc[~is_annotated_patient & ~is_already_prompted].head(
    PROMPT_N_MORE_REPORTS
)

### 2) Prompt Reports

In [14]:
# Prompt every selected report and persist each result as soon as it is available.

# Seconds to wait between two prompt requests (openai restriction: 3 RPM - 200 RPD).
SECONDS_BETWEEN_PROMPTS = 20

# Every column except the conflict key is overwritten when the row already exists.
cols_to_upsert = df_upload_tasks.columns.to_list()
cols_to_upsert.remove("report_id")
# Holds only the records that were prompted AND written to the database successfully.
data_to_insert = []

for idx, (_, row) in enumerate(df_upload_tasks.iterrows()):
    # Throttle before every request after the first one. Doing it here (rather than at
    # the end of the try body) keeps the pause in place after a failed request too.
    if idx > 0:
        time.sleep(SECONDS_BETWEEN_PROMPTS)

    try:
        record = {
            "report_id": row["report_id"],
            "patient_no": row["patient_no"],
            "protocol_no": row["protocol_no"],
            "report_original": row["report_original"],
            "report_prompted": transformation.prompt_report(
                report=row["report_original"], prompt=PROMPT
            ),
            "report_length": row["report_length"],
            "patient_report_count": row["patient_report_count"],
        }

        # Upsert only the new record; earlier records were already written.
        db.upsert_values(reports_raw, [record], cols_to_upsert, ["report_id"])
    except Exception as error:
        # Best effort: log which report failed and continue with the next one.
        logger.error(f"report_id={row['report_id']} could not be prompted/uploaded: {error}")
        continue

    data_to_insert.append(record)
    logger.info(f"{idx + 1}. report translated & uploaded")

logger.info(f"Finished prompting {len(data_to_insert)} reports")

2023-11-25 01:20:42,270 - AI Reports - INFO - 1 - A report translated & uploaded


### 3) Upload Tasks to Label Studio

In [None]:
# get reports directly from database
# Selects the prompted reports that are not yet tasks in Label Studio and exposes the
# prompted text under the column name `text`.
# The subquery must not return NULL: `x NOT IN (..., NULL)` is never true in SQL, so a
# single task without a 'report_id' key would make this query return no rows at all.
query = """
            SELECT
                report_id,
                patient_no,
                protocol_no,
                report_original,
                report_prompted as text,
                report_length,
                patient_report_count
            FROM
                annotation.upload_tasks
            WHERE
                report_id NOT IN (
                SELECT
                    (DATA ->> 'report_id')::INT AS report_id
                FROM
                    public.task
                WHERE
                    (DATA ->> 'report_id') IS NOT NULL)
                AND report_prompted != ''
        """

# get values from the database
df_upload_tasks = db.read_sql_query(query)

# output tasks as a csv file
output_path = (
    transformation.get_project_root() / "tmp" / "data" / "upload_tasks.csv"
)
# The tmp/data directory may not exist on a fresh checkout; to_csv does not create it.
output_path.parent.mkdir(parents=True, exist_ok=True)
df_upload_tasks.to_csv(output_path, index=False)

In [None]:
# upload tasks to label studio
# Id of the Label Studio project that receives the tasks; named so the target is
# visible and can be changed in one place.
LABEL_STUDIO_PROJECT_ID = 7
label_studio.upload_csv_tasks(csv_path=output_path, project_id=LABEL_STUDIO_PROJECT_ID)

In [None]:
# Final step of the notebook, run after the CSV upload.
# NOTE(review): presumably shuts down the Label Studio instance used above — confirm in experiment.api.label_studio.
label_studio.stop_label_studio()