In [1]:
import pandas as pd
from common.model import Prompt
from common.classification import create_input_instances, create_batch_file, start_batch_job, retrieve_batch_results

ModuleNotFoundError: No module named 'pydantic'

## Load Data

In [None]:
# Inputs: the sampled queries to classify and the data-privilege classification prompt.
queries_path = "./data/inputs/sampled_queries.parquet"
prompt_path = "./prompts/data_privilege/data_privilege_classification.json"

queries = pd.read_parquet(queries_path)
data_privilege_prompt = Prompt.load(prompt_path)

In [None]:
# Create one input instance per sampled query. The request id is the query's own "id"
# value: batches 4 and 5 use the same key, and every later join against `queries`
# (merge on "id", set_index("id")) relies on it to match batch results back to queries.
input_instances = create_input_instances(queries["query"], queries["id"])

In [None]:
# Create one batch file per independent run. The three runs are compared (voted on) later,
# so each run needs its own file; batches 4 and 5 continue this numbering.
batch_file_paths = [
    "./batch_jobs/data_privilege/data_privilege_batch_job_1.jsonl",
    "./batch_jobs/data_privilege/data_privilege_batch_job_2.jsonl",
    "./batch_jobs/data_privilege/data_privilege_batch_job_3.jsonl",
]
# Guard against two runs sharing (and overwriting) the same file.
assert len(set(batch_file_paths)) == len(batch_file_paths), "batch file paths must be unique"
for batch_file_path in batch_file_paths:
    create_batch_file(prompt=data_privilege_prompt, instances=input_instances, batch_file_path=batch_file_path)

In [None]:
# Submit one batch job per file; batch_jobs[i] belongs to batch_file_paths[i].
batch_jobs = list(map(start_batch_job, batch_file_paths))

In [None]:
# Initialize the structure of the classification results
from pydantic import BaseModel
from typing import Literal

# Response schema for the data-privilege classification. It is passed to
# retrieve_batch_results for every batch run (presumably to parse each output line).
# The meaning of each label is defined by the classification prompt, which is not visible here.
# NOTE(review): deliberately no class docstring and no field reordering. pydantic uses the
# docstring as the JSON-schema "description" and keeps declaration order, so either change
# would alter the generated schema.
class SchemaDependence(BaseModel):
    # Each *_analysis text field is declared before the label it explains.
    structural_reference_analysis: str
    structural_reference: bool
    value_reference_analysis: str
    # Three-valued label kept as strings ("True" / "Obscure" / "False"), not a bool.
    value_reference: Literal["True", "Obscure", "False"]
    container_reference_analysis: str
    container_reference: bool

In [None]:
# Fetch the parsed output of every run, in batch_jobs order.
# All batch jobs must have completed before this cell is run.
results = [
    retrieve_batch_results(job, SchemaDependence)
    for job in batch_jobs
]

In [None]:
# One DataFrame per run. Assumes each run result is (ids, classifications) — TODO confirm
# against retrieve_batch_results. A classification may be None (see the checks below).
results_dfs = [
    pd.DataFrame(
        {
            "id": run[0],
            "data_privilege_classification": run[1],
        }
    )
    for run in results
]

In [None]:
# Unpack every parsed SchemaDependence into flat columns. Rows whose classification
# is None get None in every derived column.
for run_df in results_dfs:
    parsed = run_df["data_privilege_classification"]
    for field_name in (
        "structural_reference",
        "structural_reference_analysis",
        "value_reference",
        "value_reference_analysis",
        "container_reference",
        "container_reference_analysis",
    ):
        run_df[field_name] = parsed.apply(
            lambda x, attr=field_name: None if x is None else getattr(x, attr)
        )

In [None]:
from typing import List

