###   1.Load all IMDb datasets from DBFS using Spark

In [0]:
# Load all IMDb datasets from DBFS using Spark

title_principals_df = spark.read.option("header", True).option("sep", "\t").csv(
    "dbfs:/FileStore/shared_uploads/jilson.jose@edu.ece.fr/title_principals_tsv-1.gz"
)

title_akas_df = spark.read.option("header", True).option("sep", "\t").csv(
    "dbfs:/FileStore/shared_uploads/jilson.jose@edu.ece.fr//title_akas_tsv-1.gz"
)

title_basics_df = spark.read.option("header", True).option("sep", "\t").csv(
    "dbfs:/FileStore/shared_uploads/jilson.jose@edu.ece.fr/title_basics_tsv-1.gz"
)

name_basics_df = spark.read.option("header", True).option("sep", "\t").csv(
    "dbfs:/FileStore/shared_uploads/jilson.jose@edu.ece.fr/name_basics_tsv-1.gz"
)

title_crew_df = spark.read.option("header", True).option("sep", "\t").csv(
    "dbfs:/FileStore/shared_uploads/jilson.jose@edu.ece.fr/title_crew_tsv-1.gz"
)

title_episode_df = spark.read.option("header", True).option("sep", "\t").csv(
    "dbfs:/FileStore/shared_uploads/jilson.jose@edu.ece.fr/title_episode_tsv-1.gz"
)

title_ratings_df = spark.read.option("header", True).option("sep", "\t").csv(
    "dbfs:/FileStore/shared_uploads/jilson.jose@edu.ece.fr/title_ratings_tsv-1.gz"
) 

### 2.How many total people in data set

In [0]:
total_people = name_basics_df.select("nconst").distinct().count()
print(f"Total people: {total_people}")

Total people: 14315377


### 3.what is the earliest year of birth

In [0]:
from pyspark.sql.functions import col

# Filter valid numeric years
birth_years_df = name_basics_df.filter(col("birthYear").rlike("^\d{4}$")).withColumn("birthYear", col("birthYear").cast("int"))
earliest_birth = birth_years_df.orderBy("birthYear").select("primaryName", "birthYear").first()
print(f"Earliest birth year: {earliest_birth['birthYear']} by {earliest_birth['primaryName']}")


Earliest birth year: 1048 by Omar Khayyam


### 4.how many years ago was this person born

In [0]:
from datetime import datetime
current_year = datetime.now().year
years_ago = current_year - earliest_birth["birthYear"]
print(f"{earliest_birth['primaryName']} was born {years_ago} years ago.")


Omar Khayyam was born 977 years ago.


### 5.using only the data in the data set is this date of birth correct . explain the answer
**Answer:** From the data alone, we can't make claims of correctness. However, in the event that this person worked in productions during the early 1900s, the birth year is reasonable and consistent within itself.


### 6.what is the latest data of birth

In [0]:
latest_birth = birth_years_df.orderBy(col("birthYear").desc()).select("primaryName", "birthYear").first()
print(f"Latest birth year: {latest_birth['birthYear']} by {latest_birth['primaryName']}")


Latest birth year: 2024 by Ronnie Lordi


### 7.how many people do not have a year of birth

In [0]:
missing_births = name_basics_df.filter((col("birthYear") == "\\N") | col("birthYear").isNull()).count()
print(f"People without birth year: {missing_births}")


People without birth year: 13672818


### 8.what is the length of the longest short after 1900

In [0]:
shorts_df = title_basics_df.filter(
    (col("genres").contains("Short")) & 
    (col("startYear").rlike("^\d{4}$")) & 
    (col("runtimeMinutes").rlike("^\d+$"))
).withColumn("startYear", col("startYear").cast("int")).withColumn("runtimeMinutes", col("runtimeMinutes").cast("int"))

longest_short = shorts_df.filter(col("startYear") > 1900).orderBy(col("runtimeMinutes").desc()).select("primaryTitle", "runtimeMinutes").first()
print(f"Longest short after 1900: {longest_short['primaryTitle']} ({longest_short['runtimeMinutes']} min)")


Longest short after 1900: Project Bolo (400 min)


### 9.what is the length of the shortest movie after 1900

In [0]:
movies_df = title_basics_df.filter(
    (col("startYear").rlike("^\d{4}$")) & 
    (col("runtimeMinutes").rlike("^\d+$"))
).withColumn("startYear", col("startYear").cast("int")).withColumn("runtimeMinutes", col("runtimeMinutes").cast("int"))

shortest_movie = movies_df.filter(col("startYear") > 1900).orderBy("runtimeMinutes").select("primaryTitle", "runtimeMinutes").first()
print(f"Shortest movie after 1900: {shortest_movie['primaryTitle']} ({shortest_movie['runtimeMinutes']} min)")
# Filter out runtimeMinutes == 0
movies_df = movies_df.filter((col("startYear") > 1900) & (col("runtimeMinutes") > 0))

shortest_movie = movies_df.orderBy("runtimeMinutes").select("primaryTitle", "runtimeMinutes").first()

