Get the data file

The data is from http://socialnetworks.mpi-sws.mpg.de/data/facebook-links.txt.gz (uncomment the cell below to download and decompress it).

In [0]:
# !wget -q http://socialnetworks.mpi-sws.mpg.de/data/facebook-links.txt.gz
# !gunzip  facebook-links.txt.gz

Setting up the environment

In [2]:
import findspark
# findspark.init() is called before importing pyspark; presumably it puts the
# local Spark installation on sys.path so that the import below works.
# Keep this order.
findspark.init()

from pyspark import SparkContext
# Reuse the active SparkContext if one exists (e.g. on cell re-run),
# otherwise create a new one.
sc = SparkContext.getOrCreate()

#hide
Importing all the required libraries

In [4]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS, ALSModel
from pyspark.sql import Row
from pyspark.sql.types import *
from pyspark.sql.session import SparkSession
from pyspark.mllib.evaluation import RegressionMetrics, RankingMetrics
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.mllib.recommendation import Rating
import pandas
import pyspark.sql.dataframe

Initializing PySpark and reading the raw data

In [19]:
# Wrap the existing SparkContext in a SparkSession.
spark = SparkSession(sc)

# Raw edge list, read as text lines split into at least this many partitions.
FACEBOOK_LINKS_PATH = './facebook-links.txt'
MIN_PARTITIONS = 3
rawData = sc.textFile(FACEBOOK_LINKS_PATH, minPartitions=MIN_PARTITIONS)

In [20]:

def cleanFile(rawFile):
  ''' Parse one raw edge record into a [user, friend] pair of ints.

      The record's text is tab-separated. The first two fields are the two
      node ids, and any further fields are ignored.

      ARGS
      rawFile: a sequence whose first element is the raw text line. The
               caller passes ``line.split(',')``, so element 0 is presumably
               the whole line (the file appears to contain no commas --
               TODO confirm).

      RETURNS
      list [user, friend] with both ids converted to int

      RAISES
      ValueError: if the line has fewer than two tab-separated fields, or
                  an id is not an integer
  '''
  fields = rawFile[0].split('\t')
  if len(fields) < 2:
    raise ValueError('expected at least 2 tab-separated fields, got %r' % (rawFile[0],))
  # Only the first two columns matter. Tolerating a missing or extra trailing
  # column avoids crashing on records that lack the third field.
  user, frnd = fields[0], fields[1]
  return [int(user), int(frnd)]

The code below:
*  calls the `cleanFile` function on `rawData` to convert the RDD into a usable form
*  displays a sample of the data




In [21]:
# Split each text line on ',' and hand the resulting list to cleanFile,
# which yields an int [user, friend] pair.
userFriendRDD = rawData.map(lambda line: cleanFile(line.split(',')))
userFriendRDD.take(10)

[[1, 2],
 [1, 3],
 [1, 4],
 [1, 5],
 [1, 6],
 [1, 7],
 [1, 8],
 [1, 9],
 [1, 10],
 [1, 11]]

Making a new RDD by swapping the columns, because the original RDD contains neighbor nodes (friends) that do not appear in the main node field (user).

In [22]:
# Flip every edge to [friend, user] so friend-only nodes also land in the key position.
reversedUserFriendRDD = userFriendRDD.map(lambda edge: [edge[1], edge[0]])

# Generate the Jaccard similarity score

In [23]:
def unionList(list1,list2):
  ''' Return the union of two lists as a new list of distinct elements.

    ARGS
    list1: first list to combine
    list2: second list to combine

    RETURN
    a list holding every element found in list1 or list2 exactly once
    (order is not guaranteed, because the result is built from a set)
  '''
  first, second = set(list1), set(list2)
  return list(first.union(second))

In [24]:
def intersctionList(list1,list2):
  ''' Return the distinct elements that two lists have in common.

  ARGS
  list1: first list to compare
  list2: second list to compare

  RETURN
    a list of the elements present in both inputs, each once
    (order is not guaranteed, because the result is built from a set)
  '''
  first, second = set(list1), set(list2)
  return list(first.intersection(second))

