In [2]:
!pip install pyspark pandas numpy matplotlib seaborn duckdb


[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m25.3[0m[39;49m -> [0m[32;49m26.0.1[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpython3 -m pip install --upgrade pip[0m


# Duolingo Datathon - EDA with PySpark
## DatasetC: Sleeping, Recovering Bandit Algorithm for Notifications
Working with the training dataset (~87M rows, ~4 GB extracted).

In [3]:
# Import libraries
import glob
import os
import traceback
import warnings

import duckdb
import matplotlib.pyplot as plt
import pandas as pd
import pyspark.sql.functions as F
import seaborn as sns
from pyspark.sql import SparkSession

# Suppress warnings
warnings.filterwarnings('ignore')

In [4]:
# Create (or reuse) the Spark session for this notebook.
# The builder chain is wrapped in parentheses instead of using backslash
# continuations; the options themselves are unchanged.
spark = (
    SparkSession.builder
    .appName("duolingo_datathon")
    .config("spark.driver.memory", "4g")
    .config("spark.sql.shuffle.partitions", "8")
    .getOrCreate()
)

# Report the running Spark version and the driver memory seen by the context.
print(f"Spark Version: {spark.version}")
print(f"Driver Memory: {spark.sparkContext.getConf().get('spark.driver.memory')}")

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
26/02/19 15:09:10 WARN Utils: Your hostname, codespaces-80ea76, resolves to a loopback address: 127.0.0.1; using 10.0.0.39 instead (on interface eth0)
26/02/19 15:09:10 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
26/02/19 15:09:10 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Spark Version: 4.1.1
Driver Memory: 4g


In [5]:
# Locate the training partitions: every *directory* named train-part-* under
# train_dir (plain files matching the pattern are ignored).
train_dir = "/workspaces/datathon-duolingo"
partition_glob = f"{train_dir}/train-part-*"
train_parts = sorted(filter(os.path.isdir, glob.glob(partition_glob)))

print(f"Found {len(train_parts)} training data partitions:")
for partition_path in train_parts:
    print(f"  - {partition_path}")

Found 3 training data partitions:
  - /workspaces/datathon-duolingo/train-part-1
  - /workspaces/datathon-duolingo/train-part-2
  - /workspaces/datathon-duolingo/train-part-3


In [6]:
# Quick test: read a few rows from one parquet file with DuckDB to verify integrity.
# This is a best-effort diagnostic, so read errors are reported, not raised.

# Sort the matches so the same file is checked on every run, and fail with a
# clear message (rather than a bare IndexError) when the partition is empty.
test_candidates = sorted(glob.glob(f"{train_dir}/train-part-1/*.parquet"))
if not test_candidates:
    raise FileNotFoundError(f"No parquet files found in {train_dir}/train-part-1")
test_file = test_candidates[0]
print(f"Testing parquet: {test_file}")

try:
    # Throwaway in-memory connection; the context manager closes it on exit.
    # The result is materialised with .df() before the connection is closed.
    with duckdb.connect() as test_conn:
        result = test_conn.execute(f"SELECT * FROM read_parquet('{test_file}') LIMIT 3").df()
    print(f"\n✓ Successfully read! Shape: {result.shape}")
    print(f"\nColumns: {result.columns.tolist()}")
    print(f"\nFirst 3 rows:")
    print(result)
except Exception as e:
    print(f"✗ Error: {type(e).__name__}: {e}")
    traceback.print_exc()

Testing parquet: /workspaces/datathon-duolingo/train-part-1/part-00000-9b4bba6b-feac-44b1-a155-17c796835cca-c000.snappy.parquet

✓ Successfully read! Shape: (3, 6)

Columns: ['datetime', 'ui_language', 'eligible_templates', 'history', 'selected_template', 'session_end_completed']

First 3 rows:
   datetime ui_language              eligible_templates  \
0  0.153461          en  [G, E, B, A, K, H, J, L, F, D]   
1  2.827303          es  [G, E, B, A, K, H, J, L, F, D]   
2  2.792662          en     [G, E, B, K, H, J, L, F, D]   

                                             history selected_template  \
0  [{'template': 'A', 'n_days': 28.19564819335937...                 B   
1  [{'template': 'A', 'n_days': 29.836181640625},...                 A   
2  [{'template': 'G', 'n_days': 8.197543144226074...                 J   

   session_end_completed  
0                  False  
1                   True  
2                   True  


In [7]:
# Load a random sample drawn across ALL training partitions with DuckDB.
# Only the sample is materialised in pandas; the row count is computed by DuckDB.

SAMPLE_ROWS = 30_000  # rows pulled into pandas for exploration
SAMPLE_SEED = 42      # seed passed to DuckDB's reservoir sampler

# Find all parquet files
parquet_files = sorted(glob.glob(f"{train_dir}/train-part-*/part-*.parquet"))
if not parquet_files:
    raise FileNotFoundError(f"No parquet files found under {train_dir}/train-part-*")
print(f"Found {len(parquet_files)} parquet files total")
print("Files:")
for parquet_path in parquet_files:
    print(f"  - {parquet_path}")

try:
    # Create DuckDB connection. It is left open on purpose so it can be reused
    # for further queries (NOTE(review): confirm later cells rely on `conn`).
    conn = duckdb.connect()

    # Get total row count across ALL partitions.
    # The Python list repr of parquet_files is interpolated as a DuckDB list literal.
    total_rows = conn.execute(f"""
        SELECT COUNT(*) as total_rows 
        FROM read_parquet({parquet_files})
    """).fetchone()[0]

    print(f"\n✓ Total rows across all partitions: {total_rows:,}")

    # Draw SAMPLE_ROWS rows from the COMBINED scan of all files. This is not a
    # fixed number of rows per file: each partition contributes whatever the
    # sampler happens to pick from it.
    # NOTE(review): per the DuckDB sampling docs a seed is only guaranteed to
    # repeat the same sample when DuckDB runs single-threaded - confirm for the
    # installed version if exact reproducibility matters.
    print(f"\nLoading random sample of {SAMPLE_ROWS:,} rows drawn across all partitions...")
    df_train = conn.execute(f"""
        SELECT * FROM read_parquet({parquet_files})
        USING SAMPLE reservoir({SAMPLE_ROWS} ROWS) REPEATABLE ({SAMPLE_SEED})
    """).df()

    print(f"✓ Loaded sample: {len(df_train):,} rows")
    print(f"\nColumns: {df_train.columns.tolist()}")
    print(f"\nData types:\n{df_train.dtypes}")
    print(f"\nFirst 5 rows:")
    print(df_train.head())

    # NOTE: To load the FULL dataset, replace the sampling query above with:
    # df_train = conn.execute(f"SELECT * FROM read_parquet({parquet_files})").df()
    # WARNING: materialising every row in pandas may exhaust RAM and crash the kernel.

except Exception as e:
    # Report the failure, then re-raise: every later cell needs df_train, so
    # continuing would only fail further down (or silently reuse a stale frame).
    print(f"✗ Error: {type(e).__name__}: {e}")
    raise

Found 3 parquet files total
Files:
  - /workspaces/datathon-duolingo/train-part-1/part-00000-9b4bba6b-feac-44b1-a155-17c796835cca-c000.snappy.parquet
  - /workspaces/datathon-duolingo/train-part-2/part-00001-9b4bba6b-feac-44b1-a155-17c796835cca-c000.snappy.parquet
  - /workspaces/datathon-duolingo/train-part-3/part-00002-9b4bba6b-feac-44b1-a155-17c796835cca-c000.snappy.parquet

✓ Total rows across all partitions: 87,665,839

Loading sample (10K rows per partition = 30K total)...


✓ Loaded sample: 30,000 rows

Columns: ['datetime', 'ui_language', 'eligible_templates', 'history', 'selected_template', 'session_end_completed']

Data types:
datetime                 float64
ui_language                  str
eligible_templates        object
history                   object
selected_template            str
session_end_completed       bool
dtype: object

First 5 rows:
    datetime ui_language              eligible_templates  \
0   5.673044          es  [G, E, B, A, K, H, J, L, F, D]   
1   3.281991          en     [G, E, B, K, H, J, L, F, D]   
2  12.006505          en  [K, H, G, E, B, J, L, F, D, A]   
3  13.067569          vi     [K, H, G, E, B, J, L, F, D]   
4   1.970174          en     [G, E, B, K, H, J, L, F, D]   

                                             history selected_template  \
0                                                 []                 B   
1  [{'template': 'F', 'n_days': 1.999998569488525...                 K   
2                              

In [None]:
# Basic statistics (pandas-based) for the sampled frame df_train
print("=== DATA PROFILING ===\n")

# Row count and memory usage (deep=True also counts the Python objects held
# in object-dtype columns)
print(f"Total rows in sample: {len(df_train):,}")
print(f"Number of columns: {len(df_train.columns)}")
print(f"Memory usage: {df_train.memory_usage(deep=True).sum() / 1024**2:.2f} MB")

# Null values
print("\nMissing values:")
print(df_train.isnull().sum())

# Column data types
print("\nData types:")
print(df_train.dtypes)

# Basic statistics for numeric columns.
# The describe() call had been fused into the comment line above the header,
# so the header printed with nothing under it; it is restored here.
print("\nNumeric column statistics:")
print(df_train.describe())

In [None]:
# Target variable analysis (pandas-based)
print("=== TARGET VARIABLE ANALYSIS ===\n")

# Session completion counts in a FIXED False/True order.
# value_counts() alone orders by frequency, which would make the bar order and
# the bar colours below depend on which outcome happens to be more common.
completion_counts = (
    df_train['session_end_completed']
    .value_counts()
    .reindex([False, True], fill_value=0)
)
print("Session completion distribution:")
print(completion_counts)

# Guard the denominator so an empty sample reports 0.00% instead of NaN.
total_sessions = completion_counts.sum()
completion_pct = completion_counts / max(total_sessions, 1) * 100
print(f"\nCompletion rate (True): {completion_pct.get(True, 0):.2f}%")
print(f"Non-completion rate (False): {completion_pct.get(False, 0):.2f}%")

# Visualize: first colour -> False (not completed), second colour -> True (completed)
fig, ax = plt.subplots(figsize=(8, 5))
completion_counts.plot(kind='bar', color=['#fc8d62', '#66c2a5'], ax=ax)
ax.set_title('Session Completion Distribution (Sample)')
ax.set_xlabel('Session Completed (2h window)')
ax.set_ylabel('Count')
ax.tick_params(axis='x', labelrotation=0)
fig.tight_layout()
plt.show()