In [5]:
from ToM import classify_hypothesis_n_agents, get_dataset
import numpy as np
import pandas as pd
import dask.dataframe as dd
from dask import delayed, compute

In [7]:
from dask_jobqueue import SGECluster
from dask.distributed import Client
import os

n_workers = 100

cluster = SGECluster(
        cores=1,
        memory="1GB",
        processes=1,
        queue="short.q",
        job_extra_directives=["-t 1-100"],
        log_directory=os.path.join(os.environ["HOME"], "logs/"),
        local_directory=os.path.join(os.environ["HOME"], "dask-worker-space/"),
        walltime="01:59:00",
        name="cis7000-{$SGE_TASK_ID}",
    )
client = Client(cluster)
cluster.scale(n=n_workers)


Perhaps you already have a cluster running?
Hosting the HTTP server on port 34357 instead


In [5]:
# set seed
np.random.seed(42)

# try:
#     # Load file if exists
#     df_results = pd.read_excel("data/results_ToM.csv")
# except:
#     df_results = pd.DataFrame(columns=["premise", "hypothesis", "label", "predicted", "result"])

df_results = pd.DataFrame(columns=["premise", "hypothesis", "label", "predicted", "result"])

# Config
few_shot = [1, 4, 8]  # [1, 4, 8, 16, 32]
agents = [1, 3, 5]  # [1, 3, 5, 9]

# The batch size of new samples
n_test_samples = 10

# Add new results
# new_samples = ToM.get_n_new_premises(new_n_samples, df_results)

@delayed
def process_sample(row) -> dict:
    sample_premise = row["premise"]
    sample_hypothesis = row["hypothesis"]
    sample_label = row["label"]
    shot = row["few_shot"]
    agent = row["n_agents"]
    
    # Get the prediction with combination of agents and few shot examples
    predicted, result = classify_hypothesis_n_agents(sample_hypothesis, sample_premise, sample_label, agent, shot)
    new_row = {
        "sample_id": row["sample_id"],
        "premise": sample_premise,
        "hypothesis": sample_hypothesis,
        "n_agents": agent,
        "m_examples": shot,
        "label": sample_label,
        "predicted": predicted,
        "result": result,
    }
    print(f"Sample ID: {row['sample_id']} - Few shot examples: {shot} - Agents: {agent} - Predicted: {predicted} - Result: {result}")
    return new_row


test_data = get_dataset(split="test")
test_samples = test_data.sample(n_test_samples)
# Create a product of all combinations
combinations = pd.MultiIndex.from_product([
    test_samples.index,
    few_shot,
    agents
], names=['sample_id', 'few_shot', 'n_agents'])

# Create expanded DataFrame with all combinations
expanded_df = pd.DataFrame(index=combinations).reset_index()

# Merge with original samples
final_df = expanded_df.merge(
    test_samples, 
    left_on='sample_id', 
    right_index=True
)

dask_df = dd.from_pandas(final_df, npartitions=n_workers)  # Adjust the number of partitions as needed

delayed_results = [process_sample(row) for _, row in dask_df.iterrows()]
results = compute(*delayed_results)


NameError: name 'n_workers' is not defined

In [None]:

# Flatten the list of results and create a DataFrame
all_results = [item for sublist in results for item in sublist]
df_results = pd.concat([df_results, pd.DataFrame(all_results)], ignore_index=True)

# Debugging: Check if df_results is empty
if df_results.empty:
    print("Warning: df_results is empty. No data to save.")
else:
    # Save results with error handling
    try:
        df_results.to_csv("data/results_ToM.csv", index=False)
        print("Results saved!")
    except Exception as e:
        print(f"Error saving results: {e}")
