# Download and enrich CSV data

This notebook downloads journal CSVs from blob storage using Expidite `cloud_connector`, aggregates them into one DataFrame, and labels behaviour.

@@@ TODO: Add a CSV file containing the experimental data so that the raw data from the devices can be automatically enriched with this extra information, making analysis easier.

In [5]:
from pathlib import Path

import pandas as pd

from expidite_rpi.core import configuration as root_cfg
from expidite_rpi.core.cloud_connector import CloudConnector

In [6]:
# Required config
# Blob container to read from and the data type to fetch.
CONTAINER_NAME = "expidite-journals"
TYPE_ID = "CAPOSE"

# Name filters used when listing files in the container.
PREFIX = f"V3_{TYPE_ID}"
SUFFIX = ".csv"

# Keys file handed to the cloud connector; lives under the user's home directory.
AZURE_KEYS_FILE = Path.home().joinpath(".expidite", "choice_assay_keys.env")

# Local download directory (relative to notebook working directory),
# created on demand so later cells can rely on it existing.
DOWNLOAD_DIR = Path("./downloads")
DOWNLOAD_DIR.mkdir(parents=True, exist_ok=True)

print(f"Downloading {PREFIX} files from '{CONTAINER_NAME}' to {DOWNLOAD_DIR.resolve()}")

Downloading V3_CAPOSE files from 'expidite-journals' to C:\Users\bee-ops\code\Choice-assay\src\choice_assay\etl\downloads


In [7]:
# Get the Azure connector and point it at the keys file.
cc = CloudConnector.get_instance(root_cfg.CloudType.AZURE)
cc.set_keys(AZURE_KEYS_FILE)

# List the files in the container that match the configured prefix/suffix.
files = cc.list_cloud_files(CONTAINER_NAME, prefix=PREFIX, suffix=SUFFIX)
print(f"Found {len(files)} matching CSV files in blobstore")

if not files:
    print("No matching files to download.")
else:
    # Fetch every listed file into the local download directory.
    cc.download_container(
        src_container=CONTAINER_NAME,
        dst_dir=DOWNLOAD_DIR,
        files=files,
    )
    print(f"Downloaded {len(files)} files to {DOWNLOAD_DIR.resolve()}")

ValueError: Keys file C:\Users\bee-ops\.expidite\choice_assay_keys.env does not exist

In [None]:
# The final cell writes its aggregated CSV into DOWNLOAD_DIR as well, so that file
# must be excluded here; otherwise every re-run of the notebook would re-ingest the
# previous output and duplicate all rows.
AGGREGATED_OUTPUT_NAME = "aggregated_journals_with_behaviour.csv"

csv_paths = sorted(
    path for path in DOWNLOAD_DIR.rglob("*.csv") if path.name != AGGREGATED_OUTPUT_NAME
)
print(f"CSV files available locally: {len(csv_paths)}")

# Load each journal, tagging rows with the file they came from.
df_list = []
for csv_path in csv_paths:
    try:
        df = pd.read_csv(csv_path)
    except pd.errors.EmptyDataError:
        # read_csv raises (rather than returning an empty frame) when the file has
        # no parseable content; treat it like any other empty journal and skip it.
        print(f"Skipping empty CSV file: {csv_path}")
        continue
    if df.empty:
        continue
    df["source_file"] = csv_path.name
    df_list.append(df)

# pd.concat rejects an empty list, so fall back to an empty frame when nothing loaded.
aggregated_df = pd.concat(df_list, ignore_index=True) if df_list else pd.DataFrame()

print(f"Aggregated rows: {len(aggregated_df)}")
aggregated_df.head()

In [None]:
# Data validation before behaviour classification:
# check the likelihood columns exist, coerce them to numbers, and report bad rows.
required_columns = ['Tube_prob_likelihood', 'End_prob_likelihood']
missing_columns = [name for name in required_columns if name not in aggregated_df.columns]

if aggregated_df.empty:
    print('Validation: aggregated_df is empty.')
elif missing_columns:
    raise KeyError(f"Validation failed. Missing required columns: {missing_columns}")
else:
    # Non-numeric entries become NaN so they can be counted (and later dropped).
    for col in required_columns:
        aggregated_df[col] = pd.to_numeric(aggregated_df[col], errors='coerce')

    # A row is invalid when either likelihood is NaN after coercion.
    invalid_mask = aggregated_df[required_columns].isna().any(axis=1)
    invalid_rows = invalid_mask.sum()
    print(f'Validation: {len(aggregated_df)} total rows')
    print(f'Validation: {invalid_rows} rows have invalid/missing likelihood values')

    if invalid_rows > 0:
        # Show a sample of the offending rows together with their source file.
        display(aggregated_df.loc[invalid_mask, required_columns + ['source_file']].head())

In [None]:
# Define the function to classify behavior
def get_behaviour(row, threshold=0.6):
    """Classify one journal row from its two likelihood scores.

    Args:
        row: Mapping-like record (e.g. a DataFrame row) providing the
            'Tube_prob_likelihood' and 'End_prob_likelihood' values.
        threshold: Inclusive minimum likelihood for a score to count as
            detected. Defaults to 0.6.

    Returns:
        "Drinking" when both scores reach the threshold, "Prob_out" when
        exactly one of them does, and 'No_prob' otherwise. A NaN score
        never reaches the threshold, so it counts as not detected.
    """
    tube_detected = row['Tube_prob_likelihood'] >= threshold
    end_detected = row['End_prob_likelihood'] >= threshold

    if tube_detected and end_detected:
        return "Drinking"
    # Exactly one of the two scores reached the threshold.
    if tube_detected != end_detected:
        return "Prob_out"
    return 'No_prob'

# Label every row that has both likelihood values; rows missing either are dropped.
required_columns = ['Tube_prob_likelihood', 'End_prob_likelihood']
missing_columns = [name for name in required_columns if name not in aggregated_df.columns]

if aggregated_df.empty:
    print('No data loaded; behaviour classification skipped.')
elif missing_columns:
    raise KeyError(f'Missing required columns for behaviour classification: {missing_columns}')
else:
    before_count = len(aggregated_df)
    # Work on a copy so adding the label column does not touch a view of the original.
    clean_df = aggregated_df.dropna(subset=required_columns).copy()
    dropped_count = before_count - len(clean_df)

    if dropped_count > 0:
        print(f'Dropped {dropped_count} rows with invalid/missing likelihood values before classification.')

    # Row-wise classification using get_behaviour.
    behaviour_labels = clean_df.apply(get_behaviour, axis=1)
    clean_df['Behaviour'] = behaviour_labels

    # From here on the aggregated frame is the cleaned, labelled one.
    aggregated_df = clean_df
    behaviour_counts = aggregated_df['Behaviour'].value_counts(dropna=False)
    print(behaviour_counts)

aggregated_df.head()

In [None]:
# Persist the labelled dataset inside the download directory (skipped when there is no data).
output_path = DOWNLOAD_DIR / 'aggregated_journals_with_behaviour.csv'
if aggregated_df.empty:
    print('Aggregated dataframe is empty; no output written.')
else:
    aggregated_df.to_csv(output_path, index=False)
    print(f"Saved aggregated dataset to: {output_path.resolve()}")