In [None]:
import os

os.environ["PYSPARK_SUBMIT_ARGS"] = (
    "--driver-memory 4g "
    "--executor-memory 4g "
    "--conf spark.executor.instances=1 "
    "--conf spark.sql.shuffle.partitions=8 "
    "pyspark-shell"
)

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, desc, row_number
from pyspark.sql.window import Window
from graphframes import GraphFrame

In [None]:
from pyspark.sql import SparkSession

#.config("spark.driver.memory", "4g") \
#.config("spark.executor.memory", "4g") \

spark = SparkSession.builder \
    .appName("YouTube Analytics") \
    .config("spark.jars.packages",
            "org.mongodb.spark:mongo-spark-connector_2.12:10.5.0,"
            "graphframes:graphframes:0.8.4-spark3.5-s_2.12") \
    .getOrCreate()

print("Spark session created")


In [None]:
# -------------------------
# Step 1: MongoDB Connection
# -------------------------
mongo_ip = "ip"
mongo_uri = f"mongodb://{mongo_ip}:27017/youtube_analytics"

collections = ["crawls", "edges", "video_snapshots", "videos"]
dfs = {}

for coll in collections:
    df = spark.read.format("mongodb") \
        .option("database", "youtube_analytics") \
        .option("collection", coll) \
        .option("spark.mongodb.read.connection.uri", mongo_uri) \
        .load()
    dfs[coll] = df
    print(f"First 5 rows from '{coll}':")
    df.show(5)

# Cache frequently used dataframes
videos_df = dfs['videos'].cache()
edges_df = dfs['edges'].cache()

# Convenience aliases for DataFrames
snapshots_df = dfs['video_snapshots']
crawls_df = dfs['crawls']



First 5 rows from 'crawls':


                                                                                

