In [2]:
from pyspark.sql import SparkSession
import pandas as pd

# Create (or reuse) the Spark session, pointing Hadoop's default filesystem
# at the HDFS namenode.
spark = (
    SparkSession.builder
    .appName("MyApp")
    .config("spark.hadoop.fs.defaultFS", "hdfs://namenode:9000/")
    .getOrCreate()
)

# Location of the preprocessed movie dataset read by the cells below.
parquet_path = "hdfs://namenode:9000/output/initial_preprocessed_movie_data.parquet"

def overview_data(df, rows=10):
    """Return the first ``rows`` rows of ``df`` as a pandas DataFrame.

    Args:
        df: Object exposing ``head(n)`` (returning a sequence of rows) and
            ``columns`` (the column names), e.g. a Spark DataFrame.
        rows: Maximum number of leading rows to fetch.

    Returns:
        A pandas DataFrame holding the fetched rows, labelled with
        ``df.columns``.
    """
    leading_rows = df.head(rows)
    return pd.DataFrame(data=leading_rows, columns=df.columns)
    
# Load the preprocessed movie dataset and preview its first rows.
movies = spark.read.format("parquet").load(parquet_path)
overview_data(movies)


Unnamed: 0,genres,year,num_genres,avg_rating,num_ratings,primaryTitle,originalTitle,isAdult,startYear,runtimeMinutes,IMDB_genres,averageRating,numVotes
0,Documentary,1894,1,2.444444,18,,,,,,,,
1,Documentary,1895,1,2.227273,11,,,,,,,,
2,Comedy|Horror,1896,2,2.5,22,,,,,,,,
3,(no genres listed),1900,1,3.3,5,,,,,,,,
4,Fantasy,1901,1,3.5,1,,,,,,,,
5,Comedy,1902,1,3.5,2,,,,,,,,
6,Comedy|Fantasy|Horror,1906,3,3.205882,17,,,,,,,,
7,Drama,1909,1,2.25,8,,,,,,,,
8,(no genres listed),1909,1,3.5,1,,,,,,,,
9,Comedy,1909,1,2.75,10,,,,,,,,


# Predictions

In [3]:
from pyspark.sql.functions import col, split, when, lit, udf
from pyspark.ml.feature import CountVectorizer, VectorAssembler, MinMaxScaler
from pyspark.ml.linalg import Vectors, VectorUDT
import numpy as np

