# Big Data Essentials
#### L11:  Data Processing with Spark


<br>
<br>
<br>
<br>
Yanfei Kang <br>
yanfeikang@buaa.edu.cn <br>
School of Economics and Management <br>
Beihang University <br>
http://yanfei.site <br>

# Two interfaces of Spark

- RDD (before Spark 2.0)

- Dataset, which is strongly-typed like an RDD, but with richer optimizations under the hood. 

# Spark SQL, DataFrames and Datasets 

- Spark SQL is a Spark module for *structured data processing*. 

- Unlike RDD, Spark SQL provides Spark with more information about the structure of both the data and the computation being performed. 

- There are several ways to interact with Spark SQL including SQL and the Dataset API. 

## Spark SQL


- One use of Spark SQL is to execute SQL queries.

- Spark SQL can also be used to read data from an existing Hive installation.

- When running SQL from within another programming language the results will be returned as a Dataset/DataFrame.

## Spark Datasets

- A Dataset is a distributed collection of data.

- Dataset can be constructed from JVM objects and then manipulated using functional transformations (`map, flatMap, filter`, etc.). 

- The Dataset API is available in **Scala** and **Java**. 

- Python **does not have the support for the Dataset API**. But due to Python’s dynamic nature, many of the benefits of the Dataset API are already available (i.e. you can access the field of a row by name naturally row.columnName).

## Spark DataFrame

- A DataFrame is a Dataset organized into named columns. 

- It is conceptually equivalent to a table in a relational database or a data frame in R/Python, but with richer optimizations under the hood. 

- DataFrames can be constructed from a wide array of sources such as: 

    - structured data files, 
    - tables in Hive, 
    - external databases, or 
    - existing RDDs. 
    
- The DataFrame API is available in Scala, Java, Python, and R. 

## Spark session and spark context

### What is SparkContext
Spark SparkContext is an entry point to Spark and defined in org.apache.spark package since 1.x and used to programmatically create Spark RDD, accumulators and broadcast variables on the cluster. Since Spark 2.0 most of the functionalities (methods) available in SparkContext are also available in SparkSession. Its object sc is default available in spark-shell and it can be programmatically created using SparkContext class.

### What is SparkSession
SparkSession introduced in version 2.0 and and is an entry point to underlying Spark functionality in order to programmatically create Spark RDD, DataFrame and DataSet. It’s object spark is default available in spark-shell and it can be created programmatically using SparkSession builder pattern.

## Start a Spark session


In [4]:
import findspark
findspark.init('/usr/lib/spark-current/')
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Python Spark").getOrCreate()

ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:40395)
Traceback (most recent call last):
  File "/usr/lib/spark-current/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 977, in _get_connection
    connection = self.deque.pop()
IndexError: pop from an empty deque

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/lib/spark-current/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1115, in start
    self.socket.connect((self.address, self.port))
ConnectionRefusedError: [Errno 111] Connection refused


Py4JNetworkError: An error occurred while trying to connect to the Java server (127.0.0.1:40395)

## Creating DataFrames


### Convert an RDD to a DataFrame

In [3]:
# sc.stop()
sc = spark.sparkContext # make a spark context for RDD

In [4]:
# Load a text file and convert each line to a Row.
from pyspark.sql import Row
lines = sc.textFile("/opt/apps/ecm/service/spark/3.1.2-hadoop3.2-1.0.0/package/spark-3.1.2-hadoop3.2-1.0.0/examples/src/main/resources/people.txt")
parts = lines.map(lambda l: l.split(","))
people = parts.map(lambda p: Row(name=p[0], age=int(p[1])))
people


PythonRDD[2] at RDD at PythonRDD.scala:53

In [5]:
# Infer the schema, and register the DataFrame as a table.
schemaPeople = spark.createDataFrame(people)
schemaPeople.show()

                                                                                

+-------+---+
|   name|age|
+-------+---+
|Michael| 29|
|   Andy| 30|
| Justin| 19|
+-------+---+



In [6]:
# SQL can be run over DataFrames that have been registered as a table.
schemaPeople.createOrReplaceTempView("people")
teenagers = spark.sql("SELECT name, age FROM people WHERE age >= 13 AND age <= 19")
teenagers.show()

+------+---+
|  name|age|
+------+---+
|Justin| 19|
+------+---+



In [7]:
teenagers.toPandas() # We could export the Spark DataFrame to a usual Pandas DataFrame

Unnamed: 0,name,age
0,Justin,19


### Create Spark DataFrame directly from a file

In [8]:
sdf = spark.read.csv("/opt/apps/ecm/service/spark/3.1.2-hadoop3.2-1.0.0/package/spark-3.1.2-hadoop3.2-1.0.0/examples/src/main/resources/people.csv")
sdf.show() # Displays the content of the DataFrame to stdout

