# TASK 1

In [2]:
# Download the ratings CSV with pandas, then hand the pandas frame to Spark.
import pandas as pd
ratingsURL = 'https://csc8101storageblob.blob.core.windows.net/datablobcsc8101/ratings.csv'
ratings = spark.createDataFrame(
    pd.read_csv(filepath_or_buffer=ratingsURL)
)

## Data Exploration

In [4]:
# Total number of rating rows in the dataset.
ratings.count()

### Graphical and Numerical Summaries

#### Ratings Histogram

In [7]:
# Render the ratings table with the notebook's display(); the rows below are its output.
# NOTE(review): presumably the ratings histogram is produced through this widget's
# plot options rather than in code — confirm.
display(ratings)

userId,movieId,rating,timestamp
1,2,3.5,1112486027
1,29,3.5,1112484676
1,32,3.5,1112484819
1,47,3.5,1112484727
1,50,3.5,1112484580
1,112,3.5,1094785740
1,151,4.0,1094785734
1,223,4.0,1112485573
1,253,4.0,1112484940
1,260,4.0,1112484826


#### Average number of ratings per user

In [9]:
# Step 1 of the per-user average: count how many rating rows each userId has.
# The next cell averages this "count" column.
from pyspark.sql.functions import avg
ratingsPerUser = (
    ratings
    .groupBy("userId")
    .count()
)
ratingsPerUser.show(n=5)

In [10]:
# Mean of the per-user rating counts.
ratingsPerUser.groupBy().avg("count").show()

#### Average number of ratings per movie

In [12]:
# Same approach as the per-user average, but grouped by movieId:
# count how many rating rows each movie has.
ratingsPerMovie = (
    ratings
    .groupBy("movieId")
    .count()
)
ratingsPerMovie.show(n=5)

In [13]:
# Mean of the per-movie rating counts.
ratingsPerMovie.groupBy().avg("count").show()

#### Histogram of the distribution of movie ratings per user

In [15]:
# The 'ratingsPerUser' and 'ratingsPerMovie' dataframes built above already hold one
# count per user / per movie, so a histogram of the "count" column shows the distribution.
# NOTE(review): presumably the histogram itself is drawn via display()'s plot options — confirm.
display(ratingsPerUser)

userId,count
9233,49
9458,120
9715,32
9945,163
9968,28
9978,21
10156,242
10422,185
10871,44
10959,26


#### Histogram showing the distribution of ratings per movie

In [17]:
# Render the per-movie rating counts (movieId, count) computed above.
display(ratingsPerMovie)

movieId,count
2529,12702
474,18836
26,2755
72011,3600
29,8520
45726,1017
60756,1467
71429,310
4823,3304
5385,929


# TASK 2

In [19]:
#loading necessary library
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator

#### Data Cleaning

In [21]:
# The timestamp column is dropped because the remaining analysis does not use it.
# Note: this rebinds `ratings`, so every later cell sees the frame without 'timestamp'.
ratings = ratings.drop('timestamp')

#### Test Train split

In [23]:
# Split the data into a training set (used for fitting) and a testing set (used for the
# later evaluation). A 50% / 50% split is used, with a fixed seed so the split is repeatable.
train, test = ratings.randomSplit(weights=[0.5, 0.5], seed=1234)

#### ALS Construction

In [25]:
# ALS setup: userId identifies users, movieId identifies items and `rating` is the value
# to predict. Ratings are explicit (not implicit preferences) and never negative, so the
# factors are constrained to be non-negative. coldStartStrategy='drop' is set to deal with
# the cold-start problem when predictions are scored.
als = ALS(
    userCol="userId",
    itemCol="movieId",
    ratingCol="rating",
    implicitPrefs=False,
    nonnegative=True,
    coldStartStrategy='drop',
)

# RMSE between the `rating` label and the `prediction` column.
evaluator = RegressionEvaluator(
    labelCol="rating",
    predictionCol="prediction",
    metricName="rmse",
)

#### Tuning Hyperparameters using Cross Validation

In [27]:
# k-fold cross-validation compares ALS models built with different hyperparameters so the
# best combination can be picked. Two ALS parameters are tuned here: `rank` and `regParam`.
paramGrid = (
    ParamGridBuilder()
    .addGrid(als.rank, [10, 5, 1])
    .addGrid(als.regParam, [0.001, 0.01, 0.1])
    .build()
)

#### Cross Validation and Best Model

