# **Final Project**

This is a project doing some basic data analysis of IMDB movie data and associated wiki streaming events. It should be completed by groups of no less than 2 students and no more than 4 students. Each member of the group should have at least a few commits associated in the project repo.

## **Scoring**

The code must run and provide the correct answers . 1/2 points
The remainder will come from notebook organization, code comments, etc .
For the questions that have answers, please also provide those in markdown cells in the notebook, and/or part of a mardown file in the repo .
All relevant code should be shared via a shared Git repository. Additionally, you will send an email to joe@adaltas.com when the project has been submitted . Please ensure that the names of all participants are included in the repo and in the submission email . Note: For full credit the code must run with little to no extra input from the end user, and, any extra input that is required must be clearly documented and explained. Also note, any question that is at least attempted will be awarded with partial credit provided there is a corresponding explanation of the difficulties faced.

## **Questions**

  1 - load data from here. This should be done using a notebook cell and not a manual process to import the data. NOTE: You may not need all of the datasets, but you will be utilizing most of them.

  2 - How many total people in data set?

  3 - What is the earliest year of birth?

  4 - How many years ago was this person born?

  5 - Using only the data in the data set, determine if this date of birth correct.

  6 - Explain the reasoning for the answer in a code comment or new markdown cell.

  7 - What is the most recent data of birth?

  8 - What percentage of the people do not have a listed date of birth?

  9 - What is the length of the longest "short" after 1900?

  10 - What is the length of the shortest "movie" after 1900?

  11 - List of all of the genres represented.

  12 - What is the higest rated comedy "movie" in the dataset? Note, if there is a tie, the tie shall be broken by the movie with the most votes.

  13 - Who was the director of the movie?

  14 - List, if any, the alternate titles for the movie.

## **Stream Processing**

Choose any five entities from the data set. These can be specific movies, actors, crews, etc, or more abstract concepts such as specific genres, etc. The main criteria is that the entities chosen must have a trackable wiki page. Set up a stream processing job that will track events for the chosen entities from the wikimedia Events Platform. These tracking jobs should provide some simple metrics. These metrics should be stored in a database or file (depending on the platform used). At least one of the metrics should be of the "alert" type (meaning some event that would require further action. For instance imagine wanting to be notified each time a specific user makes a change. Capture this "alert" and mimic an alerting system by routing these events to a different file/database.) These tables/data do not need to be shared, but the structure of the output should be clearly noted in the code and/or markdown cells. Additionally, a brief explanation/overview of this section should be provided in a seperate markdown cell or in the project readme.

---

## **Population Script**

In [1]:
import os

local_path = "./data"                                                                                             
os.makedirs(local_path, exist_ok=True)
local_path

# 2. Download files using shell 
files = [
    "name.basics.tsv.gz",
    "title.akas.tsv.gz",
    "title.basics.tsv.gz",
    "title.crew.tsv.gz",
    "title.episode.tsv.gz",
    # "title.principals.tsv.gz",                        # Too big for instance to run it in Databricks
    "title.ratings.tsv.gz"
]

dict_files_names = {
    "name.basics.tsv.gz": "name.basics",
    "title.akas.tsv.gz": "title.akas",
    "title.basics.tsv.gz": "title.basics",
    "title.crew.tsv.gz": "title.crew",
    "title.episode.tsv.gz": "title.episode",
    # "title.principals.tsv.gz": "title.principals",    # Too big for instance to run it in Databricks
    "title.ratings.tsv.gz": "title.ratings"
}

base_url = "https://datasets.imdbws.com/"

for f in files:
    url = base_url + f
    out = f"{local_path}/{f}"
    print("Downloading:", f)
    os.system(f"wget -O {out} {url}")

Downloading: name.basics.tsv.gz


