In [0]:
!wget -q http://apache.mirrors.hoobly.com/spark/spark-2.4.5/spark-2.4.5-bin-hadoop2.7.tgz

In [0]:
#Untar the Spark installer
!tar -xvf spark-2.4.5-bin-hadoop2.7.tgz

In [59]:
!ls 

spark-2.4.5-bin-hadoop2.7      spark-2.4.5-bin-hadoop2.7.tgz.1
spark-2.4.5-bin-hadoop2.7.tgz  u.data


In [0]:
!pip install -q findspark

In [0]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.5-bin-hadoop2.7"

In [0]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()

In [0]:
from pyspark import SparkContext
sc = SparkContext.getOrCreate();

In [64]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [65]:
%cd /content/drive/My Drive/Big Data - Final Assignment
!ls

/content/drive/My Drive/Big Data - Final Assignment
spark-2.4.5-bin-hadoop2.7      spark-2.4.5-bin-hadoop2.7.tgz.1
spark-2.4.5-bin-hadoop2.7.tgz  u.data


In [0]:
from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating

In [67]:
data = sc.textFile("/content/drive/My Drive/Big Data - Final Assignment/u.data")

#Splitting the data
data = data.map(lambda x:x.split('\t'))
#Converting the data to Spark DataFrame
data_df = data.toDF()
#Showing the first 20 rows of the dataframe
data_df.show()

+---+----+---+---------+
| _1|  _2| _3|       _4|
+---+----+---+---------+
|196| 242|  3|881250949|
|186| 302|  3|891717742|
| 22| 377|  1|878887116|
|244|  51|  2|880606923|
|166| 346|  1|886397596|
|298| 474|  4|884182806|
|115| 265|  2|881171488|
|253| 465|  5|891628467|
|305| 451|  3|886324817|
|  6|  86|  3|883603013|
| 62| 257|  2|879372434|
|286|1014|  5|879781125|
|200| 222|  5|876042340|
|210|  40|  3|891035994|
|224|  29|  3|888104457|
|303| 785|  3|879485318|
|122| 387|  5|879270459|
|194| 274|  2|879539794|
|291|1042|  4|874834944|
|234|1184|  2|892079237|
+---+----+---+---------+
only showing top 20 rows



In [0]:
from pyspark.sql.types import NumericType
import pandas as pd

new_data = data_df.select("_1", "_2", "_3")

#Change the Column Names of new_data Spark Dataframe
new_data = new_data.selectExpr("_1 as user_id", "_2 as item_id", "_3 as rating")
new_df   = new_data.toPandas()

#new_data = new_data.withColumn("user_id", new_data["user_id"].cast(NumericType()))
#new_data = new_data.withColumn("item_id", new_data.item_id.cast(NumericType()))
#new_data = new_data.withColumn("ratings", new_data.ratings.cast(NumericType()))

#new_data.show()
new_df["user_id"] = pd.to_numeric(new_df["user_id"])
new_df["item_id"] = pd.to_numeric(new_df["item_id"])
new_df["rating"]  = pd.to_numeric(new_df["rating"])

In [0]:
data_new = spark.createDataFrame(new_df)

In [70]:
#Split the Ratings into Train and Test

train, test = data_new.randomSplit([0.8,0.2])

#cache the data to speed up training
train.cache()
test.cache()

train.take(5)

[Row(user_id=1, item_id=1, rating=5),
 Row(user_id=1, item_id=2, rating=3),
 Row(user_id=1, item_id=5, rating=3),
 Row(user_id=1, item_id=6, rating=5),
 Row(user_id=1, item_id=8, rating=1)]

In [71]:
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator

#Create the model on the training data
als = ALS(userCol="user_id", itemCol="item_id", ratingCol="rating", coldStartStrategy="drop")

model = als.fit(train)

# Generate predictions on test_data
predictions = model.transform(test)

# Tell Spark how to evaluate predictions
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")

# Obtain and print RMSE
rmse = evaluator.evaluate(predictions)
print ("RMSE: ", rmse)

RMSE:  0.9187412998120554


In [0]:
# Tell Spark what values to try for each hyperparameter
from pyspark.ml.tuning import ParamGridBuilder

