## Glamlytics: L'Oréal MYSG x Monash Datathon CommentSense Analyser

In [None]:
# This code was written by Leticia Lariche for the Glamlytics 
# Commentsense submission under the L'Oréal MYSG x Monash Datathon

# Code Reuse Acknowledgement: Some code from my Monash Unit ECE4179 assignments 
# was used in this notebook, as the unit touches on neural networks and concepts used here.

# AI Use Acknowledgement: ChatGPT was used to brainstorm ideas and debug functions. 
# Data was never uploaded to the site to prevent confidentiality breaches, 
# though all the data are publicly available metrics.

# Outside Source Acknowledgement: DistilBERT was used in training several models: 
# https://huggingface.co/docs/transformers/en/model_doc/distilbert

In [None]:
# how to safely install any new packages into the virtual environment

import sys; print(sys.version); print(sys.executable)
%pip install --upgrade wikipedia
import wikipedia, sys; print(wikipedia.__version__); print(sys.executable)

In [None]:
#all imports 

import csv
import math
import os
import random
import re
import textwrap
from collections import defaultdict
from pathlib import Path

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import plotly.graph_objects as go
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from langdetect import detect, DetectorFactory, LangDetectException
from plotly.subplots import make_subplots
from sentence_transformers import SentenceTransformer, util
from sklearn.model_selection import train_test_split
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
from transformers import AutoTokenizer, AutoModel

device = 'cuda' if torch.cuda.is_available() else 'cpu'

def seed_all(seed=0):
    """Seed every random number generator this notebook can draw from.

    Seeds Python's ``random`` module, NumPy's global generator and PyTorch
    (the CPU generator and every CUDA device) with the same value so that a
    re-run produces the same shuffles and weight initialisations.

    Args:
        seed: Integer seed applied to all generators.
    """
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)

In [None]:
# One checkpoint name is used for both objects so the tokenizer's vocabulary
# matches the encoder it feeds.
pretrained_model_name = 'distilbert-base-uncased'

# Loaded once here; the dataset and classifier cells further down reuse them.
tokenizer = AutoTokenizer.from_pretrained(pretrained_model_name)
transformer_model = AutoModel.from_pretrained(pretrained_model_name)
# print(transformer_model.config)

### Fixing File Encoding

In [None]:
def fix_encoding(val):
    """Repair a string whose UTF-8 bytes were mis-decoded as Latin-1.

    The value is re-encoded as Latin-1 (recovering the original bytes) and
    decoded as UTF-8. Anything that cannot take that round trip is returned
    unchanged.

    Args:
        val: A cell value of any type.

    Returns:
        The repaired string, or ``val`` itself when it is not a string or
        cannot be round-tripped.
    """
    if not isinstance(val, str):
        return val
    try:
        return val.encode("latin1").decode("utf-8")
    except UnicodeError:
        # UnicodeError covers both failure modes: characters outside Latin-1
        # (encode step) and byte sequences that are not valid UTF-8 (decode step).
        return val

# Raw exports to repair; each one is rewritten next to the original as
# "<name>_fixed.csv".
files = [
    "comments1.csv",
    "comments2.csv",
    "comments3.csv",
    "comments4.csv",
    "comments5.csv",
    "videos.csv"
]

# pandas 2.1 renamed DataFrame.applymap to DataFrame.map and deprecated the old
# name. The check is made on the class so a data column called "map" cannot be
# mistaken for the method.
elementwise_method = "map" if hasattr(pd.DataFrame, "map") else "applymap"

for fname in files:
    # Latin-1 can decode any byte sequence, so the read never fails;
    # fix_encoding then restores the cells that were really UTF-8.
    df = pd.read_csv(fname, encoding="latin1")
    df_fixed = getattr(df, elementwise_method)(fix_encoding)
    outname = fname.replace(".csv", "_fixed.csv")
    # "utf-8-sig" writes a byte-order mark at the start of the file.
    df_fixed.to_csv(outname, index=False, encoding="utf-8-sig")
    print(f"Fixed and saved: {outname}")

#### Removal of Spam Comments

In [None]:
#LINK SPAM REMOVAL

# Splits the input comments into rows whose text contains a link (appended to
# `spam_file`) and all other rows (written to `not_spam_file`). Every row gets
# an extra "is_link" column holding "yes" or "no".
input_file = "Fixed Datasets/comments5_fixed.csv"
spam_file = "link_spam_new.csv"
not_spam_file = "comments5_not_spam.csv"

# The spam file is opened in append mode, so a header is only needed when it
# holds no data yet. This is checked before opening because "a" mode creates
# the file.
spam_exists = os.path.exists(spam_file) and os.path.getsize(spam_file) > 0

spam_count = 0
not_spam_count = 0

# "utf-8-sig" reads plain UTF-8 too, but drops the byte-order mark written by
# the encoding-fix step, so the first column name is not prefixed with "\ufeff".
with open(input_file, newline="", encoding="utf-8-sig") as infile, \
     open(spam_file, "a", newline="", encoding="utf-8") as spam_out, \
     open(not_spam_file, "w", newline="", encoding="utf-8") as not_spam_out:

    reader = csv.DictReader(infile)
    # fieldnames is None for an empty input file.
    fieldnames = list(reader.fieldnames or []) + ["is_link"] #, "filtered"]

    spam_writer = csv.DictWriter(spam_out, fieldnames=fieldnames)
    not_spam_writer = csv.DictWriter(not_spam_out, fieldnames=fieldnames)

    if not spam_exists:
        spam_writer.writeheader()
    not_spam_writer.writeheader()

    for row in reader:
        # DictReader yields None for cells missing from a short row.
        text = row.get("textOriginal") or ""
        text_lower = text.lower()

        # A link needs both a ".com" and an "http"/"www" marker.
        has_link = (".com" in text_lower and ("http" in text_lower or "www" in text_lower))

        row["is_link"] = "yes" if has_link else "no"

        if has_link: # or filtered:
            spam_writer.writerow(row)
            spam_count += 1
        else:
            not_spam_writer.writerow(row)
            not_spam_count += 1

print(f"Finished. Spam: {spam_count} | Not spam: {not_spam_count}")



#### Indonesian/Malay Language Filtering
I filtered these using keywords because langdetect doesn't always give the best results, and I am able to read the language, so I could manually catch more occurrences through keyword searching. These were later merged with the other non-English comments and marked as `id` for Indonesian, though some may be Malay, since the languages are mutually intelligible.

In [None]:
#INDO FILTERING 

input_file = "comments1_filtered_r3.csv"
spam_file = "indo_comments.csv"
not_spam_file = "comments1_filtered.csv"

# Recorded before the append-mode open further down, which creates the file.
spam_exists = os.path.exists(spam_file)


# Keywords are stored lower-cased because they are matched against lower-cased
# text; entries such as "Masyaallah" or "Wah" could otherwise never match.
# Leading/trailing spaces in an entry are deliberate word-boundary hints.
_INDO_MALAY_KEYWORDS = tuple(keyword.lower() for keyword in (
    "gila", "baik", "cantik", "mantap", "kakak", "banget", "sekali",
    "Masyaallah", "Masya Allah", "belum", "Orang ", "banyak", " itu",
    "asli", "soal", "bidadari", "²", "memang", "Wah", "cerah",
    "muka", "perempuan", "laki", "Salut", "percaya", "lagu",
    "negeri", "negara", "ibu ", "kerana", "karena", "lucu",
    "tebal", "Alloh", "mirip", "emang", "hadir",
))


def apply_rules(text: str) -> (bool, str):
    """Flag a comment as Indonesian/Malay by case-insensitive keyword search.

    Matching is by substring, so a keyword also matches inside a longer word.

    Args:
        text: The comment text.

    Returns:
        ``(True, "Indo/Malay")`` when any keyword occurs in the text,
        otherwise ``(False, "")``.
    """
    text_lower = text.lower()

    if any(keyword in text_lower for keyword in _INDO_MALAY_KEYWORDS):
        return True, "Indo/Malay"

    return False, ""

# Routes every comment to the Indonesian/Malay file or the remainder file and
# records the matching reason in a new "spam_reason" column.

# The Indo file is opened in append mode, so a header is only needed while it
# holds no data. Checked before opening, because "a" mode creates the file.
spam_needs_header = not os.path.exists(spam_file) or os.path.getsize(spam_file) == 0

spam_count = 0
not_spam_count = 0

# "utf-8-sig" also reads BOM-less UTF-8 and drops a leading byte-order mark if
# one was carried over from an earlier step, keeping the first column name clean.
with open(input_file, newline="", encoding="utf-8-sig") as infile, \
     open(spam_file, "a", newline="", encoding="utf-8") as spam_out, \
     open(not_spam_file, "w", newline="", encoding="utf-8") as not_spam_out:

    reader = csv.DictReader(infile)
    # fieldnames is None for an empty input file.
    fieldnames = list(reader.fieldnames or []) + ["spam_reason"]

    spam_writer = csv.DictWriter(spam_out, fieldnames=fieldnames)
    not_spam_writer = csv.DictWriter(not_spam_out, fieldnames=fieldnames)

    if spam_needs_header:
        spam_writer.writeheader()
    not_spam_writer.writeheader()

    for row in reader:
        # DictReader yields None for cells missing from a short row.
        text = row.get("textOriginal") or ""

        is_spam, reason = apply_rules(text)
        row["spam_reason"] = reason if is_spam else ""

        if is_spam:
            spam_writer.writerow(row)
            spam_count += 1
        else:
            not_spam_writer.writerow(row)
            not_spam_count += 1

print(f"Finished. Indo: {spam_count} | Not spam: {not_spam_count}")

### Langdetect non-English comment removal

In [None]:
# Source file (the remainder written by the Indonesian/Malay filter) and the
# two files that rows are routed to according to their detected language.
input_file = "comments1_filtered.csv"
english_file = "comments1_filtered_english.csv"
non_english_file = "comments_non_english.csv" 

# Pin langdetect's seed; presumably this makes repeated runs return the same
# language for the same text (see the langdetect README) — TODO confirm.
DetectorFactory.seed = 0

# Matches a single code point from the listed emoji ranges.
emoji_pattern = re.compile(
    "["
    "\U0001F600-\U0001F64F"  # emoticons
    "\U0001F300-\U0001F5FF"  # symbols & pictographs
    "\U0001F680-\U0001F6FF"  # transport & map symbols
    "\U0001F1E0-\U0001F1FF"  # regional indicators (flags)
    "]",
    flags=re.UNICODE
)

url_pattern = re.compile(r"https?://\S+|www\.\S+")
handle_pattern = re.compile(r"@\w+|#\w+")

def normalize_for_detection(text: str) -> str:
    """Strip tokens that carry no language signal before detection.

    Removes emojis, URLs and @mentions/#hashtags (in that order) and trims
    surrounding whitespace.

    Args:
        text: Raw comment text.

    Returns:
        The cleaned text; may be empty.
    """
    text = emoji_pattern.sub("", text)
    text = url_pattern.sub("", text)
    text = handle_pattern.sub("", text)
    return text.strip()

def detect_language(text: str) -> str:
    """Return the language code for ``text``, or ``"unknown"``.

    ``"unknown"`` is returned for missing values (``None``/NaN), for text that
    is empty after normalisation, and when langdetect raises
    ``LangDetectException``.

    Args:
        text: Raw comment text (or a missing value).

    Returns:
        The code returned by ``langdetect.detect`` or ``"unknown"``.
    """
    if text is None or (isinstance(text, float) and math.isnan(text)):
        return "unknown"
    cleaned = normalize_for_detection(str(text))
    if not cleaned:
        return "unknown"
    try:
        return detect(cleaned)
    except LangDetectException:
        return "unknown"


# Routes every comment to the English or non-English file and stores the
# detected code in a "language" column.

def _needs_header(path):
    """Return True when ``path`` is missing or empty, i.e. has no header yet."""
    return not os.path.exists(path) or os.path.getsize(path) == 0

# Both outputs are opened in append mode, so the decision has to be made before
# opening (which creates the files). A DictWriter object is always truthy and
# cannot be used for this check.
eng_needs_header = _needs_header(english_file)
non_eng_needs_header = _needs_header(non_english_file)

eng_count = 0
non_eng_count = 0
count = 0

# "utf-8-sig" also reads BOM-less UTF-8 and drops a leading byte-order mark.
with open(input_file, newline="", encoding="utf-8-sig") as infile, \
     open(english_file, "a", newline="", encoding="utf-8") as eng_out, \
     open(non_english_file, "a", newline="", encoding="utf-8") as non_eng_out:

    reader = csv.DictReader(infile)
    base_fields = list(reader.fieldnames or [])
    fieldnames = base_fields if "language" in base_fields else base_fields + ["language"]

    eng_writer = csv.DictWriter(eng_out, fieldnames=fieldnames)
    non_eng_writer = csv.DictWriter(non_eng_out, fieldnames=fieldnames)

    if non_eng_needs_header:
        non_eng_writer.writeheader()
    if eng_needs_header:
        eng_writer.writeheader()

    for count, row in enumerate(reader, start=1):
        text = row.get("textOriginal", "")
        lang = detect_language(text)

        row["language"] = lang

        if lang == "en":
            eng_writer.writerow(row)
            eng_count += 1
        else:
            # Includes rows whose language could not be determined ("unknown").
            non_eng_writer.writerow(row)
            non_eng_count += 1

        if count % 10000 == 0:
            print(f"{count:,} rows completed", flush=True)

print(f"Done. English: {eng_count} | Non-English: {non_eng_count}")


In [None]:
# EMOJI SPAM REMOVAL

input_file = "comments5_not_spam.csv"
spam_file = "emoji_spam_new.csv"
not_spam_file = "comments5_not_spam_or_emojis.csv"

# Recorded before the append-mode open further down, which creates the file.
spam_exists = os.path.exists(spam_file)

# Matches a single code point, so a flag (two regional indicators) counts twice.
emoji_pattern = re.compile(
    "["
    "\U0001F600-\U0001F64F"  # emoticons
    "\U0001F300-\U0001F5FF"  # symbols & pictographs
    "\U0001F680-\U0001F6FF"  # transport & map symbols
    "\U0001F1E0-\U0001F1FF"  # regional indicators (flags)
    "]",
    flags=re.UNICODE
)

def count_emojis(text: str) -> int:
    """Return how many code points in ``text`` fall in the emoji ranges."""
    return len(emoji_pattern.findall(text))

def count_words(text: str) -> int:
    """Return the number of whitespace-separated tokens in ``text``."""
    return len(text.split())

def has_emoji_spam(text: str) -> bool:
    """Return True for near-wordless comments stuffed with emojis.

    A comment qualifies when it has fewer than 3 whitespace-separated tokens
    and more than 5 emojis. A run of emojis without spaces is a single token.
    """
    word_count = count_words(text)
    emoji_count = count_emojis(text)

    return word_count < 3 and emoji_count > 5

# Routes every comment to the emoji-spam file or the remainder file and records
# the verdict ("yes"/"no") in a new "is_emoji_spam" column.

# The spam file is opened in append mode, so a header is only needed while it
# holds no data. Checked before opening, because "a" mode creates the file.
spam_needs_header = not os.path.exists(spam_file) or os.path.getsize(spam_file) == 0

spam_count = 0
not_spam_count = 0

# "utf-8-sig" also reads BOM-less UTF-8 and drops a leading byte-order mark if
# one was carried over from an earlier step, keeping the first column name clean.
with open(input_file, newline="", encoding="utf-8-sig") as infile, \
     open(spam_file, "a", newline="", encoding="utf-8") as spam_out, \
     open(not_spam_file, "w", newline="", encoding="utf-8") as not_spam_out:

    reader = csv.DictReader(infile)
    # fieldnames is None for an empty input file.
    fieldnames = list(reader.fieldnames or []) + ["is_emoji_spam"]

    spam_writer = csv.DictWriter(spam_out, fieldnames=fieldnames)
    not_spam_writer = csv.DictWriter(not_spam_out, fieldnames=fieldnames)

    if spam_needs_header:
        spam_writer.writeheader()
    not_spam_writer.writeheader()

    for row in reader:
        # DictReader yields None for cells missing from a short row.
        text = row.get("textOriginal") or ""

        emoji_spam = has_emoji_spam(text)
        row["is_emoji_spam"] = "yes" if emoji_spam else "no"

        if emoji_spam:
            spam_writer.writerow(row)
            spam_count += 1
        else:
            not_spam_writer.writerow(row)
            not_spam_count += 1

print(f"Finished. Emoji spam: {spam_count} | Not spam: {not_spam_count}")

In [None]:
#KEYWORDS SPAM REMOVAL

input_file = "comments1_filtered_r2.csv"
spam_file = "keywords_spam_r2.csv"
not_spam_file = "comments1_filtered_r3.csv"

# Recorded before the append-mode open further down, which creates the file.
spam_exists = os.path.exists(spam_file)

# Matches a single code point, so a flag (two regional indicators) counts twice.
emoji_pattern = re.compile(
    "["
    "\U0001F600-\U0001F64F"  # emoticons
    "\U0001F300-\U0001F5FF"  # symbols & pictographs
    "\U0001F680-\U0001F6FF"  # transport & map symbols
    "\U0001F1E0-\U0001F1FF"  # regional indicators (flags)
    "]",
    flags=re.UNICODE
)

def count_emojis(text: str) -> int:
    """Return how many code points in ``text`` fall in the emoji ranges."""
    return len(emoji_pattern.findall(text))

def count_words(text: str) -> int:
    """Return the number of whitespace-separated tokens in ``text``."""
    return len(text.split())

def has_emoji_spam(text: str) -> bool:
    """Return True for comments with fewer than 2 tokens and more than 3 emojis."""
    return count_words(text) < 2 and count_emojis(text) > 3

word_pattern = re.compile(r"[A-Za-z0-9]+")

def count_real_words(text: str) -> int:
    """Return the number of ASCII alphanumeric runs in ``text``."""
    return len(word_pattern.findall(text))

def only_emojis(text: str) -> bool:
    """Return True when ``text`` has no ASCII words and more than one emoji."""
    return count_real_words(text) == 0 and count_emojis(text) > 1

# Stored lower-cased because they are matched against lower-cased text; entries
# written as "Lndia" or "the UK" could otherwise never match.
_COUNTRY_KEYWORDS = (
    "india", "lndia", "china", "indonesia", "indo", "pakistan",
    "nigeria", "brazil", "bangladesh", "russie", "ethiopia",
    "mexico", "japan", "egypt", "philippines", "vietnam",
    "iran", "turkey", "germany", "thailand", "the uk", "france",
    "south africa", "italy", "kenya", "myanmar", "spain", "america", "korea",
)

def apply_rules(text: str) -> (bool, str):
    """Classify a comment as keyword spam and name the rule that fired.

    Rules are checked in order and the first match wins:
      1. the text contains ".com";
      2. the text contains " our website";
      3. the text names a country (substring match), has at most 3 tokens and
         does not contain "love";
      4. the text is emoji spam (see ``has_emoji_spam`` / ``only_emojis``).

    Args:
        text: The comment text.

    Returns:
        ``(True, reason)`` for spam, ``(False, "")`` otherwise.
    """
    text_lower = text.lower()

    # 1) Contains ".com"
    if ".com" in text_lower:
        return True, "contains .com"

    # 2) Contains "our website"
    if " our website" in text_lower:
        return True, "contains 'our website'"

    # 3) Countries with no context
    if any(country in text_lower for country in _COUNTRY_KEYWORDS):
        if count_words(text) <= 3 and "love" not in text_lower:
            return True, "Country pride"

    # 4) more emoji spam
    if has_emoji_spam(text) or only_emojis(text):
        return True, "emoji spam"

    return False, ""

# Routes every comment to the keyword-spam file or the remainder file and
# records the rule that fired in a new "spam_reason" column.

# The spam file is opened in append mode, so a header is only needed while it
# holds no data. Checked before opening, because "a" mode creates the file.
spam_needs_header = not os.path.exists(spam_file) or os.path.getsize(spam_file) == 0

spam_count = 0
not_spam_count = 0

# "utf-8-sig" also reads BOM-less UTF-8 and drops a leading byte-order mark if
# one was carried over from an earlier step, keeping the first column name clean.
with open(input_file, newline="", encoding="utf-8-sig") as infile, \
     open(spam_file, "a", newline="", encoding="utf-8") as spam_out, \
     open(not_spam_file, "w", newline="", encoding="utf-8") as not_spam_out:

    reader = csv.DictReader(infile)
    # fieldnames is None for an empty input file.
    fieldnames = list(reader.fieldnames or []) + ["spam_reason"]

    spam_writer = csv.DictWriter(spam_out, fieldnames=fieldnames)
    not_spam_writer = csv.DictWriter(not_spam_out, fieldnames=fieldnames)

    if spam_needs_header:
        spam_writer.writeheader()
    not_spam_writer.writeheader()

    for row in reader:
        # DictReader yields None for cells missing from a short row.
        text = row.get("textOriginal") or ""

        is_spam, reason = apply_rules(text)
        row["spam_reason"] = reason if is_spam else ""

        if is_spam:
            spam_writer.writerow(row)
            spam_count += 1
        else:
            not_spam_writer.writerow(row)
            not_spam_count += 1

print(f"Finished. Spam: {spam_count} | Not spam: {not_spam_count}")

### Category Tags

In [None]:
# First, we will categorise them based on keyword hits 
# for top terms in each category
# From here, we will train the CNN to recognise 
# titles in each category

In [None]:
# Input video table and the two outputs: titles with at least one keyword hit
# and titles with none.
INPUT_VIDEOS = "videos_fixed.csv"
OUTPUT_HITS = "videos_with_tags.csv"
OUTPUT_NONHITS = "videos_no_tags.csv"

# Category name -> list of regular-expression sources. A title belongs to a
# category when any of its patterns matches; patterns are compiled with
# re.IGNORECASE by compile_category_patterns. Some terms appear under more than
# one category (e.g. "toner", "grwm", "dupe"), so a title can receive several tags.
CATEGORIES = {
    "makeup": [
        r"\bmake[-\s]?up\b", r"\bgrwm\b", r"\bglam\b", r"\bprimer\b", r"\bfoundation\b",
        r"\bconcealer\b", r"\bblush\b", r"\bbronzer\b", r"\bcontour\b", r"\bhighlighter\b",
        r"\bsetting (?:spray|powder)\b", r"\bpowder\b", r"\bpalette\b", r"\beyeshadow\b",
        r"\bmascara\b", r"\beyeliner\b", r"\bbrow(?:s| gel| pencil| soap)?\b",
        r"\blip(?:stick| gloss| oil| liner)?\b",
        r"\bmakeup\s+(?:tutorial|routine|haul|dupe|review)\b",
    ],
    "skincare": [
        r"\bskin[-\s]?care\b", r"\bskincare\b", r"\bglowing skin\b", r"\banti[-\s]?aging\b",
        r"\bretino(?:l|id)s?\b", r"\bniacinamide\b", r"\bvitamin\s*c\b", r"\bhyaluronic acid\b",
        r"\bceramide[s]?\b", r"\bcleanser\b", r"\bmoisturi[sz]er\b", r"\bsunscreen\b", r"\bspf\b",
        r"\btoner\b", r"\bserum\b", r"\bessence\b", r"\bexfoliat(?:e|or|ing)\b",
        r"\b(?:aha|bha|pha)\b", r"\bsalicylic\b", r"\bglycolic\b", r"\bacne\b|\bpimple\b|\bpores?\b",
        r"\bslugging\b", r"\bskin(?!ny)\b",
    ],
    "fragrance": [
        r"\bfragrance\b", r"\bperfume\b", r"\bparfum\b", r"\bcologne\b",
        r"\beau de (?:parfum|toilette)\b", r"\b(?:edp|edt)\b", r"\bbody mist\b",
        r"\bsillage\b", r"\bnotes?\b", r"\bdupe\b.*\b(perfume|fragrance)\b",
    ],
    "hair": [
        r"\bhair\b", r"\bshampoo\b", r"\bconditioner\b", r"\bleave[-\s]?in\b",
        r"\bhair serum\b", r"\bheat protect(?:ant)?\b",
        r"\b(?:dye|dyeing|color(?:ing)?|bleach|toner)\b",
        r"\bombre\b|\bbalayage\b|\bhighlights?\b",
        r"\bcurl(?:s|ing)?\b|\bcurl(?:er|ing iron)\b|\bperm\b",
        r"\bstraighten(?:ing)?\b|\bflat iron\b|\bblowout\b|\bkeratin\b",
        r"\bbraid(?:s|ing)?\b|\bwig\b|\bweave\b|\bextensions?\b",
        r"\bsalon\b|\bhaircut\b|\bfringe\b|\bbangs\b",
    ],
    "skills": [
        r"\bediting\b", r"\bedit\b", r"\bcapcut\b", r"\bpremiere pro\b", r"\bfinal cut\b",
        r"\bafter effects\b", r"\bcolor grading\b", r"\bthumbnail\b", r"\btransition[s]?\b",
        r"\bworkflow\b", r"\bfilming\b", r"\blighting\b", r"\bb[-\s]?roll\b",
    ],
    "nails": [
        r"\bnails?\b", r"\bnail polish\b", r"\bmani(?:cure)?\b", r"\bgel[-\s]?x\b|\bgelx\b",
        r"\bgel\b", r"\bacrylics?\b", r"\bshellac\b", r"\bpress[-\s]?ons?\b", r"\bnail art\b",
    ],
    "fashion": [
        r"\bfashion\b", r"\boutfit[s]?\b|\bootd\b|\bfit check\b", r"\bclothes\b|\bwardrobe\b",
        r"\bpetite\b", r"\bhaul\b|\btry[-\s]?on\b|\blookbook\b|\bcapsule\b",
        r"\bworkout set\b|\bsports bra\b|\bleggings\b",
        r"\bjeans\b|\bdress\b|\bheels\b|\bsneakers\b",
        r"\bstyling\b|\btrend[s]?\b|\baesthetic\b|\bstreetwear\b",
        r"\bdupe\b.*\b(outfit|clothes|fashion)\b",
    ],
    "general lifestyle": [
        r"\bvlog\b", r"\bday in (?:my|the) life\b", r"\bweek in (?:my|the) life\b",
        r"\bmorning routine\b|\bnight routine\b|\broutine\b|\breset\b",
        r"\bself[-\s]?care\b", r"\bproductivity\b", r"\bclean with me\b",
        r"\bget ready with me\b|\bgrwm\b", r"\bdeclutter\b", r"\borganize\b",
        r"\bhome\b|\bapartment\b|\bdecor(?:ating|)\b", r"\bgrocery\b|\bmeal prep\b",
    ],
    # "misc": [
    #     r"\bq\s*&\s*a\b|\bq and a\b|\bqa\b",
    #     r"\basmr\b", r"\bgiveaway\b", r"\bannouncement\b", r"\bstorytime\b",
    # ],
}

