# 01 — Data Exploration

Initial exploration of downloaded DCA data to determine:
1. What we actually have (schemas, row counts, column names)
2. Which of the 24 benchmark queries our data can support
3. How the raw tables map to the `reporting.system_*` tables the queries expect
4. What aggregation work is needed to bridge the gap

In [None]:
import json
import re
from pathlib import Path

import duckdb
import pandas as pd

# Input locations, given relative to the directory the notebook runs from.
DATA = Path("..") / "data"
QUERIES = Path("..") / "docs" / "queries"

# One DuckDB connection (no database file given), reused by every cell below.
con = duckdb.connect()

## 1. Inventory of downloaded data

In [None]:
# Parquet tables: every sub-directory of DATA that holds at least one
# *.parquet file, keyed by directory name.
parquet_tables = {}
for entry in sorted(DATA.iterdir()):
    if not entry.is_dir():
        continue
    parts = sorted(entry.glob("*.parquet"))
    if parts:
        parquet_tables[entry.name] = parts

# Gzipped text tables (from dca_update_dec_2024), looked up directly under DATA.
gz_files = sorted(DATA.glob("*.gz"))

print("=== Parquet Tables ===")
for table, parts in parquet_tables.items():
    size_mb = sum(part.stat().st_size for part in parts) / 1e6
    print(f"  {table:45s} {len(parts)} file(s)  {size_mb:>8.1f} MB")

print("\n=== Gzipped Text Files (dca_update_dec_2024) ===")
for gz in gz_files:
    size_mb = gz.stat().st_size / 1e6
    print(f"  {gz.name:45s} {size_mb:>8.1f} MB")

## 2. Schema inspection — Parquet tables

In [None]:
# For each Parquet table, print its row count followed by the column names and
# types reported by DESCRIBE.
rule = '=' * 60
for table, parts in parquet_tables.items():
    # Glob over the directory of the first part file so all parts are read.
    pattern = str(parts[0].parent / "*.parquet")
    source = f"read_parquet('{pattern}')"
    columns = con.execute(f"DESCRIBE SELECT * FROM {source}").df()
    (row_count,) = con.execute(f"SELECT COUNT(*) FROM {source}").fetchone()
    print(f"\n{rule}")
    print(f"{table} — {row_count:,} rows, {len(columns)} columns")
    print(rule)
    print(columns[['column_name', 'column_type']].to_string(index=False))

## 3. Schema inspection — Gzipped text files

DuckDB can read gzipped CSVs/TSVs directly. Let's detect the format and inspect.

In [None]:
# Same report as for the Parquet tables, but each gzipped text file is read
# through read_csv with auto-detection. A file that fails is reported with its
# error and the loop moves on to the next one.
rule = '=' * 60
for gz in gz_files:
    source = f"read_csv('{gz}', auto_detect=true, sample_size=1000)"
    try:
        columns = con.execute(f"DESCRIBE SELECT * FROM {source}").df()
        (row_count,) = con.execute(f"SELECT COUNT(*) FROM {source}").fetchone()
        print(f"\n{rule}")
        print(f"{gz.name} — {row_count:,} rows, {len(columns)} columns")
        print(rule)
        print(columns[['column_name', 'column_type']].to_string(index=False))
    except Exception as err:
        print(f"\n{gz.name}: ERROR — {err}")

### 3b. Verify new downloads — battery events and blocker history

In [None]:
# Check __tmp_batt_dc_events — unlocks 2 battery queries
batt_file = DATA / "__tmp_batt_dc_events.txt000.gz"
if not batt_file.exists():
    print("__tmp_batt_dc_events.txt000.gz not found")
else:
    # Single read_csv expression shared by the three queries below.
    batt_src = f"read_csv('{batt_file}', auto_detect=true, sample_size=1000)"
    batt_schema = con.execute(f"DESCRIBE SELECT * FROM {batt_src}").df()
    batt_rows = con.execute(f"SELECT COUNT(*) FROM {batt_src}").fetchone()[0]
    print(f"__tmp_batt_dc_events: {batt_rows:,} rows, {len(batt_schema)} columns")
    print(batt_schema[['column_name', 'column_type']].to_string(index=False))

    # The battery queries need: guid, num_power_ons, duration_mins
    print("\nSample rows:")
    batt_head = con.execute(f"SELECT * FROM {batt_src} LIMIT 5").df()
    print(batt_head.to_string())