print(f"Shortest movie after 1900 Excluding O Minutes: {shortest_movie['primaryTitle']} ({shortest_movie['runtimeMinutes']} min)")


Shortest movie after 1900: Storm P. tegner de Tree Små Mænd (0 min)
Shortest movie after 1900 Excluding O Minutes: What Came Out of the Cheese; or, The Lilliputians in a New York Restaurant (1 min)


### 10.provide a list of all of the genres represented

In [0]:
from pyspark.sql.functions import explode, split

genres_df = title_basics_df.select(split(col("genres"), ",").alias("genre_list"))
distinct_genres = genres_df.select(explode("genre_list").alias("genre")).distinct().orderBy("genre")
distinct_genres.show(truncate=False)


+-----------+
|genre      |
+-----------+
|Action     |
|Adult      |
|Adventure  |
|Animation  |
|Biography  |
|Comedy     |
|Crime      |
|Documentary|
|Drama      |
|Family     |
|Fantasy    |
|Film-Noir  |
|Game-Show  |
|History    |
|Horror     |
|Music      |
|Musical    |
|Mystery    |
|News       |
|Reality-TV |
+-----------+
only showing top 20 rows



### 11.what is the higest rated comedy movie in the dataset . note, if there is a tie, the tie shall be broken by the movie with the most votes .

In [0]:
# Clean and join
ratings_clean = title_ratings_df.withColumn("averageRating", col("averageRating").cast("float")) \
                                 .withColumn("numVotes", col("numVotes").cast("int"))

comedy_movies_df = title_basics_df.filter(col("genres").contains("Comedy"))
comedy_ratings_df = comedy_movies_df.join(ratings_clean, on="tconst", how="inner")

# Sort by rating and votes
top_comedy = comedy_ratings_df.orderBy(col("averageRating").desc(), col("numVotes").desc()).select("tconst", "primaryTitle", "averageRating", "numVotes").first()
print(f"Top comedy: {top_comedy['primaryTitle']} ({top_comedy['averageRating']} ⭐, {top_comedy['numVotes']} votes)")


Top comedy: I challenge the Ender Dragon in Minecraft (Ending) (10.0 ⭐, 320 votes)


### 12.who was the director of the movie.

In [0]:
# Get director using title_crew
director_row = title_crew_df.filter(col("tconst") == top_comedy["tconst"]).select("directors").first()

if director_row and director_row["directors"] != "\\N":
    director_ids = director_row["directors"].split(",")
    directors = name_basics_df.filter(col("nconst").isin(director_ids)).select("primaryName").rdd.flatMap(lambda x: x).collect()
    print(f"Director(s): {', '.join(directors)}")
else:
    print("No director listed.")


Director(s): Felix Kjellberg


### 13.List, if any, the alternate titles for the movie .

In [0]:
alt_titles = title_akas_df.filter(col("titleId") == top_comedy["tconst"]).select("title").distinct().rdd.flatMap(lambda x: x).collect()
print(f"Alternate titles: {', '.join(alt_titles)}")


Alternate titles: I challenge the Ender Dragon in Minecraft (Ending)


# Degrees of seperation
A degree of separation is a way to measure how far apart two people (in this case, actors) are from each other in a network, based on their connections.

Here's a simple explanation:

1st Degree: Direct connection

If Actor A and Actor B have worked together in a movie, they have 1 degree of separation Example: Tom Hanks and Tim Allen worked together in "Toy Story", so they are 1 degree apart
2nd Degree: Connection through one intermediary

If Actor A worked with Actor B, and Actor B worked with Actor C (but A never worked with C), then A and C have 2 degrees of separation Example: If Tom Hanks worked with Julia Roberts, and Julia Roberts worked with George Clooney (but Tom Hanks never worked with Clooney), then Hanks and Clooney are 2 degrees apart
3rd Degree: Connection through two intermediaries

Continues the chain with one more person in between Example: Actor A → Actor B → Actor C → Actor D
Explore the degrees of seperation for nconst nm0000102 to the 6th Degree . Please note, you may need to persist each degree of seperation to permanent storage as the community edition may not have the resources to do the calculations in one run .

Build Actor-Movie Pairs and Build the Actor-to-Actor Graph

In [0]:
from pyspark.sql.functions import col

# Only keep actors and actresses
actor_movie_df = title_principals_df.filter(
    col("category").isin("actor", "actress")
).select("tconst", "nconst").distinct()

actor_edges_df = actor_movie_df.alias("a").join(
    actor_movie_df.alias("b"),
    on="tconst"
).filter(col("a.nconst") != col("b.nconst")) \
 .select(col("a.nconst").alias("from_actor"), col("b.nconst").alias("to_actor")).distinct()


##  BFS - Degree of Separation from Kevin Bacon
###  14.Build the degrees of seperation . 

In [0]:
# Start with Kevin Bacon
from pyspark.sql.functions import lit

# Degree 0: Kevin Bacon himself
degree_0 = spark.createDataFrame([("nm0000102", 0)], ["nconst", "degree"])

