# Learning Spark

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType, IntegerType, DecimalType, StructType, StructField

In [2]:
# Create (or reuse) the notebook's SparkSession, running Spark locally.
spark = (
    SparkSession.builder
    .master("local[*]")
    .getOrCreate()
)

In [3]:
# Small in-memory sample of (name, age) pairs turned into a two-column DataFrame.
people = [("Brooke", 20), ("Denny", 31), ("Jules", 30), ("TD", 35), ("Brooke", 25)]
data_df = spark.createDataFrame(people, schema=["name", "age"])

In [4]:
# Display only the first three rows of the DataFrame.
data_df.show(3)

+------+---+
|  name|age|
+------+---+
|Brooke| 20|
| Denny| 31|
| Jules| 30|
+------+---+


## Schemas

In [5]:
# Two ways to describe the same three columns. Each form gets its own name so
# the first is not silently discarded by the second assignment.

# Using the Spark DataFrame API: StructField(name, type, nullable).
# Every field here is declared non-nullable.
schema_api = StructType([
    StructField("author", StringType(), False),
    StructField("title", StringType(), False),
    StructField("pages", IntegerType(), False),
])

# Using DDL: a plain string of "name TYPE" pairs (no NOT NULL constraints given).
schema_ddl = "author STRING, title STRING, pages INT"

# `schema` ends up bound to the DDL form, exactly as it did before.
schema = schema_ddl

In [6]:
# DDL schema for the blog records, assembled from one entry per column.
schema = ", ".join([
    "`Id` INT",
    "`First` STRING",
    "`Last` STRING",
    "`Url` STRING",
    "`Published` STRING",
    "`Hits` INT",
    "`Campaigns` ARRAY<STRING>",
])

# One list per blog author, with values in the same order as the schema columns.
data = [
    [1, "Jules", "Damji", "https://tinyurl.1", "1/4/2016", 4535, ["twitter", "LinkedIn"]],
    [2, "Brooke", "Wenig", "https://tinyurl.2", "5/5/2018", 8908, ["twitter", "LinkedIn"]],
    [3, "Denny", "Lee", "https://tinyurl.3", "6/7/2019", 7659, ["web", "twitter", "FB", "LinkedIn"]],
    [4, "Tathagata", "Das", "https://tinyurl.4", "5/12/2018", 10568, ["twitter", "FB"]],
    [5, "Matei", "Zaharia", "https://tinyurl.5", "5/14/2014", 40578, ["web", "twitter", "FB", "LinkedIn"]],
    [6, "Reynold", "Xin", "https://tinyurl.6", "3/2/2015", 25568, ["twitter", "LinkedIn"]],
]

In [7]:
# getOrCreate() returns the session already running in this kernel when there
# is one; the options set on this builder are then applied to that session
# instead of starting a new application.
spark = SparkSession.builder.appName("example_schema").getOrCreate()

# Build the blogs DataFrame from the in-memory rows using the DDL schema string.
blogs_df = spark.createDataFrame(data, schema)
blogs_df.show()
# printSchema() writes the schema tree to stdout itself and returns None, so
# wrapping it in print() would only add a stray "None" line to the output.
blogs_df.printSchema()

+---+---------+-------+-----------------+---------+-----+--------------------+
| Id|    First|   Last|              Url|Published| Hits|           Campaigns|
+---+---------+-------+-----------------+---------+-----+--------------------+
|  1|    Jules|  Damji|https://tinyurl.1| 1/4/2016| 4535| [twitter, LinkedIn]|
|  2|   Brooke|  Wenig|https://tinyurl.2| 5/5/2018| 8908| [twitter, LinkedIn]|
|  3|    Denny|    Lee|https://tinyurl.3| 6/7/2019| 7659|[web, twitter, FB...|
|  4|Tathagata|    Das|https://tinyurl.4|5/12/2018|10568|       [twitter, FB]|
|  5|    Matei|Zaharia|https://tinyurl.5|5/14/2014|40578|[web, twitter, FB...|
|  6|  Reynold|    Xin|https://tinyurl.6| 3/2/2015|25568| [twitter, LinkedIn]|
+---+---------+-------+-----------------+---------+-----+--------------------+

