# ETL with PySpark SQL

### Importing and creating SparkSession

In [None]:
import os, sys
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql.types import *
from pyspark.sql.functions import *

In [3]:
# Create a SparkSession. No need to create SparkContext
# You automatically get it as part of the SparkSession
spark = SparkSession.builder\
                .master("local[*]")\
                .appName("ETL")\
                .config("spark.executor.logs.rolling.time.interval", "daily")\
                .getOrCreate()

# there are some config you might want to set:
# https://spark.apache.org/docs/latest/configuration.html

# now we can go to http://localhost:4040 (default port) in order to see Spark's web UI

## Setting filesystem and files

In [4]:
#print(os.getcwd()) # show current working directory

#datasetDir = "../../datasets/" # local files
datasetDir = "hdfs://localhost:19000/" # hadoop filesystem

In [15]:
# load all CSV's files from HiggsTwitter dataset (http://snap.stanford.edu/data/higgs-twitter.html)

# First, we set the filename
file = datasetDir + "HiggsTwitter/higgs-social_network.edgelist.gz"
# Second, it's recommended to specify the dataframe's schema to avoid spark calculate it
schema = StructType([StructField("follower", IntegerType()), StructField("followed", IntegerType())])
# Finally, we create the dataframe with previous variables
# Also we can specifying the separator with 'sep' (default separator for CSV is ',')
socialDF = spark.read.csv(path=file, sep=" ", schema=schema)

file = datasetDir + "HiggsTwitter/higgs-retweet_network.edgelist.gz"
schema = StructType([StructField("tweeter", IntegerType()), StructField("tweeted", IntegerType()), StructField("occur", IntegerType())])
retweetDF = spark.read.csv(path=file, sep=" ", schema=schema)

file = datasetDir + "HiggsTwitter/higgs-reply_network.edgelist.gz"
schema = StructType([StructField("replier", IntegerType()), StructField("replied", IntegerType()), StructField("occur", IntegerType())])
replyDF = spark.read.csv(path=file, sep=" ", schema=schema)

file = datasetDir + "HiggsTwitter/higgs-mention_network.edgelist.gz"
schema = StructType([StructField("mentioner", IntegerType()), StructField("mentioned", IntegerType()), StructField("occur", IntegerType())])
mentionDF = spark.read.csv(path=file, sep=" ", schema=schema)

file = datasetDir + "HiggsTwitter/higgs-activity_time.txt.gz"
schema = StructType([StructField("userA", IntegerType()), \
                     StructField("userB", IntegerType()), \
                     StructField("timestamp", IntegerType()), \
                    StructField("interaction", StringType())])
                    #Interaction can be: RT (retweet), MT (mention) or RE (reply)
activityDF = spark.read.csv(path=file, sep=" ", schema=schema)

### Convert CSV's dataframes to Apache Parquet files

In [None]:
socialDF.write.save(datasetDir + "HiggsTwitter/higgs-social_network.parquet")
retweetDF.write.save(datasetDir + "HiggsTwitter/higgs-retweet_network.parquet")
replyDF.write.save(datasetDir + "HiggsTwitter/higgs-reply_network.parquet")
mentionDF.write.save(datasetDir + "HiggsTwitter/higgs-mention_network.parquet")
activityDF.write.save(datasetDir + "HiggsTwitter/higgs-activity_time.parquet")

### Load the parquet files into new dataframes

In [5]:
socialDFpq = spark.read.load(datasetDir + "HiggsTwitter/higgs-social_network.parquet")
retweetDFpq = spark.read.load(datasetDir + "HiggsTwitter/higgs-retweet_network.parquet")
replyDFpq = spark.read.load(datasetDir + "HiggsTwitter/higgs-reply_network.parquet")
mentionDFpq = spark.read.load(datasetDir + "HiggsTwitter/higgs-mention_network.parquet")
activityDFpq = spark.read.load(datasetDir + "HiggsTwitter/higgs-activity_time.parquet")

### Working with dataframes

In [6]:
# two ways of showing dataframe's schema
socialDFpq.printSchema()
socialDFpq.schema

root
 |-- follower: integer (nullable = true)
 |-- followed: integer (nullable = true)



StructType(List(StructField(follower,IntegerType,true),StructField(followed,IntegerType,true)))