In [25]:
def getScore(list1,list2):
  ''' Calculate the Jaccard similarity score of two lists.

  The score is |A & B| / |A | B|, where A and B are the sets of distinct
  elements of list1 and list2. It is rounded to 5 decimal places.

  ARGS
  list1: first list (e.g. the neighbours of one node)
  list2: second list (e.g. the neighbours of another node)

  RETURN
  the Jaccard similarity score as a float in [0, 1]. Returns 0.0 when both
  lists are empty: the ratio is undefined there, so the function reports no
  similarity instead of raising ZeroDivisionError.
  '''
  # Build each set once rather than re-converting the lists for the
  # intersection and union separately.
  set1, set2 = set(list1), set(list2)
  unionSize = len(set1 | set2)
  if unionSize == 0:
    return 0.0
  return round(len(set1 & set2) / unionSize, 5)


# Dictionary
Converting userFriendRDD into matrix (adjacency-list) form and then into a dictionary

In [26]:
def addUnknowNodes(node,dictionary):
  ''' Insert a (key, value) pair into a dictionary only if the key is absent.

    Existing entries are never overwritten, so a node that already has a
    neighbour list keeps it.

    ARGS
    node: a two-element [node, neighbours] / [user, friends] pair
    dictionary: the dict to update in place

    RETURN
    nothing (the dictionary is mutated in place)
  '''
  nodeKey, nodeValue = node
  dictionary.setdefault(nodeKey, nodeValue)

Converting the RDDs into matrix form

In [27]:
# Adjacency lists for both edge directions: node -> list of its neighbours.
# mapValues leaves the key untouched and preserves the partitioning, and both
# RDDs now yield elements of the same shape: (node, [neighbours]) tuples.
userFriendMatrixRDD_withList = userFriendRDD.groupByKey().mapValues(list)
reversedUserFriendRDDMatrix = reversedUserFriendRDD.groupByKey().mapValues(list)

Collecting the above RDDs to the driver (as a dictionary and a list)

In [28]:
# Bring both adjacency lists to the driver: forward edges as a dict keyed by
# user, reversed edges as a plain list of (friend, users) pairs.
userFriendDict = dict(userFriendMatrixRDD_withList.collect())
reverse = reversedUserFriendRDDMatrix.collect()

In [29]:
# Nodes that appear only as a friend have no adjacency entry yet. Add the
# reversed edge for them so they also appear in the user column.
# The filter tests against a frozenset snapshot of the known keys, for two reasons:
#   - the closure ships a small set of ids to the executors instead of the
#     whole dict of neighbour lists;
#   - the result no longer depends on when Spark serializes the closure
#     relative to later in-place updates of userFriendDict.
knownUsers = frozenset(userFriendDict)
temp = userFriendRDD.filter(lambda x: x[1] not in knownUsers).map(lambda x: [x[1], x[0]])
userFriendRDD = userFriendRDD.union(temp)

Deleting references to free up memory

In [30]:
# Drop the driver-side references to intermediates that are no longer needed.
del userFriendMatrixRDD_withList
del reversedUserFriendRDDMatrix
del temp

In [31]:
# Give each reversed entry (friend -> users who listed it) a slot in
# userFriendDict. addUnknowNodes never overwrites an existing key, so
# users keep their original friend lists.
for val in reverse:
  addUnknowNodes(val,userFriendDict)

In [32]:
# Number of nodes that now have an adjacency entry in userFriendDict.
len(userFriendDict)

63731

In [33]:
def addRating(x):
  ''' Append the Jaccard similarity score of an edge's two endpoints.

      The neighbour lists of x[0] and x[1] are looked up in the global
      userFriendDict and compared with getScore. The result serves as the
      "rating" fed to ALS training and to the MSE evaluation.

      ARGS
      x: a [user, friend] / [node, neighbor] pair

      RETURN
      list [user, friend, Jaccard_similarity_score]

      RAISES
      KeyError: if x[0] has no entry in userFriendDict
  '''
  userNeighbours = userFriendDict[x[0]]
  # A friend with no adjacency entry gets the sentinel list [-1]. Presumably
  # node ids are non-negative (TODO confirm), so that friend scores 0.
  # Only a missing key falls back to the sentinel; any other error now
  # propagates instead of being swallowed by a bare except.
  friendNeighbours = userFriendDict.get(x[1], [-1])
  score = getScore(userNeighbours, friendNeighbours)
  return [x[0], x[1], score]