def compile_category_patterns(categories: dict):
    """Compile every pattern source in ``categories`` case-insensitively.

    Args:
        categories: Mapping of category name to a list of regex sources.

    Returns:
        A dict with the same keys (in the same order) whose values are lists
        of compiled patterns.
    """
    compiled = {}
    for name, sources in categories.items():
        compiled[name] = [re.compile(source, re.IGNORECASE) for source in sources]
    return compiled

def count_hits(title: str, rx_list):
    """Return the total number of matches of all patterns in ``title``.

    Non-string or empty titles count as zero hits.
    """
    if not (isinstance(title, str) and title):
        return 0
    total = 0
    for rx in rx_list:
        total += len(rx.findall(title))
    return total

def tag_title(title: str, compiled_pats: dict, category_order=None):
    """Count hits per category and build the tag string for one title.

    Args:
        title: The video title.
        compiled_pats: Mapping of category name to compiled patterns.
        category_order: Order in which matching categories are listed;
            defaults to the key order of ``compiled_pats``.

    Returns:
        ``(hits, tags)`` where ``hits`` maps every category to its hit count
        and ``tags`` joins the categories with at least one hit using "; "
        (empty string when nothing matched).
    """
    hits = {}
    for cat, rxs in compiled_pats.items():
        hits[cat] = count_hits(title, rxs)

    order = category_order if category_order else list(compiled_pats.keys())
    matched = [cat for cat in order if hits.get(cat, 0) > 0]
    # Joining an empty list already yields "".
    return hits, "; ".join(matched)

# Read the video table as text, count keyword hits per category for every
# title, then split the table into "at least one hit" and "no hits".
df = pd.read_csv(INPUT_VIDEOS, engine="python", on_bad_lines="skip", dtype=str, encoding="utf-8")

compiled = compile_category_patterns(CATEGORIES)
category_names = list(CATEGORIES.keys())

# One "hits_<category>" column per category, spaces replaced by underscores.
hit_cols = [f"hits_{cat.replace(' ', '_')}" for cat in CATEGORIES]

# Tag each title once; results stay in row order.
tagged = [
    tag_title(title, compiled, category_order=category_names)
    for title in df["title"].fillna("").astype(str)
]

for cat, col in zip(category_names, hit_cols):
    df[col] = [hits[cat] for hits, _ in tagged]

tags_out = [tags for _, tags in tagged]
df["tags"] = tags_out

has_hit = df[hit_cols].sum(axis=1) > 0
df_hits = df.loc[has_hit].copy()
df_non  = df.loc[~has_hit].copy()

df_hits.to_csv(OUTPUT_HITS, index=False, encoding="utf-8")
df_non.to_csv(OUTPUT_NONHITS, index=False, encoding="utf-8")

print(f"Wrote {len(df_hits):,} videos with ≥1 category hit to {OUTPUT_HITS}")
print(f"Wrote {len(df_non):,} videos with no hits to {OUTPUT_NONHITS}")


In [None]:
# Accept everything with hits in only 1 category 
# as the golden standard

# Moves rows without a defining category out of INPUT (rewritten in place) and
# appends them to NO_TAGS_OUTPUT. Re-running is safe only because rows already
# moved are no longer present in INPUT.
INPUT = "videos_with_tags_defining.csv"
NO_TAGS_OUTPUT = "videos_no_tags.csv"

df = pd.read_csv(INPUT, engine="python", on_bad_lines="skip", dtype=str, encoding="utf-8")

# Locate the label column regardless of case, spacing or underscores
# ("defining category", "Defining Category", "defining_category", ...).
def_col = next(
    (c for c in df.columns if re.sub(r"[\s_]+", "", c.strip().lower()) == "definingcategory"),
    None
)
if def_col is None:
    # Without this guard the lookup below fails with an opaque KeyError: None.
    raise KeyError(
        f"No 'defining category' column found in '{INPUT}'. Columns: {list(df.columns)}"
    )

mask_no = df[def_col].isna() | df[def_col].astype(str).str.strip().eq("")
df_no   = df.loc[mask_no].copy()
df_keep = df.loc[~mask_no].copy()

# The no-tags file is appended to, so the header is written only when the file
# is missing or empty.
need_header = not Path(NO_TAGS_OUTPUT).exists() or Path(NO_TAGS_OUTPUT).stat().st_size == 0
if not df_no.empty:
    df_no.to_csv(NO_TAGS_OUTPUT, mode="a", index=False, encoding="utf-8", header=need_header)

df_keep.to_csv(INPUT, index=False, encoding="utf-8")
print(f"Total rows in input: {len(df):,}")
print(f"Appended to '{NO_TAGS_OUTPUT}': {len(df_no):,} rows without a defining category.")
print(f"Remaining in '{INPUT}': {len(df_keep):,} rows with a defining category.")


In [None]:
# now, we match the videos to the comments so we 
# can have the video and comment category together


# English comment files to enrich; each gets a sibling "<stem>_with_video_details.csv".
COMMENT_FILES = [
    "comments1_filtered_english.csv",
    "comments2_filtered_english.csv",
    "comments3_filtered_english.csv",
    "comments4_filtered_english.csv",
    "comments5_filtered_english.csv",
]
# NOTE(review): this is the encoding-fixed raw video table, not the tagged one,
# so no category column is merged here — confirm that is intended.
VIDEOS_CSV = "videos_fixed.csv"
OUTPUT_SUFFIX = "_with_video_details.csv"

# Shared read options: everything is read as text and malformed lines are skipped.
READ_KW = dict(engine="python", on_bad_lines="skip", dtype=str, encoding="utf-8")

# COMMENT_ORDER is not referenced again in this cell.
COMMENT_ORDER = ["kind","commentId","channelId","videoId","authorId",
                 "textOriginal","parentCommentId","likeCount","publishedAt","updatedAt"]
# Preferred order of the video columns in the output; unknown columns follow.
VIDEO_ORDER   = ["kind","videoId","publishedAt","channelId","title","description","tags",
                 "defaultLanguage","defaultAudioLanguage","contentDuration","viewCount",
                 "likeCount","favouriteCount","commentCount","topicCategories"]

videos = pd.read_csv(VIDEOS_CSV, **READ_KW)
# One row per video so the left join below cannot multiply comment rows.
videos = videos.drop_duplicates(subset=["videoId"], keep="first")
ordered_video_cols = [c for c in VIDEO_ORDER if c in videos.columns] + \
                     [c for c in videos.columns if c not in VIDEO_ORDER]
# Every video column except the join key gets a "video_" prefix to avoid
# clashing with comment columns of the same name (kind, likeCount, ...).
video_rename = {c: f"video_{c}" for c in ordered_video_cols if c != "videoId"}
videos_pref = videos.rename(columns=video_rename)


for cpath in COMMENT_FILES:
    if not os.path.exists(cpath):
        print(f"Warning: {cpath} not found; skipping.")
        continue

    comments = pd.read_csv(cpath, **READ_KW)
    if "videoId" not in comments.columns:
        # Nothing to join on: the file is copied through unchanged.
        print(f"Warning: {cpath} missing 'videoId'; writing unchanged.")
        out_df = comments
    else:
        # Left join keeps every comment, matched or not.
        merged = comments.merge(videos_pref, on="videoId", how="left", copy=False)
        comment_cols_in_file_order = list(comments.columns)
        video_cols_pref = [video_rename[c] for c in ordered_video_cols if c != "videoId" and video_rename[c] in merged.columns]
        # Comment columns first (file order), then the prefixed video columns.
        final_cols = comment_cols_in_file_order + [c for c in video_cols_pref if c not in comment_cols_in_file_order]

        out_df = merged.loc[:, final_cols]

    out_path = Path(cpath).with_name(Path(cpath).stem + OUTPUT_SUFFIX)
    out_df.to_csv(out_path, index=False, encoding="utf-8")

    # A non-null video title is used as the sign that the comment found its video.
    probe_col = "video_title"
    matched = int(out_df[probe_col].notna().sum()) if probe_col in out_df.columns else 0
    print(f"Wrote {len(out_df):,} rows to {out_path.name} | matched videos: {matched:,}")


In [None]:
def _norm(s: str) > str:
    if not isinstance(s, str):
        return ""
    s = s.strip().lower().replace("_", " ")
    s = re.sub(r"\s+", " ", s)
    return s

class VideoCategoryDataset(Dataset):
    """Video titles with integer category labels, tokenised on access.

    Rows are read from a CSV file; rows with an empty title or with a label
    outside ``categories`` are dropped. Each item is
    ``(input_ids, attention_mask, label)``.

    Attributes set by ``__init__``:
        class_names / id2label / label2id: label vocabulary (``label2id`` is
            keyed by the normalised name).
        num_labels: number of categories.
        class_weights: inverse-frequency weights scaled to sum to
            ``num_labels`` (0 for absent classes), or ``None``.
    """

    def __init__(
        self,
        csv_file,
        tokenizer,
        max_length=128,
        title_col="title",
        label_col="defining category",
        categories=None,
        **kwargs,
    ):
        """Load and filter the CSV.

        Args:
            csv_file: Path of the CSV to read.
            tokenizer: Callable tokenizer used in ``__getitem__``.
            max_length: Sequences are padded/truncated to this length.
            title_col: Name of the title column.
            label_col: Name of the label column; a spelling with "_"/"-"
                replaced by spaces is tried as a fallback.
            categories: Ordered class names; defaults to the eight
                notebook categories.
            **kwargs: Accepted and ignored.

        Raises:
            ValueError: If a required column is missing or no usable row
                remains after filtering.
        """
        df = pd.read_csv(csv_file, engine="python", on_bad_lines="skip", dtype=str)
        df.columns = [c.strip() if isinstance(c, str) else c for c in df.columns]

        if label_col not in df.columns:
            alt = label_col.replace("_", " ").replace("-", " ")
            if alt in df.columns:
                label_col = alt
            else:
                raise ValueError(f"Expected label column '{label_col}' (or '{alt}') in {csv_file}. Found: {list(df.columns)}")

        if title_col not in df.columns:
            raise ValueError(f"Expected title column '{title_col}' in {csv_file}. Found: {list(df.columns)}")

        if categories is None:
            categories = ["makeup","skincare","fragrance","hair","skills","nails","fashion","general lifestyle"]

        self.class_names = categories[:]
        canon_classes = [_norm(c) for c in categories]
        self.label2id = {c: i for i, c in enumerate(canon_classes)}
        self.id2label = {i: self.class_names[i] for i in range(len(categories))}
        self.num_labels = len(categories)

        df[title_col] = df[title_col].fillna("").astype(str)
        df["_label_norm"] = df[label_col].apply(_norm)

        mask_valid = (df[title_col].str.strip() != "") & (df["_label_norm"].isin(canon_classes))
        # Copy so the column added below is written to an independent frame
        # rather than to a slice of the original (SettingWithCopyWarning).
        df = df[mask_valid].copy()

        if df.empty:
            raise ValueError("No usable rows after filtering, check columns and categories")

        df["_label_id"] = df["_label_norm"].map(self.label2id).astype(int)

        self.titles = df[title_col].tolist()
        self.labels = df["_label_id"].tolist()
        self.tokenizer = tokenizer
        self.max_length = max_length

        # Inverse-frequency class weights, rescaled so they sum to num_labels.
        counts = df["_label_id"].value_counts().reindex(range(self.num_labels), fill_value=0)
        freq = counts.to_numpy()
        inv = torch.tensor([0.0 if f == 0 else 1.0 / f for f in freq], dtype=torch.float)
        self.class_weights = (inv * (self.num_labels / inv.sum())) if inv.sum() > 0 else None

    def __len__(self):
        """Return the number of usable rows."""
        return len(self.titles)

    def __getitem__(self, idx):
        """Tokenise title ``idx`` and return ``(input_ids, attention_mask, label)``."""
        title = self.titles[idx]
        label = self.labels[idx]

        enc = self.tokenizer(
            title,
            truncation=True,
            padding="max_length",
            max_length=self.max_length,
            return_tensors="pt",
        )

        # The tokenizer returns a batch of one; drop that leading dimension.
        input_ids = enc["input_ids"].squeeze(0)
        attention_mask = enc["attention_mask"].squeeze(0)
        label = torch.tensor(label, dtype=torch.long)

        return input_ids, attention_mask, label


In [None]:
# Splits the labelled videos 80/20 (stratified by category), writes both parts
# to disk and wraps them in datasets / data loaders.
max_length = 48
batch_size = 32

SOURCE     = "videos_with_tags_defining.csv"
TRAIN_OUT  = "tags_train.csv"
TEST_OUT   = "tags_trainer_test.csv"

LABEL_COL  = "defining_category"
TITLE_COL  = "title"

df = pd.read_csv(SOURCE, engine="python", on_bad_lines="skip", dtype=str)

# Accept the space-separated spelling of the label column as well, mirroring
# the fallback VideoCategoryDataset applies to its label_col argument.
label_col_in_file = LABEL_COL
if label_col_in_file not in df.columns:
    label_col_in_file = LABEL_COL.replace("_", " ").replace("-", " ")

# fillna("") must come before astype(str): otherwise a missing value becomes
# the literal string "nan" and survives the emptiness checks below.
label_norm = (
    df[label_col_in_file].fillna("").astype(str)
      .str.strip().str.lower()
      .str.replace("_", " ", regex=False)
      .str.replace(r"\s+", " ", regex=True)
)
has_title = df[TITLE_COL].fillna("").astype(str).str.strip() != ""
mask = label_norm.ne("") & has_title

df_clean = df.loc[mask].copy()
y = label_norm.loc[mask]

# Fixed random_state keeps the split identical between runs.
train_df, test_df = train_test_split(
    df_clean, test_size=0.2, stratify=y, random_state=42
)

train_df.to_csv(TRAIN_OUT, index=False, encoding="utf-8")
test_df.to_csv(TEST_OUT, index=False, encoding="utf-8")
print(f"Wrote {len(train_df):,} rows to {TRAIN_OUT} and {len(test_df):,} rows to {TEST_OUT}")

CATS = ["makeup","skincare","fragrance","hair","skills","nails","fashion","general lifestyle"]

dataset_train = VideoCategoryDataset(
    TRAIN_OUT, tokenizer=tokenizer, max_length=max_length,
    title_col=TITLE_COL, label_col=LABEL_COL, categories=CATS
)
dataset_test  = VideoCategoryDataset(
    TEST_OUT, tokenizer, max_length=max_length,
    title_col=TITLE_COL, label_col=LABEL_COL, categories=CATS
)

loader_train = DataLoader(dataset_train, batch_size, shuffle=True)
loader_test  = DataLoader(dataset_test,  batch_size, shuffle=False)

In [None]:
class VideoCategoriser(nn.Module):
    """Linear classification head on top of a transformer encoder.

    The hidden state of the first token of the encoder output is passed
    through dropout and a linear layer to produce one logit per class.
    """

    def __init__(self, transformer, num_class, device='cpu',
                 freeze_transformer=True, dropout=0.1):
        """Build the classifier and move it to ``device``.

        Args:
            transformer: Encoder exposing ``config.hidden_size`` and returning
                an object with ``last_hidden_state``.
            num_class: Number of output classes.
            device: Device the module and every batch are moved to.
            freeze_transformer: When True the encoder's parameters are
                excluded from gradient updates.
            dropout: Dropout probability applied before the linear head.
        """
        super().__init__()
        self.device = device
        self.transformer = transformer
        self.hidden = transformer.config.hidden_size

        if freeze_transformer:
            for p in self.transformer.parameters():
                p.requires_grad = False

        self.dropout = nn.Dropout(dropout)
        self.fc = nn.Linear(self.hidden, num_class)

        self.to(device=self.device)

    def forward(self, input_ids, attention_mask, token_type_ids=None):
        """Return logits of shape (batch, num_class)."""
        kwargs = dict(input_ids=input_ids, attention_mask=attention_mask)
        if token_type_ids is not None:
            kwargs["token_type_ids"] = token_type_ids
        out = self.transformer(**kwargs)
        # Representation of the first token of every sequence.
        cls = out.last_hidden_state[:, 0, :]
        logits = self.fc(self.dropout(cls))
        return logits

    def _unpack_batch(self, batch):
        """Normalise a 3- or 4-item batch to (ids, mask, token_type_ids, y).

        Raises:
            ValueError: If the batch has any other number of items.
        """
        if len(batch) == 3:
            input_ids, attention_mask, y = batch
            token_type_ids = None
        elif len(batch) == 4:
            input_ids, attention_mask, token_type_ids, y = batch
        else:
            raise ValueError(f"Unexpected batch format with {len(batch)} items.")
        return input_ids, attention_mask, token_type_ids, y

    def _inputs_to_device(self, input_ids, attention_mask, token_type_ids):
        """Cast the model inputs to long and move them to ``self.device``."""
        input_ids = input_ids.long().to(self.device)
        attention_mask = attention_mask.long().to(self.device)
        if token_type_ids is not None:
            token_type_ids = token_type_ids.long().to(self.device)
        return input_ids, attention_mask, token_type_ids

    def _make_loss(self, class_weights):
        """Build the (optionally class-weighted) cross-entropy loss."""
        weight = class_weights.to(self.device) if class_weights is not None else None
        return nn.CrossEntropyLoss(weight=weight)

    def Train(self, epochs, optimizer, loader_train, loader_test, verbose=True,
              class_weights=None, save_path='video_categoriser_best_params.pt'):
        """Train the model and checkpoint the epoch with the lowest test loss.

        After every epoch the loss is re-evaluated on both loaders and
        appended to ``self.loss_train_log`` / ``self.loss_test_log``.

        Args:
            epochs: Number of passes over ``loader_train``.
            optimizer: Optimizer holding the parameters to update.
            loader_train: Iterable of training batches.
            loader_test: Iterable of validation batches.
            verbose: Print the losses after each epoch.
            class_weights: Optional per-class loss weights.
            save_path: Where the best ``state_dict`` is written.
        """
        self.loss_train_log = []
        self.loss_test_log  = []
        self.best_loss = float('inf')
        best_epoch = -1

        loss_fn = self._make_loss(class_weights)

        for epoch in range(epochs):
            self.train()
            # (1) loop over loader_train
            for batch in loader_train:
                input_ids, attention_mask, token_type_ids, y = self._unpack_batch(batch)
                input_ids, attention_mask, token_type_ids = self._inputs_to_device(
                    input_ids, attention_mask, token_type_ids
                )
                y = y.long().to(self.device)

                optimizer.zero_grad()
                yhat = self.forward(input_ids, attention_mask, token_type_ids)
                loss = loss_fn(yhat, y)
                loss.backward()
                optimizer.step()

            # (2) training loss, measured in eval mode after the epoch's updates
            loss_train = self.evaluate(loader_train, class_weights=class_weights)
            self.loss_train_log.append(loss_train)

            # (3) validation loss
            loss_test = self.evaluate(loader_test, class_weights=class_weights)
            self.loss_test_log.append(loss_test)

            # (4) print progress
            if verbose:
                print('Epochs %d/%d' % (epoch+1, epochs))
                print('Train Loss = %.4f' % loss_train, end=', ')
                print('Val Loss = %.4f' % loss_test)

            # (5) save best
            if loss_test < self.best_loss:
                self.best_loss = loss_test
                best_epoch = epoch + 1
                torch.save(self.state_dict(), save_path)

        print(f'Best model saved at epoch {best_epoch} with loss {self.best_loss:.4f}.')

    def evaluate(self, loader, class_weights=None):
        """Return the sample-weighted mean loss over ``loader`` (0.0 if empty)."""
        self.eval()
        loss_fn = self._make_loss(class_weights)
        total_loss, total_n = 0.0, 0

        with torch.no_grad():
            for batch in loader:
                input_ids, attention_mask, token_type_ids, y = self._unpack_batch(batch)
                input_ids, attention_mask, token_type_ids = self._inputs_to_device(
                    input_ids, attention_mask, token_type_ids
                )
                y = y.long().to(self.device)

                y_pred = self.forward(input_ids, attention_mask, token_type_ids)
                loss = loss_fn(y_pred, y)

                # Weight each batch mean by its size so a short last batch
                # does not skew the average.
                bs = y.size(0)
                total_loss += loss.item() * bs
                total_n += bs

        return total_loss / max(total_n, 1)

    def predict(self, loader):
        """Run the model over ``loader``.

        Returns:
            ``(x_all, y_all, logit)``: the concatenated input ids, labels and
            logits, all on the CPU.
        """
        self.eval()
        x_all, y_all, logit = [], [], []
        with torch.no_grad():
            for batch in loader:
                input_ids, attention_mask, token_type_ids, y = self._unpack_batch(batch)
                # Keep the tensors exactly as the loader delivered them.
                x_all.append(input_ids)
                y_all.append(y)
                input_ids, attention_mask, token_type_ids = self._inputs_to_device(
                    input_ids, attention_mask, token_type_ids
                )

                yhat = self.forward(input_ids, attention_mask, token_type_ids)
                logit.append(yhat.cpu())

        x_all  = torch.cat(x_all).cpu()
        y_all  = torch.cat(y_all).cpu()
        logit  = torch.cat(logit).cpu()
        return x_all, y_all, logit


In [None]:
lr = 5e-3
epochs = 5

# Seed before the classifier head is created and before the first shuffle so
# the run is repeatable.
seed_all(0)

# Train on the device detected in the setup cell instead of a hard-coded CPU,
# and size the head from the dataset rather than a literal 8.
# dataset_train.class_weights is not passed, so the loss is unweighted.
model = VideoCategoriser(transformer=transformer_model,
                         num_class=dataset_train.num_labels, device=device)
optimizer = torch.optim.Adam(model.parameters(), lr=lr)
model.Train(epochs, optimizer, loader_train, loader_test)

In [None]:
# Previous output: 
# Epochs 1/5
# Train Loss = 0.3614, Val Loss = 0.3713
# Epochs 2/5
# Train Loss = 0.3217, Val Loss = 0.3334
# Epochs 3/5
# Train Loss = 0.3221, Val Loss = 0.3417
# Epochs 4/5
# Train Loss = 0.3361, Val Loss = 0.3500
# Epochs 5/5
# Train Loss = 0.2884, Val Loss = 0.3070
# Best model saved at epoch 5 with loss 0.3070.

In [None]:
# Reload the best checkpoint and report accuracy on the held-out split.
# map_location lets a checkpoint saved on a GPU be loaded on a CPU-only
# machine (and vice versa).
best_params = torch.load('video_categoriser_best_params.pt', map_location=device)
model_best = VideoCategoriser(device=device, transformer=transformer_model, num_class = 8)
model_best.load_state_dict(best_params)
x_all, y_all, logit = model_best.predict(loader_test)

probs = F.softmax(logit, dim=1)
y_pred = torch.argmax(probs, dim=1)
accuracy = (y_pred == y_all).float().mean().item()
print(f'Accuracy: {accuracy:.4f}')

In [None]:
# Configuration for tagging the videos that received no keyword tag.
INPUT_CSV   = "leftover_videos.csv"
OUTPUT_CSV  = "leftover_videos_tagged.csv"
PRETRAINED  = "distilbert-base-uncased"
BEST_WEIGHTS = "video_categoriser_best_params.pt"
# NOTE(review): the training cell tokenises titles only with max_length=48,
# while inference below feeds title + description at 128 tokens — confirm this
# mismatch is intended.
MAX_LENGTH  = 128
BATCH_SIZE  = 64
PRINT_EVERY = 1000

CATS        = ["makeup","skincare","fragrance","hair","skills","nails","fashion","general lifestyle"]


device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# If model/tokenizer aren't already defined in the notebook, quickly load them:
if "tokenizer" not in globals():
    tokenizer = AutoTokenizer.from_pretrained(PRETRAINED)
if "model" not in globals():
    backbone = AutoModel.from_pretrained(PRETRAINED)
    model = VideoCategoriser(backbone, num_class=len(CATS), device=device,
                             freeze_transformer=True) #, id2label=CATS)
    state = torch.load(BEST_WEIGHTS, map_location="cpu")
    model.load_state_dict(state, strict=True)
else:
    # A model left over from an earlier cell may live on another device; the
    # inference helper moves its inputs to `device`, so the model must follow.
    # NOTE(review): such a model carries whatever weights it had last, not
    # necessarily the best checkpoint.
    model.to(device)
    model.device = device
# Inference must run with dropout disabled whichever branch supplied the model.
model.eval()

def _slug(c: str) > str:
    return c.lower().replace(" ", "_")

