# MDS Project
# Extracting Semantics from Billion Edge Graphs
# Author: Parth Goel (pg514)

In [1]:
# Importing the necessary libraries
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.clustering import StreamingKMeans
from pyspark.sql.types import StructType,StructField, IntegerType
from pyspark.sql import Row
from pyspark.sql.functions import col
import networkx as nx
import ast
import os
import matplotlib.pyplot as plt
import csv

In [2]:
# Build the Spark session with its resource settings supplied up front.
# SparkContext.getConf() returns a copy of the configuration, so calling
# setAll() on it after the session exists does not reconfigure the running
# application; the values have to be passed through the builder instead.
# NOTE(review): spark.driver.memory only applies if no driver JVM is running
# yet when this cell executes - confirm for the target environment.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .master("local")
    .appName("Project_group5")
    .config("spark.executor.memory", "40g")
    .config("spark.cores.max", "20")
    .config("spark.driver.memory", "40g")
    .getOrCreate()
)

# Echo the configuration object as the cell result
spark.sparkContext.getConf()

<pyspark.conf.SparkConf at 0x7f191ca57820>

In [3]:
# `from __future__` imports must come before any other statement of the unit
# they are compiled in, so it leads the cell.
from __future__ import print_function

# Import `display` explicitly instead of relying on the notebook injecting it.
from IPython.display import HTML, display

# Force <pre> blocks to keep their whitespace un-wrapped - presumably so the
# wide tables printed by DataFrame.show() stay aligned in the notebook.
display(HTML("<style>pre { white-space: pre !important; }</style>"))

## 1. Data Preparation

### 1.1 Reading movies_label.csv

In [4]:
# Load movies_label.csv and name its two positional columns in one step:
# the first column is the vertex id, the second its tag label.
movies_label = spark.read.csv("movies_label.csv").toDF('vertex_id', 'vertex_tag_label')

In [5]:
# Displaying the movies labels (show() prints the first 20 rows)
# vertex_id: vertex id (read as _c0); vertex_tag_label: vertex tag label (read as _c1)
movies_label.show()

+---------+-------------------+
|vertex_id|   vertex_tag_label|
+---------+-------------------+
|    17827|            officer|
|    18716|       conscription|
|   188853|    naked in public|
|    22078|      public nudity|
|    43001|   male rear nudity|
|    20676|        male nudity|
|    21844|male frontal nudity|
|    12417|            soldier|
|    11870|               army|
|   317967|            f rated|
|    28047|         voice mail|
|    15119|      disappearance|
|    13676|            monster|
|   180654| man versus monster|
|    17434|           nobleman|
|    23220|             switch|
|    82211|          role swap|
|    12753|      role reversal|
|   317991|     egocentric man|
|   290367|        selfish man|
+---------+-------------------+
only showing top 20 rows



In [6]:
# Displaying the number of rows in the movies_label DataFrame
movies_label.count()

218269

### 1.2 Getting all the files in the Data Folder

In [7]:
# Get the names of all the files in the data folder.
# os.listdir returns bare entry names (no directory part); the cells that
# read these files prepend "Data/" themselves.
path = "Data"
file_names = os.listdir(path)

### 1.3 Finding all the distinct layers

In [8]:
# Finding all the distinct layers in the dataset.
# Layer files are matched later by the prefix "layer-<layer>-", so the layer
# id is the second dash-separated field of the file name. Entries that do not
# follow that pattern (hidden files, checkpoints, ...) are skipped instead of
# raising IndexError or producing a layer id that no file would ever match.
layers = [
    name.split("-")[1]
    for name in file_names
    if name.startswith("layer-") and name.count("-") >= 2
]
distinctLayers = set(layers)
print("Distinct Layers: ", distinctLayers)
print("no of Distinct Layers: ", len(distinctLayers))