root
 |-- Id: integer (nullable = true)
 |-- First: string (nullable = true)
 |-- Last: string (nullable = true)
 |-- Url: string (nullable = true)
 |-- Published: string (nullable = true)
 |-- Hits: integer (

In [8]:
# Inspect the StructType Spark built for blogs_df from the DDL schema string.
blogs_df.schema

StructType([StructField('Id', IntegerType(), True), StructField('First', StringType(), True), StructField('Last', StringType(), True), StructField('Url', StringType(), True), StructField('Published', StringType(), True), StructField('Hits', IntegerType(), True), StructField('Campaigns', ArrayType(StringType(), True), True)])

In [10]:
# Explicit schema for the employees dataset.
# NOTE(review): field names are presumably the keys used in the JSON file —
# verify against the data, since unmatched fields would come back as null.
schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("nome", StringType(), True),
    StructField("email", StringType(), True),
    StructField("genero", StringType(), True),
    StructField("cidade", StringType(), True),
    StructField("pais", StringType(), True),
    StructField("empresa", StringType(), True),
    # NOTE(review): DecimalType() defaults to precision 10, scale 0, which
    # leaves no room for fractional digits — confirm this suits salario.
    StructField("salario", DecimalType(), True),
])

DATASET_PATH = './funcionarios.json'
# Hand the schema to the reader. Without this, Spark infers a schema from the
# file and the StructType defined above is never used.
employee_df = spark.read.schema(schema).json(DATASET_PATH)
employee_df.show(5)

## Columns and expressions

In [33]:
from pyspark.sql.functions import expr, col, concat, asc, desc

In [12]:
# List the DataFrame's column names as plain Python strings.
blogs_df.columns

['Id', 'First', 'Last', 'Url', 'Published', 'Hits', 'Campaigns']

In [26]:
# col('Id') builds a Column reference; passing it to select() yields a new
# single-column DataFrame (not a Column), as the displayed repr shows.
blogs_df.select(col('Id'))

DataFrame[Id: int]

In [27]:
# Use an expression to compute a value: expr() takes a SQL expression string,
# and the resulting column is named after that expression.
blogs_df.select(expr('Hits * 2')).show(5)

+----------+
|(Hits * 2)|
+----------+
|      9070|
|     17816|
|     15318|
|     21136|
|     81156|
+----------+


In [28]:
# Use col to compute the same value with a Python operator applied to a Column;
# the output column gets the same generated name, (Hits * 2).
blogs_df.select(col('Hits')*2).show(5)

+----------+
|(Hits * 2)|
+----------+
|      9070|
|     17816|
|     15318|
|     21136|
|     81156|
+----------+


In [29]:
# Flag the most-read blogs with an expression: withColumn() appends a boolean
# column, 'Big Hitters', that is true when Hits is greater than 10000.
is_big_hitter = expr('Hits > 10000')
blogs_df.withColumn('Big Hitters', is_big_hitter).show()

+---+---------+-------+-----------------+---------+-----+--------------------+-----------+
| Id|    First|   Last|              Url|Published| Hits|           Campaigns|Big Hitters|
+---+---------+-------+-----------------+---------+-----+--------------------+-----------+
|  1|    Jules|  Damji|https://tinyurl.1| 1/4/2016| 4535| [twitter, LinkedIn]|      false|
|  2|   Brooke|  Wenig|https://tinyurl.2| 5/5/2018| 8908| [twitter, LinkedIn]|      false|
|  3|    Denny|    Lee|https://tinyurl.3| 6/7/2019| 7659|[web, twitter, FB...|      false|
|  4|Tathagata|    Das|https://tinyurl.4|5/12/2018|10568|       [twitter, FB]|       true|
|  5|    Matei|Zaharia|https://tinyurl.5|5/14/2014|40578|[web, twitter, FB...|       true|
|  6|  Reynold|    Xin|https://tinyurl.6| 3/2/2015|25568| [twitter, LinkedIn]|       true|
+---+---------+-------+-----------------+---------+-----+--------------------+-----------+


