In [None]:
# Using the ALS algorithm on the user dataset
from pyspark.sql import SparkSession
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql.functions import col

# Step 1: Initialize SparkSession
spark = SparkSession.builder.appName("ALS Recommendation System").getOrCreate()

# One seed shared by the train/test split and the ALS initialisation so that
# the reported RMSE is repeatable between runs on the same input.
SEED = 42

# Step 2: Load the dataset
data = spark.read.csv("/content/rating.csv", header=True, inferSchema=True)

# Preview the data
data.show()

# Step 3: Preprocess data
# Cast the id and rating columns explicitly instead of relying on whatever
# types inferSchema picked; timestamp is carried along unchanged.
data = data.select(
    col("userId").cast("integer"),
    col("movieId").cast("integer"),
    col("rating").cast("float"),
    col("timestamp")
)

# Step 4: Split the dataset (80% train / 20% test), seeded for repeatability
(training, test) = data.randomSplit([0.8, 0.2], seed=SEED)

# Step 5: Configure ALS model
als = ALS(
    userCol="userId",
    itemCol="movieId",
    ratingCol="rating",
    nonnegative=True,  # Apply the non-negativity constraint when solving
    implicitPrefs=False,  # Set to False for explicit feedback
    coldStartStrategy="drop",  # Avoid NaN predictions
    seed=SEED
)

# Train the ALS model
model = als.fit(training)

# Step 6: Evaluate the model
predictions = model.transform(test)

# Use RMSE to evaluate predictions
evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol="rating",
    predictionCol="prediction"
)
rmse = evaluator.evaluate(predictions)
print(f"Root Mean Square Error (RMSE): {rmse}")

# Step 7: Generate recommendations
# Top movie recommendations for each user
user_recommendations = model.recommendForAllUsers(10)
user_recommendations.show()

# Top user recommendations for each movie
movie_recommendations = model.recommendForAllItems(10)
movie_recommendations.show()

# Stop SparkSession
spark.stop()

+------+-------+------+-------------------+
|userId|movieId|rating|          timestamp|
+------+-------+------+-------------------+
|     1|      2|   3.5|2005-04-02 23:53:47|
|     1|     29|   3.5|2005-04-02 23:31:16|
|     1|     32|   3.5|2005-04-02 23:33:39|
|     1|     47|   3.5|2005-04-02 23:32:07|
|     1|     50|   3.5|2005-04-02 23:29:40|
|     1|    112|   3.5|2004-09-10 03:09:00|
|     1|    151|   4.0|2004-09-10 03:08:54|
|     1|    223|   4.0|2005-04-02 23:46:13|
|     1|    253|   4.0|2005-04-02 23:35:40|
|     1|    260|   4.0|2005-04-02 23:33:46|
|     1|    293|   4.0|2005-04-02 23:31:43|
|     1|    296|   4.0|2005-04-02 23:32:47|
|     1|    318|   4.0|2005-04-02 23:33:18|
|     1|    337|   3.5|2004-09-10 03:08:29|
|     1|    367|   3.5|2005-04-02 23:53:00|
|     1|    541|   4.0|2005-04-02 23:30:03|
|     1|    589|   3.5|2005-04-02 23:45:57|
|     1|    593|   3.5|2005-04-02 23:31:01|
|     1|    653|   3.0|2004-09-10 03:08:11|
|     1|    919|   3.5|2004-09-1

In [None]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, CountVectorizer, IDF
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.linalg import Vectors
from pyspark.sql.functions import col, udf
from pyspark.sql.types import ArrayType, StringType, FloatType
import numpy as np
from pyspark.ml.feature import Normalizer

# Build the Spark session for the content-based recommender, or reuse one
# that already exists (getOrCreate).
spark = (
    SparkSession.builder
    .appName("MovieRecommender")
    .config("spark.driver.memory", "4g")
    .getOrCreate()
)