Distinct Layers:  {'56', '35', '19', '21', '44', '17', '18', '42', '6', '5', '9', '47', '26', '29', '101', '13', '37', '27', '53', '527', '8', '2', '185', '84', '39', '119', '88', '34', '24', '12', '16', '10', '31', '38', '3114', '3', '22', '61', '283', '123', '62', '66', '51', '87', '55', '58', '33', '50', '1', '30', '76', '54', '23', '52', '4', '57', '15', '25', '60', '11', '1246', '28', '98', '7', '20', '14', '32', '43', '414', '48', '36', '40', '757', '41', '86', '2070', '105', '46'}
no of Distinct Layers:  78


### 1.4 Iterating through all the layers one by one to read wcc-lcc mapping files and the edges files and making the edges-lcc connection and saving it in files

In [9]:
# Building, layer by layer: the lcc-annotated edge list, the per-vertex degree
# table, and the de-duplicated (one row per undirected edge) edge list.

# The empty starting frame is the same for every layer, so its ingredients
# are created once instead of on every iteration.
emptyRDD = spark.sparkContext.emptyRDD()
schema = StructType([
    StructField('source', IntegerType(), True),
    StructField('target', IntegerType(), True),
    StructField('waveIndex', IntegerType(), True),
    StructField('wcc', IntegerType(), True),
    StructField('fragmentIndex', IntegerType(), True),
])

# Iterating through all the layers
for layer in distinctLayers:
    layer_prefix = "layer-" + layer + "-"

    # Empty DataFrame that every edge file of this layer is unioned onto
    merged_edges = spark.createDataFrame(emptyRDD, schema)

    # Reset per layer so a layer without a mapping file can never silently
    # reuse the mapping left over from the previous iteration.
    wcc_lcc_mapping = None

    # Iterating through the files that belong to this layer
    for files in file_names:
        if not files.startswith(layer_prefix):
            continue
        if files.endswith('.wcc-lcc'):
            # wcc -> lcc mapping file
            wcc_lcc_mapping = spark.read.csv("Data/" + files).toDF('wcc', 'lcc')
        elif files.endswith('.csv'):
            # Edges file
            edges = spark.read.csv("Data/" + files)
            edges = edges.toDF('source', 'target', 'waveIndex', 'wcc', 'fragmentIndex')
            merged_edges = merged_edges.union(edges)

    if wcc_lcc_mapping is None:
        raise FileNotFoundError("No .wcc-lcc mapping file found for layer " + layer)

    # Adding a lcc column to the merged edges; the left join keeps edges whose
    # wcc has no mapping (their lcc is null).
    merged_edges.createOrReplaceTempView("merged_edges")
    wcc_lcc_mapping.createOrReplaceTempView("wcc_lcc_mapping")
    edges_lcc_mapping = spark.sql("select merged_edges.source, merged_edges.target, merged_edges.waveIndex, merged_edges.wcc, merged_edges.fragmentIndex, wcc_lcc_mapping.lcc from merged_edges left join wcc_lcc_mapping on merged_edges.wcc = wcc_lcc_mapping.wcc")

    # Degree of a vertex = number of rows that list it as `source`.
    # This is computed before de-duplication, while both directions are present.
    df_degree = edges_lcc_mapping.groupBy('source').count().toDF("vertex", "count")

    # Saving the degree of the vertices
    df_degree.coalesce(1).write.options(header='True', delimiter=',').csv("Degrees/layer-" + layer)

    # Removing duplicates: two numbers are determined, up to order, by their
    # sum and product, so (a, b) and (b, a) share the same (sum, product) key
    # and dropDuplicates keeps a single row per undirected edge.
    edges_lcc_mapping = edges_lcc_mapping.withColumn("sum", col("source") + col("target"))
    edges_lcc_mapping = edges_lcc_mapping.withColumn("product", col("source") * col("target"))
    edges_lcc_mapping = edges_lcc_mapping.dropDuplicates(['sum', 'product'])

    # The helper columns are not part of the output
    edges_lcc_mapping = edges_lcc_mapping.drop("sum", "product")

    # Saving the undirected graph edges
    edges_lcc_mapping.coalesce(1).write.options(header='True', delimiter=',').csv("Undirected Edges/layer-" + layer)