@torch.no_grad()
def _predict_probs(titles, descs, batch_size=BATCH_SIZE):
    """Return per-category softmax probabilities for (title, description) pairs.

    Args:
        titles: list of title strings.
        descs: list of description strings, aligned with `titles`.
        batch_size: number of pairs tokenised and scored per forward pass.

    Returns:
        A NumPy array with one row per input pair and one column per model output;
        an empty `(0, len(CATS))` array when `titles` is empty.
    """
    # Send inputs to wherever the model's weights actually live. The global `device`
    # can differ from the model's device when `model` is reused from an earlier cell,
    # which would otherwise fail with a device-mismatch error.
    first_param = next(model.parameters(), None)
    target = first_param.device if first_param is not None else device

    probs_all = []
    n = len(titles)
    for i in range(0, n, batch_size):
        # Title is the first segment and is never truncated; only the description is cut.
        enc = tokenizer(
            titles[i:i+batch_size],
            text_pair=descs[i:i+batch_size],
            truncation="only_second",
            padding=True,
            max_length=MAX_LENGTH,
            return_tensors="pt",
        )
        input_ids = enc["input_ids"].to(target)
        attn      = enc["attention_mask"].to(target)
        # Not every tokenizer emits token_type_ids; pass None through when absent.
        tti       = enc.get("token_type_ids")
        if tti is not None:
            tti = tti.to(target)

        logits = model(input_ids, attn, tti)     # (B, C)
        probs  = F.softmax(logits, dim=1).cpu().numpy()
        probs_all.append(probs)
    return np.vstack(probs_all) if probs_all else np.zeros((0, len(CATS)))

def tag_no_tags(input_csv=INPUT_CSV, output_csv=OUTPUT_CSV, k_ambiguous=100):
    """Score every video in `input_csv` and write the tagged table to `output_csv`.

    Adds one `prob_<category>` column per entry of CATS, the top three predictions
    with their probabilities, the max-min probability spread, and a
    "defining category" column. The `k_ambiguous` rows with the smallest spread are
    overridden to "misc" and flagged in `ambiguous_misc_override`.

    Returns:
        The augmented DataFrame that was written to disk.
    """
    df = pd.read_csv(input_csv, engine="python", on_bad_lines="skip", dtype=str, encoding="utf-8")

    title_list = df["title"].fillna("").astype(str).tolist()
    desc_list = df["description"].fillna("").astype(str).tolist()
    probs = _predict_probs(title_list, desc_list)  # (N, C)

    # One probability column per category.
    for col_idx, category in enumerate(CATS):
        df[f"prob_{_slug(category)}"] = probs[:, col_idx].round(6)

    # Indices of the three most probable categories, best first.
    ranked = np.argsort(-probs, axis=1)[:, :3]
    n = len(df)
    row_ids = np.arange(n)
    for rank in range(3):
        picks = ranked[:, rank]
        df[f"pred{rank + 1}"] = [CATS[p] for p in picks]
        df[f"pred{rank + 1}_prob"] = probs[row_ids, picks].round(6)

    # A small spread means the model is close to undecided across categories.
    spread = probs.max(axis=1) - probs.min(axis=1)
    df["prob_range_all8"] = spread.round(6)

    df["defining category"] = df["pred1"]
    flattest = np.argsort(spread)[:min(k_ambiguous, n)]
    df["ambiguous_misc_override"] = False
    df.loc[flattest, ["defining category", "ambiguous_misc_override"]] = ["misc", True]

    df.to_csv(output_csv, index=False, encoding="utf-8")
    print(f"Wrote {len(df):,} rows with probabilities and tags to '{output_csv}'")
    return df

# Tag the leftover videos and write the CSV; the returned frame is not needed here.
_ = tag_no_tags(INPUT_CSV, OUTPUT_CSV, k_ambiguous=100)

# Timing note: a 32 MB input took roughly 70 minutes to process.

### Quality Rating
Quality: comments that keep other viewers on the video for longer, prompting them to like or reply.

In [None]:
# Collect every comment with more than MIN_LIKES likes from the filtered comment files
# and write them to a single CSV.
INPUT_FILES = ["comments%d_filtered_english.csv" % i for i in range(1, 6)]
OUTPUT = "comments_over_1000.csv"
MIN_LIKES = 1000

kept = []

for path in INPUT_FILES:
    try:
        df = pd.read_csv(path, engine="python", on_bad_lines="skip", encoding="utf-8")
    except FileNotFoundError:
        print(f"Warning: {path} not found; skipping.")
        continue

    if "likeCount" in df.columns:
        # Unparseable like counts are treated as zero likes.
        df["likeCount"] = pd.to_numeric(df["likeCount"], errors="coerce").fillna(0)

        keep = df[df["likeCount"] > MIN_LIKES]
        kept.append(keep)
        print(f"{path}: kept {len(keep)} of {len(df)} rows (> {MIN_LIKES})")
    else:
        print(f"Warning: {path} is missing 'likeCount'; skipping.")

if not kept:
    print("No matching rows found or no valid files read.")
else:
    out = pd.concat(kept, ignore_index=True)

    out.to_csv(OUTPUT, index=False, encoding="utf-8")
    print(f"Wrote {len(out)} rows to {OUTPUT}")


In [None]:
# Set top liked comments as high quality, while spam is low quality
# and decreases video interest when users read it

In [None]:
# Merge the labelled source files into one training CSV with a `quality` column.
INPUTS = [
    ("Quality_scoring/emoji_spam_comments.csv", "Poor"),
    ("Quality_scoring/keywords_spam.csv", "Poor"),
    ("Quality_scoring/link_spam_comments.csv", "Poor"),
    ("Quality_scoring/top_liked_comments.csv", "High"),
]

OUTPUT = "quality_trainer.csv"
PREFERRED_COLS = ["kind", "commentId", "channelId", "videoId", "authorId",
                  "textOriginal", "parentCommentId", "likeCount", "publishedAt", "updatedAt"]

POOR_LIMIT = 5500
poor_seen = 0
dfs = []

for path, label in INPUTS:
    df = pd.read_csv(path, engine="python", on_bad_lines="skip", encoding="utf-8", dtype=str)
    df.columns = [c.strip() if isinstance(c, str) else c for c in df.columns]

    is_poor = label == "Poor"
    if is_poor:
        poor_seen += 1
    # Down-sample each of the first three "Poor" files to at most POOR_LIMIT rows.
    if is_poor and poor_seen <= 3 and len(df) > POOR_LIMIT:
        df = df.sample(n=POOR_LIMIT, random_state=42).reset_index(drop=True)

    df["quality"] = label
    dfs.append(df)

out = pd.concat(dfs, ignore_index=True, sort=False)

# Keep only the preferred columns that exist, followed by the label.
cols = [c for c in PREFERRED_COLS if c in out.columns] + ["quality"]
out = out.loc[:, cols]

if "likeCount" in out.columns:
    # Normalise like counts to integer strings; missing values become empty strings.
    like_text = pd.to_numeric(out["likeCount"], errors="coerce").astype("Int64").astype(str)
    out["likeCount"] = like_text
    out.loc[out["likeCount"] == "<NA>", "likeCount"] = ""

out.to_csv(OUTPUT, index=False, encoding="utf-8")
print(f"Wrote {len(out):,} rows and {len(out.columns)} columns to {OUTPUT}")


In [None]:
class QualityDataset(Dataset):
    """Dataset of (input_ids, attention_mask, label) built from a labelled comment CSV.

    Rows are kept only when the label is "High" or "Poor" (case/whitespace-insensitive)
    and the text is non-blank. "High" maps to 1 and "Poor" to 0.
    """

    def __init__(self, csv_file, tokenizer, max_length=128,
                 text_col="textOriginal", label_col="quality"):
        frame = pd.read_csv(csv_file, engine="python", on_bad_lines="skip", dtype=str)
        frame.columns = [name.strip() if isinstance(name, str) else name
                         for name in frame.columns]

        columns_missing = text_col not in frame.columns or label_col not in frame.columns
        if columns_missing:
            raise ValueError(f"Expected columns '{text_col}' and '{label_col}' in {csv_file}. "
                             f"Found: {list(frame.columns)}")

        # High -> 1, Poor -> 0; anything else becomes NaN and is dropped below.
        normalised_labels = frame[label_col].str.strip().str.lower()
        frame["label"] = normalised_labels.map({"high": 1, "poor": 0})

        frame[text_col] = frame[text_col].fillna("").astype(str)
        has_label = frame["label"].isin([0, 1])
        has_text = frame[text_col].str.strip() != ""
        usable = frame[has_label & has_text]

        self.texts = usable[text_col].tolist()
        self.labels = usable["label"].astype(int).tolist()

        self.tokenizer = tokenizer
        self.max_length = max_length

    def __len__(self):
        return len(self.texts)

    def __getitem__(self, idx):
        text = self.texts[idx]
        label_value = self.labels[idx]

        # Every item is padded/truncated to max_length so batches stack directly.
        encoded = self.tokenizer(
            text,
            truncation=True,
            padding="max_length",
            max_length=self.max_length,
            return_tensors="pt"
        )
        target = torch.tensor(label_value, dtype=torch.float).unsqueeze(0)  # shape (1,)

        return encoded["input_ids"].squeeze(0), encoded["attention_mask"].squeeze(0), target


In [None]:
max_length = 128
batch_size = 32

# Stratified 80/20 split of the labelled comments, persisted so the datasets can
# be rebuilt from the two CSV files.
all_df = pd.read_csv("quality_trainer.csv", engine="python", on_bad_lines="skip", dtype=str)
all_df = all_df[["textOriginal", "quality"]].dropna()
train_df, test_df = train_test_split(
    all_df,
    test_size=0.2,
    stratify=all_df["quality"].str.strip().str.lower(),
    random_state=42,
)
train_df.to_csv("quality_trainer_train.csv", index=False)
test_df.to_csv("quality_trainer_test.csv", index=False)

dataset_train, dataset_test = (
    QualityDataset(csv_path, tokenizer, max_length=max_length)
    for csv_path in ("quality_trainer_train.csv", "quality_trainer_test.csv")
)

# Only the training loader is shuffled.
Q_loader_train = DataLoader(dataset_train, batch_size=batch_size, shuffle=True)
Q_loader_test  = DataLoader(dataset_test, batch_size=batch_size, shuffle=False)



In [None]:
class QualityScorer(nn.Module):
    """Binary comment-quality scorer: a frozen transformer encoder plus a small MLP head.

    The head ends in a sigmoid, so `forward` returns one probability per input row
    and training uses `nn.BCELoss`.

    Args:
        transformer: encoder exposing `config.hidden_size` and returning an object with
            a `last_hidden_state` attribute. Its parameters are frozen in place, which
            also affects any other model holding the same encoder object.
        hidden_dim: width of the hidden layer in the classifier head.
        device: device the module is moved to and to which batches are sent.
    """

    def __init__(self, transformer, hidden_dim, device='cpu'):
        super().__init__()
        self.device = device

        self.transformer = transformer
        transformer_output_dim = transformer.config.hidden_size

        # Only the classifier head is trained.
        for param in self.transformer.parameters():
            param.requires_grad = False

        self.classifier = nn.Sequential(
            nn.Linear(transformer_output_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, 1),
            nn.Sigmoid()
        )

        self.to(device=device)

    def forward(self, input_ids, attention_mask):
        """Return sigmoid scores of shape (batch, 1) from the first-token embedding."""
        out = self.transformer(input_ids=input_ids, attention_mask=attention_mask)
        cls_token = out.last_hidden_state[:, 0, :]  # CLS token
        out = self.classifier(cls_token)
        return out

    def Train(self, epochs, optimizer, loader_train, loader_test, verbose=True):
        """Train for `epochs` epochs, checkpointing whenever validation loss improves.

        Per-epoch mean losses are stored in `loss_train_log` / `loss_test_log`. The
        weights of the best epoch are written to 'quality_best_params.pt';
        `best_epoch` stays None if no epoch produced an improving loss.
        """
        self.loss_train_log = []
        self.loss_test_log = []
        self.best_loss = np.inf
        # Initialised up front so the summary below cannot hit a missing attribute
        # when epochs == 0 or the validation loss is never finite.
        self.best_epoch = None
        loss_fn = nn.BCELoss()

        for epoch in range(epochs):
            self.train()
            epoch_loss = 0

            for input_ids, attention_mask, labels in loader_train:
                input_ids = input_ids.long().to(self.device)
                attention_mask = attention_mask.long().to(self.device)
                labels = labels.float().to(self.device)

                optimizer.zero_grad()
                outputs = self.forward(input_ids, attention_mask)
                loss = loss_fn(outputs, labels)

                loss.backward()
                optimizer.step()
                epoch_loss += loss.item()

            avg_train_loss = epoch_loss / len(loader_train)
            avg_test_loss = self.evaluate(loader_test)

            self.loss_train_log.append(avg_train_loss)
            self.loss_test_log.append(avg_test_loss)

            if verbose:
                print('Epochs %d/%d' % (epoch+1, epochs))
                print('Train Loss = %.4f' % avg_train_loss, end=', ')
                print('Val Loss = %.4f' % avg_test_loss)

            # Save the best model seen so far (checked once per epoch).
            if avg_test_loss < self.best_loss:
                self.best_loss = avg_test_loss
                self.best_epoch = epoch + 1
                torch.save(self.state_dict(), 'quality_best_params.pt')

        if self.best_epoch is None:
            print('No model saved: no epoch produced an improving validation loss.')
        else:
            print(f'Best model saved at epoch {self.best_epoch} with loss {self.best_loss:.4f}.')

    def evaluate(self, loader):
        """Return the mean BCE loss per batch over `loader` (module is put in eval mode)."""
        self.eval()
        loss_fn = nn.BCELoss()
        total_loss = 0

        with torch.no_grad():
            for input_ids, attention_mask, labels in loader:
                input_ids = input_ids.long().to(self.device)
                attention_mask = attention_mask.long().to(self.device)
                labels = labels.float().to(self.device)

                outputs = self.forward(input_ids, attention_mask)
                total_loss += loss_fn(outputs, labels).item()

        return total_loss / len(loader)

    def predict(self, loader):
        """Return `(x_all, y_all, pred)` lists of per-row NumPy arrays.

        `pred` holds 0/1 decisions obtained by thresholding the score at 0.5.
        """
        self.eval()

        x_all, y_all, pred = [], [], []
        with torch.no_grad():
            for input_ids, attention_mask, labels in loader:
                input_ids = input_ids.long().to(self.device)
                attention_mask = attention_mask.long().to(self.device)

                outputs = self.forward(input_ids, attention_mask)
                preds = (outputs > 0.5).long().cpu().numpy()

                x_all.extend(input_ids.cpu().numpy())
                y_all.extend(labels.cpu().numpy())
                pred.extend(preds)

        return x_all, y_all, pred

In [None]:
Q_lr = 1e-3
Q_epochs = 10
# NOTE(review): for DistilBERT, `config.hidden_dim` is the feed-forward width rather than
# the embedding size - confirm that such a wide classifier hidden layer is intended.
Q_model = QualityScorer(transformer = transformer_model, hidden_dim = transformer_model.config.hidden_dim)
# Use this model's own learning rate; the bare `lr` global belongs to the video categoriser.
Q_optimizer = torch.optim.Adam(Q_model.parameters(), lr=Q_lr)
Q_model.Train(Q_epochs, Q_optimizer, Q_loader_train, Q_loader_test)

In [None]:
# Accuracy of the quality scorer on its own held-out split.
_, y_all, pred = Q_model.predict(Q_loader_test)
y_all = np.array(y_all).flatten()
pred = np.array(pred).flatten()
accuracy = np.mean(y_all == pred)
print(accuracy)

# NOTE: the accuracy of 0.29 recorded here earlier came from `model.predict(loader_test)`,
# i.e. the video categoriser on the video split, so it did not measure the quality scorer.
# Re-run this cell to obtain the real figure.

### Sentiment Rating
Is the comment positive or negative?

In [None]:
# Due to limited time, keywords will be used to make the 
# datasets even though these are not foolproof, 
# especially with modern slang

In [None]:
# Inputs and outputs for the keyword-based sentiment labelling pass.
INPUT_FILES = [f"comments{i}_filtered_english.csv" for i in range(1, 6)]
CHUNK = 10_000            # rows read per pandas chunk
PRINT_EVERY = 100_000     # progress message interval, in rows
PER_FILE_SUFFIX = "_positivity_scored.csv"
HITS_OUTPUT = "positivity_hits_all.csv"

# Columns appended to each scored row.
SCORING_FIELDS = ["sentiment", "pos_hits", "neg_hits", "source_file"]

def dedup(seq):
    """Return the items of `seq` as a list with duplicates removed, first occurrence kept."""
    # dict keys are unique and remember insertion order.
    return list(dict.fromkeys(seq))


# Pre-compiled patterns used by `sanitise`.
url_re     = re.compile(r"https?://\S+|www\.\S+", flags=re.IGNORECASE)
handle_re  = re.compile(r"[@#]\w+", flags=re.IGNORECASE)
emoji_re   = re.compile("["                              
    "\U0001F600-\U0001F64F"  
    "\U0001F300-\U0001F5FF"  
    "\U0001F680-\U0001F6FF"  
    "\U0001F1E0-\U0001F1FF"  
    "]", flags=re.UNICODE)
punct_re   = re.compile(r"[^\w\s]")
space_re   = re.compile(r"\s+")

def sanitise(text: str) -> str:
    """Normalise a comment for keyword matching.

    Removes URLs, @handles/#hashtags and emoji, lower-cases, drops apostrophes
    (so "you're" becomes "youre"), replaces remaining punctuation with spaces and
    collapses whitespace. `None` and NaN yield an empty string; other non-string
    values are converted with `str()`.
    """
    if not isinstance(text, str):
        # Missing CSV cells arrive as float NaN; treat them as empty text rather
        # than the literal string "nan" (NaN is the only value unequal to itself).
        is_nan = isinstance(text, float) and text != text
        text = "" if text is None or is_nan else str(text)
    text = url_re.sub(" ", text)
    text = handle_re.sub(" ", text)
    text = emoji_re.sub(" ", text)
    text = text.replace("’", "'")
    text = text.lower().replace("'", "")  # "you're" -> "youre"
    text = punct_re.sub(" ", text)
    text = space_re.sub(" ", text).strip()
    return text

# Keyword patterns are matched against text that has already been through `sanitise`
# (lower-case, no apostrophes or punctuation).
# NOTE(review): `fav(?:ou?rite)` requires the full word, so a bare "fav" does not
# match - confirm this is intended.
_positive = [
    r"\bi love (?:her|him|you|u|ya|this(?: video)?|it)\b",
    r"\blove (?:this|it|you|u|ya|her|him)\b",
    r"\byoure\s+(?:the\s+)?best\b",
    r"\bur\s+(?:the\s+)?best\b",
    r"\byoure\s+my\s+fav(?:ou?rite)\b",
    r"\bur\s+my\s+fav(?:ou?rite)\b",
    r"\bmy\s+fav(?:ou?rite)\b",
]
_negative = [
    r"\bthis\s+is\s+(?:so\s+)?(?:dumb|stupid|trash|terrible|awful|garbage|cringe|bad)\b",
    r"\b(?:he|she|you|u|they|it)\s+(?:is|are|so)\s+(?:dumb|stupid|trash|terrible|awful|garbage|cringe|idiotic)\b",
    r"\b(?:dumb|stupid|idiot|garbage|trash|terrible|awful|cringe)\b",
    r"\bhate\s+(?:this|it|you|u|her|him)\b",
    r"\bworst\b",
]
POS_RE = list(map(re.compile, _positive))
NEG_RE = list(map(re.compile, _negative))

def score_text(text: str):
    """Label `text` by counting how many positive and negative patterns it matches.

    Returns:
        `(sentiment, pos_hits, neg_hits)`. `sentiment` is "neutral" when nothing
        matches, "positive" when positive patterns strictly outnumber negative
        ones, and "negative" otherwise (ties included).
    """
    cleaned = sanitise(text)
    pos_hits = len([rx for rx in POS_RE if rx.search(cleaned)])
    neg_hits = len([rx for rx in NEG_RE if rx.search(cleaned)])

    if not pos_hits and not neg_hits:
        sentiment = "neutral"
    elif pos_hits > neg_hits:
        sentiment = "positive"
    else:
        sentiment = "negative"
    return sentiment, pos_hits, neg_hits


# Build the union of column names across the input files (first-seen order) so the
# combined hits file can carry every original column.
union_cols = []
seen = set()
for path in INPUT_FILES:
    if os.path.exists(path):
        try:
            # nrows=0 reads the header only.
            header = pd.read_csv(path, engine="python", on_bad_lines="skip",
                                 dtype=str, encoding="utf-8", nrows=0)
            cols = list(header.columns)
        except Exception:
            # Fall back to reading the first CSV record directly.
            with open(path, newline="", encoding="utf-8") as f:
                cols = next(csv.reader(f), [])
        for c in cols:
            if c in seen:
                continue
            seen.add(c)
            union_cols.append(c)

# Scoring columns are appended once, at the end.
union_cols = list(filter(lambda c: c not in SCORING_FIELDS, union_cols))
HITS_FIELDS = dedup(union_cols + SCORING_FIELDS)

# Imported locally so this cell does not depend on an import made in another cell.
from pathlib import Path

text_col = "textOriginal"

# Score every comment file. Each input gets its own "<stem>_positivity_scored.csv",
# and every non-neutral row is also appended to the combined HITS_OUTPUT file.
# Both outputs are opened in `with` blocks so they are flushed and closed even if
# scoring fails part-way through.
with open(HITS_OUTPUT, "w", newline="", encoding="utf-8") as hits_fh:
    hits_writer = csv.DictWriter(hits_fh, fieldnames=HITS_FIELDS, extrasaction="ignore")
    hits_writer.writeheader()

    for path in INPUT_FILES:
        if not os.path.exists(path):
            print(f"Warning: {path} not found; skipping.")
            continue

        in_path = Path(path)
        out_path = in_path.with_name(in_path.stem + PER_FILE_SUFFIX)
        print(f"\nScoring {in_path.name} > {out_path.name}")

        processed = 0
        per_file_writer = None

        with open(out_path, "w", newline="", encoding="utf-8") as per_file_fh:
            for chunk in pd.read_csv(
                in_path,
                engine="python",
                on_bad_lines="skip",
                dtype=str,
                encoding="utf-8",
                chunksize=CHUNK,
            ):
                # The per-file header is only known once the first chunk has been read.
                if per_file_writer is None:
                    file_fields = dedup(list(chunk.columns) + ["sentiment", "pos_hits", "neg_hits"])
                    per_file_writer = csv.DictWriter(per_file_fh, fieldnames=file_fields, extrasaction="ignore")
                    per_file_writer.writeheader()

                for _, row in chunk.iterrows():
                    sentiment, pos_hits, neg_hits = score_text(row.get(text_col, ""))

                    out_row = {k: row.get(k, "") for k in per_file_writer.fieldnames if k in row.index}
                    out_row.update({
                        "sentiment": sentiment,
                        "pos_hits": pos_hits,
                        "neg_hits": neg_hits,
                    })
                    per_file_writer.writerow(out_row)

                    # Only labelled (non-neutral) rows go to the combined training file.
                    if sentiment != "neutral":
                        hits_row = {k: row.get(k, "") for k in union_cols}
                        hits_row.update({
                            "sentiment": sentiment,
                            "pos_hits": pos_hits,
                            "neg_hits": neg_hits,
                            "source_file": in_path.name,
                        })
                        hits_writer.writerow(hits_row)

                    processed += 1
                    if processed % PRINT_EVERY == 0:
                        print(f"  [{in_path.name}] processed {processed:,} lines", flush=True)

        print(f"Done: wrote {processed:,} rows to {out_path.name}")

print(f"\nWrote combined positive/negative hits to {HITS_OUTPUT}")


In [None]:
class PositivityDataset(Dataset):
    """Dataset of (input_ids, attention_mask, label) built from a sentiment-labelled CSV.

    Rows are kept only when the label is "positive" or "negative"
    (case/whitespace-insensitive) and the text is non-blank.
    "positive" maps to 1 and "negative" to 0.
    """

    def __init__(self, csv_file, tokenizer, max_length=128,
                 text_col="textOriginal", label_col="sentiment"):
        frame = pd.read_csv(csv_file, engine="python", on_bad_lines="skip", dtype=str)
        frame.columns = [name.strip() if isinstance(name, str) else name
                         for name in frame.columns]

        columns_missing = text_col not in frame.columns or label_col not in frame.columns
        if columns_missing:
            raise ValueError(f"Expected columns '{text_col}' and '{label_col}' in {csv_file}. "
                             f"Found: {list(frame.columns)}")

        # Any other label (e.g. "neutral") becomes NaN and is dropped below.
        normalised_labels = frame[label_col].str.strip().str.lower()
        frame["label"] = normalised_labels.map({"positive": 1, "negative": 0})

        frame[text_col] = frame[text_col].fillna("").astype(str)
        has_label = frame["label"].isin([0, 1])
        has_text = frame[text_col].str.strip() != ""
        usable = frame[has_label & has_text]

        self.texts = usable[text_col].tolist()
        self.labels = usable["label"].astype(int).tolist()

        self.tokenizer = tokenizer
        self.max_length = max_length

    def __len__(self):
        return len(self.texts)

    def __getitem__(self, idx):
        text = self.texts[idx]
        label_value = self.labels[idx]

        # Every item is padded/truncated to max_length so batches stack directly.
        encoded = self.tokenizer(
            text,
            truncation=True,
            padding="max_length",
            max_length=self.max_length,
            return_tensors="pt"
        )
        target = torch.tensor(label_value, dtype=torch.float).unsqueeze(0)  # shape (1,)

        return encoded["input_ids"].squeeze(0), encoded["attention_mask"].squeeze(0), target


In [None]:
max_length = 128
batch_size = 32

# Stratified 80/20 split of the keyword-labelled comments, persisted so the datasets
# can be rebuilt from the two CSV files.
all_df = pd.read_csv("positivity_hits_all.csv", engine="python", on_bad_lines="skip", dtype=str)
all_df = all_df[["textOriginal", "sentiment"]].dropna()
train_df, test_df = train_test_split(
    all_df,
    test_size=0.2,
    stratify=all_df["sentiment"].str.strip().str.lower(),
    random_state=42,
)
train_df.to_csv("positivity_hits_train.csv", index=False)
test_df.to_csv("positivity_hits_test.csv", index=False)

