In [None]:
#Set up environment, only need to run once
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!pip install pyspark requests beautifulsoup4 lxml

In [None]:
from pyspark.sql import SparkSession, Row
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
import requests
from bs4 import BeautifulSoup

#function that parses string rating from letterboxd
def parse_stars(star_string):
  """Convert a Letterboxd star string into a numeric rating.

  Every "★" character is worth one star and a "½" anywhere in the string
  adds half a star, so "★★★½" becomes 3.5. A string containing neither
  character yields 0.0.

  Args:
    star_string: Text holding the star glyphs of a rating.

  Returns:
    The rating as a float.
  """
  stars = float(star_string.count("★"))
  if "½" in star_string:
    stars += 0.5
  return stars

#function that asks user to input links to all profiles
def links():
  """Prompt for four Letterboxd profile links and scrape each films page.

  Reads the primary user's profile link and then three friends' links from
  stdin. All four links are collected before any page is fetched. Each link
  is normalised so that it works with or without a trailing slash before
  "films/" is appended.

  Returns:
    A dict with the fixed keys "you", "friend1", "friend2" and "friend3",
    each mapped to the value scrape() returns for that profile's films page.
  """
  def films_url(profile_link):
    # Tolerate surrounding whitespace and a missing (or doubled) trailing
    # slash: ".../name" and ".../name/" must both give ".../name/films/".
    return profile_link.strip().rstrip("/") + "/films/"

  profile_links = {
      "you": input("Enter the link to your Letterboxd profile:"),
  }
  print("Enter the links of Letterboxd pages for 3 friends")
  for n in range(1, 4):
    profile_links[f"friend{n}"] = input(
        f"Enter the link to friend {n}'s Letterboxd profile:"
    )

  # Dicts keep insertion order, so profiles are scraped as
  # you, friend1, friend2, friend3.
  return {
      name: scrape(films_url(link), name)
      for name, link in profile_links.items()
  }

#scrapes the website received from each profile to find movies watched and their rating
def scrape(link, username, timeout=10):
  """Download one Letterboxd films page and extract (title, rating) pairs.

  Args:
    link: URL of the films page to fetch.
    username: Label of the profile being scraped. Accepted for the caller's
      convenience; the scraping logic itself does not use it.
    timeout: Seconds to wait for the server before giving up.

  Returns:
    A list of (title, rating) tuples, one per film found on the page.
    rating is the value parse_stars() computes from the film's rating
    element, or 0.0 when the film has no rating element. An empty list is
    returned when the page cannot be fetched (network error, malformed URL
    or a non-200 status).
  """
  headers = {"User-Agent": "Mozilla/5.0"}
  try:
    # requests has no default timeout; without one an unresponsive server
    # would block the whole run.
    response = requests.get(link, headers=headers, timeout=timeout)
  except requests.RequestException as err:
    # Treat transport failures like a bad status code: report and move on
    # so one unreachable profile does not abort the other scrapes.
    print(f"Failed to fetch {link}: {err}")
    return []

  if response.status_code != 200:
    print(f"Failed to fetch {link}")
    return []

  soup = BeautifulSoup(response.text, "lxml")
  films = []

  # NOTE(review): only this single page is parsed. If a profile's film list
  # spans several pages, the later pages are never requested -- confirm
  # whether pagination needs to be followed.
  for grid_item in soup.select("#content li.griditem"):
    poster_div = grid_item.find("div", class_="react-component")
    # Entries without a poster component or without a title are skipped.
    title = poster_div.get("data-item-name") if poster_div else None
    if not title:
      continue

    rating_span = grid_item.find("span", class_="rating")
    if rating_span:
      rating = parse_stars(rating_span.get_text(strip=True))
    else:
      # No rating element: the film was logged without a star rating.
      rating = 0.0

    films.append((title, rating))

  print(films)
  return films

#function that constructs dataframe based on the dictionary all_ratings of the movies watched
def build_dataset(all_ratings):
  """Turn the scraped ratings into a Spark DataFrame for ALS training.

  Args:
    all_ratings: dict mapping a username to a list of (title, rating)
      tuples. A rating of 0.0 marks a film that was watched but not rated.

  Returns:
    A tuple (ratings_df, user_id, movie_id, spark) where
      ratings_df is a DataFrame with columns userId, movieId and rating,
        holding one row per rated film per user;
      user_id maps each username to an integer id;
      movie_id maps each film title (rated or not) to an integer id;
      spark is the SparkSession used to build the DataFrame.

  Raises:
    ValueError: if no user has a single rated film, since there is then
      nothing to build a DataFrame from.
  """
  # Integer ids for users, in the order they appear in all_ratings.
  user_id = {name: idx for idx, name in enumerate(all_ratings)}

  # Every watched title gets an id, including unrated ones, so callers can
  # still look up films a user has seen. Sorting makes the ids reproducible;
  # plain set iteration order for strings can differ between runs.
  titles = sorted(
      {title for ratings in all_ratings.values() for title, _ in ratings}
  )
  movie_id = {title: idx for idx, title in enumerate(titles)}

  rows = []
  for user, ratings in all_ratings.items():
    for title, rating in ratings:
      value = float(rating)
      # 0.0 means "watched but unrated". Feeding it to ALS as an explicit
      # zero-star rating would teach the model that the user hated the
      # film, so such entries are left out of the training data.
      if value <= 0.0:
        continue
      rows.append(Row(
          userId=user_id[user],
          movieId=movie_id[title],
          rating=value,
      ))

  if not rows:
    # Fail with a clear message instead of letting Spark's schema inference
    # fail on an empty list.
    raise ValueError(
        "No rated films were found for any profile; cannot build a dataset"
    )

  spark = SparkSession.builder.appName("MovieRec").getOrCreate()
  ratings_df = spark.createDataFrame(rows)
  ratings_df.show()
  return ratings_df, user_id, movie_id, spark