### 1.5 Saving the edges layer wise in files but this time connecting the vertices both ways instead of using degrees to find direction

In [8]:
# Building, layer by layer, the lcc-annotated edge list with every edge kept
# in both directions (no de-duplication, no degree table).

# The empty starting frame is the same for every layer, so its ingredients
# are created once instead of on every iteration.
emptyRDD = spark.sparkContext.emptyRDD()
schema = StructType([
    StructField('source', IntegerType(), True),
    StructField('target', IntegerType(), True),
    StructField('waveIndex', IntegerType(), True),
    StructField('wcc', IntegerType(), True),
    StructField('fragmentIndex', IntegerType(), True),
])

# Iterating through all the layers
for layer in distinctLayers:
    layer_prefix = "layer-" + layer + "-"

    # Empty DataFrame that every edge file of this layer is unioned onto
    merged_edges = spark.createDataFrame(emptyRDD, schema)

    # Reset per layer so a layer without a mapping file can never silently
    # reuse the mapping left over from the previous iteration.
    wcc_lcc_mapping = None

    # Iterating through the files that belong to this layer
    for files in file_names:
        if not files.startswith(layer_prefix):
            continue
        if files.endswith('.wcc-lcc'):
            # wcc -> lcc mapping file
            wcc_lcc_mapping = spark.read.csv("Data/" + files).toDF('wcc', 'lcc')
        elif files.endswith('.csv'):
            # Edges file
            edges = spark.read.csv("Data/" + files)
            edges = edges.toDF('source', 'target', 'waveIndex', 'wcc', 'fragmentIndex')
            merged_edges = merged_edges.union(edges)

    if wcc_lcc_mapping is None:
        raise FileNotFoundError("No .wcc-lcc mapping file found for layer " + layer)

    # Adding a lcc column to the merged edges; the left join keeps edges whose
    # wcc has no mapping (their lcc is null).
    merged_edges.createOrReplaceTempView("merged_edges")
    wcc_lcc_mapping.createOrReplaceTempView("wcc_lcc_mapping")
    edges_lcc_mapping = spark.sql("select merged_edges.source, merged_edges.target, merged_edges.waveIndex, merged_edges.wcc, merged_edges.fragmentIndex, wcc_lcc_mapping.lcc from merged_edges left join wcc_lcc_mapping on merged_edges.wcc = wcc_lcc_mapping.wcc")

    # Saving the two-way undirected graph edges
    edges_lcc_mapping.coalesce(1).write.options(header='True', delimiter=',').csv("Undirected Edges Two-way/layer-" + layer)

### 1.6 Making directed edges and saving to files 

In [26]:
# Mapping edges to RDD from dataframe
def mapper(x):
    """Flatten an edge row into a plain tuple.

    Reads the ``source``, ``target``, ``waveIndex``, ``wcc``,
    ``fragmentIndex`` and ``lcc`` attributes of ``x`` and returns them, in
    that order, as a 6-tuple. The values are passed through unconverted.
    """
    field_names = ("source", "target", "waveIndex", "wcc", "fragmentIndex", "lcc")
    return tuple(getattr(x, field_name) for field_name in field_names)

In [30]:
# Directing edges depending on number of neighbours
def directEdges(x, degree_dict):
    """Orient one edge from its lower-degree endpoint to its higher-degree one.

    ``x`` is indexed at positions 0-5 for (source, target, waveIndex, wcc,
    fragmentIndex, lcc); every field is converted with ``int``.
    ``degree_dict`` maps an integer vertex id to its degree.

    Returns the 6-tuple of integers with the two endpoints swapped when the
    source has the strictly larger degree; on a tie the given order is kept.
    Raises ``KeyError`` if an endpoint is missing from ``degree_dict``.
    """
    source, target, *attributes = [int(x[position]) for position in range(6)]

    keep_order = degree_dict[source] <= degree_dict[target]
    endpoints = (source, target) if keep_order else (target, source)
    return endpoints + tuple(attributes)

