In [2]:
# If you want the nice notebook progress bars, ensure ipywidgets is installed once:
!pip -q install ipywidgets

In [3]:
from pathlib import Path

def find_project_root(start: Path) -> Path:
    """Walk up until we find a folder containing either `.git` or both `data` and `src`."""
    start = start.resolve()
    for p in [start, *start.parents]:
        if (p/".git").exists() or ((p/"data").exists() and (p/"src").exists()):
            return p
    # fallback: if we're inside src/, use its parent
    if start.name == "src":
        return start.parent
    if start.parent.name == "src":
        return start.parent.parent
    return start  # last resort

# where am i?
NB_DIR = Path.cwd().resolve()
PROJ   = find_project_root(NB_DIR)
DATA   = PROJ / "data"
OUT    = PROJ / "outputs"
OUT.mkdir(exist_ok=True, parents=True)

print("Notebook dir:", NB_DIR)
print("Project root:", PROJ)
print("Data dir    :", DATA, "exists:", DATA.exists())
print("Outputs dir :", OUT,  "exists:", OUT.exists())

Notebook dir: E:\Univer\1A\NLP\YT Title\yt-popularity\src\yt_popularity
Project root: E:\Univer\1A\NLP\YT Title\yt-popularity
Data dir    : E:\Univer\1A\NLP\YT Title\yt-popularity\data exists: True
Outputs dir : E:\Univer\1A\NLP\YT Title\yt-popularity\outputs exists: True


In [4]:
# prefer exact filenames; if you use slightly different names, glob as a fallback
RAW_VIDEOS   = DATA / "yt_metadata_en.jsonl.gz"
RAW_CHANNELS = DATA / "df_channels_en.tsv.gz"

# fallback using glob if the exact names aren't found
if not RAW_VIDEOS.exists():
    matches = sorted(DATA.glob("yt_metadata_en.jsonl*"))
    assert matches, "Couldn't find yt_metadata_en.jsonl.* in data/"
    RAW_VIDEOS = matches[0]

if not RAW_CHANNELS.exists():
    matches = sorted(DATA.glob("df_channels_en.tsv*"))
    assert matches, "Couldn't find df_channels_en.tsv.* in data/"
    RAW_CHANNELS = matches[0]

print("RAW_VIDEOS  :", RAW_VIDEOS)
print("RAW_CHANNELS:", RAW_CHANNELS)

RAW_VIDEOS  : E:\Univer\1A\NLP\YT Title\yt-popularity\data\yt_metadata_en.jsonl.gz
RAW_CHANNELS: E:\Univer\1A\NLP\YT Title\yt-popularity\data\df_channels_en.tsv.gz


In [5]:
import warnings, re, unicodedata, pandas as pd, numpy as np
warnings.simplefilter("ignore", category=pd.errors.PerformanceWarning)

from tqdm.auto import tqdm
tqdm.pandas()  # enables df.progress_apply

In [6]:
TOP_P      = 0.90       # top-10% label
CHUNKSIZE  = 8000_000    # tune for RAM
TIME_GRAIN = "M"        # "M" month, will auto-fallback per cohort to "Q"
MIN_COHORT = 1000       # below this, use quarter instead of month

In [7]:
# text cleaning helpers
import re, unicodedata

URL_RE  = re.compile(r"https?://\S+|www\.\S+")
AT_RE   = re.compile(r"@\w+")
HASH_RE = re.compile(r"#\w+")

def clean_minimal(text: str) -> str:
    t = unicodedata.normalize("NFKC", str(text))
    t = URL_RE.sub("<URL>", t)
    t = AT_RE.sub("<USER>", t)
    t = HASH_RE.sub("<HASH>", t)
    return re.sub(r"\s+", " ", t).strip()

def clean_for_tfidf(text: str) -> str:
    t = clean_minimal(text).lower()
    t = re.sub(r"[^\w\s<>]", " ", t)
    return re.sub(r"\s+", " ", t).strip()