def aggregate_results(dfs: List[pd.DataFrame], columns_to_aggregate: List[str]) -> pd.DataFrame:
    """Combine per-run result frames into one row per id holding each run's value.

    Rows are matched across runs by their ``id`` column rather than by position. The result
    is therefore correct even when runs list their ids in different orders or have
    different lengths; batch outputs are not necessarily returned in input order.

    Args:
        dfs: One DataFrame per run. Each must have an ``id`` column and every column in
            ``columns_to_aggregate``. Ids are assumed unique within each frame.
        columns_to_aggregate: Names of the columns to collect across runs.

    Returns:
        A new DataFrame with the ``id`` column (and index) of ``dfs[0]``. For each aggregated
        column, a cell holds a list with that id's value from every run, in ``dfs`` order.
        A run with no row for the id contributes None. Ids that appear only in later runs
        are not included.

    Raises:
        ValueError: If ``dfs`` is empty.
    """
    if not dfs:
        raise ValueError("aggregate_results needs at least one DataFrame")
    # copy() so new columns go into an independent frame, not a slice of dfs[0]
    # (avoids SettingWithCopyWarning and leaves dfs[0] untouched).
    full_df = dfs[0][["id"]].copy()
    ids = full_df["id"].tolist()
    for column in columns_to_aggregate:
        # Per-run id -> value lookup, so values are aligned by id rather than row position.
        per_run = [dict(zip(df["id"], df[column])) for df in dfs]
        full_df[column] = [[run.get(row_id) for run in per_run] for row_id in ids]
    return full_df

In [None]:
# One row per id; each listed column holds the per-run values as a list.
merged_results = aggregate_results(
    results_dfs,
    columns_to_aggregate=[
        "structural_reference",
        "structural_reference_analysis",
        "value_reference",
        "value_reference_analysis",
        "container_reference",
        "container_reference_analysis",
    ],
)

In [None]:
def _vote_disagreement(votes: pd.Series) -> pd.Series:
    """Return True for every row whose per-run votes are not all identical."""
    distinct_counts = votes.apply(lambda run_votes: len(set(run_votes)))
    return distinct_counts != 1


# Ids where the runs disagree on at least one label are classified once more (batch 4).
deviation_mask = (
    _vote_disagreement(merged_results["structural_reference"])
    | _vote_disagreement(merged_results["value_reference"])
    | _vote_disagreement(merged_results["container_reference"])
)
merged_results_deviations = merged_results[deviation_mask]

queries_b4 = queries.merge(merged_results_deviations, on="id", how="inner")

b4_input_instances = create_input_instances(queries_b4["query"], queries_b4["id"])

b4_file_path = "./batch_jobs/data_privilege/data_privilege_batch_job_4.jsonl"
create_batch_file(prompt=data_privilege_prompt, instances=b4_input_instances, batch_file_path=b4_file_path)
b4 = start_batch_job(b4_file_path)

In [None]:
# Collect the batch-4 run in the same id / classification layout as the first runs.
b4_results = retrieve_batch_results(b4, SchemaDependence)
b4_res_df = pd.DataFrame(
    {
        "id": b4_results[0],
        "data_privilege_classification": b4_results[1],
    }
)

In [None]:
# Fold the batch-4 run (re-run only for ids whose first votes disagreed) into the vote lists.
b4_merge = merged_results.merge(b4_res_df, on="id", how="inner")

cols = ["structural_reference", "value_reference", "container_reference"]

# Append this run's vote to each field's vote list. A None classification (presumably an
# unparseable response; earlier runs are also checked for None) adds a None vote instead
# of raising AttributeError.
b4_classifications = b4_merge["data_privilege_classification"].tolist()
for col in cols:
    b4_merge[col] = [
        previous_votes + [getattr(classification, col) if classification is not None else None]
        for previous_votes, classification in zip(b4_merge[col].tolist(), b4_classifications)
    ]

# ensure unique ids in b4_merge (keep last if duplicates)
b4_indexed = b4_merge.drop_duplicates(subset="id", keep="last").set_index("id")

# Index merged_results by id, then overwrite only the vote columns of the re-run ids.
# DataFrame.update leaves rows absent from b4_indexed untouched.
merged_indexed = merged_results.set_index("id")
merged_indexed.update(b4_indexed[cols])

In [None]:
def voting(l):
    """Count how often each distinct value occurs in ``l``.

    Returns a dict mapping value -> count, with keys in order of first appearance.
    """
    tally = {}
    for vote in l:
        if vote in tally:
            tally[vote] += 1
        else:
            tally[vote] = 1
    return tally

