#CSC8101 - Big Data Analytics - Spark Programming Coursework

##Task 1 - Produce summary statistics for data set

In [3]:
#Import necessary libraries 
import matplotlib.pyplot as plt
import pyspark.sql as sparksql
import pandas as pd

In [4]:
#Load in data
ratingsURL = 'https://csc8101storageblob.blob.core.windows.net/datablobcsc8101/ratings.csv'
ratings = spark.createDataFrame(pd.read_csv(ratingsURL))

* Average number of ratings per user

In [6]:
totalreviews=ratings.count() #Find total number of ratings
allusers=ratings.select("userId").distinct().count() #Find number of unique users
totalreviews/allusers #Average number of ratings per user

* Average number of ratings per movie

In [8]:
allmovies=ratings.select("movieId").distinct().count() # Find number of unique movies
totalreviews/allmovies #Average number of ratings per movie

* Histogram showing the distribution of number of ratings of users

In [10]:
display(ratings.groupby('userId').count())

* Histogram showing distirbution of number of ratings for movies

In [12]:
display(ratings.groupby('movieId').count())

Histograms of distribution of all ratings

In [14]:
display(ratings.select("rating"))

rating
3.5
3.5
3.5
3.5
3.5
3.5
4.0
4.0
4.0
4.0


Histogram of distirbution of average ratings of users

In [16]:
avgratings=ratings.groupby('userId').mean("rating")
display(avgratings)

userId,avg(rating)
17043,3.844827586206896
17048,3.455555555555556
17499,3.0689655172413794
17703,3.37593984962406
17971,3.5
17979,4.006849315068493
18147,3.678571428571429
18196,3.1021825396825395
18295,3.3511904761904763
18348,4.2040133779264215


Histogram showing the distirubtion of average movie ratings for each movie

In [18]:
movieavgratings=ratings.groupby('movieId').mean("rating")
display(movieavgratings)

movieId,avg(rating)
71530,3.242138364779874
106002,3.4040785498489425
29,3.952230046948357
474,3.732506901677639
1950,4.073789504204117
2529,3.620414108014486
3506,3.25
5409,2.834375
5556,1.8467336683417088
6721,3.7312295973884657


##Task 2 - Building a recommendation model

This task involved using the full ratings dataset to train a recommender model in order to predict the movie ratings for users. It was first of all necessary to to import the following libraries to carry out this task:

In [21]:
#Import neccesary libraries
from pyspark import since, keyword_only
from pyspark.ml.util import *
from pyspark.ml.wrapper import JavaEstimator, JavaModel
from pyspark.ml.param.shared import *
from pyspark.ml.common import inherit_doc
from sklearn.model_selection import train_test_split
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

It is firstly necessary to separate training and test sets so that it is possible to measure the performance on both.

In [23]:
#Split data into training data and test data. 
train, test = ratings.randomSplit([0.7, 0.3], seed=3325)

The parameters for the ALS algorithm are set using the ratings dataset and to begin with the default rank and regParam parameters. Following this the train data is fit according to the ALS algorithm.

In [25]:
#Set als model paramters
als = ALS(rank=10, regParam=0.1, userCol='userId', itemCol='movieId', ratingCol='rating')

#fit model to train data
model = als.fit(train)

Fit the ALS model to the test set and filter out null values.

In [27]:
predict_df = model.transform(test)

predicted_ratings_df = predict_df.filter(predict_df.prediction != float('nan'))

An evaluator is then used to check the accuracy of this model using the root mean square error (RMSE) methodology which returns the following value:

In [29]:
evaluator = RegressionEvaluator(metricName="rmse",labelCol="rating", predictionCol="prediction")
rmse = evaluator.evaluate(predicted_ratings_df)
print("rmse on test data = %g" % rmse)

Now applying the cross validation method with the varying rank parameter for the ALS algorithm, we initially set the rank parameter  to be three different values: 5, 10 and 20:

In [31]:
als = ALS(rank=10, regParam=0.1, userCol='userId', itemCol='movieId', ratingCol='rating')

paramGrid = ParamGridBuilder() \
    .addGrid(als.rank, [5,10,20]) \
    .build()
    