def first_category(x):
    # categories can be a list or a stringified list or a plain string
    if isinstance(x, list) and x:
        return x[0]
    if isinstance(x, str) and x.startswith("["):
        import ast
        try:
            lst = ast.literal_eval(x)
            return lst[0] if lst else "Unknown"
        except Exception:
            return "Unknown"
    return x if isinstance(x, str) and x else "Unknown"


In [8]:
# df_channels_en.tsv.gz has channel subscribers
usecols = ["channel", "subscribers_cc"]
chan = pd.read_csv(RAW_CHANNELS, sep="\t", usecols=usecols)
chan = chan.rename(columns={"channel": "channel_id"})
chan["channel_id"] = chan["channel_id"].astype("string")
chan.head(3)


Unnamed: 0,channel_id,subscribers_cc
0,UC-lHJZR3Gqxm24_Vd_AJ5Yw,101000000
1,UCbCmjCuTUZos6Inko4u57UQ,60100000
2,UCpEhnqL0y41EpW2TvWAHD7Q,56018869


In [9]:
# install once if needed
!pip -q install duckdb

In [18]:
SHARDS = OUT / "stage1_shards"
SHARDS.mkdir(parents=True, exist_ok=True)

keep_cols = ["display_id","title","categories","channel_id",
             "upload_date","crawl_date","view_count"]

def parse_dt(x):
    return pd.to_datetime(x, errors="coerce", utc=True)

def make_size_bin(df):
    subs = df["subscribers_cc"].fillna(-1)
    has  = subs >= 0
    df["size_bin"] = -1
    if has.any():
        df.loc[has, "size_bin"] = pd.qcut(
            subs[has], q=10, labels=False, duplicates="drop"
        )
    return df

# write skinny shards: one Parquet per chunk
shard_idx = 0
for chunk in tqdm(
    pd.read_json(RAW_VIDEOS, lines=True, chunksize=CHUNKSIZE,
                 dtype={"channel_id":"string"}, convert_dates=False),
    desc="Stage 1: extracting shards"
):
    chunk = chunk[[c for c in keep_cols if c in chunk.columns]]
    chunk = chunk.dropna(subset=["title","view_count","upload_date","crawl_date"])

    # dates, age, vpd
    chunk["upload_date"] = chunk["upload_date"].map(parse_dt)
    chunk["crawl_date"]  = chunk["crawl_date"].map(parse_dt)
    chunk = chunk[chunk["upload_date"].notna() & chunk["crawl_date"].notna()]
    age = (chunk["crawl_date"] - chunk["upload_date"]).dt.days.clip(lower=1)
    vpd = (chunk["view_count"] / age.replace(0,1))
    chunk["vpd"] = vpd.clip(upper=vpd.quantile(0.999))

    # cohorts
    chunk = chunk.merge(chan, on="channel_id", how="left")
    chunk = make_size_bin(chunk)
    ud_naive = chunk["upload_date"].dt.tz_localize(None)
    chunk["upload_period"] = ud_naive.dt.strftime("%Y-%m")  # month as string
    counts = chunk.groupby(["categories","size_bin","upload_period"])["vpd"].transform("size")
    need_q = counts < MIN_COHORT
    if need_q.any():
        chunk.loc[need_q, "upload_period"] = ud_naive[need_q].dt.to_period("Q").astype(str)

    # normalize category to single string
    chunk["category_1"] = chunk["categories"].map(first_category)

    # title variants
    chunk["title_min"]   = chunk["title"].map(clean_minimal)
    chunk["title_tfidf"] = chunk["title"].map(clean_for_tfidf)

    skinny = chunk[[
        "display_id","title","title_min","title_tfidf",
        "vpd","category_1","size_bin","upload_period"
    ]].copy()

    shard_path = SHARDS / f"part_{shard_idx:05d}.parquet"
    skinny.to_parquet(shard_path, index=False)
    shard_idx += 1

len(list(SHARDS.glob("part_*.parquet")))


Stage 1: extracting shards: 0it [00:00, ?it/s]

17

