In [25]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as func
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, LongType
import sys

def computeCosineSimilarity(spark, data):
    """Score each (movie1, movie2) pair by the cosine similarity of its ratings.

    Args:
        spark: Accepted for call compatibility; not used by this function.
        data: DataFrame with the columns movie1, movie2, rating1 and rating2.

    Returns:
        DataFrame with the columns movie1, movie2, score and numPairs.
        score is numerator / denominator cast to float, or 0 when the
        denominator is 0; numPairs counts the rows with a non-null
        rating product for the pair.
    """
    first_rating = func.col("rating1")
    second_rating = func.col("rating2")

    # Per-row products: xy feeds the dot product, xx and yy feed the norms.
    with_products = (
        data.withColumn("xx", first_rating * first_rating)
        .withColumn("yy", second_rating * second_rating)
        .withColumn("xy", first_rating * second_rating)
    )

    # One aggregate expression per output column of the grouped frame.
    dot_product = func.sum(func.col("xy")).alias("numerator")
    norm_product = (
        func.sqrt(func.sum(func.col("xx"))) * func.sqrt(func.sum(func.col("yy")))
    ).alias("denominator")
    pair_count = func.count(func.col("xy")).alias("numPairs")

    per_pair = with_products.groupBy("movie1", "movie2").agg(
        dot_product, norm_product, pair_count
    )

    # Guard the division so a zero denominator produces a score of 0.
    score = (
        func.when(
            func.col("denominator") != 0,
            func.col("numerator") / func.col("denominator"),
        )
        .otherwise(0)
        .cast("float")
    )

    return per_pair.withColumn("score", score).select(
        "movie1", "movie2", "score", "numPairs"
    )



# Local-mode Spark session (master "local[*]") named after this job.
spark = (
    SparkSession.builder
    .appName("MovieSimilarities")
    .master("local[*]")
    .getOrCreate()
)

# Layout of movies.csv: numeric id followed by the title.
movieNamesSchema = StructType([
    StructField("movieID", IntegerType(), True),
    StructField("movieTitle", StringType(), True),
])

# Layout of ratings.csv: one rating event per row.
moviesSchema = StructType([
    StructField("userID", IntegerType(), True),
    StructField("movieID", IntegerType(), True),
    StructField("rating", IntegerType(), True),
    StructField("timestamp", LongType(), True),
])

# Movie id -> title lookup table, read with the ISO-8859-1 charset.
movieNames = (
    spark.read
    .option("charset", "ISO-8859-1")
    .schema(movieNamesSchema)
    .csv("movies.csv")
)

# Rating events, read as UTF-8.
movies = (
    spark.read
    .option("charset", "UTF-8")
    .schema(moviesSchema)
    .csv("ratings.csv")
)

# NOTE: an earlier variant of this cell read the tab-separated "movies.dat" /
# "ratings.dat" files instead, adding .option("sep", "\t") and
# .option("header", "false") to the readers above.






# Get movie name by given movie id
def getMovieName(movieNames, movieId):
    """Return the movieTitle of the row in `movieNames` whose movieID matches.

    Args:
        movieNames: DataFrame with at least the columns movieID and movieTitle.
        movieId: Id to look up.

    Returns:
        The movieTitle value of the first matching row.

    Raises:
        IndexError: If no row has the requested movieID. The exception type
            is the one the previous list-indexing implementation raised; the
            message now names the missing id.
    """
    # first() fetches a single row instead of collecting every match.
    match = (
        movieNames.filter(func.col("movieID") == movieId)
        .select("movieTitle")
        .first()
    )
    if match is None:
        raise IndexError(f"no movie with movieID {movieId!r} in movieNames")

    return match[0]

# Only the user, movie and rating columns are needed; timestamp is dropped.
ratings = movies.select("userId", "movieId", "rating")
# print(ratings.show(20))

# Self-join the ratings on the user so every two movies rated by the same
# user meet in one row. Requiring the first id to be lower than the second
# keeps a movie from pairing with itself and emits each pair in one order only.
sameUser = func.col("ratings1.userId") == func.col("ratings2.userId")
lowerIdFirst = func.col("ratings1.movieId") < func.col("ratings2.movieId")

