In [2]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns

# 1. Load Content Activity with manual repair logic
temp_cols = [f'col_{i}' for i in range(10)]
df_raw = pd.read_csv('content_activity.csv', names=temp_cols, header=None,
                     skiprows=1, index_col=False, on_bad_lines='skip', engine='python')

df_act = pd.DataFrame()
df_act['user_id'] = df_raw['col_0'].astype(str).str.strip() # Clean user_id
df_act['activity_timestamp'] = pd.to_datetime(df_raw['col_1'], errors='coerce')
df_act['ip_address'] = df_raw['col_2']

# 2. Load other datasets normally
df_exams = pd.read_csv('exam_sessions.csv', on_bad_lines='skip')
df_exams['user_id'] = df_exams['user_id'].astype(str).str.strip()
df_exams['started_at'] = pd.to_datetime(df_exams['started_at'], errors='coerce')

df_plans = pd.read_csv('study_plans.csv', on_bad_lines='skip')
df_plans['user_id'] = df_plans['user_id'].astype(str).str.strip()
df_plans['target_exam_date'] = pd.to_datetime(df_plans['target_exam_date'], errors='coerce')

# Drop unparseable dates to prevent .dt accessor errors
df_act = df_act.dropna(subset=['activity_timestamp'])

print(f"Content Activity: {df_act.shape[0]} rows")
print(f"Exam Sessions: {df_exams.shape[0]} rows")
print(f"Study Plans: {df_plans.shape[0]} rows")

Content Activity: 1350964 rows
Exam Sessions: 386920 rows
Study Plans: 188682 rows


In [17]:
# 1. Post-Exam Activity (Zombies)
# Merge using cleaned user_ids
df_merged_act = df_act.merge(df_plans[['user_id', 'target_exam_date']], on='user_id', how='inner')

if df_merged_act.empty:
    print("Warning: df_merged_act is empty. Check if user_ids match in both files.")
else:
    # Use normalized dates for comparison
    df_merged_act['is_post_exam'] = (
        df_merged_act['activity_timestamp'].dt.normalize() >
        df_merged_act['target_exam_date'].dt.normalize()
    )

    activity_analysis = df_merged_act[df_merged_act['is_post_exam']].groupby('user_id').agg(
        activity_days_after_exam=('activity_timestamp', lambda x: x.dt.normalize().nunique())
    ).reset_index()

# 2. Performance Volatility (Standard Deviation and Jumps)
df_exams = df_exams.sort_values(['user_id', 'started_at'])
perf_stats = df_exams.groupby('user_id')['score'].agg(
    score_std='std',
    score_mean='mean'
).reset_index()

# Calculate score_jump
df_exams['prev_score'] = df_exams.groupby('user_id')['score'].shift(1)
df_exams['score_jump'] = (df_exams['score'] - df_exams['prev_score']).abs()
max_jumps = df_exams.groupby('user_id')['score_jump'].max().reset_index()
max_jumps.rename(columns={'score_jump': 'max_score_jump'}, inplace=True)

# 3. Final Consolidation
risk_report = df_plans[['user_id', 'target_exam_date']].merge(activity_analysis, on='user_id', how='left')
risk_report = risk_report.merge(perf_stats, on='user_id', how='left')
risk_report = risk_report.merge(max_jumps, on='user_id', how='left').fillna(0)

# Flagging
risk_report['is_suspicious'] = (
    (risk_report['activity_days_after_exam'] > 60) |
    (risk_report['score_std'] > .25) |
    (risk_report['max_score_jump'] > .50)
)

In [19]:
print("--- [ SUSPICIOUS USERS ] ---")
flagged = risk_report[risk_report['is_suspicious']].sort_values(
    by=['max_score_jump', 'activity_days_after_exam'], ascending=False
)

if not flagged.empty:
    cols_to_print = ['user_id', 'activity_days_after_exam', 'score_std', 'max_score_jump']
    print(flagged[cols_to_print].to_string(index=False))
else:
    print("No users flagged. Check if data join logic is capturing study activity.")

print(f"\nTotal users analyzed: {len(risk_report)}")

--- [ SUSPICIOUS USERS ] ---
                             user_id  activity_days_after_exam  score_std  max_score_jump
8f8305d7-478a-4d64-831e-be9b0abaf082                      24.0   0.252628        1.000000
8f8305d7-478a-4d64-831e-be9b0abaf082                      24.0   0.252628        1.000000
8f8305d7-478a-4d64-831e-be9b0abaf082                      24.0   0.252628        1.000000
8f8305d7-478a-4d64-831e-be9b0abaf082                      24.0   0.252628        1.000000
8f8305d7-478a-4d64-831e-be9b0abaf082                      24.0   0.252628        1.000000
8f8305d7-478a-4d64-831e-be9b0abaf082                      24.0   0.252628        1.000000
8f8305d7-478a-4d64-831e-be9b0abaf082                      24.0   0.252628        1.000000
8f8305d7-478a-4d64-831e-be9b0abaf082                      24.0   0.252628        1.000000
8f8305d7-478a-4d64-831e-be9b0abaf082                      24.0   0.252628        1.000000
8f8305d7-478a-4d64-831e-be9b0abaf082                      24.0   0.2526