crossval1 = CrossValidator(estimator=als,
                          estimatorParamMaps=paramGrid,
                          evaluator=RegressionEvaluator(metricName="rmse",labelCol="rating", predictionCol="prediction"),
                          numFolds=10)

This cross-validation model is then fitted to the train data.

In [33]:
cvModel1 = crossval1.fit(train)

Similarly to the model above, another cross validation model is produced however this time with varying regParam parameter of values 0.05, 0.1, 0.5 which is again fitted to the train data.

In [35]:
paramGrid = ParamGridBuilder() \
    .addGrid(als.regParam, [0.05,0.1,0.5]) \
    .build()
    
crossval2 = CrossValidator(estimator=als,
                          estimatorParamMaps=paramGrid,
                          evaluator=RegressionEvaluator(metricName="rmse",labelCol="rating", predictionCol="prediction"),
                          numFolds=10)

cvModel2 = crossval2.fit(train)

We now find the best parameters for rank and regParam from each cross-validation model:

In [37]:
cvModel1.bestModel.rank
cvModel1.bestModel._java_obj.parent().getRank()

cvModel2.bestModel.rank
cvModel2.bestModel._java_obj.parent().getRegParam()

A value of 5 for rank and 0.05 for regParam are found to be the most suitable parameters for our cross-validation model. Another cross-validation model was then fitted using these parameters and fitted to the train data.

In [39]:
paramGrid3 = ParamGridBuilder() \
    .addGrid(als.rank, [5])\
    .addGrid(als.regParam, [0.05]) \
    .build()
    
crossval3 = CrossValidator(estimator=als,
                          estimatorParamMaps=paramGrid3,
                          evaluator=RegressionEvaluator(metricName="rmse",labelCol="rating", predictionCol="prediction"),
                          numFolds=10)

cvModel3 = crossval3.fit(train)

This cross-validation model was subsequently fitted to our test data and performance metric RMSE is used to test accuracy.

In [41]:
cv_predict_df = cvModel3.transform(test)
cv_predict_df = cv_predict_df.filter(cv_predict_df.prediction != float('nan'))
cv_rmse = evaluator.evaluate(cv_predict_df)
print("rmse on test data = %g" % cv_rmse)

Interstingly this RMSE value is actually higher than our inital model. This could be due to the cross-validator model overfitting to the train data. Also as each parameter was tested separately, they are treated as independent. This may affect the actual best parameters when they are used in tandem.

##Task 3 - Produce a small-scale version of the ratings.csv

In [44]:
import matplotlib.pyplot as plt
import pyspark.sql as sparksql
import pandas as pd

from pyspark import SparkContext, SparkConf

Enable cross joins in the spark configuration. This allows a table join used in Cmd x

In [46]:
spark.conf.set("spark.sql.crossJoin.enabled", "true")

In [47]:
ratingsURL = 'dbfs:/FileStore/tables/ratings.csv'
ratings = spark.read.csv(path = ratingsURL, inferSchema =True,header=True,mode="DROPMALFORMED",multiLine=True)
ratings.count()

Select the userId column and compile a list of all unique ids. Then, randomly sample 20% of them, which gives a sample size of roughly 27,000 users.

In [49]:
ratingsDistinct = ratings.select("userId").distinct().sample(False,0.2)
ratingsDistinct.count()

In [50]:
#ratingsDistinct.head(15)

Do an inner join between the original ratings table and the list of sampled user ids. This selects all movie ratings done by those 27K users and puts them in a new RDD

In [52]:
ratingssmall = ratingsDistinct.join(ratings,"userId","inner")
ratingssmall.count()

In [53]:
#ratingssmall.head(15)

Save the new ratings data as a parquet for later use

In [55]:
ratingssmall.write.parquet("dbfs:/FileStore/tables/ratings-small.parquet")

##Task 4 - Generate a user-user network from the ratings dataset

In [57]:
import matplotlib.pyplot as plt
import pyspark.sql as sparksql
import pandas as pd

from pyspark import SparkContext, SparkConf

from functools import reduce
from pyspark.sql.functions import col, lit, when
from pyspark.sql.functions import monotonically_increasing_id
from graphframes import *