dataset_train, dataset_test = (
    PositivityDataset(csv_path, tokenizer, max_length=max_length)
    for csv_path in ("positivity_hits_train.csv", "positivity_hits_test.csv")
)

# Only the training loader is shuffled.
P_loader_train = DataLoader(dataset_train, batch_size=batch_size, shuffle=True)
P_loader_test  = DataLoader(dataset_test, batch_size=batch_size, shuffle=False)

In [None]:
class PositivityScorer(nn.Module):
    """Binary comment-sentiment scorer: a frozen transformer encoder plus a small MLP head.

    The head ends in a sigmoid, so `forward` returns one probability per input row
    and training uses `nn.BCELoss`.

    Args:
        transformer: encoder exposing `config.hidden_size` and returning an object with
            a `last_hidden_state` attribute. Its parameters are frozen in place, which
            also affects any other model holding the same encoder object.
        hidden_dim: width of the hidden layer in the classifier head.
        device: device the module is moved to and to which batches are sent.
    """

    def __init__(self, transformer, hidden_dim, device='cpu'):
        super().__init__()
        self.device = device

        self.transformer = transformer
        transformer_output_dim = transformer.config.hidden_size

        # Only the classifier head is trained.
        for param in self.transformer.parameters():
            param.requires_grad = False

        self.classifier = nn.Sequential(
            nn.Linear(transformer_output_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, 1),
            nn.Sigmoid()
        )

        self.to(device=device)

    def forward(self, input_ids, attention_mask):
        """Return sigmoid scores of shape (batch, 1) from the first-token embedding."""
        out = self.transformer(input_ids=input_ids, attention_mask=attention_mask)
        cls_token = out.last_hidden_state[:, 0, :]  # CLS token
        out = self.classifier(cls_token)
        return out

    def Train(self, epochs, optimizer, loader_train, loader_test, verbose=True):
        """Train for `epochs` epochs, checkpointing whenever validation loss improves.

        Per-epoch mean losses are stored in `loss_train_log` / `loss_test_log`. The
        weights of the best epoch are written to 'positivity_best_params.pt';
        `best_epoch` stays None if no epoch produced an improving loss.
        """
        self.loss_train_log = []
        self.loss_test_log = []
        self.best_loss = np.inf
        # Initialised up front so the summary below cannot hit a missing attribute
        # when epochs == 0 or the validation loss is never finite.
        self.best_epoch = None
        loss_fn = nn.BCELoss()

        for epoch in range(epochs):
            self.train()
            epoch_loss = 0

            for input_ids, attention_mask, labels in loader_train:
                input_ids = input_ids.long().to(self.device)
                attention_mask = attention_mask.long().to(self.device)
                labels = labels.float().to(self.device)

                optimizer.zero_grad()
                outputs = self.forward(input_ids, attention_mask)
                loss = loss_fn(outputs, labels)

                loss.backward()
                optimizer.step()
                epoch_loss += loss.item()

            avg_train_loss = epoch_loss / len(loader_train)
            avg_test_loss = self.evaluate(loader_test)

            self.loss_train_log.append(avg_train_loss)
            self.loss_test_log.append(avg_test_loss)

            if verbose:
                print('Epochs %d/%d' % (epoch+1, epochs))
                print('Train Loss = %.4f' % avg_train_loss, end=', ')
                print('Val Loss = %.4f' % avg_test_loss)

            # Save the best model seen so far (checked once per epoch).
            if avg_test_loss < self.best_loss:
                self.best_loss = avg_test_loss
                self.best_epoch = epoch + 1
                torch.save(self.state_dict(), 'positivity_best_params.pt')

        if self.best_epoch is None:
            print('No model saved: no epoch produced an improving validation loss.')
        else:
            print(f'Best model saved at epoch {self.best_epoch} with loss {self.best_loss:.4f}.')

    def evaluate(self, loader):
        """Return the mean BCE loss per batch over `loader` (module is put in eval mode)."""
        self.eval()
        loss_fn = nn.BCELoss()
        total_loss = 0

        with torch.no_grad():
            for input_ids, attention_mask, labels in loader:
                input_ids = input_ids.long().to(self.device)
                attention_mask = attention_mask.long().to(self.device)
                labels = labels.float().to(self.device)

                outputs = self.forward(input_ids, attention_mask)
                total_loss += loss_fn(outputs, labels).item()

        return total_loss / len(loader)

    def predict(self, loader):
        """Return `(x_all, y_all, pred)` lists of per-row NumPy arrays.

        `pred` holds 0/1 decisions obtained by thresholding the score at 0.5.
        """
        self.eval()

        x_all, y_all, pred = [], [], []
        with torch.no_grad():
            for input_ids, attention_mask, labels in loader:
                input_ids = input_ids.long().to(self.device)
                attention_mask = attention_mask.long().to(self.device)

                outputs = self.forward(input_ids, attention_mask)
                preds = (outputs > 0.5).long().cpu().numpy()

                x_all.extend(input_ids.cpu().numpy())
                y_all.extend(labels.cpu().numpy())
                pred.extend(preds)

        return x_all, y_all, pred

In [None]:
P_lr = 1e-3
P_epochs = 10
# NOTE(review): for DistilBERT, `config.hidden_dim` is the feed-forward width rather than
# the embedding size - confirm that such a wide classifier hidden layer is intended.
P_model = PositivityScorer(transformer = transformer_model, hidden_dim = transformer_model.config.hidden_dim)
# Use this model's own learning rate; the bare `lr` global belongs to the video categoriser.
P_optimizer = torch.optim.Adam(P_model.parameters(), lr=P_lr)
P_model.Train(P_epochs, P_optimizer, P_loader_train, P_loader_test)

In [None]:
# Previous Output: 

# Epochs 1/10
# Train Loss = 0.1570, Val Loss = 0.1190
# Epochs 2/10
# Train Loss = 0.1254, Val Loss = 0.1245
# Epochs 3/10
# Train Loss = 0.1187, Val Loss = 0.1600
# Epochs 4/10
# Train Loss = 0.1159, Val Loss = 0.1237
# Epochs 5/10
# Train Loss = 0.1062, Val Loss = 0.1282
# Epochs 6/10
# Train Loss = 0.1038, Val Loss = 0.1084
# Epochs 7/10
# Train Loss = 0.1027, Val Loss = 0.1073
# Epochs 8/10
# Train Loss = 0.0989, Val Loss = 0.1084
# Epochs 9/10
# Train Loss = 0.0918, Val Loss = 0.1024
# Epochs 10/10
# Train Loss = 0.0887, Val Loss = 0.0976
# Best model saved at epoch 10 with loss 0.0976.

In [None]:
# Configuration for scoring a sample of comments with the trained positivity model.
FILES = [f"comments{i}_filtered_english.csv" for i in range(1, 6)]
N_PER_FILE   = 2000      # rows read from the top of each file
PRINT_EVERY  = 20        # rows per scoring block / progress message
MAX_LENGTH   = 128
PRETRAINED   = "distilbert-base-uncased"
BEST_WEIGHTS = "positivity_best_params.pt"
OUTPUT_CSV   = "positivity_scored_batch.csv"

TEXT_COL_CANDIDATES = ["textOriginal"]

tokenizer = AutoTokenizer.from_pretrained(PRETRAINED)
# NOTE(review): this rebinds `transformer_model` to a fresh encoder; `P_model` below
# keeps the encoder it was constructed with.
transformer_model = AutoModel.from_pretrained(PRETRAINED)

# Restore the best checkpoint into the existing `P_model` and switch to inference mode.
state = torch.load(BEST_WEIGHTS, map_location="cpu")

P_model.load_state_dict(state, strict=True)
P_model.eval()


def rescale_positivity(arr, lo=0.35):
    """Piecewise-linearly remap scores so that `lo` lands on 0.5.

    Values below `lo` are mapped from [0, lo) onto [0, 0.5); values at or above `lo`
    are mapped from [lo, 1] onto [0.5, 1]. The result is clipped to [0, 1].

    Args:
        arr: array-like of raw scores.
        lo: raw score that should become the 0.5 midpoint.

    Returns:
        A float NumPy array with the same shape as `arr`.
    """
    values = np.asarray(arr, dtype=float)
    rescaled = np.empty_like(values, dtype=float)

    is_low = values < lo
    is_high = np.logical_not(is_low)

    # Lower segment: [0, lo) -> [0, 0.5)
    rescaled[is_low] = 0.5 * (values[is_low] / lo)
    # Upper segment: [lo, 1] -> [0.5, 1]
    rescaled[is_high] = 0.5 + 0.5 * ((values[is_high] - lo) / (1.0 - lo))

    # Guard against inputs outside [0, 1].
    np.clip(rescaled, 0.0, 1.0, out=rescaled)
    return rescaled

def predict_scores(texts, batch_size=BATCH_SIZE):
    """Score `texts` with the positivity model.

    Args:
        texts: list of comment strings.
        batch_size: number of texts tokenised and scored per forward pass.

    Returns:
        A list with one entry per text, in input order. Each entry is whatever
        `tolist()` yields for that row of the model output (a one-element list when
        the model returns one score per row).
    """
    # Use the positivity model loaded above, not the unrelated global `model`, and
    # send inputs to the device its weights are on.
    first_param = next(P_model.parameters(), None)
    target = first_param.device if first_param is not None else device

    scores = []
    n = len(texts)
    # Inference only: no autograd graph is needed.
    with torch.no_grad():
        for i in range(0, n, batch_size):
            chunk = texts[i:i+batch_size]
            enc = tokenizer(
                chunk,
                truncation=True,
                padding=True,
                max_length=MAX_LENGTH,
                return_tensors="pt"
            )
            input_ids = enc["input_ids"].to(target)
            attention_mask = enc["attention_mask"].to(target)
            probs = P_model(input_ids, attention_mask)
            scores.extend(probs.detach().cpu().tolist())

            done = min(i + batch_size, n)
            if done % PRINT_EVERY == 0:
                print(f"processed {done:,}/{n:,}")
    return scores


# Score the first N_PER_FILE comments of every file and write one combined CSV.
all_results = []

for fpath in FILES:
    if not os.path.exists(fpath):
        print(f"Warning: {fpath} not found; skipping.")
        continue

    df = pd.read_csv(fpath, engine="python", on_bad_lines="skip", dtype=str, encoding="utf-8", nrows=N_PER_FILE)
    if df.empty:
        print(f"{fpath}: no rows read; skipping.")
        continue

    # Resolve the text column from TEXT_COL_CANDIDATES directly, so the cell does not
    # depend on a `pick_text_col` helper that is not defined in the visible notebook.
    text_col = next((c for c in TEXT_COL_CANDIDATES if c in df.columns), None)
    if text_col is None:
        print(f"{fpath}: none of {TEXT_COL_CANDIDATES} found; skipping.")
        continue
    df[text_col] = df[text_col].fillna("").astype(str)

    n = len(df)
    block_scores = []
    # Score in blocks of PRINT_EVERY rows so progress is visible.
    for start in range(0, n, PRINT_EVERY):
        end = min(start + PRINT_EVERY, n)
        texts_block = df[text_col].iloc[start:end].tolist()
        block_scores.extend(predict_scores(texts_block))
        print(f"[{os.path.basename(fpath)}] processed {end}/{n}")

    # Flatten to 1-D: the scorer yields one single-element row per text.
    raw = np.array(block_scores[:n], dtype=float).reshape(-1)
    scaled = rescale_positivity(raw, lo=0.35)

    df_out = df.copy()
    df_out["positivity_score_raw"] = raw.astype(float)
    df_out["positivity_score"] = scaled.astype(float)

    # Keep threshold at 0.5 on the *scaled* score
    df_out["predicted_sentiment"] = (df_out["positivity_score"] >= 0.5).map({True: "positive", False: "negative"})
    df_out["source_file"] = os.path.basename(fpath)

    all_results.append(df_out)


if all_results:
    results = pd.concat(all_results, ignore_index=True)
    # Round both score columns to six decimals.
    results["positivity_score_raw"] = results["positivity_score_raw"].map(lambda x: float(f"{x:.6f}"))
    results["positivity_score"]     = results["positivity_score"].map(lambda x: float(f"{x:.6f}"))

    results.to_csv(OUTPUT_CSV, index=False, encoding="utf-8")
    print(f"\nSaved {len(results)} scored rows to {OUTPUT_CSV}")
else:
    # pd.concat raises on an empty list, so report the situation instead.
    print("No input files could be scored; nothing written.")



In [None]:
# Positivity was slightly skewed, so I manually shifted it:
# raw scores are remapped so that PIVOT (rather than 0.5) becomes the positive/negative boundary.

INPUT_CSV = "positivity_scored_batch.csv"
PIVOT = 0.35


def rescale_positivity(arr, lo=0.35):
    """Remap scores piecewise-linearly so that `lo` becomes 0.5, then clip to [0, 1].

    Scores below `lo` are scaled from [0, lo) to [0, 0.5); the rest are scaled from
    [lo, 1] to [0.5, 1]. Returns a float NumPy array shaped like `arr`.
    """
    scores = np.asarray(arr, dtype=float)
    low_mask = scores < lo
    high_mask = ~low_mask

    mapped = np.empty_like(scores, dtype=float)
    mapped[high_mask] = 0.5 + ((scores[high_mask] - lo) / (1.0 - lo)) * 0.5
    mapped[low_mask] = (scores[low_mask] / lo) * 0.5
    return np.clip(mapped, 0.0, 1.0, out=mapped)

# Columns that may hold the score to rescale, in order of preference (raw first).
SCORE_CANDIDATES = ["positivity_score_raw", "positivity_score", "quality_score", "quality_score_raw"]

df = pd.read_csv(INPUT_CSV, engine="python", on_bad_lines="skip", dtype=str)
score_col = next((c for c in SCORE_CANDIDATES if c in df.columns), None)
if score_col is None:
    # Fail with a readable message instead of the opaque `KeyError: None`.
    raise KeyError(
        f"None of {SCORE_CANDIDATES} found in {INPUT_CSV}; columns are {list(df.columns)}"
    )

# Unparseable scores are treated as 0.0.
raw = pd.to_numeric(df[score_col], errors="coerce").fillna(0.0).to_numpy(dtype=float)
df["positivity_score_raw"] = raw
df["positivity_score"] = rescale_positivity(raw, lo=PIVOT)

# After rescaling, 0.5 corresponds to PIVOT on the raw scale.
df["predicted_sentiment"] = np.where(df["positivity_score"] >= 0.5, "positive", "negative")

root, ext = os.path.splitext(INPUT_CSV)
output_csv = f"{root}_rescaled{ext}"
df.to_csv(output_csv, index=False, encoding="utf-8")

pos = int((df["positivity_score"] >= 0.5).sum())
neg = len(df) - pos
print(f"Saved: {output_csv}  |  positives={pos:,}  negatives={neg:,}")


### Relevance in terms of product interest

In [None]:
# The five comment files that already carry video details, tags, positivity and quality.
_RELEVANCE_SUFFIX = "_filtered_english_with_video_details_tagged_positivity_quality.csv"
INPUT_FILES = ["comments" + str(i) + _RELEVANCE_SUFFIX for i in range(1, 6)]

# Brand name -> list of regex sources that count as a mention of that brand.
# The patterns are compiled case-insensitively below.
# NOTE(review): several entries are also everyday words or names (e.g. "matrix",
# "diesel", "prada", "valentino"), so they can match comments unrelated to beauty.
LOREAL_BRANDS = {
    "l'oreal": [r"\bl['’]?or[ée]al\b", r"\bloreal\b"],
    "l'oreal paris": [r"\bl['’]?or[ée]al\s+paris\b"],
    "garnier": [r"\bgarnier\b"],
    "maybelline": [r"\bmaybelline\b", r"\bmaybelline\s+new\s+york\b"],
    "nyx": [r"\bnyx\b", r"\bnyx\s+professional\s+makeup\b"],
    "essie": [r"\bessie\b"],
    "lancome": [r"\blanc[ôo]me\b"],
    "ysl beauty": [r"\b(yves\s+saint\s+laurent|ysl)\b", r"\bysl\s+beauty\b"],
    "giorgio armani / armani beauty": [r"\barmani\b", r"\bgiorgio\s+armani\b", r"\barmani\s+beauty\b"],
    "valentino beauty": [r"\bvalentino\b"],
    "prada beauty": [r"\bprada\b"],
    "mugler": [r"\bmugler\b"],
    "azzaro": [r"\bazzaro\b"],
    "maison margiela": [r"\bmaison\s+margiela\b", r"\bmargiela\b"],
    "ralph lauren fragrances": [r"\bralph\s+lauren\b"],
    "diesel fragrances": [r"\bdiesel\b"],
    "cacharel": [r"\bcacharel\b"],
    "urban decay": [r"\burban\s+decay\b"],
    "kiehl's": [r"\bkiehl['’]s\b", r"\bkiehls\b"],
    "helena rubinstein": [r"\bhelena\s+rubinstein\b"],
    "it cosmetics": [r"\bit\s+cosmetics\b"],
    "biotherm": [r"\bbiotherm\b"],
    "aesop": [r"\baesop\b"],
    "la roche-posay": [r"\bla\s*roche[-\s]*posay\b", r"\blrp\b"],
    "vichy": [r"\bvichy\b"],
    "skinceuticals": [r"\bskinceuticals\b"],
    "cerave": [r"\bcer[aà]?\s*ve\b", r"\bcerave\b"],
    "thayers": [r"\bthayers\b"],
    "youth to the people": [r"\byouth\s+to\s+the\s+people\b"],
    "l'oreal professionnel": [r"\bl['’]?or[ée]al\s+professionnel\b"],
    "kerastase": [r"\bk[ée]rastase\b", r"\bkerastase\b"],
    "redken": [r"\bredken\b"],
    "matrix": [r"\bmatrix\b"],
    "pureology": [r"\bpureology\b"],
    "mizani": [r"\bmizani\b"],
    "biolage": [r"\bbiolage\b"],
    "shu uemura": [r"\bshu\s+uemura\b"],
}

# Same structure for a second group of brands and retailers.
# NOTE(review): "benefit" and "elf" are common words and will match ordinary usage.
OTHER_BEAUTY = {
    "fenty": [r"\bfenty\b", r"\bfenty\s+(beauty|skin)\b"],
    "glossier": [r"\bglossier\b"],
    "nars": [r"\bnars\b"],
    "huda beauty": [r"\bhuda\s+beauty\b", r"\bhuda\b"],
    "rare beauty": [r"\brare\s*beauty\b", r"\brarebeauty\b"],
    "charlotte tilbury": [r"\bcharlotte\s+tilbury\b"],
    "milk makeup": [r"\bmilk\s+makeup\b"],
    "olaplex": [r"\bolaplex\b"],
    "drunk elephant": [r"\bdrunk\s+elephant\b"],
    "the ordinary": [r"\bthe\s+ordinary\b"],
    "kosas": [r"\bkosas\b"],
    "clinique": [r"\bclinique\b"],
    "estee lauder": [r"\best[ée]e\s+lauder\b", r"\bestee\s+lauder\b"],
    "shiseido": [r"\bshiseido\b"],
    "too faced": [r"\btoo\s+faced\b"],
    "tatcha": [r"\btatcha\b"],
    "bobbi brown": [r"\bbobbi\s+brown\b"],
    "la mer": [r"\bla\s+mer\b"],
    "pat mcgrath": [r"\bpat\s+mcgrath\b"],
    "colourpop": [r"\bcolour\s*pop\b", r"\bcolourpop\b"],
    "morphe": [r"\bmorphe\b"],
    "kvd beauty": [r"\bkvd\b", r"\bkvd\s+beauty\b"],
    "benefit": [r"\bbenefit\b"],
    "tarte": [r"\btarte\b"],
    "e.l.f.": [r"\be\.?l\.?f\.?\b", r"\belf\b"],
    "dior": [r"\bdior\b"],
    "chanel": [r"\bchanel\b"],
    "hourglass": [r"\bhourglass\b"],
    "laura mercier": [r"\blaura\s+mercier\b"],
    "natasha denona": [r"\bnatasha\s+denona\b"],
    "skims": [r"\bskims\b"],
    "dyson": [r"\bdyson\b"],
    "ghd": [r"\bghd\b"],
    "olay": [r"\bolay\b"],
    "neutrogena": [r"\bneutrogena\b"],
    "sephora": [r"\bsephora\b"],
    "ulta": [r"\bulta\b"],
}

# Compile every pattern once, case-insensitively, keeping the brand -> patterns mapping.
LOREAL_PATTERNS = {k: [re.compile(p, re.I) for p in v] for k, v in LOREAL_BRANDS.items()}
OTHER_PATTERNS  = {k: [re.compile(p, re.I) for p in v] for k, v in OTHER_BEAUTY.items()}

# Phrases that signal purchase interest: price, availability, where to buy,
# links, shipping and region questions. Compiled case-insensitively below.
INTENT_PATTERNS = [
    r"\bhow\s+much\b", r"\bprice\b", r"\bcost\b", r"\bmsrp\b", r"[$£€]\s*\d",
    r"\brelease\b", r"\blaunch\b", r"\bcoming\s+out\b", r"\bdrop\b", r"\bavailable\b",
    r"\bback\s+in\s+stock\b", r"\brestock\b",
    r"\bwhere\s+(can\s+)?(i\s+)?(buy|get|purchase)\b",
    r"\bwhere\s+to\s+(buy|get|purchase|cop)\b",
    r"\bbuy\b", r"\bpurchase\b", r"\border\b",
    # "link"/"links" as whole words, plus run-together requests such as "linkpls".
    # Anchored with \b so words like "blinking" or "slinky" no longer count as intent.
    r"\blinks?\b", r"\burl\b", r"\blinks?\s*(?:pls|please)\b",
    r"\bdeliver(y|ies|ing)?\b", r"\bship(ping|s|ped)?\b",
    r"\bavailable\s+in\b",
    r"\b(in|to)\s+(US|USA|UK|Canada|Australia|India|Singapore|Malaysia|EU|Europe)\b",
]
INTENT_REGEXES = [re.compile(p, re.I) for p in INTENT_PATTERNS]

# Column names that may hold the comment text, in order of preference.
COMMENT_COLS = ["textOriginal", "comment_text", "text"]
def pick_col(df, candidates):
    """Return the first name in `candidates` that is a column of `df`, or None."""
    return next((name for name in candidates if name in df.columns), None)

def norm_text(x):
    """Lower-case string form of `x`; None and float NaN become the empty string."""
    is_missing_float = isinstance(x, float) and pd.isna(x)
    if x is None or is_missing_float:
        return ""
    lowered = str(x).lower()
    return lowered

def any_match(pattern_dict, text):
    """List the keys of `pattern_dict` for which at least one compiled regex matches `text`.

    Keys are returned in the dict's iteration order; an empty list means no match.
    """
    return [
        label
        for label, compiled in pattern_dict.items()
        if any(rx.search(text) for rx in compiled)
    ]

def has_intent(text):
    """True when any of the compiled INTENT_REGEXES matches `text`."""
    for rx in INTENT_REGEXES:
        if rx.search(text):
            return True
    return False

def out_name_from_in(path: str) -> str:
    """Derive the output CSV file name for an input comments file.

    Args:
        path: Path of the input file; only its final component is used.

    Returns:
        "comments<N>_filtered_all_comment_metrics.csv" when the file name contains
        "comments<N>", otherwise "<stem>_all_comment_metrics.csv".
    """
    base = Path(path).name
    m = re.search(r"comments(\d+)", base)
    if m:
        return f"comments{m.group(1)}_filtered_all_comment_metrics.csv"
    # Build from the stem instead of str.replace(".csv", ...): replace() returned the
    # name unchanged for any file not containing a lowercase ".csv", so the output
    # name could equal the input name.
    return f"{Path(base).stem}_all_comment_metrics.csv"


# Tag every comment of every input file with a rule-based relevance score and
# write one "<name>_all_comment_metrics.csv" per input.
for path in INPUT_FILES:
    if not Path(path).exists():
        print(f"Missing: {path} (skipping)")
        continue

    df = pd.read_csv(path, engine="python", on_bad_lines="skip", dtype=str)
    if df.empty:
        print(f"Empty file: {path}")
        continue

    comment_col = pick_col(df, COMMENT_COLS)
    if comment_col is None:
        # pick_col returns None when no candidate exists; df[None] would raise a
        # KeyError and abort the remaining files, so skip this one explicitly.
        print(f"No comment text column ({', '.join(COMMENT_COLS)}) in {path} (skipping)")
        continue

    # Patterns are matched against lower-cased text; missing comments become "".
    text_lc = df[comment_col].fillna("").astype(str).str.lower()

    loreal_hits = text_lc.apply(lambda t: any_match(LOREAL_PATTERNS, t))
    other_hits  = text_lc.apply(lambda t: any_match(OTHER_PATTERNS, t))
    intent_hit  = text_lc.apply(has_intent)

    # First matching rule wins: L'Oréal brand > other beauty brand > buy intent > none.
    rel = []
    rel_rule = []
    for lhits, ohits, ih in zip(loreal_hits, other_hits, intent_hit):
        if lhits:
            rel.append(1.0)
            rel_rule.append("loreal_brand")
        elif ohits:
            rel.append(0.75)
            rel_rule.append("other_beauty_brand")
        elif ih:
            rel.append(0.5)
            rel_rule.append("buy_intent")
        else:
            rel.append(0.0)
            rel_rule.append("none")

    out = df.copy()
    out["relevance"] = rel
    out["relevance_rule"] = rel_rule

    out_path = out_name_from_in(path)
    out.to_csv(out_path, index=False, encoding="utf-8")
    print(f"{Path(path).name} → {out_path} | wrote {len(out):,} rows")


### Tracking comment replies