# Prediction model


Randomly partitioning the RDD into trainingRDD and testRDD (80% / 20%)

In [34]:
# 80/20 random split of the edges. A fixed seed makes the split (and
# therefore the reported MSE) reproducible under Restart & Run All.
SPLIT_SEED = 42
trainingRDD,testRDD = userFriendRDD.randomSplit([0.8, 0.2], seed=SPLIT_SEED)

Training the model with the training RDD.
We are using the pyspark.mllib library.

Reference for the model:
https://spark.apache.org/docs/latest/mllib-collaborative-filtering.html


In [35]:
from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating
import time

def trainModel(rank,numIterations,trainingRDD,seed=None):
  ''' Train an ALS matrix-factorization model on the given edges.

  Each [user, friend] edge is first turned into a [user, friend, score]
  rating with addRating (Jaccard similarity), then passed to ALS.train.

  ARGS
  rank: Number of features to use (also referred to as the number of latent factors)

  numIterations:  Number of iterations of ALS.

  trainingRDD: the RDD used to train the model, in the form [user,friend]/[node,neighbor]

  seed: optional random seed forwarded to ALS.train for a reproducible
        model. None (the default) keeps ALS.train's own default.

  Return:  [trained model, wall-clock seconds taken to train the model]
  '''
  # Imported locally so the function does not depend on the global name
  # `time`, which notebook cells may rebind to a non-module value.
  import time

  startTime = time.perf_counter()
  model = ALS.train(trainingRDD.map(addRating), rank, numIterations, seed=seed)
  # Measure once so the printed and the returned durations agree.
  elapsed = time.perf_counter() - startTime
  print("--- Wall-clock time for training %s  seconds ---" % elapsed)
  return [model, float(elapsed)]


We are using the built-in library function to predict the results for testRDD

In [36]:
def makePrediction(model,testRDD):
  ''' Predict a score for every (user, friend) pair in the test data.

    ARGS
    model: a trained model exposing predictAll (e.g. the one from trainModel)
    testRDD: RDD of [user, friend] pairs to score

    Return: RDD of ((user, friend), predicted_score) pairs, keyed so it can
            be joined against the true scores
  '''
  def keyByPair(rating):
    return ((rating[0], rating[1]), rating[2])

  predicted = model.predictAll(testRDD)
  return predicted.map(keyByPair)

Checking the mean squared error of the predicted results

In [37]:
def getMeanSqError(testRDD,prediction):
  ''' Calculate and report the mean squared error of the predictions.

      Steps:
        1. Recompute the true score of each test edge with addRating
           (Jaccard similarity).
        2. Join the true scores with the predictions on the (user, friend) key.
        3. Average the squared differences.

      ARGS
      testRDD: the RDD of [user, friend] pairs the predictions were made for
      prediction: RDD of ((user, friend), predicted_score), as returned by makePrediction

      return MSE (float)
  '''
  ratesAndPreds = testRDD.map(addRating).map(lambda r: ((r[0], r[1]), r[2])).join(prediction)
  MSE = ratesAndPreds.map(lambda r: (r[1][0] - r[1][1])**2).mean()
  # Print before returning so the report is actually emitted.
  print("Mean Squared Error = " + str(MSE))
  return MSE

# One row per experiment run: [numIterations, mse, training seconds].
dataGathered = []


In [None]:
rank = 10
numIterations = 10

# Keep the training duration in its own name. Binding it to `time` would
# shadow the `time` module imported earlier in the notebook.
model, trainTime = trainModel(rank, numIterations, trainingRDD)
prediction = makePrediction(model, testRDD)
mse = getMeanSqError(testRDD, prediction)
dataGathered.append([numIterations, mse, trainTime])

In [None]:
# Release names left over from the previous run.
del time
del mse
del prediction

In [0]:
# Top-N recommended "products" (here: friend ids) for every user.
# Needs `model` from the training cell; the saved output below is a
# NameError because that cell had not been run.
TOP_N = 2
rec = model.recommendProductsForUsers(TOP_N)

NameError: ignored