moviePairs = (
    ratings.alias("ratings1")
    .join(ratings.alias("ratings2"), sameUser & lowerIdFirst)
    .select(
        func.col("ratings1.movieId").alias("movie1"),
        func.col("ratings2.movieId").alias("movie2"),
        func.col("ratings1.rating").alias("rating1"),
        func.col("ratings2.rating").alias("rating2"),
    )
)
# print(moviePairs.show(20))

# Marked for caching: the similarity table is queried more than once below.
moviePairSimilarities = computeCosineSimilarity(spark, moviePairs).cache()
# print(moviePairSimilarities.show(20))
#print(moviePairSimilarities.show(20))


# Quality thresholds: a pair only counts as "similar" when its score and the
# number of co-ratings behind that score are both above these values.
scoreThreshold = 0.7
coOccurrenceThreshold = 20

# The movie to find similar movies for.
movieID = 333

# How many of the best matches to report.
maxResults = 15

# Keep the pairs that involve movieID (on either side) and pass both quality
# thresholds, best score first.
involvesMovie = (func.col("movie1") == movieID) | (func.col("movie2") == movieID)
filteredResults = (
    moviePairSimilarities
    .filter(
        involvesMovie
        & (func.col("score") > scoreThreshold)
        & (func.col("numPairs") > coOccurrenceThreshold)
    )
    .select("movie1", "movie2", "score", "numPairs")
    .orderBy(func.col("score").desc())
)

# print(filteredResults.show(20))
# filteredResults is already ordered by score, so it is not sorted again.
results = filteredResults.take(maxResults)

# Report the number of rows actually returned rather than a fixed "Top 10",
# which did not match the number of rows taken.
print(f"Top {len(results)} similar movies for {getMovieName(movieNames, movieID)}")

for result in results:
    # Display the similarity result that isn't the movie we're looking at
    similarMovieID = result.movie2 if result.movie1 == movieID else result.movie1

    print(
        f"{getMovieName(movieNames, similarMovieID)}"
        f"\tscore: {result.score}\tstrength: {result.numPairs}"
    )

from pyspark.sql import Row

# Explicit schema for the result table. Without it createDataFrame has to
# infer the schema from the data, which fails when `results` is empty.
# bigint/double/string are the types inference yields for Python int, float
# and str values, so non-empty results produce the same table as before.
similar_movies_schema = (
    "movieID: bigint, movieTitle: string, score: double, strength: bigint"
)

# One Row per similar movie: the id that is not movieID, its title, and the
# pair's similarity score and co-rating count.
similar_movies_list = []

for result in results:
    # Determine the similar movie ID
    similarMovieID = result.movie1 if result.movie1 != movieID else result.movie2

    similar_movies_list.append(
        Row(
            movieID=similarMovieID,
            movieTitle=getMovieName(movieNames, similarMovieID),
            score=result.score,
            strength=result.numPairs,
        )
    )

# Convert the list to a DataFrame
similar_movies_df = spark.createDataFrame(similar_movies_list, similar_movies_schema)

# Show the DataFrame
similar_movies_df.show(truncate=False)



# The user whose ratings are inspected. The comment and message here used to
# say "user 20" while the filter selected user 7; the id now lives in one
# place. The DataFrame keeps the name `user_20_ratings` because code further
# down refers to it.
targetUserID = 7

# Ratings of that user joined with movieNames to get titles,
# highest rating first.
user_20_ratings = (
    ratings.filter(func.col("userID") == targetUserID)
    .join(movieNames, "movieID", "inner")
    .select("movieID", "movieTitle", "rating")
    .orderBy(func.desc("rating"))
)

# Show the result
print(f"Movies rated by user {targetUserID}:")
user_20_ratings.show(truncate=False)






# Inputs:
# - `similar_movies_df`: columns [movieID, movieTitle, score, strength]
# - `user_20_ratings`:   columns [movieID, movieTitle, rating]

# Both inputs carry a movieTitle column; give each its own name so the
# joined table has no ambiguous column.
similarTitle = func.col("movieTitle").alias("similar_movieTitle")
similar_movies_renamed = similar_movies_df.select(
    "movieID", similarTitle, "score", "strength"
)

userTitle = func.col("movieTitle").alias("user_20_movieTitle")
user_20_ratings_renamed = user_20_ratings.select("movieID", userTitle, "rating")

# Inner join on movieID keeps only the similar movies the user has rated.
similarAndRated = similar_movies_renamed.join(
    user_20_ratings_renamed, "movieID", "inner"
)