def get_similar_movies(movies_df, target_title, top_n=10):
    """Rank movies by cosine similarity to the movie titled ``target_title``.

    Every movie is described by one feature vector: a bag-of-genres count
    vector built from the ``|``-separated ``genres`` column, concatenated with
    the min-max scaled numeric columns ``avg_rating``, ``runtimeMinutes``,
    ``startYear`` and ``isAdult`` (nulls in those columns are replaced by 0.0
    before scaling).

    Args:
        movies_df: Spark DataFrame containing at least the columns ``genres``,
            ``primaryTitle``, ``avg_rating``, ``runtimeMinutes``,
            ``startYear`` and ``isAdult``.
        target_title: Exact ``primaryTitle`` of the reference movie. When
            several rows share this title, the first row Spark returns is
            used as the reference.
        top_n: Number of similar movies to return.

    Returns:
        A pandas DataFrame with up to ``top_n`` rows (columns
        ``primaryTitle``, ``similarity``, ``avg_rating``, ``startYear``,
        ``runtimeMinutes``) sorted by descending similarity, or ``None`` after
        printing a message when no row matches ``target_title``.
    """
    # --- Genre features -----------------------------------------------------
    # "A|B|C" -> ["A", "B", "C"]; the pipe is escaped because split() takes a
    # regular expression.
    movies = movies_df.withColumn("genres_array", split(col("genres"), "\\|"))

    # A null ``genres`` value yields a null array; replace it with an empty one.
    # NOTE(review): lit([]) relies on a PySpark version whose lit() accepts
    # Python lists -- confirm against the cluster's version.
    movies = movies.withColumn(
        "genres_array",
        when(col("genres_array").isNull(), lit([])).otherwise(col("genres_array"))
    )

    cv = CountVectorizer(inputCol="genres_array", outputCol="genre_vector")
    movies_vec = cv.fit(movies).transform(movies)

    # --- Numeric features ---------------------------------------------------
    numeric_cols = ["avg_rating", "runtimeMinutes", "startYear", "isAdult"]
    for c in numeric_cols:
        movies_vec = movies_vec.withColumn(c, col(c).cast("double"))
    # Missing values become 0.0 so that VectorAssembler receives no nulls.
    movies_vec = movies_vec.fillna({c: 0.0 for c in numeric_cols})

    assembler_num = VectorAssembler(
        inputCols=numeric_cols,
        outputCol="numeric_raw"
    )
    movies_num = assembler_num.transform(movies_vec)

    scaler = MinMaxScaler(inputCol="numeric_raw", outputCol="numeric_scaled")
    movies_scaled = scaler.fit(movies_num).transform(movies_num)

    # --- Combined feature vector --------------------------------------------
    assembler_all = VectorAssembler(
        inputCols=["genre_vector", "numeric_scaled"],
        outputCol="full_features"
    )
    movies_final = assembler_all.transform(movies_scaled)

    # --- Reference movie ----------------------------------------------------
    target_rows = (
        movies_final
        .filter(col("primaryTitle") == target_title)
        .select("full_features")
        .collect()
    )
    if not target_rows:
        print(f"Film '{target_title}' not found.")
        return None

    target_arr = np.array(target_rows[0]["full_features"].toArray())
    # The target's norm is identical for every row, so compute it once here
    # instead of once per UDF invocation.
    target_norm = np.linalg.norm(target_arr)

    def cosine_sim(v):
        """Cosine similarity between ``v`` and the target vector (0.0 if either norm is zero)."""
        v_arr = np.array(v.toArray())
        norm = target_norm * np.linalg.norm(v_arr)
        return float(np.dot(target_arr, v_arr) / norm) if norm != 0 else 0.0

    cosine_udf = udf(cosine_sim, returnType="double")

    # --- Ranking ------------------------------------------------------------
    # Drop the reference title before sorting so it never takes a top-N slot.
    # Rows whose primaryTitle is null are dropped too, because the comparison
    # evaluates to null.
    result = (
        movies_final
        .filter(col("primaryTitle") != target_title)
        .withColumn("similarity", cosine_udf(col("full_features")))
        .orderBy(col("similarity").desc())
        .select("primaryTitle", "similarity", "avg_rating", "startYear", "runtimeMinutes")
        .limit(top_n)
    )

    print(f"Top {top_n} films, simmilar to '{target_title}':")
    # Pass top_n explicitly: overview_data defaults to 10 rows and would
    # otherwise silently truncate any larger request.
    return overview_data(result, rows=top_n)


In [4]:
# Find the closest matches for a well-known title (default top_n).
get_similar_movies(movies_df=movies, target_title="The Matrix")

Top 10 films, simmilar to 'The Matrix':


Unnamed: 0,primaryTitle,similarity,avg_rating,startYear,runtimeMinutes
0,Blade Runner,0.999933,4.110005,1982.0,117.0
1,The Sender,0.999682,4.0,1998.0,98.0
2,The Terminator,0.999643,3.902092,1984.0,107.0
3,UFO: Annihilate S.H.A.D.O. Kill Straker... Stop,0.999585,4.125,1974.0,86.0
4,The Cure,0.999558,4.0,2014.0,90.0
5,Blue Tornado,0.999531,4.0,1991.0,87.0
6,Equilibrium,0.999188,3.72871,2002.0,107.0
7,Captain America: Civil War,0.999144,3.695735,2016.0,147.0
8,Predator,0.999018,3.672675,1987.0,107.0
9,UFO: Destroy Moonbase,0.998926,3.7,1971.0,90.0


In [None]:
# Scratch cell: stray keystrokes removed (the previous content was not valid Python).