#function that trains ALS (Alternating Least Squares)
def train_als(ratings_df, seed=None):
  """Fit an ALS model on an 80/20 split of the ratings and print its RMSE.

  Args:
    ratings_df: DataFrame with userId, movieId and rating columns.
    seed: Optional integer seed applied to both the train/test split and
      the ALS estimator so that runs on the same DataFrame are intended to
      be repeatable. With the default None both remain randomly seeded.

  Returns:
    The fitted ALS model. It is trained on the 80% training split only.
  """
  training, test = ratings_df.randomSplit([0.8, 0.2], seed=seed)

  als = ALS(
      maxIter=10,
      regParam=0.1,
      userCol="userId",
      itemCol="movieId",
      ratingCol="rating",
      # Drop predictions that would be NaN (user or movie unseen in training).
      coldStartStrategy="drop",
  )
  if seed is not None:
    als.setSeed(seed)

  model = als.fit(training)
  predictions = model.transform(test)

  # With coldStartStrategy="drop" every test row whose user or movie is
  # missing from the training split is removed. On a small dataset that can
  # leave nothing to score, and RMSE over zero rows is undefined.
  if predictions.count() == 0:
    print("Model RMSE not available: no test rows could be scored")
    return model

  evaluator = RegressionEvaluator(
      metricName="rmse",
      labelCol="rating",
      predictionCol="prediction",
  )
  rmse = evaluator.evaluate(predictions)
  print(f"Model RMSE = {rmse:.3f}")
  return model

#function that recommends the movies primary user should watch
def recommend(model, spark, user_id, movie_id, all_ratings, username, top_n=10):
  """Print the top_n films the model predicts `username` would rate highest.

  Films that already appear in the user's scraped list are excluded.
  Predicted ratings are clamped to the range 0.0-5.0 before printing.

  Args:
    model: Fitted ALS model.
    spark: SparkSession used to build the single-user lookup DataFrame.
    user_id: dict mapping username to integer user id.
    movie_id: dict mapping film title to integer movie id.
    all_ratings: dict mapping username to a list of (title, rating) tuples.
    username: Key of the user to recommend for.
    top_n: Maximum number of recommendations to print.

  Returns:
    None. Results are printed.

  Raises:
    KeyError: if username is missing from user_id or all_ratings, or if one
      of the user's titles is missing from movie_id.
  """
  user = user_id[username]
  # Ids of everything the user has already logged.
  seen = {movie_id[movie] for movie, _ in all_ratings[username]}

  user_df = spark.createDataFrame([Row(userId=user)])
  # In the worst case every seen film outranks the unseen ones, so asking
  # for top_n + len(seen) candidates guarantees top_n survive the filter
  # whenever the model knows that many films.
  rec_rows = model.recommendForUserSubset(user_df, top_n + len(seen)).collect()
  if not rec_rows:
    # No result row came back for this user, so there is nothing to index.
    print(f"\nNo recommendations available for {username}")
    return

  user_recs = rec_rows[0].recommendations
  recs_filter = [rec for rec in user_recs if rec["movieId"] not in seen][:top_n]

  # Reverse lookup to turn movie ids back into titles.
  id_to_movie = {v: k for k, v in movie_id.items()}

  print(f"\nTop {top_n} Recommended Movies:")
  for rec in recs_filter:
    movie_title = id_to_movie[rec["movieId"]]
    # Raw predictions can fall outside the star scale; clamp them.
    predicted_rating = max(0.0, min(5.0, rec["rating"]))
    print(f"{movie_title}: predicted rating: {predicted_rating:.1f}")
#main function as driver that runs all functions
def main():
    """Run the full pipeline: collect links, build data, train, recommend.

    Recommendations are produced for the profile stored under the key "you".
    """
    scraped = links()
    frame, user_lookup, movie_lookup, session = build_dataset(scraped)
    fitted = train_als(frame)
    recommend(
        fitted,
        session,
        user_lookup,
        movie_lookup,
        scraped,
        "you",
    )

# Run the whole pipeline only when this code is executed directly,
# not when it is imported as a module.
if __name__ == "__main__":
    main()