+------------------+
|               _c0|
+------------------+
|      name;age;job|
|Jorge;30;Developer|
|  Bob;32;Developer|
+------------------+



In [27]:
sdf2 = spark.read.json("/opt/apps/ecm/service/spark/3.1.2-hadoop3.2-1.0.0/package/spark-3.1.2-hadoop3.2-1.0.0/examples/src/main/resources/people.json")
# Displays the content of the DataFrame to stdout
sdf2.show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



The CSV file dose not have a header of the data, but we could create a description (**schema** in Spark) for it .

In [10]:
# Import data types
from pyspark.sql.types import *
# The schema is encoded in a string.

# Create a schema
schemaString = ["name", "age"]
fields = [StructField(field_name, StringType(), True) for field_name in schemaString]
schema = StructType(fields)
schema

StructType(List(StructField(name,StringType,true),StructField(age,StringType,true)))

In [11]:
sdf_withschema = spark.createDataFrame(people, schema)
sdf_withschema.show()

+-------+---+
|   name|age|
+-------+---+
|Michael| 29|
|   Andy| 30|
| Justin| 19|
+-------+---+



### Export DataFrame to a local disk

In [12]:
sdf.write.mode('overwrite').csv("myspark/")
import os 
os.listdir("myspark")

['.part-00000-81087960-1ac9-4f0f-a1de-934aff5b88ac-c000.csv.crc',
 'part-00000-81087960-1ac9-4f0f-a1de-934aff5b88ac-c000.csv',
 '._SUCCESS.crc',
 '_SUCCESS']

### Read file and infer the schema from the header

In [2]:
## Load a local file 
air0 = spark.read.options(header='true', inferSchema='true').csv("/home/yanfei/lectures/data/airdelay_small.csv") 
air0

NameError: name 'spark' is not defined

In [14]:
# We specify the correct schema by hand
from pyspark.sql.types import *
schema_sdf = StructType([
        StructField('Year', IntegerType(), True),
        StructField('Month', IntegerType(), True),
        StructField('DayofMonth', IntegerType(), True),
        StructField('DayOfWeek', IntegerType(), True),
        StructField('DepTime', DoubleType(), True),
        StructField('CRSDepTime', DoubleType(), True),
        StructField('ArrTime', DoubleType(), True),
        StructField('CRSArrTime', DoubleType(), True),
        StructField('UniqueCarrier', StringType(), True),
        StructField('FlightNum', StringType(), True),
        StructField('TailNum', StringType(), True),
        StructField('ActualElapsedTime', DoubleType(), True),
        StructField('CRSElapsedTime',  DoubleType(), True),
        StructField('AirTime',  DoubleType(), True),
        StructField('ArrDelay',  DoubleType(), True),
        StructField('DepDelay',  DoubleType(), True),
        StructField('Origin', StringType(), True),
        StructField('Dest',  StringType(), True),
        StructField('Distance',  DoubleType(), True),
        StructField('TaxiIn',  DoubleType(), True),
        StructField('TaxiOut',  DoubleType(), True),
        StructField('Cancelled',  IntegerType(), True),
        StructField('CancellationCode',  StringType(), True),
        StructField('Diverted',  IntegerType(), True),
        StructField('CarrierDelay', DoubleType(), True),
        StructField('WeatherDelay',  DoubleType(), True),
        StructField('NASDelay',  DoubleType(), True),
        StructField('SecurityDelay',  DoubleType(), True),
        StructField('LateAircraftDelay',  DoubleType(), True)
    ])

air = spark.read.options(header='true').schema(schema_sdf).csv("/home/yanfei/lectures/data/airdelay_small.csv")
air

DataFrame[Year: int, Month: int, DayofMonth: int, DayOfWeek: int, DepTime: double, CRSDepTime: double, ArrTime: double, CRSArrTime: double, UniqueCarrier: string, FlightNum: string, TailNum: string, ActualElapsedTime: double, CRSElapsedTime: double, AirTime: double, ArrDelay: double, DepDelay: double, Origin: string, Dest: string, Distance: double, TaxiIn: double, TaxiOut: double, Cancelled: int, CancellationCode: string, Diverted: int, CarrierDelay: double, WeatherDelay: double, NASDelay: double, SecurityDelay: double, LateAircraftDelay: double]

## Descriptive  Statistics

In [15]:
air.describe().show()



144199 [Thread-4] WARN  org.apache.spark.sql.catalyst.util.package  - Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
+-------+------------------+------------------+-----------------+------------------+------------------+------------------+------------------+------------------+-------------+------------------+------------------+-----------------+------------------+------------------+------------------+-----------------+-------+-------+-----------------+------------------+------------------+--------------------+----------------+--------------------+------------------+------------------+------------------+-------------------+------------------+
|summary|              Year|             Month|       DayofMonth|         DayOfWeek|           DepTime|        CRSDepTime|           ArrTime|        CRSArrTime|UniqueCarrier|         FlightNum|           TailNum|ActualElapsedTime|    CRSElapsedTime|

                                                                                

