In [None]:
import warnings
warnings.filterwarnings("ignore")

In [None]:
# findspark.init() runs before any pyspark import — presumably so it can put a local
# Spark installation on sys.path (TODO confirm SPARK_HOME is set on this machine).
import findspark
findspark.init()
from pyspark.sql import SparkSession
# NOTE(review): star import — later cells rely on it for col, from_json, ltrim, trim,
# explode, split, regexp_replace, regexp_extract, when, udf, ...
from pyspark.sql.functions import *
# NOTE(review): DateType is not used in the cells visible here.
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType, DateType
from pyspark.sql import functions as F

In [None]:
# Get (or create) the session with the Kafka connector package and a local JDBC jar on
# the classpath; the jar presumably provides the SQLServerDriver used by the sink below.
spark = (
    SparkSession.builder
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.1")
    .config("spark.jars", "sqljdbc42.jar")
    .getOrCreate()
)

In [None]:
# Bare expression so Jupyter renders the active SparkSession as the cell output.
spark

In [None]:
# Subscribe to the Kafka topic "moviesProject" as a streaming source, starting from the
# latest offsets. The record payload is in the `value` column (cast to string below).
#
# No auto-commit option is set: Spark's Kafka source tracks offsets in the streaming
# checkpoint, not through Kafka consumer commits. An un-prefixed "enable.auto.commit"
# option is simply ignored by the source, and "kafka.enable.auto.commit" is rejected.
df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "moviesProject")
    .option("startingOffsets", "latest")
    .load()
)

In [None]:
# Inspect the schema of the raw Kafka stream before the payload is parsed.
df.printSchema()

In [None]:
# Schema of the JSON payload: every field is read as a nullable string; numeric
# columns (total_views, user_reviews, user_rating, release_year) are cast later.
schema_field_names = [
    "show_id", "type", "title", "director", "cast", "country", "date_added",
    "release_year", "rating", "duration", "listed_in", "description",
    "source", "total_views", "user_reviews", "user_rating",
]
schema = StructType(
    [StructField(field_name, StringType(), nullable=True) for field_name in schema_field_names]
)

In [None]:
# Read each record's `value` as a string, parse it as JSON with `schema`, and flatten
# the resulting struct into top-level columns.
payload_text = col("value").cast(StringType())
json_df = df.select(from_json(payload_text, schema).alias("data")).select("data.*")

In [None]:
# Inspect the columns produced by parsing the JSON payload with `schema`.
json_df.printSchema()

In [None]:
# Turn empty strings into nulls in every (string) column.
Updateemptystring = json_df.na.replace("", None)


In [None]:
# Strip leading AND trailing whitespace from every column (all columns are strings at
# this point). Trailing spaces would otherwise survive and, for example, keep values
# such as "123 " from matching the purely-numeric director/cast filter further down.
removewhitespace = Updateemptystring.select(
    [trim(col(c)).alias(c) for c in Updateemptystring.columns]
)

In [None]:
# Remove the date_added column, keeping every other column in its original order.
drop_date_added = removewhitespace.select(
    [name for name in removewhitespace.columns if name != "date_added"]
)

In [None]:
# Treat '&' and '-' as extra separators in listed_in by rewriting them to commas, so the
# split in the next cell breaks e.g. "Comedy & Drama" into separate genres.
# NOTE(review): this also splits hyphenated genres; category_mapping contains fragment
# keys such as 'TV Sci', 'Stand' and 'Up Comedy', presumably produced by this step.
separator_pattern = r"[&-]"
df_cleaned = drop_date_added.withColumn(
    "listed_in", F.regexp_replace(F.col("listed_in"), separator_pattern, ",")
)

In [None]:
# One row per comma-separated listed_in entry, stored in a new Category column;
# listed_in itself is dropped afterwards.
explode_listed_in = (
    df_cleaned
    .select("*", explode(split(col("listed_in"), ",")).alias("Category"))
    .drop("listed_in")
)

In [None]:
# Remove whitespace left around each Category after the split.
df_exploded = explode_listed_in.withColumn("Category", F.trim(F.col("Category")))

