**LSH preprocessing**

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.ml.functions import vector_to_array
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql.types import StructType, StructField, IntegerType, FloatType, StringType

# Initialize (or reuse) a Spark session running locally on all available cores.
spark = (
    SparkSession.builder
    .appName("MovieRecommendationExport")
    .master("local[*]")
    .getOrCreate()
)

# Define Schemas
# Ratings: one row per (userID, movieID) rating.
ratingsSchema = (
    StructType()
    .add("userID", IntegerType(), True)
    .add("movieID", IntegerType(), True)
    .add("rating", FloatType(), True)
)

# Movies: used both for the raw movies.csv and for the intermediate
# movies_with_genre_vectors.csv written below.
# NOTE: "preproceesedStr" is misspelled, but it is a column name the rest of
# this notebook relies on, so it is kept unchanged.
moviesSchema = (
    StructType()
    .add("movieID", IntegerType(), True)
    .add("movieTitle", StringType(), True)
    .add("genres", StringType(), True)
    .add("preproceesedStr", StringType(), True)
    .add("genreVector", StringType(), True)
)



# NOTE(review): assumes movies.csv starts with a header row, as the MovieLens
# file does (ratings.csv is likewise read with header=True below). Without
# header=True that row is parsed as data: movieID cannot be cast to an integer
# and becomes null, so a bogus "title"/"genres" movie ends up in every export.
movies = (
    spark.read.option("charset", "ISO-8859-1")
    .option("header", "true")
    .schema(moviesSchema)
    .csv("movies.csv")
)





# Fixed genre vocabulary. Its order defines which position of the binary genre
# vector (built below) corresponds to which genre.
validGenres = [
    "Action", "Adventure", "Animation", "Children", "Comedy", "Crime", "Documentary",
    "Drama", "Fantasy", "Film-Noir", "Horror", "IMAX", "Musical", "Mystery",
    "Romance", "Sci-Fi", "Thriller", "War", "Western"
]

# Split the pipe-separated genres string into an array. Both the
# comma-joined string and genreList are derived from the same split.
genre_array = F.split(F.col("genres"), "\\|")
movies = (
    movies
    .withColumn("preproceesed", genre_array)
    .withColumn("preproceesedStr", F.concat_ws(",", F.col("preproceesed")))
    .withColumn("genreList", genre_array)
)

# generate a binary vector for each movie
def genre_to_vector(genres):
    """Return a dense 0/1 vector marking which of ``validGenres`` occur in *genres*.

    *genres* is an iterable of genre names. None or empty gives an all-zero
    vector, as does a list containing only unknown genres. Position i of the
    result corresponds to ``validGenres[i]``.
    """
    present = set(genres or ())
    return Vectors.dense([float(name in present) for name in validGenres])
genre_vector_udf = F.udf(genre_to_vector, VectorUDT())
# Binary genre vector -> array<double> -> comma-joined string, so it can be
# stored in a CSV column.
movies = (
    movies
    .withColumn("genreVector", genre_vector_udf(F.col("genreList")))
    .withColumn("genreArray", vector_to_array("genreVector"))
    .withColumn("genreArrayStr", F.concat_ws(",", "genreArray"))
)
# Persist the intermediate table. mode("overwrite") lets the notebook be
# re-run: the default save mode (errorifexists) fails once the path exists.
movies.select("movieID", "movieTitle", "genres", "preproceesedStr", "genreArrayStr") \
    .write.option("header", "true") \
    .mode("overwrite") \
    .csv("movies_with_genre_vectors.csv")




# Load datasets. The movies file is the intermediate written above. Its fifth
# column (genreArrayStr) is read into the schema's "genreVector" field,
# presumably by position (header names are not used with a forced schema).
ratings = spark.read.csv("ratings.csv", schema=ratingsSchema, header=True)
movies = spark.read.csv("movies_with_genre_vectors.csv", schema=moviesSchema, header=True)

# Drop movies whose genre-vector string is missing.
movies = movies.where(F.col("genreVector").isNotNull())

# Convert genreVector string to Spark Vector
def parse_vector(vector_str, num_genres=19):
    """Parse a comma-separated string of numbers into a dense Spark vector.

    Args:
        vector_str: e.g. "1.0,0.0,1.0"; may be None.
        num_genres: length of the all-zero fallback vector (19 matches the
            number of entries in validGenres).

    Returns:
        The parsed vector. When *vector_str* is None or contains a
        non-numeric token, returns an all-zero vector of length *num_genres*.
    """
    if vector_str is None:
        return Vectors.dense([0.0] * num_genres)
    try:
        values = [float(x) for x in vector_str.split(',')]
    except ValueError:
        # Malformed field: treat it as "no genres" rather than failing the
        # whole job. Unexpected errors are no longer silently swallowed.
        return Vectors.dense([0.0] * num_genres)
    return Vectors.dense(values)

parse_vector_udf = F.udf(parse_vector, VectorUDT())
movies = movies.withColumn("genreVector", parse_vector_udf(F.col("genreVector")))

