In [1]:
import pyspark
from pyspark import SparkContext

In [2]:
sc.stop()

In [3]:
sc=SparkContext("local","first app")

In [4]:
sqlContext = SQLContext(sc)

In [5]:
from pyspark.sql.types import *

In [6]:
#structure of the dataframe is defined
schema = StructType([StructField("userId", IntegerType(), True),StructField("movieId", IntegerType(), True),StructField("rating", FloatType(), True),StructField("timestamp", StringType(), True)])

In [7]:
#load the data from input csv file
df = sqlContext.read.csv("/Users/vihitakesiraju/Downloads/ml-25m/ratings.csv",header='true',schema=schema)

In [8]:
#total number of rows
df.count()

25000095

In [9]:
#datatype of each field
df.dtypes

[('userId', 'int'),
 ('movieId', 'int'),
 ('rating', 'float'),
 ('timestamp', 'string')]

In [10]:
#splitting the data into training and testing
train,test=df.randomSplit([0.9, 0.1], seed=12345)

In [11]:
#count of training data
train.count()

22500405

In [12]:
#count of testing data
test.count()

2499690

In [13]:
#downloading the training data into training.csv file
train.write.csv ('training_data.csv', header=True)

In [15]:
import glob
import pandas as pd
path ="/Users/vihitakesiraju/Documents/LSA/training_data.csv"
filenames = glob.glob(path + "/*.csv")
dftr = []
for filename in filenames:
    dftr.append(pd.read_csv(filename))
training_frame = pd.concat(dftr, ignore_index=True)

In [16]:
training_frame.to_csv('training.csv',index=False,header='true')

In [17]:
#downloading testing data into testing.csv file
test.write.csv('testing_data.csv', header=True)

In [18]:
import glob
import pandas as pd
path ="/Users/vihitakesiraju/Documents/LSA/testing_data.csv"
filenames = glob.glob(path + "/*.csv")
dfte = []
for filename in filenames:
    dfte.append(pd.read_csv(filename))
testing_frame = pd.concat(dfte, ignore_index=True)

In [19]:
testing_frame.to_csv('testing.csv',index=False,header='true')

In [20]:
training_frame.head(5)

Unnamed: 0,userId,movieId,rating,timestamp
0,129488,4011,3.5,1480319934
1,129488,4014,3.0,1481992836
2,129488,4034,3.5,1480005535
3,129488,4235,3.5,1479989862
4,129488,4239,3.5,1479833282


In [21]:
testing_frame.head(5)

Unnamed: 0,userId,movieId,rating,timestamp
0,129488,4027,3.5,1480006375
1,129488,4643,2.5,1479986340
2,129488,4776,3.0,1479990199
3,129488,4848,3.5,1479990596
4,129488,4993,3.5,1480413788


In [22]:
trainrating=train.rdd.map(lambda w:w[2])

In [23]:
trainrating.take(5)

[5.0, 3.5, 5.0, 5.0, 4.0]

In [24]:
count_rating = trainrating.count()

In [25]:
sum_rating = trainrating.sum()


In [26]:
avg_rating = sum_rating/count_rating

In [27]:
print(avg_rating)

3.533899945356539


In [28]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row


In [29]:
#implementing ALS model on training dataset
als = ALS(maxIter=5, regParam=0.01, userCol="userId", itemCol="movieId", ratingCol="rating",
          coldStartStrategy="drop")
model = als.fit(train)

In [30]:
#creating dataframe with only userid and movieid
testdata=test['userId','movieId']

In [31]:
testdata.head(2)

[Row(userId=1, movieId=899), Row(userId=1, movieId=1237)]

In [32]:
#predicting the rating based on training data
predictions = model.transform(testdata)

In [33]:
predictions.take(2)

[Row(userId=72337, movieId=148, prediction=2.895411252975464),
 Row(userId=151614, movieId=148, prediction=2.779909133911133)]

In [34]:
#form an rdd with original ratings 
testrdd=test['userId','movieId','rating']

In [35]:
testrdd.take(5)

[Row(userId=1, movieId=899, rating=3.5),
 Row(userId=1, movieId=1237, rating=5.0),
 Row(userId=1, movieId=7937, rating=3.0),
 Row(userId=1, movieId=27266, rating=4.5),
 Row(userId=2, movieId=110, rating=5.0)]

In [36]:
pr=predictions.alias('pr')

In [37]:
trddr=testrdd.alias('trddr')

In [38]:
cond=[pr.userId==trddr.userId, pr.movieId==trddr.movieId]

In [39]:
#create a dataframe with both predicted and original data
finalrating=pr.join(trddr,cond,how='inner')

In [40]:
finalrating.take(5)

[Row(userId=67, movieId=2193, prediction=3.3077235221862793, userId=67, movieId=2193, rating=4.0),
 Row(userId=80, movieId=3598, prediction=2.0839927196502686, userId=80, movieId=3598, rating=1.0),
 Row(userId=99, movieId=858, prediction=3.3273472785949707, userId=99, movieId=858, rating=3.0),
 Row(userId=112, movieId=1196, prediction=3.4976935386657715, userId=112, movieId=1196, rating=4.0),
 Row(userId=123, movieId=2502, prediction=3.526134967803955, userId=123, movieId=2502, rating=4.0)]

In [41]:
#calculating mse value for predicted and original data
mse=finalrating.rdd.map(lambda r:((r[2]-r[5])**2)).mean()

In [42]:
mse

0.6550567566413966

In [43]:
import math
rmse=math.sqrt(mse)

In [44]:
rmse

0.8093557664225274

In [49]:
#export the predicted ratings to csv file
pred_rating.write.csv ('predicted_original_rating.csv', header=True)

In [50]:
import glob
import pandas as pd
path ="/Users/vihitakesiraju/Documents/LSA/predicted_original_rating.csv"
filenames = glob.glob(path + "/*.csv")
dfpr = []
for filename in filenames:
    dfpr.append(pd.read_csv(filename))
predicted_frame = pd.concat(dfpr, ignore_index=True)

In [51]:
predicted_frame.to_csv('output.csv',index=False,header='true')