# Data Split Even

# approach 1

In [1]:
import pandas as pd
from sklearn.model_selection import train_test_split

def load_and_preprocess_data_balanced():
    # Load data
    movies = pd.read_csv('data/movies.csv')
    ratings = pd.read_csv('data/ratings.csv')

    # Merge datasets on movieId
    merged_data = pd.merge(ratings, movies, on='movieId')

    train_data = pd.DataFrame()
    test_data = pd.DataFrame()

    for user_id in merged_data['userId'].unique():
        user_data = merged_data[merged_data['userId'] == user_id]
        # Adjust split ratio if number of ratings is odd
        # Favor training set by allocating the extra review to it
        split_ratio = 0.5 if len(user_data) % 2 == 0 else (len(user_data) // 2 + 1) / len(user_data)
        user_train, user_test = train_test_split(user_data, test_size=1-split_ratio, shuffle=True)
        train_data = pd.concat([train_data, user_train])
        test_data = pd.concat([test_data, user_test])

    return train_data, test_data


In [2]:
train_data_balanced, test_data_balanced = load_and_preprocess_data_balanced()

In [3]:
train_data_balanced

Unnamed: 0,userId,movieId,rating,timestamp,title,genres
4992,1,661,5.0,964982838,James and the Giant Peach (1996),Adventure|Animation|Children|Fantasy|Musical
3570,1,527,5.0,964984002,Schindler's List (1993),Drama|War
1146,1,163,5.0,964983650,Desperado (1995),Action|Romance|Western
10740,1,2018,5.0,964980523,Bambi (1942),Animation|Children|Drama
5764,1,1024,5.0,964982876,"Three Caballeros, The (1945)",Animation|Children|Musical
...,...,...,...,...,...,...
91071,578,78316,4.0,1300991370,Letters to Juliet (2010),Drama|Romance
54008,578,7323,4.0,1300989914,"Good bye, Lenin! (2003)",Comedy|Drama
98345,578,52668,4.0,1300991167,In the Land of Women (2007),Comedy|Drama|Romance
85986,578,3155,4.5,1300989860,Anna and the King (1999),Drama|Romance


In [14]:
train_data_balanced.describe()

Unnamed: 0,userId,movieId,rating,timestamp
count,50539.0,50539.0,50539.0,50539.0
mean,326.100754,19457.665862,3.50089,1206067000.0
std,182.606379,35509.657219,1.047026,216376100.0
min,1.0,1.0,0.5,828124600.0
25%,177.0,1200.0,3.0,1019124000.0
50%,325.0,2997.0,3.5,1186088000.0
75%,477.0,8360.0,4.0,1435994000.0
max,610.0,193587.0,5.0,1537799000.0


24/02/04 05:45:38 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 911585 ms exceeds timeout 120000 ms
24/02/04 05:45:38 WARN SparkContext: Killing executors is not supported by current scheduler.
24/02/04 05:45:42 ERROR Inbox: Ignoring error
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:56)
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:310)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:102)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:110)
	at org.apache.spark.util.RpcUtils$.makeDriverRef(RpcUtils.scala:36)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.driverEndpoint$lzycompute(BlockManagerMasterEndpoint.scala:124)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.org$apache$spark$storage$BlockManagerMasterEndpoint$$

In [4]:
test_data_balanced

Unnamed: 0,userId,movieId,rating,timestamp,title,genres
12971,1,2616,4.0,964983080,Dick Tracy (1990),Action|Crime
5742,1,1009,3.0,964981775,Escape to Witch Mountain (1975),Adventure|Children|Fantasy
5094,1,733,4.0,964982400,"Rock, The (1996)",Action|Adventure|Thriller
3957,1,590,4.0,964982546,Dances with Wolves (1990),Adventure|Drama|Western
776,1,70,3.0,964982400,From Dusk Till Dawn (1996),Action|Comedy|Horror|Thriller
...,...,...,...,...,...,...
56310,578,76251,4.5,1300996806,Kick-Ass (2010),Action|Comedy
84070,578,45720,4.5,1300996651,"Devil Wears Prada, The (2006)",Comedy|Drama
98614,578,79879,0.5,1300991503,Piranha (Piranha 3D) (2010),Action|Horror|Thriller
74845,578,613,3.0,1300989672,Jane Eyre (1996),Drama|Romance