print("\n" + "="*60)

# Check mods_sleepstudy_top_blocker_hist — unlocks 2 blocker queries
blocker_file = DATA / "mods_sleepstudy_top_blocker_hist.txt000.gz"
if not blocker_file.exists():
    print("mods_sleepstudy_top_blocker_hist.txt000.gz not found")
else:
    blocker_src = f"read_csv('{blocker_file}', auto_detect=true, sample_size=1000)"
    blocker_schema = con.execute(f"DESCRIBE SELECT * FROM {blocker_src}").df()
    blocker_rows = con.execute(f"SELECT COUNT(*) FROM {blocker_src}").fetchone()[0]
    print(f"\nmods_sleepstudy_top_blocker_hist: {blocker_rows:,} rows, {len(blocker_schema)} columns")
    print(blocker_schema[['column_name', 'column_type']].to_string(index=False))

## 4. Key column checks

### 4a. hw_metric_stats — what metric names are available?

The `name` column should contain values like `HW::PACKAGE:RAP:WATTS`, `HW::CORE:C0:PERCENT`, etc.
This determines whether we can build the 4 reporting tables needed for the 5-way chassis query.

In [None]:
hw_glob = str(DATA / "hw_metric_stats" / "*.parquet")

# One row per metric name: number of rows carrying it and number of distinct
# guids reporting it, ordered by row count (largest first).
metric_names_sql = f"""
    SELECT name, COUNT(*) AS n, COUNT(DISTINCT guid) AS n_guids
    FROM read_parquet('{hw_glob}')
    GROUP BY name
    ORDER BY n DESC
"""
metric_names = con.execute(metric_names_sql).df()

print(f"Total distinct metric names: {len(metric_names)}")
print("\nAll metric names:")
print(metric_names.to_string(index=False))

In [None]:
# Substrings to search for (case-insensitively) in the metric names, to locate
# the metrics that the hw_metric_stats-derived reporting tables depend on.
targets = ['RAP', 'PSYS', 'C0', 'FREQ', 'TEMP', 'POWER', 'PKG']

for needle in targets:
    hits = metric_names[metric_names['name'].str.contains(needle, case=False, na=False)]
    if hits.empty:
        print(f"\n✗ No matches for '{needle}'")
        continue
    print(f"\n✓ Matches for '{needle}':")
    print(hits.to_string(index=False))

### 4b. sysinfo — verify key columns for query joins

In [None]:
sysinfo_glob = str(DATA / "system_sysinfo_unique_normalized" / "*.parquet")

# Columns to profile. Each one is queried on its own so that a column that
# cannot be queried only produces an error line instead of stopping the loop.
key_columns = ['chassistype', 'countryname_normalized', 'persona', 'cpu_family', 'processornumber', 'ram', 'os']

for col in key_columns:
    try:
        top_values = con.execute(f"""
            SELECT "{col}", COUNT(*) AS n
            FROM read_parquet('{sysinfo_glob}')
            GROUP BY "{col}"
            ORDER BY n DESC
            LIMIT 10
        """).df()
        print(f"\n{col} — top 10 values:")
        print(top_values.to_string(index=False))
    except Exception as err:
        print(f"\n✗ {col}: {err}")

### 4c. os_network_consumption_v2 — verify input_description values

In [None]:
net_glob = str(DATA / "os_network_consumption_v2" / "*.parquet")

# Row count per input_description value, most frequent first.
net_descs = con.execute(f"""
    SELECT input_description, COUNT(*) AS n
    FROM read_parquet('{net_glob}')
    GROUP BY input_description
    ORDER BY n DESC
""").df()

print("input_description values:")
print(net_descs.to_string(index=False))