In [37]:
# Converting the undirected graph to a directed graph using the number of neighbours
for layer in distinctLayers:
    degree_path = "Degrees/layer-" + layer
    undirected_edges_path = "Undirected Edges/layer-" + layer

    # Let Spark read the output directories directly: it picks up the part
    # file(s) and ignores the _SUCCESS / checksum bookkeeping files. This also
    # means a layer with no data fails here instead of silently reusing the
    # DataFrames of the previous layer.
    degree = spark.read.options(header='True').csv(degree_path)
    undirected_edges = spark.read.options(header='True').csv(undirected_edges_path)

    # Mapping all the edges to plain tuples
    edges = undirected_edges.rdd.map(mapper)

    # Converting the degree DataFrame to a {vertex: degree} dictionary on the driver
    degree_dict = degree.rdd.map(lambda x: (int(x["vertex"]), int(x["count"]))).collectAsMap()

    # Directing edges depending on degree; the dictionary is bound as a default
    # argument so the closure keeps this layer's degrees.
    directed_edges_rdd = edges.map(lambda x, degrees=degree_dict: directEdges(x, degrees))

    # Converting rdd to dataframe
    directed_edges_df = directed_edges_rdd.toDF(['source', 'target', 'waveIndex', 'wcc', 'fragmentIndex', 'lcc'])

    # Saving the directed graph edges
    directed_edges_df.coalesce(1).write.options(header='True', delimiter=',').csv("Directed Edges/layer-" + layer)

### 1.7 Saving the directed edges building-wise in files

In [50]:
# Iterating through the directed edges of every layer to store them building-wise
for layer in distinctLayers:
    directed_edge_path = "Directed Edges/layer-" + layer

    # Spark reads the part file(s) of the directory itself, so a layer with no
    # data fails here instead of silently reusing the previous layer's edges.
    directed_edges = spark.read.options(header='True').csv(directed_edge_path)

    # Finding the distinct lcc values (i.e., buildings)
    buildings = directed_edges.select('lcc').distinct().collect()

    # Iterating through buildings to find all the edges
    for building in buildings:
        building_id = building[0]

        # Edges whose wcc had no lcc mapping come back with a null lcc; they
        # belong to no building and cannot be used to build a directory name.
        if building_id is None:
            continue

        # All the rows of this lcc, selected with a column filter rather than
        # a string-built SQL query
        building_edges = directed_edges.filter(col("lcc") == building_id)

        # Saving the directed graph edges per building; append mode because
        # the same building directory may be written from more than one layer
        building_edges.coalesce(1).write.mode('append').options(header='True', delimiter=',').csv("Building Edges/building-" + building_id)

### 1.8 Saving the edges building wise in files with Two-way connection

In [16]:
# Iterating through the two-way undirected edges of every layer to store them building-wise
for layer in distinctLayers:
    undirected_edge_path = "Undirected Edges Two-way/layer-" + layer

    # Spark reads the part file(s) of the directory itself, so a layer with no
    # data fails here instead of silently reusing the previous layer's edges.
    undirected_edges = spark.read.options(header='True').csv(undirected_edge_path)

    # Finding the distinct lcc values (i.e., buildings)
    buildings = undirected_edges.select('lcc').distinct().collect()

    # Iterating through buildings to find all the edges
    for building in buildings:
        building_id = building[0]

        # Edges whose wcc had no lcc mapping come back with a null lcc; they
        # belong to no building and cannot be used to build a directory name.
        if building_id is None:
            continue

        # All the rows of this lcc, selected with a column filter rather than
        # a string-built SQL query
        building_edges = undirected_edges.filter(col("lcc") == building_id)

        # Saving the two-way graph edges per building; append mode because
        # the same building directory may be written from more than one layer
        building_edges.coalesce(1).write.mode('append').options(header='True', delimiter=',').csv("Building Edges Two-way/building-" + building_id)

