In [9]:
import sys
import os
from pathlib import Path
import pandas as pd
from minio import Minio
from loguru import logger
from io import BytesIO

# Set working directory to project root
project_root = Path().parent
os.chdir(project_root)
sys.path.append('src')

logger.remove()
logger.add(sys.stderr, level="INFO")

# Create MinIO client directly
minio_client = Minio(
    "localhost:9000",
    access_key="minioadmin",
    secret_key="minioadmin",
    secure=False
)
bucket = "streampro-data"

# Create DuckDB connection directly 
import duckdb
conn = duckdb.connect(":memory:")

# Install extensions
conn.execute("INSTALL httpfs;")
conn.execute("LOAD httpfs;")
conn.execute("INSTALL parquet;")
conn.execute("LOAD parquet;")

print("Loading tables from MinIO...")

# Load each table directly
table_configs = {
    "trusted_users": "trusted/users/ingestion_date=2025-09-09/data.parquet",
    "trusted_videos": "trusted/videos/ingestion_date=2025-09-09/data.parquet", 
    "trusted_devices": "trusted/devices/ingestion_date=2025-09-09/data.parquet",
    "trusted_events": "trusted/events/ingestion_date=2025-09-09/data.parquet"
}

for table_name, minio_path in table_configs.items():
    try:
        # Read parquet from MinIO
        response = minio_client.get_object(bucket, minio_path)
        df = pd.read_parquet(BytesIO(response.data))
        
        # Create table in DuckDB
        conn.execute(f"DROP TABLE IF EXISTS {table_name}")
        conn.execute(f"CREATE TABLE {table_name} AS SELECT * FROM df")
        
        print(f"✅ {table_name}: {len(df):,} rows loaded")
    except Exception as e:
        print(f"❌ Failed to load {table_name}: {e}")

# List loaded tables
result = conn.execute("SELECT table_name FROM information_schema.tables WHERE table_type = 'BASE TABLE' ORDER BY table_name").fetchall()
tables = [row[0] for row in result]
print(f"Available tables: {tables}")

Loading tables from MinIO...
✅ trusted_users: 100 rows loaded
✅ trusted_videos: 20 rows loaded
✅ trusted_devices: 5 rows loaded
✅ trusted_events: 13,703 rows loaded
Available tables: ['trusted_devices', 'trusted_events', 'trusted_users', 'trusted_videos']


# Data Analysis

In [13]:
users_count = conn.execute("SELECT COUNT(*) as count FROM trusted_users").df()
videos_count = conn.execute("SELECT COUNT(*) as count FROM trusted_videos").df()
devices_count = conn.execute("SELECT COUNT(*) as count FROM trusted_devices").df()
events_count = conn.execute("SELECT COUNT(*) as count FROM trusted_events").df()

print(f"Users: {users_count['count'].iloc[0]:,}")
print(f"Videos: {videos_count['count'].iloc[0]:,}")
print(f"Devices: {devices_count['count'].iloc[0]:,}")
print(f"Events: {events_count['count'].iloc[0]:,}")

Users: 100
Videos: 20
Devices: 5
Events: 13,703


In [15]:
result = conn.execute("""
    SELECT
        user_id,
        MIN(session_id) as first_session_id,
        max(session_id) as last_session_id
    FROM trusted_events
    GROUP BY user_id
""").df()
result[:10]

Unnamed: 0,user_id,first_session_id,last_session_id
0,user_1,user_1_sess_0_0,user_1_sess_4_1
1,user_5,user_5_sess_0_0,user_5_sess_4_2
2,user_16,user_16_sess_0_0,user_16_sess_4_0
3,user_21,user_21_sess_0_0,user_21_sess_4_1
4,user_23,user_23_sess_0_0,user_23_sess_4_2
5,user_25,user_25_sess_0_0,user_25_sess_4_0
6,user_34,user_34_sess_0_0,user_34_sess_4_1
7,user_51,user_51_sess_0_0,user_51_sess_4_1
8,user_59,user_59_sess_0_0,user_59_sess_4_2
9,user_63,user_63_sess_0_0,user_63_sess_4_0