In [5]:
# Verify the split for a few users

for user_id in train_data_balanced['userId'].unique()[:5]:  # Check for the first 5 users
    train_reviews = train_data_balanced[train_data_balanced['userId'] == user_id]
    test_reviews = test_data_balanced[test_data_balanced['userId'] == user_id]
    diff = abs(len(train_reviews) - len(test_reviews))
    assert diff <= 1, f"User {user_id} does not have an equal or nearly equal split"
    print(f"User {user_id}: Train reviews = {len(train_reviews)}, Test reviews = {len(test_reviews)}")




User 1: Train reviews = 116, Test reviews = 116
User 5: Train reviews = 22, Test reviews = 22
User 7: Train reviews = 76, Test reviews = 76
User 15: Train reviews = 68, Test reviews = 67
User 17: Train reviews = 53, Test reviews = 52


In [6]:
!pip install pyspark




In [7]:
from pyspark.sql import SparkSession
from pyspark.ml.recommendation import ALS
from pyspark.sql.functions import col

# Initialize Spark Session
spark = SparkSession.builder.appName("MovieRecommender").getOrCreate()

# Convert the training DataFrame to a Spark DataFrame
train_data_spark = spark.createDataFrame(train_data_balanced)

# ALS model parameters
als = ALS(maxIter=5, regParam=0.01, userCol="userId", itemCol="movieId", ratingCol="rating", coldStartStrategy="drop", nonnegative=True)

# Train the ALS model
model = als.fit(train_data_spark)

# Convert the test DataFrame to a Spark DataFrame for evaluation
test_data_spark = spark.createDataFrame(test_data_balanced)

# Generate predictions
predictions = model.transform(test_data_spark)

# Show some predictions
predictions.select("userId", "movieId", "prediction").show(5)


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/02/04 03:02:45 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/02/04 03:02:52 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
                                                                                

+------+-------+----------+
|userId|movieId|prediction|
+------+-------+----------+
|    27|   1580| 3.5466533|
|    91|   3175| 3.2984014|
|    93|   1591|  4.309012|
|    93|   1580|   4.52996|
|   132|  44022| 2.3105717|
+------+-------+----------+
only showing top 5 rows



In [8]:
from pyspark.sql import SparkSession
from pyspark.ml.recommendation import ALS
from pyspark.sql.functions import col, lit

# Assuming the SparkSession is already initialized and the ALS model is trained
# movies = pd.read_csv('data/movies.csv')


def recommend_movies_for_user(user_id, movies, num_recommendations=5):
    """
    Generate movie recommendations for a specific user.
    
    Parameters:
    - user_id: The ID of the user for whom recommendations are to be generated.
    - movies: DataFrame containing the movie details.
    - num_recommendations: The number of recommendations to generate.
    """
    # Ensure the movies DataFrame is available here
    
    # Create a DataFrame of all movies that the user has not rated yet
    all_movies = train_data_spark.select("movieId").distinct()
    rated_movies = train_data_spark.filter(train_data_spark.userId == user_id).select("movieId", "userId")
    movies_to_rate = all_movies.subtract(rated_movies.select("movieId"))
    movies_to_rate = movies_to_rate.withColumn("userId", lit(user_id))

    # Make predictions for the user
    recommendations = model.transform(movies_to_rate).orderBy("prediction", ascending=False).limit(num_recommendations)
    
    # Join with the movies DataFrame to get the movie titles
    recommendations = recommendations.join(movies, recommendations.movieId == movies.movieId).select(recommendations.userId, movies.title, recommendations.prediction)
    
    recommendations.show()

# Load your movies DataFrame before calling the function
movies_df = spark.read.csv('data/movies.csv', header=True)  # Adjust path as necessary

# Example: Recommend movies for user ID 1, passing the movies DataFrame as an argument
recommend_movies_for_user(1, movies_df)


                                                                                

+------+--------------------+----------+
|userId|               title|prediction|
+------+--------------------+----------+
|     1|Enchanted April (...|  7.840041|
|     1| My Left Foot (1989)| 7.9870996|
|     1|Dead Alive (Brain...|  7.950803|
|     1|Virgin Suicides, ...| 7.6996264|
|     1|      Minions (2015)|  7.754133|
+------+--------------------+----------+



In [13]:
train_data_spark['userId'] == 1

Column<'(userId = 1)'>