def load_and_prepare_data():
    """Read the movie and rating CSV files into DataFrames.

    Both files are read with a header row and schema inference.

    Returns:
        tuple: ``(movies_df, ratings_df)``. ``movieId`` in ``movies_df`` is
        cast to integer; ``ratings_df`` is returned exactly as read.
    """
    csv_options = {"header": True, "inferSchema": True}

    movies = spark.read.csv("/content/movie.csv", **csv_options)
    # Ratings are not used by the content-based path; they are loaded for a
    # potential hybrid-system extension.
    ratings = spark.read.csv("/content/rating.csv", **csv_options)

    # Make sure the join/filter key is numeric.
    movies = movies.withColumn("movieId", col("movieId").cast("integer"))
    return movies, ratings

def preprocess_text_features(movies_df):
    """Turn movie titles and genres into sparse feature columns.

    Titles are tokenized on non-word characters, stop words are removed, and
    the remaining tokens are converted to TF-IDF (``title_tfidf``). The
    ``genres`` string is split on ``|`` into ``genres_array`` and
    count-vectorized into ``genre_features``.

    Args:
        movies_df: DataFrame with at least ``title`` and ``genres`` columns.

    Returns:
        tuple: ``(processed_df, model)`` where ``processed_df`` is the
        transformed DataFrame and ``model`` is the fitted pipeline model.
    """
    def _genres_to_list(genres):
        # Null or empty genre strings become an empty list.
        if not genres:
            return []
        return genres.split("|")

    to_genre_array = udf(_genres_to_list, ArrayType(StringType()))
    with_genres = movies_df.withColumn("genres_array",
                                       to_genre_array(col("genres")))

    stages = [
        # Title: tokens -> stop-word filtering -> term counts -> TF-IDF
        RegexTokenizer(inputCol="title", outputCol="title_tokens",
                       pattern="\\W"),
        StopWordsRemover(inputCol="title_tokens",
                         outputCol="filtered_tokens"),
        CountVectorizer(inputCol="filtered_tokens", outputCol="title_tf",
                        minDF=2.0),
        IDF(inputCol="title_tf", outputCol="title_tfidf"),
        # Genres: count vector over the split genre array
        CountVectorizer(inputCol="genres_array", outputCol="genre_features",
                        minDF=1.0),
    ]

    fitted = Pipeline(stages=stages).fit(with_genres)
    return fitted.transform(with_genres), fitted

def create_feature_vector(processed_df):
    """Assemble title and genre features into one normalized vector column.

    Args:
        processed_df: DataFrame containing ``title_tfidf`` and
            ``genre_features`` vector columns.

    Returns:
        The input DataFrame with two extra columns: ``features`` (the
        concatenated vector) and ``normalized_features`` (``features``
        passed through ``Normalizer`` with its default settings).
    """
    stages = [
        VectorAssembler(inputCols=["title_tfidf", "genre_features"],
                        outputCol="features"),
        Normalizer(inputCol="features", outputCol="normalized_features"),
    ]
    fitted = Pipeline(stages=stages).fit(processed_df)
    return fitted.transform(processed_df)

def get_recommendations(movie_id, final_df, n=5):
    """Return the ``n`` movies most similar to ``movie_id``.

    Similarity is the dot product between the ``normalized_features`` vector
    of the query movie and that of every other movie.

    Args:
        movie_id: ``movieId`` of the query movie.
        final_df: DataFrame with ``movieId``, ``title``, ``genres`` and
            ``normalized_features`` columns.
        n: Number of recommendations to return.

    Returns:
        DataFrame with ``movieId``, ``title``, ``genres`` and ``similarity``
        columns, ordered by descending similarity and limited to ``n`` rows.

    Raises:
        ValueError: If no row with the given ``movie_id`` exists.
    """
    source_row = final_df.filter(col("movieId") == movie_id).first()
    if not source_row:
        raise ValueError(f"Movie ID {movie_id} not found in the dataset")

    query_vec = source_row.normalized_features

    def _similarity(other_vec):
        # Dot product of the two normalized vectors, used as cosine similarity.
        return float(query_vec.dot(other_vec))

    score = udf(_similarity, FloatType())

    # Score every other movie, then keep the n highest-scoring ones.
    others = final_df.filter(col("movieId") != movie_id)
    scored = others.withColumn("similarity",
                               score(col("normalized_features")))
    top_n = scored.orderBy(col("similarity").desc()).limit(n)

    return top_n.select("movieId", "title", "genres", "similarity")