In [29]:
# 3-fold cross-validator: fits `als` for every entry of `paramGrid` and scores it
# with the RMSE evaluator.
cv = CrossValidator(
    estimator=als,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,
    numFolds=3,
)

In [30]:
# Run the cross-validation on the training split and keep the best model it found.
# This fits one ALS model per (grid entry, fold) pair, so it is the slow cell of Task 2.
cvmodel = cv.fit(train)
bestModel = cvmodel.bestModel

In [31]:
# Rank and regParam of the best model.
# The winning parameter map is looked up through the public CrossValidatorModel API
# (avgMetrics is aligned with the estimator param maps) instead of reaching into the
# private `_java_obj` of the fitted model. RMSE is minimised, so the best entry is the
# one with the smallest average metric.
bestIndex = min(range(len(cvmodel.avgMetrics)), key=cvmodel.avgMetrics.__getitem__)
bestParams = cvmodel.getEstimatorParamMaps()[bestIndex]
print("rank: ", bestModel.rank, "\nregParam: ", bestParams[als.regParam])

#### Best Model Results

In [33]:
# Score the best model on the held-out test split and report its RMSE.
predictions = bestModel.transform(test)
rmse = evaluator.evaluate(predictions)
print(f"Root-mean-square error = {rmse}")

In [34]:
# Show the test rows next to the rating predicted by the best ALS model.
display(predictions)

userId,movieId,rating,prediction
53338,148,1.0,2.8997886
92852,148,3.0,2.69553
44979,148,3.0,2.9774113
1259,148,5.0,3.0970263
41389,148,3.0,3.5407796
60081,148,2.0,2.9708133
77165,148,3.0,2.6584544
59570,148,3.0,2.8659701
64843,148,3.5,2.7303085
101628,148,1.0,2.603192


The model seems to perform quite well here considering the sparsity of the matrix, at least according to the root-mean-square error. However, perhaps the algorithm can be improved by feeding it communities of users instead of the whole user dataset, since users within a community are likely to have similar preferences and hence predicting their ratings would be easier. This will be done using the Girvan-Newman algorithm in the remaining tasks.

# TASK 3

In [37]:
# Loading libraries
import pandas as pd
import pyspark.sql.functions as f
from pyspark.sql.types import *
from pyspark.sql import Row
from operator import add
from collections import Counter

In [38]:
# Take up to 27000 users from a 20% random sample of the distinct user ids.
# The sampled rows are bound to `sampleUserRows` (not `list`) so the builtin `list`
# stays callable for the rest of the notebook, and the sample is seeded so a re-run
# draws from the same random sequence.
sampleUsers1 = ratings.select("userId").distinct().sample(fraction=0.2, seed=1234)
sampleUserRows = sampleUsers1.take(27000)
sampleUsers = spark.createDataFrame(sampleUserRows, ['userId'])
sampleUsers.count()

In [39]:
# Build ratings_small (written to parquet in a later cell): keep only the ratings made
# by the sampled users. A left-semi join keeps the filtering inside Spark instead of
# collecting every sampled id to the driver and embedding them in one large isin(...) list.
from pyspark.sql.functions import when
ratings_small = ratings.join(sampleUsers, on="userId", how="left_semi")
ratings_small.count()

In [40]:
#dbutils.fs.rm('small-rating.parquet', True) 

In [41]:
# Save ratings_small as a parquet file. Overwrite mode lets the notebook be re-run
# from the top even when the file already exists from a previous run.
ratings_small.write.mode("overwrite").parquet("ratings-small.parquet")

# TASK 4

In [43]:
# Drop 'timestamp' and 'rating', which the rest of the project does not need.
# NOTE(review): 'timestamp' was already removed from `ratings` in the Task 2 data-cleaning
# cell, and the next cell rebinds `ratings_small` by re-reading the parquet file (which
# still contains 'rating'), so these drops have no lasting effect as written.
ratings_small = ratings_small.drop('timestamp')
ratings_small = ratings_small.drop('rating')

#### Dataframes

In [45]:
# Self-join of the sampled ratings on the movie id: each joined row pairs two user ids
# (one from each side) that rated the same movie. The right-hand side gets renamed
# columns so both user ids can coexist in the result.
ratings_small = spark.read.parquet("ratings-small.parquet")
df1 = ratings_small.select("movieId", "userId")
df2 = ratings_small.select("movieId", "userId").toDF("movieId2", "userId2")
sameMovie = df1.movieId == df2.movieId2
UserMap = df1.join(df2, sameMovie)