In [7]:
# showing some data from dataframes
socialDFpq.show(3)
retweetDFpq.show(3)
replyDFpq.show(3)
mentionDFpq.show(3)
activityDFpq.show(3)

+--------+--------+
|follower|followed|
+--------+--------+
|       1|       2|
|       1|       3|
|       1|       4|
+--------+--------+
only showing top 3 rows

+-------+-------+-----+
|tweeter|tweeted|occur|
+-------+-------+-----+
| 298960| 105232|    1|
|  95688|   3393|    1|
| 353237|  62217|    1|
+-------+-------+-----+
only showing top 3 rows

+-------+-------+-----+
|replier|replied|occur|
+-------+-------+-----+
| 161345|   8614|    1|
| 428368|  11792|    1|
|  77904|  10701|    1|
+-------+-------+-----+
only showing top 3 rows

+---------+---------+-----+
|mentioner|mentioned|occur|
+---------+---------+-----+
|   316609|     5011|    1|
|   439696|    12389|    1|
|    60059|     6929|    1|
+---------+---------+-----+
only showing top 3 rows

+------+------+----------+-----------+
| userA| userB| timestamp|interaction|
+------+------+----------+-----------+
|223789|213163|1341100972|         MT|
|223789|213163|1341100972|         RE|
|376989| 50329|1341101181|       

## Spark SQL using DataFrames API

In [9]:
# Users who have most followers
socialDFpq.groupBy("followed").agg(count("followed").alias("followers")).orderBy(desc("followers")).show(5)

# Users who have most mentions
mentionDFpq.groupBy("mentioned").agg(count("occur").alias("mentions")).orderBy(desc("mentions")).show(5)

+--------+---------+
|followed|followers|
+--------+---------+
|    1503|    51386|
|     206|    48414|
|      88|    45221|
|     138|    44188|
|    1062|    40120|
+--------+---------+
only showing top 5 rows

+---------+--------+
|mentioned|mentions|
+---------+--------+
|       88|   11953|
|      677|    3906|
|     2417|    2533|
|    59195|    1601|
|     3998|    1587|
+---------+--------+
only showing top 5 rows



In [10]:
# Of the top 5 followed users, how many mentions has each one?

# top_f contains "top 5 users who have most followers"
top_f = socialDFpq.groupBy("followed").agg(count("follower").alias("followers")).orderBy(desc("followers")).limit(5)

top_f.join(mentionDFpq, top_f.followed == mentionDFpq.mentioned)\
    .groupBy(top_f.followed, top_f.followers)\
        .agg(sum(mentionDFpq.occur).alias("mentions"))\
    .orderBy(desc("followers")).show()

+--------+---------+--------+
|followed|followers|mentions|
+--------+---------+--------+
|    1503|    51386|     150|
|     206|    48414|     397|
|      88|    45221|   15687|
|     138|    44188|     347|
|    1062|    40120|      84|
+--------+---------+--------+



## Spark SQL using SQL language

In [11]:
# create temporary views so we can use SQL statements
socialDFpq.createOrReplaceTempView("social")
retweetDFpq.createOrReplaceTempView("retweet")
replyDFpq.createOrReplaceTempView("reply")
mentionDFpq.createOrReplaceTempView("mention")
activityDFpq.createOrReplaceTempView("activity")

In [12]:
# Users who have most followers
spark.sql("select followed, count(follower) as followers from social group by followed order by followers desc").show(5)

# Users who have most mentions
spark.sql("select mentioned, count(occur) as mentions from mention group by mentioned order by mentions desc").show(5)

+--------+---------+
|followed|followers|
+--------+---------+
|    1503|    51386|
|     206|    48414|
|      88|    45221|
|     138|    44188|
|    1062|    40120|
+--------+---------+
only showing top 5 rows

+---------+--------+
|mentioned|mentions|
+---------+--------+
|       88|   11953|
|      677|    3906|
|     2417|    2533|
|    59195|    1601|
|     3998|    1587|
+---------+--------+
only showing top 5 rows



In [13]:
# Of the top 5 followed users, how many mentions has each one?
spark.sql("""
select 5_top_f.followed, 5_top_f.followers, sum(m.occur) as mentions
    from 
        -- subquery that contains top 5 of followed users
        (select followed, count(follower) as followers from social group by followed order by followers desc limit 5) 5_top_f, 
        mention as m
    where 5_top_f.followed = m.mentioned
    group by 5_top_f.followed, followers
    order by followers desc
        """).show()