# The queries need these exact strings:
needed = [
    'OS:NETWORK INTERFACE::BYTES RECEIVED/SEC::',
    'OS:NETWORK INTERFACE::BYTES SENT/SEC::',
]
present_values = net_descs['input_description'].values
for wanted in needed:
    if wanted in present_values:
        mark, status = '✓', 'FOUND'
    else:
        mark, status = '✗', 'MISSING'
    print(f"\n{mark} '{wanted}' — {status}")

### 4d. web_cat_pivot — check column names match persona query expectations

In [None]:
pivot_glob = str(DATA / "web_cat_pivot" / "*.parquet")

pivot_schema = con.execute(f"DESCRIBE SELECT * FROM read_parquet('{pivot_glob}')").df()
pivot_cols = set(pivot_schema['column_name'].values)

# The persona query references these column names (from the SQL)
expected_cols = {
    'guid',
    'content_creation_photo_edit_creation',
    'content_creation_video_audio_edit_creation',
    'content_creation_web_design_development',
    'education_education',
    'entertainment_music_audio_streaming',
    'entertainment_other',
    'entertainment_video_streaming',
    'finance_banking_and_accounting',
    'games_other',
    'games_video_games',
    'mail_mail',
    'news_news',
    'other_unclassified',
    'private_private',
    'productivity_crm',
    'productivity_other',
    'productivity_presentations',
    'productivity_programming',
    'productivity_project_management',
    'productivity_spreadsheets',
    'productivity_word_processing',
    'recreation_travel',
    'reference_reference',
    'search_search',
    'shopping_shopping',
    'social_social_network',
    'social_communication',
    'social_communication_live',
}

print(f"Pivot table columns ({len(pivot_cols)}): {sorted(pivot_cols)}")
print(f"\nExpected columns ({len(expected_cols)}):")

# The persona query SQL uses shortened names (e.g., 'education' not 'education_education'),
# so each expected name is reported as an exact match (✓), a substring match in
# either direction (~), or no match at all (✗).
for expected in sorted(expected_cols):
    if expected in pivot_cols:
        print(f"  ✓ {expected}")
        continue
    near = [actual for actual in pivot_cols if expected in actual or actual in expected]
    if near:
        print(f"  ~ {expected} → partial match: {near}")
    else:
        print(f"  ✗ {expected}")

### 4e. os_memsam_avail_percent — check column names for RAM histogram query

In [None]:
mem_glob = str(DATA / "os_memsam_avail_percent" / "*.parquet")

# The RAM query expects: sysinfo_ram, guid, nrs, avg_percentage_used
# Our raw table has: sample_count, average, below_0..below_100, equal_to_greater_than_100
# We need to derive avg_percentage_used from (100 - average) and map sample_count -> nrs

# Pull a handful of rows so the raw layout can be eyeballed.
sample = con.execute(f"""
    SELECT *
    FROM read_parquet('{mem_glob}')
    LIMIT 5
""").df()

print("Sample rows (first 5):")
print(sample.to_string())

# Mapping notes, printed one per line after the sample.
for note in (
    "\nNote: 'average' column = % available memory (not % used)",
    "avg_percentage_used = 100 - average",
    "nrs = sample_count",
    "sysinfo_ram must come from JOIN with sysinfo table",
):
    print(note)

## 5. Query coverage analysis

Parse all 24 benchmark queries, extract the `reporting.system_*` tables they reference,
and determine which ones we can actually serve with our downloaded data.

In [None]:
# Parse all queries and extract table references
def _load_benchmark_query(path):
    """Read one benchmark query JSON file into a summary record.

    The file may hold either a single object or a list of objects; for a list
    only the first element is used.

    Args:
        path: Path of the JSON file.

    Returns:
        A dict with keys 'file' (file stem), 'question' (at most 80 characters,
        followed by '...' only when it was actually cut), 'tables' (set of
        names that follow ``reporting.`` in the SQL) and 'sql'. Returns None
        when the file holds an empty list.
    """
    # JSON is read as UTF-8 explicitly instead of the platform default encoding.
    with open(path, encoding="utf-8") as fh:
        data = json.load(fh)

    if isinstance(data, list):
        if not data:
            return None
        data = data[0]

    # `or ''` also covers keys that are present but null.
    sql = data.get('sql') or ''
    question = data.get('question') or ''
    if len(question) > 80:
        question = question[:80] + '...'

    return {
        'file': path.stem,
        'question': question,
        # Captures every identifier written as reporting.<name>.
        'tables': set(re.findall(r'reporting\.(\w+)', sql)),
        'sql': sql,
    }