In [37]:
# Join First, Last and Id into one value, store it in a new AuthorsId column,
# and display just that column for the first four rows.
authors_id = concat(expr('First'), expr('Last'), expr('Id'))
blogs_df.withColumn('AuthorsId', authors_id).select(col('AuthorsId')).show(4)

+-------------+
|    AuthorsId|
+-------------+
|  JulesDamji1|
| BrookeWenig2|
|    DennyLee3|
|TathagataDas4|
+-------------+


In [39]:
# The statements below return the same value, showing that expr('Hits'),
# col('Hits') and the bare column name all select the same column.
# First variant: expr().
blogs_df.select(expr('Hits')).show(2)

+----+
|Hits|
+----+
|4535|
|8908|
+----+


In [41]:
# Same selection, this time through col().
blogs_df.select(col('Hits')).show(2)

+----+
|Hits|
+----+
|4535|
|8908|
+----+


In [42]:
# Same selection again, passing the column name as a plain string.
blogs_df.select('Hits').show(2)

+----+
|Hits|
+----+
|4535|
|8908|
+----+


In [48]:
# Sorting: descending by Id, using the Column reached as an attribute of the
# DataFrame and its desc() method.
blogs_df.sort(blogs_df.Id.desc()).show()

+---+---------+-------+-----------------+---------+-----+--------------------+
| Id|    First|   Last|              Url|Published| Hits|           Campaigns|
+---+---------+-------+-----------------+---------+-----+--------------------+
|  6|  Reynold|    Xin|https://tinyurl.6| 3/2/2015|25568| [twitter, LinkedIn]|
|  5|    Matei|Zaharia|https://tinyurl.5|5/14/2014|40578|[web, twitter, FB...|
|  4|Tathagata|    Das|https://tinyurl.4|5/12/2018|10568|       [twitter, FB]|
|  3|    Denny|    Lee|https://tinyurl.3| 6/7/2019| 7659|[web, twitter, FB...|
|  2|   Brooke|  Wenig|https://tinyurl.2| 5/5/2018| 8908| [twitter, LinkedIn]|
|  1|    Jules|  Damji|https://tinyurl.1| 1/4/2016| 4535| [twitter, LinkedIn]|
+---+---------+-------+-----------------+---------+-----+--------------------+


In [49]:
# orderBy() with the same descending Column produces the same ordering as sort().
blogs_df.orderBy(blogs_df.Id.desc()).show()

+---+---------+-------+-----------------+---------+-----+--------------------+
| Id|    First|   Last|              Url|Published| Hits|           Campaigns|
+---+---------+-------+-----------------+---------+-----+--------------------+
|  6|  Reynold|    Xin|https://tinyurl.6| 3/2/2015|25568| [twitter, LinkedIn]|
|  5|    Matei|Zaharia|https://tinyurl.5|5/14/2014|40578|[web, twitter, FB...|
|  4|Tathagata|    Das|https://tinyurl.4|5/12/2018|10568|       [twitter, FB]|
|  3|    Denny|    Lee|https://tinyurl.3| 6/7/2019| 7659|[web, twitter, FB...|
|  2|   Brooke|  Wenig|https://tinyurl.2| 5/5/2018| 8908| [twitter, LinkedIn]|
|  1|    Jules|  Damji|https://tinyurl.1| 1/4/2016| 4535| [twitter, LinkedIn]|
+---+---------+-------+-----------------+---------+-----+--------------------+


In [51]:
# Descending sort by column name, requested through the ascending=False keyword.
blogs_df.sort('Id', ascending=False).show()