### 1.9 Finding distinct vertices in buildings and saving it along with their tags

In [9]:
# Finding the distinct vertices of every building and saving them with their tag labels

# The label table is the same for every layer, so its view is registered once
movies_label.createOrReplaceTempView("movies_label")

for layer in distinctLayers:
    undirected_edge_path = "Undirected Edges Two-way/layer-" + layer

    # Spark reads the part file(s) of the directory itself, so a layer with no
    # data fails here instead of silently reusing the previous layer's edges.
    undirected_edges = spark.read.options(header='True').csv(undirected_edge_path)

    # Only the vertex (source) and its building (lcc) are needed; one row per
    # (source, lcc) pair gives every vertex of a building exactly once.
    distinct_vertices = undirected_edges.select("source", "lcc").dropDuplicates(['source', 'lcc'])

    # Mapping all the vertices to their tag names from movies_label.csv; the
    # left join keeps vertices without a label (their tag is null).
    distinct_vertices.createOrReplaceTempView("distinct_vertices")
    vertex_tags_mapping = spark.sql("select distinct_vertices.source as vertex, distinct_vertices.lcc, movies_label.vertex_tag_label from distinct_vertices left join movies_label on distinct_vertices.source = movies_label.vertex_id")

    # Finding the distinct lcc values (i.e., buildings)
    buildings = vertex_tags_mapping.select('lcc').distinct().collect()

    # Iterating through buildings to find all their vertices
    for building in buildings:
        building_id = building[0]

        # Rows with a null lcc belong to no building and cannot be used to
        # build a directory name.
        if building_id is None:
            continue

        # All the rows of this lcc, selected with a column filter rather than
        # a string-built SQL query
        building_tags = vertex_tags_mapping.filter(col("lcc") == building_id)

        # Saving the tagged vertices per building; append mode because the
        # same building directory may be written from more than one layer
        building_tags.coalesce(1).write.mode('append').options(header='True', delimiter=',').csv("Building Tags/building-" + building_id)

## 2. Page Rank

### 2.1 Iterating over all the buildings and implementing Page Rank on directed edges and saving in files

In [18]:
# Mapping edges to RDD from dataframe
def edge_mapper(x):
    """Return the ``source`` and ``target`` attributes of ``x`` as an ``(int, int)`` pair."""
    endpoints = (x.source, x.target)
    return tuple(int(endpoint) for endpoint in endpoints)

In [7]:
# Get the names of all the entries in the Building Edges folder.
# Each entry is later treated as one building's directory, and its name is
# reused as the name of that building's Page Rank output file.
be_path = "Building Edges"
be_file_names = os.listdir(be_path)

In [58]:
# Iterating over the building directories and ranking their vertices with PageRank

# open() does not create missing directories
os.makedirs('Page Rank', exist_ok=True)

for file in be_file_names:
    building_dir = "Building Edges/" + file

    # Skip stray non-directory entries (e.g. hidden files) in the folder
    if not os.path.isdir(building_dir):
        continue

    # Read the whole building directory: it is written in append mode, so it
    # can hold several part files, and picking a single .csv would drop edges.
    building_edges = spark.read.options(header='True').csv(building_dir)

    # Only the endpoints are needed; collect them as (int, int) pairs
    building_edges_list = building_edges.select("source", "target").rdd.map(edge_mapper).collect()

    # Building a Directed Graph from the edges
    G = nx.DiGraph()
    G.add_edges_from(building_edges_list)

    # Implementing Page Rank
    pr = nx.pagerank(G)

    # Highest rank first; ties on rank are broken by the larger vertex id
    sorted_pr_list = sorted(pr.items(), key=lambda item: (item[1], item[0]), reverse=True)

    # Writing the sorted page rank in a file; newline='' lets the csv module
    # control line endings itself
    with open('Page Rank/' + file, 'w', newline='') as f:
        write = csv.writer(f)
        write.writerow(['vertexID', 'pageRank'])
        write.writerows(sorted_pr_list)