def main():
    """Run the content-based recommender end to end for one example movie.

    Loads the data, builds the feature vectors, prints the title of the
    example movie and shows its most similar movies. Any exception is
    reported together with its traceback, and the Spark session is stopped
    in all cases.
    """
    # Local import: only needed on the error path.
    import traceback

    top_n = 5  # Number of recommendations to request and announce

    try:
        # Load and prepare data (ratings are loaded but not used here)
        print("Loading data...")
        movies_df, _ratings_df = load_and_prepare_data()

        # Preprocess features
        print("Preprocessing features...")
        processed_df, _pipeline_model = preprocess_text_features(movies_df)

        # Create final feature vectors
        print("Creating feature vectors...")
        final_df = create_feature_vector(processed_df)

        # Cache the final dataframe: it is queried more than once below
        final_df.cache()

        # Example usage
        movie_id = 1  # Example movie ID
        print(f"\nGetting recommendations for movie ID {movie_id}...")

        # Get the title of the input movie
        input_movie_title = final_df.filter(col("movieId") == movie_id).select("title").first()
        if input_movie_title:
            print(f"Input movie: {input_movie_title.title}")

        recommendations = get_recommendations(movie_id, final_df, n=top_n)

        # Show recommendations
        print(f"\nTop {top_n} recommendations:")
        recommendations.show(truncate=False)

    except Exception as e:
        # Top-level boundary: report the failure with its traceback instead
        # of reducing it to a one-line message.
        print(f"An error occurred: {str(e)}")
        print(traceback.format_exc())
    finally:
        # Clean up
        spark.stop()

if __name__ == "__main__":
    main()

Loading data...
Preprocessing features...
Creating feature vectors...

Getting recommendations for movie ID 1...
Input movie: Toy Story (1995)

Top 5 recommendations:
+-------+--------------------------+------------------------------------------------+----------+
|movieId|title                     |genres                                          |similarity|
+-------+--------------------------+------------------------------------------------+----------+
|3114   |Toy Story 2 (1999)        |Adventure|Animation|Children|Comedy|Fantasy     |0.76821506|
|78499  |Toy Story 3 (2010)        |Adventure|Animation|Children|Comedy|Fantasy|IMAX|0.7532216 |
|106022 |Toy Story of Terror (2013)|Animation|Children|Comedy                       |0.7050196 |
|4929   |Toy, The (1982)           |Comedy                                          |0.6424517 |
|2274   |Lilian's Story (1995)     |Drama                                           |0.6130839 |
+-------+--------------------------+---------------------

In [None]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, CountVectorizer, IDF
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, Normalizer
from pyspark.ml.recommendation import ALS
from pyspark.ml import Pipeline
from pyspark.sql.functions import col, udf, expr, broadcast, explode
from pyspark.sql.types import ArrayType, StringType, FloatType, IntegerType
import numpy as np

# Build the Spark session for the hybrid recommender, or reuse one that
# already exists (getOrCreate).
spark = (
    SparkSession.builder
    .appName("HybridMovieRecommender")
    .config("spark.driver.memory", "4g")
    .config("spark.sql.shuffle.partitions", "100")
    .getOrCreate()
)

def load_data():
    """Load the movies and ratings CSV files with explicit column types.

    Returns:
        tuple: ``(movies_df, ratings_df)``. ``movieId`` is cast to integer
        in both frames; in ``ratings_df`` ``userId`` is cast to integer and
        ``rating`` to float.
    """
    # Load movies and ratings data. The movies path was previously an empty
    # string, which cannot be read.
    movies_df = spark.read.csv("/content/movie.csv", header=True, inferSchema=True)
    ratings_df = spark.read.csv("/content/rating.csv", header=True, inferSchema=True)

    # Ensure proper data types
    movies_df = movies_df.withColumn("movieId", col("movieId").cast(IntegerType()))
    ratings_df = ratings_df.withColumn("movieId", col("movieId").cast(IntegerType())) \
                          .withColumn("userId", col("userId").cast(IntegerType())) \
                          .withColumn("rating", col("rating").cast(FloatType()))

    return movies_df, ratings_df