In [16]:
air.describe(['ArrDelay']).show()



+-------+------------------+
|summary|          ArrDelay|
+-------+------------------+
|  count|           5432958|
|   mean|  6.97897995898367|
| stddev|30.191156753519472|
|    min|           -1238.0|
|    max|            1779.0|
+-------+------------------+



                                                                                

### Print the schema in a tree format

In [17]:
air.printSchema()

root
 |-- Year: integer (nullable = true)
 |-- Month: integer (nullable = true)
 |-- DayofMonth: integer (nullable = true)
 |-- DayOfWeek: integer (nullable = true)
 |-- DepTime: double (nullable = true)
 |-- CRSDepTime: double (nullable = true)
 |-- ArrTime: double (nullable = true)
 |-- CRSArrTime: double (nullable = true)
 |-- UniqueCarrier: string (nullable = true)
 |-- FlightNum: string (nullable = true)
 |-- TailNum: string (nullable = true)
 |-- ActualElapsedTime: double (nullable = true)
 |-- CRSElapsedTime: double (nullable = true)
 |-- AirTime: double (nullable = true)
 |-- ArrDelay: double (nullable = true)
 |-- DepDelay: double (nullable = true)
 |-- Origin: string (nullable = true)
 |-- Dest: string (nullable = true)
 |-- Distance: double (nullable = true)
 |-- TaxiIn: double (nullable = true)
 |-- TaxiOut: double (nullable = true)
 |-- Cancelled: integer (nullable = true)
 |-- CancellationCode: string (nullable = true)
 |-- Diverted: integer (nullable = true)
 |-- Carrier

### Select columns

In [18]:
air.select(["ArrDelay","AirTime","Distance"]).show()

+--------+-------+--------+
|ArrDelay|AirTime|Distance|
+--------+-------+--------+
|     2.0|   25.0|   127.0|
|    29.0|  248.0|  1623.0|
|     8.0|   null|   622.0|
|    -2.0|   70.0|   451.0|
|    11.0|  133.0|  1009.0|
|    13.0|  177.0|  1562.0|
|   -12.0|  181.0|  1589.0|
|    11.0|  364.0|  2611.0|
|    13.0|   53.0|   304.0|
|     9.0|   null|   888.0|
|    -8.0|  293.0|  2537.0|
|    15.0|   null|  1723.0|
|   -14.0|   null|  1736.0|
|    55.0|  285.0|  1927.0|
|    23.0|  149.0|   991.0|
|    64.0|   35.0|   193.0|
|    29.0|   25.0|    77.0|
|    -8.0|   null|   447.0|
|    -6.0|   91.0|   678.0|
|    35.0|  127.0|   998.0|
+--------+-------+--------+
only showing top 20 rows



In [19]:
air.select(air['UniqueCarrier'], air['ArrDelay']>0).show()

+-------------+--------------+
|UniqueCarrier|(ArrDelay > 0)|
+-------------+--------------+
|           XE|          true|
|           CO|          true|
|           AA|          true|
|           WN|         false|
|           CO|          true|
|           AA|          true|
|           DL|         false|
|           AA|          true|
|           US|          true|
|           AA|          true|
|           AS|         false|
|           UA|          true|
|           TW|         false|
|           NW|          true|
|           NW|          true|
|           AA|          true|
|           DH|          true|
|           WN|         false|
|           AA|         false|
|           CO|          true|
+-------------+--------------+
only showing top 20 rows



In [20]:
# group data with respect to some columns 
air.groupBy(["UniqueCarrier","DayOfWeek"]).count().show() 

                                                                                

+-------------+---------+------+
|UniqueCarrier|DayOfWeek| count|
+-------------+---------+------+
|           PS|        6|   406|
|           CO|        4| 55764|
|       ML (1)|        7|   442|
|           XE|        4| 14896|
|           TZ|        4|  1455|
|           OO|        3| 17310|
|           EA|        7|  6197|
|           OO|        4| 17666|
|           F9|        2|  1679|
|           EA|        5|  6295|
|           HA|        5|  1519|
|           UA|        4| 89272|
|           EV|        4|  9729|
|           DL|        6|106031|
|           FL|        5|  6962|
|           YV|        3|  4165|
|           AQ|        2|  1035|
|       ML (1)|        2|   502|
|           DL|        3|110827|
|           YV|        6|  3828|
+-------------+---------+------+
only showing top 20 rows



In [21]:
## Group and sort
aircount=air.groupBy("UniqueCarrier").count()
aircount.sort("count",ascending=False).show()