def check_voting_majority(voting_dict, threshold):
    """Return the winning value of a vote tally, or None if no value is strong enough.

    Args:
        voting_dict: Mapping of value -> number of votes, e.g. as produced by ``voting``.
        threshold: Minimum number of votes a value needs in order to win.

    Returns:
        The value with the most votes among those with at least ``threshold`` votes. Ties go
        to the value that comes first in ``voting_dict``. Returns None when no value reaches
        ``threshold``. Note that None is also returned when None itself is the winning
        value, so callers cannot tell the two cases apart.
    """
    winner = None
    winner_count = None
    for key, count in voting_dict.items():
        if count < threshold:
            continue
        # Keep the highest count rather than the first qualifying key: when the threshold
        # is at most half the votes, several keys can qualify at once.
        if winner_count is None or count > winner_count:
            winner, winner_count = key, count
    return winner

In [None]:
# Tally each field's votes per id: 4 votes for ids re-run in batch 4, 3 for the others.
structural_votings = merged_indexed["structural_reference"].apply(voting)
value_votings = merged_indexed["value_reference"].apply(voting)
container_votings = merged_indexed["container_reference"].apply(voting)

# An id goes to batch 5 when any field has no value with at least 3 votes.
b5_mask = (
    structural_votings.apply(lambda x: check_voting_majority(x, 3) is None)
    | value_votings.apply(lambda x: check_voting_majority(x, 3) is None)
    | container_votings.apply(lambda x: check_voting_majority(x, 3) is None)
)

# Select queries by id membership instead of boolean-indexing with b5_mask. The mask is
# indexed by result ids, and aligning it against queries raises as soon as a query id is
# missing from the results. Row order still follows `queries`.
b5_ids = b5_mask.index[b5_mask.to_numpy(dtype=bool)]
b5_queries = queries[queries["id"].isin(b5_ids)].set_index("id")

b5_input_instances = create_input_instances(b5_queries["query"], b5_queries.index)
b5_file_path = "./batch_jobs/data_privilege/data_privilege_batch_job_5.jsonl"
create_batch_file(prompt=data_privilege_prompt, instances=b5_input_instances, batch_file_path=b5_file_path)
b5 = start_batch_job(b5_file_path)

In [None]:
# Collect the batch-5 run and pair each classification with that id's current vote lists.
b5_results = retrieve_batch_results(b5, SchemaDependence)
b5_res_df = pd.DataFrame(
    {
        "id": b5_results[0],
        "data_privilege_classification": b5_results[1],
    }
)
# merged_indexed carries "id" as its index; pandas lets `on` name an index level, so it is
# matched against b5_res_df's "id" column. The following cells use "id" as a column of b5_merge.
b5_merge = merged_indexed.merge(b5_res_df, on="id", how="inner")

In [None]:
# Append the batch-5 vote to each field's vote list. A None classification adds a None vote
# instead of raising AttributeError (earlier runs are also checked for None).
b5_classifications = b5_merge["data_privilege_classification"].tolist()
for vote_column in ["structural_reference", "value_reference", "container_reference"]:
    b5_merge[vote_column] = [
        previous_votes + [getattr(classification, vote_column) if classification is not None else None]
        for previous_votes, classification in zip(b5_merge[vote_column].tolist(), b5_classifications)
    ]

In [None]:
cols = ["structural_reference", "value_reference", "container_reference"]

# One row per id (the last one if b5_merge has duplicates), indexed by id so that
# DataFrame.update can align it with merged_indexed.
b5_indexed = (
    b5_merge
    .drop_duplicates(subset="id", keep="last")
    .set_index("id")
)

# merged_indexed is already indexed by id. Overwrite, in place, only the vote columns
# of the ids re-run in batch 5.
merged_indexed.update(b5_indexed[cols])

In [None]:
# Final label per field: a value with at least 3 votes, or None when no value has that many.
final_column_names = {
    "structural_reference": "structural_reference_final",
    "value_reference": "value_reference_final",
    "container_reference": "container_reference_final",
}
for vote_column, final_column in final_column_names.items():
    merged_indexed[final_column] = merged_indexed[vote_column].apply(
        lambda votes: check_voting_majority(voting(votes), 3)
    )

In [None]:
# Attach vote lists and final labels to each query. queries["id"] is matched against the id
# index of merged_indexed, and only ids present in both are kept.
# NOTE: this rebinds `results`, which earlier held the raw batch outputs.
results = pd.merge(queries, merged_indexed, how="inner", left_on="id", right_index=True)

In [None]:
# Persist the classified queries. index=False drops the DataFrame index; ids stay in the "id" column.
output_path = "./data/outputs/data_privilege/data_privilege_classification.parquet"
results.to_parquet(output_path, index=False)