def create_content_features(movies_df):
    """Build normalized content vectors from movie titles and genres.

    Titles go through tokenization, stop-word removal, term counting and
    IDF weighting; genres are split on ``|`` and count-vectorized. The two
    resulting vectors are concatenated and normalized.

    Args:
        movies_df: DataFrame with at least ``title`` and ``genres`` columns.

    Returns:
        The transformed DataFrame, including the intermediate columns plus
        ``combined_features`` and ``normalized_features``.
    """
    def _genres_to_list(genres):
        # Null or empty genre strings become an empty list.
        if not genres:
            return []
        return genres.split("|")

    to_genre_array = udf(_genres_to_list, ArrayType(StringType()))
    with_genres = movies_df.withColumn("genres_array",
                                       to_genre_array(col("genres")))

    # Stage 1: per-column text features
    text_stages = [
        RegexTokenizer(inputCol="title", outputCol="title_tokens", pattern="\\W"),
        StopWordsRemover(inputCol="title_tokens", outputCol="filtered_tokens"),
        CountVectorizer(inputCol="filtered_tokens", outputCol="title_tf", minDF=2.0),
        IDF(inputCol="title_tf", outputCol="title_tfidf"),
        CountVectorizer(inputCol="genres_array", outputCol="genre_features", minDF=1.0),
    ]
    text_model = Pipeline(stages=text_stages).fit(with_genres)
    text_features = text_model.transform(with_genres)

    # Stage 2: concatenate both vectors and normalize the result
    vector_stages = [
        VectorAssembler(inputCols=["title_tfidf", "genre_features"],
                        outputCol="combined_features"),
        Normalizer(inputCol="combined_features", outputCol="normalized_features"),
    ]
    vector_model = Pipeline(stages=vector_stages).fit(text_features)
    return vector_model.transform(text_features)

def train_collaborative_model(ratings_df):
    """Fit an ALS collaborative-filtering model on the ratings.

    Args:
        ratings_df: DataFrame with ``userId``, ``movieId`` and ``rating``
            columns.

    Returns:
        The fitted ALS model.
    """
    als_params = {
        "maxIter": 5,
        "regParam": 0.01,
        "userCol": "userId",
        "itemCol": "movieId",
        "ratingCol": "rating",
        "coldStartStrategy": "drop",
        "nonnegative": True,
    }
    return ALS(**als_params).fit(ratings_df)

def get_content_similarity(movie_id, content_features_df, n=100):
    """Score every other movie by content similarity to ``movie_id``.

    The score is the dot product between the ``normalized_features`` vector
    of the query movie and that of each other movie.

    Args:
        movie_id: ``movieId`` of the query movie.
        content_features_df: DataFrame with ``movieId`` and
            ``normalized_features`` columns.
        n: Currently unused; kept so existing callers keep working. The
            result is not limited to ``n`` rows.

    Returns:
        DataFrame with ``movieId`` and ``content_score`` for every movie
        except the query movie.

    Raises:
        ValueError: If no row with the given ``movie_id`` exists.
    """
    # Get content-based similarities
    input_movie = content_features_df.filter(col("movieId") == movie_id).first()
    if input_movie is None:
        # first() returns None on an empty result; fail with a clear message
        # instead of an AttributeError on the next line.
        raise ValueError(f"Movie ID {movie_id} not found in the dataset")
    input_vector = input_movie.normalized_features

    def cosine_similarity(vec1, vec2):
        # Dot product of the two normalized vectors, used as cosine similarity.
        return float(vec1.dot(vec2))

    similarity_udf = udf(lambda x: cosine_similarity(input_vector, x), FloatType())

    content_recommendations = content_features_df.filter(col("movieId") != movie_id) \
        .withColumn("content_score", similarity_udf(col("normalized_features"))) \
        .select("movieId", "content_score")

    return content_recommendations