In [None]:
import duckdb

db = duckdb.connect(database=":memory:")

db.sql(f"""
    CREATE OR REPLACE TABLE vids AS
    SELECT *
    FROM read_parquet('{(SHARDS/'part_*.parquet').as_posix()}');
""")

# compute percent_rank within each cohort
db.sql("""
    CREATE OR REPLACE TABLE labeled AS
    SELECT
        display_id, title, title_min, title_tfidf,
        vpd, category_1, size_bin, upload_period,
        percent_rank() OVER (
            PARTITION BY category_1, size_bin, upload_period
            ORDER BY vpd
        ) AS pctl
    FROM vids;
""")

# # label = top-10% (>= TOP_P)
# db.sql("CREATE OR REPLACE TABLE final AS SELECT *, (pctl >= ?)::INT AS label FROM labeled;", params=[TOP_P])

# # sanity peek
# db.sql("SELECT COUNT(*) AS n, AVG(label)::DOUBLE AS pos_rate FROM final;").df()


FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

TypeError: sql(): incompatible function arguments. The following argument types are supported:
    1. (self: _duckdb.DuckDBPyConnection, query: object, *, alias: str = '', params: object = None) -> _duckdb.DuckDBPyRelation

Invoked with: <_duckdb.DuckDBPyConnection object at 0x0000026A5FD78F70>, 'CREATE OR REPLACE TABLE final AS SELECT *, (pctl >= ?)::INT AS label FROM labeled;', [0.9]

In [20]:
# label = top-10% (>= TOP_P)
db.sql("CREATE OR REPLACE TABLE final AS SELECT *, (pctl >= ?)::INT AS label FROM labeled;", params=[TOP_P])

# sanity peek
db.sql("SELECT COUNT(*) AS n, AVG(label)::DOUBLE AS pos_rate FROM final;").df()

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

Unnamed: 0,n,pos_rate
0,72924140,0.100069


In [None]:
# write to out
LABELED = OUT / "labeled_all.parquet"
db.sql(f"COPY final TO '{LABELED.as_posix()}' (FORMAT PARQUET);")

In [9]:
# read back with pandas to split
df = pd.read_parquet(LABELED, engine="pyarrow")
print(df.shape, round(df['label'].mean(), 3))

from sklearn.model_selection import train_test_split
train_df, temp_df = train_test_split(df, test_size=0.30, stratify=df["label"], random_state=42)
val_df, test_df   = train_test_split(temp_df, test_size=0.50, stratify=temp_df["label"], random_state=42)

SPLITS = OUT / "splits"
SPLITS.mkdir(parents=True, exist_ok=True)
train_df.to_parquet(SPLITS/"train.parquet", index=False)
val_df.to_parquet(  SPLITS/"val.parquet",   index=False)
test_df.to_parquet( SPLITS/"test.parquet",  index=False)

manifest = {
    "top_percentile": TOP_P,
    "time_grain": "month_with_fallback_to_quarter",
    "min_cohort": MIN_COHORT,
    "rows_total": int(df.shape[0]),
    "rows_train": int(train_df.shape[0]),
    "rows_val":   int(val_df.shape[0]),
    "rows_test":  int(test_df.shape[0]),
}
pd.Series(manifest).to_json(SPLITS/"manifest.json", indent=2)
manifest


ArrowMemoryError: malloc of size 17179869184 failed

In [19]:
#Remove videos before 2017

import duckdb
from pathlib import Path

IN  = Path(r"labeled_all.parquet")         # <-- put your path
OUT = Path(r"labeled_2017plus.parquet")  # new file

db = duckdb.connect(database=":memory:")

db.sql(f"""
  COPY (
    SELECT *
    FROM read_parquet('{IN.as_posix()}')
    WHERE TRY_CAST(substr(upload_period, 1, 4) AS INTEGER) >= 2017
  ) TO '{OUT.as_posix()}' (FORMAT PARQUET);
""")