In [None]:
# Relevance-tagged comment files (one per shard 1..5) scanned for reply links.
INPUTS = [f"comments{i}_filtered_all_comment_metrics.csv" for i in (1, 2, 3, 4, 5)]
# Rows per chunk when streaming the CSVs below.
CHUNKSIZE = 10_000

def dedup_preserve_order(lst):
    """Return a new list with duplicates removed, keeping each item's first position."""
    # dict keys are unique and remember insertion order.
    return list(dict.fromkeys(lst))

def build_global_children_lookup(paths):
    """Collect, across all files in `paths`, the reply ids that belong to each parent comment.

    Only the "commentId" and "parentCommentId" columns are read, in chunks of
    CHUNKSIZE rows. Rows with a blank parentCommentId are ignored. Missing files
    are reported and skipped.

    Returns:
        (mapping, lookup_df) where `mapping` maps parentCommentId -> de-duplicated
        list of reply commentIds, and `lookup_df` has one row per parent with
        "num_replies" and a comma-joined "reply_comment_ids", sorted by
        num_replies descending.
    """
    id_cols = ["commentId", "parentCommentId"]
    mapping = defaultdict(list)

    for raw_path in paths:
        src = Path(raw_path)
        if not src.exists():
            print(f"Missing: {src.name} (skip in mapping)")
            continue

        reader = pd.read_csv(src, engine="python", on_bad_lines="skip", dtype=str,
                             usecols=id_cols, chunksize=CHUNKSIZE)
        for part in reader:
            if part.empty:
                continue
            # Normalise both id columns: NaN -> "", trimmed strings.
            for col in id_cols:
                part[col] = part[col].fillna("").astype(str).str.strip()
            replies = part[part["parentCommentId"] != ""]
            if replies.empty:
                continue
            for parent_id, child_ids in replies.groupby("parentCommentId")["commentId"]:
                mapping[parent_id].extend(child_ids.tolist())

    # A reply may have been seen more than once; keep first occurrences only.
    for parent_id in list(mapping):
        mapping[parent_id] = dedup_preserve_order(mapping[parent_id])

    parents = list(mapping.keys())
    children = list(mapping.values())
    lookup_df = pd.DataFrame({
        "parentCommentId": parents,
        "num_replies": [len(ids) for ids in children],
        "reply_comment_ids": [",".join(ids) for ids in children],
    })
    lookup_df = lookup_df.sort_values("num_replies", ascending=False).reset_index(drop=True)
    return mapping, lookup_df

def enrich_with_global_mapping(path, mapping, out_suffix="_with_replies_global.csv"):
    """Stream `path` and write a copy with reply information appended.

    For each row, "num_replies" is the number of reply ids `mapping` holds for the
    row's commentId and "reply_comment_ids" is those ids joined with " | ".
    The result is written next to the input as "<stem><out_suffix>". A missing
    input is reported and skipped.
    """
    src = Path(path)
    if not src.exists():
        print(f"Missing: {src.name} (skip enrich)")
        return

    dest = src.with_name(src.stem + out_suffix)
    added_cols = ["num_replies", "reply_comment_ids"]
    final_cols = None        # column layout, fixed by the first non-empty chunk
    header_written = False   # first write truncates and emits the header

    for part in pd.read_csv(src, engine="python", on_bad_lines="skip", dtype=str, chunksize=CHUNKSIZE):
        if part.empty:
            continue

        if final_cols is None:
            original = list(part.columns)
            final_cols = original + [c for c in added_cols if c not in original]

        comment_ids = part["commentId"].fillna("").astype(str).str.strip()
        part["commentId"] = comment_ids
        part["num_replies"] = comment_ids.map(lambda cid: len(mapping.get(cid, []))).astype(int)
        part["reply_comment_ids"] = comment_ids.map(lambda cid: " | ".join(mapping.get(cid, [])))

        part = part.reindex(columns=final_cols)

        write_mode = "a" if header_written else "w"
        part.to_csv(dest, mode=write_mode, header=(not header_written),
                    index=False, encoding="utf-8")
        header_written = True

    print(f"{src.name} → {dest.name}")

# Build the parent -> replies mapping once over all input files, save the
# per-parent summary table, then enrich each input file using that mapping.
global_map, global_lookup = build_global_children_lookup(INPUTS)
global_lookup.to_csv("comments_global_reply_lookup.csv", index=False, encoding="utf-8")
print(f"Wrote global lookup: comments_global_reply_lookup.csv ({len(global_lookup):,} parents)")

for path in INPUTS:
    enrich_with_global_mapping(path, global_map)


### Comment Scoring

In [None]:
# File-name templates; {idx} is filled with each value of INDEXES.
INPUT_TPL = "comments{idx}_filtered_all_comment_metrics_with_replies_global.csv"
OUTPUT_TPL = "comments{idx}_scored.csv"
INDEXES = [1, 2, 3, 4, 5]
CHUNKSIZE = 5000  # rows read per chunk
ROUND = 6         # decimal places kept in the written score columns


# Helpers

def coerce_int(s):
    """Parse `s` as numbers and return integers; unparsable or missing values become 0.

    Fractional values are truncated by the final integer cast.
    """
    numeric = pd.to_numeric(s, errors="coerce").fillna(0)
    return numeric.astype(float).astype(int)

def coerce_float(s, default=1.0):
    """Parse `s` as floats, substituting `default` for unparsable or missing values."""
    return pd.to_numeric(s, errors="coerce").fillna(default).astype(float)

def parse_dt(s):
    """Parse `s` into UTC datetimes; values that cannot be parsed become NaT."""
    parsed = pd.to_datetime(s, utc=True, errors="coerce")
    return parsed

def like_score(likes: pd.Series) -> pd.Series:
    """Additive score contribution from a comment's like count.

    Bands: <20 -> 0.0, 20+ -> 0.025, 100+ -> 0.050, 500+ -> 0.075, 2500+ -> 0.100.

    Args:
        likes: Numeric like counts.

    Returns:
        Float Series aligned to `likes.index`.
    """
    s = pd.Series(0.0, index=likes.index, dtype=float)
    # Thresholds are applied cumulatively, lowest first, so later (higher) bands
    # overwrite earlier ones and no value can fall between two bands.
    s = s.mask(likes >= 20,   0.025)
    s = s.mask(likes >= 100,  0.050)
    s = s.mask(likes >= 500,  0.075)
    s = s.mask(likes >= 2500, 0.100)
    return s.fillna(0.0)

def published_updated_score(pub_dt: pd.Series, upd_dt: pd.Series) -> pd.Series:
    """Score 0.08 for comments whose updated timestamp differs from the published one.

    Rows where either timestamp is missing score 0.0.

    Args:
        pub_dt: Parsed publication timestamps.
        upd_dt: Parsed last-update timestamps, aligned with `pub_dt`.

    Returns:
        Float Series of 0.0 / 0.08.
    """
    edited = pub_dt.notna() & upd_dt.notna() & (pub_dt != upd_dt)
    return edited.astype(float) * 0.08

def video_comment_window_score(video_dt: pd.Series, cmt_dt: pd.Series) -> pd.Series:
    """Score 0.07 for comments posted within 48 hours after the video was published.

    Comments posted before the video, later than 48 h, or with a missing
    timestamp on either side score 0.0.

    Args:
        video_dt: Parsed video publication timestamps.
        cmt_dt: Parsed comment publication timestamps, aligned with `video_dt`.

    Returns:
        Float Series of 0.0 / 0.07.
    """
    elapsed_s = (cmt_dt - video_dt).dt.total_seconds()
    in_window = (
        video_dt.notna() & cmt_dt.notna()
        & (elapsed_s >= 0) & (elapsed_s <= 48 * 3600)
    )
    return in_window.astype(float) * 0.07

def relevance_add_score(rel: pd.Series) -> pd.Series:
    """Additive score contribution from the rule-based relevance value.

    Relevance 1.00 or 0.75 -> 0.15, relevance 0.50 -> 0.10, anything else
    (including unparsable or missing values) -> 0.0.

    Args:
        rel: Relevance values, numeric or numeric strings.

    Returns:
        Float Series aligned to `rel.index`.
    """
    # Parsed inline so this function does not depend on which notebook cell last
    # defined `coerce_float`; unparsable values stay NaN and match no band.
    r = pd.to_numeric(rel, errors="coerce").astype(float).round(2)
    out = pd.Series(0.0, index=r.index, dtype=float)
    out = out.mask(r.isin([1.00, 0.75]), 0.15)
    out = out.mask(r == 0.50, 0.10)
    return out.fillna(0.0)

def replies_score(nrep: pd.Series) -> pd.Series:
    """Additive score contribution from the number of replies a comment received.

    Bands: 0 -> 0.0, 1-2 -> 0.05, 3-4 -> 0.075, 5+ -> 0.10.

    Args:
        nrep: Reply counts; parsed with `coerce_int`, so bad values count as 0.

    Returns:
        Float Series aligned to `nrep.index`.
    """
    n = coerce_int(nrep)
    s = pd.Series(0.0, index=n.index, dtype=float)
    # Cumulative thresholds, lowest first: higher bands overwrite lower ones.
    s = s.mask(n >= 1, 0.05)
    s = s.mask(n >= 3, 0.075)
    s = s.mask(n >= 5, 0.10)
    return s.fillna(0.0)

def ensure_col(df, wanted):
    """Return `wanted` if it is a column of `df`, else None.

    On the first call for a given frame the column labels are stripped of
    surrounding whitespace in place; a private flag on the frame records that
    this has been done.
    """
    already_stripped = getattr(df, "_stripped_cols_done", False)
    if already_stripped is False:
        df.columns = [col.strip() if isinstance(col, str) else col for col in df.columns]
        df._stripped_cols_done = True
    if wanted in df.columns:
        return wanted
    return None

def pick_video_published_col(df):
    """Return the first known video-publication column name present in `df`, else None."""
    aliases = ("video_publishedAt", "videoPublishedAt", "publishedAt_video", "video_publishedAt_utc")
    return next((alias for alias in aliases if ensure_col(df, alias)), None)

# Score every comment file listed by INDEXES, streaming in chunks of CHUNKSIZE rows.
# Per row: intermediate = 0.5 + like + edited + early-comment + relevance + reply parts,
# then final = intermediate * quality_score * positivity_score.
for idx in INDEXES:
    in_path  = INPUT_TPL.format(idx=idx)
    out_path = OUTPUT_TPL.format(idx=idx)

    if not Path(in_path).exists():
        print(f"Skip (missing): {in_path}")
        continue

    print(f"\nScoring {in_path} → {out_path}")

    # The first written chunk truncates the output and emits the header; later ones append.
    wrote_header = False
    # Column order of the first non-empty chunk; reused so every chunk is written alike.
    orig_cols_order = None

    for chunk in pd.read_csv(in_path, engine="python", on_bad_lines="skip", dtype=str, chunksize=CHUNKSIZE):
        if chunk.empty:
            continue

        chunk.columns = [c.strip() if isinstance(c, str) else c for c in chunk.columns]
        if orig_cols_order is None:
            orig_cols_order = list(chunk.columns)

        # Each *_col is the column name when present in this chunk, otherwise None.
        like_col  = ensure_col(chunk, "likeCount")
        pub_col   = ensure_col(chunk, "publishedAt")
        upd_col   = ensure_col(chunk, "updatedAt")
        nrep_col  = ensure_col(chunk, "num_replies")
        rel_col   = ensure_col(chunk, "relevance")
        q_col     = ensure_col(chunk, "quality_score")       # multiplier
        p_col     = ensure_col(chunk, "positivity_score")    # multiplier

        vid_pub_col = pick_video_published_col(chunk)

        # Base score
        base = pd.Series(0.5, index=chunk.index, dtype=float)

        # likeCountScore
        likes = coerce_int(chunk[like_col]) if like_col else pd.Series(0, index=chunk.index, dtype=int)
        likeScore = like_score(likes)

        # publishedVsUpdatedScore
        if pub_col and upd_col:
            pub_dt = parse_dt(chunk[pub_col])
            upd_dt = parse_dt(chunk[upd_col])
            puScore = published_updated_score(pub_dt, upd_dt)
        else:
            puScore = pd.Series(0.0, index=chunk.index, dtype=float)

        # videoPublishedCommentPublishedScore
        if vid_pub_col and pub_col:
            vid_dt = parse_dt(chunk[vid_pub_col])
            cmt_dt = parse_dt(chunk[pub_col])
            vcScore = video_comment_window_score(vid_dt, cmt_dt)
        else:
            vcScore = pd.Series(0.0, index=chunk.index, dtype=float)

        # relevanceScore
        relScore = relevance_add_score(chunk[rel_col]) if rel_col else pd.Series(0.0, index=chunk.index, dtype=float)

        # replyCountScore
        repScore = replies_score(chunk[nrep_col]) if nrep_col else pd.Series(0.0, index=chunk.index, dtype=float)

        # Sum additive parts
        intermediate = base + likeScore + puScore + vcScore + relScore + repScore

        # Multipliers
        # A missing column or an unparsable cell yields a neutral multiplier of 1.0.
        q_mult = coerce_float(chunk[q_col], default=1.0) if q_col else pd.Series(1.0, index=chunk.index, dtype=float)
        p_mult = coerce_float(chunk[p_col], default=1.0) if p_col else pd.Series(1.0, index=chunk.index, dtype=float)

        final = intermediate * q_mult * p_mult

        out = chunk.copy()
        out["likeCountScore"]                       = likeScore.round(ROUND)
        out["publishedVsUpdatedScore"]              = puScore.round(ROUND)
        out["videoPublishedCommentPublishedScore"]  = vcScore.round(ROUND)
        out["relevanceScore"]                       = relScore.round(ROUND)
        out["replyCountScore"]                      = repScore.round(ROUND)
        out["intermediateScore"]                    = intermediate.round(ROUND)
        out["commentScorefinal"]                    = final.round(ROUND)

        new_cols = [
            "likeCountScore",
            "publishedVsUpdatedScore",
            "videoPublishedCommentPublishedScore",
            "relevanceScore",
            "replyCountScore",
            "intermediateScore",
            "commentScorefinal",
        ]
        # Original columns first, then any score column that was not already present.
        out = out.reindex(columns=orig_cols_order + [c for c in new_cols if c not in orig_cols_order])

        out.to_csv(out_path, mode=("a" if wrote_header else "w"),
                   header=(not wrote_header), index=False, encoding="utf-8")
        wrote_header = True

    # NOTE(review): this is printed even when no chunk was written (e.g. a header-only input).
    print(f"Done → {out_path}")


### Video Scoring

In [None]:
# Video metadata files that are concatenated and de-duplicated by VIDEO_ID_COL.
COMBINE_VIDS = [
    "videos_tagged_algo.csv",
    "videos_tagged_keywords.csv",
    "leftover_videos_tagged.csv",
]

# Scored (non-spam) comment files produced by the comment scoring step.
NORMAL_COMMENT_LOOKUP = [f"comments{i}_scored.csv" for i in (1, 2, 3, 4, 5)]

# Comment files whose rows are counted as spam per video.
SPAM_COMMENT_LOOKUP = [
    "Quality_scoring/Base_datasets/emoji_spam_comments.csv",
    "Quality_scoring/Base_datasets/keywords_spam.csv",
    "Quality_scoring/Base_datasets/link_spam_comments.csv",
]

VIDEO_ID_COL = "videoId"
SCORE_COL    = "commentScorefinal"

# A comment scoring below NEG_THRESH is counted as "low"; at or above BOOST_THRESH as "high".
NEG_THRESH    = 0.30   
BOOST_THRESH  = 0.60

# Rows per chunk when streaming the comment / spam files.
CHUNKSIZE_COMMENTS = 200_000
CHUNKSIZE_SPAM     = 200_000

OUT_PATH = "videos_with_comment_metrics.csv"
ROUND = 6  # decimal places kept for ratios and scores

def coerce_float(x, default=None):
    """Parse `x` as floats.

    This definition replaces the earlier `coerce_float(s, default=1.0)` helper
    once this cell has run; accepting an optional `default` keeps calls written
    against that earlier signature working.

    Args:
        x: Values to parse (Series of numbers or numeric strings).
        default: Replacement for unparsable/missing values. When None (the
            default) such values are left as NaN.

    Returns:
        Float Series.
    """
    v = pd.to_numeric(x, errors="coerce")
    if default is not None:
        v = v.fillna(default)
    return v.astype(float)

def safe_str(s):
    """String form of `s`; None and float NaN become the empty string."""
    if s is None:
        return ""
    if isinstance(s, float) and math.isnan(s):
        return ""
    return str(s)



# Load every available video file, then keep one row per video id (first seen wins).
vid_frames = []
for path in COMBINE_VIDS:
    if not Path(path).exists():
        print(f"Warning: missing {path} (skipped)")
        continue
    df = pd.read_csv(path, engine="python", on_bad_lines="skip", dtype=str)
    # Strip header whitespace per file, before concatenating, so that e.g.
    # " videoId" and "videoId" line up as one column instead of two.
    df.columns = [c.strip() if isinstance(c, str) else c for c in df.columns]
    vid_frames.append(df)

if not vid_frames:
    # pd.concat([]) would fail with the opaque "No objects to concatenate".
    raise FileNotFoundError(f"None of the video files were found: {COMBINE_VIDS}")

videos = pd.concat(vid_frames, ignore_index=True, sort=False)
videos = videos.drop_duplicates(subset=[VIDEO_ID_COL], keep="first").reset_index(drop=True)
print(f"Combined videos: {len(videos):,} unique by {VIDEO_ID_COL}")


# Running per-video aggregates over all scored (non-spam) comments, keyed by video id.
agg = {
    "count_normal": defaultdict(int),
    "sum_scores": defaultdict(float),
    "min_score": defaultdict(lambda: float("inf")),
    "max_score": defaultdict(lambda: float("-inf")),
    "count_lt_neg": defaultdict(int),
    "count_ge_boost": defaultdict(int),
    "scores_pipe": defaultdict(list),  # may be large
}

for cpath in NORMAL_COMMENT_LOOKUP:
    p = Path(cpath)
    if not p.exists():
        print(f"Warning: missing {cpath} (skipped)")
        continue
    print(f"Scanning normal comments: {p.name}")
    for chunk in pd.read_csv(p, engine="python", on_bad_lines="skip",
                             dtype=str, chunksize=CHUNKSIZE_COMMENTS):
        if chunk.empty:
            continue
        cols = [c.strip() if isinstance(c, str) else c for c in chunk.columns]
        chunk.columns = cols

        # fillna("") first: astype(str) alone turns a missing id into the literal
        # "nan", which passed the blank-id filter below and created a bogus
        # "nan" video bucket.
        vids  = chunk[VIDEO_ID_COL].fillna("").astype(str)
        scores = coerce_float(chunk[SCORE_COL])

        # Keep only rows that have both a video id and a parsable score.
        mask = vids.str.strip().ne("") & scores.notna()
        vids = vids[mask]; scores = scores[mask]

        for v, s in zip(vids.tolist(), scores.tolist()):
            agg["count_normal"][v] += 1
            agg["sum_scores"][v] += s
            if s < agg["min_score"][v]:
                agg["min_score"][v] = s
            if s > agg["max_score"][v]:
                agg["max_score"][v] = s
            if s < NEG_THRESH:
                agg["count_lt_neg"][v] += 1
            if s >= BOOST_THRESH:
                agg["count_ge_boost"][v] += 1
            agg["scores_pipe"][v].append(f"{s:.{ROUND}f}")



# Count spam comments per video. When a file carries commentId, each comment id is
# counted once across all spam files; otherwise every row counts.
spam_counts = defaultdict(int)
seen_spam_comments = set()

for spath in SPAM_COMMENT_LOOKUP:
    p = Path(spath)
    if not p.exists():
        print(f"Warning: missing spam file {spath} (skipped)")
        continue
    print(f"Scanning spam: {p.name}")
    for chunk in pd.read_csv(p, engine="python", on_bad_lines="skip",
                             dtype=str, chunksize=CHUNKSIZE_SPAM):
        if chunk.empty:
            continue
        chunk.columns = [c.strip() if isinstance(c, str) else c for c in chunk.columns]

        if VIDEO_ID_COL not in chunk.columns:
            missing = {VIDEO_ID_COL} - set(chunk.columns)
            print(f"  {p.name}: missing {missing}; skipping rows without videoId.")
            continue

        # fillna("") before astype(str): otherwise missing ids become the literal
        # "nan", which is neither blank (so it was counted) nor unique (so all
        # id-less comments collapsed into a single "seen" key).
        video_ids = chunk[VIDEO_ID_COL].fillna("").astype(str)

        if "commentId" in chunk.columns:
            comment_ids = chunk["commentId"].fillna("").astype(str).str.strip()
            has_id = comment_ids.ne("")
            for cid, vid in zip(comment_ids[has_id], video_ids[has_id]):
                key = ("spam", cid)
                if key in seen_spam_comments:
                    continue
                seen_spam_comments.add(key)
                if vid.strip():
                    spam_counts[vid] += 1
        else:
            counts = video_ids.str.strip().value_counts()
            for vid, n in counts.items():
                if vid:
                    spam_counts[vid] += int(n)



def get_norm(v):
    """Number of scored (non-spam) comments aggregated for video `v` (0 if none)."""
    normal_counts = agg["count_normal"]
    return normal_counts.get(v, 0)

def get_spam(v):
    """Number of spam comments counted for video `v` (0 if none)."""
    return spam_counts.get(v, 0)

def get_total(v):
    """Normal plus spam comment count for video `v`."""
    return get_norm(v) + get_spam(v)

def get_ratio(v):
    """Share of spam among all observed comments of `v`; 0.0 when there are none."""
    total = get_total(v)
    if total > 0:
        return get_spam(v) / total
    return 0.0

def mean_score(v):
    """Mean comment score for `v`, or NaN when no scored comment was seen."""
    count = get_norm(v)
    if count > 0:
        return agg["sum_scores"].get(v, 0.0) / count
    return np.nan

def min_score(v):
    """Lowest comment score for `v`, or NaN when no scored comment was seen."""
    lowest = agg["min_score"].get(v, float("inf"))
    if lowest == float("inf"):
        return np.nan
    return lowest

def max_score(v):
    """Highest comment score for `v`, or NaN when no scored comment was seen."""
    highest = agg["max_score"].get(v, float("-inf"))
    if highest == float("-inf"):
        return np.nan
    return highest

def count_lt_neg(v):
    """Number of comments of `v` scoring below NEG_THRESH (0 if none)."""
    low_counts = agg["count_lt_neg"]
    return low_counts.get(v, 0)

def count_ge_boost(v):
    """Number of comments of `v` scoring at or above BOOST_THRESH (0 if none)."""
    high_counts = agg["count_ge_boost"]
    return high_counts.get(v, 0)

# Attach the per-video comment aggregates to the combined video table and save it.
vids = videos[VIDEO_ID_COL].astype(str)
videos_out = videos.copy()

# Integer comment counts.
for _col, _getter in (
    ("normal_comment_count", get_norm),
    ("spam_comment_count", get_spam),
    ("total_comment_count", get_total),
):
    videos_out[_col] = vids.map(_getter).astype(int)

# Spam share, as a fraction and as a percentage derived from the rounded fraction.
videos_out["spam_ratio"] = vids.map(get_ratio).astype(float).round(ROUND)
videos_out["spam_ratio_pct"] = (videos_out["spam_ratio"] * 100.0).round(2)

# Score summary statistics (NaN where a video has no scored comment).
for _col, _getter in (
    ("comment_score_mean", mean_score),
    ("comment_score_min", min_score),
    ("comment_score_max", max_score),
):
    videos_out[_col] = vids.map(_getter).round(ROUND)

# Counts of low- and high-scoring comments.
for _col, _getter in (
    ("num_comments_lt_0_3", count_lt_neg),
    ("num_comments_ge_0_6", count_ge_boost),
):
    videos_out[_col] = vids.map(_getter).astype(int)

videos_out.to_csv(OUT_PATH, index=False, encoding="utf-8")
print(f"Wrote {len(videos_out):,} rows to {OUT_PATH}")


In [None]:
# Input: per-video comment metrics; output: the same rows plus the video score columns.
INPUT  = "videos_with_comment_metrics.csv"
OUTPUT = "videos_scored.csv"



def _to_int(s, default=0):
    return pd.to_numeric(s, errors="coerce").fillna(default).astype(np.int64)

def _to_float(s, default=np.nan):
    out = pd.to_numeric(s, errors="coerce").astype(float)
    return out if not np.isnan(default) else out



def base_from_views_vec(views: pd.Series) -> pd.Series:
    """Base video score chosen from the view-count tier.

    Tiers: <10k -> 0.45, <100k -> 0.48, <1M -> 0.52, otherwise 0.55.

    Args:
        views: Numeric view counts.

    Returns:
        Float Series aligned to `views.index`.
    """
    v = views.values
    # np.select takes the first condition that holds, so tiers are listed lowest first.
    base = np.select(
        [v < 10_000, v < 100_000, v < 1_000_000],
        [0.45, 0.48, 0.52],
        default=0.55,
    )
    return pd.Series(base, index=views.index)


def like_add_and_bucket(like_ratio: pd.Series):
    """Score adjustment and display bucket derived from the like/view ratio.

    Buckets (ratio as a fraction): >= 0.05 -> +0.25, [0.025, 0.05) -> +0.20,
    [0.01, 0.025) -> +0.15, < 0.01 -> -0.15.

    Args:
        like_ratio: likes / views per video, as fractions (0.05 == 5%).

    Returns:
        (add, bucket): float adjustment Series and string label Series, both
        aligned to `like_ratio.index`.
    """
    lr = like_ratio.values
    bucket = np.full(lr.shape, ">=5.00%", dtype=object)
    add    = np.full(lr.shape, 0.25, dtype=float)

    # Cut-offs sit exactly on the label boundaries. The former 0.0499 / 0.0249 /
    # 0.0099 cut-offs put e.g. a 4.99% ratio into the ">=5.00%" bucket.
    # Masks are applied from the highest bucket down; lower ones overwrite.
    mask3 = (lr < 0.05)
    bucket[mask3] = "2.50 - 4.99%"
    add[mask3]    = 0.20

    mask2 = (lr < 0.025)
    bucket[mask2] = "1.0 - 2.49%"
    add[mask2]    = 0.15

    mask1 = (lr < 0.01)
    bucket[mask1] = "<0.99%"
    add[mask1]    = -0.15

    return pd.Series(add, index=like_ratio.index), pd.Series(bucket, index=like_ratio.index)