In [46]:
# Keep only the two user-id columns; the movie ids are no longer needed once the
# users have been paired.
UserMap = UserMap.select("userId", "userId2")
UserMap.show(5)

In [47]:
# Number of user pairs produced by the self-join. The join has no filter, so this
# includes self-pairs (userId == userId2) and both orientations of every pair.
UserMap.count()

#### Edges

In [49]:
# Count how many movies each ordered (userId, userId2) pair has in common, ignoring
# self-pairs. Both orientations of a pair are still present at this point.
# Note: this import rebinds the name `sum` to the Spark column function.
from pyspark.sql.functions import col, sum
pairCounts = UserMap.groupBy(*UserMap.columns).count()
edgesWithDuplicates = pairCounts.where(UserMap.userId != UserMap.userId2)
edgesWithDuplicates.orderBy("count", ascending=False).show(n=5)

In [50]:
# Number of ordered user pairs (self-pairs excluded). Each undirected edge is still
# counted in both directions here; the following cells collapse them.
edgesWithDuplicates.count()

In [51]:
# Normalise every pair so the smaller user id comes first and the larger second.
# After this, both orientations of an edge become identical tuples, so the
# duplicates can be removed in the next cell.
def _orderEdge(row):
    """Return (smaller id, larger id, count) for one edge row."""
    first, second = row["userId"], row["userId2"]
    return (min(first, second), max(first, second), row["count"])

edgesRDD = edgesWithDuplicates.select("userId", "userId2", "count").rdd.map(_orderEdge)

In [52]:
# Turn the normalised tuples back into a DataFrame and collapse identical rows,
# leaving one (src, dst, count) row per undirected edge.
edges1 = spark.createDataFrame(edgesRDD, schema=["src", "dst", "count"])
edges = edges1.distinct()
edges.count()

In [53]:
# Mean edge weight, i.e. the average number of movies two connected users share.
edges.groupBy().avg("count").show()

In [54]:
# Keep only the edges whose weight (number of shared movies) is greater than average.
# NOTE(review): the threshold 15 is hard-coded; presumably it was read off the average
# printed by the previous cell. Because the user sample is random, confirm it still
# matches the current average before relying on this filter.
edgesFiltered = edges.filter("count>15")
edgesFiltered.count()

#### Vertices

In [56]:
# Vertices are the distinct user ids that appear as either endpoint of a filtered edge.
# The union is computed inside Spark rather than collecting both endpoint columns to the
# driver through pandas and re-uploading them as a new DataFrame.
verticesFiltered = (
    edgesFiltered.select(f.col("src").alias("id"))
    .union(edgesFiltered.select(f.col("dst").alias("id")))
    .distinct()
)
verticesFiltered.show(5)

In [57]:
# Number of distinct users that appear in at least one filtered edge.
verticesFiltered.count()

In [58]:
import os,sys
import pyspark.sql.functions as f

In [59]:
from functools import reduce
from pyspark.sql.functions import col, lit, when
from graphframes import*

#### Graph

In [61]:
# Build the GraphFrame from the filtered vertices ("id") and edges ("src", "dst", "count"),
# then show its edge table.
graph = GraphFrame(verticesFiltered, edgesFiltered)
display(graph.edges)

# TASK 5

In [63]:
# A checkpoint directory is configured before running connectedComponents().
sc.setCheckpointDir("dbfs:/tmp/group1/checkpoint")
# Computes the connected components of the graph.
connectedComp = graph.connectedComponents()  # DataFrame of vertices with a new "component" column
# The count's value is not displayed (it is not the last expression of the cell).
connectedComp.count()
# Show the vertices ordered by component id, highest id first.
display(connectedComp.sort("component", ascending = False))

id,component
61135,3
116612,3
60882,3
27469,3
48899,3
15322,3
68463,3
92357,3
101756,3
102454,3


#### Subgraph

In [65]:
# Find the largest connected component and keep only the filtered edges whose two
# endpoints both belong to it.
# - The component id is derived from the data (most vertices, ties broken by smallest id)
#   instead of being hard-coded, because the ids change with every random user sample.
# - The join starts from `edgesFiltered`, the edge set the graph was built from, rather
#   than the unfiltered `edges` frame.
largest_component_id = (
    connectedComp.groupBy("component").count()
    .orderBy(f.col("count").desc(), f.col("component").asc())
    .first()["component"]
)
max_component = connectedComp.filter(connectedComp["component"] == largest_component_id)
component_ids = max_component.select("id")
edges_max_component = (
    edgesFiltered
    .join(component_ids.withColumnRenamed("id", "src"), on="src", how="left_semi")
    .join(component_ids.withColumnRenamed("id", "dst"), on="dst", how="left_semi")
    .select("src", "dst")
)
display(edges_max_component)