+---+---------+-------+-----------------+---------+-----+--------------------+
| Id|    First|   Last|              Url|Published| Hits|           Campaigns|
+---+---------+-------+-----------------+---------+-----+--------------------+
|  6|  Reynold|    Xin|https://tinyurl.6| 3/2/2015|25568| [twitter, LinkedIn]|
|  5|    Matei|Zaharia|https://tinyurl.5|5/14/2014|40578|[web, twitter, FB...|
|  4|Tathagata|    Das|https://tinyurl.4|5/12/2018|10568|       [twitter, FB]|
|  3|    Denny|    Lee|https://tinyurl.3| 6/7/2019| 7659|[web, twitter, FB...|
|  2|   Brooke|  Wenig|https://tinyurl.2| 5/5/2018| 8908| [twitter, LinkedIn]|
|  1|    Jules|  Damji|https://tinyurl.1| 1/4/2016| 4535| [twitter, LinkedIn]|
+---+---------+-------+-----------------+---------+-----+--------------------+


In [53]:
# Descending sort using the desc() function imported from pyspark.sql.functions.
blogs_df.sort(desc('Id')).show()

+---+---------+-------+-----------------+---------+-----+--------------------+
| Id|    First|   Last|              Url|Published| Hits|           Campaigns|
+---+---------+-------+-----------------+---------+-----+--------------------+
|  6|  Reynold|    Xin|https://tinyurl.6| 3/2/2015|25568| [twitter, LinkedIn]|
|  5|    Matei|Zaharia|https://tinyurl.5|5/14/2014|40578|[web, twitter, FB...|
|  4|Tathagata|    Das|https://tinyurl.4|5/12/2018|10568|       [twitter, FB]|
|  3|    Denny|    Lee|https://tinyurl.3| 6/7/2019| 7659|[web, twitter, FB...|
|  2|   Brooke|  Wenig|https://tinyurl.2| 5/5/2018| 8908| [twitter, LinkedIn]|
|  1|    Jules|  Damji|https://tinyurl.1| 1/4/2016| 4535| [twitter, LinkedIn]|
+---+---------+-------+-----------------+---------+-----+--------------------+


## Rows

In [10]:
from pyspark.sql import Row

In [11]:
# A standalone Row built from positional values (no field names are given).
blog_row = Row(
    6,
    'Reynold',
    'Xin',
    'https://tinyurl.6',
    255568,
    '3/2/2015',
    ['twitter', 'Linkedin'],
)

In [13]:
# Echo the Row; it was created positionally, so the repr lists values only.
blog_row

<Row(6, 'Reynold', 'Xin', 'https://tinyurl.6', 255568, '3/2/2015', ['twitter', 'Linkedin'])>

In [14]:
# Row fields can be read by 0-based position: index 1 is the second value.
blog_row[1]

'Reynold'

In [15]:
# Build a two-column DataFrame from Row objects, naming the columns at creation time.
rows = [
    Row('Matei Zaharia', 'CA'),
    Row('Reynold Xin', 'CA'),
]
authors_df = spark.createDataFrame(rows, schema=['Authors', 'State'])
authors_df.show()

+-------------+-----+
|      Authors|State|
+-------------+-----+
|Matei Zaharia|   CA|
|  Reynold Xin|   CA|
+-------------+-----+


## Common DataFrame Operations

In [16]:
from pyspark.sql.types import *