+----+--------------------+--------------------+--------------------+--------------------+
| _id|                date|            log_info|               notes|        processed_at|
+----+--------------------+--------------------+--------------------+--------------------+
|0222|2025-10-12T23:19:...|{0222, 2025-10-12...|Crawl processing ...|2025-10-12T23:19:...|
|0301|2025-10-12T23:37:...|{0301, 2025-10-12...|Crawl processing ...|2025-10-12T23:37:...|
+----+--------------------+--------------------+--------------------+--------------------+

First 5 rows from 'edges':
+--------------------+--------+-----------+-----------+
|                 _id|crawl_id|        dst|        src|
+--------------------+--------+-----------+-----------+
|68ec99ddab8d16963...|    0222|DjdA-5oKYFQ|LKh7zAJ4nwo|
|68ec99ddab8d16963...|    0222|NxTDlnOuybo|LKh7zAJ4nwo|
|68ec99ddab8d16963...|    0222|c-8VuICzXtU|LKh7zAJ4nwo|
|68ec99ddab8d16963...|    0222|DH56yrIO5nI|LKh7zAJ4nwo|
|68ec99ddab8d16963...|    0222|W1U

In [5]:
# -------------------------
# Step 2: Network Aggregation (Memory-Friendly)
# -------------------------

# Sample a manageable number of edges
SAMPLE_SIZE = 50_000  # adjust down if needed
sample_edges_df = edges_df.limit(SAMPLE_SIZE)

# Get all unique vertex IDs from the sampled edges
edge_vertex_ids = sample_edges_df.select('src').union(sample_edges_df.select('dst')).distinct()

# Keep only videos that correspond to these vertices
sample_vertices_df = videos_df.join(
    edge_vertex_ids,
    videos_df['_id'] == edge_vertex_ids['src'],  # match relevant vertices
    'inner'
).select(videos_df['_id'].alias('id'))

# Optional: repartition to reduce shuffle memory usage
sample_edges_df = sample_edges_df.repartition(10, 'src')
sample_vertices_df = sample_vertices_df.repartition(10, 'id')

# Create GraphFrame
graph = GraphFrame(sample_vertices_df, sample_edges_df.select(col('src'), col('dst')))

# Calculate degree distributions
in_degrees = graph.inDegrees
out_degrees = graph.outDegrees

all_degrees = in_degrees.join(out_degrees, 'id', 'outer').fillna(0)
all_degrees.describe().show()




+-------+-----------+------------------+------------------+
|summary|         id|          inDegree|         outDegree|
+-------+-----------+------------------+------------------+
|  count|      22036|             22036|             22036|
|   mean|       NULL|2.2690143401706298|2.2690143401706298|
| stddev|       NULL|3.4225807045249566| 6.328498087331983|
|    min|--aApGlDZGA|                 0|                 0|
|    max|zzuddQOyuW8|                46|                20|
+-------+-----------+------------------+------------------+



                                                                                

In [6]:
# -------------------------
# Step 3: Top-K Queries
# -------------------------
K = 10

# 1. Top K categories by number of videos
videos_df.groupBy('category') \
    .count() \
    .orderBy(desc('count')) \
    .show(K)

# 2. Top K videos by views (must use snapshots!)
snapshots_df.orderBy(desc('views')) \
    .select('video_id', 'views', 'category', 'crawl_id') \
    .show(K)

# 3. Top K videos by rating (use snapshots)
snapshots_df.orderBy(desc('rate')) \
    .select('video_id', 'rate', 'views', 'category', 'crawl_id') \
    .show(K)

# 4. Range query: Music videos, 200â€“500 sec duration
#     length_sec exists in BOTH videos and snapshots. Use snapshots for view-related info.
snapshots_df.filter(
    (col('category') == 'Music') &
    (col('length_sec').between(200, 500))
).select('video_id', 'length_sec', 'views', 'category') \
 .orderBy(desc('views')) \
 .show(K)


                                                                                

+------------------+------+
|          category| count|
+------------------+------+
|             music|212605|
|     entertainment|154820|
|            comedy|105659|
|            sports| 85578|
|film_and_animation| 85185|
| gadgets_and_games| 72083|
|  people_and_blogs| 60546|
| news_and_politics| 42741|
|     howto_and_diy| 21282|
|autos_and_vehicles| 19833|
+------------------+------+
only showing top 10 rows



                                                                                

+-----------+--------+----------------+--------+
|   video_id|   views|        category|crawl_id|
+-----------+--------+----------------+--------+
|dMH0bHeiRNg|42513417|          comedy|    0222|
|4c_Grdrx7t0|24133454|             una|    0301|
|0XxI-hvPRRA|20282464|          comedy|    0222|
|1dmVU08zVpA|16087899|   entertainment|    0222|
|RB-wUgnyGv0|15712924|   entertainment|    0222|
|QjA5faZF1A8|15256922|           music|    0222|
|-_CSo1gOd48|13199833|people_and_blogs|    0222|
|49IDp76kjPw|11970018|          comedy|    0222|
|tYnn51C3X_w|11823701|           music|    0222|
|pv5zWaTEVkI|11672017|           music|    0222|
+-----------+--------+----------------+--------+
only showing top 10 rows





+-----------+----+-----+------------------+--------+
|   video_id|rate|views|          category|crawl_id|
+-----------+----+-----+------------------+--------+
|feioiPwxt98| 5.0| 1632|            sports|    0301|
|LqNUMgSzWP4| 5.0|  167|             music|    0222|
|--B1obbH1ck| 5.0|  120|             music|    0222|
|Lq_pEG5dYBo| 5.0|  558|            comedy|    0222|
|fenxoc-IWb0| 5.0| 1590|autos_and_vehicles|    0222|
|LqPW591Xfaw| 5.0| 1583|     entertainment|    0222|
|--Qm8HF07BM| 5.0|  214|             music|    0222|
|LqGGf5Go3Jk| 5.0|  362|             music|    0222|
|fekHbMHXXcQ| 5.0|  101|film_and_animation|    0222|
|LqTK-3wsH1E| 5.0| 7735|     entertainment|    0222|
+-----------+----+-----+------------------+--------+
only showing top 10 rows

+--------+----------+-----+--------+
|video_id|length_sec|views|category|
+--------+----------+-----+--------+
+--------+----------+-----+--------+



                                                                                

In [7]:
# -------------------------
# Step 4: Subgraph Pattern Search (motifs)
# -------------------------

videos_df.createOrReplaceTempView("videos")

# Example: Find pairs of related videos in 'Music'
motifs = graph.find('(a)-[e]->(b)')\
.filter('a.id in (select _id from videos where category="Music")')
motifs.show(5)

+---+---+---+
|  a|  e|  b|
+---+---+---+
+---+---+---+





In [8]:
# -------------------------
# Step 5: Influence Analysis (PageRank)
# -------------------------

# 1. Compute PageRank
pagerank_results = graph.pageRank(resetProbability=0.15, maxIter=10)

# 2. Pick the latest snapshot per video
windowSpec = Window.partitionBy("video_id").orderBy(desc("age_days"))
latest_snapshots = snapshots_df.withColumn(
    "rn", row_number().over(windowSpec)
).filter("rn = 1").drop("rn")

# 3. Join PageRank results with snapshots
pagerank_results.vertices.join(
    latest_snapshots,
    pagerank_results.vertices.id == latest_snapshots.video_id
).select(
    "video_id", "pagerank", "views", "category"
).orderBy(desc("pagerank")).show(K)


25/11/16 00:04:29 WARN BlockManager: Block rdd_312_1 already exists on this machine; not re-adding it

+-----------+-----------------+-----+-----------------+
|   video_id|         pagerank|views|         category|
+-----------+-----------------+-----+-----------------+
|-fTO_SYoFCM|6.246195832075682|   64| people_and_blogs|
|FN9ZOOImjCg|6.246195832075682|  182| people_and_blogs|
|KNdj3ae2Tlk|6.246195832075682|  164| people_and_blogs|
|pazSPUYZYVE|6.101569604111737|  350|news_and_politics|
|TFvum08n0nI|6.101569604111737| 1391|    entertainment|
|Ud7KYmEOWic|6.101569604111737|  232|news_and_politics|
|6OZ0ruq0DH4|6.101569604111737|  837|    entertainment|
|CESHloI5or8|6.101569604111737|  552|news_and_politics|
|QezUyZ7pKQc|6.101569604111737| 8635|           sports|
|D7k8Ni_oEsE|6.101569604111737|  519|           sports|
+-----------+-----------------+-----+-----------------+
only showing top 10 rows



                                                                                

In [9]:
# Stop Spark session
spark.stop()