# (Optional) quick sanity counts (tiny results, safe to fetch)
print(db.sql(f"SELECT COUNT(*) AS n_all FROM read_parquet('{IN.as_posix()}')").df())
print(db.sql(f"SELECT COUNT(*) AS n_2017p FROM read_parquet('{OUT.as_posix()}')").df())

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

      n_all
0  72924140
    n_2017p
0  40485011


In [None]:
 # most recent year
recent_year = db.sql(f"""
    SELECT MAX(CAST(substr(upload_period,1,4) AS INTEGER)) AS most_recent_year
    FROM read_parquet('{OUT.as_posix()}');
""").df()

print("Most recent year:\n", recent_year)

Most recent year:
    most_recent_year
0              2019

Number of unique categories:
    n_categories
0            15


In [23]:
n_2019 = db.sql(f"""
    SELECT COUNT(*) AS n_2019
    FROM read_parquet('{OUT.as_posix()}')
    WHERE CAST(substr(upload_period, 1, 4) AS INTEGER) = 2019;
""").df()

print(n_2019)

     n_2019
0  12723002


In [24]:
categories = db.sql(f"""
    SELECT DISTINCT category_1
    FROM read_parquet('{OUT.as_posix()}')
    WHERE category_1 IS NOT NULL AND category_1 <> 'Unknown'
    ORDER BY category_1;
""").df()

print(f"Number of categories: {len(categories)}\n")
print("Category names:")
for name in categories["category_1"]:
    print("-", name)

Number of categories: 15

Category names:
- Autos & Vehicles
- Comedy
- Education
- Entertainment
- Film & Animation
- Gaming
- Howto & Style
- Music
- News & Politics
- Nonprofits & Activism
- People & Blogs
- Pets & Animals
- Science & Technology
- Sports
- Travel & Events


In [28]:
YEAR = 2017
where_clauses = [
    "category_1 IS NOT NULL",
    "category_1 <> 'Unknown'",
]
if YEAR is not None:
    where_clauses.append(f"CAST(substr(upload_period,1,4) AS INTEGER) = {YEAR}")

where_sql = " AND ".join(where_clauses)

# Get totals and per-category counts
total_df = db.sql(f"""
    SELECT COUNT(*) AS n
    FROM read_parquet('{OUT.as_posix()}')
    WHERE {where_sql};
""").df()

by_cat_df = db.sql(f"""
    SELECT
        category_1 AS category,
        COUNT(*)   AS n
    FROM read_parquet('{OUT.as_posix()}')
    WHERE {where_sql}
    GROUP BY category_1
    ORDER BY n DESC;
""").df()

scope = f"in {YEAR}" if YEAR is not None else "overall (2017+ subset)"
print(f"Total videos {scope}: {total_df.loc[0, 'n']:,}\n")

print("Videos per category:")
for _, row in by_cat_df.iterrows():
    print(f"- {row['category']} — {row['n']:,}")

Total videos in 2017: 12,486,216

Videos per category:
- Gaming — 2,478,332
- Entertainment — 2,176,253
- People & Blogs — 1,411,465
- Music — 1,320,469
- News & Politics — 1,290,147
- Howto & Style — 702,145
- Sports — 687,901
- Education — 659,990
- Film & Animation — 430,470
- Science & Technology — 390,602
- Autos & Vehicles — 345,724
- Comedy — 212,663
- Travel & Events — 160,395
- Nonprofits & Activism — 118,499
- Pets & Animals — 101,161


In [None]:
OUT_FILE  = Path(r"labeled_2019.parquet")
THRESHOLD = 300_000  # min videos per category
db.sql(f"""
    CREATE OR REPLACE VIEW v2019 AS
    SELECT *
    FROM read_parquet('{OUT.as_posix()}')
    WHERE TRY_CAST(substr(upload_period,1,4) AS INTEGER) = 2019;
""")

# Find categories meeting the threshold (excluding 'Music')
db.sql(f"""
    CREATE OR REPLACE TABLE eligible AS
    SELECT category_1, COUNT(*) AS n
    FROM v2019
    WHERE category_1 IS NOT NULL AND category_1 <> 'Music'
    GROUP BY category_1
    HAVING COUNT(*) >= {THRESHOLD};
""")