src,dst
270,1697
1145,1697
1371,1697
1480,1697
113,1697
167,1697
1599,1697
130,1697
734,1697
1415,1697


In [66]:
# Persist the component's edge list; overwrite mode lets this cell be re-run.
edges_max_component.write.mode('overwrite').parquet("edges_max_component.parquet")

In [67]:
# Read the parquet file back to check the write. The result gets its own name so it
# does not overwrite `test`, the held-out split used to evaluate the ALS model.
edges_max_component_check = spark.read.parquet("edges_max_component.parquet")
display(edges_max_component_check)

src,dst
48,85
3,85
60,85
27,85
63,85
12,85
22,85
25,85
61,85
78,85


In [68]:
# Highest value of the "component" column, taken from the 'max' row of describe().
# NOTE(review): this is the largest component *id*, not the component with the most
# vertices, and describe() reports its statistics as strings. `g2` does not appear to be
# used by any later cell in this notebook — confirm whether it is still needed.
g2 = (connectedComp.describe("component").filter("summary = 'max'").select("component").collect()[0].asDict()['component'])

# TASK 6

In [70]:
import networkx as nx
import pandas as pd
import numpy as np
#from collections import OrderedDict

In [71]:
# Number of Girvan-Newman steps to run; passed as `k` to runGirvanNewman below.
top_k = 3

#### Sample graph for testing GN implementation.

In [73]:
# Seven-vertex toy graph (ids "a".."g") used to check the Girvan-Newman implementation
# before running it on the real subgraph. Only the edge list is used further down.
vertices = sqlContext.createDataFrame([
  ("a", "Alice", 34),
  ("b", "Bob", 36),
  ("c", "Charlie", 30),
  ("d", "David", 29),
  ("e", "Esther", 32),
  ("f", "Fanny", 36),
  ("g", "Gabby", 60)], ["id", "name", "age"])

In [74]:
# Edges of the toy graph; every friendship is listed in both directions.
# NOTE(review): this rebinds `edges`, replacing the user-pair edge DataFrame built in
# Task 4. Re-running the Task 4/5 cells after this one would pick up the toy edges.
edges = sqlContext.createDataFrame([
  ("a", "b", "friend"),
  ("b", "a", "friend"),
  ("a", "c", "friend"),  
  ("c", "a", "friend"),
  ("b", "c", "friend"),
  ("c", "b", "friend"),
  ("b", "d", "friend"),
  ("d", "b", "friend"),
  ("d", "e", "friend"),
  ("e", "d", "friend"),
  ("d", "g", "friend"),
  ("g", "d", "friend"),
  ("e", "f", "friend"),
  ("f", "e", "friend"),
  ("g", "f", "friend"),
  ("f", "g", "friend"),
  ("d", "f", "friend"),
  ("f", "d", "friend")
], ["src", "dst", "relationship"])

In [75]:
# Build an undirected NetworkX graph from the toy edge list.
# The Spark frame is collected to pandas once, with just the two endpoint columns,
# instead of calling toPandas() separately for each column.
edges_new = edges.select("src", "dst").toPandas()
G = nx.from_pandas_edgelist(edges_new, 'src', 'dst')

In [76]:
# List all the nodes of the graph. Only the last expression (G.nodes) is shown as the
# cell output; the node count on the line before it is computed but not displayed.
G.number_of_nodes()
G.nodes

In [77]:
def _single_source_shortest_path_basic(G,s):
    S = [] # S is a dictionary. It stores the visited nodes
    P = {} # A dictionary to store parent node
    for v in G:
        P[v] = []
    sigma = dict.fromkeys(G, 0.0)    # sigma[v]=0 for v in G
                                    # set label for each node with 0 
    D = {}  #store the shortest path for each node to the begin node 
    sigma[s] = 1.0 # starting from vertex s and set the label of 1
    D[s] = 0 #initialise all the node to the begin node is 0
    Q = [s] # stack
    while Q:   # use BFS to find shortest paths
        v = Q.pop(0)
        S.append(v)
        Dv = D[v]
        sigmav = sigma[v]
        for w in G[v]:
            if w not in D:
                Q.append(w)
                D[w] = Dv + 1
                print("D[w] = ",w,D[w])
            if D[w] == Dv + 1:   # this is a shortest path, count paths
                sigma[w] += sigmav
                print("sigma[w] = ",w,sigma[w])
                P[w].append(v)  # predecessors
    print (S)
    print(P)
    print(sigma)
    return S, P, sigma 