Load in the parquet twice, so that we can perform database operations between them to filter the data. This sacrifices memory in return for processing time

In [59]:
ratingsURL = 'dbfs:/FileStore/tables/ratings-small.parquet'
ratings_one = spark.read.parquet(ratingsURL)
ratings_two = spark.read.parquet(ratingsURL)
ratings_one.count()

Rename the ID columns so that they may still be uniquely identified post-join

In [61]:
ratings_one = ratings_one.withColumnRenamed("userId","userId1")
ratings_two = ratings_two.withColumnRenamed("userId","userId2")

Peform an inner join between the two datasets. Only keep rows in which the movie ID is the same in each database. This will produce a database of users linked to other users by movieId. To prevent duplicate rows, and rows in which the user has rated the same movie as themselves, only rows in which the first user is strictly less than (numerically or alphabetically - details are left to spark) the second are kept.

In [63]:
useruserdf = ratings_one.join(ratings_two,(ratings_one["userId1"] < ratings_two["userId2"]) & (ratings_one["movieId"] == ratings_two["movieId"]),"inner")
useruserdf.count()

In [64]:
useruserdf.printSchema()

In [65]:
#useruserdf.head(10)

The nodes and edges needed for a future graph are created:

In [67]:
nodes = ratings_one.select("userId1").distinct()
edges = useruserdf.select("userId1","userId2")

Count the number of times a row appears with a unique user - user combination and save that figure. This number is the number of movies these users have both rated, and represents the weight of the edge between them

In [69]:
edgeweights = edges.groupby("userId1","userId2").count()
#edgeweights.count()

In [70]:
edgeweights.printSchema()

Rename the column to reflect the fact that count is a representation of weight. Remove all rows in which the weight is less than two, to prevent the graph being overconnected and therefore unprocessable.

In [72]:
edgeweights_original=edgeweights
edgeweights = edgeweights.withColumnRenamed("count","weight")
edgeweights = edgeweights.filter(edgeweights.weight > 2)

In [73]:
edgeweights_original.count()

In [74]:
edgeweights.count()

##Task 5 - Discover the connected components of the graph and select the largest for the next task

The nodes and edges from Task 4, are correlated into a graph using the GraphFrame command

In [77]:
#Rename nodes and edges to be able to produce graph
new_nodes=nodes.withColumn("id",monotonically_increasing_id())
new_nodes=new_nodes.withColumnRenamed("userId1","name")
new_edgeweights=edgeweights.withColumnRenamed("userId1","src")
new_edgeweights=new_edgeweights.withColumnRenamed("userId2","dst")
new_edgeweights=new_edgeweights.withColumnRenamed("count","relationship")
#Create graph with new nodes and edges
g = GraphFrame(new_nodes, new_edgeweights)

We find the connected components within the graph and these are displayed below in tabular form:

In [79]:
#find the connected componets
sc.setCheckpointDir("/tmp/graphframes-example-connected-components")
result = g.connectedComponents()
#display(result)

Code below can be used for finding strongly connected components. This code was not used as it takes a significant amount of time to run.

In [81]:
#strong_result = g.stronglyConnectedComponents(maxIter=10)
#display(strong_result.select("id", "component"))

As the above code does not work, this workaround was used. This code finds all components, then counts the number of times a component appears and orders it by that count.

In [83]:
sc.setCheckpointDir("/tmp/graphframes-example-connected-components")
result = g.connectedComponents()

In [84]:
result.groupBy('component').count().orderBy('count', ascending=False).show()

The strongest connected component can then be extracted manually by calling it directly.

In [86]:
names=result.filter(result.component == 2)

In [87]:
names.show()

The original edges could then be filitered by the id's within the strongly connected component. This would give you the connections within this component. This was not carried out, as we were unsure of how to code this wihin spark.

##Task 6 - Implementing the Girvan-Newman Algorithm

To create and test the GN algorithm, a fictional small sample graph is used (as stated in the brief). Sample vertices indicates the 7 nodes of this test graph labeled a-g. Sample edges between these nodes are also created, with source and destination specified. A total of 18 edges were created in total.