queries = []
for qf in sorted(QUERIES.glob("*.json")):
    record = _load_benchmark_query(qf)
    if record is None:
        print(f"  ! {qf.name}: empty JSON list, skipped")
        continue
    queries.append(record)

print(f"Total benchmark queries: {len(queries)}\n")

# Map reporting tables to our available data.
# Keys are reporting-table names; values are free-text notes on the source.
AVAILABLE_DATA = {}

# Direct matches (Parquet)
AVAILABLE_DATA['system_sysinfo_unique_normalized'] = 'system_sysinfo_unique_normalized/*.parquet'

# Raw tables that need aggregation into reporting schema
AVAILABLE_DATA.update({
    'system_network_consumption': 'os_network_consumption_v2/*.parquet (needs column rename: input_description->input_desc, nr_samples->nrs)',
    'system_web_cat_usage': 'web_cat_usage_v2/*.parquet (direct, has browser/duration_ms/guid)',
    'system_web_cat_pivot_duration': 'web_cat_pivot/*.parquet (has 28 category columns, needs days/total_duration derivation)',
    'system_memory_utilization': 'os_memsam_avail_percent/*.parquet (needs: avg_percentage_used=100-average, nrs=sample_count, sysinfo_ram from JOIN)',
    'system_hw_pkg_power': 'hw_metric_stats/*.parquet (filter by PKG_POWER metric name)',
})

# Reporting tables derived from hw_metric_stats (need to verify metric names exist)
AVAILABLE_DATA.update({
    table: f'hw_metric_stats/*.parquet (filter by {metric} metric — NEEDS VERIFICATION)'
    for table, metric in [
        ('system_psys_rap_watts', 'PSYS_RAP'),
        ('system_pkg_C0', 'C0'),
        ('system_pkg_avg_freq_mhz', 'FREQ'),
        ('system_pkg_temp_centigrade', 'TEMP'),
    ]
})

# From dca_update_dec_2024 gzipped text
AVAILABLE_DATA.update({
    'system_cpu_metadata': 'system_cpu_metadata.txt000.gz',
    'system_os_codename_history': 'system_os_codename_history.txt000.gz',
    'system_on_off_suspend_time_day': 'guids_on_off_suspend_time_day.txt000.gz',
    'system_mods_top_blocker_hist': 'mods_sleepstudy_top_blocker_hist.txt000.gz (92M rows)',
    'system_batt_dc_events': '__tmp_batt_dc_events.txt000.gz (12 MiB, pre-aggregated)',
})

# Reporting tables with no downloaded source; every note shares one prefix.
NOT_AVAILABLE = {
    table: f'NOT DOWNLOADED — {detail}'
    for table, detail in [
        ('system_mods_power_consumption', 'need mods_sleepstudy_power_estimation_data_13wks (unknown size)'),
        ('system_display_devices', 'display_devices.txt000.gz (6.15 GiB)'),
        ('system_frgnd_apps_types', '__tmp_fgnd_apps_date (21.6 GiB) or frgnd_system_usage_by_app (338 GiB)'),
        ('system_userwait', 'userwait_v2 (59 GiB)'),
    ]
}

In [None]:
# Classify each query
def _split_tables(tables):
    """Split a query's table names into (available, missing) lists.

    A table is available when it is a key of AVAILABLE_DATA. Both lists are
    sorted: the per-query table collections are sets, so without sorting the
    printed order could differ from one run to the next.
    """
    ordered = sorted(tables)
    available = [t for t in ordered if t in AVAILABLE_DATA]
    missing = [t for t in ordered if t not in AVAILABLE_DATA]
    return available, missing