In [78]:
def betweenness(G):
    """Return a dict mapping each edge of `G` to its betweenness score.

    Every node is used once as the source of a shortest-path search; the
    contributions of each search are accumulated, the per-node entries are
    removed and the remaining edge scores are rescaled.
    """
    # Start every node and every edge at zero.
    scores = {node: 0.0 for node in G}
    scores.update({edge: 0.0 for edge in G.edges()})
    for source in G:
        # Shortest paths from this source, then fold them into the running totals.
        visited, preds, path_counts = _single_source_shortest_path_basic(G, source)
        scores = _accumulate_edges(scores, visited, preds, path_counts, source)
    # Only edge scores are returned, so drop the node entries.
    for node in G:
        del scores[node]
    return _rescale_e(scores, len(G))

In [79]:
# Edge betweenness of the toy graph.
# NOTE(review): this call comes before the cells that define _accumulate_edges and
# _rescale_e, so on a fresh kernel run from top to bottom it fails with a NameError.
betweenness(G)

In [80]:
def _accumulate_edges(betweenness, S, P, sigma, s):
    delta = dict.fromkeys(S, 0)
    while S:
        w = S.pop()
        coeff = (1 + delta[w]) / sigma[w]
        for v in P[w]:
            c = sigma[v] * coeff
            if (v, w) not in betweenness:
                betweenness[(w, v)] += c
            else:
                betweenness[(v, w)] += c
            delta[v] += c
        if w != s:
            betweenness[w] += delta[w]
    return betweenness

In [81]:
# rescaling
def _rescale_e(betweenness, n):
    scale = 0.5
      for v in betweenness:
         betweenness[v] *= scale
    return betweenness

In [82]:
# compute the edge betweenness

def CmtyGirvanNewmanStep(G):
    """Run one Girvan-Newman step on `G`, modifying it in place.

    Repeatedly removes the edge(s) with the highest betweenness until the number
    of connected components grows. If several edges tie for the maximum, all of
    them are removed. Stops early when the graph has no edges left, since there
    is then nothing to remove (and no maximum to take).
    """
    init_ncomp = nx.number_connected_components(G)    # components before the step
    ncomp = init_ncomp
    while ncomp <= init_ncomp:
        if G.number_of_edges() == 0:
            break
        # dict mapping each edge to its betweenness
        bw = betweenness(G)
        print("current betweenness of each edge")
        print(bw)
        # highest centrality currently present
        max_ = max(bw.values())
        # every edge that reaches the maximum is removed
        central_edges = [edge for edge, score in bw.items() if float(score) == max_]
        for u, v in central_edges:
            G.remove_edge(u, v)
        ncomp = nx.number_connected_components(G)    # recount after the removal

In [83]:
# This method runs the Girvan-Newman algorithm and prints the communities after each step
def runGirvanNewman(G,k):
    """Run up to `k` Girvan-Newman steps on `G` (modified in place).

    After each step the current communities (connected components) are printed.
    The loop ends after `k` steps or as soon as the graph has no edges left,
    including when it has none to begin with.
    """
    while k>0:
        if G.number_of_edges() == 0:
            break
        CmtyGirvanNewmanStep(G)
        # Built with a comprehension rather than list(...) so the function does not
        # depend on the builtin name `list` being unshadowed in the notebook namespace.
        comps = [community for community in nx.connected_components(G)]
        print("communities:")
        print(comps)
        k = k-1

In [84]:
def main():
    """Run Girvan-Newman on the toy graph `G` for `top_k` steps."""
    runGirvanNewman(G, top_k)

if __name__ == "__main__":
    main()

## Girvan-Newman algorithm for subgraph

In [86]:
# Create the NetworkX graph for the extracted component.
# The edge list is collected to pandas once, with just the two endpoint columns,
# instead of calling toPandas() separately for each column.
subG = spark.read.parquet("edges_max_component.parquet")
edges_sub = subG.select("src", "dst").toPandas()
sub_G = nx.from_pandas_edgelist(edges_sub, 'src', 'dst')

In [87]:
# Run top_k Girvan-Newman steps on the component subgraph; sub_G is modified in place.
runGirvanNewman(sub_G,top_k)