# First look at Spark/PySpark

## Creating a Spark session

In [1]:
import pyspark
from pyspark.sql import SparkSession

In [2]:
import pandas as pd

In [3]:
spark = SparkSession.builder \
    .master("local[*]") \
    .appName('test') \
    .getOrCreate()



## Read data

In [30]:
#!wget https://d37ci6vzurychx.cloudfront.net/trip-data/fhvhv_tripdata_2021-01.parquet
!wget https://github.com/DataTalksClub/nyc-tlc-data/releases/download/fhvhv/fhvhv_tripdata_2021-01.csv.gz


In [37]:
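# This cell reads the decompressed .csv, so unzip the .csv.gz first (e.g. !gunzip fhvhv_tripdata_2021-01.csv.gz)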
df = spark.read \
    .option("header", "true") \
    .csv('fhvhv_tripdata_2021-01.csv')

In [78]:
!wc -l fhvhv_tripdata_2021-01.csv

 11908469 fhvhv_tripdata_2021-01.csv


In [72]:
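# Number of rows read by PySpark
df.count()

11908468

This is one less than the `wc -l` result because, with `header` set to `true`, Spark uses the first line for the column names instead of counting it as data.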

In [80]:
df.show()

+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+
|hvfhs_license_num|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|SR_Flag|
+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+
|           HV0003|              B02682|2021-01-01 00:33:44|2021-01-01 00:49:07|         230|         166|   NULL|
|           HV0003|              B02682|2021-01-01 00:55:19|2021-01-01 01:18:21|         152|         167|   NULL|
|           HV0003|              B02764|2021-01-01 00:23:56|2021-01-01 00:38:05|         233|         142|   NULL|
|           HV0003|              B02764|2021-01-01 00:42:51|2021-01-01 00:45:50|         142|         143|   NULL|
|           HV0003|              B02764|2021-01-01 00:48:14|2021-01-01 01:08:42|         143|          78|   NULL|
|           HV0005|              B02510|2021-01-01 00:06:59|2021-01-01 00:43:01|

In [39]:
df.schema

StructType([StructField('hvfhs_license_num', StringType(), True), StructField('dispatching_base_num', StringType(), True), StructField('pickup_datetime', StringType(), True), StructField('dropoff_datetime', StringType(), True), StructField('PULocationID', StringType(), True), StructField('DOLocationID', StringType(), True), StructField('SR_Flag', StringType(), True)])

In [81]:
df.head(1)

[Row(hvfhs_license_num='HV0003', dispatching_base_num='B02682', pickup_datetime='2021-01-01 00:33:44', dropoff_datetime='2021-01-01 00:49:07', PULocationID='230', DOLocationID='166', SR_Flag=None)]

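All the fields were read as strings: by default, Spark's CSV reader does not infer column types. We need to convert the fields to suitable data types.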

## Fixing the schema

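Since the original data is large, we first work with a small part of it to check the data types (pretend the data read above does not exist yet).

Write the first 1001 lines of the CSV file (the header plus 1000 rows) to another file: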

In [57]:
!head -n 1001 fhvhv_tripdata_2021-01.csv > head.csv

Count the lines in the new CSV file

In [58]:
!wc -l head.csv

    1001 head.csv
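The shell `head` command is not available everywhere (for example, on Windows without extra tools). A sketch of a pure-pandas alternative, assuming the same goal of 1000 data rows: `pd.read_csv` accepts `nrows`, which stops after that many data rows. A hypothetical generated file stands in for the real CSV.

```python
import os
import tempfile

import pandas as pd

# Hypothetical stand-in for the large CSV: a header plus 5000 rows.
tmp_dir = tempfile.mkdtemp()
path = os.path.join(tmp_dir, "big.csv")
with open(path, "w") as f:
    f.write("PULocationID,DOLocationID\n")
    for i in range(5000):
        f.write(f"{i % 265},{(i * 7) % 265}\n")

# Read the header plus only the first 1000 data rows.
df_sample = pd.read_csv(path, nrows=1000)
print(df_sample.shape)  # (1000, 2)

# Optionally save the sample, mirroring head.csv.
df_sample.to_csv(os.path.join(tmp_dir, "head.csv"), index=False)
```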


We will now use pandas to infer the data types. Pandas was imported at the start of the notebook, so we only need to read the small CSV file.

In [4]:
df_pandas = pd.read_csv('head.csv')

Let's see the inferred types.

In [5]:
df_pandas.dtypes

hvfhs_license_num        object
dispatching_base_num     object
pickup_datetime          object
dropoff_datetime         object
PULocationID              int64
DOLocationID              int64
SR_Flag                 float64
dtype: object
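`SR_Flag` comes out as `float64` because every value in the sample is empty: pandas represents missing values as `NaN`, which is a float, so a column containing only missing values gets a float dtype. A small sketch with hypothetical data:

```python
import io

import pandas as pd

# Two rows whose SR_Flag field is empty (hypothetical values).
csv_text = "PULocationID,SR_Flag\n230,\n152,\n"
df_demo = pd.read_csv(io.StringIO(csv_text))

print(df_demo.dtypes)                    # PULocationID is int64, SR_Flag is float64
print(df_demo["SR_Flag"].isna().all())   # every SR_Flag value is NaN
```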



Spark can create DataFrames from pandas DataFrames. Let's convert the pandas DataFrame to a Spark DataFrame and check the data types Spark assigns.

In [21]:
spark.createDataFrame(df_pandas).show()

+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+
|hvfhs_license_num|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|SR_Flag|
+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+
|           HV0003|              B02682|2021-01-01 00:33:44|2021-01-01 00:49:07|         230|         166|    NaN|
|           HV0003|              B02682|2021-01-01 00:55:19|2021-01-01 01:18:21|         152|         167|    NaN|
|           HV0003|              B02764|2021-01-01 00:23:56|2021-01-01 00:38:05|         233|         142|    NaN|
|           HV0003|              B02764|2021-01-01 00:42:51|2021-01-01 00:45:50|         142|         143|    NaN|
|           HV0003|              B02764|2021-01-01 00:48:14|2021-01-01 01:08:42|         143|          78|    NaN|
|           HV0005|              B02510|2021-01-01 00:06:59|2021-01-01 00:43:01|

In [6]:
spark.createDataFrame(df_pandas).schema

StructType([StructField('hvfhs_license_num', StringType(), True), StructField('dispatching_base_num', StringType(), True), StructField('pickup_datetime', StringType(), True), StructField('dropoff_datetime', StringType(), True), StructField('PULocationID', LongType(), True), StructField('DOLocationID', LongType(), True), StructField('SR_Flag', DoubleType(), True)])

Older versions of PySpark print this schema in a Scala-like notation (Spark itself is written in Scala), as in the block below; the output above is the newer, Python-style representation. Either way, we can clean up the output and build a usable schema from it.

In [None]:
# Don't run this block!
'''
StructType(
    List(
        StructField(hvfhs_license_num,StringType,true),
        StructField(dispatching_base_num,StringType,true),
        StructField(pickup_datetime,StringType,true),
        StructField(dropoff_datetime,StringType,true),
        StructField(PULocationID,LongType,true),
        StructField(DOLocationID,LongType,true),
        StructField(SR_Flag,DoubleType,true)
    )
)
'''

Pandas did not infer the timestamp columns, but we know that `pickup_datetime` and `dropoff_datetime` should be timestamps. We will also convert the longs (`int64`) to integers to save memory, and convert `SR_Flag` to a string.

`LongType` uses 8 bytes per value; `IntegerType` uses 4 bytes per value.
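Two quick checks behind the integer choice, as back-of-the-envelope arithmetic rather than a measurement. `IntegerType` is a signed 32-bit type, so it holds values up to 2,147,483,647, far above the location IDs seen in this data (all below 300). For the 11,908,468 rows counted earlier, the raw values of the two location columns shrink from 8 to 4 bytes each. Real savings depend on the storage format (Parquet encoding and compression, Spark's internal row layout), so treat the result as the difference in raw value bytes only. The maximum zone ID of 265 below is an assumption for illustration.

```python
rows = 11_908_468          # df.count() from earlier in the notebook
location_columns = 2       # PULocationID and DOLocationID

int32_max = 2**31 - 1      # upper bound of IntegerType
assumed_max_zone_id = 265  # assumption for illustration
assert assumed_max_zone_id <= int32_max

long_bytes = rows * location_columns * 8   # LongType: 8 bytes per value
int_bytes = rows * location_columns * 4    # IntegerType: 4 bytes per value
saved = long_bytes - int_bytes

print(f"raw bytes as LongType:    {long_bytes:,}")     # 190,535,488
print(f"raw bytes as IntegerType: {int_bytes:,}")      # 95,267,744
print(f"saved: {saved:,} bytes (~{saved / 1e6:.1f} MB)")  # 95,267,744 bytes (~95.3 MB)
```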

In [7]:
from pyspark.sql import types

In [8]:
schema = types.StructType(
    [
        types.StructField('hvfhs_license_num', types.StringType(), True),
        types.StructField('dispatching_base_num', types.StringType(), True),
        types.StructField('pickup_datetime', types.TimestampType(), True),
        types.StructField('dropoff_datetime', types.TimestampType(), True),
        types.StructField('PULocationID', types.IntegerType(), True),
        types.StructField('DOLocationID', types.IntegerType(), True),
        types.StructField('SR_Flag', types.StringType(), True)
    ]
)

In [9]:
df = spark.read \
    .option("header", "true") \
    .schema(schema) \
    .csv('fhvhv_tripdata_2021-01.csv')

In [10]:
df.show()

+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+
|hvfhs_license_num|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|SR_Flag|
+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+
|           HV0003|              B02682|2021-01-01 00:33:44|2021-01-01 00:49:07|         230|         166|   NULL|
|           HV0003|              B02682|2021-01-01 00:55:19|2021-01-01 01:18:21|         152|         167|   NULL|
|           HV0003|              B02764|2021-01-01 00:23:56|2021-01-01 00:38:05|         233|         142|   NULL|
|           HV0003|              B02764|2021-01-01 00:42:51|2021-01-01 00:45:50|         142|         143|   NULL|
|           HV0003|              B02764|2021-01-01 00:48:14|2021-01-01 01:08:42|         143|          78|   NULL|
|           HV0005|              B02510|2021-01-01 00:06:59|2021-01-01 00:43:01|

In [11]:
df.head(5)

[Row(hvfhs_license_num='HV0003', dispatching_base_num='B02682', pickup_datetime=datetime.datetime(2021, 1, 1, 0, 33, 44), dropoff_datetime=datetime.datetime(2021, 1, 1, 0, 49, 7), PULocationID=230, DOLocationID=166, SR_Flag=None),
 Row(hvfhs_license_num='HV0003', dispatching_base_num='B02682', pickup_datetime=datetime.datetime(2021, 1, 1, 0, 55, 19), dropoff_datetime=datetime.datetime(2021, 1, 1, 1, 18, 21), PULocationID=152, DOLocationID=167, SR_Flag=None),
 Row(hvfhs_license_num='HV0003', dispatching_base_num='B02764', pickup_datetime=datetime.datetime(2021, 1, 1, 0, 23, 56), dropoff_datetime=datetime.datetime(2021, 1, 1, 0, 38, 5), PULocationID=233, DOLocationID=142, SR_Flag=None),
 Row(hvfhs_license_num='HV0003', dispatching_base_num='B02764', pickup_datetime=datetime.datetime(2021, 1, 1, 0, 42, 51), dropoff_datetime=datetime.datetime(2021, 1, 1, 0, 45, 50), PULocationID=142, DOLocationID=143, SR_Flag=None),
 Row(hvfhs_license_num='HV0003', dispatching_base_num='B02764', pickup_dat

In [12]:
df.schema

StructType([StructField('hvfhs_license_num', StringType(), True), StructField('dispatching_base_num', StringType(), True), StructField('pickup_datetime', TimestampType(), True), StructField('dropoff_datetime', TimestampType(), True), StructField('PULocationID', IntegerType(), True), StructField('DOLocationID', IntegerType(), True), StructField('SR_Flag', StringType(), True)])

## Partitions

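We now repartition the DataFrame into 24 partitions. `repartition` is lazy: the data is only shuffled when an action, such as writing the files, runs.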

In [13]:
df = df.repartition(24)

Let's write the DataFrame to Parquet. Because it has 24 partitions, this creates 24 smaller Parquet files, one per partition.

This operation may take a while.

Check the created files

In [16]:
!ls -lh fhvhv/2021/01/

total 445440
-rw-r--r--  1 ola  staff     0B  2 24 17:01 _SUCCESS
-rw-r--r--  1 ola  staff   8.9M  2 24 17:01 part-00000-1c4d861a-89e1-4fba-9e3d-5d100923ec51-c000.snappy.parquet
-rw-r--r--  1 ola  staff   8.9M  2 24 17:01 part-00001-1c4d861a-89e1-4fba-9e3d-5d100923ec51-c000.snappy.parquet
-rw-r--r--  1 ola  staff   8.9M  2 24 17:01 part-00002-1c4d861a-89e1-4fba-9e3d-5d100923ec51-c000.snappy.parquet
-rw-r--r--  1 ola  staff   8.9M  2 24 17:01 part-00003-1c4d861a-89e1-4fba-9e3d-5d100923ec51-c000.snappy.parquet
-rw-r--r--  1 ola  staff   8.9M  2 24 17:01 part-00004-1c4d861a-89e1-4fba-9e3d-5d100923ec51-c000.snappy.parquet
-rw-r--r--  1 ola  staff   8.9M  2 24 17:01 part-00005-1c4d861a-89e1-4fba-9e3d-5d100923ec51-c000.snappy.parquet
-rw-r--r--  1 ola  staff   8.9M  2 24 17:01 part-00006-1c4d861a-89e1-4fba-9e3d-5d100923ec51-c000.snappy.parquet
-rw-r--r--  1 ola  staff   8.9M  2 24 17:01 part-00007-1c4d861a-89e1-4fba-9e3d-5d100923ec51-c000.snappy.parquet
-rw-r--r--  1 ola  staff   8.9M  2 24 

## Spark DataFrame

In [31]:
df = spark.read.parquet('fhvhv/2021/01/')

Check the schema

In [32]:
df.printSchema()

root
 |-- hvfhs_license_num: string (nullable = true)
 |-- dispatching_base_num: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- SR_Flag: string (nullable = true)



In [18]:
new_df = df.select('pickup_datetime', 'dropoff_datetime', 'PULocationID', 'DOLocationID')

In [20]:
new_df.head(1)

                                                                                

[Row(pickup_datetime=datetime.datetime(2021, 1, 3, 4, 6, 35), dropoff_datetime=datetime.datetime(2021, 1, 3, 4, 11, 42), PULocationID=235, DOLocationID=169)]

In [21]:
new_df_2 = df \
    .filter(df.hvfhs_license_num == 'HV0003') \
    .select('pickup_datetime', 'dropoff_datetime', 'PULocationID', 'DOLocationID')

In [24]:
new_df_2.show(1)

+-------------------+-------------------+------------+------------+
|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|
+-------------------+-------------------+------------+------------+
|2021-01-01 05:39:00|2021-01-01 06:05:49|          76|         256|
+-------------------+-------------------+------------+------------+
only showing top 1 row



                                                                                

## Functions and UDFs

In [26]:
from pyspark.sql import functions as F

Using built-in functions (`F.to_date()`)

In [30]:
df \
    .withColumn('pickup_date', F.to_date(df.pickup_datetime)) \
    .withColumn('dropoff_date', F.to_date(df.dropoff_datetime)) \
    .select('pickup_date', 'dropoff_date', 'PULocationID', 'DOLocationID') \
    .show()

+-----------+------------+------------+------------+
|pickup_date|dropoff_date|PULocationID|DOLocationID|
+-----------+------------+------------+------------+
| 2021-01-03|  2021-01-03|         235|         169|
| 2021-01-04|  2021-01-04|         213|          81|
| 2021-01-01|  2021-01-01|          80|         224|
| 2021-01-02|  2021-01-02|          52|          40|
| 2021-01-01|  2021-01-01|         168|          42|
| 2021-01-01|  2021-01-01|         183|         213|
| 2021-01-03|  2021-01-03|         171|          53|
| 2021-01-03|  2021-01-03|          42|         244|
| 2021-01-02|  2021-01-02|         174|         169|
| 2021-01-03|  2021-01-03|         259|         162|
| 2021-01-01|  2021-01-01|         168|         159|
| 2021-01-05|  2021-01-05|         108|          89|
| 2021-01-05|  2021-01-05|         262|          47|
| 2021-01-01|  2021-01-01|         136|         241|
| 2021-01-02|  2021-01-02|          95|          95|
| 2021-01-03|  2021-01-03|          37|       

                                                                                

Creating a UDF

In [31]:
def crazy_stuff(base_num):
    num = int(base_num[1:])
    if num % 7 == 0:
        return f's/{num:03x}'
    elif num % 3 == 0:
        return f'a/{num:03x}'
    else:
        return f'e/{num:03x}'

crazy_stuff_udf = F.udf(crazy_stuff, returnType=types.StringType())
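Because `crazy_stuff` is plain Python, it can be tested on its own before being wrapped with `F.udf`. This is worth doing: a Python UDF runs in a separate Python worker, with rows serialized between the JVM and Python, and Spark cannot optimize inside it, so failures are harder to diagnose there. The expected values below come from this notebook's `dispatching_base_num`/`base_id` output. The sketch also adds a null-safe variant, `crazy_stuff_safe` (a hypothetical name), since the original raises a `TypeError` when the base number is missing; it could be wrapped the same way with `F.udf(crazy_stuff_safe, returnType=types.StringType())`.

```python
def crazy_stuff(base_num):
    num = int(base_num[1:])          # drop the leading letter, e.g. 'B02871' -> 2871
    if num % 7 == 0:
        return f's/{num:03x}'        # lowercase hex, zero-padded to at least 3 digits
    elif num % 3 == 0:
        return f'a/{num:03x}'
    else:
        return f'e/{num:03x}'


def crazy_stuff_safe(base_num):
    # Hypothetical variant: pass nulls through instead of failing on None[1:].
    if base_num is None:
        return None
    return crazy_stuff(base_num)


# Inputs taken from the dispatching_base_num column shown in this notebook.
for base_num in ['B02871', 'B02510', 'B02764']:
    print(base_num, crazy_stuff(base_num))
# B02871 a/b37
# B02510 e/9ce
# B02764 e/acc
```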

In [32]:
df \
    .withColumn('pickup_date', F.to_date(df.pickup_datetime)) \
    .withColumn('dropoff_date', F.to_date(df.dropoff_datetime)) \
    .withColumn('base_id', crazy_stuff_udf(df.dispatching_base_num)) \
    .select('dispatching_base_num','base_id', 'pickup_date', 'dropoff_date', 'PULocationID', 'DOLocationID') \
    .show()

+--------------------+-------+-----------+------------+------------+------------+
|dispatching_base_num|base_id|pickup_date|dropoff_date|PULocationID|DOLocationID|
+--------------------+-------+-----------+------------+------------+------------+
|              B02871|  a/b37| 2021-01-04|  2021-01-04|         149|         108|
|              B02510|  e/9ce| 2021-01-03|  2021-01-03|         239|         244|
|              B02764|  e/acc| 2021-01-02|  2021-01-02|         236|         233|
|              B02887|  e/b47| 2021-01-05|  2021-01-05|          82|         129|
|              B02872|  e/b38| 2021-01-02|  2021-01-02|         143|         239|
|              B02510|  e/9ce| 2021-01-03|  2021-01-03|         198|          37|
|              B02764|  e/acc| 2021-01-03|  2021-01-03|         157|          36|
|              B02872|  e/b38| 2021-01-04|  2021-01-04|         171|          16|
|              B02510|  e/9ce| 2021-01-01|  2021-01-01|          28|         216|
|              B

                                                                                