In [None]:
# Drop Category values that are really years (e.g. "2019") that ended up in listed_in.
looks_like_year = F.col("Category").rlike("^[0-9]{4}$")
df_cleaned_years = df_exploded.filter(~looks_like_year)

# Drop Category values that are really durations ("90 min", or anything containing
# "Seasons").
# NOTE(review): the singular "1 Season" does not contain "Seasons" and is kept.
looks_like_duration = (
    F.col("Category").rlike("^[0-9]+ min$") | F.col("Category").rlike("Seasons")
)
df_cleaned_duration = df_cleaned_years.filter(~looks_like_duration)

In [None]:
country_list = ["Germany", "France", "United States", "India", "Nigeria"]


def is_country(value):
    """Return ``value`` if it is one of ``country_list``, otherwise ``None``.

    Plain Python function; it is wrapped as a Spark UDF in a later cell.
    """
    return value if value in country_list else None

In [None]:
from pyspark.sql.functions import udf
# Spark UDF wrapper around is_country producing a string (or null) column.
is_country_udf = udf(f=is_country, returnType=StringType())

In [None]:
# country_detected = Category when it is one of country_list, otherwise null (also null
# for a null Category). This native isin() check returns the same values as
# is_country_udf, without shipping every row to a Python worker.
df_with_countries = df_cleaned_duration.withColumn(
    "country_detected",
    F.when(F.col("Category").isin(country_list), F.col("Category")),
)

In [None]:
# Discard rows whose Category was recognised as a country, then drop the helper column.
df_without_countries = (
    df_with_countries
    .where(F.col("country_detected").isNull())
    .drop("country_detected")
)