In [19]:
# (column name, Spark type) pairs for the fire dataset, listed in schema order.
_FIRE_COLUMNS = [
    ('CallNumber', StringType()),
    ('UnitId', StringType()),
    ('IncidentNumber', IntegerType()),
    ('CallType', StringType()),
    ('CallDate', StringType()),
    ('WatchDate', StringType()),
    ('CallFinalDisposition', StringType()),
    ('AvailableDtTm', StringType()),
    ('Address', StringType()),
    ('City', StringType()),
    ('Zipcode', IntegerType()),
    ('Battalion', StringType()),
    ('StationArea', StringType()),
    ('Box', StringType()),
    ('OriginalPriority', StringType()),
    ('Priority', StringType()),
    ('FinalPriority', IntegerType()),
    ('ALSUnit', BooleanType()),
    ('CallTypeGroup', StringType()),
    ('NumAlarms', IntegerType()),
    ('UnitType', StringType()),
    ('UnitSequenceInCallDispatch', IntegerType()),
    ('FirePreventionDistrict', StringType()),
    ('SupervisorDistrict', StringType()),
    ('Neighborhood', StringType()),
    ('Location', StringType()),
    ('RowID', StringType()),
    ('Delay', FloatType()),
]

# Every column is declared nullable.
fire_schema = StructType([
    StructField(column_name, column_type, True)
    for column_name, column_type in _FIRE_COLUMNS
])

In [25]:
# Load the fire CSV with the explicit schema; the first line of the file is a header.
dataset_path = './Fire_Incidents.csv'
df_fire = spark.read.csv(dataset_path, schema=fire_schema, header=True)

In [26]:
# Peek at two columns for the first five rows.
df_fire.select('StationArea', 'IncidentNumber').show(5)

+-----------+--------------+
|StationArea|IncidentNumber|
+-----------+--------------+
|         11|      80283040|
|         37|      80283030|
|         01|      80283090|
|         36|      80283140|
|         14|      80283190|
+-----------+--------------+


In [29]:
# Persist the DataFrame as Parquet files under a directory path.
parquet_file_path = './parquet_format'
# The writer's default save mode raises an error when the target already
# exists, so re-running this cell would fail; overwrite keeps it re-runnable.
df_fire.write.format('parquet').mode('overwrite').save(parquet_file_path)

In [30]:
# Persist the DataFrame as a Parquet-backed table.
table_name = 'table_format'
# saveAsTable() registers the data under this table name; save() would treat
# the string as an output path rather than a table name. Overwrite mode keeps
# the cell re-runnable once the table exists.
df_fire.write.format('parquet').mode('overwrite').saveAsTable(table_name)

In [34]:
# Narrow df_fire to three columns, keeping only the rows whose CallType is not
# 'Medical Incident'. Note that this rebinds df_fire to the reduced DataFrame.
# NOTE(review): the displayed CallType values look like street addresses, which
# suggests fire_schema may not line up with the CSV's columns — verify.
df_fire = (
    df_fire
    .select('IncidentNumber', 'AvailableDtTm', 'CallType')
    .filter(col('CallType') != 'Medical Incident')
)
df_fire.show()

+--------------+-------------------+--------------------+
|IncidentNumber|      AvailableDtTm|            CallType|
+--------------+-------------------+--------------------+
|      80283040|2008-04-01T18:15:19|       150 Elsie St.|
|      80283030|2008-04-01T18:06:30|       85 Turner Tr.|
|      80283090|2008-04-01T18:45:23|         175 6th St.|
|      80283140|2008-04-01T19:08:39|       633 Hayes St.|
|      80283190|2008-04-01T19:23:48|27th Av. / Cabril...|
|      80283370|2008-04-01T20:31:41|    165 Belgrave Av.|
|      80283290|2008-04-01T20:12:29|Grant Av. / Post St.|
|      80283500|2008-04-01T21:18:32|Cortland Av. / An...|
|      80283550|2008-04-01T22:00:33|2nd St. / Brannan...|
|      80283520|2008-04-01T21:24:23|      300 Ortega St.|
|      80283590|2008-04-01T22:37:29|         241 6th St.|
|      80283710|2008-04-01T23:22:46|Mendell St. / New...|
|      80281120|2008-04-01T00:28:41|      982 Market St.|
|      80281140|2008-04-01T00:39:01|   1485 Bayshore Bl.|
|      8028127