--2025-12-20 18:10:50--  https://datasets.imdbws.com/name.basics.tsv.gz
Resolving datasets.imdbws.com (datasets.imdbws.com)... 2600:9000:203b:c800:3:3082:af00:93a1, 2600:9000:203b:b400:3:3082:af00:93a1, 2600:9000:203b:ac00:3:3082:af00:93a1, ...
Connecting to datasets.imdbws.com (datasets.imdbws.com)|2600:9000:203b:c800:3:3082:af00:93a1|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 296030412 (282M) [binary/octet-stream]
Saving to: ‘./data/name.basics.tsv.gz’

     0K .......... .......... .......... .......... ..........  0% 6.20M 46s
    50K .......... .......... .......... .......... ..........  0% 13.0M 34s
   100K .......... .......... .......... .......... ..........  0% 8.01M 34s
   150K .......... .......... .......... .......... ..........  0% 25.6M 28s
   200K .......... .......... .......... .......... ..........  0% 31.9M 24s
   250K .......... .......... .......... .......... ..........  0% 10.5M 25s
   300K .......... .......... .......... ......

Downloading: title.akas.tsv.gz


.. .......... ..........  0% 22.9M 47s
   100K .......... .......... .......... .......... ..........  0% 13.6M 42s
   150K .......... .......... .......... .......... ..........  0% 12.2M 41s
   200K .......... .......... .......... .......... ..........  0% 18.8M 38s
   250K .......... .......... .......... .......... ..........  0% 32.1M 34s
   300K .......... .......... .......... .......... ..........  0% 24.9M 31s
   350K .......... .......... .......... .......... ..........  0% 28.8M 29s
   400K .......... .......... .......... .......... ..........  0% 15.0M 30s
   450K .......... .......... .......... .......... ..........  0% 77.1M 27s
   500K .......... .......... .......... .......... ..........  0% 27.1M 26s
   550K .......... .......... .......... .......... ..........  0%  125M 24s
   600K .......... .......... .......... .......... ..........  0% 51.8M 23s
   650K .......... .......... .......... .......... ..........  0%  116M 22s
   700K .......... .......... .......

Downloading: title.basics.tsv.gz


HTTP request sent, awaiting response... 200 OK
Length: 215238181 (205M) [binary/octet-stream]
Saving to: ‘./data/title.basics.tsv.gz’

     0K .......... .......... .......... .......... ..........  0% 6.22M 33s
    50K .......... .......... .......... .......... ..........  0% 12.6M 25s
   100K .......... .......... .......... .......... ..........  0% 8.02M 25s
   150K .......... .......... .......... .......... ..........  0% 26.3M 21s
   200K .......... .......... .......... .......... ..........  0% 36.1M 18s
   250K .......... .......... .......... .......... ..........  0% 24.2M 16s
   300K .......... .......... .......... .......... ..........  0% 22.6M 15s
   350K .......... .......... .......... .......... ..........  0% 23.0M 14s
   400K .......... .......... .......... .......... ..........  0%  249M 13s
   450K .......... .......... .......... .......... ..........  0% 28.9M 12s
   500K .......... .......... .......... .......... ..........  0% 16.8M 12s
   550K ..........

Downloading: title.crew.tsv.gz


200 OK
Length: 79359201 (76M) [binary/octet-stream]
Saving to: ‘./data/title.crew.tsv.gz’

     0K .......... .......... .......... .......... ..........  0% 5.23M 14s
    50K .......... .......... .......... .......... ..........  0%  993K 46s
   100K .......... .......... .......... .......... ..........  0% 6.49M 35s
   150K .......... .......... .......... .......... ..........  0%  687K 54s
   200K .......... .......... .......... .......... ..........  0% 30.4M 44s
   250K .......... .......... .......... .......... ..........  0% 41.7M 37s
   300K .......... .......... .......... .......... ..........  0% 7.01M 33s
   350K .......... .......... .......... .......... ..........  0%  705K 43s
   400K .......... .......... .......... .......... ..........  0% 73.6M 38s
   450K .......... .......... .......... .......... ..........  0% 33.1M 34s
   500K .......... .......... .......... .......... ..........  0% 63.9M 31s
   550K .......... .......... .......... .......... ..........

Downloading: title.episode.tsv.gz


