In [3]:
# using the parquet files created in 5.3.1.
# may need to recreate spark session
import pyspark
from pyspark.sql import SparkSession

# build (or reuse) a local Spark session for this notebook
spark = (
    SparkSession.builder
    .master("local[*]")      # local mode; '*' = use all available cores
    .appName('test')
    .getOrCreate()           # returns the active session if one already exists
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/03/05 14:57:50 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
# read the parquet files into a DataFrame
# (format('parquet').load(...) is the generic form of spark.read.parquet(...))
df_parquet = spark.read.format('parquet').load('fhvhv/2021/01/')

                                                                                

In [5]:
# displaying the DataFrame object itself lists its column names and data types
# (from the schema) but shows no rows
df_parquet

DataFrame[hvfhs_license_num: string, dispatching_base_num: string, pickup_datetime: timestamp, dropoff_datetime: timestamp, PULocationID: int, DOLocationID: int, SR_Flag: double]

In [7]:
# user-friendly, tree-style view of the schema (column, type, nullability)
df_parquet.printSchema()

root
 |-- hvfhs_license_num: string (nullable = true)
 |-- dispatching_base_num: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- SR_Flag: double (nullable = true)



In [8]:
# example of choosing only the needed cols --> no data is shown, only the resulting
# DataFrame's columns and types, because no action (e.g. .show()) is called
df_parquet.select('pickup_datetime', 'dropoff_datetime', 'PULocationID', 'DOLocationID')

DataFrame[pickup_datetime: timestamp, dropoff_datetime: timestamp, PULocationID: int, DOLocationID: int]

In [12]:
# example of choosing needed cols, with a filter --> rows are only displayed because
# .show() (an action) is called at the end.
# the filter is applied before the select so the predicate only refers to columns that
# still exist at that point (hvfhs_license_num is not among the selected columns).
df_parquet \
    .filter(df_parquet.hvfhs_license_num == 'HV0003') \
    .select('pickup_datetime', 'dropoff_datetime', 'PULocationID', 'DOLocationID') \
    .show()

[Stage 1:>                                                          (0 + 1) / 1]                                                                                

+-------------------+-------------------+------------+------------+
|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|
+-------------------+-------------------+------------+------------+
|2021-01-05 15:25:51|2021-01-05 15:38:17|         229|         236|
|2021-01-01 22:56:14|2021-01-01 23:06:41|          49|          80|
|2021-01-01 01:51:36|2021-01-01 02:18:24|         197|          11|
|2021-01-03 17:09:35|2021-01-03 17:29:54|          82|         101|
|2021-01-05 14:07:12|2021-01-05 14:15:05|          72|          35|
|2021-01-06 07:26:20|2021-01-06 07:45:31|           3|         128|
|2021-01-03 15:27:46|2021-01-03 15:54:08|         164|         138|
|2021-01-03 13:38:27|2021-01-03 13:50:05|         134|         196|
|2021-01-05 17:51:58|2021-01-05 18:37:13|          82|         222|
|2021-01-05 10:29:27|2021-01-05 10:44:56|         218|         265|
|2021-01-06 10:46:40|2021-01-06 10:50:16|          84|          84|
|2021-01-05 18:23:01|2021-01-05 18:33:27|       

In [15]:
# example of choosing needed cols, with a filter, and returning the first 5 rows
# --> head(5) or take(5) can be used; both return a list of Row objects.
# filter first so the predicate does not reference a column the select has dropped.
df_parquet \
    .filter(df_parquet.hvfhs_license_num == 'HV0003') \
    .select('pickup_datetime', 'dropoff_datetime', 'PULocationID', 'DOLocationID') \
    .head(5)

[Row(pickup_datetime=datetime.datetime(2021, 1, 5, 15, 25, 51), dropoff_datetime=datetime.datetime(2021, 1, 5, 15, 38, 17), PULocationID=229, DOLocationID=236),
 Row(pickup_datetime=datetime.datetime(2021, 1, 1, 22, 56, 14), dropoff_datetime=datetime.datetime(2021, 1, 1, 23, 6, 41), PULocationID=49, DOLocationID=80),
 Row(pickup_datetime=datetime.datetime(2021, 1, 1, 1, 51, 36), dropoff_datetime=datetime.datetime(2021, 1, 1, 2, 18, 24), PULocationID=197, DOLocationID=11),
 Row(pickup_datetime=datetime.datetime(2021, 1, 3, 17, 9, 35), dropoff_datetime=datetime.datetime(2021, 1, 3, 17, 29, 54), PULocationID=82, DOLocationID=101),
 Row(pickup_datetime=datetime.datetime(2021, 1, 5, 14, 7, 12), dropoff_datetime=datetime.datetime(2021, 1, 5, 14, 15, 5), PULocationID=72, DOLocationID=35)]

In [17]:
# writing this as SQL would often be easier than chaining DataFrame methods in Spark,
# but the DataFrame API is more flexible

In [18]:
# functions that already exist in spark
# importing the module as F is a common convention; typing "F." then Tab lists the available functions
from pyspark.sql import functions as F

In [24]:
# example of the to_date function:
# derive two date columns from the timestamp columns, then keep only the columns of interest.
# note: calling withColumn with the name of an existing column replaces that column.
(
    df_parquet
    .withColumn('pickup_date', F.to_date('pickup_datetime'))
    .withColumn('dropoff_date', F.to_date('dropoff_datetime'))
    .select('dispatching_base_num', 'pickup_date', 'dropoff_date', 'PULocationID', 'DOLocationID')
    .show()
)

+--------------------+-----------+------------+------------+------------+
|dispatching_base_num|pickup_date|dropoff_date|PULocationID|DOLocationID|
+--------------------+-----------+------------+------------+------------+
|              B02882| 2021-01-05|  2021-01-05|         229|         236|
|              B02875| 2021-01-01|  2021-01-01|          49|          80|
|              B02888| 2021-01-01|  2021-01-01|         197|          11|
|              B02764| 2021-01-03|  2021-01-03|          82|         101|
|              B02876| 2021-01-05|  2021-01-05|          72|          35|
|              B02864| 2021-01-06|  2021-01-06|           3|         128|
|              B02510| 2021-01-01|  2021-01-01|         168|         138|
|              B02872| 2021-01-03|  2021-01-03|         164|         138|
|              B02869| 2021-01-03|  2021-01-03|         134|         196|
|              B02682| 2021-01-05|  2021-01-05|          82|         222|
|              B02877| 2021-01-05|  20

In [25]:
# example of creating a plain Python function (wrapped as a Spark UDF further down).
# it drops the first character of a dispatching_base_num (e.g. 'B02864' -> 2864) and
# returns 's/<hex>' when that number is divisible by 7, otherwise 'e/<hex>'.

def check_base_num(base_num):
    """Tag a dispatching base number by whether its numeric part is divisible by 7.

    Args:
        base_num: base number string such as 'B02864'. The first character is
            dropped and the rest is parsed as an int. May be None, because the
            dispatching_base_num column is nullable.

    Returns:
        's/' (divisible by 7) or 'e/' (not divisible), followed by the number in
        lowercase hex, zero-padded to at least 3 digits. Returns None when
        base_num is None.

    Raises:
        ValueError: if the characters after the first are not an integer.
    """
    # Spark hands SQL NULLs to Python UDFs as None; slicing None would raise TypeError
    # and fail the whole job.
    if base_num is None:
        return None
    num = int(base_num[1:])
    prefix = 's' if num % 7 == 0 else 'e'
    return f'{prefix}/{num:03x}'

In [26]:
# test that the function works on a known value: 2864 is not divisible by 7 and 2864 == 0xb30
result = check_base_num('B02864')
assert result == 'e/b30', result
result

'e/b30'

In [28]:
# import the pyspark.sql.types module, which holds the schema data types (e.g. StringType).
# instead of importing each data type separately, import the entire module
from pyspark.sql import types

In [29]:
# wrap the Python function as a Spark UDF whose results are strings
# (relies on pyspark.sql.types having been imported as `types`)
check_base_num_udf = F.udf(check_base_num, types.StringType())

In [31]:
# apply the UDF next to the derived date columns; base_num_check holds the UDF output
(
    df_parquet
    .withColumn('pickup_date', F.to_date('pickup_datetime'))
    .withColumn('dropoff_date', F.to_date('dropoff_datetime'))
    .withColumn('base_num_check', check_base_num_udf('dispatching_base_num'))
    .select('dispatching_base_num', 'base_num_check', 'pickup_date', 'dropoff_date', 'PULocationID', 'DOLocationID')
    .show()
)

[Stage 10:>                                                         (0 + 1) / 1]

+--------------------+--------------+-----------+------------+------------+------------+
|dispatching_base_num|base_num_check|pickup_date|dropoff_date|PULocationID|DOLocationID|
+--------------------+--------------+-----------+------------+------------+------------+
|              B02882|         e/b42| 2021-01-05|  2021-01-05|         229|         236|
|              B02875|         e/b3b| 2021-01-01|  2021-01-01|          49|          80|
|              B02888|         e/b48| 2021-01-01|  2021-01-01|         197|          11|
|              B02764|         e/acc| 2021-01-03|  2021-01-03|          82|         101|
|              B02876|         e/b3c| 2021-01-05|  2021-01-05|          72|          35|
|              B02864|         e/b30| 2021-01-06|  2021-01-06|           3|         128|
|              B02510|         e/9ce| 2021-01-01|  2021-01-01|         168|         138|
|              B02872|         e/b38| 2021-01-03|  2021-01-03|         164|         138|
|              B02869

                                                                                