In [1]:
import pandas as pd
import numpy as np
import dask.dataframe as dd
from dask.distributed import Client, LocalCluster # Optional: for Dask dashboard
import matplotlib.pyplot as plt
import seaborn as sns
from pathlib import Path
import time # To time the Dask computation
import logging # Optional: Configure logging if src modules use it
import sys
from pathlib import Path

In [2]:
project_root = Path.cwd().parent

if str(project_root) not in sys.path:
    sys.path.append(str(project_root))
    print(f"Added project root to sys.path: {project_root}")
else:
    print("Project root already in sys.path.")

current_dir = Path.cwd()
print(f"Initial CWD: {current_dir}")
if current_dir.name == 'notebooks':
    BASE_DIR = current_dir.parent
    print(f"Adjusted BASE_DIR to parent (project root): {BASE_DIR}")
# Check if the current directory *already* looks like the project root
elif (current_dir / 'src').is_dir() and (current_dir / 'notebooks').is_dir() and (current_dir / 'data').is_dir():
    BASE_DIR = current_dir
    print(f"CWD appears to be project root: {BASE_DIR}")
else:
    # Fallback or Warning - might need manual adjustment if structure is unexpected
    BASE_DIR = current_dir
    print(f"WARNING: Could not reliably determine project root relative to CWD.")
    print(f"Using CWD as BASE_DIR: {BASE_DIR}. Check if paths below are correct.")
    # You might need to adjust manually here if the above checks fail, e.g.
    # BASE_DIR = Path('/absolute/path/to/your/Dataviz-VAST2022')


Added project root to sys.path: /Users/kozy/_projects/Dataviz-VAST2022
Initial CWD: /Users/kozy/_projects/Dataviz-VAST2022/notebooks
Adjusted BASE_DIR to parent (project root): /Users/kozy/_projects/Dataviz-VAST2022


In [3]:
try:
    from src.data.load_data import load_participants
    from src.features.build_features import add_financial_group
    from src.visualization.visualize import plot_balance_distribution
    print("Successfully imported functions from src.")
except ImportError as e:
    print(f"Error importing from src: {e}")
    print("Ensure the project root directory is in your Python path or adjust sys.path.")

Successfully imported functions from src.


In [4]:
# --- Define other paths relative to the determined BASE_DIR ---
DATA_DIR = BASE_DIR / 'data'
RAW_DATA_DIR = DATA_DIR / 'raw' # Use 'raw' based on your tree structure

LOGS_DIR = RAW_DATA_DIR / 'Activity Logs'
ATTR_DIR = RAW_DATA_DIR / 'Attributes'
JOURNAL_DIR = RAW_DATA_DIR / 'Journals'

PRIMARY_DATA_DIR = DATA_DIR / '03_primary'
REPORTS_DIR = BASE_DIR / 'reports' / 'figures'

# Create output directories if they don't exist
PRIMARY_DATA_DIR.mkdir(parents=True, exist_ok=True)
REPORTS_DIR.mkdir(parents=True, exist_ok=True)

# --- Verification ---
print(f"\nUsing Project Root (BASE_DIR): {BASE_DIR.resolve()}")
print(f"Raw Data Directory: {RAW_DATA_DIR}")
print(f"Looking for Participants file at: {ATTR_DIR / 'Participants.csv'}")
print(f"Does Participants file exist? {(ATTR_DIR / 'Participants.csv').exists()}") # Verify check
print(f"Does Logs directory exist? {LOGS_DIR.exists()}")
print(f"Does Journals directory exist? {JOURNAL_DIR.exists()}")


Using Project Root (BASE_DIR): /Users/kozy/_projects/Dataviz-VAST2022
Raw Data Directory: /Users/kozy/_projects/Dataviz-VAST2022/data/raw
Looking for Participants file at: /Users/kozy/_projects/Dataviz-VAST2022/data/raw/Attributes/Participants.csv
Does Participants file exist? True
Does Logs directory exist? True
Does Journals directory exist? True


In [5]:
cluster = LocalCluster(n_workers=4, threads_per_worker=2, memory_limit='4GB') # Example config
client = Client(cluster)
print("Dask Client Setup Complete.")
print(f"Dask dashboard link: {client.dashboard_link}")

Dask Client Setup Complete.
Dask dashboard link: http://127.0.0.1:8787/status


