In [1]:
import csv
import findspark
# Must run before `import pyspark` so the local Spark installation's pyspark is importable.
findspark.init()
import pyspark
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

# getOrCreate keeps this cell re-runnable: constructing a second SparkContext while
# one is still active raises ValueError, whereas getOrCreate reuses the active one.
conf = SparkConf().setAppName('appName').setMaster('local')
sc = SparkContext.getOrCreate(conf=conf)
spark = SparkSession(sc)

In [2]:
class TWITTER:
    """Per-user connectivity score computed with Spark RDD operations.

    Each input line holds ``<user><sep><followed_user>``.  The score of a user is
    N * M, where N is the number of users they follow and M their number of followers.
    Relies on a notebook-global SparkContext named ``sc``.
    """

    def __init__(self, directory, sep=" " * 4):
        """Read the follow graph under ``directory`` and count edges per user.

        Args:
            directory: path or glob accepted by ``sc.textFile``.
            sep: column separator between the two users on a line (default: four spaces).
        """
        # Split every line exactly once and cache the parsed edges; both counts below
        # are derived from them, so the input files are only read a single time.
        # Lines without a second column (e.g. a blank trailing line) are skipped
        # rather than raising IndexError and failing the whole job.
        self.edges = (sc.textFile(directory)
                      .map(lambda line: line.split(sep))
                      .filter(lambda cols: len(cols) >= 2)
                      .map(lambda cols: (cols[0], cols[1]))
                      .persist())

        # (current_user, #users followed by current_user).
        # reduceByKey merges values on each mapper before the shuffle, which minimises
        # the data shuffled across servers.
        self.following = (self.edges
                          .map(lambda edge: (edge[0], 1))
                          .reduceByKey(lambda a, b: a + b)
                          .persist())

        # (current_user, #followers); cached too because user_score may be called repeatedly.
        self.followers = (self.edges
                          .map(lambda edge: (edge[1], 1))
                          .reduceByKey(lambda a, b: a + b)
                          .persist())

    def user_score(self):
        """Return an RDD of ``(user, N * M)`` pairs.

        The join is inner: users who follow nobody or have no followers (whose score
        would be 0) do not appear in the result.
        """
        return (self.following
                .join(self.followers)
                .mapValues(lambda counts: counts[0] * counts[1]))

    def save_file(self, file_name):
        """Write every ``(user, score)`` pair to ``file_name`` as CSV rows, without a header."""
        # toLocalIterator streams the result partition by partition instead of
        # collecting the entire RDD into driver memory at once.
        with open(file_name, 'w', newline='', encoding='utf-8') as file:
            writer = csv.writer(file)
            writer.writerows(self.user_score().toLocalIterator())

In [3]:
# Machine-specific location of the input files; edit this for your environment.
DATA_PATH = r"C:\Users\taylankabbani2019\Downloads\Connectivity_Score\*"

try:
    twitter_rdd = TWITTER(DATA_PATH)
    print(twitter_rdd.user_score().take(5))
    # twitter_rdd.save_file('Connectivity_Score')
finally:
    # Always release the context, even if the job fails: only one SparkContext may be
    # active per process, and a later cell creates a fresh one.
    sc.stop()

[('user4686', 293910), ('user280', 65604), ('user1611', 456918), ('user376', 233618), ('user2302', 10120)]


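# Using Spark DataFrames and DataFrame operations

The same connectivity score (N × M), computed with the DataFrame API (`groupBy`, `count`, `join`) instead of raw RDD transformations.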

In [4]:
import findspark
findspark.init()
import pyspark
from pyspark.sql import *
from pyspark import SparkContext
from pyspark.sql import SparkSession

# getOrCreate reuses an active context instead of raising ValueError when this cell
# is re-run (or when an earlier context was never stopped).
sc = SparkContext.getOrCreate()
# SQLContext has been superseded by SparkSession since Spark 2.0; the session exposes
# the same createDataFrame API that the DataFrame-based class uses through `sql`.
sql = SparkSession(sc)

In [5]:
class TWITTER_df:
    """Per-user connectivity score computed with Spark DataFrame operations.

    Each input line holds ``<user><sep><followed_user>``.  The score of a user is
    N * M, where N is the number of users they follow and M their number of followers.
    Relies on notebook-globals ``sc`` (SparkContext) and ``sql`` (provides createDataFrame).
    """

    def __init__(self, directory, sep=" " * 4):
        """Load the follow graph under ``directory`` into a two-column DataFrame.

        Args:
            directory: path or glob accepted by ``sc.textFile``.
            sep: column separator between the two users on a line (default: four spaces).
        """
        # Split each line once; lines without a second column are skipped instead of
        # raising IndexError inside the Spark job.
        edges = (sc.textFile(directory)
                 .map(lambda line: line.split(sep))
                 .filter(lambda cols: len(cols) >= 2)
                 .map(lambda cols: (cols[0], cols[1])))
        # Columns: userA follows userB.  An explicit schema avoids the extra job Spark
        # runs to infer one (and the failure on empty input).  Cached because
        # user_score aggregates this frame twice.
        self.df = sql.createDataFrame(edges, "userA: string, userB: string").persist()

    def user_score(self):
        """Return an RDD of ``(user, N * M)`` tuples.

        The join is inner: users who follow nobody or have no followers (whose score
        would be 0) do not appear in the result.
        """
        # Number of users followed by each user.
        following = (self.df.groupBy('userA').count()
                     .withColumnRenamed('userA', 'user')
                     .withColumnRenamed('count', 'following'))

        # Number of followers of each user.
        followers = (self.df.groupBy('userB').count()
                     .withColumnRenamed('userB', 'user')
                     .withColumnRenamed('count', 'followers'))

        # Distinct column names avoid the two ambiguous 'count' columns the join would
        # otherwise produce, so the score is computed by name rather than by position.
        joined = following.join(followers, 'user')
        scores = joined.select('user', (joined['following'] * joined['followers']).alias('score'))
        return scores.rdd.map(tuple)

In [6]:
# Machine-specific location of the input files; edit this for your environment.
DATA_PATH = r"C:\Users\taylankabbani2019\Downloads\Connectivity_Score\*"
data = TWITTER_df(DATA_PATH)
top_scores = data.user_score().take(5)
print(top_scores)

[('user108', 42120), ('user1222', 395760), ('user1270', 262521), ('user1564', 423864), ('user1590', 415826)]