In [14]:
result = conn.execute("""
    WITH user_first_sessions AS (
        SELECT
            user_id,
            MIN(session_id) as first_session_id
        FROM trusted_events
        GROUP BY user_id
    ),
    first_session_watch_times AS (
        SELECT
            ufs.user_id,
            SUM(CAST(e.value AS DOUBLE)) as total_watch_time
        FROM user_first_sessions ufs
        INNER JOIN trusted_events e
            ON ufs.user_id = e.user_id
            AND ufs.first_session_id = e.session_id
        WHERE e.event_name = 'watch_time'
            AND e.value IS NOT NULL
            AND e.value > 0
        GROUP BY ufs.user_id
    )
    SELECT
        COUNT(DISTINCT u.user_id) as total_users,
        COUNT(DISTINCT CASE
            WHEN fswt.total_watch_time >= 30 THEN fswt.user_id
        END) as users_with_30_plus,
        ROUND(
            100.0 * COUNT(DISTINCT CASE
                WHEN fswt.total_watch_time >= 30 THEN fswt.user_id
            END) / NULLIF(COUNT(DISTINCT u.user_id), 0),
            2
        ) as pct_reaching_30_seconds
    FROM trusted_users u
    LEFT JOIN first_session_watch_times fswt ON u.user_id = fswt.user_id
""").df()

print(f"Total users: {result['total_users'].iloc[0]:,}")
print(f"Users reaching 30+ seconds: {result['users_with_30_plus'].iloc[0]:,}")
print(f"Percentage: {result['pct_reaching_30_seconds'].iloc[0]}%")

Total users: 100
Users reaching 30+ seconds: 1
Percentage: 1.0%


## Data Understanding for Q1 Analysis

In [28]:
# Understanding session ID format: user_{id}_sess_{day}_{sub_session}
session_structure = conn.execute("""
    SELECT DISTINCT 
        session_id,
        SPLIT_PART(session_id, '_', 1) || '_' || SPLIT_PART(session_id, '_', 2) as user_part,
        SPLIT_PART(session_id, '_', 4) as day_index,
        SPLIT_PART(session_id, '_', 5) as sub_session_index
    FROM trusted_events
    WHERE user_id = 'user_1'
    ORDER BY session_id
""").df()

print("Understanding session ID format:")
session_structure

Understanding session ID format:


Unnamed: 0,session_id,user_part,day_index,sub_session_index
0,user_1_sess_0_0,user_1,0,0
1,user_1_sess_1_0,user_1,1,0
2,user_1_sess_2_0,user_1,2,0
3,user_1_sess_3_0,user_1,3,0
4,user_1_sess_3_1,user_1,3,1
5,user_1_sess_3_2,user_1,3,2
6,user_1_sess_4_0,user_1,4,0
7,user_1_sess_4_1,user_1,4,1


In [29]:
# User session overview - count sessions per user
user_sessions = conn.execute("""
    SELECT 
        user_id,
        COUNT(DISTINCT session_id) as total_sessions,
        MIN(session_id) as first_session,
        MAX(session_id) as last_session,
        MAX(CAST(SPLIT_PART(session_id, '_', 4) AS INTEGER)) + 1 as active_days
    FROM trusted_events
    GROUP BY user_id
    ORDER BY total_sessions DESC
    LIMIT 10
""").df()

print("User session overview:")
user_sessions

User session overview:


Unnamed: 0,user_id,total_sessions,first_session,last_session,active_days
0,user_95,15,user_95_sess_0_0,user_95_sess_4_2,5
1,user_9,13,user_9_sess_0_0,user_9_sess_4_2,5
2,user_22,13,user_22_sess_0_0,user_22_sess_4_2,5
3,user_23,13,user_23_sess_0_0,user_23_sess_4_2,5
4,user_3,13,user_3_sess_0_0,user_3_sess_4_0,5
5,user_28,13,user_28_sess_0_0,user_28_sess_4_1,5
6,user_46,13,user_46_sess_0_0,user_46_sess_4_1,5
7,user_67,13,user_67_sess_0_0,user_67_sess_4_1,5
8,user_4,12,user_4_sess_0_0,user_4_sess_4_0,5
9,user_73,12,user_73_sess_0_0,user_73_sess_4_2,5