In [6]:
participants_df = load_participants(ATTR_DIR)

if participants_df.empty:
    print("ERROR: Participants DataFrame is empty. Cannot proceed.")
    # Stop execution or raise error if critical
else:
    print("\nParticipants Data Sample:")
    display(participants_df.head())
    participants_df.info()


2025-04-17 20:01:05,942 - INFO - load_data - Attempting to load participants file: /Users/kozy/_projects/Dataviz-VAST2022/data/raw/Attributes/Participants.csv
2025-04-17 20:01:05,946 - INFO - load_data - Successfully loaded and performed basic checks on Participants data. Shape: (1011, 7)



Participants Data Sample:


Unnamed: 0,participantId,householdSize,haveKids,age,educationLevel,interestGroup,joviality
0,0,3,True,36,HighSchoolOrCollege,H,0.001627
1,1,3,True,25,HighSchoolOrCollege,B,0.328087
2,2,3,True,35,HighSchoolOrCollege,A,0.39347
3,3,3,True,21,HighSchoolOrCollege,I,0.138063
4,4,3,True,43,Bachelors,H,0.857397


<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1011 entries, 0 to 1010
Data columns (total 7 columns):
 #   Column          Non-Null Count  Dtype  
---  ------          --------------  -----  
 0   participantId   1011 non-null   Int64  
 1   householdSize   1011 non-null   int64  
 2   haveKids        1011 non-null   bool   
 3   age             1011 non-null   int64  
 4   educationLevel  1011 non-null   object 
 5   interestGroup   1011 non-null   object 
 6   joviality       1011 non-null   float64
dtypes: Int64(1), bool(1), float64(1), int64(2), object(2)
memory usage: 49.5+ KB


In [7]:
# 5. Load Logs Data
log_dtypes = {
    'participantId': 'Int64',
    'apartmentId': 'Int64',
    'jobId': 'Int64', # Dask might handle N/A better with object, or use float64 if appropriate
    'availableBalance': 'float64',
    'dailyFoodBudget': 'float64',
    'weeklyExtraBudget': 'float64',
    'currentMode': 'category', # Use category for repeated strings
    'hungerStatus': 'category',
    'sleepStatus': 'category',
    'financialStatus': 'category'
    # 'currentLocation' will be object (string)
}

print("Defining Dask DataFrame for all logs...")

Defining Dask DataFrame for all logs...


In [8]:
# The '*' acts as a wildcard to match all log files
logs_dd = dd.read_csv(
    LOGS_DIR / 'ParticipantStatusLogs*.csv',
    dtype=log_dtypes,
    parse_dates=['timestamp'],
    on_bad_lines='warn' # or 'skip' or 'error'
)

# Check the divisions (how Dask partitioned the data) and dtypes
print(f"\nNumber of partitions: {logs_dd.npartitions}")
print("\nDask DataFrame dtypes:")
print(logs_dd.dtypes)

# Display a sample (computes only the first few rows of the first partition)
print("\nSample from Dask DataFrame head:")
display(logs_dd.head())


Number of partitions: 214

Dask DataFrame dtypes:
timestamp            datetime64[ns, UTC]
currentLocation          string[pyarrow]
participantId                      Int64
currentMode                     category
hungerStatus                    category
sleepStatus                     category
apartmentId                        Int64
availableBalance                 float64
jobId                              Int64
financialStatus                 category
dailyFoodBudget                  float64
weeklyExtraBudget                float64
dtype: object

Sample from Dask DataFrame head:


Unnamed: 0,timestamp,currentLocation,participantId,currentMode,hungerStatus,sleepStatus,apartmentId,availableBalance,jobId,financialStatus,dailyFoodBudget,weeklyExtraBudget
0,2022-03-01 00:00:00+00:00,POINT (-2724.6277665310454 6866.2081834436985),0,AtHome,JustAte,Sleeping,926,1286.519556,254,Stable,12.0,1104.30257
1,2022-03-01 00:00:00+00:00,POINT (-1526.9372331431534 5582.2951345645315),1,AtHome,JustAte,Sleeping,928,860.574204,929,Stable,12.0,926.714377
2,2022-03-01 00:00:00+00:00,POINT (-1360.9905987829304 2108.804385379679),2,AtHome,JustAte,Sleeping,291,1298.184541,348,Stable,16.0,848.802876
3,2022-03-01 00:00:00+00:00,POINT (-1558.517200825967 5600.664347152427),3,AtHome,JustAte,Sleeping,1243,1180.641725,316,Stable,12.0,819.325405
4,2022-03-01 00:00:00+00:00,POINT (976.2409614204214 4574.575079082071),4,AtHome,JustAte,Sleeping,194,-681.650588,177,Unstable,20.0,0.0


