In [1]:
from pyspark.sql import functions as F

# The groupBy below shuffles every exploded id, so raise the number of
# shuffle partitions for this session before building the plan.
spark.conf.set("spark.sql.shuffle.partitions", "4000")

df = spark.read.parquet("gs://ir-2026-shaked-bucket/multistream*_preprocessed.parquet")

# Column expressions used to build `links`:
#   * keep only rows whose `anchor_text` array has at least one element;
#   * project the `id` field of every array element into an array column
#     (presumably integer page ids -- confirm against the input schema).
has_anchors = F.size(F.col("anchor_text")) > 0
anchor_ids = F.col("anchor_text.id").alias("ids")

# One row per anchor entry, carrying nothing but the id it refers to.
links = (
    df.filter(has_anchors)
    .select(anchor_ids)
    .select(F.explode("ids").alias("wiki_id"))
)

# Number of occurrences of each id across all rows, stored as `pageviews`.
pageviews_agg = links.groupBy("wiki_id").agg(F.count("*").alias("pageviews"))

OUTPUT_PATH = "gs://ir_project_2025/pageviews_aug_2021"

# Save mode "error" makes the write fail when OUTPUT_PATH already exists,
# so a re-run cannot silently replace an earlier result.
pageviews_agg.write.mode("error").parquet(OUTPUT_PATH)

                                                                                