def comment_add_and_bucket(comment_ratio: pd.Series):
    """Score adjustment and display bucket derived from the comment/view ratio.

    Buckets (ratio as a fraction): >= 0.0005 (0.05%) -> +0.20,
    [0.00025, 0.0005) -> +0.15, < 0.00025 (0.025%) -> -0.10.

    Args:
        comment_ratio: comments / views per video, as fractions.

    Returns:
        (add, bucket): float adjustment Series and string label Series, both
        aligned to `comment_ratio.index`.
    """
    cr = comment_ratio.values
    bucket = np.full(cr.shape, ">=0.05%", dtype=object)
    add    = np.full(cr.shape, 0.20, dtype=float)

    mask2 = (cr < 0.0005)
    bucket[mask2] = "0.025 - 0.049%"
    add[mask2]    = 0.15

    # The lowest bucket is labelled "<0.024%"; the former cut-off 0.000024 is
    # 0.0024%, ten times smaller than the label, so almost nothing ever landed
    # in this bucket. 0.00025 is the lower edge of the bucket above.
    mask1 = (cr < 0.00025)
    bucket[mask1] = "<0.024%"
    add[mask1]    = -0.10

    return pd.Series(add, index=comment_ratio.index), pd.Series(bucket, index=comment_ratio.index)

def spam_penalty_and_reason(spam_ratio: pd.Series):
    """Penalty (<= 0) for videos whose spam share exceeds 25%.

    The penalty grows by 0.20 per unit of spam ratio above 0.25 and is capped
    at -0.15. A missing (NaN) ratio is treated as no penalty, matching the
    "none" reason it is labelled with.

    Args:
        spam_ratio: spam comments / all comments per video, as fractions.

    Returns:
        (penalty, reason): float Series and string label Series, both aligned
        to `spam_ratio.index`.
    """
    sr = spam_ratio.values.astype(float)
    reason = np.where(sr > 0.25, "over 25%", "≤ 25% (none)")
    # slope 0.20 beyond 25%, cap at -0.15
    over = np.maximum(0.0, np.nan_to_num(sr, nan=0.0) - 0.25)
    pen = -np.minimum(0.15, 0.20 * over)
    return pd.Series(pen, index=spam_ratio.index), pd.Series(reason, index=spam_ratio.index)



def mean_multiplier_asym_series(m: pd.Series, n: pd.Series, K=80, min_mult=0.50, max_mult=1.15):
    """Turn each video's mean comment score into a score multiplier.

    Args:
        m: Mean comment score per video (NaN where unavailable).
        n: Number of comments the mean is based on.
        K: Shrinkage constant; the mean is pulled toward 0.5 by sqrt(n / (n + K)).
        min_mult, max_mult: Clip range for the multiplier of rows that have data.

    Returns:
        Four Series indexed like `m`: (multiplier, shrink factor, shrunk mean,
        volume gate). Rows with n <= 0 or a NaN mean get the fixed multiplier
        0.25, a shrink factor of 0 and a NaN shrunk mean.
    """
    n_val = n.values.astype(float)
    m_val = m.values.astype(float)
    # Rows without usable data get 0.25 here; all other rows are filled in at the end.
    # NOTE(review): 0.25 is below min_mult — presumably deliberate; confirm.
    mult = np.where((n_val <= 0) | np.isnan(m_val), 0.25, np.nan)

    # Rows that have both a positive count and a mean.
    mask = (n_val > 0) & ~np.isnan(m_val)
    shrink = np.zeros_like(n_val)
    shrink[mask] = np.sqrt(n_val[mask] / (n_val[mask] + K))
    m_adj = np.full_like(m_val, np.nan, dtype=float)
    # Shrunk mean: stays near 0.5 for small n, approaches m as n grows.
    m_adj[mask] = 0.5 + (m_val[mask] - 0.5) * shrink[mask]

    # Asymmetric mapping around 0.5: at most +15% above it, at most -50% below it.
    mult_raw = np.ones_like(m_val, dtype=float)
    abv = mask & (m_adj >= 0.5)
    mult_raw[abv] = 1.0 + 0.15 * ((m_adj[abv] - 0.5) / 0.5)
    bel = mask & (m_adj < 0.5)
    mult_raw[bel] = 1.0 - 0.50 * ((0.5 - m_adj[bel]) / 0.5)

    # Volume gate: scales the deviation from 1.0 by sqrt(n) / 20, reaching 1.0 at n = 400.
    vol_gate = np.minimum(1.0, np.sqrt(np.maximum(n_val, 0.0)) / 20.0)
    mult_calc = 1.0 + (mult_raw - 1.0) * vol_gate
    mult[mask] = np.clip(mult_calc[mask], min_mult, max_mult)

    return (pd.Series(mult, index=m.index),
            pd.Series(shrink, index=m.index),
            pd.Series(m_adj, index=m.index),
            pd.Series(vol_gate, index=m.index))


def balance_multiplier_asym_series(g: pd.Series, b: pd.Series):
    """Multiplier from the balance of high-scoring (`g`) versus low-scoring (`b`) comment counts.

    The smoothed ratio R = (g + 1) / (b + 1) is squashed with tanh(ln R) into
    (-1, 1); positive values add up to 6%, negative values subtract up to 12%.

    Returns:
        Three Series indexed like `g`: (multiplier, R, tanh(ln R)).
    """
    good = g.values.astype(float)
    bad = b.values.astype(float)

    ratio = (good + 1.0) / (bad + 1.0)
    squashed = np.tanh(np.log(ratio))  # in [-1, 1]
    # The negative side carries twice the weight of the positive side.
    weight = np.where(squashed >= 0, 0.06, 0.12)
    multiplier = 1.0 + weight * squashed

    idx = g.index
    return pd.Series(multiplier, index=idx), pd.Series(ratio, index=idx), pd.Series(squashed, index=idx)



def bad_mass_penalty_series(bad_count: pd.Series, normal_count: pd.Series, max_penalty=0.15):
    """Penalty (<= 0) proportional to the share of low-scoring comments.

    penalty = -min(max_penalty, 0.25 * bad/max(normal, 1) * min(1, normal/200)).

    Returns:
        Three Series indexed like `bad_count`: (penalty, bad fraction, volume gate).
    """
    normal = normal_count.values.astype(float)
    bad = bad_count.values.astype(float)

    frac_bad = bad / np.maximum(normal, 1.0)       # denominator floored at 1
    vol_gate = np.minimum(1.0, normal / 200.0)     # full weight from 200 comments on
    pen = -np.minimum(max_penalty, (0.25 * frac_bad) * vol_gate)

    idx = bad_count.index
    return pd.Series(pen, index=idx), pd.Series(frac_bad, index=idx), pd.Series(vol_gate, index=idx)

# Score every video:
#   pre   = base(views) + like bucket + comment bucket + spam penalty + low-score penalty
#   final = clip(pre, 0, 1) * mean-score multiplier * balance multiplier, clipped to [0, 1]
df = pd.read_csv(INPUT, engine="python", on_bad_lines="skip", dtype=str)


# NOTE(review): df.get(name, 0) hands the scalar 0 to _to_int when the column is
# absent; _to_int then calls .fillna on it — confirm these columns always exist.
views   = _to_int(df.get("viewCount", 0))
likes   = _to_int(df.get("likeCount", 0))
c_rep   = _to_int(df.get("commentCount", 0))
n_norm  = _to_int(df.get("normal_comment_count", 0))
n_spam  = _to_int(df.get("spam_comment_count", 0))
n_total = _to_int(df.get("total_comment_count", n_norm + n_spam)) 

# Use the largest of the three available comment counts for each video.
comments_used = pd.Series(np.maximum.reduce([c_rep.values, n_total.values, (n_norm + n_spam).values]),
                          index=df.index)
# Label of the source that supplied the maximum (not written to the output below).
comments_used_src = np.where(comments_used.values == c_rep.values,   "commentCount(meta)",
                      np.where(comments_used.values == n_total.values, "total_comment_count(observed)",
                               "normal+spam(observed)"))


# Denominators are floored at 1 so videos with zero views do not divide by zero.
like_ratio    = likes / np.maximum(views, 1)
comment_ratio = comments_used / np.maximum(views, 1)

# Prefer the stored spam_ratio; recompute it only where it is missing.
if "spam_ratio" in df.columns:
    spam_ratio = _to_float(df["spam_ratio"])
    missing = spam_ratio.isna()
    if missing.any():
        spam_ratio.loc[missing] = (n_spam[missing] / np.maximum(comments_used[missing], 1)).astype(float)
else:
    spam_ratio = (n_spam / np.maximum(comments_used, 1)).astype(float)

base = base_from_views_vec(views)

like_add, like_bucket = like_add_and_bucket(like_ratio)
cmt_add,  cmt_bucket  = comment_add_and_bucket(comment_ratio)
sp_pen,   sp_reason   = spam_penalty_and_reason(spam_ratio)

m_mean = _to_float(df.get("comment_score_mean", np.nan))
g_cnt  = _to_int(df.get("num_comments_ge_0_6", 0))
b_cnt  = _to_int(df.get("num_comments_lt_0_3", 0))

mean_mult, shrink_val, m_adj, mean_vol_gate = mean_multiplier_asym_series(m_mean, n_norm)
bal_mult,  gb_ratio, tanh_lnR               = balance_multiplier_asym_series(g_cnt, b_cnt)
bad_pen,   frac_bad, bad_vol_gate           = bad_mass_penalty_series(b_cnt, n_norm)

# Additive part is clamped to [0, 1] before the multipliers, and the product again after.
pre = base + like_add + cmt_add + sp_pen + bad_pen
pre_clamped = np.clip(pre, 0.0, 1.0)
final = pre_clamped * mean_mult * bal_mult
final = np.clip(final, 0.0, 1.0)

out = df.copy()


# Only the ratios, their buckets, the two multipliers and the final score are exported.
out["like_ratio"]                = like_ratio.round(6)
out["like_ratio_bucket"]         = like_bucket

out["comment_ratio"]             = comment_ratio.round(6)
out["comment_ratio_bucket"]      = cmt_bucket

out["comments_balancer_mean"]           = mean_mult.round(6)

out["comments_balancer_over_0.6"]            = bal_mult.round(6)
out["video_score"]             = final.round(6)

# Keep the input columns first, followed by the newly added ones.
orig_cols = list(df.columns)
new_cols = [c for c in out.columns if c not in orig_cols]
out = out[orig_cols + new_cols]

out.to_csv(OUTPUT, index=False, encoding="utf-8")
print(f"Wrote {len(out):,} rows → {OUTPUT}")


### Creator Scoring

In [None]:
# Find all unique creators among the scored videos and aggregate their metrics.
INPUT  = "videos_scored.csv"
OUTPUT = "unique_video_authors.csv"

def pick_col(df, candidates):
    """First name from `candidates` found among `df`'s columns, or None if there is none."""
    matches = [c for c in candidates if c in df.columns]
    return matches[0] if matches else None

def to_int(s):
    """Parse `s` as int64; unparsable or missing values become 0."""
    numeric = pd.to_numeric(s, errors="coerce")
    return numeric.fillna(0).astype(np.int64)

def to_float(s):
    """Parse `s` as floats; unparsable or missing values become NaN."""
    numeric = pd.to_numeric(s, errors="coerce")
    return numeric.astype(float)

# Load the scored videos and attach normalised numeric helper columns ("_views",
# "_like_ratio", ...) that the per-creator aggregation below reads.
df = pd.read_csv(INPUT, engine="python", on_bad_lines="skip", dtype=str)

# NOTE(review): pick_col returns None when no candidate column exists; author_col
# and video_id_col are used later without a None check.
author_col = pick_col(df, ["authorId", "channelId", "channel_id"])

video_id_col = pick_col(df, ["videoId", "video_id"])

# Without a category column, use an all-blank placeholder so the grouping code still runs.
cat_col = pick_col(df, ["defining_category", "defining category"])
if cat_col is None:
    df["__no_category__"] = ""
    cat_col = "__no_category__"

views  = to_int(df.get("viewCount", 0))
likes  = to_int(df.get("likeCount", 0))
# Each ratio: use the stored column when present, otherwise recompute from counts.
if "like_ratio" in df.columns:
    like_ratio = to_float(df["like_ratio"]).fillna(0.0)
else:
    like_ratio = (likes / np.maximum(views, 1)).astype(float)

if "comment_ratio" in df.columns:
    comment_ratio = to_float(df["comment_ratio"]).fillna(0.0)
else:
    comments = to_int(df.get("commentCount", 0))
    comment_ratio = (comments / np.maximum(views, 1)).astype(float)

if "spam_ratio" in df.columns:
    spam_ratio = to_float(df["spam_ratio"]).fillna(0.0)
elif "spam_ratio_pct" in df.columns:
    # Percentage column: convert back to a fraction.
    spam_ratio = (to_float(df["spam_ratio_pct"]) / 100.0).fillna(0.0)
else:
    n_spam = to_int(df.get("spam_comment_count", 0))
    n_tot  = to_int(df.get("total_comment_count", 0))
    spam_ratio = (n_spam / np.maximum(n_tot, 1)).astype(float)

# Unparsable or missing video scores count as 0.0 in the per-creator average.
video_score = to_float(df["video_score"]).fillna(0.0)

df["_views"]        = views
df["_like_ratio"]   = like_ratio
df["_comment_ratio"]= comment_ratio
df["_spam_ratio"]   = spam_ratio
df["_video_score"]  = video_score

def resolve_main_category(group: pd.DataFrame) -> str:
    """Pick a creator's main category from the rows of their videos.

    The most frequent value of the module-level `cat_col` column wins. Ties are
    broken by the larger summed "_views". Returns "" when there are no rows or
    when the blank category is listed first by value_counts().

    Args:
        group: Rows of one creator; must contain `cat_col` and "_views".
    """
    counts = group[cat_col].fillna("").value_counts()
    if counts.empty or counts.index[0] == "":
        return ""
    top_count = counts.max()
    tied = counts[counts == top_count].index.tolist()
    if len(tied) == 1:
        return tied[0]
    # Several categories share the top count: prefer the one with the most views.
    sub = group[group[cat_col].isin(tied)]
    by_views = (
        sub.groupby(cat_col, dropna=False)["_views"]
           .sum()
           .sort_values(ascending=False)
    )
    return by_views.index[0] if len(by_views) else ""

def join_videos(group: pd.DataFrame) -> str:
    """Return the creator's video ids joined with "|", most-viewed first.

    Args:
        group: Rows of one creator; must contain "_views" and the module-level
            `video_id_col` column.
    """
    vids = group.sort_values("_views", ascending=False)[video_id_col].astype(str)
    return "|".join(vids.tolist())

# One group per creator; rows with a missing author id form their own group.
g = df.groupby(author_col, dropna=False)

# Every column is computed from the same groupby, so all share the group order;
# reset_index(drop=True) then discards the author id index.
out = pd.DataFrame({
    "author_id": g.apply(lambda x: x.name),
    "video_count": g.size().astype(int),
    "videos": g.apply(join_videos),
    "avg_video_score": g["_video_score"].mean().round(6),
    "avg_spam_ratio": g["_spam_ratio"].mean().round(6),
    "avg_like_view_ratio": g["_like_ratio"].mean().round(6),
    "avg_comment_view_ratio": g["_comment_ratio"].mean().round(6),
}).reset_index(drop=True)

# Assigned positionally (.values): this relies on `out` still being in group
# order, so it must stay before the sort below.
out["main_category"] = g.apply(resolve_main_category).values

# Most prolific creators first; ties broken by the higher average video score.
out = out.sort_values(["video_count", "avg_video_score"], ascending=[False, False]).reset_index(drop=True)

out.to_csv(OUTPUT, index=False, encoding="utf-8")
print(f"Wrote {len(out):,} creators → {OUTPUT}")


### Trends Analysis - Outputs

In [None]:
INPUT  = "videos_scored.csv"
# Leaderboard outputs: top videos (unique titles) and top tags, per category.
OUT_VIDS = "Video_trends/top_combined_titles_by_category_filtered_nodup.csv"
OUT_TAGS = "Video_trends/top_tags_by_category_filtered.csv"
TOP_K  = 20  # maximum number of videos kept per category

# thresholds
# A video must reach both minimum ratios (as fractions: 0.05 == 5%, 0.0005 == 0.05%).
LIKE_MIN    = 0.05
COMMENT_MIN = 0.0005

# Categories reported, in output order.
CATS = ["makeup","skincare","fragrance","hair","skills","nails","fashion","general lifestyle"]

def pick_col(df, names):
    """Return the earliest entry of `names` that exists as a column in `df`; None if none does."""
    present = filter(lambda candidate: candidate in df.columns, names)
    return next(present, None)

def to_int(s, default=0):
    """Parse `s` as int64, replacing unparsable or missing values with `default`."""
    parsed = pd.to_numeric(s, errors="coerce")
    parsed = parsed.fillna(default)
    return parsed.astype(np.int64)

def to_float(s):
    """Parse `s` as floats, leaving unparsable or missing values as NaN."""
    as_numbers = pd.to_numeric(s, errors="coerce")
    return as_numbers.astype(float)

def normalize_category(s: str) -> str:
    """Canonical form of a category label for comparisons.

    Lower-cases, turns underscores into spaces, collapses whitespace runs and
    trims the ends. Non-string input (e.g. NaN) yields "".
    """
    if not isinstance(s, str):
        return ""
    s = s.lower().replace("_", " ")
    # Trim after the underscore replacement so "_makeup" does not keep a leading space.
    return re.sub(r"\s+", " ", s).strip()

def norm_title(s: str) -> str:
    """Canonical form of a video title used to detect duplicate titles.

    Trims, lower-cases and collapses whitespace runs to a single space.
    Non-string input (e.g. NaN) yields "".
    """
    if not isinstance(s, str):
        return ""
    s = s.strip().lower()
    s = re.sub(r"\s+", " ", s)
    return s

def parse_tags(cell: str):
    """Split a raw tags cell into a list of cleaned, lower-cased tags.

    The cell may be wrapped in square brackets and use "|" or "," as separator;
    each piece is trimmed, stripped of surrounding single then double quotes,
    whitespace-collapsed and lower-cased. Empty pieces are dropped. Non-string
    or blank cells give an empty list.
    """
    if not isinstance(cell, str) or not cell.strip():
        return []

    def _clean(piece):
        unquoted = piece.strip().strip("'").strip('"')
        return re.sub(r"\s+", " ", unquoted).lower()

    pieces = re.split(r"[|,]", cell.strip().strip("[]"))
    cleaned = (_clean(piece) for piece in pieces)
    return [tag for tag in cleaned if tag]


# Load the scored videos, resolve column names, and keep only videos that clear
# both engagement thresholds (LIKE_MIN and COMMENT_MIN).
df = pd.read_csv(INPUT, engine="python", on_bad_lines="skip", dtype=str)

# Each *_col is the first matching column name, or None when none exists.
vid_col    = pick_col(df, ["videoId", "video_id"])
title_col  = pick_col(df, ["title"])
cat_col    = pick_col(df, ["defining_category", "defining category"])
views_col  = pick_col(df, ["viewCount", "views"])
likes_col  = pick_col(df, ["likeCount", "likes"])
tags_col   = pick_col(df, ["video_tags", "tags"])

# Prefer the stored ratios; recompute from raw counts only when they are absent.
if "like_ratio" in df.columns:
    like_ratio = to_float(df["like_ratio"]).fillna(0.0)
else:
    views = to_int(df[views_col], 0)
    likes = to_int(df[likes_col], 0)
    like_ratio = (likes / np.maximum(views, 1)).astype(float)

if "comment_ratio" in df.columns:
    comment_ratio = to_float(df["comment_ratio"]).fillna(0.0)
else:
    v   = to_int(df[views_col], 0)
    tot = to_int(df.get("total_comment_count", 0))
    nrm = to_int(df.get("normal_comment_count", 0))
    spm = to_int(df.get("spam_comment_count", 0))
    rep = to_int(df.get("commentCount", 0))
    # Largest of the available comment counts, divided by views (floored at 1).
    used = np.maximum.reduce([tot.values, (nrm+spm).values, rep.values])
    comment_ratio = (used / np.maximum(v.values, 1)).astype(float)

# NOTE(review): cat_col may be None here; df[None] would raise — confirm the
# category column always exists in the input.
df["_cat_norm"]      = df[cat_col].map(normalize_category)
df["_like_ratio"]    = like_ratio
df["_comment_ratio"] = comment_ratio

mask = (df["_like_ratio"] >= LIKE_MIN) & (df["_comment_ratio"] >= COMMENT_MIN)
df_f = df.loc[mask].copy()
# Ranking key for the leaderboards: plain sum of the two ratios.
df_f["_combined_ratio"] = df_f["_like_ratio"] + df_f["_comment_ratio"]

# Per category: the TOP_K videos with the highest combined ratio, one row per
# distinct (normalised) title, plus how many other videos share that title.
rows = []

for cat in CATS:
    sub = df_f[df_f["_cat_norm"] == normalize_category(cat)].copy()
    if sub.empty:
        continue

    sub = sub.sort_values("_combined_ratio", ascending=False)

    kept_rows = []
    # normalised title of each kept row -> number of further videos with that title
    title_dupe_counts = {}

    for _, r in sub.iterrows():
        t_norm = norm_title(r.get(title_col, ""))
        if not t_norm:
            continue

        if t_norm in title_dupe_counts:
            title_dupe_counts[t_norm] += 1
            continue

        if len(kept_rows) >= TOP_K:
            # The leaderboard is full, but keep scanning: stopping here (as the
            # previous `break` did) missed duplicates of kept titles that rank
            # below the TOP_K-th entry, so their counts were too low.
            continue

        title_dupe_counts[t_norm] = 0
        kept_rows.append({
            "category": cat,
            "videoId": r.get(vid_col, ""),
            "title": r.get(title_col, ""),
            "like_ratio": float(r["_like_ratio"]) if pd.notna(r["_like_ratio"]) else np.nan,
            "comment_ratio": float(r["_comment_ratio"]) if pd.notna(r["_comment_ratio"]) else np.nan,
            "combined_ratio": float(r["_combined_ratio"]) if pd.notna(r["_combined_ratio"]) else np.nan,
            # Column name kept as spelled, since it is part of the output schema.
            "dulicate_videos": 0,
        })

    for row in kept_rows:
        row["dulicate_videos"] = title_dupe_counts.get(norm_title(row["title"]), 0)

    rows.extend(kept_rows)

out_vids = pd.DataFrame(rows)
# to_csv does not create missing folders; make sure the target directory exists.
os.makedirs(os.path.dirname(OUT_VIDS) or ".", exist_ok=True)
out_vids.to_csv(OUT_VIDS, index=False, encoding="utf-8")
print(f"Wrote {len(out_vids):,} rows → {OUT_VIDS}")

# Per category: the TOP_K most frequent tags among the videos that passed the filters.
if tags_col is None:
    print("No tags column found; skipping tag leaderboard.")
else:
    df_tags = df_f[[tags_col, "_cat_norm"]].copy()
    df_tags["__tags_list__"] = df_tags[tags_col].apply(parse_tags)

    # One row per (video, tag); videos without tags yield NaN and are dropped.
    df_ex = df_tags.explode("__tags_list__")
    df_ex = df_ex[df_ex["__tags_list__"].notna() & (df_ex["__tags_list__"].str.strip() != "")]
    if df_ex.empty:
        print("No usable tags after parsing; skipping tag leaderboard.")
    else:
        tag_counts = (
            df_ex.groupby(["_cat_norm", "__tags_list__"])
                 .size()
                 .reset_index(name="count")
                 # The tag name is the last sort key so equal counts always come
                 # out in the same order and the top-K cut is reproducible.
                 .sort_values(["_cat_norm", "count", "__tags_list__"],
                              ascending=[True, False, True])
        )

        top_tag_rows = []
        for cat in CATS:
            cat_norm = normalize_category(cat)
            # Uses TOP_K (previously a hard-coded 20) so both leaderboards follow one setting.
            sub = tag_counts[tag_counts["_cat_norm"] == cat_norm].head(TOP_K)
            for _, r in sub.iterrows():
                top_tag_rows.append({
                    "category": cat,
                    "tag": r["__tags_list__"],
                    "count": int(r["count"]),
                })

        out_tags = pd.DataFrame(top_tag_rows)
        # to_csv does not create missing folders; make sure the target directory exists.
        os.makedirs(os.path.dirname(OUT_TAGS) or ".", exist_ok=True)
        out_tags.to_csv(OUT_TAGS, index=False, encoding="utf-8")
        print(f"Wrote {len(out_tags):,} tag rows → {OUT_TAGS}")


In [None]:
# Input: one row per creator; output: top creators per main category.
INPUT  = "unique_video_authors.csv"
OUTPUT = "Video_trends/top_influencers_by_category.csv"

# Minimum average ratios (as fractions) a creator must reach to be ranked.
LIKE_MIN    = 0.05 
COMMENT_MIN = 0.0005  

def _norm_cat(s: str) > str:
    if not isinstance(s, str):
        return ""
    s = s.strip().lower().replace("_"," ")
    return re.sub(r"\s+"," ", s)


# Read every column as text; the numeric columns are coerced explicitly below.
df = pd.read_csv(INPUT, engine="python", on_bad_lines="skip", dtype=str)