# User ID for recommendations
specific_user_id = 1

# All ratings by this user; reused below instead of filtering `ratings` twice.
user_ratings = ratings.filter(F.col("userID") == specific_user_id)

# Get movies already rated by the user
rated_movies = user_ratings.select("movieID")

# Calculate user's average rating
user_avg_rating = user_ratings.agg(F.avg("rating").alias("avg_rating")).collect()[0]["avg_rating"]
if user_avg_rating is None:
    # avg() over zero (non-null) ratings is null. Continuing would silently
    # produce an all-null profile and an empty user_profile_export.csv.
    raise ValueError(f"No ratings found for userID={specific_user_id}")

# Mean-center the user's ratings: adjusted_rating = rating - user's average.
user_ratings_with_genres = user_ratings.join(movies, on="movieID") \
    .withColumn("adjusted_rating", F.col("rating") - user_avg_rating)

# Calculate weighted genre vectors
def weighted_genre_vector(genre_vector, adjusted_rating):
    """Return a new dense vector: every component of *genre_vector* times *adjusted_rating*.

    The input vector is not modified.
    """
    components = genre_vector.toArray()
    scaled = [component * adjusted_rating for component in components]
    return Vectors.dense(scaled)

weighted_genre_udf = F.udf(weighted_genre_vector, VectorUDT())
# Per rated movie: its genre vector scaled by the user's mean-centered rating.
weighted_column = weighted_genre_udf(F.col("genreVector"), F.col("adjusted_rating"))
user_ratings_with_genres = user_ratings_with_genres.withColumn("weighted_genre_vector", weighted_column)

# Function to sum vectors
def sum_vectors(vectors):
    """Return the element-wise sum of a list of vectors (presumably all the same length).

    An empty (or None) list yields a 19-element zero vector. A single-element
    list returns that element itself.
    """
    if not vectors:
        return Vectors.dense([0.0] * 19)
    first, *rest = vectors
    total = first
    for vec in rest:
        total += vec
    return total

# Both aggregations reduce a per-user list of vectors with the same UDF.
sum_vectors_udf = F.udf(sum_vectors, VectorUDT())

# Calculate count vector: summing the 0/1 genre vectors gives, per genre, how
# many of the user's rated movies carry that genre.
count_vector_df = (
    user_ratings_with_genres.groupBy("userID")
    .agg(F.collect_list("genreVector").alias("genre_vectors"))
    .withColumn("count_vector", sum_vectors_udf(F.col("genre_vectors")))
)

# Calculate weighted sum vector: per genre, the sum of adjusted ratings.
weighted_sum_df = (
    user_ratings_with_genres.groupBy("userID")
    .agg(F.collect_list("weighted_genre_vector").alias("weighted_vectors"))
    .withColumn("weighted_sum", sum_vectors_udf(F.col("weighted_vectors")))
)

def divide_vectors(weighted_vec, count_vec):
    """Divide *weighted_vec* by *count_vec* element-wise.

    Where the count is 0, the result is 0.0 instead of a division by zero.
    """
    pairs = zip(weighted_vec.toArray(), count_vec.toArray())
    return Vectors.dense([0.0 if c == 0 else w / c for w, c in pairs])

divide_vectors_udf = F.udf(divide_vectors, VectorUDT())

# User profile: per-genre mean adjusted rating (0.0 for genres never rated).
user_profile_df = (
    count_vector_df.join(weighted_sum_df, "userID")
    .withColumn("normalized_vector",
                divide_vectors_udf(F.col("weighted_sum"), F.col("count_vector")))
    .withColumn("normalized_array", vector_to_array("normalized_vector"))
)

# Lay the profile out like a movie row so it shares the export format;
# movieID -1 and the "USER_PROFILE" title mark it as the user's profile.
user_profile_row = user_profile_df.select(
    F.lit(-1).alias("movieID"),
    F.lit("USER_PROFILE").alias("movieTitle"),
    F.lit("").alias("genres"),
    F.col("normalized_array").alias("genre_vector"),
    F.lit(specific_user_id).alias("userID"),
)


# Candidates: every movie the user has NOT rated (left anti-join), in the same
# column layout as user_profile_row. userID is a typed null for movie rows.
unrated_movies = movies.join(rated_movies, on="movieID", how="left_anti")
movies_for_export = unrated_movies.select(
    "movieID",
    "movieTitle",
    "genres",
    vector_to_array("genreVector").alias("genre_vector"),
    F.lit(None).cast(IntegerType()).alias("userID"),
)

# ==== SEPARATE EXPORTS ====
# Each export flattens the genre_vector array into one comma-joined string and
# is written through a single partition (coalesce(1)), overwriting old output.
export_columns = ["movieID", "movieTitle", "genres", "genre_vector", "userID"]

# Export user profile only
user_profile_export = user_profile_row \
    .withColumn("genre_vector", F.concat_ws(",", "genre_vector")) \
    .select(*export_columns)

# Export movies only (unrated by the user)
movies_export = movies_for_export \
    .withColumn("genre_vector", F.concat_ws(",", "genre_vector")) \
    .select(*export_columns)

