# Spark

- By default, Spark does not try to infer column types



In [3]:
import pyspark

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master("local[*]") \
    .appName('test') \
    .getOrCreate()

24/05/16 21:04:05 WARN Utils: Your hostname, Yashs-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 10.0.0.189 instead (on interface en0)
24/05/16 21:04:05 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/05/16 21:04:05 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [19]:
# !wget https://github.com/DataTalksClub/nyc-tlc-data/releases/download/fhvhv/fhvhv_tripdata_2021-01.csv.gz

In [3]:
# !gzip -d fhvhv_tripdata_2021-01.csv.gz

In [4]:
df = spark.read\
    .option('header','True')\
    .csv('fhvhv_tripdata_2021-01.csv')
df.show()

+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+
|hvfhs_license_num|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|SR_Flag|
+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+
|           HV0003|              B02682|2021-01-01 00:33:44|2021-01-01 00:49:07|         230|         166|   NULL|
|           HV0003|              B02682|2021-01-01 00:55:19|2021-01-01 01:18:21|         152|         167|   NULL|
|           HV0003|              B02764|2021-01-01 00:23:56|2021-01-01 00:38:05|         233|         142|   NULL|
|           HV0003|              B02764|2021-01-01 00:42:51|2021-01-01 00:45:50|         142|         143|   NULL|
|           HV0003|              B02764|2021-01-01 00:48:14|2021-01-01 01:08:42|         143|          78|   NULL|
|           HV0005|              B02510|2021-01-01 00:06:59|2021-01-01 00:43:01|

We see that the columns are all in string. By default, Spark does not try to infer column types.

In [5]:
df.printSchema()

root
 |-- hvfhs_license_num: string (nullable = true)
 |-- dispatching_base_num: string (nullable = true)
 |-- pickup_datetime: string (nullable = true)
 |-- dropoff_datetime: string (nullable = true)
 |-- PULocationID: string (nullable = true)
 |-- DOLocationID: string (nullable = true)
 |-- SR_Flag: string (nullable = true)



#### Using inferschema

In [6]:
df2 = spark.read\
    .option('header','True')\
    .option('inferschema', 'true')\
    .csv('fhvhv_tripdata_2021-01.csv')
df2.printSchema()


[Stage 3:>                                                        (0 + 10) / 10]

root
 |-- hvfhs_license_num: string (nullable = true)
 |-- dispatching_base_num: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- SR_Flag: integer (nullable = true)



                                                                                

In [7]:
df.head(5)

[Row(hvfhs_license_num='HV0003', dispatching_base_num='B02682', pickup_datetime='2021-01-01 00:33:44', dropoff_datetime='2021-01-01 00:49:07', PULocationID='230', DOLocationID='166', SR_Flag=None),
 Row(hvfhs_license_num='HV0003', dispatching_base_num='B02682', pickup_datetime='2021-01-01 00:55:19', dropoff_datetime='2021-01-01 01:18:21', PULocationID='152', DOLocationID='167', SR_Flag=None),
 Row(hvfhs_license_num='HV0003', dispatching_base_num='B02764', pickup_datetime='2021-01-01 00:23:56', dropoff_datetime='2021-01-01 00:38:05', PULocationID='233', DOLocationID='142', SR_Flag=None),
 Row(hvfhs_license_num='HV0003', dispatching_base_num='B02764', pickup_datetime='2021-01-01 00:42:51', dropoff_datetime='2021-01-01 00:45:50', PULocationID='142', DOLocationID='143', SR_Flag=None),
 Row(hvfhs_license_num='HV0003', dispatching_base_num='B02764', pickup_datetime='2021-01-01 00:48:14', dropoff_datetime='2021-01-01 01:08:42', PULocationID='143', DOLocationID='78', SR_Flag=None)]

In [5]:
# !head -n 1001 fhvhv_tripdata_2021-01.csv > head.csv

In [6]:
import pandas as pd

df_pandas = pd.read_csv('head.csv')
df_pandas

Unnamed: 0,hvfhs_license_num,dispatching_base_num,pickup_datetime,dropoff_datetime,PULocationID,DOLocationID,SR_Flag
0,HV0003,B02682,2021-01-01 00:33:44,2021-01-01 00:49:07,230,166,
1,HV0003,B02682,2021-01-01 00:55:19,2021-01-01 01:18:21,152,167,
2,HV0003,B02764,2021-01-01 00:23:56,2021-01-01 00:38:05,233,142,
3,HV0003,B02764,2021-01-01 00:42:51,2021-01-01 00:45:50,142,143,
4,HV0003,B02764,2021-01-01 00:48:14,2021-01-01 01:08:42,143,78,
...,...,...,...,...,...,...,...
995,HV0003,B02871,2021-01-01 00:33:43,2021-01-01 00:47:52,255,61,
996,HV0005,B02510,2021-01-01 00:53:10,2021-01-01 01:21:09,114,21,
997,HV0003,B02512,2021-01-01 00:18:35,2021-01-01 00:26:53,167,42,
998,HV0003,B02512,2021-01-01 00:35:39,2021-01-01 00:42:59,116,116,