In [25]:
# Daily session patterns - multiple sessions per day
daily_patterns = conn.execute("""
    SELECT 
        SPLIT_PART(session_id, '_', 1) || '_' || SPLIT_PART(session_id, '_', 2) as user_id,
        SPLIT_PART(session_id, '_', 4) as day_index,
        COUNT(DISTINCT session_id) as sessions_per_day,
        GROUP_CONCAT(SPLIT_PART(session_id, '_', 5) ORDER BY session_id) as sub_session_indices
    FROM trusted_events
    WHERE user_id IN ('user_1', 'user_2', 'user_3')
    GROUP BY 1, 2
    HAVING COUNT(DISTINCT session_id) > 1
    ORDER BY 1, CAST(day_index AS INTEGER)
""").df()

print("Days with multiple sessions:")
daily_patterns

Days with multiple sessions:


Unnamed: 0,user_id,day_index,sessions_per_day,sub_session_indices
0,user_1,3,3,"0,0,0,0,0,0,0,0,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,..."
1,user_1,4,2,"0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,1,1,1,1,..."
2,user_2,0,2,"0,0,0,0,0,0,0,0,0,0,0,1,1,1,1,1,1,1,1,1,1,1,1,..."
3,user_2,1,3,"0,0,0,0,0,0,0,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,..."
4,user_2,3,3,"0,0,0,0,0,0,0,0,0,1,1,1,1,1,1,1,1,1,1,1,1,1,1,..."
5,user_2,4,3,"0,0,0,0,0,0,0,0,0,1,1,1,1,1,1,1,1,1,1,1,1,1,1,..."
6,user_3,0,3,"0,0,0,0,0,0,0,0,0,0,0,0,1,1,1,1,1,1,1,1,1,1,1,..."
7,user_3,1,3,"0,0,0,0,0,0,0,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,..."
8,user_3,2,3,"0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,1,1,1,1,1,1,1,..."
9,user_3,3,3,"0,0,0,0,0,0,0,0,0,0,1,1,1,1,1,1,1,1,2,2,2,2,2,..."


In [30]:
# Detailed session timeline for user_1
user1_timeline = conn.execute("""
    SELECT 
        session_id,
        SPLIT_PART(session_id, '_', 4) as day_index,
        SPLIT_PART(session_id, '_', 5) as sub_session,
        MIN(timestamp) as session_start,
        MAX(timestamp) as session_end,
        COUNT(*) as event_count,
        COUNT(CASE WHEN event_name = 'watch_time' THEN 1 END) as watch_events,
        SUM(CASE WHEN event_name = 'watch_time' THEN CAST(value AS DOUBLE) ELSE 0 END) as total_watch_time
    FROM trusted_events
    WHERE user_id = 'user_1'
    GROUP BY session_id, day_index, sub_session
    ORDER BY CAST(day_index AS INTEGER), CAST(sub_session AS INTEGER)
""").df()

print("User_1 detailed session timeline:")
user1_timeline

User_1 detailed session timeline:


Unnamed: 0,session_id,day_index,sub_session,session_start,session_end,event_count,watch_events,total_watch_time
0,user_1_sess_0_0,0,0,2025-04-18T04:47:00,2025-04-18T04:48:20,17,6,21.0
1,user_1_sess_1_0,1,0,2025-04-19T22:27:00,2025-04-19T22:27:55,12,3,10.0
2,user_1_sess_2_0,2,0,2025-04-20T12:34:00,2025-04-20T12:34:55,12,2,2.0
3,user_1_sess_3_0,3,0,2025-04-21T10:02:00,2025-04-21T10:02:35,8,3,8.0
4,user_1_sess_3_1,3,1,2025-04-21T19:07:00,2025-04-21T19:08:20,17,4,9.0
5,user_1_sess_3_2,3,2,2025-04-21T18:58:00,2025-04-21T18:58:40,9,5,16.0
6,user_1_sess_4_0,4,0,2025-04-22T14:22:00,2025-04-22T14:23:25,18,6,19.0
7,user_1_sess_4_1,4,1,2025-04-22T20:31:00,2025-04-22T20:31:35,8,2,7.0


