**ML Algorithms in PySpark**

**Recommender System using Pyspark**

Spark MLlib, Spark's machine learning library, implements collaborative filtering with the Alternating Least Squares (ALS) algorithm.

Collaborative filtering makes predictions (filtering) about a user's interests by collecting preferences or taste information from many users (collaborating). The underlying assumption is that if two users A and B share the same opinion on one subject, A is more likely to share B's opinion on a different subject x than the opinion of a randomly chosen user.

In [None]:
!apt-get update
# Download Java Virtual Machine (JVM)
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

0% [Working]            Hit:1 http://archive.ubuntu.com/ubuntu focal InRelease
0% [Waiting for headers] [Waiting for headers] [Connected to cloud.r-project.or                                                                               Get:2 http://archive.ubuntu.com/ubuntu focal-updates InRelease [114 kB]
0% [2 InRelease 15.6 kB/114 kB 14%] [Waiting for headers] [Waiting for headers]                                                                               Get:3 https://cloud.r-project.org/bin/linux/ubuntu focal-cran40/ InRelease [3,622 B]
0% [2 InRelease 15.6 kB/114 kB 14%] [Waiting for headers] [3 InRelease 3,622 B/0% [2 InRelease 15.6 kB/114 kB 14%] [Waiting for headers] [Waiting for headers]                                                                               Get:4 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2004/x86_64  InRelease [1,581 B]
0% [2 InRelease 21.4 kB/114 kB 19%] [Waiting for headers] [Waiting for headers]               

In [None]:
# Download Spark
!wget -q https://archive.apache.org/dist/spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2.tgz
# Unzip the file
!tar xf spark-3.2.1-bin-hadoop3.2.tgz

In [None]:
!pip install -q findspark

In [None]:
import os

# Tell findspark / PySpark where the JDK and the unpacked Spark distribution live.
os.environ.update({
    "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
    "SPARK_HOME": '/content/spark-3.2.1-bin-hadoop3.2',
})

In [None]:
!ls

book_ratings.csv  spark-3.2.1-bin-hadoop3.2
sample_data	  spark-3.2.1-bin-hadoop3.2.tgz


In [None]:
import findspark

# Make the PySpark package of the Spark distribution importable.
# spark_home=None (the default) lets findspark resolve the location itself;
# it looks at the SPARK_HOME environment variable set earlier.
findspark.init(spark_home=None)

In [None]:
# Show which Spark installation findspark resolved (should match SPARK_HOME set earlier)
resolved_spark_home = findspark.find()
resolved_spark_home

'/content/spark-3.2.1-bin-hadoop3.2'

In [None]:
# PySpark entry point plus the ALS recommender and the evaluator used to score it
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql import SparkSession

# Start (or reuse) the Spark session that the rest of the notebook works with
spark = (
    SparkSession.builder
    .appName('Recommender')
    .getOrCreate()
)
spark


In [None]:
# Load the ratings; book_ratings.csv must be in the working directory.
# An explicit schema avoids the extra full pass over the file that inferSchema=True
# performs, and guarantees integer id/rating columns for ALS.
# Column order matches the file header: book_id, user_id, rating.
data = spark.read.csv('book_ratings.csv',
                      schema='book_id INT, user_id INT, rating INT',
                      header=True)

data.show(5)


+-------+-------+------+
|book_id|user_id|rating|
+-------+-------+------+
|      1|    314|     5|
|      1|    439|     3|
|      1|    588|     5|
|      1|   1169|     4|
|      1|   1185|     4|
+-------+-------+------+
only showing top 5 rows



In [None]:
# Spark DataFrames have no .shape, so count rows (a Spark job) and columns (schema only) separately
num_rows, num_cols = data.count(), len(data.columns)

print("Number of rows:", num_rows)
print("Number of columns:", num_cols)

Number of rows: 981756
Number of columns: 3


In [None]:
# Summary statistics (count, mean, stddev, min, max) for every column.
# NOTE(review): in the saved outputs of this notebook the count here (254812) differs from
# data.count() (981756); the outputs appear to come from different runs or data files —
# re-run the notebook top to bottom to confirm.
data.describe(*data.columns).show()


+-------+------------------+-----------------+------------------+
|summary|           book_id|          user_id|            rating|
+-------+------------------+-----------------+------------------+
|  count|            254812|           254812|            254812|
|   mean|1274.9756408646374|24688.24237477042| 3.825373216332041|
| stddev|  736.058995940083|14983.10160239284|1.0099376939054863|
|    min|                 1|                1|                 1|
|    max|              2550|            53424|                 5|
+-------+------------------+-----------------+------------------+



In [None]:
# Dividing the data using random split into train_data and test_data
# in 80% and 20% respectively; the seed makes the split (and all results) reproducible
train_data, test_data = data.randomSplit([0.8, 0.2], seed=42)


In [None]:
# Build the recommendation model using ALS on the training data.
# coldStartStrategy="drop": users/books that occur only in the test split have no learned
# factors, so ALS would predict NaN for them and the test RMSE would come out as NaN.
# Dropping those rows keeps the evaluation meaningful. seed fixes the random factor init.
als = ALS(maxIter=5,
          regParam=0.01,
          userCol="user_id",
          itemCol="book_id",
          ratingCol="rating",
          coldStartStrategy="drop",
          seed=42)

# Fitting the model on the train_data
model = als.fit(train_data)


In [None]:
# Score the held-out split: transform() appends a "prediction" column with the model's
# estimated rating for each (user_id, book_id) row of test_data.
# (The RMSE itself is computed in the next cell.)
predictions = model.transform(test_data)

# Show the first rows of the scored test set
predictions.show(n=20)


+-------+-------+------+----------+
|book_id|user_id|rating|prediction|
+-------+-------+------+----------+
|      2|   9731|     4| 3.7728379|
|      1|  16913|     5| 4.1102757|
|      1|  32305|     5|  4.497166|
|      2|  11868|     5|   3.95726|
|      1|   6630|     5|  4.614361|
|      2|  13794|     1|  3.255959|
|      1|  18361|     4|  4.434146|
|      1|  21487|     4|  4.294687|
|      1|  25214|     4| 4.4809976|
|      1|  25164|     4|  4.033773|
|      1|  31001|     4|  4.667442|
|      2|   1169|     3| 3.7644792|
|      2|   6063|     1|  3.644886|
|      1|    314|     5| 4.3220677|
|      2|  10509|     2|  4.251693|
|      2|  12874|     4|  4.716813|
|      1|  51838|     5| 4.5926948|
|      1|  33890|     3|  3.960748|
|      1|  39423|     3| 3.7693806|
|      2|  14372|     3|  4.600501|
+-------+-------+------+----------+
only showing top 20 rows



In [None]:
# Printing and calculating RMSE.
# ALS predicts NaN for users/books it never saw in training (cold start), and a single
# NaN makes the whole RMSE NaN, so only rows with a real prediction are scored.
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")
scored_predictions = predictions.na.drop(subset=["prediction"])
rmse = evaluator.evaluate(scored_predictions)
print("Root-mean-square error = " + str(rmse))


Root-mean-square error = nan


In [None]:
# Held-out books rated by user 5461; only the ids are kept (the rating is dropped)
# so the model's predictions for these books can be inspected.
user1 = (
    test_data
    .where(test_data.user_id == 5461)
    .select('book_id', 'user_id')
)

# Show the books user 5461 rated in the test split
user1.show()


+-------+-------+
|book_id|user_id|
+-------+-------+
|      7|   5461|
|     15|   5461|
|     43|   5461|
|     48|   5461|
|     66|   5461|
|    111|   5461|
|    116|   5461|
|    117|   5461|
|    118|   5461|
|    121|   5461|
|    130|   5461|
|    148|   5461|
|    222|   5461|
|    255|   5461|
|    306|   5461|
|    321|   5461|
|    339|   5461|
|    401|   5461|
|    454|   5461|
|    489|   5461|
+-------+-------+
only showing top 20 rows



In [None]:
# Predict ratings for user1's held-out books with the model fitted on train_data
# (no training happens here). Rows the model cannot score (cold start -> NaN) are dropped:
# Spark orders NaN above every number, so they would otherwise top the descending list.
recommendations = model.transform(user1).na.drop(subset=["prediction"])

# Displaying the predictions of books for user1, highest predicted rating first
recommendations.orderBy('prediction', ascending=False).show()


+-------+-------+----------+
|book_id|user_id|prediction|
+-------+-------+----------+
|    339|   5461| 4.7859526|
|    489|   5461| 4.7581034|
|    561|   5461| 4.6387863|
|   1266|   5461| 4.6110306|
|    306|   5461|   4.55613|
|    401|   5461|   4.49409|
|     15|   5461| 4.4881763|
|    117|   5461| 4.4501696|
|    111|   5461|  4.285496|
|    148|   5461| 4.2722573|
|    222|   5461| 4.2612677|
|     43|   5461| 4.2518234|
|     48|   5461|  4.203925|
|     66|   5461| 4.1053185|
|    639|   5461| 4.0750923|
|   1566|   5461| 3.9123073|
|    121|   5461| 3.8474429|
|    130|   5461| 3.8462512|
|    118|   5461| 3.8150206|
|    731|   5461| 3.7864227|
+-------+-------+----------+
only showing top 20 rows



**Decision Tree and Random Forest**

In [None]:
pip install pyspark


Collecting pyspark
  Downloading pyspark-3.4.1.tar.gz (310.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m4.8 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.4.1-py2.py3-none-any.whl size=311285398 sha256=e2accb5c72e546722e4e0b03a5d10ebc8afebe85118b4615e4cbfa6a5ee82a34
  Stored in directory: /root/.cache/pip/wheels/0d/77/a3/ff2f74cc9ab41f8f594dabf0579c2a7c6de920d584206e0834
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.4.1


In [None]:
from pyspark.sql import SparkSession
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.feature import VectorAssembler

# Get the Spark session (getOrCreate returns the session already started earlier, if any)
spark = SparkSession.builder.appName("BookPrediction").getOrCreate()

# Create the feature vector assembler.
# NOTE: the ids are used as plain numeric features, so the tree can only split on id
# ranges; it is unlikely to capture per-user or per-book preferences the way ALS does.
featureCols = ["book_id", "user_id"]
assembler = VectorAssembler(inputCols=featureCols, outputCol="features")

# Transform the data using the feature vector assembler
dataAssembled = assembler.transform(data)

# Split the data into training and test sets (seeded so the split is reproducible)
(trainingData, testData) = dataAssembled.randomSplit([0.8, 0.2], seed=42)

# Train a Decision Tree model; each rating value (1-5) is treated as a class label
dt = DecisionTreeClassifier(labelCol="rating", featuresCol="features", seed=42)
dtModel = dt.fit(trainingData)

# Make predictions on the test data using the Decision Tree model
dtPredictions = dtModel.transform(testData)

# Evaluate the model using RegressionEvaluator: predicted class labels are rating values,
# so RMSE against the true rating is well defined
evaluator = RegressionEvaluator(labelCol="rating", predictionCol="prediction", metricName="rmse")
dtRMSE = evaluator.evaluate(dtPredictions)
print("Decision Tree RMSE: {:.2f}".format(dtRMSE))

# Single user example: predicted vs. actual rating for every book this user rated.
# user_data comes from the full dataset, so most rows were also in trainingData;
# this illustrates the model's output rather than held-out accuracy.
user_id = 5461

# Filter the data for the given user
user_data = data.filter(data.user_id == user_id)

# Assemble the features for the user data
user_data_assembled = assembler.transform(user_data)

# Make predictions for the user using the Decision Tree model
user_predictions = dtModel.transform(user_data_assembled)

# Put each prediction next to the user's actual rating
user_recommendations = user_predictions.select("user_id", "book_id", "prediction")
user_actual_ratings = user_data.join(user_recommendations, ["user_id", "book_id"], "left")

user_actual_ratings.show()


Decision Tree RMSE: 1.10
+-------+-------+------+----------+
|user_id|book_id|rating|prediction|
+-------+-------+------+----------+
|   5461|      1|     3|       5.0|
|   5461|      2|     4|       5.0|
|   5461|      3|     2|       5.0|
|   5461|      5|     5|       5.0|
|   5461|      7|     5|       5.0|
|   5461|      8|     4|       5.0|
|   5461|      9|     4|       5.0|
|   5461|     10|     3|       5.0|
|   5461|     11|     4|       5.0|
|   5461|     14|     4|       5.0|
|   5461|     15|     5|       5.0|
|   5461|     16|     4|       5.0|
|   5461|     19|     5|       5.0|
|   5461|     22|     4|       5.0|
|   5461|     28|     5|       5.0|
|   5461|     31|     4|       5.0|
|   5461|     32|     5|       5.0|
|   5461|     33|     4|       5.0|
|   5461|     35|     4|       5.0|
|   5461|     37|     4|       5.0|
+-------+-------+------+----------+
only showing top 20 rows



This code trains and evaluates a Decision Tree model that predicts book ratings, and then shows the model's predicted ratings for one specific user.

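Spark Session Creation: The code obtains a Spark session with SparkSession.builder.getOrCreate(). Because a session was already started in the ALS section, getOrCreate() returns that existing session rather than creating a new one; this session is used for all subsequent Spark operations.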

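Feature Vector Assembly: The code defines a VectorAssembler that combines the "book_id" and "user_id" columns into a single feature vector column called "features". Note that the two IDs are treated as ordinary numeric values, so the tree can only split on ID ranges. It is therefore unlikely to capture individual users' or books' preferences the way ALS does; in the output above, every prediction for user 5461 is the same value (5.0).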

Data Transformation: The code applies the VectorAssembler to the dataset using assembler.transform, creating a new DataFrame dataAssembled with the additional "features" column.

Data Splitting: The code splits the data into training and test sets using randomSplit, with 80% of the data for training (trainingData) and 20% for testing (testData).

Model Training: The code trains a Decision Tree model using DecisionTreeClassifier. It sets the "rating" column as the label column and the "features" column as the features column. The model is trained on the trainingData DataFrame.

Model Evaluation: The code makes predictions on the test data using the trained Decision Tree model (dtModel.transform). It then evaluates the model's performance using the RMSE metric (RegressionEvaluator). The calculated RMSE value is printed using print.

Single User Recommendation: The code specifies a single user (user_id) for which recommendations need to be generated. It filters the data DataFrame to retrieve the data for the specified user (user_data).

User Data Transformation: The code applies the VectorAssembler to the user data using assembler.transform, creating a new DataFrame user_data_assembled with the additional "features" column.

User Prediction: The code makes predictions for the user data using the trained Decision Tree model (dtModel.transform). The predictions are stored in the user_predictions DataFrame.

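User Recommendation Display: The code selects the "user_id", "book_id", and "prediction" columns from the user_predictions DataFrame and joins them with the original user data (user_data), so that each row of user_actual_ratings shows the user's actual rating next to the predicted rating. Finally, it displays the result with user_actual_ratings.show(). Because user_data is taken from the full dataset, most of these rows were also part of the training data, so the table illustrates the model's output rather than its accuracy on unseen data.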

In [None]:
from pyspark.sql import SparkSession
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import VectorAssembler

# Get the Spark session (getOrCreate returns the session already started earlier, if any)
spark = SparkSession.builder.appName("BookPrediction").getOrCreate()

# Create the feature vector assembler.
# NOTE: the ids are used as plain numeric features, so the trees can only split on id
# ranges; the forest is unlikely to capture per-user or per-book preferences like ALS does.
featureCols = ["book_id", "user_id"]
assembler = VectorAssembler(inputCols=featureCols, outputCol="features")

# Transform the data using the feature vector assembler
dataAssembled = assembler.transform(data)

# Split the data into training and test sets (seeded so the split is reproducible)
(trainingData, testData) = dataAssembled.randomSplit([0.8, 0.2], seed=42)

# Train a Random Forest model; each rating value (1-5) is treated as a class label.
# The seed fixes the forest's random bootstrap/feature sampling.
rf = RandomForestClassifier(labelCol="rating", featuresCol="features", seed=42)
rfModel = rf.fit(trainingData)

# Make predictions on the test data using the Random Forest model
rfPredictions = rfModel.transform(testData)

# Evaluate the model using RegressionEvaluator: predicted class labels are rating values,
# so RMSE against the true rating is well defined
evaluator = RegressionEvaluator(labelCol="rating", predictionCol="prediction", metricName="rmse")
rfRMSE = evaluator.evaluate(rfPredictions)
print("Random Forest RMSE: {:.2f}".format(rfRMSE))

# Single user example: predicted vs. actual rating for every book this user rated.
# user_data comes from the full dataset, so most rows were also in trainingData;
# this illustrates the model's output rather than held-out accuracy.
user_id = 5461

# Filter the data for the given user
user_data = data.filter(data.user_id == user_id)

# Assemble the features for the user data
user_data_assembled = assembler.transform(user_data)

# Make predictions for the user using the Random Forest model
user_predictions = rfModel.transform(user_data_assembled)

# Put each prediction next to the user's actual rating
user_recommendations = user_predictions.select("user_id", "book_id", "prediction")
user_actual_ratings = user_data.join(user_recommendations, ["user_id", "book_id"], "left")

user_actual_ratings.show()


Random Forest RMSE: 1.09
+-------+-------+------+----------+
|user_id|book_id|rating|prediction|
+-------+-------+------+----------+
|   5461|      1|     3|       4.0|
|   5461|      2|     4|       4.0|
|   5461|      3|     2|       4.0|
|   5461|      5|     5|       4.0|
|   5461|      7|     5|       4.0|
|   5461|      8|     4|       4.0|
|   5461|      9|     4|       4.0|
|   5461|     10|     3|       4.0|
|   5461|     11|     4|       4.0|
|   5461|     14|     4|       4.0|
|   5461|     15|     5|       4.0|
|   5461|     16|     4|       4.0|
|   5461|     19|     5|       4.0|
|   5461|     22|     4|       4.0|
|   5461|     28|     5|       4.0|
|   5461|     31|     4|       4.0|
|   5461|     32|     5|       4.0|
|   5461|     33|     4|       4.0|
|   5461|     35|     4|       4.0|
|   5461|     37|     4|       4.0|
+-------+-------+------+----------+
only showing top 20 rows



This code trains and evaluates a Random Forest model that predicts book ratings, and then shows the model's predicted ratings for one specific user.

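Spark Session Creation: The code obtains a Spark session with SparkSession.builder.getOrCreate(). Because a session is already running from the earlier sections, getOrCreate() returns that existing session rather than creating a new one; this session is used for all subsequent Spark operations.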

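Feature Vector Assembly: The code defines a VectorAssembler that combines the "book_id" and "user_id" columns into a single feature vector column called "features". Note that the two IDs are treated as ordinary numeric values, so the trees can only split on ID ranges. The forest is therefore unlikely to capture individual users' or books' preferences the way ALS does; in the output above, every prediction for user 5461 is the same value (4.0).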

Data Transformation: The code applies the VectorAssembler to the dataset using assembler.transform, creating a new DataFrame dataAssembled with the additional "features" column.

Data Splitting: The code splits the data into training and test sets using randomSplit, with 80% of the data for training (trainingData) and 20% for testing (testData).

Model Training: The code trains a Random Forest model using RandomForestClassifier. It sets the "rating" column as the label column and the "features" column as the features column. The model is trained on the trainingData DataFrame.

Model Evaluation: The code makes predictions on the test data using the trained Random Forest model (rfModel.transform). It then evaluates the model's performance using the RMSE metric (RegressionEvaluator). The calculated RMSE value is printed using print.

Single User Recommendation: The code specifies a single user (user_id) for which recommendations need to be generated. It filters the data DataFrame to retrieve the data for the specified user (user_data).

User Data Transformation: The code applies the VectorAssembler to the user data using assembler.transform, creating a new DataFrame user_data_assembled with the additional "features" column.

User Prediction: The code makes predictions for the user data using the trained Random Forest model (rfModel.transform). The predictions are stored in the user_predictions DataFrame.

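User Recommendation Display: The code selects the "user_id", "book_id", and "prediction" columns from the user_predictions DataFrame and joins them with the original user data (user_data), so that each row of user_actual_ratings shows the user's actual rating next to the predicted rating. Finally, it displays the result with user_actual_ratings.show(). Because user_data is taken from the full dataset, most of these rows were also part of the training data, so the table illustrates the model's output rather than its accuracy on unseen data.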

EXECUTION TIME FOR COLLABORATIVE FILTERING IN PYSPARK

In [None]:
import time
from pyspark.sql import SparkSession
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator

# Get the Spark session (getOrCreate returns the already-running session, if any)
spark = SparkSession.builder.appName("BookPrediction").getOrCreate()

# Spark transformations (randomSplit, transform, filter) are lazy: they only build a query
# plan, so timing them on their own reports near-zero times. Each timed step below ends with
# an action (count/collect) so the measured interval includes the real computation.
# time.perf_counter() is the clock intended for measuring elapsed durations.

# Dividing the data using random split into train_data and test_data in 80% and 20% respectively.
# Both halves are cached and counted so the split is computed here (including re-reading the
# CSV, because `data` itself is not cached) and the later steps reuse the cached halves.
start_time = time.perf_counter()
train_data, test_data = data.randomSplit([0.8, 0.2], seed=42)
train_data = train_data.cache()
test_data = test_data.cache()
train_data.count()
test_data.count()
data_split_execution_time = time.perf_counter() - start_time

# Build the recommendation model using ALS on the training data.
# coldStartStrategy="drop" removes test rows whose user/book was unseen in training;
# ALS would otherwise predict NaN for them and the RMSE would be NaN.
als = ALS(maxIter=5,
          regParam=0.01,
          userCol="user_id",
          itemCol="book_id",
          ratingCol="rating",
          coldStartStrategy="drop",
          seed=42)

# Fitting the model on the train_data (fit runs its Spark jobs immediately)
start_time = time.perf_counter()
model = als.fit(train_data)
model_training_execution_time = time.perf_counter() - start_time

# Score the test data; cache + count forces the predictions to actually be computed
start_time = time.perf_counter()
predictions = model.transform(test_data).cache()
predictions.count()
prediction_execution_time = time.perf_counter() - start_time

# Printing and calculating RMSE (evaluate runs its Spark job immediately)
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")
start_time = time.perf_counter()
rmse = evaluator.evaluate(predictions)
rmse_calculation_execution_time = time.perf_counter() - start_time

# Filtering user with user id "5461" with book id on which it has given the reviews
user1 = test_data.filter(test_data['user_id'] == 5461).select(['book_id', 'user_id'])

# Predicting ratings for user1's held-out books; collect() forces the (small) result to be computed
start_time = time.perf_counter()
recommendations = model.transform(user1)
recommendations.collect()
user_recommendation_execution_time = time.perf_counter() - start_time

# Printing the RMSE and the execution times
print("Root-mean-square error = " + str(rmse))
print("Data Split Execution Time: {:.2f} seconds".format(data_split_execution_time))
print("Model Training Execution Time: {:.2f} seconds".format(model_training_execution_time))
print("Prediction Execution Time: {:.2f} seconds".format(prediction_execution_time))
print("RMSE Calculation Execution Time: {:.2f} seconds".format(rmse_calculation_execution_time))
print("User Recommendation Execution Time: {:.2f} seconds".format(user_recommendation_execution_time))


Data Split Execution Time: 0.03 seconds
Model Training Execution Time: 14.17 seconds
Prediction Execution Time: 0.14 seconds
RMSE Calculation Execution Time: 4.25 seconds
User Recommendation Execution Time: 0.10 seconds


EXECUTION TIME FOR DECISION TREE IN PYSPARK

---



In [None]:
import time
from pyspark.sql import SparkSession
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.feature import VectorAssembler

# Get the Spark session (getOrCreate returns the already-running session, if any)
spark = SparkSession.builder.appName("BookPrediction").getOrCreate()

# Create the feature vector assembler
featureCols = ["book_id", "user_id"]
assembler = VectorAssembler(inputCols=featureCols, outputCol="features")

# Transform the data using the feature vector assembler
dataAssembled = assembler.transform(data)

# Split the data into training and test sets (seeded so the split is reproducible)
(trainingData, testData) = dataAssembled.randomSplit([0.8, 0.2], seed=42)

# Spark transformations (transform, filter, select, join) are lazy and only build a query plan,
# so timing them alone reports near-zero times. The prediction and single-user steps below
# therefore end with an action (cache+count / collect) so the real computation is measured.

# Measure the execution time for training the Decision Tree model (fit runs its jobs immediately)
start_time = time.perf_counter()
dt = DecisionTreeClassifier(labelCol="rating", featuresCol="features", seed=42)
dtModel = dt.fit(trainingData)
execution_time = time.perf_counter() - start_time
print("Training Execution Time: {:.2f} seconds".format(execution_time))

# Measure the execution time for making predictions on the test data
start_time = time.perf_counter()
dtPredictions = dtModel.transform(testData).cache()
dtPredictions.count()
execution_time = time.perf_counter() - start_time
print("Prediction Execution Time: {:.2f} seconds".format(execution_time))

# Measure the execution time for the single user recommendation example
start_time = time.perf_counter()
user_id = 5461
user_data = data.filter(data.user_id == user_id)
user_data_assembled = assembler.transform(user_data)
user_predictions = dtModel.transform(user_data_assembled)
user_recommendations = user_predictions.select("user_id", "book_id", "prediction")
user_actual_ratings = user_data.join(user_recommendations, ["user_id", "book_id"], "left")
user_actual_ratings.collect()  # small result; forces the filter, prediction and join to run
execution_time = time.perf_counter() - start_time
print("Single User Recommendation Execution Time: {:.2f} seconds".format(execution_time))


Training Execution Time: 15.81 seconds
Prediction Execution Time: 0.75 seconds
Single User Recommendation Execution Time: 0.31 seconds


EXECUTION TIME FOR RANDOM FOREST IN PYSPARK

In [None]:
import time
from pyspark.sql import SparkSession
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import VectorAssembler

# Get the Spark session (getOrCreate returns the already-running session, if any)
spark = SparkSession.builder.appName("BookPrediction").getOrCreate()

# Create the feature vector assembler
featureCols = ["book_id", "user_id"]
assembler = VectorAssembler(inputCols=featureCols, outputCol="features")

# Transform the ratings DataFrame (`data`, loaded earlier) with the feature vector assembler
dataAssembled = assembler.transform(data)

# Split the data into training and test sets (seeded so the split is reproducible)
(trainingData, testData) = dataAssembled.randomSplit([0.8, 0.2], seed=42)

# Spark transformations (transform, filter, select, join) are lazy and only build a query plan,
# so timing them alone reports near-zero times. The prediction and single-user steps below
# therefore end with an action (cache+count / collect) so the real computation is measured.

# Measure the execution time for training the Random Forest model (fit runs its jobs immediately)
start_time = time.perf_counter()
rf = RandomForestClassifier(labelCol="rating", featuresCol="features", seed=42)
rfModel = rf.fit(trainingData)
execution_time = time.perf_counter() - start_time
print("Training Execution Time: {:.2f} seconds".format(execution_time))

# Measure the execution time for making predictions on the test data
start_time = time.perf_counter()
rfPredictions = rfModel.transform(testData).cache()
rfPredictions.count()
execution_time = time.perf_counter() - start_time
print("Prediction Execution Time: {:.2f} seconds".format(execution_time))

# Measure the execution time for the single user recommendation example
start_time = time.perf_counter()
user_id = 5461
user_data = data.filter(data.user_id == user_id)
user_data_assembled = assembler.transform(user_data)
user_predictions = rfModel.transform(user_data_assembled)
user_recommendations = user_predictions.select("user_id", "book_id", "prediction")
user_actual_ratings = user_data.join(user_recommendations, ["user_id", "book_id"], "left")
user_actual_ratings.collect()  # small result; forces the filter, prediction and join to run
execution_time = time.perf_counter() - start_time
print("Single User Recommendation Execution Time: {:.2f} seconds".format(execution_time))


Training Execution Time: 27.09 seconds
Prediction Execution Time: 0.24 seconds
Single User Recommendation Execution Time: 0.36 seconds