In [91]:
#Task 6 Sample GraphFrame
sample_vertices = sqlContext.createDataFrame([
  ("a", "Alice", 34),
  ("b", "Bob", 36),
  ("c", "Charlie", 30),
  ("d", "David", 29),
  ("e", "Esther", 32),
  ("f", "Fanny", 36),
  ("g", "Gabby", 60)], ["id", "name", "age"])
sample_edges = sqlContext.createDataFrame([
  ("a", "b", "friend"),
  ("b", "a", "friend"),
  ("a", "c", "friend"),  
  ("c", "a", "friend"),
  ("b", "c", "friend"),
  ("c", "b", "friend"),
  ("b", "d", "friend"),
  ("d", "b", "friend"),
  ("d", "e", "friend"),
  ("e", "d", "friend"),
  ("d", "g", "friend"),
  ("g", "d", "friend"),
  ("e", "f", "friend"),
  ("f", "e", "friend"),
  ("g", "f", "friend"),
  ("f", "g", "friend"),
  ("d", "f", "friend"),
  ("f", "d", "friend")
], ["src", "dst", "relationship"])
sample_g = GraphFrame(sample_vertices, sample_edges)

For the algorithm, the relationship variable was removed from the edges data. An adjaceny list was then made for all of the nodes, indicating which nodes were directly connected.

In [93]:
edges = sample_edges.select("src", "dst")
adjacency_list = edges.rdd.groupByKey().mapValues(list).collect()

Girvan-Newman Algorithm based on multiple lists. Nodes are internally referenced by a numerical ID which always corresponds to the index to speed up processing time. Inputs are the adjacency list, the (numeric) node you wish to use as the root and the number of edges in the adjacency list (comments are provided to describe each step) :

In [95]:
#Step 2 - Traverse Bredth wise from a node. Label depth of node and amount of times node reached in same number of steps as level
def search(adjacency,start_node,n_edges):
  #As dataframes don't seem to work, just maintain lists of variables. Index of variable remains constant through each list.
  local_n = 0
  nodes = [adjacency[start_node][0]]
  #Parents are stored by their index in nodes list
  parents = [None]
  levels = [0]
  paths = [0]
  credits = [1]
  
  #Initialise at starting node
  current_node = start_node
  #index is absolute index in above lists of current nodes. Current nodes is the index in the adjacency list of the current node
  index = 0
  
  #Continue calculating until all edges have been considered.
  while local_n < n_edges:
    
    #Find index of current node in adjacency list
    current_node = nodes[index]
    for i in range(0,len(adjacency)):
      if nodes[index] == adjacency[i][0]:
        current_node = i
        break
        
    #For all leaf nodes of current node
    for i in range(0,len(adjacency[current_node][1])):
      
      #Check if leaf node already found. If so, initialise values for that node.
      if adjacency[current_node][1][i] not in nodes:
        nodes.append(adjacency[current_node][1][i])
        parents.append([index])
        levels.append(levels[index] + 1)
        paths.append(1)
        credits.append(1)
        
      #If node already found
      else:
        #Find index of leaf node in node list
        for j in range(0,len(nodes)):
          if nodes[j] == adjacency[current_node][1][i]:
                      node_index = j
                      break
        
        #If node is at greater depth than current node, update information about node
        #As search is breadth first, it should be impossible to find a node which is less deep than current node or which is more than one level deeper
        if levels[node_index] == levels[index] + 1:
          parents[node_index].append(index)
          #For clean comparison, paths is kept. Actual value of paths is inherently the length of the lsit of parents by construction
          paths[node_index] = paths[node_index] + 1
                       
      #Update number of edges
      local_n = local_n + 1
    #Analyse next node
    index = index + 1
  
  #From bottom of tree, feed credits of leaf to parent based on number of paths to leaf
  for i in range(len(nodes)-1,0,-1):
    for j in range(0,len(parents[i])):
      credits[parents[i][j]] = credits[parents[i][j]] + (credits[i]/paths[i])
  #print(nodes)
  #print(parents)
  #print(levels)
  #print(paths)
  #print(credits)
  
  results = []
  for i in range(0,len(nodes)):
    results.append([nodes[i], credits[i]])
  #return([nodes,credits])
  return(results)

This algorithm is then applied for each node (in this algorithm, the node chosen, is the starting node in the BFS method). Node a is shown as an example:

In [97]:
g=search(adjacency_list,0,18)
a=search(adjacency_list,1,18)
e=search(adjacency_list,2,18)
c=search(adjacency_list,3,18)
b=search(adjacency_list,4,18)
d=search(adjacency_list,5,18)
f=search(adjacency_list,6,18)
a

These weightings, for all nodes, can then be added together and divided by 2. This will indicate which nodes are of most imporatnce in the graph. The connections betwen these important nodes will illustrate the communities present within the graph. The code to do this is not present, as we were not sure how to programme this operation within Spark.

To test possible scalibility of the algorithm, the number of nodes and edges for the test graph was doubled. This is shown below:

In [100]:
#Scalibility
scale_vertices = sqlContext.createDataFrame([
  ("a", "Alice", 34),
  ("b", "Bob", 36),
  ("c", "Charlie", 30),
  ("d", "David", 29),
  ("e", "Esther", 32),
  ("f", "Fanny", 36),
  ("g", "Gabby", 60),
  ("h", "Hanna", 75),
  ("i", "Isak", 45),
  ("j", "Jeffery", 26),
  ("k", "Kris", 73),
  ("l", "Lucy", 34),
  ("m", "Mason", 12),
  ("n", "Newt", 62)],
  ["id", "name", "age"])
scale_edges = sqlContext.createDataFrame([
  ("a", "b", "friend"),
  ("b", "a", "friend"),
  ("a", "c", "friend"),  
  ("c", "a", "friend"),
  ("b", "c", "friend"),
  ("c", "b", "friend"),
  ("b", "d", "friend"),
  ("d", "b", "friend"),
  ("d", "e", "friend"),
  ("e", "d", "friend"),
  ("d", "g", "friend"),
  ("g", "d", "friend"),
  ("e", "f", "friend"),
  ("f", "e", "friend"),
  ("g", "f", "friend"),
  ("f", "g", "friend"),
  ("d", "f", "friend"),
  ("f", "d", "friend"),
  ("f", "h", "friend"),
  ("h", "i", "friend"),
  ("i", "h", "friend"),
  ("h", "j", "friend"),  
  ("j", "h", "friend"),
  ("i", "j", "friend"),
  ("j", "i", "friend"),
  ("i", "k", "friend"),
  ("k", "i", "friend"),
  ("k", "l", "friend"),
  ("l", "k", "friend"),
  ("k", "n", "friend"),
  ("n", "d", "friend"),
  ("l", "m", "friend"),
  ("m", "l", "friend"),
  ("n", "m", "friend"),
  ("m", "n", "friend"),
  ("k", "m", "friend"),
  ("m", "k", "friend")
], ["src", "dst", "relationship"])
scale_g = GraphFrame(scale_vertices, scale_edges)

Similarly to before, the edges are filtered and an adjacency list is created. The time scale for these operations, is similar to the original test graph.

In [102]:
scale_edges = scale_edges.select("src", "dst")
scale_adjacency_list = scale_edges.rdd.groupByKey().mapValues(list).collect()

The algorithm is then applied to this new adjacency list, as shown below:

In [104]:
search(scale_adjacency_list,0,37)

The time scale for the application of the algorithm to this new adjacency list is similar to that of the original test graph. This shows that on this scale (doubling the nodes/edges) there is little difference in the computational time.

To further test scalability, the algorithm was applied to a subset of edges (with 100 taken) from the original graph of the movie ratings data.

In [107]:
edges_test=new_edgeweights.orderBy('src')
edges_test=edges_test.select("src", "dst")
edges_test=edges_test.limit(100)

In [108]:
test_adjacency_list = edges_test.rdd.groupByKey().mapValues(list).collect()

In [109]:
search(test_adjacency_list,0,100)

Again the time taken to apply the algorithm to this subset of movie ratings data was similar to that of the test graphs. However the creation of the adjacency list took much longer. This could indicate that while the algorithm has potentially good scalability, the task needed to create an adjacency list is not.

It should be noted, that even in this test, only 100 data points are being investigated. A better test of scalability would to test against a much larger set of data points. This was not possible in this report, due to insufficent computing power.