. .......... .......... ..........  8% 26.8M 1s
  4050K .......... .......... .......... .......... ..........  8% 27.7M 1s
  4100K .......... .......... .......... .......... ..........  8% 15.1M 1s
  4150K .......... .......... .......... .......... ..........  8%  163M 1s
  4200K .......... .......... .......... .......... ..........  8% 24.3M 1s
  4250K .......... .......... .......... .......... ..........  8% 32.4M 1s
  4300K .......... .......... .......... .......... ..........  8% 18.6M 1s
  4350K .......... .......... .......... .......... ..........  8%  204M 1s
  4400K .......... .......... .......... .......... ..........  8% 71.9M 1s
  4450K .......... .......... .......... .......... ..........  8% 66.2M 1s
  4500K .......... .......... .......... .......... ..........  8% 57.7M 1s
  4550K .......... .......... .......... .......... ..........  9% 32.0M 1s
  4600K .......... .......... .......... .......... ..........  9% 30.2M 1s
  4650K .......... .......... ..........

Downloading: title.ratings.tsv.gz


.........  2% 11.4M 1s
   200K .......... .......... .......... .......... ..........  3%  595M 1s
   250K .......... .......... .......... .......... ..........  3% 8.71M 1s
   300K .......... .......... .......... .......... ..........  4%  740M 1s
   350K .......... .......... .......... .......... ..........  5% 29.8M 1s
   400K .......... .......... .......... .......... ..........  5%  394M 0s
   450K .......... .......... .......... .......... ..........  6% 15.6M 0s
   500K .......... .......... .......... .......... ..........  6% 2.70M 1s
   550K .......... .......... .......... .......... ..........  7% 12.3M 1s
   600K .......... .......... .......... .......... ..........  8% 82.5M 1s
   650K .......... .......... .......... .......... ..........  8% 49.2M 1s
   700K .......... .......... .......... .......... ..........  9%  102M 1s
   750K .......... .......... .......... .......... .......... 10% 21.7M 1s
   800K .......... .......... .......... .......... .......... 10

In [2]:
import os
os.environ["JAVA_HOME"] = "/opt/homebrew/opt/openjdk@17/libexec/openjdk.jdk/Contents/Home"
print("JAVA_HOME =", os.environ.get("JAVA_HOME"))
os.system("java -version")
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("IMDB Analysis").getOrCreate()
dict_df = {}
for file_name in files:
    df = (spark.read
          .option("compression", "gzip")
          .option("inferSchema", "false")
          .option("nullValue", "\\N")
          .csv(f"{local_path}/{file_name}", header=True, sep="\t"))
    df.show(5)
    dict_df[dict_files_names[file_name]] = df

JAVA_HOME = /opt/homebrew/opt/openjdk@17/libexec/openjdk.jdk/Contents/Home


