In [None]:
# Pin Python's hash seed; presumably so that every Spark Python worker hashes keys
# the same way — TODO confirm this is set before any worker process is launched.
%env PYTHONHASHSEED 3
# Install a headless Java 8 JDK (apt output is discarded) and PySpark.
# NOTE(review): the pyspark version is not pinned, so re-runs may pick up a different release.
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!pip install -q pyspark

from math import sqrt
import pyspark
from pyspark.sql import *

env: PYTHONHASHSEED=3


In [None]:
from pyspark import SparkContext, SparkConf
import os

# Folder that is scanned below for "<id>.edges" files in order to build ego_users.
directory = "/content/drive/My Drive/twitter"

# Local Spark session on all available cores ("local[*]") with 1g of executor
# memory and the Spark UI bound to port 4050.
spark = SparkSession.builder.master("local[*]").appName('Twitter Analysis').config(
    "spark.executor.memory", "1g").config("spark.ui.port", "4050"
        ).getOrCreate()
sc = spark.sparkContext  # SparkContext used below for textFile and later for broadcasts

# Mount Google Drive so that the /content/drive paths used in this cell resolve.
from google.colab import drive
drive.mount('/content/drive')

raw_edges = sc.textFile('/content/drive/My Drive/twitter_analysis/edges_rdd.txt') #This is our pre-processed file containing all our twitter graph edges.

# Every file in `directory` ending in ".edges" contributes one id: its file name,
# minus the extension, parsed as an integer.
# NOTE(review): `directory` points at ".../twitter" while the edge file lives under
# ".../twitter_analysis" — confirm both locations are intended.
files = [a_file for a_file in os.listdir(directory) if a_file.endswith(".edges")]
ego_users = [int(os.path.splitext(file)[0]) for file in files] #Generates list of all ego users, with which to filter the PageRank output

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
def get_sources_and_destinations(raw_edges_file):
  """Parse raw edge lines into an adjacency RDD of (source, [destinations]).

  Each line is split on ','. The source id is the text after 'r=' in the first
  field and the destination id is the text after 'd=' (up to the first ')') in
  the second field; surrounding single quotes are stripped and both ids are
  converted to int.

  Args:
    raw_edges_file: RDD of text lines, one edge per line.

  Returns:
    RDD of (source, list of destinations) pairs, one per distinct source.
    Duplicate destinations are kept.
  """
  def parse_edge(line):
    fields = line.split(',')
    source_text = fields[0].split('r=')[1].strip("'")
    destination_text = fields[1].split('d=')[1].split(')')[0].strip("'")
    return (int(source_text), int(destination_text))

  grouped_edges = raw_edges_file.map(parse_edge).groupByKey()
  # groupByKey yields an iterable per key; materialise it as a plain list.
  return grouped_edges.map(lambda pair: (pair[0], list(pair[1])))