for export_df, export_path in (
    (user_profile_export, "user_profile_export.csv"),
    (movies_export, "movies_export.csv"),
):
    export_df.coalesce(1).write.csv(export_path, header=True, mode="overwrite")

print("- user_profile_export.csv")
print("- movies_export.csv")

spark.stop()


- user_profile_export.csv
- movies_export.csv


**LSH with BucketedRandomProjectionLSH**

In [2]:
from pyspark.sql import SparkSession
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.ml.feature import Normalizer, BucketedRandomProjectionLSH
from pyspark.sql.functions import col, split, udf
from pyspark.sql.types import DoubleType
import numpy as np


spark = (
    SparkSession.builder
    .appName("LSH_Cosine_Recommendation")
    .getOrCreate()
)

# Inputs written by the export step. genre_vector arrives as a comma-joined string.
movies_df = spark.read.options(header=True, inferSchema=True).csv("movies_export.csv")
user_df = spark.read.options(header=True, inferSchema=True).csv("user_profile_export.csv")


def string_to_vector(genre_vector_str):
    """Parse a comma-joined string such as "1.0,0.0,1.0" into a dense vector.

    None raises AttributeError; a non-numeric token raises ValueError.
    """
    tokens = genre_vector_str.split(',')
    return Vectors.dense(list(map(float, tokens)))

string_to_vector_udf = udf(string_to_vector, VectorUDT())

# Replace the string column with a real vector column in both inputs.
movies_df, user_df = (
    frame.withColumn("genre_vector", string_to_vector_udf(col("genre_vector")))
    for frame in (movies_df, user_df)
)

# Step 1: Normalize vectors to unit L2 norm. For unit vectors
# ||a - b||^2 = 2 - 2*cos(a, b), so ranking by Euclidean distance is
# equivalent to ranking by cosine similarity.
normalizer = Normalizer(inputCol="genre_vector", outputCol="norm_vector", p=2.0)
normalized_movies = normalizer.transform(movies_df)
normalized_user = normalizer.transform(user_df)

# Step 2: Apply LSH using normalized vectors. Each hash table projects x onto a
# random unit vector v and buckets it as floor(x . v / bucketLength). With
# unit vectors the projection lies in [-1, 1], so 0.01 gives very narrow buckets.
lsh = BucketedRandomProjectionLSH(
    inputCol="norm_vector",
    outputCol="hashes",
    bucketLength=0.01,
    numHashTables=80,  # number of independent hash tables
)
lsh_model = lsh.fit(normalized_movies)

# Step 3: Find similar movies to the user profile. Keep candidate pairs whose
# Euclidean distance is below 1, which for unit vectors means cos > 0.5.
similar_movies = lsh_model.approxSimilarityJoin(
    normalized_user,
    normalized_movies,
    threshold=1,
    distCol="distance",
)

# Show results ordered by distance (lower = more similar)
results = (
    similar_movies
    .select(
        col("datasetB.movieId"),
        col("datasetB.movieTitle"),
        col("datasetB.genres"),
        col("datasetB.genre_vector"),
        col("distance"),
    )
    .orderBy("distance")
)

results.show(truncate=False)

# Compute exact cosine similarity for the LSH candidates, to compare against
# the Euclidean ranking.
def cosine_sim(a, b):
    """Return the cosine similarity of two equal-length numeric sequences.

    If either vector has zero norm, cosine similarity is undefined and 0.0 is
    returned. Dividing by zero would instead give NaN plus a RuntimeWarning.
    Zero vectors do occur here: a movie with none of the known genres is
    encoded as all zeros.
    """
    a = np.asarray(a, dtype=float)
    b = np.asarray(b, dtype=float)
    denom = np.linalg.norm(a) * np.linalg.norm(b)
    if denom == 0.0:
        return 0.0
    return float(np.dot(a, b) / denom)

cosine_udf = udf(cosine_sim, DoubleType())

# The L2-normalized user-profile vector. It is supplied to every row through a
# zero-argument UDF as the second argument of the cosine computation.
user_vector = normalized_user.select("norm_vector").first()[0]
user_vector_udf = udf(lambda: user_vector, VectorUDT())

joined_with_user = results.withColumn(
    "cosine_similarity",
    cosine_udf(col("genre_vector"), user_vector_udf()),
)

joined_with_user.show(truncate=False)


+-------+---------------------------------------+---------------------+-----------------------------------------------------------------------------+------------------+
|movieId|movieTitle                             |genres               |genre_vector                                                                 |distance          |
+-------+---------------------------------------+---------------------+-----------------------------------------------------------------------------+------------------+
|109673 |300: Rise of an Empire (2014)          |Action|Drama|War|IMAX|[1.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0]|0.8156461264120838|
|116529 |Stalingrad (2013)                      |Action|Drama|War|IMAX|[1.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0]|0.8156461264120838|
|4460   |Encounter in the Third Dimension (1999)|IMAX                 |[0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0]|0.8441