need = [
    "author_id","video_count","videos","avg_video_score",
    "avg_spam_ratio","avg_like_view_ratio","avg_comment_view_ratio","main_category"
]
missing = [c for c in need if c not in df.columns]
if missing:
    # Stop here with a clear message instead of an opaque KeyError further down.
    raise KeyError(f"{INPUT} is missing required column(s): {missing}")

# Unparseable counts become 0; unparseable scores/ratios become NaN and
# therefore fail the threshold comparison below.
df["video_count"] = pd.to_numeric(df["video_count"], errors="coerce").fillna(0).astype(np.int64)
for c in ["avg_video_score", "avg_spam_ratio", "avg_like_view_ratio", "avg_comment_view_ratio"]:
    df[c] = pd.to_numeric(df[c], errors="coerce").astype(float)

df["_cat_norm"]   = df["main_category"].map(_norm_cat)
df["category"]    = df["main_category"]  # keep original label spelling/case as-is for output

# Only authors clearing both average engagement thresholds are ranked.
mask = (df["avg_like_view_ratio"] >= LIKE_MIN) & (df["avg_comment_view_ratio"] >= COMMENT_MIN)
df_pass = df.loc[mask].copy()

# Top 20 authors per normalised category: best average video score first,
# ties broken by the larger number of videos.
tops = []
for cat_norm, sub in df_pass.groupby("_cat_norm", dropna=False):
    sub = sub.sort_values(
        by=["avg_video_score","video_count"],
        ascending=[False, False]
    ).head(20).copy()
    sub.insert(0, "rank_in_category", range(1, len(sub) + 1))
    tops.append(sub)

if tops:
    out = pd.concat(tops, ignore_index=True)

    for c in ["avg_like_view_ratio","avg_comment_view_ratio","avg_video_score","avg_spam_ratio"]:
        out[c] = out[c].round(6)

    cols = [
        "rank_in_category","category","author_id","video_count",
        "avg_like_view_ratio","avg_comment_view_ratio","avg_spam_ratio","avg_video_score",
        "videos"
    ]
    out = out[cols]

    # Create the output folder on a fresh checkout so to_csv cannot fail on it.
    os.makedirs(os.path.dirname(OUTPUT) or ".", exist_ok=True)
    out.to_csv(OUTPUT, index=False, encoding="utf-8")
    print(f"Wrote {len(out):,} rows → {OUTPUT}")
else:
    print("No authors met the average ratio thresholds; no top list written.")


In [None]:
# Scored video table read by this cell, and the per-category summary it writes.
INPUT  = "videos_scored.csv"
OUTPUT = "Video_trends/top_video_features_by_category.csv"

# Categories summarised, in output order (one output row per entry).
CATS = ["makeup","skincare","fragrance","hair","skills","nails","fashion","general lifestyle"]

# Minimum like/view and comment/view ratios a video needs to be considered.
LIKE_MIN    = 0.05
COMMENT_MIN = 0.0005
# Videos at or below this duration (seconds) count as shortform, longer ones as longform.
SHORTFORM_CUTOFF_SEC = 3 * 60

# Names of the input columns used below.
CAT_COL   = "defining_category"
VIEWS_COL = "viewCount"
LIKES_COL = "likeCount"
# Four comment counters; the largest of reported / total / (normal + spam) is used per video.
CMT_META  = "commentCount"
CMT_NORM  = "normal_comment_count"
CMT_SPAM  = "spam_comment_count"
CMT_TOTAL = "total_comment_count"
# Duration column, parsed by iso8601_to_seconds (ISO-8601 style such as "PT1M30S").
DUR_COL   = "contentDuration"

def _to_int(s, default=0):
    return pd.to_numeric(s, errors="coerce").fillna(default).astype(np.int64)

def _norm_cat(s: str) > str:
    if not isinstance(s, str):
        return ""
    s = s.strip().lower().replace("_"," ")
    return re.sub(r"\s+"," ", s)

# Matches durations such as "PT1M30S" or "P1DT2H" (days/hours/minutes/seconds
# only, whole numbers, any letter case).
_iso = re.compile(
    r"^P(?:(?P<days>\d+)D)?(?:T(?:(?P<hours>\d+)H)?(?:(?P<minutes>\d+)M)?(?:(?P<seconds>\d+)S)?)?$",
    re.IGNORECASE
)

def iso8601_to_seconds(s):
    """Return the length in whole seconds of a duration string matched by `_iso`.

    Non-string input and strings that do not match give NaN.
    """
    if not isinstance(s, str):
        return np.nan
    parsed = _iso.match(s.strip())
    if parsed is None:
        return np.nan
    unit_seconds = (("days", 86400), ("hours", 3600), ("minutes", 60), ("seconds", 1))
    # Components missing from the string contribute zero.
    return sum(int(parsed.group(unit) or 0) * factor for unit, factor in unit_seconds)

def series_stats(x: pd.Series):
    """Return (min, median, mean, max) of `x`, or four NaNs for an empty Series."""
    if not x.empty:
        return (x.min(), x.median(), x.mean(), x.max())
    return (np.nan,) * 4

def fmt_mm_ss(val):
    """Format a duration in seconds as "<s>S" (under a minute) or "<m>M <s>S".

    The value is rounded to whole seconds first; missing values give "".
    """
    if pd.isna(val):
        return ""
    total = int(round(float(val)))
    minutes, seconds = divmod(total, 60)
    return f"{minutes}M {seconds}S" if total >= 60 else f"{total}S"

df = pd.read_csv(INPUT, engine="python", on_bad_lines="skip", dtype=str)

# Keep only rows whose normalised category is one of CATS, then rewrite the
# category column to the canonical CATS spelling.
canon = {_norm_cat(c): c for c in CATS}
df["_cat_norm"] = df[CAT_COL].map(_norm_cat)
df = df[df["_cat_norm"].isin(canon.keys())].copy()
df[CAT_COL] = df["_cat_norm"].map(canon)

# How many of the most engaging videos per category feed the statistics.
TOP_N_PER_CATEGORY = 300

# Column order of the output CSV.
FEATURE_COLUMNS = [
    "category","no_of_top_vids",
    "short_count","long_count","short_to_long_ratio",
    "short_min","short_median","short_mean","short_max",
    "long_min","long_median","long_mean","long_max",
]


def _count_col(frame, col):
    """Return column `col` of `frame` as int64 counts.

    A missing column yields an all-zero Series aligned with `frame`, so the
    arithmetic below still works (passing a scalar default to `_to_int` would
    fail on `.fillna`).
    """
    if col in frame.columns:
        return _to_int(frame[col])
    return pd.Series(0, index=frame.index, dtype=np.int64)


def _empty_feature_row(cat):
    """Return the output row for a category with no qualifying videos."""
    row = {name: "" for name in FEATURE_COLUMNS}
    row.update({
        "category": cat,
        "no_of_top_vids": 0,
        "short_count": 0,
        "long_count": 0,
        "short_to_long_ratio": np.nan,
    })
    return row


views = _count_col(df, VIEWS_COL)
likes = _count_col(df, LIKES_COL)
c_rep = _count_col(df, CMT_META)
n_norm = _count_col(df, CMT_NORM)
n_spam = _count_col(df, CMT_SPAM)
n_total = _count_col(df, CMT_TOTAL) if (CMT_TOTAL in df.columns) else (n_norm + n_spam)

# Use the largest of the available comment counts for each video.
comments_used = pd.Series(
    np.maximum.reduce([c_rep.values, n_total.values, (n_norm + n_spam).values]),
    index=df.index
)

# Clamp the denominator to 1 so zero-view videos do not divide by zero.
like_ratio    = likes / np.maximum(views, 1)
comment_ratio = comments_used / np.maximum(views, 1)

dur_sec = df.get(DUR_COL, pd.Series(index=df.index, dtype=object)).apply(iso8601_to_seconds)

df["like_ratio"] = like_ratio
df["comment_ratio"] = comment_ratio
df["_dur_sec"] = dur_sec

rows = []
for cat in CATS:
    sub = df[df[CAT_COL] == cat]
    ok = (sub["like_ratio"] >= LIKE_MIN) & (sub["comment_ratio"] >= COMMENT_MIN)
    sub_ok = sub.loc[ok].copy()

    # Covers both "no videos in this category" and "none passed the thresholds".
    if sub_ok.empty:
        rows.append(_empty_feature_row(cat))
        continue

    # Rank by combined engagement, ties broken by view count.
    sub_ok["_eng_key"] = sub_ok["like_ratio"].astype(float) + sub_ok["comment_ratio"].astype(float)
    sub_ok["_views"] = views.loc[sub_ok.index]  # already int64; no platform-dependent re-cast
    sub_top = sub_ok.sort_values(
        by=["_eng_key","_views"], ascending=[False, False]
    ).head(TOP_N_PER_CATEGORY).copy()

    n_top = len(sub_top)

    # Videos whose duration could not be parsed (NaN) fall into neither bucket.
    short_mask = (sub_top["_dur_sec"] <= SHORTFORM_CUTOFF_SEC)
    long_mask  = (sub_top["_dur_sec"] >  SHORTFORM_CUTOFF_SEC)

    short_secs = sub_top.loc[short_mask, "_dur_sec"].dropna().astype(float)
    long_secs  = sub_top.loc[long_mask,  "_dur_sec"].dropna().astype(float)

    short_count = int(short_mask.sum())
    long_count  = int(long_mask.sum())

    # Written as the text "inf" when there are short videos but no long ones,
    # and left empty (NaN) when there are neither.
    if long_count > 0:
        ratio_out = round(short_count / long_count, 6)
    elif short_count > 0:
        ratio_out = "inf"
    else:
        ratio_out = np.nan

    s_min, s_med, s_mean, s_max = series_stats(short_secs)
    l_min, l_med, l_mean, l_max = series_stats(long_secs)

    rows.append({
        "category": cat,
        "no_of_top_vids": n_top,
        "short_count": short_count,
        "long_count": long_count,
        "short_to_long_ratio": ratio_out,

        "short_min":   fmt_mm_ss(s_min),
        "short_median":fmt_mm_ss(s_med),
        "short_mean":  fmt_mm_ss(s_mean),
        "short_max":   fmt_mm_ss(s_max),
        "long_min":    fmt_mm_ss(l_min),
        "long_median": fmt_mm_ss(l_med),
        "long_mean":   fmt_mm_ss(l_mean),
        "long_max":    fmt_mm_ss(l_max),
    })

out = pd.DataFrame(rows, columns=FEATURE_COLUMNS)
# Create the output folder on a fresh checkout so to_csv cannot fail on it.
os.makedirs(os.path.dirname(OUTPUT) or ".", exist_ok=True)
out.to_csv(OUTPUT, index=False, encoding="utf-8")
print(f"Wrote {len(out)} rows → {OUTPUT}")


### CommentSense Dashboard Outputs
Due to time constraints we could not polish how these outputs are displayed. In a finished product, the code from the Trends Analysis section would most likely run behind a website with better graphics.

In [None]:
# Choose category
# The value is lower-cased and trimmed (see _lower_norm) before it is compared
# with the `category` column of each CSV below.
CATEGORY = "makeup"  # "makeup","skincare","fragrance","hair","skills","nails","fashion","general lifestyle"

# Leaderboard CSVs read by this dashboard cell.
PATH_TAGS      = "Video_trends/top_tags_by_category_filtered.csv"
PATH_TITLES    = "Video_trends/top_combined_titles_by_category_filtered_nodup.csv"
PATH_INFLU     = "Video_trends/top_influencers_by_category.csv"
PATH_FEATURES  = "Video_trends/top_video_features_by_category.csv"

def _lower_norm(s): 
    return str(s).strip().lower()

def _pick_col(df, options, required=True):
    for c in options:
        if c in df.columns:
            return c
    if required:
        raise KeyError(f"None of {options} found. Available: {list(df.columns)}")
    return None

def _as_num(s):
    return pd.to_numeric(s, errors="coerce")

def _wrap(labels, width=28):
    return [textwrap.fill(str(x), width=width) for x in labels]

# Colour palette shared by every panel of the dashboard.
PASTELS = [
    "#A6CEE3","#B2DF8A","#FDBF6F","#CAB2D6","#FB9A99","#F4C2C2",
    "#CCEBC5","#B3CDE3","#DECBE4","#FED9A6","#FFFFCC","#E5D8BD"
]

df_tags   = pd.read_csv(PATH_TAGS)
df_titles = pd.read_csv(PATH_TITLES)
df_influ  = pd.read_csv(PATH_INFLU)
df_feat   = pd.read_csv(PATH_FEATURES)

# Give every table a `category` column (renaming the first known alternative
# in place) and a normalised `category_norm` column used for filtering.
# NOTE: the loop variable rebinds the notebook-level name `df`, and a table
# with none of these column names raises KeyError on the last line.
for df in (df_tags, df_titles, df_influ, df_feat):
    if "category" not in df.columns:
        for alt in ["defining_category", "defining category", "cat", "Category"]:
            if alt in df.columns:
                df.rename(columns={alt: "category"}, inplace=True)
                break
    df["category_norm"] = df["category"].map(_lower_norm)

# Normalised form of the selected category, compared against `category_norm`.
cat_key = _lower_norm(CATEGORY)

# Four stacked panels. The pies need "domain" cells; the bar chart and the
# influencer text panel use ordinary "xy" axes.
fig = make_subplots(
    rows=4, cols=1,
    specs=[[{"type":"domain"}],# row 1: pie
           [{"type":"xy"}],# row 2: bar
           [{"type":"domain"}],# row 3: pie
           [{"type":"xy"}]],# row 4: text panel via blank scatter
    vertical_spacing=0.08,
    subplot_titles=(
        f"Top 10 Tags — {CATEGORY}",
        f"Top 10 Titles — {CATEGORY}",
        f"Short vs Long Videos — {CATEGORY}",# medians appended later
        f"Top 10 Influencers — {CATEGORY}"
    )
)

# 1) Top 10 tags (pie)
# Any failure in this panel is caught and shown as a one-slice "Error: ..."
# pie so the remaining panels are still drawn.
try:
    # Accept a few alternative column names for the tag and its count.
    tag_col   = _pick_col(df_tags, ["tag","tags","keyword"])
    count_col = _pick_col(df_tags, ["count","tag_count","frequency","freq"])

    # Rows of the selected category; unparseable counts are treated as 0.
    tags_cat = df_tags.loc[df_tags["category_norm"] == cat_key, [tag_col, count_col]].copy()
    tags_cat[count_col] = _as_num(tags_cat[count_col]).fillna(0)
    tags_top10 = tags_cat.sort_values(count_col, ascending=False).head(10)

    if tags_top10.empty:
        # Placeholder slice when the category has no tag rows.
        fig.add_trace(
            go.Pie(
                labels=["(no data)"], values=[1],
                marker=dict(colors=[PASTELS[0]]),
                showlegend=False, textinfo="label+percent", textposition="outside"
            ),
            row=1, col=1
        )
    else:
        fig.add_trace(
            go.Pie(
                labels=_wrap(tags_top10[tag_col].values, width=18),
                values=tags_top10[count_col].values,
                marker=dict(colors=PASTELS),
                showlegend=False,                # <- prevent labels at bottom
                textinfo="label+percent",        # <- show label + %
                textposition="outside",          # <- labels beside wedges
                hovertemplate="%{label}<br>%{value} hits<extra></extra>",
                sort=False
            ),
            row=1, col=1
        )
except Exception as e:
    fig.add_trace(go.Pie(labels=[f"Error: {e}"], values=[1], marker=dict(colors=[PASTELS[5]]),
                         showlegend=False, textinfo="label+percent", textposition="outside"),
                  row=1, col=1)
    

# 2) Top 10 titles 
# Horizontal bar chart of the ten titles with the highest
# like/view + comment/view sum in the selected category. Any failure is
# shown as a single "Error: ..." bar.