# Keep only rows in eligible categories (and not 'Music'), then write out
db.sql(f"""
    COPY (
      SELECT v.*
      FROM v2019 v
      INNER JOIN eligible e
      ON v.category_1 = e.category_1
    ) TO '{OUT_FILE.as_posix()}'
    (FORMAT PARQUET);
""")

# summary 
before = db.sql("SELECT COUNT(*) AS n FROM v2019;").df().iloc[0,0]
after  = db.sql(f"SELECT COUNT(*) AS n FROM read_parquet('{OUT_FILE.as_posix()}');").df().iloc[0,0]
cats   = db.sql(f"SELECT category_1, n FROM eligible ORDER BY n DESC;").df()

print(f"Rows in 2019 before filtering: {before:,}")
print(f"Rows kept after filtering:     {after:,}")
print(f"Eligible categories (>= {THRESHOLD:,} vids, excluding 'Music'):")
for _, r in cats.iterrows():
    print(f"- {r['category_1']} — {int(r['n']):,}")

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

Rows in 2019 before filtering: 12,723,002
Rows kept after filtering:     10,866,563
Eligible categories (>= 300,000 vids, excluding 'Music'):
- News & Politics — 2,566,824
- Entertainment — 2,226,959
- Gaming — 2,014,444
- People & Blogs — 1,107,551
- Sports — 683,717
- Education — 604,234
- Howto & Style — 594,860
- Film & Animation — 392,300
- Science & Technology — 364,092
- Autos & Vehicles — 311,582


In [10]:
latest = Path('labeled_2019.parquet')
src = latest.as_posix()
db = duckdb.connect(database=":memory:")

In [29]:
# one example 
example = db.sql(f"""
    SELECT *
    FROM read_parquet('{src}')
    ORDER BY random()
    LIMIT 1;
""").df()

for col, val in example.items():
    print(f"{col}:\n  {val}\n")

display_id:
  0    Wf66Rb6fE8w
Name: display_id, dtype: object

title:
  0    OMG! Salman Khan's Being Human CEO Controversy...
Name: title, dtype: object

title_min:
  0    OMG! Salman Khan's Being Human CEO Controversy...
Name: title_min, dtype: object

title_tfidf:
  0    omg salman khan s being human ceo controversy ...
Name: title_tfidf, dtype: object

vpd:
  0    2.753304
Name: vpd, dtype: float64

category_1:
  0    Entertainment
Name: category_1, dtype: object

size_bin:
  0    7
Name: size_bin, dtype: int64

upload_period:
  0    2019-03
Name: upload_period, dtype: object

pctl:
  0    0.223648
Name: pctl, dtype: float64

label:
  0    0
Name: label, dtype: int32



In [12]:
!pip install langid



In [None]:
import duckdb, langid, pyarrow as pa
import pandas as pd
from pathlib import Path

src  = Path(r"labeled_2019.parquet").as_posix()
dest = Path(r"labeled_all_english.parquet").as_posix()

# clean up old table
db.sql("DROP TABLE IF EXISTS keep_ids;")



rel = db.sql(f"""
    SELECT display_id, title
    FROM read_parquet('{src}')
    WHERE title IS NOT NULL AND length(title) >= 3
""")

obj = rel.arrow()
reader = pa.RecordBatchReader.from_batches(obj.schema, obj.to_batches()) if isinstance(obj, pa.Table) else obj

def is_english(t: str) -> bool:
    lang, score = langid.classify(t.replace("\n", " "))
    return lang == "en"


keep_ids = []
for batch in reader:
    tbl = pa.Table.from_batches([batch]) if isinstance(batch, pa.RecordBatch) else batch
    for vid, t in zip(tbl.column("display_id").to_pylist(),
                      tbl.column("title").to_pylist()):
        if t and is_english(t):
            keep_ids.append(vid)