## Q1: What % of new users reach at least 30 seconds of watch_time in their first session?

In [35]:
# Q1 Analysis: Proper interpretation
q1_analysis = conn.execute("""
    WITH user_first_sessions AS (
        SELECT
            user_id,
            MIN(session_id) as first_session_id
        FROM trusted_events
        GROUP BY user_id
    ),
    first_session_watch_times AS (
        SELECT
            ufs.user_id,
            ufs.first_session_id,
            SUM(CAST(e.value AS DOUBLE)) as total_watch_time
        FROM user_first_sessions ufs
        INNER JOIN trusted_events e
            ON ufs.user_id = e.user_id
            AND ufs.first_session_id = e.session_id
        WHERE e.event_name = 'watch_time'
            AND e.value IS NOT NULL
            AND e.value > 0
        GROUP BY ufs.user_id, ufs.first_session_id
    )
    SELECT
        COUNT(DISTINCT u.user_id) as total_users,
        COUNT(DISTINCT fswt.user_id) as users_with_watch_time,
        COUNT(DISTINCT CASE WHEN fswt.total_watch_time >= 30 THEN fswt.user_id END) as users_with_30_plus,
        ROUND(100.0 * COUNT(DISTINCT CASE WHEN fswt.total_watch_time >= 30 THEN fswt.user_id END) / NULLIF(COUNT(DISTINCT u.user_id), 0), 2) as pct_reaching_30_seconds
    FROM trusted_users u
    LEFT JOIN first_session_watch_times fswt ON u.user_id = fswt.user_id
""").df()

print("Q1 ANSWER:")
print(f"  Total users: {q1_analysis['total_users'].iloc[0]:,}")
print(f"  Users with watch time in first session: {q1_analysis['users_with_watch_time'].iloc[0]:,}")
print(f"  Users reaching 30+ seconds: {q1_analysis['users_with_30_plus'].iloc[0]:,}")
print(f"  *** FINAL ANSWER: {q1_analysis['pct_reaching_30_seconds'].iloc[0]}% ***")

q1_analysis

Q1 ANSWER:
  Total users: 100
  Users with watch time in first session: 97
  Users reaching 30+ seconds: 1
  *** FINAL ANSWER: 1.0% ***


Unnamed: 0,total_users,users_with_watch_time,users_with_30_plus,pct_reaching_30_seconds
0,100,97,1,1.0


In [33]:
# Show the successful user who reached 30+ seconds
successful_user = conn.execute("""
    WITH user_first_sessions AS (
        SELECT
            user_id,
            MIN(session_id) as first_session_id
        FROM trusted_events
        GROUP BY user_id
    ),
    first_session_watch_times AS (
        SELECT
            ufs.user_id,
            ufs.first_session_id,
            SUM(CAST(e.value AS DOUBLE)) as total_watch_time
        FROM user_first_sessions ufs
        INNER JOIN trusted_events e
            ON ufs.user_id = e.user_id
            AND ufs.first_session_id = e.session_id
        WHERE e.event_name = 'watch_time'
            AND e.value IS NOT NULL
            AND e.value > 0
        GROUP BY ufs.user_id, ufs.first_session_id
    )
    SELECT 
        user_id,
        first_session_id,
        total_watch_time
    FROM first_session_watch_times
    WHERE total_watch_time >= 30
    ORDER BY total_watch_time DESC
""").df()

print("User who reached 30+ seconds in first session:")
successful_user

User who reached 30+ seconds in first session:


Unnamed: 0,user_id,first_session_id,total_watch_time
0,user_78,user_78_sess_0_0,39.0


## Q2: Which video genres drive the highest 2nd-session retention within 3 days?

In [37]:
# Check available video genres
genres_overview = conn.execute("""
    SELECT 
        genre,
        COUNT(*) as video_count,
        COUNT(DISTINCT e.user_id) as users_exposed
    FROM trusted_videos v
    INNER JOIN trusted_events e ON v.video_id = e.video_id
    GROUP BY genre
    ORDER BY users_exposed DESC
""").df()

print("Video genres overview:")
genres_overview

Video genres overview:


Unnamed: 0,genre,video_count,users_exposed
0,Comedy,4089,100
1,Action,4780,100
2,Drama,2067,100
3,Documentary,2767,100


In [38]:
# Q2 Analysis: 2nd-session retention by genre
q2_analysis = conn.execute("""
    WITH user_first_sessions AS (
        -- Get each user's first session details
        SELECT 
            e.user_id,
            MIN(e.session_id) as first_session_id,
            SUBSTRING(MIN(e.timestamp), 1, 10) as first_session_date
        FROM trusted_events e
        GROUP BY e.user_id
    ),
    first_session_genres AS (
        -- Get genres watched in first session
        SELECT 
            ufs.user_id,
            ufs.first_session_id,
            ufs.first_session_date,
            v.genre,
            COUNT(DISTINCT e.video_id) as videos_watched
        FROM user_first_sessions ufs
        INNER JOIN trusted_events e 
            ON ufs.user_id = e.user_id 
            AND ufs.first_session_id = e.session_id
        INNER JOIN trusted_videos v ON e.video_id = v.video_id
        GROUP BY ufs.user_id, ufs.first_session_id, ufs.first_session_date, v.genre
    ),
    user_second_sessions AS (
        -- Check if users had a 2nd session within 3 days
        SELECT DISTINCT
            fsg.user_id,
            fsg.genre,
            CASE WHEN COUNT(DISTINCT later_sessions.session_id) > 0 THEN 1 ELSE 0 END as has_second_session
        FROM first_session_genres fsg
        LEFT JOIN trusted_events later_sessions
            ON fsg.user_id = later_sessions.user_id
            AND later_sessions.session_id > fsg.first_session_id  -- Any session after first
            AND SUBSTRING(later_sessions.timestamp, 1, 10) <= CAST(DATE_ADD(CAST(fsg.first_session_date AS DATE), INTERVAL 3 DAY) AS VARCHAR)
        GROUP BY fsg.user_id, fsg.genre
    )
    SELECT 
        genre,
        COUNT(*) as users_exposed_to_genre,
        SUM(has_second_session) as users_with_second_session,
        ROUND(100.0 * SUM(has_second_session) / NULLIF(COUNT(*), 0), 1) as retention_rate_pct
    FROM user_second_sessions
    GROUP BY genre
    ORDER BY retention_rate_pct DESC, users_exposed_to_genre DESC
""").df()

print("Q2 RESULTS:")
print(q2_analysis.to_string())

print(f"\n*** ANSWER: {q2_analysis.iloc[0]['genre']} drives the highest 2nd-session retention at {q2_analysis.iloc[0]['retention_rate_pct']}% ***")

Q2 RESULTS:
         genre  users_exposed_to_genre  users_with_second_session  retention_rate_pct
0       Action                      98                       98.0               100.0
1       Comedy                      97                       97.0               100.0
2  Documentary                      90                       90.0               100.0
3        Drama                      86                       86.0               100.0

*** ANSWER: Action drives the highest 2nd-session retention at 100.0% ***


In [39]:
# Investigate why retention rates are so high
retention_investigation = conn.execute("""
    SELECT 
        'Total Users' as metric,
        COUNT(DISTINCT user_id) as value
    FROM trusted_events
    
    UNION ALL
    
    SELECT 
        'Users with 2+ sessions' as metric,
        COUNT(DISTINCT user_id) as value
    FROM (
        SELECT user_id, COUNT(DISTINCT session_id) as session_count
        FROM trusted_events
        GROUP BY user_id
        HAVING session_count >= 2
    )
    
    UNION ALL
    
    SELECT 
        'Users with sessions spanning multiple days' as metric,
        COUNT(DISTINCT user_id) as value
    FROM (
        SELECT 
            user_id,
            COUNT(DISTINCT SUBSTRING(timestamp, 1, 10)) as unique_days
        FROM trusted_events
        GROUP BY user_id
        HAVING unique_days >= 2
    )
""").df()

print("Understanding high retention rates:")
retention_investigation

Understanding high retention rates:


Unnamed: 0,metric,value
0,Total Users,100
1,Users with sessions spanning multiple days,100
2,Users with 2+ sessions,100
