In [2]:
%pip install polars
%pip install duckdb

Collecting polars
  Using cached polars-1.35.1-py3-none-any.whl.metadata (10 kB)
Collecting polars-runtime-32==1.35.1 (from polars)
  Using cached polars_runtime_32-1.35.1-cp39-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (1.5 kB)
Using cached polars-1.35.1-py3-none-any.whl (783 kB)
Using cached polars_runtime_32-1.35.1-cp39-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (41.3 MB)
Installing collected packages: polars-runtime-32, polars
Successfully installed polars-1.35.1 polars-runtime-32-1.35.1

[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m25.0.1[0m[39;49m -> [0m[32;49m25.3[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m
Note: you may need to restart the kernel to use updated packages.
Collecting duckdb
  Using cached duckdb-1.4.1-cp311-cp311-manylinux_2_27_x86_64.manylinux_2_28_x86_64.whl.metadata (14 kB)
Using cached duckdb-1.4.1-cp311-cp31

In [2]:
import polars as pl

# Load your raw notes dataset (the pre-aggregated one)
path = "/home/jovyan/Shared/2025-09-27-input/notes-00000.parquet"
df = pl.read_parquet(path)

df.head()


Unique classification values (printed one per line):

‚Ä¢ 'MISINFORMED_OR_POTENTIALLY_MISLEADING'
‚Ä¢ 'NOT_MISLEADING'


In [11]:
import polars as pl
import os
from datetime import datetime

# =========================================================
# CONFIG
# =========================================================
INPUT_PATH  = "/home/jovyan/Shared/2025-09-27-input/notes-00000.parquet"
OUTPUT_DIR  = "/home/jovyan/Shared/project1-group1/info-470-project-1/langdata/time-series-core"
OUTPUT_NAME = "notes_writers_2023_aggregated.parquet"

CUTOFF_DATE = datetime(2023, 1, 1)

# =========================================================
# PIPELINE
# =========================================================
print("üöÄ Cleaning + aggregating Community Notes (2023+) ...")

df = pl.scan_parquet(INPUT_PATH)

# --- Rename to consistent naming ---
df = df.rename({
    "noteAuthorParticipantId": "user_id",   # ‚úÖ your schema key
    "createdAtMillis": "created_at_ms"
})

# --- Drop deprecated + metadata columns ---
drop_cols = [
    "believable", "harmful", "validationDifficulty",
    "_processing_commit_hash", "_processed_at", "_data_date"
]
df = df.drop([c for c in drop_cols if c in df.collect_schema().names()])

# --- Convert ms ‚Üí datetime ---
df = df.with_columns([
    pl.from_epoch(pl.col("created_at_ms"), time_unit="ms").alias("created_at")
])

# --- Filter to 2023+ ---
df = df.filter(pl.col("created_at") >= pl.lit(CUTOFF_DATE))

# --- Truncate to 14-day windows ---
df = df.with_columns([
    pl.col("created_at").dt.truncate("14d").alias("period_start")
])

# --- Identify binary (checkbox) columns dynamically ---
main_cols = [
    "noteId", "tweetId", "user_id", "created_at", "created_at_ms",
    "summary", "classification", "period_start"
]
binary_cols = [c for c in df.collect_schema().names() if c not in main_cols]

# --- Aggregation expressions ---
agg_exprs = [
    pl.count().alias("notes_written"),
    (pl.col("classification") == "MISINFORMED_OR_POTENTIALLY_MISLEADING")
        .sum().alias("num_misleading"),
    (pl.col("classification") == "NOT_MISLEADING")
        .sum().alias("num_not_misleading"),
    (pl.col("trustworthySources") == 1).mean().alias("avg_trustworthySources"),
    pl.col("isMediaNote").mean().alias("media_note_ratio"),
]

# --- Summed reasoning flags ---
misleading_cols = [c for c in binary_cols if c.startswith("misleading")]
not_misleading_cols = [c for c in binary_cols if c.startswith("notMisleading")]

if misleading_cols:
    agg_exprs.append(
        pl.sum_horizontal([pl.col(c) for c in misleading_cols]).alias("misleading_flag_sum")
    )
if not_misleading_cols:
    agg_exprs.append(
        pl.sum_horizontal([pl.col(c) for c in not_misleading_cols]).alias("not_misleading_flag_sum")
    )

# --- Perform aggregation ---
agg = (
    df.group_by(["user_id", "period_start"])
      .agg(agg_exprs)
      .with_columns([
          (pl.col("num_misleading") / pl.col("notes_written")).alias("share_misleading"),
          ((pl.col("misleading_flag_sum") + pl.col("not_misleading_flag_sum")) /
           pl.col("notes_written")).alias("avg_flags_per_note")
      ])
      .sort(["user_id", "period_start"])
)

# --- Execute + Save ---
result = agg.collect()
out_path = os.path.join(OUTPUT_DIR, OUTPUT_NAME)
result.write_parquet(out_path)

print(f"‚úÖ Aggregation complete ‚Äî saved to:\n{out_path}")
print(f"Rows: {result.height:,} | Columns: {len(result.columns)}")
result.head(5)



üöÄ Cleaning + aggregating Community Notes (2023+) ...


(Deprecated in version 0.20.5)
  pl.count().alias("notes_written"),


‚úÖ Aggregation complete ‚Äî saved to:
/home/jovyan/Shared/project1-group1/info-470-project-1/langdata/time-series-core/notes_writers_2023_aggregated.parquet
Rows: 1,028,703 | Columns: 11


user_id,period_start,notes_written,num_misleading,num_not_misleading,avg_trustworthySources,media_note_ratio,misleading_flag_sum,not_misleading_flag_sum,share_misleading,avg_flags_per_note
str,datetime[ms],u32,u32,u32,f64,f64,list[i64],list[i64],f64,list[f64]
"""000045A5FA0CF004F68CBF2913506C‚Ä¶",2023-03-30 00:00:00,5,5,0,1.0,0.0,"[2, 1, ‚Ä¶ 1]","[0, 0, ‚Ä¶ 0]",1.0,"[0.4, 0.2, ‚Ä¶ 0.2]"
"""000045A5FA0CF004F68CBF2913506C‚Ä¶",2023-04-27 00:00:00,1,1,0,1.0,0.0,[2],[0],1.0,[2.0]
"""000045A5FA0CF004F68CBF2913506C‚Ä¶",2023-05-11 00:00:00,1,0,1,1.0,0.0,[0],[1],0.0,[1.0]
"""000045A5FA0CF004F68CBF2913506C‚Ä¶",2023-06-08 00:00:00,1,1,0,1.0,0.0,[2],[0],1.0,[2.0]
"""000045A5FA0CF004F68CBF2913506C‚Ä¶",2023-08-31 00:00:00,1,1,0,0.0,0.0,[3],[0],1.0,[3.0]


In [19]:
import polars as pl
import os
from datetime import datetime

# =========================================================
# CONFIG
# =========================================================
INPUT_PATH  = "/home/jovyan/Shared/2025-09-27-input/noteStatusHistory-00000.parquet"
OUTPUT_DIR  = "/home/jovyan/Shared/project1-group1/info-470-project-1/langdata/time-series-core"
OUTPUT_NAME = "note_status_history_2023_aggregated.parquet"

CUTOFF_DATE = datetime(2023, 1, 1)

# =========================================================
# PIPELINE
# =========================================================
print("üöÄ Cleaning + aggregating Note Status History (2023+) ...")

df = pl.scan_parquet(INPUT_PATH)

# --- Rename to consistent naming ---
df = df.rename({
    "noteAuthorParticipantId": "user_id",
    "createdAtMillis": "created_at_ms",
    "mostRecentNonNMRStatus": "latestNonNMRStatus",
})

# --- Drop metadata columns ---
drop_cols = [
    "timestampMinuteOfFinalScoringOutput",
    "timestampMillisOfRetroLock",
    "timestampMillisOfNmrDueToMinStableCrhTime",
    "timestampMillisOfFirstNmrDueToMinStableCrhTime",
    "_processing_commit_hash",
    "_processed_at",
    "_data_date",
]
df = df.drop([c for c in drop_cols if c in df.collect_schema().names()])

# --- Ensure timestamp columns are numeric ---
timestamp_cols = [
    "created_at_ms",
    "timestampMillisOfFirstNonNMRStatus",
    "timestampMillisOfStatusLock",
]

for col in timestamp_cols:
    if col in df.collect_schema().names():
        df = df.with_columns(pl.col(col).cast(pl.Int64, strict=False))


# --- Convert ms ‚Üí datetime ---
df = df.with_columns([
    pl.from_epoch(pl.col("created_at_ms"), time_unit="ms").alias("created_at")
])

# --- Filter to 2023+ only ---
df = df.filter(pl.col("created_at") >= pl.lit(CUTOFF_DATE))

# --- Truncate to 14-day windows ---
df = df.with_columns([
    pl.col("created_at").dt.truncate("14d").alias("period_start")
])

# --- Aggregations per user √ó period ---
agg_exprs = [
    pl.len().alias("notes_with_status"),
    (pl.col("currentStatus") == "CURRENTLY_RATED_HELPFUL")
        .sum().alias("notes_helpful"),
    (pl.col("currentStatus") == "CURRENTLY_RATED_NOT_HELPFUL")
        .sum().alias("notes_not_helpful"),
    (pl.col("currentStatus") == "NEEDS_MORE_RATINGS")
        .sum().alias("notes_nmr"),
    (pl.col("lockedStatus") != "").sum().alias("notes_locked"),
    (pl.col("firstNonNMRStatus") == "CURRENTLY_RATED_HELPFUL")
        .sum().alias("first_helpful"),
    (pl.col("latestNonNMRStatus") == "CURRENTLY_RATED_HELPFUL")
        .sum().alias("latest_helpful"),
    (pl.col("latestNonNMRStatus") == "CURRENTLY_RATED_NOT_HELPFUL")
        .sum().alias("latest_not_helpful"),

    # Timing metrics (average days)
    ((pl.col("timestampMillisOfFirstNonNMRStatus") - pl.col("created_at_ms")) / (1000 * 60 * 60 * 24))
        .mean().alias("avg_days_to_first_nonNMR"),
    ((pl.col("timestampMillisOfStatusLock") - pl.col("created_at_ms")) / (1000 * 60 * 60 * 24))
        .mean().alias("avg_days_to_lock"),
]

# --- Compute ratios ---
agg = (
    df.group_by(["user_id", "period_start"])
      .agg(agg_exprs)
      .with_columns([
          (pl.col("notes_helpful") / pl.col("notes_with_status")).alias("share_helpful"),
          (pl.col("notes_not_helpful") / pl.col("notes_with_status")).alias("share_not_helpful"),
      ])
      .sort(["user_id", "period_start"])
)

# --- Execute & save ---
result = agg.collect()
out_path = os.path.join(OUTPUT_DIR, OUTPUT_NAME)
result.write_parquet(out_path)

print(f"‚úÖ Aggregation complete ‚Äî saved to:\n{out_path}")
print(f"Rows: {result.height:,} | Columns: {len(result.columns)}")
result.head(5)



üöÄ Cleaning + aggregating Note Status History (2023+) ...
‚úÖ Aggregation complete ‚Äî saved to:
/home/jovyan/Shared/project1-group1/info-470-project-1/langdata/time-series-core/note_status_history_2023_aggregated.parquet
Rows: 1,089,163 | Columns: 14


user_id,period_start,notes_with_status,notes_helpful,notes_not_helpful,notes_nmr,notes_locked,first_helpful,latest_helpful,latest_not_helpful,avg_days_to_first_nonNMR,avg_days_to_lock,share_helpful,share_not_helpful
str,datetime[ms],u32,u32,u32,u32,u32,u32,u32,u32,f64,f64,f64,f64
"""000045A5FA0CF004F68CBF2913506C‚Ä¶",2023-03-30 00:00:00,7,2,0,5,7,2,2,1,0.218733,29.193681,0.285714,0.0
"""000045A5FA0CF004F68CBF2913506C‚Ä¶",2023-04-27 00:00:00,1,0,0,1,1,0,0,0,,14.00007,0.0,0.0
"""000045A5FA0CF004F68CBF2913506C‚Ä¶",2023-05-11 00:00:00,1,0,0,1,1,0,0,0,,14.007234,0.0,0.0
"""000045A5FA0CF004F68CBF2913506C‚Ä¶",2023-06-08 00:00:00,1,0,0,1,1,0,0,0,,14.032289,0.0,0.0
"""000045A5FA0CF004F68CBF2913506C‚Ä¶",2023-08-31 00:00:00,1,0,0,1,1,0,0,0,,14.011199,0.0,0.0


In [20]:
import polars as pl
import os
from datetime import datetime

# =========================================================
# CONFIG
# =========================================================
INPUT_PATH  = "/home/jovyan/Shared/2025-09-27-input/userEnrollment-00000.parquet"
OUTPUT_DIR  = "/home/jovyan/Shared/project1-group1/info-470-project-1/langdata/time-series-core"
OUTPUT_NAME = "user_enrollment_2023_aggregated.parquet"

CUTOFF_DATE = datetime(2023, 1, 1)

# =========================================================
# PIPELINE
# =========================================================
print("üöÄ Cleaning + aggregating User Enrollment (2023+) ...")

df = pl.scan_parquet(INPUT_PATH)

# --- Rename for consistency ---
df = df.rename({"participantId": "user_id"})

# --- Drop metadata columns ---
drop_cols = ["_processing_commit_hash", "_processed_at", "_data_date"]
df = df.drop([c for c in drop_cols if c in df.collect_schema().names()])

# --- Ensure numeric timestamps ---
timestamp_cols = ["timestampOfLastStateChange", "timestampOfLastEarnOut"]
for col in timestamp_cols:
    if col in df.collect_schema().names():
        df = df.with_columns(pl.col(col).cast(pl.Int64, strict=False))

# --- Convert timestamps to datetime (for filtering & binning) ---
df = df.with_columns([
    pl.from_epoch(pl.col("timestampOfLastStateChange"), time_unit="ms").alias("last_state_change_dt")
])

# --- Filter to users active after 2023 ---
df = df.filter(pl.col("last_state_change_dt") >= pl.lit(CUTOFF_DATE))

# --- Create 14-day period bins from last_state_change_dt ---
df = df.with_columns([
    pl.col("last_state_change_dt").dt.truncate("14d").alias("period_start")
])

# --- Derived flags ---
df = df.with_columns([
    (pl.col("enrollmentState") == "newUser").cast(pl.Int8).alias("is_new_user"),
    (pl.col("enrollmentState") == "earnedIn").cast(pl.Int8).alias("is_earned_in"),
    (pl.col("enrollmentState") == "atRisk").cast(pl.Int8).alias("is_at_risk"),
    (pl.col("enrollmentState").str.contains("earnedOut")).cast(pl.Int8).alias("is_earned_out"),
    (pl.col("modelingPopulation") == "CORE").cast(pl.Int8).alias("is_core_population"),
    (pl.col("timestampOfLastEarnOut") != 1).cast(pl.Int8).alias("has_ever_earned_out")
])

# --- Compute days since state changes ---
current_time = pl.lit(datetime.now().timestamp() * 1000)
df = df.with_columns([
    ((current_time - pl.col("timestampOfLastStateChange")) / (1000 * 60 * 60 * 24))
        .alias("days_since_last_state_change"),
    (
        pl.when(pl.col("timestampOfLastEarnOut") != 1)
        .then((current_time - pl.col("timestampOfLastEarnOut")) / (1000 * 60 * 60 * 24))
        .otherwise(None)
        .alias("days_since_last_earnout")
    )
])

# --- Aggregate per user √ó period ---
agg = (
    df.group_by(["user_id", "period_start"])
      .agg([
          pl.len().alias("records"),  # modern replacement for count()
          pl.mean("successfulRatingNeededToEarnIn").alias("avg_successfulRatingNeededToEarnIn"),
          pl.mean("days_since_last_state_change").alias("avg_days_since_state_change"),
          pl.mean("days_since_last_earnout").alias("avg_days_since_earnout"),
          pl.max("is_new_user").alias("is_new_user"),
          pl.max("is_earned_in").alias("is_earned_in"),
          pl.max("is_at_risk").alias("is_at_risk"),
          pl.max("is_earned_out").alias("is_earned_out"),
          pl.max("has_ever_earned_out").alias("has_ever_earned_out"),
          pl.max("is_core_population").alias("is_core_population"),
          pl.mean("modelingGroup").alias("avg_modelingGroup"),
      ])
      .sort(["user_id", "period_start"])
)

# --- Execute + Save ---
result = agg.collect()
out_path = os.path.join(OUTPUT_DIR, OUTPUT_NAME)
result.write_parquet(out_path)

print(f"‚úÖ Aggregation complete ‚Äî saved to:\n{out_path}")
print(f"Rows: {result.height:,} | Columns: {len(result.columns)}")
result.head(5)


üöÄ Cleaning + aggregating User Enrollment (2023+) ...
‚úÖ Aggregation complete ‚Äî saved to:
/home/jovyan/Shared/project1-group1/info-470-project-1/langdata/time-series-core/user_enrollment_2023_aggregated.parquet
Rows: 1,279,984 | Columns: 13


user_id,period_start,records,avg_successfulRatingNeededToEarnIn,avg_days_since_state_change,avg_days_since_earnout,is_new_user,is_earned_in,is_at_risk,is_earned_out,has_ever_earned_out,is_core_population,avg_modelingGroup
str,datetime[ms],u32,f64,f64,f64,i8,i8,i8,i8,i8,i8,f64
"""0000010BB832A9CFDF102BF7B66896‚Ä¶",2024-03-28 00:00:00,1,5.0,583.021264,,1,0,0,0,0,1,6.0
"""000011269AD6F327AED0F4086A732B‚Ä¶",2024-03-28 00:00:00,1,5.0,583.021264,,0,1,0,0,0,1,3.0
"""00001E2644DAE39EE4C52C373B921D‚Ä¶",2025-07-31 00:00:00,1,5.0,96.346167,,1,0,0,0,0,1,13.0
"""00002C7FD6E0080A69D0AB879C3D9B‚Ä¶",2025-08-28 00:00:00,1,5.0,61.836681,,0,1,0,0,0,1,1.0
"""0000315D36021A528D85155729DDBF‚Ä¶",2024-10-10 00:00:00,1,5.0,380.346775,,1,0,0,0,0,1,13.0


In [21]:
import polars as pl
import os
from datetime import datetime

# =========================================================
# CONFIG
# =========================================================
INPUT_PATH  = "/home/jovyan/Shared/2025-09-27-input/noteRequests-00000.parquet"
OUTPUT_DIR  = "/home/jovyan/Shared/project1-group1/info-470-project-1/langdata/time-series-core"
OUTPUT_NAME = "note_requests_2023_aggregated.parquet"

CUTOFF_DATE = datetime(2023, 1, 1)

# =========================================================
# PIPELINE
# =========================================================
print("üöÄ Cleaning + aggregating Note Requests (2023+) ...")

df = pl.scan_parquet(INPUT_PATH)

# --- Rename for consistency ---
df = df.rename({
    "userId": "user_id",
    "createdAtMillis": "created_at_ms"
})

# --- Drop metadata columns if present ---
drop_cols = ["_processing_commit_hash", "_processed_at", "_data_date"]
df = df.drop([c for c in drop_cols if c in df.collect_schema().names()])

# --- Ensure numeric timestamp ---
if "created_at_ms" in df.collect_schema().names():
    df = df.with_columns(pl.col("created_at_ms").cast(pl.Int64, strict=False))

# --- Convert to datetime ---
df = df.with_columns([
    pl.from_epoch(pl.col("created_at_ms"), time_unit="ms").alias("created_at")
])

# --- Filter to 2023+ ---
df = df.filter(pl.col("created_at") >= pl.lit(CUTOFF_DATE))

# --- 14-day bins ---
df = df.with_columns([
    pl.col("created_at").dt.truncate("14d").alias("period_start")
])

# --- Derived flags ---
df = df.with_columns([
    (pl.col("sourceLink").str.strip_chars().is_not_null() & (pl.col("sourceLink") != ""))
        .cast(pl.Int8)
        .alias("has_source_link")
])

# --- Aggregations per user √ó period ---
agg = (
    df.group_by(["user_id", "period_start"])
      .agg([
          pl.len().alias("notes_requested"),
          pl.n_unique("tweetId").alias("unique_tweets_requested"),
          pl.sum("has_source_link").alias("with_source_link")
      ])
      .with_columns([
          (pl.col("with_source_link") / pl.col("notes_requested")).alias("share_with_link")
      ])
      .sort(["user_id", "period_start"])
)

# --- Execute + Save ---
result = agg.collect()
out_path = os.path.join(OUTPUT_DIR, OUTPUT_NAME)
result.write_parquet(out_path)

print(f"‚úÖ Aggregation complete ‚Äî saved to:\n{out_path}")
print(f"Rows: {result.height:,} | Columns: {len(result.columns)}")
result.head(5)


üöÄ Cleaning + aggregating Note Requests (2023+) ...
‚úÖ Aggregation complete ‚Äî saved to:
/home/jovyan/Shared/project1-group1/info-470-project-1/langdata/time-series-core/note_requests_2023_aggregated.parquet
Rows: 3,817,344 | Columns: 6


user_id,period_start,notes_requested,unique_tweets_requested,with_source_link,share_with_link
str,datetime[ms],u32,u32,i64,f64
"""000004CC6A9EA228A4D367C463A49B‚Ä¶",2024-08-29 00:00:00,1,1,0,0.0
"""00000D07B9A6256C0D44099CA726D3‚Ä¶",2025-04-24 00:00:00,1,1,0,0.0
"""000017720184E7316E75A7875EB214‚Ä¶",2025-07-03 00:00:00,2,2,2,1.0
"""00001E9887ECC2F2D27A2A8837939B‚Ä¶",2024-09-26 00:00:00,1,1,0,0.0
"""00002C7FD6E0080A69D0AB879C3D9B‚Ä¶",2024-11-21 00:00:00,1,1,0,0.0


In [23]:
import glob

parquet_files = sorted(glob.glob("/home/jovyan/Shared/2025-09-27-input/noteRating-*.parquet"))
print(f"Found {len(parquet_files)} files:")
for f in parquet_files[:5]:
    print(" ", f)


Found 0 files:


In [3]:
import os
import polars as pl
import duckdb
from datetime import datetime
from tqdm import tqdm
import gc

# =========================================================
# CONFIGURATION
# =========================================================
os.environ["POLARS_MAX_THREADS"] = "6"  # limit concurrency

INPUT_DIR   = "/home/jovyan/Shared/2025-09-27-input"
OUTPUT_DIR  = "/home/jovyan/Shared/project1-group1/info-470-project-1/langdata/time-series-core/ratings"
os.makedirs(OUTPUT_DIR, exist_ok=True)

CUTOFF_DATE = datetime(2023, 1, 1)
BATCH_SIZE  = 2_000_000  # number of rows per DuckDB chunk

# =========================================================
# HELPER DEFINITIONS
# =========================================================
helpful_cols = [
    "helpfulOther", "helpfulClear", "helpfulGoodSources",
    "helpfulAddressesClaim", "helpfulImportantContext", "helpfulUnbiasedLanguage"
]
not_helpful_cols = [
    "notHelpfulIncorrect", "notHelpfulMissingKeyPoints",
    "notHelpfulHardToUnderstand", "notHelpfulArgumentativeOrBiased",
    "notHelpfulSpamHarassmentOrAbuse", "notHelpfulIrrelevantSources",
    "notHelpfulOpinionSpeculation", "notHelpfulNoteNotNeeded"
]
keep_cols = [
    "raterParticipantId", "createdAtMillis", "noteId", "ratedOnTweetId",
    "agree", "disagree", "helpfulnessLevel"
] + helpful_cols + not_helpful_cols

def safe_cast(df, cols, dtype=pl.Int64):
    for c in cols:
        if c in df.columns:
            df = df.with_columns(pl.col(c).cast(dtype, strict=False))
    return df

# =========================================================
# MAIN LOOP
# =========================================================
for i in tqdm(range(20), desc="Processing noteRatings files", dynamic_ncols=True):
    file_name = f"noteRatings-{i:05d}.parquet"
    input_path = os.path.join(INPUT_DIR, file_name)

    if not os.path.exists(input_path):
        print(f"‚ö†Ô∏è  Skipping missing: {file_name}")
        continue

    print(f"\nüìÇ Processing {file_name} ...")
    con = duckdb.connect()
    total_rows = con.execute(f"SELECT COUNT(*) FROM '{input_path}'").fetchone()[0]
    print(f"  Rows in file: {total_rows:,}")

    chunk_results = []
    offset = 0
    batch_id = 0

    while offset < total_rows:
        batch_id += 1
        query = f"""
            SELECT {', '.join(keep_cols)}
            FROM '{input_path}'
            LIMIT {BATCH_SIZE} OFFSET {offset}
        """
        df = con.execute(query).pl()
        offset += BATCH_SIZE
        if df.is_empty():
            continue

        print(f"  üß© Batch {batch_id} ({len(df):,} rows)")

        # --- Clean and transform ---
        df = df.rename({"raterParticipantId": "user_id", "createdAtMillis": "created_at_ms"})
        df = safe_cast(df, ["created_at_ms", "agree", "disagree"] + helpful_cols + not_helpful_cols, pl.Int64)

        df = df.with_columns(pl.from_epoch(pl.col("created_at_ms"), time_unit="ms").alias("created_at"))
        df = df.filter(pl.col("created_at") >= pl.lit(CUTOFF_DATE))

        if df.is_empty():
            continue

        df = df.with_columns(pl.col("created_at").dt.truncate("14d").alias("period_start"))

        df = df.with_columns([
            (pl.col("helpfulnessLevel") == "HELPFUL").cast(pl.Int8).alias("is_helpful"),
            (pl.col("helpfulnessLevel") == "SOMEWHAT_HELPFUL").cast(pl.Int8).alias("is_somewhat_helpful"),
            (pl.col("helpfulnessLevel") == "NOT_HELPFUL").cast(pl.Int8).alias("is_not_helpful")
        ])

        df = df.with_columns([
            pl.sum_horizontal([pl.col(c) for c in helpful_cols]).alias("helpful_flags_sum"),
            pl.sum_horizontal([pl.col(c) for c in not_helpful_cols]).alias("not_helpful_flags_sum")
        ])

        # --- Group and aggregate ---
        agg = (
            df.lazy()
            .group_by(["user_id", "period_start"])
            .agg([
                pl.len().alias("ratings_given"),
                pl.n_unique("noteId").alias("notes_rated_unique"),
                pl.n_unique("ratedOnTweetId").alias("tweets_rated_unique"),
                pl.mean("agree").alias("agree_rate"),
                pl.mean("disagree").alias("disagree_rate"),
                pl.mean("is_helpful").alias("helpful_rate"),
                pl.mean("is_somewhat_helpful").alias("somewhat_helpful_rate"),
                pl.mean("is_not_helpful").alias("not_helpful_rate"),
                pl.mean("helpful_flags_sum").alias("avg_helpful_flags"),
                pl.mean("not_helpful_flags_sum").alias("avg_not_helpful_flags")
            ])
            .sort(["user_id", "period_start"])
            .collect(streaming=True)
        )

        chunk_results.append(agg)

        del df, agg
        gc.collect()

    con.close()

    if len(chunk_results) == 0:
        print(f"‚ö†Ô∏è  No valid rows in {file_name}")
        continue

    result = pl.concat(chunk_results, how="vertical_relaxed")
    out_path = os.path.join(OUTPUT_DIR, f"note_ratings_agg_{i:05d}.parquet")
    result.write_parquet(out_path)

    print(f"‚úÖ Saved {out_path} ({result.height:,} rows)")
    del result, chunk_results
    gc.collect()

print("\nüéâ All noteRatings chunks processed safely and saved!")


Processing noteRatings files:   0%|          | 0/20 [00:00<?, ?it/s]


üìÇ Processing noteRatings-00000.parquet ...
  Rows in file: 14,115,918
  üß© Batch 1 (2,000,000 rows)


  .collect(streaming=True)


  üß© Batch 2 (2,000,000 rows)
  üß© Batch 3 (2,000,000 rows)
  üß© Batch 4 (2,000,000 rows)
  üß© Batch 5 (2,000,000 rows)
  üß© Batch 6 (2,000,000 rows)


FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

  üß© Batch 7 (2,000,000 rows)
  üß© Batch 8 (115,918 rows)


Processing noteRatings files:   5%|‚ñå         | 1/20 [00:19<06:08, 19.41s/it]

‚úÖ Saved /home/jovyan/Shared/project1-group1/info-470-project-1/langdata/time-series-core/ratings/note_ratings_agg_00000.parquet (1,720,180 rows)

üìÇ Processing noteRatings-00001.parquet ...
  Rows in file: 14,037,156
  üß© Batch 1 (2,000,000 rows)
  üß© Batch 2 (2,000,000 rows)
  üß© Batch 3 (2,000,000 rows)
  üß© Batch 4 (2,000,000 rows)
  üß© Batch 5 (2,000,000 rows)
  üß© Batch 6 (2,000,000 rows)


FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

  üß© Batch 7 (2,000,000 rows)
  üß© Batch 8 (37,156 rows)


Processing noteRatings files:  10%|‚ñà         | 2/20 [00:35<05:17, 17.63s/it]

‚úÖ Saved /home/jovyan/Shared/project1-group1/info-470-project-1/langdata/time-series-core/ratings/note_ratings_agg_00001.parquet (1,724,554 rows)

üìÇ Processing noteRatings-00002.parquet ...
  Rows in file: 14,014,554
  üß© Batch 1 (2,000,000 rows)
  üß© Batch 2 (2,000,000 rows)
  üß© Batch 3 (2,000,000 rows)
  üß© Batch 4 (2,000,000 rows)
  üß© Batch 5 (2,000,000 rows)
  üß© Batch 6 (2,000,000 rows)


FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

  üß© Batch 7 (2,000,000 rows)
  üß© Batch 8 (14,554 rows)


Processing noteRatings files:  15%|‚ñà‚ñå        | 3/20 [00:52<04:49, 17.03s/it]

‚úÖ Saved /home/jovyan/Shared/project1-group1/info-470-project-1/langdata/time-series-core/ratings/note_ratings_agg_00002.parquet (1,728,678 rows)

üìÇ Processing noteRatings-00003.parquet ...
  Rows in file: 14,020,408
  üß© Batch 1 (2,000,000 rows)
  üß© Batch 2 (2,000,000 rows)
  üß© Batch 3 (2,000,000 rows)
  üß© Batch 4 (2,000,000 rows)
  üß© Batch 5 (2,000,000 rows)
  üß© Batch 6 (2,000,000 rows)


FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

  üß© Batch 7 (2,000,000 rows)
  üß© Batch 8 (20,408 rows)


Processing noteRatings files:  20%|‚ñà‚ñà        | 4/20 [01:09<04:34, 17.13s/it]

‚úÖ Saved /home/jovyan/Shared/project1-group1/info-470-project-1/langdata/time-series-core/ratings/note_ratings_agg_00003.parquet (1,730,174 rows)

üìÇ Processing noteRatings-00004.parquet ...
  Rows in file: 13,895,420
  üß© Batch 1 (2,000,000 rows)
  üß© Batch 2 (2,000,000 rows)
  üß© Batch 3 (2,000,000 rows)
  üß© Batch 4 (2,000,000 rows)
  üß© Batch 5 (2,000,000 rows)
  üß© Batch 6 (2,000,000 rows)


FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

  üß© Batch 7 (1,895,420 rows)


Processing noteRatings files:  25%|‚ñà‚ñà‚ñå       | 5/20 [01:24<04:06, 16.43s/it]

‚úÖ Saved /home/jovyan/Shared/project1-group1/info-470-project-1/langdata/time-series-core/ratings/note_ratings_agg_00004.parquet (1,741,006 rows)

üìÇ Processing noteRatings-00005.parquet ...
  Rows in file: 13,879,060
  üß© Batch 1 (2,000,000 rows)
  üß© Batch 2 (2,000,000 rows)
  üß© Batch 3 (2,000,000 rows)
  üß© Batch 4 (2,000,000 rows)
  üß© Batch 5 (2,000,000 rows)
  üß© Batch 6 (2,000,000 rows)
  üß© Batch 7 (1,879,060 rows)


Processing noteRatings files:  30%|‚ñà‚ñà‚ñà       | 6/20 [01:40<03:45, 16.12s/it]

‚úÖ Saved /home/jovyan/Shared/project1-group1/info-470-project-1/langdata/time-series-core/ratings/note_ratings_agg_00005.parquet (1,744,671 rows)

üìÇ Processing noteRatings-00006.parquet ...
  Rows in file: 13,785,568
  üß© Batch 1 (2,000,000 rows)
  üß© Batch 2 (2,000,000 rows)
  üß© Batch 3 (2,000,000 rows)
  üß© Batch 4 (2,000,000 rows)
  üß© Batch 5 (2,000,000 rows)
  üß© Batch 6 (2,000,000 rows)
  üß© Batch 7 (1,785,568 rows)


Processing noteRatings files:  35%|‚ñà‚ñà‚ñà‚ñå      | 7/20 [01:55<03:26, 15.86s/it]

‚úÖ Saved /home/jovyan/Shared/project1-group1/info-470-project-1/langdata/time-series-core/ratings/note_ratings_agg_00006.parquet (1,744,438 rows)

üìÇ Processing noteRatings-00007.parquet ...
  Rows in file: 13,836,663
  üß© Batch 1 (2,000,000 rows)
  üß© Batch 2 (2,000,000 rows)
  üß© Batch 3 (2,000,000 rows)
  üß© Batch 4 (2,000,000 rows)
  üß© Batch 5 (2,000,000 rows)
  üß© Batch 6 (2,000,000 rows)


FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

  üß© Batch 7 (1,836,663 rows)


Processing noteRatings files:  40%|‚ñà‚ñà‚ñà‚ñà      | 8/20 [02:10<03:08, 15.71s/it]

‚úÖ Saved /home/jovyan/Shared/project1-group1/info-470-project-1/langdata/time-series-core/ratings/note_ratings_agg_00007.parquet (1,740,474 rows)

üìÇ Processing noteRatings-00008.parquet ...
  Rows in file: 13,725,811
  üß© Batch 1 (2,000,000 rows)
  üß© Batch 2 (2,000,000 rows)
  üß© Batch 3 (2,000,000 rows)
  üß© Batch 4 (2,000,000 rows)
  üß© Batch 5 (2,000,000 rows)
  üß© Batch 6 (2,000,000 rows)
  üß© Batch 7 (1,725,811 rows)


Processing noteRatings files:  45%|‚ñà‚ñà‚ñà‚ñà‚ñå     | 9/20 [02:26<02:51, 15.59s/it]

‚úÖ Saved /home/jovyan/Shared/project1-group1/info-470-project-1/langdata/time-series-core/ratings/note_ratings_agg_00008.parquet (1,736,683 rows)

üìÇ Processing noteRatings-00009.parquet ...
  Rows in file: 13,771,655
  üß© Batch 1 (2,000,000 rows)
  üß© Batch 2 (2,000,000 rows)
  üß© Batch 3 (2,000,000 rows)
  üß© Batch 4 (2,000,000 rows)
  üß© Batch 5 (2,000,000 rows)
  üß© Batch 6 (2,000,000 rows)
  üß© Batch 7 (1,771,655 rows)


Processing noteRatings files:  50%|‚ñà‚ñà‚ñà‚ñà‚ñà     | 10/20 [02:41<02:35, 15.52s/it]

‚úÖ Saved /home/jovyan/Shared/project1-group1/info-470-project-1/langdata/time-series-core/ratings/note_ratings_agg_00009.parquet (1,745,140 rows)

üìÇ Processing noteRatings-00010.parquet ...
  Rows in file: 3,495,037
  üß© Batch 1 (2,000,000 rows)
  üß© Batch 2 (1,495,037 rows)


Processing noteRatings files:  55%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñå    | 11/20 [02:44<01:45, 11.75s/it]

‚úÖ Saved /home/jovyan/Shared/project1-group1/info-470-project-1/langdata/time-series-core/ratings/note_ratings_agg_00010.parquet (374,449 rows)

üìÇ Processing noteRatings-00011.parquet ...
  Rows in file: 3,351,618
  üß© Batch 1 (2,000,000 rows)
  üß© Batch 2 (1,351,618 rows)


Processing noteRatings files:  60%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà    | 12/20 [02:47<01:12,  9.08s/it]

‚úÖ Saved /home/jovyan/Shared/project1-group1/info-470-project-1/langdata/time-series-core/ratings/note_ratings_agg_00011.parquet (366,330 rows)

üìÇ Processing noteRatings-00012.parquet ...
  Rows in file: 3,346,634
  üß© Batch 1 (2,000,000 rows)
  üß© Batch 2 (1,346,634 rows)


Processing noteRatings files:  65%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñå   | 13/20 [02:50<00:50,  7.23s/it]

‚úÖ Saved /home/jovyan/Shared/project1-group1/info-470-project-1/langdata/time-series-core/ratings/note_ratings_agg_00012.parquet (363,176 rows)

üìÇ Processing noteRatings-00013.parquet ...
  Rows in file: 3,258,822
  üß© Batch 1 (2,000,000 rows)
  üß© Batch 2 (1,258,822 rows)


Processing noteRatings files:  70%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà   | 14/20 [02:53<00:35,  5.95s/it]

‚úÖ Saved /home/jovyan/Shared/project1-group1/info-470-project-1/langdata/time-series-core/ratings/note_ratings_agg_00013.parquet (359,363 rows)

üìÇ Processing noteRatings-00014.parquet ...
  Rows in file: 3,300,477
  üß© Batch 1 (2,000,000 rows)
  üß© Batch 2 (1,300,477 rows)


Processing noteRatings files:  75%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñå  | 15/20 [02:56<00:25,  5.04s/it]

‚úÖ Saved /home/jovyan/Shared/project1-group1/info-470-project-1/langdata/time-series-core/ratings/note_ratings_agg_00014.parquet (348,012 rows)

üìÇ Processing noteRatings-00015.parquet ...
  Rows in file: 3,256,439
  üß© Batch 1 (2,000,000 rows)
  üß© Batch 2 (1,256,439 rows)


Processing noteRatings files:  80%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà  | 16/20 [02:59<00:17,  4.39s/it]

‚úÖ Saved /home/jovyan/Shared/project1-group1/info-470-project-1/langdata/time-series-core/ratings/note_ratings_agg_00015.parquet (345,232 rows)

üìÇ Processing noteRatings-00016.parquet ...
  Rows in file: 3,349,365
  üß© Batch 1 (2,000,000 rows)
  üß© Batch 2 (1,349,365 rows)


Processing noteRatings files:  85%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñå | 17/20 [03:02<00:11,  3.95s/it]

‚úÖ Saved /home/jovyan/Shared/project1-group1/info-470-project-1/langdata/time-series-core/ratings/note_ratings_agg_00016.parquet (338,553 rows)

üìÇ Processing noteRatings-00017.parquet ...
  Rows in file: 3,245,473
  üß© Batch 1 (2,000,000 rows)
  üß© Batch 2 (1,245,473 rows)


Processing noteRatings files:  90%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà | 18/20 [03:05<00:07,  3.64s/it]

‚úÖ Saved /home/jovyan/Shared/project1-group1/info-470-project-1/langdata/time-series-core/ratings/note_ratings_agg_00017.parquet (340,530 rows)

üìÇ Processing noteRatings-00018.parquet ...
  Rows in file: 3,089,033
  üß© Batch 1 (2,000,000 rows)
  üß© Batch 2 (1,089,033 rows)


Processing noteRatings files:  95%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñå| 19/20 [03:08<00:03,  3.39s/it]

‚úÖ Saved /home/jovyan/Shared/project1-group1/info-470-project-1/langdata/time-series-core/ratings/note_ratings_agg_00018.parquet (335,706 rows)

üìÇ Processing noteRatings-00019.parquet ...
  Rows in file: 3,102,687
  üß© Batch 1 (2,000,000 rows)
  üß© Batch 2 (1,102,687 rows)


Processing noteRatings files: 100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 20/20 [03:10<00:00,  9.54s/it]

‚úÖ Saved /home/jovyan/Shared/project1-group1/info-470-project-1/langdata/time-series-core/ratings/note_ratings_agg_00019.parquet (342,156 rows)

üéâ All noteRatings chunks processed safely and saved!





In [2]:
import os
import polars as pl
import duckdb
from tqdm import tqdm
import gc
from datetime import datetime

# =========================================================
# CONFIGURATION
# =========================================================
os.environ["POLARS_MAX_THREADS"] = "6"

RATINGS_DIR = "/home/jovyan/Shared/project1-group1/info-470-project-1/langdata/time-series-core/ratings"
TEMP_DIR    = "/home/jovyan/Shared/project1-group1/info-470-project-1/langdata/time-series-core/ratings/tmp_merge"
FINAL_PATH  = "/home/jovyan/Shared/project1-group1/info-470-project-1/langdata/time-series-core/note_ratings_2023_aggregated.parquet"

os.makedirs(TEMP_DIR, exist_ok=True)
BATCH_SIZE  = 1_000_000  # adjustable
gc.collect()

# =========================================================
# FIND INPUT FILES
# =========================================================
files = sorted([
    os.path.join(RATINGS_DIR, f)
    for f in os.listdir(RATINGS_DIR)
    if f.endswith(".parquet") and f.startswith("note_ratings_agg_")
])

print(f"üì¶ Found {len(files)} aggregated rating files to merge")

# =========================================================
# STAGE 1: BATCHED INTERMEDIATE PARQUETS
# =========================================================
tmp_counter = 0
merged_batches = []

for fpath in tqdm(files, desc="Merging aggregated parquets", dynamic_ncols=True):
    print(f"\nüìÇ Reading {os.path.basename(fpath)} ...")

    con = duckdb.connect()
    total_rows = con.execute(f"SELECT COUNT(*) FROM '{fpath}'").fetchone()[0]
    print(f"  Rows in file: {total_rows:,}")

    offset = 0
    batch_id = 0

    while offset < total_rows:
        batch_id += 1
        query = f"SELECT * FROM '{fpath}' LIMIT {BATCH_SIZE} OFFSET {offset}"
        df = con.execute(query).pl()
        offset += BATCH_SIZE

        if df.is_empty():
            continue

        merged_batches.append(df)
        print(f"  üß© Added batch {batch_id} ({len(df):,} rows)")

        # Flush every ~10 batches to temporary parquet
        if len(merged_batches) >= 10:
            combined = pl.concat(merged_batches, how="vertical_relaxed")
            tmp_path = os.path.join(TEMP_DIR, f"merged_part_{tmp_counter:03d}.parquet")
            combined.write_parquet(tmp_path)
            print(f"  üíæ Flushed {combined.height:,} rows ‚Üí {tmp_path}")
            tmp_counter += 1
            del merged_batches[:]
            del combined
            gc.collect()

    con.close()
    gc.collect()

# Final flush for remaining batches
if merged_batches:
    combined = pl.concat(merged_batches, how="vertical_relaxed")
    tmp_path = os.path.join(TEMP_DIR, f"merged_part_{tmp_counter:03d}.parquet")
    combined.write_parquet(tmp_path)
    print(f"‚úÖ Final flush ({combined.height:,} rows) ‚Üí {tmp_path}")
    tmp_counter += 1
    del merged_batches[:]
    gc.collect()

print(f"\nüßæ Wrote {tmp_counter} intermediate parquet chunks to {TEMP_DIR}")

# =========================================================
# STAGE 2: FINAL CONSOLIDATION WITH DUCKDB
# =========================================================
print("\nüöÄ Consolidating all intermediate parts into final parquet...")

tmp_files = sorted([
    os.path.join(TEMP_DIR, f) for f in os.listdir(TEMP_DIR) if f.endswith(".parquet")
])

con = duckdb.connect()
query = f"""
COPY (
    SELECT * FROM read_parquet({tmp_files})
) TO '{FINAL_PATH}' (FORMAT PARQUET);
"""
con.execute(query)
con.close()

print(f"\nüéâ Successfully merged all aggregated files into:\n{FINAL_PATH}")


üì¶ Found 20 aggregated rating files to merge


Merging aggregated parquets:   0%|          | 0/20 [00:00<?, ?it/s]


üìÇ Reading note_ratings_agg_00000.parquet ...
  Rows in file: 1,720,180
  üß© Added batch 1 (1,000,000 rows)


Merging aggregated parquets:   5%|‚ñå         | 1/20 [00:01<00:21,  1.12s/it]

  üß© Added batch 2 (720,180 rows)

üìÇ Reading note_ratings_agg_00001.parquet ...
  Rows in file: 1,724,554
  üß© Added batch 1 (1,000,000 rows)


Merging aggregated parquets:  10%|‚ñà         | 2/20 [00:02<00:18,  1.04s/it]

  üß© Added batch 2 (724,554 rows)

üìÇ Reading note_ratings_agg_00002.parquet ...
  Rows in file: 1,728,678
  üß© Added batch 1 (1,000,000 rows)


Merging aggregated parquets:  15%|‚ñà‚ñå        | 3/20 [00:03<00:17,  1.01s/it]

  üß© Added batch 2 (728,678 rows)

üìÇ Reading note_ratings_agg_00003.parquet ...
  Rows in file: 1,730,174
  üß© Added batch 1 (1,000,000 rows)


Merging aggregated parquets:  20%|‚ñà‚ñà        | 4/20 [00:04<00:15,  1.01it/s]

  üß© Added batch 2 (730,174 rows)

üìÇ Reading note_ratings_agg_00004.parquet ...
  Rows in file: 1,741,006
  üß© Added batch 1 (1,000,000 rows)
  üß© Added batch 2 (741,006 rows)


Merging aggregated parquets:  25%|‚ñà‚ñà‚ñå       | 5/20 [00:09<00:37,  2.53s/it]

  üíæ Flushed 8,644,592 rows ‚Üí /home/jovyan/Shared/project1-group1/info-470-project-1/langdata/time-series-core/ratings/tmp_merge/merged_part_000.parquet

üìÇ Reading note_ratings_agg_00005.parquet ...
  Rows in file: 1,744,671
  üß© Added batch 1 (1,000,000 rows)


Merging aggregated parquets:  30%|‚ñà‚ñà‚ñà       | 6/20 [00:10<00:28,  2.01s/it]

  üß© Added batch 2 (744,671 rows)

üìÇ Reading note_ratings_agg_00006.parquet ...
  Rows in file: 1,744,438
  üß© Added batch 1 (1,000,000 rows)


Merging aggregated parquets:  35%|‚ñà‚ñà‚ñà‚ñå      | 7/20 [00:11<00:21,  1.68s/it]

  üß© Added batch 2 (744,438 rows)

üìÇ Reading note_ratings_agg_00007.parquet ...
  Rows in file: 1,740,474
  üß© Added batch 1 (1,000,000 rows)


Merging aggregated parquets:  40%|‚ñà‚ñà‚ñà‚ñà      | 8/20 [00:12<00:17,  1.46s/it]

  üß© Added batch 2 (740,474 rows)

üìÇ Reading note_ratings_agg_00008.parquet ...
  Rows in file: 1,736,683
  üß© Added batch 1 (1,000,000 rows)


Merging aggregated parquets:  45%|‚ñà‚ñà‚ñà‚ñà‚ñå     | 9/20 [00:13<00:14,  1.30s/it]

  üß© Added batch 2 (736,683 rows)

üìÇ Reading note_ratings_agg_00009.parquet ...
  Rows in file: 1,745,140
  üß© Added batch 1 (1,000,000 rows)
  üß© Added batch 2 (745,140 rows)
  üíæ Flushed 8,711,406 rows ‚Üí /home/jovyan/Shared/project1-group1/info-470-project-1/langdata/time-series-core/ratings/tmp_merge/merged_part_001.parquet


Merging aggregated parquets:  50%|‚ñà‚ñà‚ñà‚ñà‚ñà     | 10/20 [00:17<00:23,  2.36s/it]


üìÇ Reading note_ratings_agg_00010.parquet ...
  Rows in file: 374,449


Merging aggregated parquets:  55%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñå    | 11/20 [00:18<00:15,  1.75s/it]

  üß© Added batch 1 (374,449 rows)

üìÇ Reading note_ratings_agg_00011.parquet ...
  Rows in file: 366,330


Merging aggregated parquets:  60%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà    | 12/20 [00:18<00:10,  1.33s/it]

  üß© Added batch 1 (366,330 rows)

üìÇ Reading note_ratings_agg_00012.parquet ...
  Rows in file: 363,176


Merging aggregated parquets:  65%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñå   | 13/20 [00:19<00:07,  1.03s/it]

  üß© Added batch 1 (363,176 rows)

üìÇ Reading note_ratings_agg_00013.parquet ...
  Rows in file: 359,363


Merging aggregated parquets:  70%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà   | 14/20 [00:19<00:04,  1.21it/s]

  üß© Added batch 1 (359,363 rows)

üìÇ Reading note_ratings_agg_00014.parquet ...
  Rows in file: 348,012


Merging aggregated parquets:  75%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñå  | 15/20 [00:19<00:03,  1.47it/s]

  üß© Added batch 1 (348,012 rows)

üìÇ Reading note_ratings_agg_00015.parquet ...
  Rows in file: 345,232


Merging aggregated parquets:  80%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà  | 16/20 [00:20<00:02,  1.72it/s]

  üß© Added batch 1 (345,232 rows)

üìÇ Reading note_ratings_agg_00016.parquet ...
  Rows in file: 338,553


Merging aggregated parquets:  85%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñå | 17/20 [00:20<00:01,  1.96it/s]

  üß© Added batch 1 (338,553 rows)

üìÇ Reading note_ratings_agg_00017.parquet ...
  Rows in file: 340,530


Merging aggregated parquets:  90%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà | 18/20 [00:20<00:00,  2.15it/s]

  üß© Added batch 1 (340,530 rows)

üìÇ Reading note_ratings_agg_00018.parquet ...
  Rows in file: 335,706


Merging aggregated parquets:  95%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñå| 19/20 [00:21<00:00,  2.34it/s]

  üß© Added batch 1 (335,706 rows)

üìÇ Reading note_ratings_agg_00019.parquet ...
  Rows in file: 342,156
  üß© Added batch 1 (342,156 rows)


Merging aggregated parquets: 100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 20/20 [00:23<00:00,  1.15s/it]

  üíæ Flushed 3,513,507 rows ‚Üí /home/jovyan/Shared/project1-group1/info-470-project-1/langdata/time-series-core/ratings/tmp_merge/merged_part_002.parquet

üßæ Wrote 3 intermediate parquet chunks to /home/jovyan/Shared/project1-group1/info-470-project-1/langdata/time-series-core/ratings/tmp_merge

üöÄ Consolidating all intermediate parts into final parquet...





FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))


üéâ Successfully merged all aggregated files into:
/home/jovyan/Shared/project1-group1/info-470-project-1/langdata/time-series-core/note_ratings_2023_aggregated.parquet


In [None]:
import os
import polars as pl
import duckdb
from datetime import datetime
from tqdm import tqdm
import gc

# =========================================================
# CONFIG
# =========================================================
os.environ["POLARS_MAX_THREADS"] = "6"

INPUT_PATH  = "/home/jovyan/Shared/project1-group1/info-470-project-1/langdata/time-series-core/note_ratings_2023_aggregated.parquet"
TEMP_DIR    = "/home/jovyan/Shared/project1-group1/info-470-project-1/langdata/time-series-core/tmp_reagg"
OUTPUT_PATH = "/home/jovyan/Shared/project1-group1/info-470-project-1/langdata/time-series-core/note_ratings_2023_reaggregated.parquet"

os.makedirs(TEMP_DIR, exist_ok=True)
BATCH_SIZE  = 2_000_000  # adjust if needed
gc.collect()

# =========================================================
# STAGE 1 ‚Äî PARTIAL REAGGREGATION PER BATCH
# =========================================================
con = duckdb.connect()
total_rows = con.execute(f"SELECT COUNT(*) FROM '{INPUT_PATH}'").fetchone()[0]
print(f"üì¶ Total rows in file: {total_rows:,}")

offset = 0
batch_id = 0
tmp_paths = []

while offset < total_rows:
    batch_id += 1
    query = f"SELECT * FROM '{INPUT_PATH}' LIMIT {BATCH_SIZE} OFFSET {offset}"
    df = con.execute(query).pl()
    offset += BATCH_SIZE

    if df.is_empty():
        continue

    print(f"üß© Processing batch {batch_id} ({len(df):,} rows)")

    # --- Local groupby to shrink batch size ---
    agg = (
        df.lazy()
        .group_by(["user_id", "period_start"])
        .agg([
            pl.sum("ratings_given").alias("ratings_given"),
            pl.sum("notes_rated_unique").alias("notes_rated_unique"),
            pl.sum("tweets_rated_unique").alias("tweets_rated_unique"),
            pl.mean("agree_rate").alias("agree_rate"),
            pl.mean("disagree_rate").alias("disagree_rate"),
            pl.mean("helpful_rate").alias("helpful_rate"),
            pl.mean("somewhat_helpful_rate").alias("somewhat_helpful_rate"),
            pl.mean("not_helpful_rate").alias("not_helpful_rate"),
            pl.mean("avg_helpful_flags").alias("avg_helpful_flags"),
            pl.mean("avg_not_helpful_flags").alias("avg_not_helpful_flags")
        ])
        .sort(["user_id", "period_start"])
        .collect(streaming=True)
    )

    tmp_path = os.path.join(TEMP_DIR, f"reagg_part_{batch_id:03d}.parquet")
    agg.write_parquet(tmp_path)
    tmp_paths.append(tmp_path)

    print(f"  üíæ Wrote partial reagg ‚Üí {tmp_path} ({agg.height:,} rows)")

    del df, agg
    gc.collect()

con.close()

print(f"\nüßæ Wrote {len(tmp_paths)} temporary partial reaggregations to {TEMP_DIR}")

# =========================================================
# STAGE 2 ‚Äî FINAL MERGE & GLOBAL REAGGREGATION
# =========================================================
print("\nüöÄ Merging partial reaggregations into final dataset ...")

con = duckdb.connect()
query = f"""
COPY (
    SELECT
        user_id,
        period_start,
        SUM(ratings_given) AS ratings_given,
        SUM(notes_rated_unique) AS notes_rated_unique,
        SUM(tweets_rated_unique) AS tweets_rated_unique,
        AVG(agree_rate) AS agree_rate,
        AVG(disagree_rate) AS disagree_rate,
        AVG(helpful_rate) AS helpful_rate,
        AVG(somewhat_helpful_rate) AS somewhat_helpful_rate,
        AVG(not_helpful_rate) AS not_helpful_rate,
        AVG(avg_helpful_flags) AS avg_helpful_flags,
        AVG(avg_not_helpful_flags) AS avg_not_helpful_flags
    FROM read_parquet({tmp_paths})
    GROUP BY user_id, period_start
) TO '{OUTPUT_PATH}' (FORMAT PARQUET);
"""
con.execute(query)
con.close()

print(f"\nüéâ Global reaggregation complete!\nFinal file saved to:\n{OUTPUT_PATH}")

In [1]:
import os, gc, duckdb, polars as pl
from tqdm import tqdm

os.environ["POLARS_MAX_THREADS"] = "6"

TEMP_DIR    = "/home/jovyan/Shared/project1-group1/info-470-project-1/langdata/time-series-core/tmp_reagg"
OUTPUT_PATH = "/home/jovyan/Shared/project1-group1/info-470-project-1/langdata/time-series-core/note_ratings_2023_reaggregated.parquet"

# All partial parquet paths
tmp_files = sorted([
    os.path.join(TEMP_DIR, f) for f in os.listdir(TEMP_DIR)
    if f.endswith(".parquet")
])

print(f"üßæ Found {len(tmp_files)} partial aggregates")

# ---------------------------------------------------------
# 1Ô∏è‚É£  Incremental merge in small groups to limit memory
# ---------------------------------------------------------
INTERMEDIATE = os.path.join(TEMP_DIR, "stage2_parts")
os.makedirs(INTERMEDIATE, exist_ok=True)
group_size = 5               # merge 5 temp files at a time
stage2_parts = []

for i in tqdm(range(0, len(tmp_files), group_size), desc="Stage-2 partial merges"):
    subset = tmp_files[i:i+group_size]
    out_path = os.path.join(INTERMEDIATE, f"stage2_{i//group_size:03d}.parquet")

    con = duckdb.connect()
    # Force external processing and temp directory for spills
    con.execute(f"PRAGMA memory_limit='2GB';")
    con.execute(f"PRAGMA temp_directory='{TEMP_DIR}';")

    query = f"""
    COPY (
        SELECT
            user_id,
            period_start,
            SUM(ratings_given) AS ratings_given,
            SUM(notes_rated_unique) AS notes_rated_unique,
            SUM(tweets_rated_unique) AS tweets_rated_unique,
            AVG(agree_rate) AS agree_rate,
            AVG(disagree_rate) AS disagree_rate,
            AVG(helpful_rate) AS helpful_rate,
            AVG(somewhat_helpful_rate) AS somewhat_helpful_rate,
            AVG(not_helpful_rate) AS not_helpful_rate,
            AVG(avg_helpful_flags) AS avg_helpful_flags,
            AVG(avg_not_helpful_flags) AS avg_not_helpful_flags
        FROM read_parquet({subset})
        GROUP BY user_id, period_start
    ) TO '{out_path}' (FORMAT PARQUET);
    """
    con.execute(query)
    con.close()

    stage2_parts.append(out_path)
    gc.collect()

print(f"‚úÖ Stage-2 wrote {len(stage2_parts)} mid-level files")

# ---------------------------------------------------------
# 2Ô∏è‚É£  Final consolidation (tiny set now)
# ---------------------------------------------------------
print("üöÄ Running final external aggregation ...")
con = duckdb.connect()
con.execute(f"PRAGMA memory_limit='2GB';")
con.execute(f"PRAGMA temp_directory='{TEMP_DIR}';")

query = f"""
COPY (
    SELECT
        user_id,
        period_start,
        SUM(ratings_given) AS ratings_given,
        SUM(notes_rated_unique) AS notes_rated_unique,
        SUM(tweets_rated_unique) AS tweets_rated_unique,
        AVG(agree_rate) AS agree_rate,
        AVG(disagree_rate) AS disagree_rate,
        AVG(helpful_rate) AS helpful_rate,
        AVG(somewhat_helpful_rate) AS somewhat_helpful_rate,
        AVG(not_helpful_rate) AS not_helpful_rate,
        AVG(avg_helpful_flags) AS avg_helpful_flags,
        AVG(avg_not_helpful_flags) AS avg_not_helpful_flags
    FROM read_parquet({stage2_parts})
    GROUP BY user_id, period_start
) TO '{OUTPUT_PATH}' (FORMAT PARQUET);
"""
con.execute(query)
con.close()

print(f"üéâ Final reaggregation complete ‚Üí {OUTPUT_PATH}")


üßæ Found 11 partial aggregates


Stage-2 partial merges:   0%|          | 0/3 [00:00<?, ?it/s]

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

Stage-2 partial merges:  33%|‚ñà‚ñà‚ñà‚ñé      | 1/3 [00:34<01:09, 34.55s/it]

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

Stage-2 partial merges: 100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 3/3 [01:02<00:00, 20.93s/it]


‚úÖ Stage-2 wrote 3 mid-level files
üöÄ Running final external aggregation ...


FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

üéâ Final reaggregation complete ‚Üí /home/jovyan/Shared/project1-group1/info-470-project-1/langdata/time-series-core/note_ratings_2023_reaggregated.parquet


In [5]:
import os
import polars as pl
import duckdb
from datetime import datetime
from tqdm import tqdm
import gc

# =========================================================
# CONFIG
# =========================================================
os.environ["POLARS_MAX_THREADS"] = "6"


INPUT_PATH = "/home/jovyan/Shared/project1-group1/info-470-project-1/langdata/time-series-core/aggregates/note_ratings_2023_reaggregated.parquet"

con = duckdb.connect()
total_rows = con.execute(f"SELECT COUNT(1) FROM '{INPUT_PATH}'").fetchone()[0]
print(f"üì¶ Total rows in file: {total_rows:,}")

üì¶ Total rows in file: 20,869,319


In [2]:
import os, gc, duckdb
from tqdm import tqdm

os.environ["POLARS_MAX_THREADS"] = "6"

BASE_DIR = "/home/jovyan/Shared/project1-group1/info-470-project-1/langdata/time-series-core/aggregates"
RATINGS  = os.path.join(BASE_DIR, "note_ratings_2023_reaggregated.parquet")
NOTES    = os.path.join(BASE_DIR, "notes_writers_2023_aggregated.parquet")
STATUS   = os.path.join(BASE_DIR, "note_status_history_2023_aggregated.parquet")
ENROLL   = os.path.join(BASE_DIR, "user_enrollment_2023_aggregated.parquet")
REQUESTS = os.path.join(BASE_DIR, "note_requests_2023_aggregated.parquet")

TEMP_DIR  = os.path.join(BASE_DIR, "super_merge_tmp2")
os.makedirs(TEMP_DIR, exist_ok=True)

MEM_LIMIT_GB = 2
BATCH_SIZE   = 2_000_000   # tune down if memory tight
part_files   = []

con = duckdb.connect()
total_rows = con.execute(f"SELECT COUNT(*) FROM '{RATINGS}'").fetchone()[0]
con.close()
print(f"Total ratings rows: {total_rows:,}")

offset = 0
batch_idx = 0

while offset < total_rows:
    batch_idx += 1
    out_path = os.path.join(TEMP_DIR, f"merged_part_{batch_idx:03d}.parquet")

    con = duckdb.connect()
    con.execute(f"PRAGMA memory_limit='{MEM_LIMIT_GB}GB';")
    con.execute(f"PRAGMA temp_directory='{TEMP_DIR}';")

    query = f"""
    COPY (
        WITH r AS (
            SELECT * FROM read_parquet('{RATINGS}')
            LIMIT {BATCH_SIZE} OFFSET {offset}
        )
        SELECT 
            r.*,
            n.* EXCLUDE (user_id, period_start),
            s.* EXCLUDE (user_id, period_start),
            e.* EXCLUDE (user_id, period_start),
            q.* EXCLUDE (user_id, period_start)
        FROM r
        LEFT JOIN read_parquet('{NOTES}')   n USING (user_id, period_start)
        LEFT JOIN read_parquet('{STATUS}')  s USING (user_id, period_start)
        LEFT JOIN read_parquet('{ENROLL}')  e USING (user_id, period_start)
        LEFT JOIN read_parquet('{REQUESTS}') q USING (user_id, period_start)
    ) TO '{out_path}' (FORMAT PARQUET);
    """
    print(f"üß© Joining ratings batch {batch_idx} (offset {offset:,}) ...")
    con.execute(query)
    con.close()

    part_files.append(out_path)
    offset += BATCH_SIZE
    gc.collect()

print(f"‚úÖ Wrote {len(part_files)} joined chunks to {TEMP_DIR}")

# -------- final consolidation (external mode) --------
print("üöÄ Consolidating all joined parts...")
con = duckdb.connect()
con.execute(f"PRAGMA memory_limit='{MEM_LIMIT_GB}GB';")
con.execute(f"PRAGMA temp_directory='{TEMP_DIR}';")
final_path = os.path.join(BASE_DIR, "user_period_master.parquet")
con.execute(f"""
COPY (
  SELECT * FROM read_parquet({part_files})
) TO '{final_path}' (FORMAT PARQUET);
""")
con.close()

print(f"üéâ Master dataset ready ‚Üí {final_path}")


Total ratings rows: 20,869,319
üß© Joining ratings batch 1 (offset 0) ...


FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

üß© Joining ratings batch 2 (offset 2,000,000) ...


FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

üß© Joining ratings batch 3 (offset 4,000,000) ...


FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

üß© Joining ratings batch 4 (offset 6,000,000) ...


FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

üß© Joining ratings batch 5 (offset 8,000,000) ...


FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

üß© Joining ratings batch 6 (offset 10,000,000) ...


FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

üß© Joining ratings batch 7 (offset 12,000,000) ...


FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

üß© Joining ratings batch 8 (offset 14,000,000) ...


FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

üß© Joining ratings batch 9 (offset 16,000,000) ...


FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

üß© Joining ratings batch 10 (offset 18,000,000) ...


FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

üß© Joining ratings batch 11 (offset 20,000,000) ...


FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

‚úÖ Wrote 11 joined chunks to /home/jovyan/Shared/project1-group1/info-470-project-1/langdata/time-series-core/aggregates/super_merge_tmp2
üöÄ Consolidating all joined parts...


FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

üéâ Master dataset ready ‚Üí /home/jovyan/Shared/project1-group1/info-470-project-1/langdata/time-series-core/aggregates/user_period_master.parquet


In [1]:
import duckdb

path = "/home/jovyan/Shared/project1-group1/info-470-project-1/langdata/time-series-core/aggregates/user_period_master.parquet"

con = duckdb.connect()

# Read the parquet into DuckDB
df = con.execute(f"SELECT * FROM read_parquet('{path}') LIMIT 0").fetchdf()

# Show columns
print(df.columns)

Index(['user_id', 'period_start', 'ratings_given', 'notes_rated_unique',
       'tweets_rated_unique', 'agree_rate', 'disagree_rate', 'helpful_rate',
       'somewhat_helpful_rate', 'not_helpful_rate', 'avg_helpful_flags',
       'avg_not_helpful_flags', 'notes_written', 'num_misleading',
       'num_not_misleading', 'avg_trustworthySources', 'media_note_ratio',
       'misleading_flag_sum', 'not_misleading_flag_sum', 'share_misleading',
       'avg_flags_per_note', 'notes_with_status', 'notes_helpful',
       'notes_not_helpful', 'notes_nmr', 'notes_locked', 'first_helpful',
       'latest_helpful', 'latest_not_helpful', 'avg_days_to_first_nonNMR',
       'avg_days_to_lock', 'share_helpful', 'share_not_helpful', 'records',
       'avg_successfulRatingNeededToEarnIn', 'avg_days_since_state_change',
       'avg_days_since_earnout', 'is_new_user', 'is_earned_in', 'is_at_risk',
       'is_earned_out', 'has_ever_earned_out', 'is_core_population',
       'avg_modelingGroup', 'notes_requeste