try:
    title_col = _pick_col(df_titles, ["title","video_title"])
    like_ratio_col = _pick_col(
        df_titles, ["like_ratio","like_view_ratio","avg_like_view_ratio","lv","like_to_view_ratio"]
    )
    comment_ratio_col = _pick_col(
        df_titles, ["comment_ratio","comment_view_ratio","avg_comment_view_ratio","cv","comment_to_view_ratio"]
    )

    # Derive the ranking key only when the CSV does not already carry it.
    sum_col = "sum_like_comment_ratio"
    if sum_col not in df_titles.columns:
        df_titles[sum_col] = _as_num(df_titles[like_ratio_col]) + _as_num(df_titles[comment_ratio_col])

    titles_cat = df_titles.loc[df_titles["category_norm"] == cat_key, [title_col, sum_col]].copy()
    titles_cat[sum_col] = _as_num(titles_cat[sum_col]).fillna(0)
    titles_top10 = titles_cat.sort_values(sum_col, ascending=False).head(10)

    if titles_top10.empty:
        fig.add_trace(go.Bar(x=[0], y=["(no data)"], orientation="h", marker=dict(color=PASTELS[1])), row=2, col=1)
    else:
        # reverse for horizontal bar (largest at top)
        y_labels = _wrap(titles_top10[title_col].values, width=60)[::-1]
        x_vals   = titles_top10[sum_col].values[::-1]
        # Repeat the palette as often as needed to give every bar a colour.
        colors   = (PASTELS * ((len(y_labels)//len(PASTELS))+1))[:len(y_labels)]

        fig.add_trace(
            go.Bar(
                x=x_vals,
                y=y_labels,
                orientation="h",
                marker=dict(color=colors, line=dict(color="rgba(0,0,0,0)", width=0.5)),
                hovertemplate="<b>%{y}</b><br>sum(like/view + comment/view)=%{x:.4f}<extra></extra>"
            ),
            row=2, col=1
        )
        # automargin lets the axis grow to fit the wrapped title labels.
        fig.update_yaxes(automargin=True, row=2, col=1)
        fig.update_xaxes(title_text="like/view + comment/view (sum)", row=2, col=1)
except Exception as e:
    fig.add_trace(go.Bar(x=[0], y=[f"Error: {e}"], orientation="h", marker=dict(color=PASTELS[5])), row=2, col=1)


# 3) Short vs Long (pie) 
# `median_note` is appended to this panel's subplot title once the figure is
# laid out; it stays None when no median is available.
median_note = None
try:
    short_ct_col = _pick_col(df_feat, ["short_count","shortform_count","num_short"])
    long_ct_col  = _pick_col(df_feat, ["long_count","longform_count","num_long"])
    short_med    = _pick_col(df_feat, ["short_median","shortform_median"], required=False)
    long_med     = _pick_col(df_feat, ["long_median","longform_median"], required=False)

    feat_row = df_feat.loc[df_feat["category_norm"] == cat_key]
    if feat_row.empty:
        fig.add_trace(
            go.Pie(labels=["(no data)"], values=[1], marker=dict(colors=[PASTELS[2]]),
                   showlegend=False, textinfo="label+percent", textposition="outside"),
            row=3, col=1
        )
    else:
        r = feat_row.iloc[0]

        # Deliberately NOT named `_to_int`: in a notebook this definition lives
        # in the shared namespace and would overwrite the Series-based
        # `_to_int(s, default=0)` helper defined by the features cell.
        def _count_or_zero(v):
            """Parse one CSV count cell; unparseable values (incl. NaN) count as 0."""
            try:
                return int(float(v))
            except (TypeError, ValueError, OverflowError):
                return 0

        short_n = _count_or_zero(r[short_ct_col])
        long_n  = _count_or_zero(r[long_ct_col])
        # The median columns are optional; empty cells are read back as NaN.
        smed = (r[short_med] if short_med and pd.notna(r[short_med]) else None)
        lmed = (r[long_med]  if long_med  and pd.notna(r[long_med])  else None)

        fig.add_trace(
            go.Pie(
                labels=["Shortform (≤3 min)", "Longform (>3 min)"],
                values=[short_n, long_n],
                marker=dict(colors=[PASTELS[0], PASTELS[3]]),
                showlegend=False,               # <- no legend below
                textinfo="label+percent",       # <- show label + %
                textposition="outside",         # <- labels beside wedges
                hovertemplate="%{label}<br>%{value} videos<extra></extra>",
                sort=False
            ),
            row=3, col=1
        )

        parts = []
        if smed: parts.append(f"Short median: {smed}")
        if lmed: parts.append(f"Long median: {lmed}")
        if parts:
            median_note = " | ".join(parts)
except Exception as e:
    fig.add_trace(go.Pie(labels=[f"Error: {e}"], values=[1], marker=dict(colors=[PASTELS[5]]),
                         showlegend=False, textinfo="label+percent", textposition="outside"),
                  row=3, col=1)
    

# 4) Top 10 influencers
# Rendered as a monospace text list: an invisible scatter point creates the
# subplot, and the list itself is an annotation centred on it.
try:
    auth_col  = _pick_col(df_influ, ["author_id","creator_id","channel_id"])
    score_col = _pick_col(df_influ, ["avg_video_score","avg_score","video_score_mean"])
    alv_col   = _pick_col(df_influ, ["avg_like_view_ratio","avg_like_to_view","lv_avg"])
    acv_col   = _pick_col(df_influ, ["avg_comment_view_ratio","avg_comment_to_view","cv_avg"])

    infl_cat = df_influ.loc[df_influ["category_norm"] == cat_key, [auth_col, score_col, alv_col, acv_col]].copy()
    infl_cat[alv_col] = _as_num(infl_cat[alv_col])
    infl_cat[acv_col] = _as_num(infl_cat[acv_col])
    infl_cat[score_col] = _as_num(infl_cat[score_col])

    # Same numeric thresholds as LIKE_MIN / COMMENT_MIN in the cell that writes
    # the influencer CSV, repeated here as literals.
    infl_cat = infl_cat[(infl_cat[alv_col] >= 0.05) & (infl_cat[acv_col] >= 0.0005)]
    infl_top10 = infl_cat.sort_values(score_col, ascending=False).head(10)

    if infl_top10.empty:
        list_text = "(none after engagement filters)"
    else:
        lines = []
        # getattr works here because the selected column names are valid
        # Python identifiers, which itertuples keeps as field names.
        for i, row in enumerate(infl_top10.itertuples(index=False), start=1):
            author = getattr(row, auth_col)
            avg_sc = getattr(row, score_col)
            alv    = getattr(row, alv_col)
            acv    = getattr(row, acv_col)
            lines.append(f"{i:>2}. {author} | score={avg_sc:.3f} | like/view={alv:.2%} | comment/view={acv:.3%}")
        list_text = "<br>".join(lines)

    fig.add_trace(go.Scatter(x=[0], y=[0], mode="markers", marker=dict(opacity=0)), row=4, col=1)
    fig.update_xaxes(visible=False, row=4, col=1)
    fig.update_yaxes(visible=False, row=4, col=1)
    fig.add_annotation(
        row=4, col=1, x=0.5, y=0.5, xref="x domain", yref="y domain",
        text=list_text, showarrow=False, align="left",
        font=dict(family="Courier New, monospace", size=12)
    )
except Exception as e:
    # Error fallback; unlike the success path, the axes are not hidden here.
    fig.add_trace(go.Scatter(x=[0], y=[0], mode="markers", marker=dict(opacity=0)), row=4, col=1)
    fig.add_annotation(
        row=4, col=1, x=0.5, y=0.5, xref="x domain", yref="y domain",
        text=f"Influencers error: {e}", showarrow=False
    )

# Overall figure size, title and text sizing.
fig.update_layout(
    template="plotly_white",
    width=1100, height=2100,
    title=dict(text=f"Category Insights — {CATEGORY}", x=0.5, xanchor="center"),
    margin=dict(l=60, r=60, t=80, b=60),
    uniformtext_minsize=10, 
    uniformtext_mode="hide" 
)

# The subplot titles are stored as layout annotations; centre the four of
# them, identified by the start of their text.
for ann in fig['layout']['annotations']:
    if isinstance(ann.text, str) and ann.text.startswith(("Top 10 Tags", "Top 10 Titles", "Short vs Long", "Top 10 Influencers")):
        ann.x = 0.5
        ann.xanchor = 'center'

# Append the duration medians, in small type, under the short-vs-long title.
if median_note:
    for ann in fig['layout']['annotations']:
        if isinstance(ann.text, str) and ann.text.startswith("Short vs Long"):
            ann.text = ann.text + f"<br><sup>{median_note}</sup>"
            break

# Make sure no pie contributes a legend entry.
fig.update_traces(selector=dict(type="pie"), showlegend=False)

fig.show()

In [None]:
def _to_int_maybe(x):
    if x is None or (isinstance(x, float) and np.isnan(x)):
        return None
    s = str(x).strip()
    s = re.sub(r"[^\d]", "", s) 
    return int(s) if s else None

def classify_creator_size(subscribers):
    """Map a subscriber count to an influencer tier label.

    The count is parsed with `_to_int_maybe`; unparseable input gives
    "Unknown". The tiers start at 1,000,000 (Mega), 100,000 (Macro) and
    1,000 (Micro); anything below is Nano.
    """
    count = _to_int_maybe(subscribers)
    if count is None:
        return "Unknown"
    tiers = (
        (1_000_000, "Mega Influencer"),
        (100_000, "Macro Influencer"),
        (1_000, "Micro Influencer"),
    )
    for floor, label in tiers:
        if count >= floor:
            return label
    return "Nano Influencer"


# Example creator IDs: 6084, 12942

# author_id looked up in PATH_UNIQUE_AUTHORS; kept as a string because the
# CSVs are read with dtype=str.
CREATOR_ID = "12942"

# Hand-entered channel details, keyed by creator ID.
CREATOR_META = {
    "12942": {
        # These details are not in the sheet, so they were collected manually.
        # In future they can be scraped alongside the other data.
        # The commented-out values are presumably those of the other example
        # creator (6084) -- TODO confirm.
        "name": "LiVing Ash", #"Glam up with Tanya",
        "subscribers": 130000, #2480,
        "classification": "Macro",
        "total_videos": 417,
        "profile_img": "https://yt3.googleusercontent.com/9pyt-iT19kqGtimYSDvp_GFWkq69BrCPD3EFBrufTVzoYUPx3_3jxN7ILhzCcpvsEvkSo4Rj=s160-c-k-c0x00ffffff-no-rj",
        "banner_img": "https://yt3.googleusercontent.com/oBMdgbh1tw-OHOq-xpa5sWEqm4fBhahAO_JDFVmo4B7mDw_0s1ql8pFVlRlrawh8S67dmeTgxg=w1138-fcrop64=1,00005a57ffffa5a8-k-c0xffffffff-no-nd-rj",
        "channel_url": "https://www.youtube.com/channel/UCEGqJWjAIZvUoZ-SimGdkpA",
    }
}



# CSV paths
PATH_UNIQUE_AUTHORS = "unique_video_authors.csv"
PATH_VIDEOS_SCORED  = "videos_scored.csv"

# How many videos to list (None = all)
MAX_VIDEOS_LIST = 25

# Pastel palette
# NOTE: this rebinds the PASTELS name also defined by the category dashboard cell.
PASTELS = ["#a3c4f3","#c2eabd","#ffd6a5","#ffadad","#bdb2ff","#bde0fe","#ffc8dd","#caffbf","#ffe5a5"]

def _as_num(s): 
    return pd.to_numeric(s, errors="coerce")

def _pct(x, places=2):
    try: return f"{float(x):.{places}%}"
    except: return ""

def _wrap(vals, width=38):
    return [textwrap.fill(str(v), width=width) for v in vals]

def _br_wrap(s, every=50):
    s = "" if pd.isna(s) else str(s)
    return "<br>".join(textwrap.wrap(s, every)) if s else ""

def parse_videos_list(s):
    """Split a "|"-separated string of video IDs into a list of trimmed IDs.

    Empty entries are dropped; non-string input gives an empty list.
    """
    if not isinstance(s, str):
        return []
    pieces = (chunk.strip() for chunk in s.split("|"))
    return [piece for piece in pieces if piece]

def ensure_ratios(df):
    """Make sure `df` has `like_ratio`, `comment_ratio` and `video_score` columns.

    Columns that already exist are left untouched. Missing ratios are derived
    from `likeCount` / `viewCount` and from `commentCount` (or, failing that,
    `total_comment_count`) / `viewCount`; rows with zero or unparseable views
    get a ratio of 0.0. When the needed count columns are absent the ratio is
    NaN (previously a missing `viewCount` crashed the comment-ratio branch).
    `video_score` is taken from the first known alternative column name, or
    filled with NaN.

    The frame is modified in place and also returned.
    """
    def _count(col):
        # Parsed directly with pandas so this function does not depend on
        # which `_as_num` definition is currently live in the notebook.
        return pd.to_numeric(df[col], errors="coerce").fillna(0)

    has_views = "viewCount" in df.columns
    # Zero views become NaN so the division yields NaN (then 0.0), not inf.
    views = _count("viewCount").replace(0, np.nan) if has_views else None

    if "like_ratio" not in df.columns:
        if has_views and "likeCount" in df.columns:
            df["like_ratio"] = (_count("likeCount") / views).fillna(0.0)
        else:
            df["like_ratio"] = np.nan

    if "comment_ratio" not in df.columns:
        if not has_views:
            df["comment_ratio"] = np.nan
        else:
            if "commentCount" in df.columns:
                comments = _count("commentCount")
            elif "total_comment_count" in df.columns:
                comments = _count("total_comment_count")
            else:
                comments = pd.Series(0, index=df.index, dtype=float)
            df["comment_ratio"] = (comments / views).fillna(0.0)

    if "video_score" not in df.columns:
        for alt in ["score", "final_score", "videoScore", "video_score_final"]:
            if alt in df.columns:
                df.rename(columns={alt: "video_score"}, inplace=True)
                break
    if "video_score" not in df.columns:
        df["video_score"] = np.nan
    return df

def header_block_text(meta, row):
    """Build the "<br>"-joined overview text shown at the top of the creator figure.

    `meta` is a dict of manually entered channel details (may be empty) and
    `row` a dict of aggregates for the creator (may be None); the lines of
    either part are simply left out when it is absent.
    """
    parts = []

    if meta:
        name = meta.get("name", "")
        subs = meta.get("subscribers", "")
        classification = meta.get("classification", "")
        total = meta.get("total_videos", "")
        url = meta.get("channel_url", "")

        if name:
            parts.append(f"<b>{name}</b>")
        if subs != "":
            # The size label is only shown next to a plain integer count.
            if isinstance(subs, int):
                parts.append(f"Subscribers: {subs} (Size: {classification})")
            else:
                parts.append(f"Subscribers: {subs}")
        if total != "":
            parts.append(f"Total videos: {total}")
        if url:
            parts.append(f'<a href="{url}">{url}</a>')

    if row is not None:
        parts += [
            f"Videos in system: {row.get('video_count', 0)}",
            f"Main category: {row.get('main_category','')}",
            f"Avg video score: {row.get('avg_video_score','')}",
            f"Avg like/view: {_pct(row.get('avg_like_view_ratio', np.nan))}",
            f"Avg comment/view: {_pct(row.get('avg_comment_view_ratio', np.nan))}",
            f"Avg spam ratio: {_pct(row.get('avg_spam_ratio', np.nan))}",
        ]

    return "<br>".join(parts)

ua = pd.read_csv(PATH_UNIQUE_AUTHORS, dtype=str)
vs = pd.read_csv(PATH_VIDEOS_SCORED, dtype=str)

# Coerce numerics in authors
for col in ["video_count","avg_video_score","avg_spam_ratio","avg_like_view_ratio","avg_comment_view_ratio"]:
    if col in ua.columns:
        ua[col] = _as_num(ua[col])

# Accept common alternative spellings of the creator id column.
if "author_id" not in ua.columns:
    for alt in ["channel_id","creator_id","authorId","channelId"]:
        if alt in ua.columns:
            ua.rename(columns={alt: "author_id"}, inplace=True)
            break


def _float_or_nan(value):
    """Return `value` as a float, or NaN when it is missing."""
    return float(value) if pd.notna(value) else np.nan


# Aggregates of the selected creator (None when the id is unknown) and the
# IDs of that creator's videos.
row_ua = ua.loc[ua["author_id"] == CREATOR_ID]
row_dict = None
video_ids = []
if not row_ua.empty:
    row_ua = row_ua.iloc[0]
    video_count = _as_num(row_ua.get("video_count", 0))
    avg_score = row_ua.get("avg_video_score", np.nan)
    row_dict = {
        # NaN is truthy, so the old `int(x or 0)` raised on a missing count.
        "video_count": int(video_count) if pd.notna(video_count) else 0,
        "main_category": row_ua.get("main_category", ""),
        "avg_video_score": f"{float(avg_score):.3f}" if pd.notna(avg_score) else "",
        "avg_spam_ratio": _float_or_nan(row_ua.get("avg_spam_ratio", np.nan)),
        "avg_like_view_ratio": _float_or_nan(row_ua.get("avg_like_view_ratio", np.nan)),
        "avg_comment_view_ratio": _float_or_nan(row_ua.get("avg_comment_view_ratio", np.nan)),
    }
    if "videos" in ua.columns:
        video_ids = parse_videos_list(row_ua["videos"])

vs = ensure_ratios(vs)
if "videoId" not in vs.columns:
    for alt in ["video_id","id","VideoID"]:
        if alt in vs.columns:
            vs.rename(columns={alt: "videoId"}, inplace=True)
            break

# Rows of this creator's videos; an empty frame with the same columns otherwise.
vids_df = vs.loc[vs["videoId"].isin(video_ids)].copy() if video_ids else vs.iloc[0:0].copy()

for col in ["video_score","spam_ratio","comment_score_mean","like_ratio","comment_ratio","viewCount"]:
    if col in vids_df.columns:
        vids_df[col] = _as_num(vids_df[col])

title_col = "title" if "title" in vids_df.columns else ("video_title" if "video_title" in vids_df.columns else None)
# None when the category column is absent, so the code below can skip it
# instead of raising KeyError.
cat_col   = "defining_category" if "defining_category" in vids_df.columns else None

# Display table: available columns only, with wrapped titles and formatted numbers.
table_df = vids_df[[c for c in ["videoId", title_col, cat_col, "video_score", "spam_ratio", "comment_score_mean", "like_ratio", "comment_ratio"] if c and c in vids_df.columns]].copy()
if title_col in table_df.columns:
    table_df[title_col] = table_df[title_col].apply(lambda s: _br_wrap(s, 60))
for pcol in ["like_ratio","comment_ratio","spam_ratio"]:
    if pcol in table_df.columns:
        table_df[pcol] = table_df[pcol].apply(lambda x: _pct(x, 2) if pd.notna(x) else "")
if "video_score" in table_df.columns:
    table_df["video_score"] = table_df["video_score"].apply(lambda x: f"{x:.3f}" if pd.notna(x) else "")
if MAX_VIDEOS_LIST is not None and len(table_df) > MAX_VIDEOS_LIST:
    table_df = table_df.head(MAX_VIDEOS_LIST)

# Number of this creator's videos per category, for the pie chart.
cat_counts = vids_df[cat_col].fillna("(unknown)").value_counts() if cat_col else pd.Series([], dtype=int)

# Scatter input: videos with both ratios; marker size grows with the video
# score (0.5 assumed when the score is missing).
scatter_df = vids_df.dropna(subset=["like_ratio","comment_ratio"])
sizes = (scatter_df["video_score"].fillna(0.5) * 6 + 3) if "video_score" in scatter_df.columns else pd.Series(7, index=scatter_df.index)
sizes = np.clip(sizes, 4, 10)  # smaller dots

# Six stacked panels. Row 2 is reserved for the profile picture and banner,
# but no trace is added to it in the code shown in this cell.
fig = make_subplots(
    rows=6, cols=1,
    specs=[
        [{"type":"xy"}],       # 1: overview text
        [{"type":"xy"}],       # 2: profile + banner
        [{"type":"domain"}],   # 3: category pie
        [{"type":"xy"}],       # 4: engagement scatter
        [{"type":"table"}],    # 5: table part 1
        [{"type":"table"}],    # 6: table part 2
    ],
    row_heights=[0.11, 0.16, 0.20, 0.19, 0.17, 0.17],
    vertical_spacing=0.05,
)

# Row 1: creator overview, drawn as a single text "point" on hidden axes.
meta = CREATOR_META.get(CREATOR_ID, {})
header_text = header_block_text(meta, row_dict)
fig.add_trace(go.Scatter(
    x=[0.5], y=[0.5], mode="text", text=[header_text],
    textposition="middle center", textfont=dict(size=15),
    hoverinfo="skip", showlegend=False
), row=1, col=1)
fig.update_xaxes(visible=False, row=1, col=1)
fig.update_yaxes(visible=False, row=1, col=1)

# Row 3: share of the creator's videos per category (placeholder slice when
# there are none).
if cat_counts.empty:
    fig.add_trace(go.Pie(labels=["(no videos)"], values=[1],
                         marker=dict(colors=[PASTELS[0]]),
                         textinfo="label+percent", textposition="outside",
                         showlegend=False, sort=False),
                  row=3, col=1)
else:
    fig.add_trace(go.Pie(labels=cat_counts.index.tolist(),
                         values=cat_counts.values.tolist(),
                         marker=dict(colors=PASTELS),
                         textinfo="label+percent",
                         textposition="outside",
                         showlegend=False, sort=False),
                  row=3, col=1)

#  Row 4: Engagement scatter (smaller dots, auto margins) ----
# One dot per video: x = like/view, y = comment/view, size from `sizes`;
# hover text is the wrapped title, or the video ID when there is no title column.
if scatter_df.empty:
    fig.add_trace(go.Scatter(x=[0], y=[0], mode="text",
                             text=["(no videos with ratios)"],
                             showlegend=False, hoverinfo="skip"),
                  row=4, col=1)
else:
    fig.add_trace(go.Scatter(
        x=scatter_df["like_ratio"], y=scatter_df["comment_ratio"],
        mode="markers",
        marker=dict(size=sizes, color=PASTELS[4], opacity=0.9, line=dict(color="#666", width=0.4)),
        text=_wrap(scatter_df[title_col] if title_col in scatter_df.columns else scatter_df["videoId"]),
        hovertemplate="<b>%{text}</b><br>like/view=%{x:.2%}<br>comment/view=%{y:.3%}<extra></extra>",
        showlegend=False
    ), row=4, col=1)
    fig.update_xaxes(title_text="Like / View", tickformat=".0%", automargin=True, row=4, col=1)
    fig.update_yaxes(title_text="Comment / View", tickformat=".2%", automargin=True, row=4, col=1)

#  Rows 5 & 6: Tables (give them lots of space, wrap long titles) ----
def table_trace(df_slice):
    """Build a plotly Table trace showing every column of `df_slice`.

    When the title column is present it gets the widest column, the video ID
    and category columns a medium width and everything else a narrow one;
    otherwise all columns share the width equally.
    """
    cols = list(df_slice.columns)

    def _width_for(col):
        # Relative widths: title widest, identifiers medium, numbers compact.
        if col == title_col:
            return 0.46
        if col in ("videoId", cat_col):
            return 0.16
        return 0.09

    if title_col and title_col in cols:
        widths = [_width_for(c) for c in cols]
    else:
        widths = [1.0/len(cols)]*len(cols)

    header = dict(
        values=[f"<b>{c}</b>" for c in cols],
        fill_color="#f7f7f7", align="left", font=dict(size=12),
    )
    cells = dict(
        values=[df_slice[c] for c in cols],
        fill_color="white", align="left", font=dict(size=11), height=26,
    )
    return go.Table(header=header, cells=cells, columnwidth=widths)

# Rows 5 and 6: the video list, split into two tables of (almost) equal length.
if table_df.empty:
    # Placeholder tables so both table cells of the grid still hold a trace.
    fig.add_trace(go.Table(header=dict(values=["(no videos in system for this creator)"]),
                           cells=dict(values=[[""]]), columnwidth=[1.0]),
                  row=5, col=1)
    fig.add_trace(go.Table(header=dict(values=[" "]), cells=dict(values=[[" "]])),
                  row=6, col=1)
else:
    # First half (rounded up) in row 5, remainder in row 6; with a single
    # video the second table is a blank placeholder.
    half = (len(table_df) + 1)//2
    left = table_df.iloc[:half]
    right = table_df.iloc[half:]
    fig.add_trace(table_trace(left),  row=5, col=1)
    fig.add_trace(table_trace(right) if not right.empty
                  else go.Table(header=dict(values=[" "]), cells=dict(values=[[" "]])),
                  row=6, col=1)

# Overall figure size, title and text sizing.
fig.update_layout(
    template="plotly_white",
    width=1100, height=2000,        # tall & narrow-ish to reduce crowding
    title=dict(text=f"Creator Insights — ID: {CREATOR_ID}", x=0.5, xanchor="center"),
    margin=dict(l=70, r=70, t=90, b=60),
    showlegend=False,
    uniformtext_minsize=10, uniformtext_mode="hide",
)

# Hand-tuned title positions, kept only as a fallback for rows whose subplot
# cannot be located. NOTE: "paper" coordinates run from 0 (bottom of the
# plotting area) to 1 (top), so these values all lie above the plotting area;
# add_row_title therefore prefers the real subplot position.
row_title_y = {
    1: 1.975,   # above row 1
    2: 1.835,   # above row 2
    3: 1.655,   # above row 3
    4: 1.460,   # above row 4
    5: 1.265,   # above row 5
    6: 1.085,   # above row 6
}


def _subplot_top(fig, row, col=1):
    """Return the paper-space y coordinate of a subplot's top edge, or None.

    Relies on plotly's `Figure.get_subplot`, which (per the plotly docs)
    returns an object with `.yaxis` for xy cells and one with `.y` for
    domain/table cells.
    """
    try:
        subplot = fig.get_subplot(row, col)
    except Exception:
        # Figure not built with make_subplots, or row/col out of range.
        return None
    if subplot is None:
        return None
    if hasattr(subplot, "yaxis"):
        domain = subplot.yaxis.domain
    else:
        domain = getattr(subplot, "y", None)
    return domain[1] if domain is not None else None


def add_row_title(fig, row, text):
    """Add a bold, horizontally centred title just above subplot row `row`."""
    top = _subplot_top(fig, row)
    if top is None:
        top = row_title_y.get(row, 0.5)
    fig.add_annotation(
        text=f"<b>{text}</b>",
        x=0.5, y=top,
        xref="paper", yref="paper",
        yanchor="bottom", yshift=6,   # sit a few pixels above the subplot
        showarrow=False
    )

add_row_title(fig, 1, "Creator Overview")
add_row_title(fig, 3, "Videos by Category")
add_row_title(fig, 4, "Engagement Map (Like/View vs Comment/View)")
add_row_title(fig, 5, "Videos (list)")
add_row_title(fig, 6, "Videos (list continued)")

fig.show()


In [None]:
# Video to inspect; video_insights() compares it as a string against the
# videoId column of VIDEOS_CSV.
VIDEO_ID = 85557
VIDEOS_CSV = "videos_scored.csv"

# Display name of the video's creator, entered by hand.
creator_name = "LiVing Ash"
# In future, replace with a map from creator ID to channel name if the name is in the data.

def _as_num(s, dtype=float):
    out = pd.to_numeric(s, errors="coerce")
    return out.astype(dtype) if dtype in (int, np.int64) else out

def iso8601_to_seconds(s: str):
    """Convert a duration such as "PT1M30S" or "P1DT2H" to seconds (float).

    Only whole-number day/hour/minute/second components are recognised, in
    any letter case. Returns None for non-string or empty input and for
    strings that do not match.
    """
    if not isinstance(s, str) or not s:
        return None
    # One pattern is enough: the former "PT..." fallback could only match
    # strings this pattern already accepts, so it was unreachable.
    m = re.fullmatch(
        r"P(?:(?P<days>\d+)D)?(?:T(?:(?P<hours>\d+)H)?(?:(?P<minutes>\d+)M)?(?:(?P<seconds>\d+)S)?)?",
        s.strip(),
        flags=re.IGNORECASE,
    )
    if not m:
        return None
    # Components absent from the string come back as None and count as 0.
    d = {k: (int(v) if v else 0) for k, v in m.groupdict().items()}
    total = d["days"]*86400 + d["hours"]*3600 + d["minutes"]*60 + d["seconds"]
    return float(total)

def human_duration(seconds: float | None) > str:
    if seconds is None or np.isnan(seconds):
        return "unknown"
    seconds = int(round(seconds))
    h = seconds // 3600
    m = (seconds % 3600) // 60
    s = seconds % 60
    parts = []
    if h > 0:
        parts.append(f"{h}H")
    if m > 0 or h > 0: 
        parts.append(f"{m}M")
    parts.append(f"{s}S")
    return " ".join(parts)

def short_or_long(seconds: float | None) > str:
    if seconds is None or np.isnan(seconds):
        return "unknown"
    return "shortform" if seconds <= 180 else "longform"

def pick_value(*vals, fallback="unknown"):
    """Return the first argument that is a non-blank string.

    Non-string arguments and strings containing only whitespace are passed
    over. The chosen string is returned unmodified (not stripped). When no
    argument qualifies, ``fallback`` is returned.
    """
    usable = (
        candidate
        for candidate in vals
        if isinstance(candidate, str) and candidate.strip()
    )
    return next(usable, fallback)

def pct_or_blank(x, places=2):
    """Format ``x`` as a percentage string, or return ``""`` when it has no usable value.

    Parameters
    ----------
    x :
        Ratio to format (0.25 becomes "25.00%"); anything ``float()`` accepts.
    places : int
        Number of digits after the decimal point.

    Returns
    -------
    str
        The formatted percentage, or an empty string when ``x`` cannot be
        converted to a float, is NaN or infinite, or ``places`` is not a valid
        precision.
    """
    try:
        value = float(x)
        # NaN/inf would otherwise render as "nan%" / "inf%" rather than blank.
        if not math.isfinite(value):
            return ""
        return f"{value:.{places}%}"
    except (TypeError, ValueError, OverflowError):
        return ""

def int_or_zero(x):
    """Convert ``x`` to an int by way of float, returning 0 on any failure.

    Going through ``float`` lets numeric text such as ``"12.9"`` convert; the
    fractional part is then truncated by ``int``.
    """
    try:
        as_float = float(x)
        whole = int(as_float)
    except Exception:
        return 0
    return whole

def video_insights(video_id: str, videos_csv: str = VIDEOS_CSV, creator_name_map=None):
    """Print a plain-text report for one video from the scored-videos CSV.

    Parameters
    ----------
    video_id : str
        Identifier to look up. It is compared as stripped text, so an int
        works as well.
    videos_csv : str
        Path of the CSV to read. Every column is loaded as text and malformed
        lines are skipped.
    creator_name_map : dict | str | None
        Either a mapping from channelId to a display name, or a single display
        name used as-is. When it yields no name, the module-level
        ``creator_name`` is used if it exists, otherwise "unknown".

    Returns
    -------
    None
        The report is printed. The function returns early, after printing a
        message, when the CSV has no recognisable id column or no row matches.
    """
    df = pd.read_csv(videos_csv, dtype=str, engine="python", on_bad_lines="skip")

    # Accept a few alternative spellings of the id column.
    if "videoId" not in df.columns:
        alt = next((c for c in ("video_id", "id", "VideoID") if c in df.columns), None)
        if alt is None:
            print(f"No video id column found in {videos_csv}.")
            return
        df = df.rename(columns={alt: "videoId"})

    subset = df.loc[df["videoId"].astype(str).str.strip() == str(video_id).strip()]
    if subset.empty:
        print(f"No row found for videoId={video_id!r} in {videos_csv}.")
        return
    if len(subset) > 1:
        # if duplicates, take the first but show how many
        print(f"Warning: found {len(subset)} rows for videoId={video_id}; showing the first.\n")
    row = subset.iloc[0]

    def _text(*columns, fallback=""):
        # Blank CSV cells load as NaN and would print as "nan"; take the first
        # listed column that holds real text instead.
        return pick_value(*(row.get(col) for col in columns), fallback=fallback)

    def _number(column):
        # Unparseable or missing values become NaN.
        return pd.to_numeric(row.get(column, "nan"), errors="coerce")

    def _score(value):
        return "" if pd.isna(value) else f"{value:.3f}"

    # Basic details
    channel_id = _text("channelId")
    category = _text("defining_category", "category", fallback="unknown")
    title = _text("title", "video_title")
    tags = _text("tags")
    duration_iso = _text("contentDuration", "duration")
    seconds = iso8601_to_seconds(duration_iso) if duration_iso else None
    duration_h = human_duration(seconds)
    vid_class = short_or_long(seconds)

    default_lang = _text("defaultLanguage", fallback="unknown")
    default_audio = _text("defaultAudioLanguage", fallback="unknown")

    # Creator name: honour the argument first, then fall back to the
    # module-level creator_name so existing callers keep their output.
    if isinstance(creator_name_map, str):
        display_name = creator_name_map.strip()
    elif creator_name_map:
        display_name = creator_name_map.get(channel_id) or ""
    else:
        display_name = ""
    if not display_name:
        display_name = globals().get("creator_name", "unknown")

    views = int_or_zero(row.get("viewCount", "0"))
    likes = int_or_zero(row.get("likeCount", "0"))
    commentCount_meta = int_or_zero(row.get("commentCount", "0"))
    total_comment_count = int_or_zero(row.get("total_comment_count", row.get("normal_comment_count", "0")))

    # Prefer the precomputed ratios; derive them from raw counts only when absent.
    like_ratio = _number("like_ratio")
    if pd.isna(like_ratio):
        like_ratio = (likes / views) if views > 0 else np.nan

    comment_ratio = _number("comment_ratio")
    if pd.isna(comment_ratio):
        comments_used = max(commentCount_meta, total_comment_count)
        comment_ratio = (comments_used / views) if views > 0 else np.nan

    video_score = _number("video_score")
    spam_ratio = _number("spam_ratio")
    cmt_mean = _number("comment_score_mean")
    cmt_min = _number("comment_score_min")
    cmt_max = _number("comment_score_max")

    print("\nVideo details:")
    print(f"  videoId:               {video_id}")
    print(f"  channelId:             {channel_id}")
    print(f"  Creator name:          {display_name}")
    print(f"  category:              {category}")
    print(f"  title:                 {title}")
    print(f"  tags:                  {tags}")
    print(f"  duration:              {duration_h}")
    print(f"  video classification:  {vid_class}")
    print(f"  defaultLanguage:       {default_lang}")
    print(f"  defaultAudioLanguage:  {default_audio}")

    print("\nMetrics:")
    print(f"  viewCount:             {views:,}")
    print(f"  likeCount:             {likes:,}")
    print(f"  commentCount:          {commentCount_meta:,}")

    print("\nEngagement:")
    print(f"  Video score:              {_score(video_score)}")
    print(f"  Comments in the system:   {total_comment_count:,}")
    print(f"  Spam ratio:               {pct_or_blank(spam_ratio, 2)}")
    print(f"  Comment score mean:       {_score(cmt_mean)}")
    print(f"  Comment score range:   "
          f"{_score(cmt_min)} to {_score(cmt_max)}")
    print(f"  Like-view ratio:            {pct_or_blank(like_ratio, 2)}")
    print(f"  Comment-view ratio:         {pct_or_blank(comment_ratio, 3)}")
    print("")

# Print the report for the configured video when this cell/script is run directly.
# NOTE(review): video_insights names its third parameter `creator_name_map`, but a
# plain name string is passed here - confirm that is intended.
if __name__ == "__main__":
    video_insights(VIDEO_ID, VIDEOS_CSV, creator_name)
