In [5]:
import pandas as pd
import numpy as np
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.compose import ColumnTransformer
from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline
from ast import literal_eval
import re
import os
from sklearn.preprocessing import MultiLabelBinarizer
from sklearn.feature_extraction import FeatureHasher
from sklearn.decomposition import PCA
from sklearn.compose import ColumnTransformer
from sklearn.feature_extraction.text import TfidfVectorizer, CountVectorizer
from sklearn.metrics.pairwise import cosine_similarity
import nltk
from collections import Counter
from pyspark.sql.functions import col
from nltk.corpus import stopwords
from nltk.stem import WordNetLemmatizer
lemmatizer = WordNetLemmatizer()


In [None]:
import kagglehub
import os
import shutil

# Download the dataset
path = kagglehub.dataset_download("asaniczka/tmdb-movies-dataset-2023-930k-movies")
print("Dataset downloaded to:", path)

# Walk through all files to find the actual CSV path
for root, dirs, files in os.walk(path):
    for file in files:
        print("Found file:", os.path.join(root, file))

# Step 2: Set target directory to one level above current folder
parent_dir = os.path.abspath("..")
target_dir = os.path.join(parent_dir, "tmdb_data")
os.makedirs(target_dir, exist_ok=True)

# Step 3: Define source and destination paths
source_file = os.path.join(path, "TMDB_movie_dataset_v11.csv")
dest_file = os.path.join(target_dir, "TMDB_movie_dataset_v11.csv")

# Step 4: Copy the file
shutil.copy(source_file, dest_file)

# Step 5: Verify contents
print("Dataset successfully copied to:", target_dir)
print("Contents of target folder:", os.listdir(target_dir))

In [1]:
from pyspark.sql import SparkSession

# Create (or retrieve) the SparkSession
spark = (
    SparkSession
      .builder
      .appName("TMDB-Revenue-Pipeline")
      .master("local[*]")              # run locally on all cores
      .config("spark.ui.port", "4050") # avoid port conflicts for the UI
      .getOrCreate()
)


In [2]:
import os
parent_dir = os.path.abspath("..")
dataset_dir = os.path.join(parent_dir, "Dataset/TMDB_movie_dataset_v11.csv")
# Load the dataset into a Spark DataFrame (assuming the CSV is inside the folder)
df = spark.read.csv(dataset_dir, header=True, inferSchema=True)


In [4]:
df.printSchema()
df.show(5)