In [9]:
# 6. Calculate Average Balance (Dask Computation)


In [10]:
print("Calculating average balance across all logs using Dask...")
start_time = time.time()

# Define the computation graph
avg_balance_dd = logs_dd.groupby('participantId')['availableBalance'].mean()

# Trigger the computation
# The result will be a Pandas Series indexed by participantId
avg_balance_series = avg_balance_dd.compute()

end_time = time.time()
print(f"\nDask computation finished in {end_time - start_time:.2f} seconds.")
print(f"Calculated average balance for {len(avg_balance_series)} participants.")

# Display sample results
print("\nSample of computed average balances (Pandas Series):")
display(avg_balance_series.head())
print("\nBasic stats of computed average balances:")
display(avg_balance_series.describe())

Calculating average balance across all logs using Dask...





Dask computation finished in 92.80 seconds.
Calculated average balance for 1012 participants.

Sample of computed average balances (Pandas Series):


participantId
3     41213.248470
7     29183.684991
46    32376.352117
52     5570.113454
60    25210.880962
Name: availableBalance, dtype: float64


Basic stats of computed average balances:


count      1011.000000
mean      18842.000909
std       17701.701581
min         921.999578
25%        5898.556498
50%       14310.464940
75%       25759.165685
max      120775.235440
Name: availableBalance, dtype: float64

In [11]:
# 7. Combine and Prepare Summary Data (Pandas)
print("Merging computed balances with participant data...")
# Convert the Pandas Series to a DataFrame
avg_balance_df = avg_balance_series.reset_index()
avg_balance_df = avg_balance_df.rename(columns={'availableBalance': 'averageBalance'}) # Use the computed column name

# Merge with participants_df
if not participants_df.empty:
    participant_summary_df = pd.merge(
        participants_df,
        avg_balance_df,
        on='participantId',
        how='left' # Keep all participants, assign NaN avg balance if not in logs
    )
    print(f"Created participant_summary_df with shape: {participant_summary_df.shape}")
    print("Checking for participants missing from logs (NaN averageBalance):")
    print(participant_summary_df['averageBalance'].isnull().sum())
    display(participant_summary_df.head())
else:
    print("ERROR: participants_df is empty, cannot merge.")
    participant_summary_df = pd.DataFrame() # Avoid error later


Merging computed balances with participant data...
Created participant_summary_df with shape: (1011, 8)
Checking for participants missing from logs (NaN averageBalance):
0


Unnamed: 0,participantId,householdSize,haveKids,age,educationLevel,interestGroup,joviality,averageBalance
0,0,3,True,36,HighSchoolOrCollege,H,0.001627,58665.701153
1,1,3,True,25,HighSchoolOrCollege,B,0.328087,48739.465046
2,2,3,True,35,HighSchoolOrCollege,A,0.39347,43597.414323
3,3,3,True,21,HighSchoolOrCollege,I,0.138063,41213.24847
4,4,3,True,43,Bachelors,H,0.857397,50405.268696


In [12]:
# 8. Visualize Distribution (Initial Viz - Problem Finding)
if not participant_summary_df.empty:
    print("Plotting distribution of average balances...")
    figure_path = REPORTS_DIR / 'initial_avg_balance_histogram_full_data.png'
    # Call the visualization function
    plot_balance_distribution(
        participants_summary_df=participant_summary_df,
        save_path=figure_path
    )
    print(f"Histogram saved to {figure_path}")
    # Display the plot if needed (function might close it)
    # from IPython.display import Image
    # display(Image(filename=figure_path))
else:
    print("Skipping visualization as participant_summary_df is empty.")


2025-04-17 20:02:40,734 - INFO - visualize - Plotting average balance distribution and attempting to save to /Users/kozy/_projects/Dataviz-VAST2022/reports/figures/initial_avg_balance_histogram_full_data.png
  with pd.option_context('mode.use_inf_as_na', True):


Plotting distribution of average balances...