In [7]:
df_pandas.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1000 entries, 0 to 999
Data columns (total 7 columns):
 #   Column                Non-Null Count  Dtype  
---  ------                --------------  -----  
 0   hvfhs_license_num     1000 non-null   object 
 1   dispatching_base_num  1000 non-null   object 
 2   pickup_datetime       1000 non-null   object 
 3   dropoff_datetime      1000 non-null   object 
 4   PULocationID          1000 non-null   int64  
 5   DOLocationID          1000 non-null   int64  
 6   SR_Flag               0 non-null      float64
dtypes: float64(1), int64(2), object(4)
memory usage: 54.8+ KB


## Create a Spark DataFrame from a pandas.DataFrame.

In [10]:
spark.createDataFrame(df_pandas).printSchema()

root
 |-- hvfhs_license_num: string (nullable = true)
 |-- dispatching_base_num: string (nullable = true)
 |-- pickup_datetime: string (nullable = true)
 |-- dropoff_datetime: string (nullable = true)
 |-- PULocationID: long (nullable = true)
 |-- DOLocationID: long (nullable = true)
 |-- SR_Flag: double (nullable = true)



24/05/11 21:39:30 WARN Executor: Issue communicating with driver in heartbeater
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:56)
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:310)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:101)
	at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:85)
	at org.apache.spark.storage.BlockManagerMaster.registerBlockManager(BlockManagerMaster.scala:80)
	at org.apache.spark.storage.BlockManager.reregister(BlockManager.scala:642)
	at org.apache.spark.executor.Executor.reportHeartBeat(Executor.scala:1223)
	at org.apache.spark.executor.Executor.$anonfun$heartbeater$1(Executor.scala:295)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1928)

In [15]:
spark.createDataFrame(df_pandas).schema

StructType([StructField('hvfhs_license_num', StringType(), True), StructField('dispatching_base_num', StringType(), True), StructField('pickup_datetime', StringType(), True), StructField('dropoff_datetime', StringType(), True), StructField('PULocationID', LongType(), True), StructField('DOLocationID', LongType(), True), StructField('SR_Flag', DoubleType(), True)])

## Defining schema

1. Structfield comesz from scala
2. types.StructField comes from python

Spark provides spark.sql.types.StructType class to define the structure of the DataFrame and It is a collection or list on StructField objects.