In [None]:
# Maps a cleaned Category token (a piece of listed_in after '&'/'-' were turned into
# commas, the value was split on ',' and trimmed) to a coarser standardized genre.
# Tokens that are not keys here end up as 'Other' in the Standardized_Category cells.
# NOTE(review): many keys are person names, dates or countries rather than genres —
# presumably spill-over from malformed source rows; they are all mapped to 'Other'.
category_mapping = {
    'TV Shows': 'Television',
    'International': 'Television',
    'Music Videos and Concerts': 'Music',
    'Romance': 'Romance',
    'Young Adult Audience': 'Young Adult Audience',
    'Danny Tellez': 'Other',
    'Yakima Canutt': 'Other',
    'Entertainment': 'Entertainment',
    'Guinn ``big Boy'' Williams': 'Other',  # NOTE(review): parses as two adjacent literals, so the key is "Guinn ``big Boy Williams" (no '' quote)
    'Adventure': 'Adventure',
    'Sports': 'Sports',
    'Faith and Spirituality': 'Spiritual',
    'Drama': 'Drama',
    'Military and War': 'Military and War',
    'Documentary': 'Documentary',
    'Manuel González Sabin Cañita': 'Other',
    'Fitness': 'Health',
    'Fantasy': 'Fantasy',
    'LGBTQ': 'LGBTQ',
    'Tom Lingham': 'Other',
    'and Culture': 'Culture',
    'BJ Minor': 'Other',
    'Mark Salidino': 'Other',
    'Lew Luana': 'Other',
    'Martin Kove': 'Other',
    'Iron Eyes Cody': 'Other',
    'Crystal Howell': 'Other',
    'Ivan Miller': 'Other',
    'Tex Palmer': 'Other',
    'Anime': 'Animation',
    'Suspense': 'Thriller',
    'Charles Arnt': 'Other',
    'Animation': 'Animation',
    'Arts': 'Art',
    'Kayden Bryce': 'Other',
    'Special Interest': 'Special Interest',
    'Mic Larry': 'Other',
    'Kids': 'Children',
    'Science Fiction': 'Sci-Fi',
    'Tony Lee': 'Other',
    'Arthouse': 'Art',
    'Horror': 'Horror',
    'George Cleveland': 'Other',
    'Western': 'Western',
    'Luxembourg': 'Other',
    'Reese AKA LowKeyRG': 'Other',
    'William Haade': 'Other',
    'Comedy': 'Comedy',
    'Action': 'Action',
    'Historical': 'History',
    'Talk Show and Variety': 'Talk Show',
    'Glenn Strange': 'Other',
    'Unscripted': 'Reality',
    'October 16': 'Other',
    'Romantic TV Shows': 'Romance',
    'Korean TV Shows': 'Television',
    'Patricia López Arnaiz': 'Other',
    'Comedies': 'Comedy',
    'Science': 'Science',
    'Faith': 'Spiritual',
    'Musicals': 'Musical',
    'Anime Features': 'Animation',
    'Marta Larralde': 'Other',
    'Sports Movies': 'Sports',
    'Reality TV': 'Reality',
    'April 5': 'Other',
    'TV Horror': 'Horror',
    'Janeane Garofalo': 'Other',
    'Crime TV Shows': 'Crime',
    'Margaret Cho': 'Other',
    'Cult TV': 'Cult',
    'Independent Movies': 'Indie',
    'Language TV Shows': 'Television',
    'Talk Shows': 'Talk Show',
    'Anime Series': 'Animation',
    "Kids' TV": 'Children',
    'Dramas': 'Drama',
    '2 Seasons': 'Other',  # NOTE(review): unreachable — Categories containing "Seasons" are filtered out earlier
    'Eden Marryshow': 'Other',
    'Romantic Movies': 'Romance',
    'TV Mysteries': 'Mystery',
    'Spanish': 'Other',
    'British TV Shows': 'Television',
    'Teen TV Shows': 'Television',
    'Akin Lewis': 'Other',
    'International Movies': 'Movies',
    'TV Action': 'Action',
    'Movies': 'Movies',
    'TV Comedies': 'Comedy',
    'Horror Movies': 'Horror',
    'Thrillers': 'Thriller',
    'TV Dramas': 'Drama',
    'Heavy D': 'Other',
    'Music': 'Music',
    'Spirituality': 'Spiritual',
    'Family Movies': 'Family',
    'TV Sci': 'Sci-Fi',  # presumably the "TV Sci" fragment left after splitting on '-'
    'International TV Shows': 'Television',
    'Up Comedy': 'Comedy',  # presumably the "Up Comedy" fragment of a hyphenated "Stand-Up"
    'Docuseries': 'Documentary',
    'Classic Movies': 'Classic',
    'TV Thrillers': 'Thriller',
    'Nature TV': 'Nature',
    'January 16': 'Other',
    'Netflix': 'Other',
    'Children': 'Children',
    'Stand': 'Comedy',  # presumably the "Stand" fragment of a hyphenated "Stand-Up"
    'LGBTQ Movies': 'LGBTQ',
    'United Kingdom': 'Other',
    'Cult Movies': 'Cult',
    'Classic': 'Classic',
    'Wellness': 'Health',
    'Crime': 'Crime',
    'Thriller': 'Thriller',
    'Food': 'Food',
    'Teen': 'Teen',
    'Sketch Comedy': 'Comedy',
    'Health': 'Health',
    'Culture': 'Culture',
    'Latino': 'Other',
    'Family': 'Family',
    'Reality': 'Reality',
    'Sitcom': 'Comedy',
    'History': 'History',
    'Mystery': 'Mystery',
    'Classics': 'Classic',
    'Stand Up': 'Comedy',
    'Technology': 'Technology',
    'Lifestyle': 'Lifestyle',
    'Game Shows': 'Game Show',
    'Black Stories': 'Black Stories',
    'Cartoons': 'Animation',
    'Cooking': 'Food',
    'LGBTQ+': 'LGBTQ',
    'Adult Animation': 'Animation',
    'Late Night': 'Talk Show',
    'News': 'News',
    'Kosovo': 'Other',
    'Buddy': 'Other',
    'Police/Cop': 'Crime',
    'Survival': 'Survival',
    'Travel': 'Travel',
    'Biographical': 'Biography',
    'Spy/Espionage': 'Thriller',
    'Soap Opera / Melodrama': 'Drama',
    'Disaster': 'Disaster',
    'Series': 'Series',
    'Musical': 'Musical',
    'Concert Film': 'Music',
    'Romantic Comedy': 'Romance Comedy',
    'Variety': 'Variety',
    'Nature': 'Nature',
    'Coming of Age': 'Movies',
    'Animals': 'Animals',
    'Superhero': 'Action',
    'Anthology': 'Movies',
    'Medical': 'Medical',
    'Dance': 'Screen Dance',
    'Parody': 'Comedy',
    'Talk Show': 'Talk Show',
    'Game Show / Competition': 'Game Show',
    'Cameron Fraser': 'Other'
}