2025-04-17 20:02:40,994 - INFO - visualize - Saved balance distribution plot to /Users/kozy/_projects/Dataviz-VAST2022/reports/figures/initial_avg_balance_histogram_full_data.png


Histogram saved to /Users/kozy/_projects/Dataviz-VAST2022/reports/figures/initial_avg_balance_histogram_full_data.png


In [14]:
# 9. Analyze Histogram & Define Financial Group
FINANCIAL_THRESHOLD = 50.0 # <<< ADJUST THIS VALUE based on your plot analysis!
print(f"Using financial threshold for 'Low Balance': {FINANCIAL_THRESHOLD}")

# Add the financial group column using the MODIFIED helper function
if not participant_summary_df.empty:
    # --- MODIFIED CALL ---
    # Pass only the summary dataframe and the threshold
    participant_summary_df = add_financial_group(
        summary_df=participant_summary_df,
        threshold=FINANCIAL_THRESHOLD
    )
    print("\nParticipant summary with financial groups:")
    display(participant_summary_df.head())
    print("\nCounts per financial group:")
    print(participant_summary_df['financialGroup'].value_counts())
else:
     print("Skipping group assignment as participant_summary_df is empty.")


2025-04-17 20:03:29,047 - INFO - build_features - Assigning financial group based on threshold: 50.0
2025-04-17 20:03:29,050 - INFO - build_features - Financial group counts:
financialGroup
Other    1011
Name: count, dtype: int64


Using financial threshold for 'Low Balance': 50.0

Participant summary with financial groups:


Unnamed: 0,participantId,householdSize,haveKids,age,educationLevel,interestGroup,joviality,averageBalance,financialGroup
0,0,3,True,36,HighSchoolOrCollege,H,0.001627,58665.701153,Other
1,1,3,True,25,HighSchoolOrCollege,B,0.328087,48739.465046,Other
2,2,3,True,35,HighSchoolOrCollege,A,0.39347,43597.414323,Other
3,3,3,True,21,HighSchoolOrCollege,I,0.138063,41213.24847,Other
4,4,3,True,43,Bachelors,H,0.857397,50405.268696,Other



Counts per financial group:
financialGroup
Other    1011
Name: count, dtype: int64


In [15]:
import pyarrow.parquet as pq

In [16]:
# 10. Save Primary Data
# Save the enriched participant summary DataFrame. This is the key input for the next stage (targeted analysis and visualization). Use Parquet for efficiency.
if not participant_summary_df.empty:
    primary_data_path = PRIMARY_DATA_DIR / 'participant_summary_full.parquet' # Use parquet
    try:
        participant_summary_df.to_parquet(primary_data_path, index=False)
        print(f"Successfully saved primary data to {primary_data_path}")
    except Exception as e:
        print(f"ERROR saving primary data: {e}")
else:
    print("Skipping saving primary data as the DataFrame is empty.")


Successfully saved primary data to /Users/kozy/_projects/Dataviz-VAST2022/data/03_primary/participant_summary_full.parquet


# 11. Problem Identification & Next Steps

 **Summary of Findings:**
 - We successfully calculated the average `availableBalance` per participant using the entire dataset via Dask.
 - The histogram (saved in `reports/figures/`) shows the distribution of these average balances.
 - **Based on the histogram, we observed [DESCRIBE OBSERVATION - e.g., a significant number of participants below $X, a bimodal distribution, etc.].**
 - We defined a 'Low Balance' group using a threshold of `$FINANCIAL_THRESHOLD`. Approximately [X]% of participants fall into this group.

 **Refined Question:**
 How do the daily spatial, temporal, and financial patterns (spending, income timing, activity locations, time allocation) of the identified 'Low Balance' group differ from the 'Other' group over the full 15-month study period?

 **Next Steps:**
 - Use the saved `participant_summary_full.parquet` file.
 - Develop targeted visualizations (Notebook 04 or scripts in `src/visualization/`) comparing the 'Low Balance' and 'Other' groups. This will likely involve loading log/journal data again (potentially using Dask or targeted loading) for participants in these specific groups.


In [17]:
print("Shutting down Dask client and cluster...")
try:
    client.close()
    cluster.close()
    print("Dask client and cluster closed.")
except NameError:
    print("Dask client/cluster was not initialized or already closed.")
except Exception as e:
    print(f"Error closing Dask client/cluster: {e}")

Shutting down Dask client and cluster...
Dask client and cluster closed.