[(228270980,
  [228270980,
   18996905,
   14925700,
   157829215,
   175671498,
   189753564,
   196680777,
   328590469,
   27633075,
   18996905,
   34428380,
   72818790,
   86221475,
   112090416,
   284133650,
   17116707,
   28465635,
   303600495,
   151338729,
   376946114,
   328590469,
   290309885,
   303600495,
   455186682,
   56860418,
   314316607,
   376946114,
   107830991,
   17868918,
   272881486,
   121533789,
   41824048,
   81049395,
   19705747,
   34428380,
   302847930,
   317076339,
   69548480,
   18996905,
   242771870,
   440749539,
   302847930,
   116036694,
   455186682,
   83943787,
   123371682,
   149538028,
   67864340,
   117674417,
   175671498,
   151338729,
   232818548,
   116036694,
   227679610,
   28465635,
   290309885,
   297801196,
   173770873,
   15205600,
   56312991,
   41824048,
   116036694,
   117946816,
   123371682,
   72818790,
   99467249,
   40981798,
   314316607,
   69592091,
   354142522,
   135552009,
   149538028,
   678

In [None]:
def get_col_trans_matrix(graph_rdd):
  """Turn an adjacency RDD into a column-oriented transition matrix.

  Args:
    graph_rdd: RDD of (source, iterable of destinations) pairs.

  Returns:
    RDD of (source, {destination: weight}) pairs, where duplicate destinations
    are collapsed and every distinct destination gets weight 1/k, k being the
    number of distinct destinations of that source.
  """
  def uniform_weights(destinations):
    distinct = set(destinations)
    # An empty set yields an empty dict, so no division is ever attempted for it.
    return {node: 1/len(distinct) for node in distinct}

  return graph_rdd.map(lambda entry: (entry[0], uniform_weights(entry[1])))

In [None]:
def col_to_row_matrix(col_trans_matrix):
    """Re-key a column-oriented transition matrix by row.

    Args:
        col_trans_matrix: RDD of (column, {row: weight}) pairs.

    Returns:
        RDD of (row, iterable of (column, weight)) pairs, sorted by row key.
    """
    def emit_entries(column_entry):
        column_id = column_entry[0]
        weights = column_entry[1]
        # One (row, (column, weight)) record per non-zero cell of this column.
        for row_id in weights:
            yield (row_id, (column_id, weights[row_id]))

    cells_by_row = col_trans_matrix.flatMap(emit_entries).groupByKey()
    return cells_by_row.sortByKey()

In [None]:
def row_multiply(row, R):
    """Dot product of one sparse matrix row with the rank vector.

    Args:
        row: iterable of (column, value) pairs for the non-zero cells of the row.
        R: mapping from column id to its current rank.

    Returns:
        Sum of value * R[column] over the cells whose column is present in R;
        0 when no cell contributes.
    """
    total = 0
    for col_id, weight in row:
        # Columns without an entry in R contribute nothing.
        if col_id not in R:
            continue
        total += weight * R[col_id]
    return total

In [None]:
def produce_main_input(file):
  """Build the row-oriented transition matrix from an RDD of raw edge lines.

  Chains the three preparation stages: parse the edges into an adjacency RDD,
  convert it to a column-oriented transition matrix, then re-key it by row.
  """
  adjacency = get_sources_and_destinations(file)
  column_matrix = get_col_trans_matrix(adjacency)
  return col_to_row_matrix(column_matrix)
# Build the row-oriented transition matrix from the edge lines in raw_edges.
# NOTE(review): nothing here samples the data, although an earlier comment spoke of a
# 5% sample; the name `input` also shadows the built-in input().
input = produce_main_input(raw_edges)
input.persist() #Caches to make future calls to the RDD faster

PythonRDD[2711] at RDD at PythonRDD.scala:53

In [None]:
def page_rank_main2(input, iterations=90, convergence_threshold=0.001):
    """Run damped power-iteration PageRank over a row-oriented transition matrix.

    Each iteration computes, for every row, damping * (row . R) + (1 - damping) / N
    with damping fixed at 0.85, where N is the number of rows in `input`.

    Args:
        input: RDD of (node, iterable of (source, weight)) rows, e.g. the output
            of col_to_row_matrix.
        iterations: maximum number of power iterations.
        convergence_threshold: stop early once the summed absolute change between
            two successive rank vectors falls below this value.

    Returns:
        RDD of (node, rank) pairs holding the final rank vector (unsorted).

    Side effects:
        Unpersists `input` before returning. The returned RDD is built from the
        collected rank vector, so it does not depend on `input` staying cached.
    """
    graph_rows = input
    N = graph_rows.count()
    if N == 0:
        # Nothing to rank; also avoids dividing by zero below.
        graph_rows.unpersist()
        return sc.parallelize([])
    damping_factor = 0.85
    teleport = (1 - damping_factor) / N
    R = graph_rows.map(lambda x: (x[0], 1/N)).collectAsMap()
    for _ in range(iterations):
        previous_R = R
        vecR = sc.broadcast(previous_R)
        try:
            # Damping is applied per row here. Row keys are unique, so a
            # reduceByKey merge function would never be invoked and could not
            # be used to apply it.
            R = graph_rows.mapValues(
                lambda row: damping_factor * row_multiply(row, vecR.value) + teleport
            ).collectAsMap()
        finally:
            # Release this iteration's broadcast copy on the executors.
            vecR.unpersist()
        # Check if values have converged
        delta = sum(abs(R[i] - previous_R[i]) for i in R)
        if delta < convergence_threshold:
            break # Stops the loop, preventing unnecessary iterations
    graph_rows.unpersist()
    return sc.parallelize(list(R.items()))

In [None]:
# Run PageRank, keep only the nodes whose id is in ego_users, and order them from
# highest to lowest rank.
# NOTE(review): ego_users is a list, so every membership test in the filter is a
# linear scan; a set would make this lookup constant-time.
output = page_rank_main2(input).filter(lambda x: x[0] in ego_users).sortBy(lambda x: -x[1])

In [None]:
# First 20 (user id, rank) pairs of the rank-sorted output, i.e. the 20 highest-ranked ego users.
influential_users = output.take(20)

[(15924858, 4.90058107820006e-08),
 (14719129, 3.207965894969852e-08),
 (53235381, 2.7570931641620476e-08),
 (11928542, 2.2523779847808495e-08),
 (215824411, 1.8603808367007002e-08),
 (90880254, 1.677947212834481e-08),
 (18996905, 1.637088554005256e-08),
 (262310943, 1.5883770268231367e-08),
 (100318079, 1.3887877080591807e-08),
 (24542441, 1.3149872984118173e-08),
 (345569115, 1.266690683134676e-08),
 (333881828, 1.1180297928444672e-08),
 (9855382, 1.0847822098161042e-08),
 (18951737, 9.157765110039011e-09),
 (7861312, 8.77741493135869e-09),
 (9460682, 7.362634503030724e-09),
 (16674149, 7.3367907959959455e-09),
 (20446839, 6.841133262197326e-09),
 (73298877, 6.841133262197326e-09),
 (158419434, 6.553332510696499e-09)]

In [None]:
# This implementation has been modified from the one produced in lab 4 to add a convergence threshold.
# It applies no damping/teleportation term, so dead ends and spider traps are NOT handled here.
# Originally, I was aiming to have a convergence threshold of 0.0001, but in wanting to reduce the runtime of my algorithm - I made it 0.001.
def page_rank_main(input, iterations=100, convergence_threshold=0.001):
  """Run undamped power-iteration PageRank and return the ten highest-ranked nodes.

  Args:
    input: RDD of (node, iterable of (source, weight)) rows of the transition matrix.
    iterations: maximum number of power iterations.
    convergence_threshold: stop early once the summed absolute change between
      two successive rank vectors falls below this value.

  Returns:
    List of up to 10 (node, rank) tuples, ordered from highest to lowest rank.

  Side effects:
    Persists `input` and prints the full rank vector, sorted by node id.
  """
  graph_rows = input.persist() # Caches the RDD to speed up future calls
  N = graph_rows.count()
  # Start from a uniform distribution. The value must stay a float: truncating
  # 1/N to an int gives 0 for N > 1, which keeps every rank at 0 forever.
  R = graph_rows.map(lambda x: (x[0], 1/N)).collectAsMap()
  for _ in range(iterations):
    previous_R = R
    vecR = sc.broadcast(previous_R)
    try:
      R = graph_rows.map(lambda kv: (kv[0], row_multiply(kv[1],vecR.value))).collectAsMap()
    finally:
      # Release this iteration's broadcast copy on the executors.
      vecR.unpersist()
    # Check if values have converged
    delta = sum(abs(R[i] - previous_R[i]) for i in R)
    if delta < convergence_threshold:
      break # Stops the loop, preventing unnecessary iterations
  print("R:",sorted(R.items()))
  # R already holds the final ranks on the driver, so the top 10 are taken
  # locally rather than through another Spark job.
  return sorted(R.items(), key=lambda kv: -kv[1])[:10]

In [None]:
# Raises the notebook's IOPub data-rate limit, presumably so very large cell outputs
# (such as a printed rank vector) are not cut off — TODO confirm this setting takes
# effect when applied via %config from inside a running kernel.
%config NotebookApp.iopub_data_rate_limit=100000000