In [None]:
# Keep Category when it is a known mapping key; anything else (including a null
# Category, for which isin() is null) becomes 'Other'.
is_known_category = F.col("Category").isin(list(category_mapping))
standardized_categories_df = df_without_countries.withColumn(
    "Standardized_Category",
    F.when(is_known_category, F.col("Category")).otherwise(F.lit("Other")),
)

In [None]:
def map_categories(category):
    """Translate a raw category into its standardized genre via ``category_mapping``.

    Any value that is not a key of ``category_mapping`` (including ``None`` and
    ``'Other'``) yields ``'Other'``.
    """
    if category in category_mapping:
        return category_mapping[category]
    return 'Other'

In [None]:
# Spark UDF wrapper around map_categories producing a string column.
map_categories_udf = F.udf(map_categories, returnType=StringType())

In [None]:
# Replace each Standardized_Category value with its mapped genre.
# NOTE(review): this reassigns standardized_categories_df, so re-running only this cell
# maps already-mapped values again (e.g. 'Television' is not a key and becomes 'Other').
mapped_category = map_categories_udf(F.col("Standardized_Category"))
standardized_categories_df = standardized_categories_df.withColumn(
    "Standardized_Category", mapped_category
)

In [None]:
# The raw Category column is superseded by Standardized_Category.
drop_category_df = standardized_categories_df.drop(F.col("Category"))

In [None]:
# Check the schema now that Category has been replaced by Standardized_Category.
drop_category_df.printSchema()

In [None]:
# Cast the numeric metrics, which arrive as strings, to their numeric types.
numeric_column_types = {
    "total_views": IntegerType(),
    "user_reviews": IntegerType(),
    "user_rating": FloatType(),
}
df_casted = drop_category_df
for numeric_column, target_type in numeric_column_types.items():
    df_casted = df_casted.withColumn(numeric_column, F.col(numeric_column).cast(target_type))

In [None]:
# Replace the literal string 'Null' with "unknown". No subset is given, so this applies
# to every string column, not only director; real nulls are not affected.
director_nulls = df_casted.na.replace('Null', "unknown")

In [None]:
# Literal 'Null' -> "unknown" across all string columns (not limited to cast); after the
# previous cell this finds nothing new to replace.
cast_nulls = director_nulls.replace(to_replace='Null', value="unknown")

In [None]:
# Drop malformed rows whose 'director' AND 'cast' are both purely numeric strings.
#
# rlike() on a null value is null, and under SQL three-valued logic
# NOT (null AND true) / NOT (null AND null) are null, which filter() treats as "drop".
# Without the coalesce below, every row with a null director and a null or numeric cast
# (empty strings were turned into nulls earlier) would be silently discarded. A null is
# therefore treated as "not numeric".
numeric_pattern = "^[0-9]+$"

director_is_numeric = F.coalesce(col("director").rlike(numeric_pattern), F.lit(False))
cast_is_numeric = F.coalesce(col("cast").rlike(numeric_pattern), F.lit(False))

filtered_df = cast_nulls.filter(~(director_is_numeric & cast_is_numeric))

In [None]:
# Check the schema after the numeric director/cast filter.
filtered_df.printSchema()

In [None]:
# Literal 'Null' -> "unknown" across all string columns (not limited to description).
description_nulls = filtered_df.na.replace(to_replace='Null', value="unknown")

In [None]:
# release_year arrives as a string; store it as an integer.
casting_year = description_nulls.withColumn("release_year", F.col("release_year").cast("int"))