def get_hybrid_recommendations(user_id, movie_id, content_features_df,
                             collaborative_model, n=10, content_weight=0.3,
                             num_candidates=100):
    """Blend collaborative-filtering and content-based scores for one user.

    The ALS model proposes ``num_candidates`` movies for ``user_id``. Each
    candidate is joined with its content similarity to ``movie_id``, and the
    two scores are combined as
    ``(1 - content_weight) * cf_score + content_weight * content_score``.

    Args:
        user_id: ``userId`` to generate ALS candidates for.
        movie_id: ``movieId`` used as the content-similarity reference.
        content_features_df: DataFrame with ``movieId``, ``title``,
            ``genres`` and ``normalized_features`` columns.
        collaborative_model: Fitted ALS model.
        n: Maximum number of rows to return.
        content_weight: Weight given to the content score; the collaborative
            score gets ``1 - content_weight``.
        num_candidates: Number of ALS recommendations to request before
            blending (previously fixed at 100). At most this many rows can
            be returned, whatever ``n`` is.

    Returns:
        DataFrame with ``movieId``, ``title``, ``genres``, ``hybrid_score``,
        ``cf_score`` and ``content_score``, ordered by descending
        ``hybrid_score`` and limited to ``n`` rows.
    """
    content_scores = get_content_similarity(movie_id, content_features_df)

    # Single-row DataFrame holding the requested user
    users_df = spark.createDataFrame([(user_id,)], ["userId"])

    # Generate recommendations using recommendForUserSubset
    cf_recommendations = collaborative_model.recommendForUserSubset(
        users_df, num_candidates
    )

    # Explode the recommendations array and select required columns
    cf_scores = cf_recommendations.select(
        explode("recommendations").alias("rec")
    ).select(
        col("rec.movieId").alias("movieId"),
        col("rec.rating").alias("cf_score")
    )

    # Join with content scores; the inner join drops the query movie itself
    # and any candidate without a content score.
    hybrid_scores = cf_scores.join(broadcast(content_scores), "movieId", "inner")

    # NOTE(review): cf_score is a predicted rating while content_score is a
    # dot product of normalized vectors, so the two inputs do not appear to
    # share a scale - confirm the weighting behaves as intended.
    # Calculate hybrid score and join with movie details
    hybrid_recommendations = hybrid_scores \
        .withColumn("hybrid_score",
                   (1 - content_weight) * col("cf_score") + content_weight * col("content_score")) \
        .join(content_features_df.select("movieId", "title", "genres"), "movieId") \
        .orderBy(col("hybrid_score").desc()) \
        .select("movieId", "title", "genres", "hybrid_score", "cf_score", "content_score") \
        .limit(n)

    return hybrid_recommendations

def main():
    """Run the hybrid recommender end to end for one example user and movie.

    Loads the data, builds content features, trains the ALS model and shows
    the blended recommendations. Errors are printed with their traceback and
    the Spark session is stopped in all cases.
    """
    import traceback

    try:
        print("Loading data...")
        movies, ratings = load_data()

        print("Creating content features...")
        features = create_content_features(movies)

        print("Training collaborative filtering model...")
        als_model = train_collaborative_model(ratings)

        # Example user and reference movie
        user_id, movie_id = 1, 1

        print(f"\nGetting hybrid recommendations for user {user_id} based on movie {movie_id}...")
        top_movies = get_hybrid_recommendations(
            user_id, movie_id, features, als_model
        )

        print("\nTop 10 Hybrid Recommendations:")
        top_movies.show(truncate=False)
    except Exception as err:
        # Top-level boundary: report the failure and its traceback.
        print(f"An error occurred: {err!s}")
        print(traceback.format_exc())
    finally:
        spark.stop()


if __name__ == "__main__":
    main()