# Keep the similar-movie title, the similarity score and the user's rating,
# ordered by similarity score (highest first).
intersection_df = similarAndRated.select(
    "movieID", "similar_movieTitle", "score", "rating"
).orderBy(func.desc("score"))

# Show the result
print("Movies that are both similar (high score) AND rated by user 7:")
intersection_df.show(truncate=False)


from pyspark.sql import functions as func

def predicted_rating(similar_movies_rated_df):
    """Return the similarity-weighted average rating of the given rows.

    Computes sum(score * rating) / sum(score) over the DataFrame.

    Args:
        similar_movies_rated_df: DataFrame with the numeric columns score
            and rating.

    Returns:
        The weighted average, or 0.0 when it is undefined: the DataFrame has
        no rows, the aggregates are NULL, or the scores sum to zero.
    """
    totals = similar_movies_rated_df.agg(
        func.sum(func.col("score") * func.col("rating")).alias("numerator"),
        func.sum("score").alias("denominator"),
    ).collect()[0]

    numerator = totals["numerator"]
    denominator = totals["denominator"]

    # sum() over zero rows yields None, not 0, so test for both; otherwise an
    # empty input would fail with a TypeError on None / None.
    if numerator is None or not denominator:
        return 0.0

    return numerator / denominator

# Example usage: predict the rating from the movies that are both similar to
# `movieID` and rated by the user. The movie id is read from `movieID`
# instead of being repeated as a literal, so the message cannot go stale.
prediction = predicted_rating(intersection_df)
print(f"Predicted rating for user 7 and movie {movieID}: {prediction:.2f}")

Top 10 similar movies for Tommy Boy (1995)
White Sands (1992)	score: 0.9712682366371155	strength: 23
Everest (1998)	score: 0.9689230918884277	strength: 38
Fantastic Voyage (1966)	score: 0.9666422009468079	strength: 41
Armour of God II: Operation Condor (Operation Condor) (Fei ying gai wak) (1991)	score: 0.9646155834197998	strength: 43
When We Were Kings (1996)	score: 0.963403046131134	strength: 38
Never Cry Wolf (1983)	score: 0.9629253149032593	strength: 30
Omega Man, The (1971)	score: 0.9625793695449829	strength: 36
Blue Streak (1999)	score: 0.9623979926109314	strength: 70
Kissing a Fool (1998)	score: 0.9620988965034485	strength: 23
Prefontaine (1997)	score: 0.9611513018608093	strength: 31
Happy Gilmore (1996)	score: 0.9607335925102234	strength: 254
Time to Kill, A (1996)	score: 0.9602470993995667	strength: 106
Preacher's Wife, The (1996)	score: 0.9599087834358215	strength: 37
Nothing to Lose (1994)	score: 0.9598357081413269	strength: 21
Swiss Family Robinson (1960)	score: 0.959405124

In [None]:
# Ten best matches, highest similarity score first.
byScoreDescending = filteredResults.orderBy(func.desc("score"))
results = byScoreDescending.take(10)
print(results)

[Row(movie1=555, movie2=1008, score=0.9881222248077393, numPairs=15), Row(movie1=555, movie2=2593, score=0.985583484172821, numPairs=11), Row(movie1=295, movie2=555, score=0.985184371471405, numPairs=14), Row(movie1=555, movie2=2743, score=0.9850044846534729, numPairs=11), Row(movie1=555, movie2=2434, score=0.9836628437042236, numPairs=13), Row(movie1=388, movie2=555, score=0.9833303689956665, numPairs=38), Row(movie1=555, movie2=3406, score=0.9818342924118042, numPairs=18), Row(movie1=555, movie2=1365, score=0.9800745844841003, numPairs=21), Row(movie1=555, movie2=2610, score=0.9797157645225525, numPairs=13), Row(movie1=183, movie2=555, score=0.9795822501182556, numPairs=17)]


In [None]:
pip install requests



In [None]:



import requests
from IPython.display import display, Image, HTML

# API keys. Set the TMDB_API_KEY / OMDB_API_KEY environment variables to use
# your own keys; the literals are fallbacks kept so existing runs still work.
# SECURITY: the fallback keys are committed in source. Rotate them and remove
# the literals once every environment provides the variables.
import os