def _print_missing(missing):
    """Print one 'Missing' line per table, with the reason from NOT_AVAILABLE."""
    for t in missing:
        reason = NOT_AVAILABLE.get(t, 'Unknown source')
        print(f"    Missing: {t} — {reason}")


feasible = []    # every referenced table is available
infeasible = []  # none of the referenced tables is available
partial = []     # some, but not all, referenced tables are available

for q in queries:
    available, missing = _split_tables(q['tables'])
    if not missing:
        feasible.append(q)
    elif not available:
        infeasible.append(q)
    else:
        partial.append(q)

print(f"=== FEASIBLE ({len(feasible)}/{len(queries)}) ===")
for q in feasible:
    available, _ = _split_tables(q['tables'])
    print(f"  ✓ {q['file']}")
    if available:
        print(f"    Tables: {', '.join(available)}")
    else:
        # No table was detected, so "nothing missing" holds only vacuously;
        # say so instead of printing an empty table list.
        print("    Tables: (none detected — check that the SQL references reporting.*)")

print(f"\n=== INFEASIBLE — missing data ({len(infeasible)}/{len(queries)}) ===")
for q in infeasible:
    _, missing = _split_tables(q['tables'])
    print(f"  ✗ {q['file']}")
    _print_missing(missing)

print(f"\n=== PARTIAL — some tables available ({len(partial)}/{len(queries)}) ===")
for q in partial:
    available, missing = _split_tables(q['tables'])
    print(f"  ~ {q['file']}")
    print(f"    Have: {', '.join(available)}")
    _print_missing(missing)

## 6. Guid overlap check

Because only a subset of the Parquet part files was downloaded for the event tables,
check how many of each event table's guids also appear in sysinfo (the anchor table).

In [None]:
# Number of distinct guids in sysinfo, the table every event table is compared to.
(sysinfo_guids,) = con.execute(f"""
    SELECT COUNT(DISTINCT guid) FROM read_parquet('{sysinfo_glob}')
""").fetchone()
print(f"Sysinfo unique guids: {sysinfo_guids:,}")

# Tables left out of the comparison: sysinfo itself and the data dictionary.
skipped = {'system_sysinfo_unique_normalized', 'data_dictionary'}

for table, parts in parquet_tables.items():
    if table in skipped:
        continue
    pattern = str(parts[0].parent / "*.parquet")
    try:
        # Distinct guids in the event table, and how many of them have a
        # matching guid in sysinfo.
        event_guids, overlap = con.execute(f"""
            SELECT 
                COUNT(DISTINCT e.guid) AS event_guids,
                COUNT(DISTINCT CASE WHEN s.guid IS NOT NULL THEN e.guid END) AS overlapping_guids
            FROM read_parquet('{pattern}') e
            LEFT JOIN read_parquet('{sysinfo_glob}') s ON e.guid = s.guid
        """).fetchone()
        if event_guids > 0:
            pct = overlap / event_guids * 100
        else:
            pct = 0
        print(f"  {table:40s} {event_guids:>8,} guids, {overlap:>8,} overlap ({pct:.1f}%)")
    except Exception as err:
        print(f"  {table:40s} ERROR: {err}")

## 7. Summary

Quick printout of the final status.

In [None]:
# Total size of every regular file under DATA (recursive).
total_bytes = sum(f.stat().st_size for f in DATA.rglob('*') if f.is_file())
# The label says GiB, so divide by 2**30; dividing by 1e9 would give decimal GB.
print(f"Data downloaded: {total_bytes / 2**30:.1f} GiB")
print(f"Parquet tables: {len(parquet_tables)}")
print(f"Gzipped text files: {len(gz_files)}")
print()
print(f"Benchmark queries: {len(queries)} total")
print(f"  Feasible:   {len(feasible)}")
print(f"  Partial:    {len(partial)}")
print(f"  Infeasible: {len(infeasible)}")
print()
print("Next steps:")
print("  1. Verify hw_metric_stats metric names (cell 4a above)")
print("  2. Build reporting schema aggregation SQL for feasible queries")
print("  3. Run feasible benchmark queries to get ground truth results")
print("  4. Apply DP-SGD / Private Evolution to reporting tables")