Spark provides spark.sql.types.StructField` class to define the column name(String), column type (DataType), nullable column (Boolean) and metadata (MetaData)



In [19]:
from pyspark.sql import types

schema  = types.StructType([
    types.StructField('hvfhs_license_num', types.StringType(), True), 
    types.StructField('dispatching_base_num', types.StringType(), True), 
    types.StructField('pickup_datetime', types.TimestampType(), True), 
    types.StructField('dropoff_datetime', types.TimestampType(), True), 
    types.StructField('PULocationID', types.IntegerType(), True), 
    types.StructField('DOLocationID', types.IntegerType(), True), 
    types.StructField('SR_Flag', types.StringType(), True)]
)

df = spark.read \
    .option("header","True") \
    .schema(schema) \
    .csv('fhvhv_tripdata_2021-01.csv')
    
df.printSchema() 

root
 |-- hvfhs_license_num: string (nullable = true)
 |-- dispatching_base_num: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- SR_Flag: string (nullable = true)



In [21]:
# Create 24 partitions in our dataframe.
df = df.repartition(24)
# Parquetize and write to fhvhv/2021/01/ folder.
df.write.parquet('fhvhv/2021/01/')

24/05/09 16:29:01 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
24/05/09 16:29:01 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 84.44% for 9 writers
24/05/09 16:29:01 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 76.00% for 10 writers
24/05/09 16:29:03 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 84.44% for 9 writers
24/05/09 16:29:03 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
24/05/09 16:29:03 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
24/05/09 16:29:03 WARN MemoryManager: Total allocation exceeds 95.00%

# 5.3.2 Spark DataFrames

- Actions vs tranfromations
- Functions and UDFs


In [8]:
df = spark.read.parquet('fhvhv/2021/01/')
df.printSchema()

root
 |-- hvfhs_license_num: string (nullable = true)
 |-- dispatching_base_num: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- SR_Flag: string (nullable = true)



## DF select and filter

In [9]:
df.select('pickup_datetime', 'dropoff_datetime', 'PULocationID', 'DOLocationID') \
  .filter(df.hvfhs_license_num == 'HV0003')

#this wont output anything cuz spark is a lazy evaluation

df.show()

+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+
|hvfhs_license_num|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|SR_Flag|
+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+
|           HV0003|              B02395|2021-01-01 01:23:11|2021-01-01 01:30:12|         147|         168|   NULL|
|           HV0003|              B02617|2021-01-01 18:52:46|2021-01-01 18:57:51|          14|          14|   NULL|
|           HV0005|              B02510|2021-01-02 12:12:21|2021-01-02 12:18:32|         188|          61|   NULL|
|           HV0003|              B02617|2021-01-01 13:14:49|2021-01-01 13:21:31|         243|         127|   NULL|
|           HV0003|              B02872|2021-01-04 00:16:27|2021-01-04 00:26:44|         232|          79|   NULL|
|           HV0003|              B02866|2021-01-03 20:09:24|2021-01-03 20:40:35|

## Spark functions

- .withColumn() is a transformation that adds a new column to the dataframe.
Adding a new column with the same name as a previously existing column will overwrite the existing column.
- .select() is another transformation that selects the stated columns.
- F.to_date() is a built-in Spark function that converts a timestamp to date format (year, month and day only, no hour and minute).
- .show() is an action.

In [10]:
from pyspark.sql import functions as F
import pyspark.sql.types as types

In [None]:
df \
    .withColumn('pickup_date', F.to_date(df.pickup_datetime)) \
    .withColumn('dropoff_date', F.to_date(df.dropoff_datetime)) \
    .select('pickup_date', 'dropoff_date', 'PULocationID', 'DOLocationID') \
    .show()

+-----------+------------+------------+------------+
|pickup_date|dropoff_date|PULocationID|DOLocationID|
+-----------+------------+------------+------------+
| 2021-01-01|  2021-01-01|         147|         168|
| 2021-01-01|  2021-01-01|          14|          14|
| 2021-01-02|  2021-01-02|         188|          61|
| 2021-01-01|  2021-01-01|         243|         127|
| 2021-01-04|  2021-01-04|         232|          79|
| 2021-01-03|  2021-01-03|         123|         197|
| 2021-01-01|  2021-01-01|         245|         221|
| 2021-01-03|  2021-01-03|          16|          53|
| 2021-01-01|  2021-01-01|          78|         119|
| 2021-01-01|  2021-01-01|          83|         138|
| 2021-01-04|  2021-01-04|          62|         261|
| 2021-01-01|  2021-01-01|         205|         216|
| 2021-01-03|  2021-01-03|          29|          55|
| 2021-01-03|  2021-01-03|         129|         129|
| 2021-01-02|  2021-01-02|          35|          76|
| 2021-01-03|  2021-01-03|         208|       

## User defined functions

https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.udf.html#pyspark.sql.functions.udf

In [None]:
def crazy_stuff(base_num):
    num = int(base_num[1:])
    if num % 7 == 0:
        return f's/{num:03x}'
    elif num % 3 == 0:
        return f'a/{num:03x}'
    else:
        return f'e/{num:03x}'

crazy_stuff('B02884')  # Return 's/b44'

crazy_stuff_udf = F.udf(crazy_stuff, returnType=types.StringType())

df \
    .withColumn('pickup_date', F.to_date(df.pickup_datetime)) \
    .withColumn('dropoff_date', F.to_date(df.dropoff_datetime)) \
    .withColumn('base_id', crazy_stuff_udf(df.dispatching_base_num)) \
    .select('base_id', 'pickup_date', 'dropoff_date', 'PULocationID', 'DOLocationID') \
    .show(5)

+-------+-----------+------------+------------+------------+
|base_id|pickup_date|dropoff_date|PULocationID|DOLocationID|
+-------+-----------+------------+------------+------------+
|  e/95b| 2021-01-01|  2021-01-01|         147|         168|
|  e/a39| 2021-01-01|  2021-01-01|          14|          14|
|  e/9ce| 2021-01-02|  2021-01-02|         188|          61|
|  e/a39| 2021-01-01|  2021-01-01|         243|         127|
|  e/b38| 2021-01-04|  2021-01-04|         232|          79|
+-------+-----------+------------+------------+------------+
only showing top 5 rows



                                                                                

24/05/11 23:39:59 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 5714707 ms exceeds timeout 120000 ms
24/05/11 23:39:59 WARN SparkContext: Killing executors is not supported by current scheduler.
24/05/11 23:40:03 ERROR Inbox: Ignoring error
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:56)
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:310)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:102)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:110)
	at org.apache.spark.util.RpcUtils$.makeDriverRef(RpcUtils.scala:36)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.driverEndpoint$lzycompute(BlockManagerMasterEndpoint.scala:124)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.org$apache$spark$storage$BlockManagerMasterEndpoint$

# 5.3.4 SQL with Spark


In [1]:
# next in code/05.ipynb and 06.ipynb