openjdk version "17.0.17" 2025-10-21
OpenJDK Runtime Environment Homebrew (build 17.0.17+0)
OpenJDK 64-Bit Server VM Homebrew (build 17.0.17+0, mixed mode, sharing)
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/12/20 18:11:08 WARN Utils: Your hostname, MacBook-Air-de-Paul.local, resolves to a loopback address: 127.0.0.1; using 192.168.105.1 instead (on interface bridge100)
25/12/20 18:11:08 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/12/20 18:11:08 WARN Utils: Your hostname, MacBook-Air-de-Paul.local, resolves to a loopback address: 127.0.0.1; using 192.168.105.1 instead (on interface bridge100)
25/12/20 18:11:08 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(

+---------+---------------+---------+---------+--------------------+--------------------+
|   nconst|    primaryName|birthYear|deathYear|   primaryProfession|      knownForTitles|
+---------+---------------+---------+---------+--------------------+--------------------+
|nm0000001|   Fred Astaire|     1899|     1987|actor,miscellaneo...|tt0072308,tt00504...|
|nm0000002|  Lauren Bacall|     1924|     2014|actress,miscellan...|tt0037382,tt00752...|
|nm0000003|Brigitte Bardot|     1934|     NULL|actress,music_dep...|tt0057345,tt00491...|
|nm0000004|   John Belushi|     1949|     1982|actor,writer,musi...|tt0072562,tt00779...|
|nm0000005| Ingmar Bergman|     1918|     2007|writer,director,a...|tt0050986,tt00694...|
+---------+---------------+---------+---------+--------------------+--------------------+
only showing top 5 rows
+---------+--------+--------------------+------+--------+-----------+-------------+---------------+
|  titleId|ordering|               title|region|language|      typ

---

## **Questions**

**2.**

In [3]:
unique_name_count = (
  dict_df["name.basics"]
  .select("primaryName")
  .distinct()
  .count()
)
display(unique_name_count)

                                                                                

11417848

**3.**

There is a major problem here since the original dataset provides us with dates in absolute values as the example below proves it with the date of birth of Cesar (-100 -> 100).

In [4]:
raw = spark.read.text(f"{local_path}/name.basics.tsv.gz")
raw.filter(raw.value.contains("Gaio Giulio Cesare")).show(20, False)

[Stage 18:>                                                         (0 + 1) / 1]

+-----------------------------------------------------------------------------------+
|value                                                                              |
+-----------------------------------------------------------------------------------+
|nm2471712\tGaio Giulio Cesare\t100\t44\twriter,archive_footage\ttt0191909,tt0057105|
+-----------------------------------------------------------------------------------+



                                                                                

-> We have then the date of birth closest to 0 in the code below.

In [5]:
from pyspark.sql.functions import col, min

min_birth_year = (
  dict_df["name.basics"]
  .select(min(col("birthYear").cast("double")))
  .collect()[0][0]
)

min_birth_year_df = dict_df["name.basics"].filter(col("birthYear") == min_birth_year)

min_birth_year_df.show(5)

[Stage 22:>                                                         (0 + 1) / 1]

+---------+------------------+---------+---------+-----------------+--------------------+
|   nconst|       primaryName|birthYear|deathYear|primaryProfession|      knownForTitles|
+---------+------------------+---------+---------+-----------------+--------------------+
|nm0784172|Lucio Anneo Seneca|        4|       65|           writer|tt0043802,tt02188...|
+---------+------------------+---------+---------+-----------------+--------------------+



                                                                                

**4.**

In [6]:
from datetime import date

current_year = date.today().year
years_difference = int(current_year - min_birth_year)
print(f"The difference between the current year and the earliest date of birth in our dataset is {years_difference} years!")

The difference between the current year and the earliest date of birth in our dataset is 2021 years!


**5.**

In [7]:
from pyspark.sql.functions import col

person_nconst = min_birth_year_df.select("nconst").first()[0]

person_works = (
    dict_df["title.crew"]
    .filter(col("directors").contains(person_nconst) | col("writers").contains(person_nconst))
    .join(dict_df["title.basics"], on="tconst")
    .select("startYear")
    .filter(col("startYear").isNotNull())
    .orderBy("startYear")
)

first_work = person_works.first()
if first_work:
    age = int(first_work[0]) - int(min_birth_year)
    verdict = "INCORRECT" if age < 0 else "SUSPICIOUS" if age < 10 or age > 150 else "plausible"
    print(f"Age at first work: {age} years - Birth year seems {verdict}")
else:
    print("No works found to verify birth year")

[Stage 24:>                                                         (0 + 1) / 1]

Age at first work: 1947 years - Birth year seems SUSPICIOUS


                                                                                

**6.**

- If we consider this question as a question about the veracity of the earliest date of birth in this dataset, we have answered it a bit above (indicating that the date values were absolute -> preventing us from finding the earliest one but allowing us to find the closest to 0)

- On another hand, if we consider this question as a question about how we can check the veracity of the date of birth of this person

**7.**

In [8]:
from pyspark.sql.functions import col, max

max_birth_year = (
  dict_df["name.basics"]
  .select(max(col("birthYear").cast("double")))
  .collect()[0][0]
)

max_birth_year_df = dict_df["name.basics"].filter(col("birthYear") == max_birth_year)

max_birth_year_df.show(5)

[Stage 32:>                                                         (0 + 1) / 1]

+----------+-----------------+---------+---------+--------------------+--------------------+
|    nconst|      primaryName|birthYear|deathYear|   primaryProfession|      knownForTitles|
+----------+-----------------+---------+---------+--------------------+--------------------+
|nm16784939|Kyrah Ivy Jackson|     2025|     NULL|             actress|                NULL|
| nm5642311|     Chase Ramsey|     2025|     NULL|actor,director,wr...|tt17505010,tt1471...|
+----------+-----------------+---------+---------+--------------------+--------------------+



                                                                                

**8.**

In [9]:
from pyspark.sql.functions import col

total_rows = dict_df["name.basics"].count()
null_rows = dict_df["name.basics"].filter(col("birthYear").isNull()).count()

birth_year_null_pct = (null_rows / total_rows) * 100

print(f"{birth_year_null_pct:.2f}% of the people in this dataset do not have a listed date of birth!")

[Stage 36:>                                                         (0 + 1) / 1]

95.58% of the people in this dataset do not have a listed date of birth!


                                                                                

**9.**

In [10]:
from pyspark.sql.functions import col, max

longest_short_after_1900 = (
  dict_df["title.basics"]
  .filter((col("titleType") == "short") & (col("startYear") >= 1900))
  .select(max(col("runtimeMinutes")))
  .collect()[0][0]
)

print(f"The longest short film after 1900 was {longest_short_after_1900} minutes long!")

[Stage 39:>                                                         (0 + 1) / 1]

The longest short film after 1900 was 97 minutes long!


                                                                                

**10.**

In [11]:
from pyspark.sql.functions import col, min

shortest_movie_after_1900 = (
  dict_df["title.basics"]
  .filter((col("titleType") == "movie") & (col("startYear") >= 1900))
  .select(min(col("runtimeMinutes")))
  .collect()[0][0]
)

print(f"The shortest movie film after 1900 was {shortest_movie_after_1900} minutes long!")

[Stage 42:>                                                         (0 + 1) / 1]

The shortest movie film after 1900 was 1 minutes long!


                                                                                

**11.**

In [12]:
from pyspark.sql.functions import split, explode, trim, col

genres_df = (
    dict_df["title.basics"]
    .select(explode(split(col("genres"), ",")).alias("genre"))
    .select(trim(col("genre")).alias("genre"))
    .filter(col("genre").isNotNull() & (col("genre") != ""))
)

unique_genres = [row["genre"] for row in genres_df.select("genre").distinct().collect()]

print(unique_genres)

[Stage 45:>                                                         (0 + 1) / 1]

['Crime', 'Romance', 'Thriller', 'Adventure', 'Drama', 'War', 'Documentary', 'Reality-TV', 'Family', 'Fantasy', 'Game-Show', 'Adult', 'History', 'Mystery', 'Musical', 'Animation', 'Music', 'Film-Noir', 'Short', 'Horror', 'Western', 'Biography', 'Comedy', 'Sport', 'Action', 'Talk-Show', 'Sci-Fi', 'News']


                                                                                

**12.**

In [13]:
from pyspark.sql.functions import col, desc, dense_rank
from pyspark.sql.window import Window

df_joined = dict_df["title.basics"].join(
    dict_df["title.ratings"],
    on="tconst",
    how="inner"
)

# Filtre: movies + Comedy + minimum 1000 votes
df_filtered = df_joined.filter(
    (col("titleType") == "movie") &
    (col("genres").contains("Comedy")) &
    (col("numVotes") >= 1000)
)

w = Window.orderBy(desc("averageRating"), desc("numVotes"))

highest_rated_comedy_movie = (
    df_filtered
    .withColumn("rank", dense_rank().over(w))
    .filter(col("rank") == 1)
    .drop("rank")
)

highest_rated_comedy_movie.show(truncate=False)

25/12/20 18:13:36 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/12/20 18:13:36 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/12/20 18:13:36 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/12/20 18:13:37 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/12/20 18:13:37 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
[Stage 49:>                                                         (0 + 1) / 1]

+----------+---------+--------------+--------------+-------+---------+-------+--------------+-------------------+-------------+--------+
|tconst    |titleType|primaryTitle  |originalTitle |isAdult|startYear|endYear|runtimeMinutes|genres             |averageRating|numVotes|
+----------+---------+--------------+--------------+-------+---------+-------+--------------+-------------------+-------------+--------+
|tt38949436|movie    |Itlu Me Yedava|Itlu Me Yedava|0      |2025     |NULL   |132           |Comedy,Drama,Family|9.7          |2196    |
+----------+---------+--------------+--------------+-------+---------+-------+--------------+-------------------+-------------+--------+



25/12/20 18:13:46 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/12/20 18:13:46 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
                                                                                

**13.**

In [14]:
from pyspark.sql.functions import col, split, explode

# Joindre avec title.crew
highest_rated_comedy_movie_with_crew = (
    highest_rated_comedy_movie
    .join(dict_df["title.crew"], on="tconst", how="inner")
)

# Exploser les directors (car ils peuvent être multiples, séparés par des virgules)
directors_exploded = (
    highest_rated_comedy_movie_with_crew
    .withColumn("director_id", explode(split(col("directors"), ",")))
)

highest_rated_comedy_movie_director_df = (
    directors_exploded
    .join(dict_df["name.basics"], directors_exploded.director_id == dict_df["name.basics"].nconst, how="inner")
    .select("primaryName")
)

highest_rated_comedy_movie_director_df.show(truncate=False)

25/12/20 18:13:46 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/12/20 18:13:46 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/12/20 18:13:48 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/12/20 18:13:48 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/12/20 18:13:57 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/12/20 18:13:57 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
          

+---------------+
|primaryName    |
+---------------+
|Thrinadh Katari|
|Naveen Nalamasa|
+---------------+



**14**

For this last question we suspect that it was based on the fields provided in the _title.principals_ dataset (that we couldn't download at the beginning of this notebook since it causes OOM in Databricks...)

But we could easily imagine that the query to obtain the alternate titles would be something like:  
<br>
```
highest_rated_comedy_movie_titles_df = (
    highest_rated_comedy_movie
    .join(dict_df["title.principals"], on="tconst", how="inner")
    .select("primaryTitle", "alternatesTitle")
)

display(highest_rated_comedy_movie_titles_df)
```

---

## **Stream Processing**

### Overview

This section implements a real-time stream processing job that monitors Wikipedia edits for five entities from our IMDB dataset using the Wikimedia EventStreams API via `pywikibot`.

**Selected Entities (from IMDB dataset):**
1. **Christopher Nolan** - Director
2. **The Shawshank Redemption** - Top-rated movie
3. **Leonardo DiCaprio** - Actor
4. **Star Wars** - Movie franchise
5. **Science fiction** - Genre

**Metrics Tracked:**
- Edit count per entity
- Unique editors count
- Last edit timestamp

**Alert System:**
Every 5 edits on a tracked entity triggers an alert saved to `alerts.json`.

**Output Files:**
- `stream_metrics.json` - Metrics for all tracked entities
- `alerts.json` - High-frequency edit alerts

### Installation

```bash
pip install pywikibot requests-sse
```

In [15]:
%pip install pywikibot requests-sse
# Restart the kernel manually: Kernel > Restart Kernel

Note: you may need to restart the kernel to use updated packages.


In [16]:
import json
import os
from pywikibot.comms.eventstreams import EventStreams
from datetime import datetime, timedelta
from collections import defaultdict

# 5 entities from IMDB dataset to track
TRACKED_ENTITIES = {
    "Christopher Nolan": "director",
    "The Shawshank Redemption": "movie",
    "Leonardo DiCaprio": "actor",
    "Star Wars": "franchise",
    "Science fiction": "genre"
}

# Initialize storage
metrics = defaultdict(lambda: {"edit_count": 0, "unique_editors": set(), "last_edit": None})
alerts = []

print("Tracking entities:")
for entity, etype in TRACKED_ENTITIES.items():
    print(f"  - {entity} ({etype})")

Tracking entities:
  - Christopher Nolan (director)
  - The Shawshank Redemption (movie)
  - Leonardo DiCaprio (actor)
  - Star Wars (franchise)
  - Science fiction (genre)


In [17]:
# Output paths
METRICS_FILE = "stream_metrics.json"
ALERTS_FILE = "alerts.json"
LAST_EVENT_CACHE = "last_event_cache.txt"

def check_file_exists(path: str) -> bool:
    return os.path.exists(path)

def set_stream(start_time: datetime) -> EventStreams:
    """Initialize stream from cache or from 1 day ago"""
    if check_file_exists(LAST_EVENT_CACHE):
        with open(LAST_EVENT_CACHE, 'r') as f:
            return EventStreams(streams=["recentchange"], since=f.read().strip())
    else:
        since_date = (start_time - timedelta(days=1)).strftime('%Y%m%d')
        return EventStreams(streams=["recentchange"], since=since_date)

def save_metrics():
    """Save metrics and alerts to JSON files"""
    output = {entity: {"type": TRACKED_ENTITIES[entity], "edit_count": data["edit_count"],
                       "unique_editors": len(data["unique_editors"]), "last_edit": data["last_edit"]}
              for entity, data in metrics.items()}
    
    with open(METRICS_FILE, "w") as f:
        json.dump(output, f, indent=2)
    with open(ALERTS_FILE, "w") as f:
        json.dump(alerts, f, indent=2)
    
    print(f"Saved: {METRICS_FILE}, {ALERTS_FILE}")

In [18]:
def process_event(event: dict) -> bool:
    """Process event and check if it matches tracked entities"""
    title = event.get("title", "")
    user = event.get("user", "anonymous")
    timestamp = event.get("meta", {}).get("dt", datetime.now().isoformat())
    
    for entity in TRACKED_ENTITIES:
        if entity.lower() in title.lower():
            metrics[entity]["edit_count"] += 1
            metrics[entity]["unique_editors"].add(user)
            metrics[entity]["last_edit"] = timestamp
            
            print(f"Match: {entity} - '{title}' by {user}")
            
            # Alert every 5 edits
            if metrics[entity]["edit_count"] % 5 == 0:
                alerts.append({
                    "entity": entity,
                    "type": TRACKED_ENTITIES[entity],
                    "edit_count": metrics[entity]["edit_count"],
                    "timestamp": timestamp
                })
                print(f"ALERT: {entity} reached {metrics[entity]['edit_count']} edits!")
            return True
    return False

In [19]:
# Stream configuration
start_time = datetime.now()
duration_minutes = 5
stop_time = start_time + timedelta(minutes=duration_minutes)

print(f"Starting stream processing for {duration_minutes} minutes...")
print(f"Stop time: {stop_time.strftime('%H:%M:%S')}\n")

# Initialize stream (filter for English Wikipedia edits)
stream = set_stream(start_time)
stream.register_filter(server_name='en.wikipedia.org', type='edit')

# Process events
event_count = matched_count = 0

while datetime.now() < stop_time:
    try:
        event = next(stream)
        event_count += 1
        
        if process_event(event):
            matched_count += 1
        
        # Update cache
        event_timestamp = event.get('meta', {}).get('dt', '')
        if event_timestamp:
            with open(LAST_EVENT_CACHE, 'w') as f:
                f.write(event_timestamp)
        
        # Progress every 100 events
        if event_count % 100 == 0:
            print(f"Progress: {event_count} events ({matched_count} matched)")
            
    except Exception as e:
        print(f"Error: {e}")
        continue

# Save results
save_metrics()

# Summary
print(f"\n--- SUMMARY ---")
print(f"Duration: {duration_minutes} min | Events: {event_count} | Matched: {matched_count} | Alerts: {len(alerts)}")
print("\nResults:")
for entity, data in sorted(metrics.items(), key=lambda x: x[1]["edit_count"], reverse=True):
    print(f"  {entity}: {data['edit_count']} edits, {len(data['unique_editors'])} editors")

Starting stream processing for 5 minutes...
Stop time: 18:19:08

Progress: 100 events (0 matched)
Progress: 200 events (0 matched)
Progress: 300 events (0 matched)
Progress: 400 events (0 matched)
Progress: 500 events (0 matched)
Match: Star Wars - 'File:Star Wars Outlaws gameplay screenshot.jpg' by DatBot
Progress: 600 events (1 matched)
Progress: 700 events (1 matched)
Progress: 800 events (1 matched)
Progress: 900 events (1 matched)
Progress: 1000 events (1 matched)
Progress: 1100 events (1 matched)
Progress: 1200 events (1 matched)
Progress: 1300 events (1 matched)
Progress: 1400 events (1 matched)
Progress: 1500 events (1 matched)
Progress: 1600 events (1 matched)
Progress: 1700 events (1 matched)
Progress: 1800 events (1 matched)
Progress: 1900 events (1 matched)
Progress: 2000 events (1 matched)
Progress: 2100 events (1 matched)
Progress: 2200 events (1 matched)
Progress: 2300 events (1 matched)
Progress: 2400 events (1 matched)
Progress: 2500 events (1 matched)
Progress: 2600 e