In [20]:
def check_pipeline_health(df_act, df_exams, df_plans, risk_report, df_merged_act):
    print("="*60)
    print(f"{'DATA PIPELINE DIAGNOSTIC':^60}")
    print("="*60)

    # 1. Join Success Rate
    total_users = df_plans['user_id'].nunique()
    users_with_activity = df_merged_act['user_id'].nunique()
    join_pct = (users_with_activity / total_users) * 100

    print(f"1. JOIN SUCCESS: {users_with_activity}/{total_users} users ({join_pct:.1f}%)")
    if join_pct == 0:
        print("   CRITICAL: No users matched. Check ID cleaning/strip logic.")
    elif join_pct < 10:
        print("   WARNING: Very low match rate. Potential data mismatch.")
    else:
        print("   SUCCESS: Activity data correctly mapped to Study Plans.")

    # 2. Variable Ranges (Sanity Check)
    print(f"\n2. FEATURE RANGES:")
    print(f"   - Max Days Post-Exam: {risk_report['activity_days_after_exam'].max():.0f}")
    print(f"   - Max Score Jump:     {risk_report['max_score_jump'].max():.1f}")
    print(f"   - Max Score Std Dev:  {risk_report['score_std'].max():.2f}")

    # 3. Data Integrity
    null_count = risk_report.isnull().sum().sum()
    print(f"\n3. INTEGRITY CHECK:")
    print(f"   - Null values in report: {null_count}")

    # 4. Sample Cross-Check
    print(f"\n4. SAMPLE RAW DATA (df_act):")
    print(df_act[['user_id', 'activity_timestamp']].head(3))

    print("="*60)

# Run the health check
check_pipeline_health(df_act, df_exams, df_plans, risk_report, df_merged_act)

                  DATA PIPELINE DIAGNOSTIC                  
1. JOIN SUCCESS: 12910/26968 users (47.9%)
   SUCCESS: Activity data correctly mapped to Study Plans.

2. FEATURE RANGES:
   - Max Days Post-Exam: 145
   - Max Score Jump:     1.0
   - Max Score Std Dev:  0.57

3. INTEGRITY CHECK:
   - Null values in report: 0

4. SAMPLE RAW DATA (df_act):
                                user_id  activity_timestamp
0  fb65cf6f-bb87-4165-8397-f97353971bad 2025-11-08 22:18:47
1  95b27553-ded6-46b8-a030-d2a8ded1c499 2025-08-29 20:04:49
2  c1c27c21-b61c-4051-9144-a9738477ba7f 2025-06-17 04:13:19


In [23]:
# 1. Ensure we are looking at the collapsed risk_report, not the merged raw activity
# The risk_report we built earlier is already 1 row per user
flagged_export = risk_report[risk_report['is_suspicious']].copy()

# 2. Final check for duplicates (just in case df_plans had double IDs)
flagged_export = flagged_export.drop_duplicates(subset=['user_id'])

# 3. Rename with the professional labels we discussed
export_label_map = {
    'user_id': 'USER_ID',
    'activity_days_after_exam': 'DAYS_ACTIVE_POST_EXAM',
    'score_std': 'PERFORMANCE_VOLATILITY_SIGMA',
    'max_score_jump': 'MAX_SCORE_JUMP_DELTA'
}
flagged_export = flagged_export.rename(columns=export_label_map)

# 4. Save to CSV
flagged_export.to_csv('flagged_account_handoffs.csv', index=False)

print("="*60)
print(f"CLEAN EXPORT SUCCESSFUL")
print(f"Total Unique Users Exported: {len(flagged_export)}")
print("="*60)

CLEAN EXPORT SUCCESSFUL
Total Unique Users Exported: 1819


In [25]:
# 1. Clean the data to ensure 1 row per User ID
clean_risk_df = risk_report.drop_duplicates(subset=['user_id']).copy()

# 2. Define Filters
# Group A: Behavioral Overstay (Zombie Accounts)
zombie_filter = clean_risk_df['activity_days_after_exam'] > 30

# Group B: Performance Volatility (Potential Account Sharing/Handoff)
# Note: Flagging both std deviation OR a massive single jump
volatility_filter = (clean_risk_df['score_std'] > 0.25) | (clean_risk_df['max_score_jump'] > 0.50)

# 3. Create Dataframes
df_zombies = clean_risk_df[zombie_filter].copy()
df_volatility = clean_risk_df[volatility_filter].copy()

# 4. Standardize Labels for Export
export_labels = {
    'user_id': 'USER_ID',
    'activity_days_after_exam': 'DAYS_ACTIVE_POST_EXAM',
    'score_std': 'PERFORMANCE_VOLATILITY_SIGMA',
    'max_score_jump': 'MAX_SCORE_JUMP_DELTA'
}

# 5. Export to CSVs
try:
    df_zombies.rename(columns=export_labels).to_csv('flagged_zombie_accounts.csv', index=False)
    df_volatility.rename(columns=export_labels).to_csv('flagged_performance_volatility.csv', index=False)

    print("="*60)
    print(f"{'SEGREGATED EXPORT SUMMARY':^60}")
    print("="*60)
    print(f"Zombies (Days Flag):      {len(df_zombies)} users")
    print(f"Volatility (Score Flag):  {len(df_volatility)} users")
    print(f"\nFiles generated: \n1. flagged_zombie_accounts.csv \n2. flagged_performance_volatility.csv")
    print("="*60)
except Exception as e:
    print(f"Export Error: {e}")

                 SEGREGATED EXPORT SUMMARY                  
Zombies (Days Flag):      484 users
Volatility (Score Flag):  1783 users

Files generated: 
1. flagged_zombie_accounts.csv 
2. flagged_performance_volatility.csv