In [None]:
# Confirm release_year is now an integer column.
casting_year.printSchema()

In [None]:
# Duration bucket boundaries in minutes:
#   <= short_threshold -> "short", <= medium_threshold -> "medium", above -> "long"
short_threshold, medium_threshold = 90, 150

In [None]:
# Bucket titles by running time, taken from durations written in minutes ("94 min"):
#   <= short_threshold -> "short", <= medium_threshold -> "medium", otherwise "long".
# Durations with no minute count — season counts such as "2 Seasons", the literal
# "Null", or missing values — are labelled "unknown" instead of falling through to
# "long" (or having a season count misread as minutes).
minutes_text = regexp_extract(col("duration"), r"^(\d+)\s*min", 1)
# regexp_extract yields "" when nothing matches; map that to null rather than casting "".
duration_minutes = when(minutes_text != "", minutes_text.cast("integer"))

bins_movies = casting_year.withColumn(
    "movie_duration_category",
    when(duration_minutes.isNull(), "unknown")
    .when(duration_minutes <= short_threshold, "short")
    .when(duration_minutes <= medium_threshold, "medium")
    .otherwise("long"),
)

In [None]:
# Confirm movie_duration_category was added.
bins_movies.printSchema()

In [None]:
# Literal 'Null' -> "unknown" across all string columns (not limited to country).
country_nulls = bins_movies.replace({'Null': "unknown"})

In [None]:
# Literal 'Null' -> "unknown" across all string columns (not limited to duration).
duration_nulls = country_nulls.replace(['Null'], ["unknown"])

In [None]:
# Schema check before exploding countries and writing to SQL Server.
duration_nulls.printSchema()

In [None]:
# One row per country. Split the comma-separated country list and trim each piece: the
# space after each comma would otherwise remain ("United States, India" -> " India").
# Then drop entries that are null or empty (e.g. produced by a trailing comma).
exploded_countries = (
    duration_nulls
    .withColumn("country", explode(split(col("country"), ",")))
    .withColumn("country", trim(col("country")))
)
dropnullcountries = exploded_countries.na.drop(subset=["country"]).filter(col("country") != "")

In [None]:
import getpass
import os

checkpoint_dir = "/home/mostafa/MoviesProjectCheckpoint"

# Connection settings. Credentials come from the environment (or an interactive prompt)
# instead of being embedded in the notebook source.
jdbc_url = "jdbc:sqlserver://192.168.1.11:1433;databaseName=MoviesProject;"
jdbc_user = os.environ.get("MOVIES_DB_USER", "mostafa")
jdbc_password = os.environ.get("MOVIES_DB_PASSWORD") or getpass.getpass("SQL Server password: ")


def write_to_sql_server(batch_df, batch_id):
    """Append one streaming micro-batch to the SQL Server table ``moviesProject``.

    Used as the ``foreachBatch`` sink of the query started below.

    Args:
        batch_df: the DataFrame holding this micro-batch.
        batch_id: Spark's identifier for the micro-batch (used in the error message).

    Raises:
        Any exception from the JDBC write is re-raised after being reported, so the
        streaming query fails visibly instead of marking an unwritten batch as done.
    """
    try:
        (
            batch_df.write
            .format("jdbc")
            .option("url", jdbc_url)
            .option("dbtable", "moviesProject")
            .option("user", jdbc_user)
            .option("password", jdbc_password)
            .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")
            .mode("append")
            .save()
        )
    except Exception as e:
        print(f"Batch {batch_id} failed to write to SQL Server: {e}")
        raise


# Start the stream and write to SQL Server. The checkpoint location belongs on the
# streaming writer (a batch JDBC write ignores it); it is where Spark records progress.
# Keep the StreamingQuery handle so the query can be monitored or stopped later.
query = (
    dropnullcountries.writeStream
    .foreachBatch(write_to_sql_server)
    .option("checkpointLocation", checkpoint_dir)
    .start()
)
query


In [44]:
# Sanity check: evaluates to True when duration_nulls is still a streaming DataFrame.
duration_nulls.isStreaming