## Creating spark session

In [1]:
from pyspark.sql import SparkSession

In [2]:
import warnings
warnings.filterwarnings('ignore')

In [3]:
spark = SparkSession.builder \
            .master("local[8]") \
            .appName("Higgs Twitter ETL") \
            .config("spark.some.config.option", "some-value")\
            .getOrCreate()

23/10/03 13:42:50 WARN Utils: Your hostname, me-virtual-machine resolves to a loopback address: 127.0.1.1; using 192.168.42.129 instead (on interface ens33)
23/10/03 13:42:50 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/10/03 13:42:51 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


## Reading edgelist files

In [4]:
from pyspark.sql.types import IntegerType, StructField, StructType

In [13]:
schema = StructType([StructField('follower', IntegerType(), True), StructField('followed', IntegerType(), True)])
socialDF = spark.read.csv('data/higgs-social_network.edgelist.gz', sep=" ", schema=schema).dropna()
print(socialDF.count(), len(socialDF.columns), '\n')
socialDF.show(5)

[Stage 27:>                                                         (0 + 1) / 1]

14855842 2 

+--------+--------+
|follower|followed|
+--------+--------+
|       1|       2|
|       1|       3|
|       1|       4|
|       1|       5|
|       1|       6|
+--------+--------+
only showing top 5 rows



                                                                                

In [14]:
schema = StructType([StructField('retweeter', IntegerType(), True), StructField('author', IntegerType(), True)])
retweetDF = spark.read.csv('data/higgs-retweet_network.edgelist.gz', sep=" ", schema=schema).dropna()
print(retweetDF.count(), len(socialDF.columns), '\n')
retweetDF.show(5)

[Stage 33:>                                                         (0 + 1) / 1]

328132 2 

+---------+------+
|retweeter|author|
+---------+------+
|   298960|105232|
|    95688|  3393|
|   353237| 62217|
|     4974|  3571|
|   241892|     8|
+---------+------+
only showing top 5 rows



                                                                                

## Spark SQL with dataframes API

In [17]:
followerCountDF = socialDF.groupBy('followed')\
.count()\
.withColumnRenamed('followed', 'user') \
.withColumnRenamed('count', 'num_followers')\
.fillna(0)

followerCountDF.show(5)

[Stage 43:>                                                         (0 + 1) / 1]

+----+-------------+
|user|num_followers|
+----+-------------+
| 148|          738|
| 463|        10953|
| 471|         1584|
| 496|           49|
| 833|            8|
+----+-------------+
only showing top 5 rows



                                                                                

In [18]:
retweetCountDF = retweetDF\
.groupBy('author')\
.count()\
.withColumnRenamed('author', 'user') \
.withColumnRenamed('count', 'num_retweets')\
.fillna(0)

retweetCountDF.show(5)

[Stage 46:>                                                         (0 + 1) / 1]

+------+------------+
|  user|num_retweets|
+------+------------+
| 28664|         109|
|172959|           1|
|122128|           1|
|102524|           1|
| 63087|           1|
+------+------------+
only showing top 5 rows



                                                                                

In [26]:
from pyspark.sql.functions import asc, coalesce

# Outer join the dataframes and create a filled user column that can be used as an index for both dfs
userDF = followerCountDF\
.join(retweetCountDF, followerCountDF.user==retweetCountDF.user, 'outer')\
.na.fill({'num_followers': 0, 'num_retweets': 0})
userDF = userDF.withColumn('filled_user', coalesce(followerCountDF['user'], retweetCountDF['user']))
userDF.sort(asc('filled_user')).show(10)

[Stage 93:>                                                         (0 + 1) / 1]

+----+-------------+----+------------+-----------+
|user|num_followers|user|num_retweets|filled_user|
+----+-------------+----+------------+-----------+
|   1|        16280|null|           0|          1|
|   2|         4707|null|           0|          2|
|   3|          137|null|           0|          3|
|   4|         8643|   4|          77|          4|
|   5|         2194|   5|          24|          5|
|   6|        27088|   6|          83|          6|
|   7|         2146|   7|          15|          7|
|   8|        32106|   8|         841|          8|
|   9|          567|null|           0|          9|
|  10|        10204|null|           0|         10|
+----+-------------+----+------------+-----------+
only showing top 10 rows



                                                                                

In [28]:
userDataDF = userDF.select('filled_user', 'num_followers', 'num_retweets').sort(asc('filled_user'))
userDataDF.show(5)

[Stage 115:>                                                        (0 + 1) / 1]

+-----------+-------------+------------+
|filled_user|num_followers|num_retweets|
+-----------+-------------+------------+
|          1|        16280|           0|
|          2|         4707|           0|
|          3|          137|           0|
|          4|         8643|          77|
|          5|         2194|          24|
+-----------+-------------+------------+
only showing top 5 rows



                                                                                

## Spark sql with SQL

In [56]:
socialDF.createOrReplaceTempView('social')
retweetDF.createOrReplaceTempView('retweet')
userdataDF2 = spark.sql(
     """
     select
     coalesce(follower_count.user, retweet_count.user) as my_user,
     follower_count.user,
     retweet_count.user,
     follower_count.num_followers,
     retweet_count.num_retweets
     
     from 
     (select 
     followed as user, 
     count(follower) as num_followers
     from social
     group by followed) as follower_count
     
     full outer join
     
     (select 
     author as user, 
     count(retweeter) as num_retweets
     from retweet
     group by author) as retweet_count
     
     on follower_count.user = retweet_count.user
     order by my_user asc
     """
 )
userdataDF2.show(5)

[Stage 212:>                                                        (0 + 1) / 1]

+-------+----+----+-------------+------------+
|my_user|user|user|num_followers|num_retweets|
+-------+----+----+-------------+------------+
|      1|   1|null|        16280|        null|
|      2|   2|null|         4707|        null|
|      3|   3|null|          137|        null|
|      4|   4|   4|         8643|          77|
|      5|   5|   5|         2194|          24|
+-------+----+----+-------------+------------+
only showing top 5 rows



                                                                                