+-------------+------+
|UniqueCarrier| count|
+-------------+------+
|           DL|765388|
|           WN|703368|
|           AA|684522|
|           US|649056|
|           UA|611957|
|           NW|473820|
|           CO|373858|
|           TW|179081|
|           HP|173509|
|           MQ|164790|
|           AS|129863|
|           OO|120223|
|           XE| 94311|
|           EV| 67148|
|           OH| 60630|
|           FL| 47540|
|           EA| 43723|
|           PI| 41489|
|           DH| 32900|
|           B6| 29111|
+-------------+------+
only showing top 20 rows



                                                                                

### Data cleaning

In [22]:
## Returns a new DataFrame omitting rows with null values
air_without_na = air.na.drop()
air_without_na.show()



+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+
|Year|Month|DayofMonth|DayOfWeek|DepTime|CRSDepTime|ArrTime|CRSArrTime|UniqueCarrier|FlightNum|TailNum|ActualElapsedTime|CRSElapsedTime|AirTime|ArrDelay|DepDelay|Origin|Dest|Distance|TaxiIn|TaxiOut|Cancelled|CancellationCode|Diverted|CarrierDelay|WeatherDelay|NASDelay|SecurityDelay|LateAircraftDelay|
+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+
+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------



In [23]:
air_without_na.count()

                                                                                

0

In [24]:
air.count() # original file size

5548754

In [1]:
air.show()

NameError: name 'air' is not defined

In [25]:
## Replace null values
air.na.fill("unknown").show()

+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+
|Year|Month|DayofMonth|DayOfWeek|DepTime|CRSDepTime|ArrTime|CRSArrTime|UniqueCarrier|FlightNum|TailNum|ActualElapsedTime|CRSElapsedTime|AirTime|ArrDelay|DepDelay|Origin|Dest|Distance|TaxiIn|TaxiOut|Cancelled|CancellationCode|Diverted|CarrierDelay|WeatherDelay|NASDelay|SecurityDelay|LateAircraftDelay|
+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+
|2006|    7|         6|        4| 2055.0|    2055.0| 2150.0|    2148.0|           XE|     2619

In [26]:
air.na.replace('NA', "unknown").show()

+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+
|Year|Month|DayofMonth|DayOfWeek|DepTime|CRSDepTime|ArrTime|CRSArrTime|UniqueCarrier|FlightNum|TailNum|ActualElapsedTime|CRSElapsedTime|AirTime|ArrDelay|DepDelay|Origin|Dest|Distance|TaxiIn|TaxiOut|Cancelled|CancellationCode|Diverted|CarrierDelay|WeatherDelay|NASDelay|SecurityDelay|LateAircraftDelay|
+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+
|2006|    7|         6|        4| 2055.0|    2055.0| 2150.0|    2148.0|           XE|     2619

## Statistics

In [52]:
air.corr("Distance","ArrDelay")

0.008481756987561132

In [53]:
air.cov("Distance","ArrDelay")

140.57953260215643

In [54]:
air.filter(air.ArrDelay > 60).show() # filter with certain conditions 

+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+
|Year|Month|DayofMonth|DayOfWeek|DepTime|CRSDepTime|ArrTime|CRSArrTime|UniqueCarrier|FlightNum|TailNum|ActualElapsedTime|CRSElapsedTime|AirTime|ArrDelay|DepDelay|Origin|Dest|Distance|TaxiIn|TaxiOut|Cancelled|CancellationCode|Diverted|CarrierDelay|WeatherDelay|NASDelay|SecurityDelay|LateAircraftDelay|
+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+
|2007|    5|        17|        4| 1944.0|    1830.0| 2034.0|    1930.0|           AA|     1011

### User-defined functions

In [55]:
## air2 = air.select(["DayOfWeek","ArrDelay","AirTime","Distance"])
air2_pdf = air.select(["DayOfWeek", "ArrDelay","AirTime","Distance"]).toPandas()
air2_pdf

Unnamed: 0,DayOfWeek,ArrDelay,AirTime,Distance
0,4.0,2.0,25.0,127.0
1,7.0,29.0,248.0,1623.0
2,,,,
3,5.0,-2.0,70.0,451.0
4,7.0,11.0,133.0,1009.0
...,...,...,...,...
5548749,3.0,13.0,59.0,318.0
5548750,1.0,22.0,34.0,181.0
5548751,1.0,11.0,71.0,551.0
5548752,,,,


In [57]:
import pandas as pd
def myfun(pdf):
    out = dict() 
    out["ArrDelay"] = pdf.ArrDelay.mean()
    out["AirTime"]  = pdf.AirTime.mean()
    out["Distance"] = pdf.Distance.mean()
    
    return pd.DataFrame(out, index=[0])

myfun(air2_pdf)

Unnamed: 0,ArrDelay,AirTime,Distance
0,7.350591,102.688519,729.997977
