# Machine learning library in Spark (MLlib)
- In this example, Spark's `mllib` library is used to train an ALS recommendation model and to calculate the mean squared error (MSE) between its predictions and the original test ratings.
- This example is taken from DataCamp.

In [None]:
#--------------------------Important libraries--------------------------------
# Set up Spark for this notebook: locate the Spark installation, then obtain the
# SparkContext (`sc`) and SparkSession (`spark`) used by the cells below.
#
# findspark.init() is given the Spark installation directory and is called
# before the `pyspark` imports so that those imports can be resolved.
# NOTE(review): the path string looks like a placeholder -- replace it with the
# real location of the Spark 3.1.2 / Hadoop 3.2 distribution on your machine.
import findspark
findspark.init("path_to_spark-3.1.2-bin-hadoop3.2")
from pyspark import SparkConf
from pyspark.context import SparkContext
from pyspark.sql import SparkSession
# Reuse an already running SparkContext if there is one; otherwise create one from a default SparkConf.
sc = SparkContext.getOrCreate(SparkConf())
# Same get-or-create pattern for the SparkSession.
spark=SparkSession.builder.getOrCreate()


# ALS is the recommendation trainer; Rating is the (user, product, rating) record it consumes.
from pyspark.mllib.recommendation import ALS, Rating

#-------------------------------------------------------------




In [46]:
# Load the ratings file, parse it into Rating records and split it into
# training and test sets.

# Fixed seed so that "Restart Kernel -> Run All" reproduces the same split
# (without it, randomSplit picks a new random seed on every run).
SPLIT_SEED = 42

# Load the data into an RDD of text lines
data = sc.textFile("ratings.csv")

# Split every line into its comma-separated fields
ratings = data.map(lambda l: l.split(','))

# Keep only the first three fields of every row as Rating(user, product, rating).
# NOTE(review): this assumes the file has no header row and no malformed lines;
# int()/float() would raise on such a line -- confirm against the data file.
ratings_final = ratings.map(lambda line: Rating(int(line[0]), int(line[1]), float(line[2])))

# Split the data: 80% training, 20% test
training_data, test_data = ratings_final.randomSplit([0.8, 0.2], seed=SPLIT_SEED)

# For checking
#print(ratings.take(10))
#print(ratings_final.take(10))
#print(training_data.take(10))
#print(test_data.take(10))

In [47]:
# Train an ALS model on the training split and predict ratings for the
# (user, product) pairs of the test split.

# Fixed seed for the ALS factor initialisation so that repeated runs on the
# same training data give the same model.
ALS_SEED = 42

# Create the ALS model on the training data
model = ALS.train(training_data, rank=10, iterations=10, seed=ALS_SEED)

# Drop the rating column: predictAll expects (user, product) pairs only
testdata_no_rating = test_data.map(lambda p: (p[0], p[1]))

# Predict a rating for every test pair
predictions = model.predictAll(testdata_no_rating)

# Just for checking
#print(testdata_no_rating.take(10))
#print(test_data.take(10))
#print(predictions.take(10))

# Show the first 2 predictions (last expression of the cell, so it is displayed)
predictions.take(2)

[Rating(user=452, product=1084, rating=3.958058608806706),
 Rating(user=548, product=1084, rating=3.609724971040655)]

In [48]:
# Compute the mean squared error between the true test ratings and the
# model's predictions.

# Key the true ratings by (user, product).
# Only the held-out test ratings are used: joining against the full data set
# would also match training ratings whenever a (user, product) pair occurs more
# than once, and it shuffles far more data than the metric needs.
rates = test_data.map(lambda r: ((r[0], r[1]), r[2]))

# Key the predictions the same way
preds = predictions.map(lambda r: ((r[0], r[1]), r[2]))

# Join on the (user, product) key -> ((user, product), (true_rating, predicted_rating))
rates_and_preds = rates.join(preds)

#-------------------------------Just uncomment the following to see the position and values in tuples-------------
#print(test_data.take(5))
#print(rates.take(20))
#print(predictions.take(20))
#print(preds.take(5))
#print(rates_and_preds.take(5))
#-----------------------------------------------------------------------------------------------------------------------
# Calculate and print MSE: mean of the squared (true - predicted) differences
MSE = rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean()
print("Mean Squared Error of the model for the test data = {:.2f}".format(MSE))

Mean Squared Error of the model for the test data = 1.37


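# Conclusion
The MSE looks quite high: the value of 1.37 from the run shown above corresponds to a root-mean-squared error of about 1.17 rating points.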

In [50]:
# If the way join works is unclear, uncomment the two examples below and observe the results.
# Example 1 (Python's str.join, which concatenates strings -- shown only for contrast, it is NOT the RDD join used above):
#A=["a","b","c","d"]
#B=" understanding the join "
#print(B.join(A))

# Example 2 (RDD.join, which pairs up the values of elements that share the same key):

#x = sc.parallelize([((2, 1), 4)])
#y = sc.parallelize([((2, 1), 3)])
#C=x.join(y)
#print(C.collect())