# Save initial degree
degree_0.write.mode("overwrite").parquet("/tmp/degree_0")

# Build BFS levels 1 to 6
current_level = degree_0
for i in range(1, 7):
    prev_level = spark.read.parquet(f"/tmp/degree_{i - 1}")
    # Join edges to expand
    next_level = prev_level.join(actor_edges_df, prev_level.nconst == actor_edges_df.from_actor, how="inner") \
        .select(col("to_actor").alias("nconst")).distinct()

    # Remove already visited
    visited = spark.read.parquet(f"/tmp/degree_0")
    for j in range(1, i):
        visited = visited.union(spark.read.parquet(f"/tmp/degree_{j}"))
    next_level = next_level.join(visited, on="nconst", how="left_anti")

    # Add degree info
    next_level = next_level.withColumn("degree", lit(i))

    # Save to storage
    next_level.write.mode("overwrite").parquet(f"/tmp/degree_{i}")
    print(f"Degree {i} saved.")


Degree 1 saved.
Degree 2 saved.
Degree 3 saved.
Degree 4 saved.
Degree 5 saved.
Degree 6 saved.


In [0]:
degree_counts = []
for i in range(7):
    df = spark.read.parquet(f"/tmp/degree_{i}")
    count = df.count()
    degree_counts.append((i, count))

sorted_counts = sorted(degree_counts, key=lambda x: x[1], reverse=True)
print("Degree counts:", degree_counts)

most = sorted_counts[0]
most_excluding_zero = sorted_counts[1]
least = sorted_counts[-1]
print(f"Most people: Degree {most[0]} ({most[1]})")
print(f"Most (excluding Degree 0): Degree {most_excluding_zero[0]} ({most_excluding_zero[1]})")
print(f"Least people: Degree {least[0]} ({least[1]})")


Degree counts: [(0, 1), (1, 888), (2, 107313), (3, 1124643), (4, 1440871), (5, 315279), (6, 48052)]
Most people: Degree 4 (1440871)
Most (excluding Degree 0): Degree 3 (1124643)
Least people: Degree 0 (1)


### 15.Which degree contains the most people?

In [0]:
degree_counts = []
for i in range(7):
    df = spark.read.parquet(f"/tmp/degree_{i}")
    count = df.count()
    degree_counts.append((i, count))
    print(f"Degree {i}: {count} people")

# Find the max
most = max(degree_counts, key=lambda x: x[1])
print(f"\n Most people overall: Degree {most[0]} with {most[1]} people")


Degree 0: 1 people
Degree 1: 888 people
Degree 2: 107313 people
Degree 3: 1124643 people
Degree 4: 1440871 people
Degree 5: 315279 people
Degree 6: 48052 people

 Most people overall: Degree 4 with 1440871 people


### 16.Aside from Degree 0, which degree contains the most people?

In [0]:
# Exclude degree 0
most_excl_0 = max(degree_counts[1:], key=lambda x: x[1])
print(f" Most people (excluding Degree 0): Degree {most_excl_0[0]} with {most_excl_0[1]} people")


 Most people (excluding Degree 0): Degree 4 with 1440871 people


### 17.Which contains the least .

In [0]:
least = min(degree_counts, key=lambda x: x[1])
print(f" Least people (including Degree 0): Degree {least[0]} with {least[1]} people")
# Exclude degree 0
least = min(degree_counts[1:], key=lambda x: x[1])
print(f"Least people (excluding Degree 0): Degree {least[0]} with {least[1]} people")


 Least people (including Degree 0): Degree 0 with 1 people
Least people (excluding Degree 0): Degree 1 with 888 people


### 18.Is the person from question 3 within 6 Degrees of nm0000102, if so, how many ?

In [0]:
earliest_nconst =  earliest_birth['primaryName']

found = False
for i in range(1, 7):
    df = spark.read.parquet(f"/tmp/degree_{i}")
    if df.filter(col("nconst") == earliest_nconst).count() > 0:
        print(f"{earliest_nconst} is within {i} degrees of Kevin Bacon.")
        found = True
        break

if not found:
    print(f"{earliest_nconst} is NOT within 6 degrees.")


Omar Khayyam is NOT within 6 degrees.


Which degree contains the least people?

### 19.Is nm0000102 within 6 degrees of the movie from question 11, if so, how many ?

In [0]:
top_comedy_tconst = top_comedy['primaryTitle']

comedy_actors = title_principals_df.filter(
    (col("tconst") == top_comedy_tconst) &
    (col("category").isin("actor", "actress"))
).select("nconst").distinct().rdd.flatMap(lambda x: x).collect()

for i in range(1, 7):
    df = spark.read.parquet(f"/tmp/degree_{i}")
    overlap = df.filter(col("nconst").isin(comedy_actors))
    if overlap.count() > 0:
        print(f"Kevin Bacon is within {i} degrees of top comedy movie actors.")
        break
else:
    print("Kevin Bacon is NOT within 6 degrees of the comedy movie.")


Kevin Bacon is NOT within 6 degrees of the comedy movie.
