# 1. <a name="first-look"></a> First Look at Spark/PySpark

In the first part of this notebook we aim to explore the following using [Apache Spark](https://spark.apache.org/):
- Reading CSV files
- Spark partitions - See: [Spark Partitioning Understanding](https://sparkbyexamples.com/spark/spark-partitioning-understanding/)
- Saving data to Parquet for local experiments
- Spark Master UI

# 2. Spark DataFrames

In the second part we continue exploring Spark/PySpark with the following topics:
- Actions vs transformations
- Functions and UDFs

## Links:
- [Jump to First Look](#first-look)
- [Jump to Spark DataFrames](#spark-dataframes)

In [1]:
import findspark
import pandas as pd

In [2]:
findspark.init()

In [3]:
import pyspark
from pyspark.sql import SparkSession

In [4]:
spark = SparkSession.builder \
    .master("local[*]") \
    .appName('test') \
    .getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/03/10 16:36:19 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


### *Note:*
*Comment out the next two cells if they were already executed and the files created.*


In [5]:
## Download data
!wget https://github.com/DataTalksClub/nyc-tlc-data/releases/download/fhvhv/fhvhv_tripdata_2021-01.csv.gz

In [6]:
## Unzip
!gunzip fhvhv_tripdata_2021-01.csv.gz

In [7]:
# Check number of lines (the count includes the header row)
!wc -l fhvhv_tripdata_2021-01.csv

11908469 fhvhv_tripdata_2021-01.csv


In [8]:
# Read the CSV as a Spark DataFrame (no schema is given, so every column is read as a string)
df = spark.read \
     .option("header", "true") \
     .csv("fhvhv_tripdata_2021-01.csv")

In [9]:
df.head(5)

[Row(hvfhs_license_num='HV0003', dispatching_base_num='B02682', pickup_datetime='2021-01-01 00:33:44', dropoff_datetime='2021-01-01 00:49:07', PULocationID='230', DOLocationID='166', SR_Flag=None),
 Row(hvfhs_license_num='HV0003', dispatching_base_num='B02682', pickup_datetime='2021-01-01 00:55:19', dropoff_datetime='2021-01-01 01:18:21', PULocationID='152', DOLocationID='167', SR_Flag=None),
 Row(hvfhs_license_num='HV0003', dispatching_base_num='B02764', pickup_datetime='2021-01-01 00:23:56', dropoff_datetime='2021-01-01 00:38:05', PULocationID='233', DOLocationID='142', SR_Flag=None),
 Row(hvfhs_license_num='HV0003', dispatching_base_num='B02764', pickup_datetime='2021-01-01 00:42:51', dropoff_datetime='2021-01-01 00:45:50', PULocationID='142', DOLocationID='143', SR_Flag=None),
 Row(hvfhs_license_num='HV0003', dispatching_base_num='B02764', pickup_datetime='2021-01-01 00:48:14', dropoff_datetime='2021-01-01 01:08:42', PULocationID='143', DOLocationID='78', SR_Flag=None)]

In [10]:
df.schema

StructType([StructField('hvfhs_license_num', StringType(), True), StructField('dispatching_base_num', StringType(), True), StructField('pickup_datetime', StringType(), True), StructField('dropoff_datetime', StringType(), True), StructField('PULocationID', StringType(), True), StructField('DOLocationID', StringType(), True), StructField('SR_Flag', StringType(), True)])

In [11]:
# Get first 1000 rows of original data and save to CSV. To be used for illustration purposes.
!head -n 1001 fhvhv_tripdata_2021-01.csv > head.csv

In [12]:
# Read the CSV into pandas to see which data types pandas infers for the columns.
df_pandas = pd.read_csv("head.csv")

In [13]:
df_pandas.dtypes

hvfhs_license_num        object
dispatching_base_num     object
pickup_datetime          object
dropoff_datetime         object
PULocationID              int64
DOLocationID              int64
SR_Flag                 float64
dtype: object

## Create Spark dataframe from Pandas dataframe

In [15]:
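# Create Spark dataframe from Pandas dataframe.
# Workaround: pandas 2.x removed DataFrame.iteritems, which some older PySpark releases still call.
pd.DataFrame.iteritems = pd.DataFrame.items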
spark.createDataFrame(df_pandas).schema

StructType([StructField('hvfhs_license_num', StringType(), True), StructField('dispatching_base_num', StringType(), True), StructField('pickup_datetime', StringType(), True), StructField('dropoff_datetime', StringType(), True), StructField('PULocationID', LongType(), True), StructField('DOLocationID', LongType(), True), StructField('SR_Flag', DoubleType(), True)])

### We are going to enforce a schema in Spark
To do that we are going to use the `pyspark.sql.types` module from the PySpark library.

In [16]:
from pyspark.sql import types

In [17]:
schema = types.StructType([
    types.StructField('hvfhs_license_num', types.StringType(), True),
    types.StructField('dispatching_base_num', types.StringType(), True),
    types.StructField('pickup_datetime', types.TimestampType(), True),
    types.StructField('dropoff_datetime', types.TimestampType(), True),
    types.StructField('PULocationID', types.IntegerType(), True),
    types.StructField('DOLocationID', types.IntegerType(), True),
    types.StructField('SR_Flag', types.StringType(), True)
])
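
As a rough illustration of what this schema asks for, the plain-Python sketch below applies the same per-column types to one CSV row. It does not use Spark and is not how Spark parses files internally. The `CASTS` mapping and the `parse_ts` and `apply_schema` helpers are hypothetical names, and the raw line is reconstructed from the first row of the `df.head(5)` output above, assuming that empty fields are read as nulls.

```python
import csv
import io
from datetime import datetime

# Header plus first data row, reconstructed from the df.head(5) output above.
RAW = (
    "hvfhs_license_num,dispatching_base_num,pickup_datetime,"
    "dropoff_datetime,PULocationID,DOLocationID,SR_Flag\n"
    "HV0003,B02682,2021-01-01 00:33:44,2021-01-01 00:49:07,230,166,\n"
)


def parse_ts(value):
    """Parse a 'YYYY-MM-DD HH:MM:SS' string into a datetime."""
    return datetime.strptime(value, "%Y-%m-%d %H:%M:%S")


# Hypothetical stand-in for the StructType: column name -> conversion function.
CASTS = {
    "hvfhs_license_num": str,
    "dispatching_base_num": str,
    "pickup_datetime": parse_ts,
    "dropoff_datetime": parse_ts,
    "PULocationID": int,
    "DOLocationID": int,
    "SR_Flag": str,
}


def apply_schema(row):
    """Convert every field to its declared type; empty fields become None."""
    return {
        name: (cast(row[name]) if row[name] != "" else None)
        for name, cast in CASTS.items()
    }


typed_rows = [apply_schema(r) for r in csv.DictReader(io.StringIO(RAW))]
print(typed_rows[0])
```

The timestamps become `datetime` objects and the location IDs become integers, which is the same change visible in the `df.head(5)` output once the schema is applied.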

In [18]:
# Read the CSV again as a Spark DataFrame, this time with the schema defined above.
df = spark.read \
    .option("header", "true") \
    .schema(schema) \
    .csv('fhvhv_tripdata_2021-01.csv')

In [20]:
df.head(5)

[Row(hvfhs_license_num='HV0003', dispatching_base_num='B02682', pickup_datetime=datetime.datetime(2021, 1, 1, 0, 33, 44), dropoff_datetime=datetime.datetime(2021, 1, 1, 0, 49, 7), PULocationID=230, DOLocationID=166, SR_Flag=None),
 Row(hvfhs_license_num='HV0003', dispatching_base_num='B02682', pickup_datetime=datetime.datetime(2021, 1, 1, 0, 55, 19), dropoff_datetime=datetime.datetime(2021, 1, 1, 1, 18, 21), PULocationID=152, DOLocationID=167, SR_Flag=None),
 Row(hvfhs_license_num='HV0003', dispatching_base_num='B02764', pickup_datetime=datetime.datetime(2021, 1, 1, 0, 23, 56), dropoff_datetime=datetime.datetime(2021, 1, 1, 0, 38, 5), PULocationID=233, DOLocationID=142, SR_Flag=None),
 Row(hvfhs_license_num='HV0003', dispatching_base_num='B02764', pickup_datetime=datetime.datetime(2021, 1, 1, 0, 42, 51), dropoff_datetime=datetime.datetime(2021, 1, 1, 0, 45, 50), PULocationID=142, DOLocationID=143, SR_Flag=None),
 Row(hvfhs_license_num='HV0003', dispatching_base_num='B02764', pickup_dat

In [24]:
df = df.repartition(24)
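
A quick back-of-the-envelope estimate of what `repartition(24)` implies for partition size, in plain Python. It assumes the `wc -l` count above includes exactly one header line and that rows end up spread roughly evenly across partitions; the sizes Spark actually produces may differ.

```python
# Line count reported by `wc -l` above; one of those lines is the CSV header.
total_lines = 11_908_469
data_rows = total_lines - 1

num_partitions = 24  # same value that is passed to df.repartition(24)

# Assumption: rows are distributed roughly evenly across the partitions.
rows_per_partition, remainder = divmod(data_rows, num_partitions)

print(f"{data_rows} rows over {num_partitions} partitions: "
      f"about {rows_per_partition} rows each ({remainder} left over)")
```

Since Spark normally writes one output file per partition, the same estimate gives a rough idea of how many rows each Parquet file written in the next cell will hold.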

In [25]:
df.write.parquet('fhvhv/2021/01/')

# 2. <a name="spark-dataframes"></a> Spark DataFrames

In [35]:
df = spark.read.parquet('fhvhv/2021/01/')

In [36]:
df.printSchema()

root
 |-- hvfhs_license_num: string (nullable = true)
 |-- dispatching_base_num: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- SR_Flag: string (nullable = true)



### Actions vs Transformations

In Spark there is a distinction between operations that are executed immediately and operations that are not (lazy evaluation).
The former are called `Actions` and the latter `Transformations`.

#### Transformations
Transformations are **lazy** (not executed immediately).

- Selecting columns
- Filtering
- Applying a function to a column


#### Actions
Actions are **eager** (executed right away)

- show(), head()
- write
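- read (strictly speaking not an action: data is loaded lazily, although Spark may run a small job up front, for example to infer a schema or read file metadata)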
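
The distinction can be illustrated without Spark. The sketch below is a plain-Python analogy using generators, not Spark code: building the generator pipeline plays the role of a transformation (nothing runs yet), and `list(...)` plays the role of an action (everything runs). The sample license numbers and the `trips` and `calls` names are made up for illustration.

```python
calls = []  # records every row that was actually produced by the source


def trips():
    """Hypothetical data source that logs each row as it is produced."""
    for license_num in ["HV0003", "HV0005", "HV0005", "HV0003"]:
        calls.append(license_num)
        yield license_num


# "Transformation": describes the filter but does not run it yet.
filtered = (t for t in trips() if t == "HV0005")
rows_read_before = len(calls)

# "Action": forces the whole pipeline to execute.
result = list(filtered)
rows_read_after = len(calls)

print(rows_read_before, rows_read_after, result)
```

No rows are read until the result is requested. Spark applies the same idea at a larger scale, which lets it plan a whole chain of transformations before running anything.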

In [44]:
df.select('hvfhs_license_num', 'pickup_datetime', 'dropoff_datetime', 'PULocationID', 'DOLocationID') \
  .filter(df.hvfhs_license_num == 'HV0005') \
  .show()

+-----------------+-------------------+-------------------+------------+------------+
|hvfhs_license_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|
+-----------------+-------------------+-------------------+------------+------------+
|           HV0005|2021-01-03 17:17:21|2021-01-03 17:26:18|         255|          34|
|           HV0005|2021-01-04 05:25:51|2021-01-04 05:45:19|         225|         233|
|           HV0005|2021-01-05 07:07:33|2021-01-05 07:16:16|         231|          87|
|           HV0005|2021-01-06 11:21:01|2021-01-06 11:31:58|          22|          26|
|           HV0005|2021-01-02 18:07:12|2021-01-02 18:13:56|           9|          73|
|           HV0005|2021-01-04 08:30:25|2021-01-04 08:40:38|         140|         263|
|           HV0005|2021-01-05 18:33:50|2021-01-05 18:41:01|          62|          61|
|           HV0005|2021-01-03 14:47:06|2021-01-03 14:57:21|         232|         107|
|           HV0005|2021-01-06 06:01:40|2021-01-06 06:1

In [45]:
from pyspark.sql import functions as F

In [46]:
# We define a dummy (and useless) function to demonstrate UDFs in Spark
def crazy_stuff(base_num):
    num = int(base_num[1:])
    if num % 7 == 0:
        return f's/{num:03x}'
    elif num % 3 == 0:
        return f'a/{num:03x}'
    else:
        return f'e/{num:03x}'

In [47]:
crazy_stuff('B02884')

's/b44'
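
One thing to keep in mind before wrapping the function as a UDF: Spark hands null column values to a Python UDF as `None`, and `crazy_stuff(None)` would raise a `TypeError`. A minimal sketch of a defensive variant follows. `crazy_stuff_safe` is a hypothetical name that is not part of the original notebook, and returning `None` for missing or unparseable input is an assumption about the desired behaviour.

```python
def crazy_stuff_safe(base_num):
    """Same logic as crazy_stuff, but returns None instead of raising."""
    if base_num is None:
        return None
    try:
        # Drop the leading letter (e.g. 'B') and parse the rest as an integer.
        num = int(base_num[1:])
    except ValueError:
        return None

    if num % 7 == 0:
        return f's/{num:03x}'
    elif num % 3 == 0:
        return f'a/{num:03x}'
    else:
        return f'e/{num:03x}'


print(crazy_stuff_safe('B02884'))
print(crazy_stuff_safe(None))
```

For valid input it behaves like the original function, so it could be passed to `F.udf` in the same way.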

In [48]:
# Turn the Python function defined above into a Spark UDF.
crazy_stuff_udf = F.udf(crazy_stuff, returnType=types.StringType())

In [49]:
df \
    .withColumn('pickup_date', F.to_date(df.pickup_datetime)) \
    .withColumn('dropoff_date', F.to_date(df.dropoff_datetime)) \
    .withColumn('base_id', crazy_stuff_udf(df.dispatching_base_num)) \
    .select('base_id', 'pickup_date', 'dropoff_date', 'PULocationID', 'DOLocationID') \
    .show()

+-------+-----------+------------+------------+------------+
|base_id|pickup_date|dropoff_date|PULocationID|DOLocationID|
+-------+-----------+------------+------------+------------+
|  e/9ce| 2021-01-03|  2021-01-03|         255|          34|
|  e/b42| 2021-01-05|  2021-01-05|         189|         107|
|  e/b33| 2021-01-02|  2021-01-02|          88|         137|
|  e/b38| 2021-01-02|  2021-01-03|         238|         224|
|  e/b3b| 2021-01-06|  2021-01-06|         169|         208|
|  e/b33| 2021-01-07|  2021-01-07|          75|          88|
|  e/acc| 2021-01-07|  2021-01-07|         210|         210|
|  e/acc| 2021-01-02|  2021-01-02|         243|          69|
|  e/b35| 2021-01-04|  2021-01-04|         250|         213|
|  s/b3d| 2021-01-03|  2021-01-03|          87|          79|
|  e/a39| 2021-01-03|  2021-01-03|          68|         181|
|  s/acd| 2021-01-04|  2021-01-04|          95|         236|
|  s/b13| 2021-01-02|  2021-01-02|         262|         236|
|  e/9ce| 2021-01-04|  2

                                                                                