param_grid = ParamGridBuilder()\
.addGrid(als.rank, [5, 40, 80, 120])\
.addGrid(als.regParam, [.05, .1, 1.5])\
.build()

# Tell Spark how to evaluate model performance
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")

# Build cross validation step using CrossValidator
from pyspark.ml.tuning import CrossValidator
cv = CrossValidator(estimator = als, estimatorParamMaps = param_grid, evaluator = evaluator, numFolds = 5)

# Run the cv on the training data
model = cv.fit(train)

# Extract best combination of values from cross validation
best_model = model.bestModel

In [73]:
# Extract best combination of values from cross validation
best_model  = model.bestModel

# Generate test set predictions and evaluate using RMSE
predictions = best_model.transform(test)
rmse        = evaluator.evaluate(predictions)

# Print evaluation metrics and model parameters
print ("**Best Model**")
print ("RMSE = ", rmse)

**Best Model**
RMSE =  0.9126183644345486


In [144]:
# Getting TOP 10 Item Id's for every User based on the Best Model which we have built
from pyspark.mllib.recommendation import ALS,MatrixFactorizationModel, Rating
from pyspark.ml.recommendation import ALS, ALSModel

top10 = best_model.recommendForAllUsers(10)
top10.take(10)

[Row(user_id=471, recommendations=[Row(item_id=465, rating=4.49593448638916), Row(item_id=932, rating=4.468419551849365), Row(item_id=140, rating=4.42611026763916), Row(item_id=143, rating=4.40766716003418), Row(item_id=477, rating=4.402740478515625), Row(item_id=422, rating=4.381832599639893), Row(item_id=225, rating=4.311626434326172), Row(item_id=272, rating=4.242356300354004), Row(item_id=82, rating=4.2368693351745605), Row(item_id=393, rating=4.221230983734131)]),
 Row(user_id=463, recommendations=[Row(item_id=887, rating=4.26688289642334), Row(item_id=253, rating=4.2500410079956055), Row(item_id=19, rating=4.1852006912231445), Row(item_id=1167, rating=4.14856481552124), Row(item_id=169, rating=4.130365371704102), Row(item_id=1449, rating=4.12411642074585), Row(item_id=221, rating=4.09617805480957), Row(item_id=408, rating=4.083878517150879), Row(item_id=114, rating=4.067584037780762), Row(item_id=20, rating=4.065301895141602)]),
 Row(user_id=833, recommendations=[Row(item_id=1597

In [0]:
#Converting the Spark Dataframe to Pandas Dataframe for slicing item id's
top10_rec = top10.toPandas()

In [0]:
#Fetching only the item ids from the recommendations
for i in range(len(top10_rec)):
  lst  = top10_rec.iloc[i,1]
  top10_new = []
  top10_itemids = 0

  for j in range(len(lst)):
    lst1 = lst[j]
    top10_new.append(str(lst1[0])) 
  top10_itemids = ', '.join(top10_new)
  top10_rec.loc[i, 'recommendations'] = top10_itemids

In [143]:
#The final dataframe having the recommended item ids's for every user
top10_rec

Unnamed: 0,user_id,recommendations
0,471,"465, 932, 140, 143, 477, 422, 225, 272, 82, 393"
1,463,"887, 253, 19, 1167, 169, 1449, 221, 408, 114, 20"
2,833,"1597, 1070, 1187, 589, 179, 32, 1019, 192, 488..."
3,496,"56, 774, 42, 532, 652, 277, 89, 721, 50, 190"
4,148,"169, 408, 50, 172, 114, 168, 1449, 516, 174, 529"
...,...,...
938,208,"496, 136, 64, 1450, 435, 88, 966, 194, 316, 520"
939,401,"316, 528, 735, 958, 318, 610, 963, 162, 98, 196"
940,422,"1449, 320, 474, 98, 483, 1131, 127, 919, 64, 59"
941,517,"1177, 300, 761, 258, 50, 1056, 89, 223, 12, 408"


In [0]:
#Exporting the file to a Text file
top10_rec.to_csv(r'/content/drive/My Drive/Big Data - Final Assignment\top10recommendations1.txt', index=None, sep='\t', mode='a')