### 2.2 Iterating over all the buildings and implementing Page Rank on Two-way edges and saving in files

In [19]:
# Get the names of all the entries in the Building Edges Two-way folder.
# Each entry is later treated as one building's directory, and its name is
# reused as the name of that building's Page Rank Two-way output file.
bet_path = "Building Edges Two-way"
bet_file_names = os.listdir(bet_path)

In [20]:
# Iterating over the two-way building directories and ranking their vertices with PageRank

# open() does not create missing directories
os.makedirs('Page Rank Two-way', exist_ok=True)

for file in bet_file_names:
    building_dir = "Building Edges Two-way/" + file

    # Skip stray non-directory entries (e.g. hidden files) in the folder
    if not os.path.isdir(building_dir):
        continue

    # Read the whole building directory: it is written in append mode, so it
    # can hold several part files, and picking a single .csv would drop edges.
    building_edges = spark.read.options(header='True').csv(building_dir)

    # Only the endpoints are needed; collect them as (int, int) pairs
    building_edges_list = building_edges.select("source", "target").rdd.map(edge_mapper).collect()

    # Building a Directed Graph; the two-way input lists an edge once per
    # direction, so both orientations end up in the graph
    G = nx.DiGraph()
    G.add_edges_from(building_edges_list)

    # Implementing Page Rank
    pr = nx.pagerank(G)

    # Highest rank first; ties on rank are broken by the larger vertex id
    sorted_pr_list = sorted(pr.items(), key=lambda item: (item[1], item[0]), reverse=True)

    # Writing the sorted page rank in a file; newline='' lets the csv module
    # control line endings itself
    with open('Page Rank Two-way/' + file, 'w', newline='') as f:
        write = csv.writer(f)
        write.writerow(['vertexID', 'pageRank'])
        write.writerows(sorted_pr_list)

## 3. Streaming Kmeans Clustering

### 3.1 Installing and importing the library to return optimal number of clusters

In [4]:
# Install the gap-stat package through the notebook's "!" shell escape, then
# import OptimalK, which the clustering cell uses to choose the number of clusters
! pip install --user gap-stat
from gap_statistic import OptimalK



### 3.2 Iterating over all the building tag files to implement Kmeans Clustering

In [5]:
# Importing the text vectorization library
from sklearn.feature_extraction.text import TfidfVectorizer
import numpy as np
from sklearn.cluster import KMeans

In [6]:
# # Initializing spark context
# sc = SparkContext.getOrCreate()
# ssc = StreamingContext(sc, 1)

In [7]:
# Get the names of all the entries in the Building Tags folder.
# Each entry is later treated as one building's directory, and its name is
# reused (plus ".csv") as the name of that building's cluster output file.
bt_path = "Building Tags"
bt_file_names = os.listdir(bt_path)

In [8]:
# Mapping tags to RDD from dataframe
def tags_mapper(x):
    """Return the ``vertex_tag_label`` attribute of ``x`` unchanged."""
    return getattr(x, "vertex_tag_label")

In [10]:
# # Iterating over all the building tag files to implement Streaming Kmeans Clustering
# for file in bt_file_names:
#     building_tags_file_names = os.listdir("Building Tags/" + file)

#     for csv_file in building_tags_file_names:
#         if csv_file.endswith(".csv"):
#             building_tags = spark.read.options(header='True').csv("Building Tags/" + file + "/" + csv_file)
            
#     # Dropping the lcc column as we don't need it now
#     building_tags = building_tags.drop("lcc")
    
#     # Converting df to rdd returning vertex_tag_label
#     building_tags_rdd = building_tags.rdd.map(lambda x: tags_mapper(x))

#     # Converting rdd to list
#     building_tags_list = building_tags_rdd.collect()
    
#     # Vectorizing the tags
#     vectorizer = TfidfVectorizer()
#     X = vectorizer.fit_transform(building_tags_list)
    
#     # Getting shape of matrix
#     x, y = X.shape
    
#     # Converting sparse matrix to numpy array
#     X = X.toarray()
    