TMDB_API_KEY = os.environ.get("TMDB_API_KEY", "d348679826a515c083a06353ba605405")
OMDB_API_KEY = os.environ.get("OMDB_API_KEY", "a71ce210-fa7d-4f01-b138-d4e2effa3693")

def _fetch_json(url, params):
    """GET `url` with `params` and return the decoded JSON object, or {} on failure."""
    try:
        # Passing params lets requests URL-encode titles containing spaces,
        # '&', '#', etc.; the timeout keeps a stalled service from hanging.
        payload = requests.get(url, params=params, timeout=10).json()
    except (requests.RequestException, ValueError):
        # Network error or a non-JSON body: treat as "nothing found" so the
        # caller can fall through to its next source.
        return {}
    return payload if isinstance(payload, dict) else {}


def get_movie_poster(title, year=None):
    """Return a poster URL for a movie, or None when no poster is found.

    TMDb is tried first; OMDb is the fallback. A failed or malformed response
    from one service counts as "no poster" for that service.

    Args:
        title: Movie title; anything from the first " (" onwards is dropped
            before searching (e.g. a trailing "(1995)").
        year: Optional release year used to narrow both searches.

    Returns:
        The poster URL as a string, or None.
    """
    clean_title = title.split(" (")[0]  # Remove year if present

    # --- Attempt 1: TMDb ---
    tmdb_params = {"api_key": TMDB_API_KEY, "query": clean_title}
    if year:
        tmdb_params["year"] = year

    tmdb_results = _fetch_json(
        "https://api.themoviedb.org/3/search/movie", tmdb_params
    ).get("results")
    if tmdb_results:
        poster_path = tmdb_results[0].get("poster_path")
        if poster_path:
            return f"https://image.tmdb.org/t/p/w500{poster_path}"

    # --- Attempt 2: OMDb (fallback) ---
    omdb_params = {"apikey": OMDB_API_KEY, "t": clean_title}
    if year:
        omdb_params["y"] = year

    # HTTPS so the API key is not sent in clear text.
    poster = _fetch_json("https://www.omdbapi.com/", omdb_params).get("Poster")
    if poster and poster != "N/A":
        return poster

    return None  # No poster found



from html import escape

# Header movie. It is looked up via `movieID`, the id the loop below compares
# against, instead of a separate hard-coded id, so the header and the strip
# of similar movies cannot drift apart.
movie_name = getMovieName(movieNames, movieID)

# Fetch the poster for the header movie
poster_url = get_movie_poster(movie_name)
header_src = poster_url if poster_url else 'https://via.placeholder.com/150x225?text=No+Poster'

# Titles and URLs come from external data, so they are HTML-escaped before
# being placed in the markup. The fragments are collected in a list and
# joined once at the end.
html_parts = [f"""
<div style="text-align: center; margin-bottom: 20px;">
    <h3>Movie: {escape(movie_name)}</h3>
    <img src="{escape(header_src)}"
         width="150" style="border-radius: 5px; border: 1px solid #ddd;">
</div>
"""]

# Horizontal, scrollable strip holding one card per similar movie
html_parts.append("""
<div style="display: flex; overflow-x: auto; gap: 15px; padding: 10px;">
""")

for result in results:
    similarMovieID = result.movie1 if result.movie2 == movieID else result.movie2
    similarMovieName = getMovieName(movieNames, similarMovieID)

    # Extract year (e.g., "Balto (1995)" → 1995) from the last parenthesised
    # group; anything that is not all digits is ignored.
    year = None
    if "(" in similarMovieName:
        year_str = similarMovieName.split("(")[-1].split(")")[0]
        if year_str.isdigit():
            year = year_str

    poster_url = get_movie_poster(similarMovieName, year)
    card_src = poster_url if poster_url else 'https://upload.wikimedia.org/wikipedia/commons/a/ac/No_image_available.svg'

    html_parts.append(f"""
    <div style="flex: 0 0 auto; text-align: center; width: 150px;">
        <img src="{escape(card_src)}"
             width="150" style="border-radius: 5px; border: 1px solid #ddd;">
        <p style="margin: 5px 0; font-weight: bold; font-size: 12px;">{escape(similarMovieName)}</p>
        <p style="margin: 0; font-size: 11px;">Score: {result.score:.2f}</p>
    </div>
    """)

html_parts.append("</div>")
html_output = "".join(html_parts)
display(HTML(html_output))