+--------+---------+--------+
|followed|followers|mentions|
+--------+---------+--------+
|    1503|    51386|     150|
|     206|    48414|     397|
|      88|    45221|   15687|
|     138|    44188|     347|
|    1062|    40120|      84|
+--------+---------+--------+



## Performance testing

### GZIP Compressed CSV file vs Parquet file

In [16]:
%%time
# GZIP Compressed CSV
socialDF.groupBy("followed").agg(count("followed").alias("followers")).orderBy(desc("followers")).show(5)

+--------+---------+
|followed|followers|
+--------+---------+
|    1503|    51386|
|     206|    48414|
|      88|    45221|
|     138|    44188|
|    1062|    40120|
+--------+---------+
only showing top 5 rows

Wall time: 20.8 s


In [17]:
%%time
# Parquet file
socialDFpq.groupBy("followed").agg(count("followed").alias("followers")).orderBy(desc("followers")).show(5)

+--------+---------+
|followed|followers|
+--------+---------+
|    1503|    51386|
|     206|    48414|
|      88|    45221|
|     138|    44188|
|    1062|    40120|
+--------+---------+
only showing top 5 rows

Wall time: 5.19 s


### Cached DF vs not cached DF

This time we will cache the 2 previous dataframes (socialDF and socialDFpq) and see how faster is.

In [18]:
# cache dataframes
socialDF.cache()
socialDFpq.cache()

# remove from cache
#socialDF.unpersist()
#socialDFpq.unpersist()

DataFrame[follower: int, followed: int]

- Note: The first time we run cached dataframes can be slower, but the next times they should run faster.

In [21]:
%%time
# GZIP Compressed CSV (dataframe cached)
socialDF.groupBy("followed").agg(count("followed").alias("followers")).orderBy(desc("followers")).show(5)

+--------+---------+
|followed|followers|
+--------+---------+
|    1503|    51386|
|     206|    48414|
|      88|    45221|
|     138|    44188|
|    1062|    40120|
+--------+---------+
only showing top 5 rows

Wall time: 3.82 s


In [24]:
%%time
# Parquet file (dataframe cached)
socialDFpq.groupBy("followed").agg(count("followed").alias("followers")).orderBy(desc("followers")).show(5)

+--------+---------+
|followed|followers|
+--------+---------+
|    1503|    51386|
|     206|    48414|
|      88|    45221|
|     138|    44188|
|    1062|    40120|
+--------+---------+
only showing top 5 rows

Wall time: 3.75 s


### Accessing local files vs local HDFS (Hadoop Distributed File System)

Using only 1 cluster should make no real difference accessing local files or local HDFS.

#### Local file

In [25]:
datasetDir = "../../datasets/" # local files
file = datasetDir + "HiggsTwitter/higgs-social_network.edgelist.gz"
schema = StructType([StructField("follower", IntegerType()), StructField("followed", IntegerType())])

socialDFLocal = spark.read.csv(path=file, sep=" ", schema=schema)

In [35]:
%%time
# Local file with one worker thread
socialDFLocal.groupBy("followed").agg(count("followed").alias("followers")).orderBy(desc("followers")).show(5)

+--------+---------+
|followed|followers|
+--------+---------+
|    1503|    51386|
|     206|    48414|
|      88|    45221|
|     138|    44188|
|    1062|    40120|
+--------+---------+
only showing top 5 rows

Wall time: 16.4 s


#### Local HDFS file

In [30]:
datasetDir = "hdfs://localhost:19000/" # hadoop filesystem
file = datasetDir + "HiggsTwitter/higgs-social_network.edgelist.gz"
schema = StructType([StructField("follower", IntegerType()), StructField("followed", IntegerType())])

socialDFHDFS = spark.read.csv(path=file, sep=" ", schema=schema)

In [34]:
%%time
# Local file with one worker thread
socialDFHDFS.groupBy("followed").agg(count("followed").alias("followers")).orderBy(desc("followers")).show(5)

+--------+---------+
|followed|followers|
+--------+---------+
|    1503|    51386|
|     206|    48414|
|      88|    45221|
|     138|    44188|
|    1062|    40120|
+--------+---------+
only showing top 5 rows

Wall time: 18.2 s