#     # Finding the Optimal number of clusters
#     optimalK = OptimalK(n_jobs=4, parallel_backend='multiprocessing')
#     n_clusters = optimalK(X = X, cluster_array=np.arange(1, x))
    
#     # Converting numpy array to rdd
#     rdd = sc.parallelize(X)
    
#     # Converting rdd to dense vectors
#     trainingData = rdd.map(lambda line: Vectors.dense([float(x) for x in line]))
        
#     # Setting up training queue
#     trainingQueue = [trainingData]
    
#     # Setting up training stream
#     trainingStream = ssc.queueStream(trainingQueue)
    
#     # We create a model with random clusters and specify the optimal number of clusters to find
#     model = StreamingKMeans(k=n_clusters, decayFactor=0.8).setRandomCenters(y, 1.0, 0)
    
#     # Making a window of 10,000 on the training data
#     trainingWindow = trainingStream.window(10000, 1)
    
#     # Now training the model on training data
#     model.trainOn(trainingWindow)
    
#     # Predicting the values on the training data
#     result = model.predictOn(trainingStream)
    
#     # Saving function
#     def save_file(rdd):
#         window = rdd.zip(trainingData.map(lambda x: x.toArray().tolist()))
#         window.coalesce(1).saveAsTextFile('Cluster Predictions/' + file)
        
#     result.foreachRDD(save_file)
    
#     ssc.start()
#     ssc.stop(stopSparkContext = False, stopGraceFully = True)

In [None]:
# Iterating over all the building tag directories to cluster each building's
# tags with (batch) K-means on their TF-IDF vectors

# open() does not create missing directories
os.makedirs('Cluster Predictions', exist_ok=True)

for building_name in bt_file_names:
    building_dir = "Building Tags/" + building_name

    # Skip stray non-directory entries (e.g. hidden files) in the folder
    if not os.path.isdir(building_dir):
        continue

    # Read the whole building directory: it is written in append mode, so it
    # can hold several part files, and picking a single .csv would drop tags.
    building_tags = spark.read.options(header='True').csv(building_dir)

    # Collect the tag labels; vertices without a label come back as None and
    # cannot be vectorized, so they are left out.
    building_tags_list = [
        tag
        for tag in building_tags.select("vertex_tag_label").rdd.map(tags_mapper).collect()
        if tag is not None
    ]

    # Nothing to cluster for this building
    if not building_tags_list:
        continue

    # Vectorizing the tags
    vectorizer = TfidfVectorizer()
    X = vectorizer.fit_transform(building_tags_list)

    # Rows are tags (samples), columns are vocabulary terms (features)
    n_samples, n_features = X.shape

    # Converting sparse matrix to numpy array
    X = X.toarray()

    # K-means cannot be asked for more clusters than there are samples, so
    # the number of tries is capped by the sample count
    tries = min(n_samples, 20)

    # Finding the number of clusters
    if n_samples < 2:
        # A single tag forms a single cluster; the candidate range below
        # would be empty for it
        n_clusters = 1
    elif n_features <= 1000:
        # Gap statistic over the candidates 1 .. tries - 1
        optimalK = OptimalK(n_jobs=4, parallel_backend='joblib')
        n_clusters = optimalK(X=X, cluster_array=np.arange(1, tries))
    else:
        # Vocabulary too large for the search: fall back to the cap (20, or
        # the sample count when the building has fewer tags than that)
        n_clusters = tries

    # Applying K means clustering
    kmeans = KMeans(n_clusters=n_clusters, random_state=0).fit(X)

    # Saving the clustered results; newline='' lets the csv module control
    # line endings itself
    with open('Cluster Predictions/' + building_name + '.csv', 'w', newline='') as f:
        writer = csv.writer(f)
        writer.writerow(['vertex_tag_label', 'Cluster'])
        writer.writerows(zip(building_tags_list, kmeans.labels_.tolist()))

In [None]:
# Completion marker for the notebook run
print("Done")