root
 |-- id: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- vote_average: string (nullable = true)
 |-- vote_count: string (nullable = true)
 |-- status: string (nullable = true)
 |-- release_date: string (nullable = true)
 |-- revenue: string (nullable = true)
 |-- runtime: string (nullable = true)
 |-- adult: string (nullable = true)
 |-- backdrop_path: string (nullable = true)
 |-- budget: string (nullable = true)
 |-- homepage: string (nullable = true)
 |-- imdb_id: string (nullable = true)
 |-- original_language: string (nullable = true)
 |-- original_title: string (nullable = true)
 |-- overview: string (nullable = true)
 |-- popularity: string (nullable = true)
 |-- poster_path: string (nullable = true)
 |-- tagline: string (nullable = true)
 |-- genres: string (nullable = true)
 |-- production_companies: string (nullable = true)
 |-- production_countries: string (nullable = true)
 |-- spoken_languages: string (nullable = true)
 |-- keywords: string (nullab

In [3]:
from pyspark.sql.functions import to_date, year, month, quarter,col, when
from pyspark.ml.feature import Imputer
from pyspark.sql.types import IntegerType, FloatType
# 1. Convert 'release_date' to date type
df = df.withColumn('release_date', to_date('release_date', 'yyyy-MM-dd'))

# 2. Extract year, month, quarter
df = df.withColumn('release_year', year('release_date'))
df = df.withColumn('release_month', month('release_date'))
df = df.withColumn('release_quarter', quarter('release_date'))

# 3. Drop the original 'release_date' column
df = df.drop('release_date')
# 3. Convert 'adult' to numeric (0/1)
df = df.withColumn('adult', when(col('adult') == True, 1)
                              .when(col('adult') == 'True', 1)
                              .otherwise(0))
# Keep only rows where status is 'Released'
df = df.filter(col('status') == 'Released')
# Drop unnecessary columns
features_num = ['vote_average','vote_count','runtime','release_year','release_month','release_quarter','adult','budget']

# 7. Convert numeric columns to their respective types
df = df.withColumn('vote_average', col('vote_average').cast(FloatType()))
df = df.withColumn('vote_count', col('vote_count').cast(IntegerType()))
df = df.withColumn('runtime', col('runtime').cast(IntegerType()))
df = df.withColumn('budget', col('budget').cast(IntegerType()))
# Fill NULLs with '' (empty string) for selected columns
features_cat=['keywords', 'genres', 'overview', 'spoken_languages', 'production_countries','production_companies']
df = df.fillna('', subset=features_cat)
# Combine both lists to get the columns you want to keep
columns_to_keep = features_num + features_cat

# Get the list of all columns in the DataFrame
all_columns = df.columns

# Find columns that are not in the keep list
columns_to_drop = [col for col in all_columns if col not in columns_to_keep]

# Drop those columns
df = df.drop(*columns_to_drop)

#impute missing values for numeric columns with mean 

# Create the Imputer
imputer = Imputer(
    inputCols=features_num,
    outputCols=features_num
).setStrategy("mean")

# Fit and transform the DataFrame
df = imputer.fit(df).transform(df)

df.drop_duplicates()
# Show results
df.show(truncate=False)

+------------+----------+-------+-----+---------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------------------------------+----------------------------------------------------------------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------+--------------------------------------------------+------------------------------------------------------------------------------------------------------------------------------------------

In [6]:
from ast import literal_eval
from pyspark.sql.functions import udf, col
from pyspark.sql.types import ArrayType, StringType
stop_words = set(stopwords.words('english'))
lemmatizer = WordNetLemmatizer()
def remove_stopwords(text):
    words = [word for word in text.split() if word.lower() not in stop_words]
    return " ".join(words)

def lemmatize_text(text):
    words = [lemmatizer.lemmatize(word) for word in text.split()]
    return " ".join(words)
def jaccard_similarity(set1, set2):
    intersection = len(set1.intersection(set2))
    union = len(set1.union(set2))
    return intersection / union if union != 0 else 0

# 1. Define your safe_literal_eval function
def safe_literal_eval(value):
    if isinstance(value, str):
        try:
            return literal_eval(value)
        except (ValueError, SyntaxError):
            return value.split(',') if value else []
    return value if isinstance(value, list) else []

# 2. Register it as a UDF
safe_literal_eval_udf = udf(safe_literal_eval, ArrayType(StringType()))

# 3. List of features
features = ['keywords', 'genres', 'spoken_languages', 'production_countries', 'production_companies']

# 4. Apply the UDF to each feature
for feature in features:
    df = df.withColumn(feature, safe_literal_eval_udf(col(feature)))

# 5. Show result
df.show(truncate=False)

+------------+----------+-------+-----+---------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------------------------------------------+---------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------+-----------------------------------------------------------------------------------------------------------------

In [7]:
def clean_data(x):
    if isinstance(x, list):
        return [str(i).lower().replace(" ", "") for i in x]
    else:
        return []

clean_data_udf = udf(clean_data, ArrayType(StringType()))

features = ['keywords', 'genres', 'spoken_languages', 'production_countries', 'production_companies']

for feature in features:
    df = df.withColumn(feature, clean_data_udf(col(feature)))

# 5. Show result
df.show(truncate=False)

+------------+----------+-------+-----+---------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------------------------------------------+----------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------+----------------------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------