print(f"kept english titles: {len(keep_ids):,}")

if not keep_ids:
    raise RuntimeError("No rows passed the English filter — relax the predicate or check the input.")

# register kept IDs and write filtered parquet
keep_df = pd.DataFrame({"display_id": keep_ids})
db.register("keep_ids_df", keep_df)

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

kept english titles: 7,701,389


<_duckdb.DuckDBPyConnection at 0x247183db5b0>

In [None]:
dest = Path(r"labeled_2019_english.parquet").as_posix()

db.sql(f"""
  COPY (
    SELECT v.*
    FROM read_parquet('{src}') AS v
    JOIN keep_ids_df USING(display_id)
  ) TO '{dest}' (FORMAT PARQUET);
""")

print(db.sql(f"SELECT COUNT(*) AS n FROM read_parquet('{dest}')").df())

         n
0  7701389


In [34]:
# one example 
example = db.sql(f"""
  SELECT display_id, title
  FROM read_parquet('{dest}')
  ORDER BY random()
  LIMIT 3
""").df()

for col, val in example.items():
    print(f"{col}:\n  {val}\n")

display_id:
  0    zrEEvp3wI7o
1    bTBgTeCQ1B4
2    sDa1IkBoDCY
Name: display_id, dtype: object

title:
  0    SAINA NEHWAL & MEENAKSHI DIXITON RAMP FOR KANC...
1             How To Draw The Loot Llama From Fortnite
2             Ping G400 LST vs Ping G410+ | TEST DRIVE
Name: title, dtype: object



In [None]:
# number of cohorts
n_coh = db.sql(f"""
    SELECT COUNT(*) AS n_cohorts
    FROM (
        SELECT DISTINCT category_1, size_bin, upload_period
        FROM read_parquet('{dest}')
    )
""").df().iloc[0, 0]

print(f"Number of cohorts: {n_coh:,}\n")

# example
ex = db.sql(f"""
    SELECT *
    FROM read_parquet('{dest}')
    ORDER BY random()
    LIMIT 1
""").df()

if ex.empty:
    print("No rows found in the dataset.")
else:
    row = ex.iloc[0].to_dict()
    print("Example video:\n" + "="*40)
    for k, v in row.items():
        print(f"{k}:\n  {v}\n")

Number of cohorts: 859

Example video:
display_id:
  jxe6sjvWtP4

title:
  Maibatsu Revolution SG-RX Customization & Showcase - Grand Theft Auto 5 Mods

title_min:
  Maibatsu Revolution SG-RX Customization & Showcase - Grand Theft Auto 5 Mods

title_tfidf:
  maibatsu revolution sg rx customization showcase grand theft auto 5 mods

vpd:
  14.27946127946128

category_1:
  Gaming

size_bin:
  6

upload_period:
  2019-01

pctl:
  0.31518296638302745

label:
  0



In [None]:
FILE_IN  = Path("labeled_2019_english.parquet").as_posix()        
FILE_OUT = Path("labeled_2019_english_reranked.parquet").as_posix() # output with new pctl label
TOP_P = 0.80  # top 20


db.sql(f"CREATE OR REPLACE VIEW en AS SELECT * FROM read_parquet('{FILE_IN}');")

# Recompute percentile within cohorts and write out
db.sql("""
  CREATE OR REPLACE TABLE en_labeled AS
  SELECT
    display_id, title, title_min, title_tfidf,
    vpd, category_1, size_bin, upload_period,
    percent_rank() OVER (
      PARTITION BY category_1, size_bin, upload_period
      ORDER BY vpd NULLS LAST
    ) AS pctl
  FROM en
  WHERE vpd IS NOT NULL;
""")

db.sql("""
  CREATE OR REPLACE TABLE en_final AS
  SELECT *,
         CAST(pctl >= ? AS INTEGER) AS label
  FROM en_labeled;
""", params=[TOP_P])

db.sql(f"COPY en_final TO '{FILE_OUT}' (FORMAT PARQUET);")

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))