In [54]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import monotonically_increasing_id
from pyspark.sql.functions import desc

# Create Spark session

In [55]:
# Reuse the active Spark session if there is one; otherwise start a new one
# registered under the application name "SparkSQL".
spark = (
    SparkSession.builder
    .appName("SparkSQL")
    .getOrCreate()
)

# Extract: Read the gzipped CSV file into a DataFrame

In [56]:
# Load the gzip-compressed CSV. The first line is treated as the header and
# column types are inferred from the data rather than declared up front.
df = (
    spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .csv("data/fhv_tripdata_2019-01.csv.gz")
)

# Print the inferred column types so they can be checked by eye.
print("Here is our inferred schema:")
df.printSchema()

[Stage 34:>                                                         (0 + 1) / 1]

Here is our inferred schema:
root
 |-- dispatching_base_num: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropOff_datetime: timestamp (nullable = true)
 |-- PUlocationID: integer (nullable = true)
 |-- DOlocationID: integer (nullable = true)
 |-- SR_Flag: integer (nullable = true)
 |-- Affiliated_base_number: string (nullable = true)



                                                                                

# Show the first 20 rows

In [57]:
# Preview the first 20 rows of the raw trip data (20 is also show()'s default).
df.show(n=20)

+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|dispatching_base_num|    pickup_datetime|   dropOff_datetime|PUlocationID|DOlocationID|SR_Flag|Affiliated_base_number|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|              B00001|2019-01-01 00:30:00|2019-01-01 02:51:55|        null|        null|   null|                B00001|
|              B00001|2019-01-01 00:45:00|2019-01-01 00:54:49|        null|        null|   null|                B00001|
|              B00001|2019-01-01 00:15:00|2019-01-01 00:54:52|        null|        null|   null|                B00001|
|              B00008|2019-01-01 00:19:00|2019-01-01 00:39:00|        null|        null|   null|                B00008|
|              B00008|2019-01-01 00:27:00|2019-01-01 00:37:00|        null|        null|   null|                B00008|
|              B00008|2019-01-01 00:48:0

# Creates or replaces a local temporary view with this dataframe

In [58]:
# Register df under the name "tripData" so that spark.sql() queries can
# reference it as a table.
df.createOrReplaceTempView("tripData")

# Transform: Filter the data as specified in the sql statement below

In [59]:
# Keep trips that have both a pickup and a drop-off location id and whose
# pickup falls within 2019-01-01 .. 2019-01-10. The upper bound is written as
# the exclusive start of 2019-01-11 so that pickups with fractional seconds
# after 23:59:59 on the 10th are not silently dropped.
df_sql = spark.sql("""
    SELECT *
    FROM tripData
    WHERE PUlocationID IS NOT NULL
      AND DOlocationID IS NOT NULL
      AND pickup_datetime >= '2019-01-01 00:00:00'
      AND pickup_datetime <  '2019-01-11 00:00:00'
""")

In [60]:
# Preview the first 20 rows of the filtered result (20 is also show()'s default).
df_sql.show(n=20)

+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|dispatching_base_num|    pickup_datetime|   dropOff_datetime|PUlocationID|DOlocationID|SR_Flag|Affiliated_base_number|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|              B00254|2019-01-01 00:33:03|2019-01-01 01:37:24|         140|          52|   null|                B02356|
|              B00254|2019-01-01 00:03:00|2019-01-01 00:34:25|         141|         237|   null|                B00254|
|              B00254|2019-01-01 00:45:48|2019-01-01 01:26:01|         237|         236|   null|                B00254|
|              B00254|2019-01-01 00:37:39|2019-01-01 01:44:59|         162|          85|   null|                B00254|
|              B00254|2019-01-01 00:35:06|2019-01-01 01:30:21|         237|         246|   null|                B00254|
|              B00254|2019-01-01 00:55:2

# Show the last 20 rows to verify the filter

In [61]:
# Tag each row with an increasing id so the frame can be sorted in reverse and
# its last rows inspected. The tagged frame gets its own name: reassigning
# df_sql here would carry the helper "index" column into the Parquet and JSON
# files written from df_sql later in the notebook.
df_sql_indexed = df_sql.withColumn("index", monotonically_increasing_id())

# Highest ids first, then drop the helper column before displaying 20 rows.
df_sql_indexed.orderBy(desc("index")).drop("index").show(20)

[Stage 37:>                                                         (0 + 1) / 1]

+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|dispatching_base_num|    pickup_datetime|   dropOff_datetime|PUlocationID|DOlocationID|SR_Flag|Affiliated_base_number|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|              B03046|2019-01-10 23:35:00|2019-01-10 23:49:00|         265|         265|   null|                B03046|
|              B03046|2019-01-10 23:15:00|2019-01-10 23:23:00|         265|         265|   null|                B03046|
|              B03046|2019-01-10 23:03:00|2019-01-10 23:14:00|         265|         265|   null|                B03046|
|              B03046|2019-01-10 23:00:00|2019-01-10 23:12:00|         265|         265|   null|                B03046|
|              B03040|2019-01-10 23:48:22|2019-01-10 23:48:29|          48|         144|   null|                B03040|
|              B03040|2019-01-10 23:21:3

                                                                                

# Load: Write the data to Parquet and JSON files

In [62]:
# Replace any output left by a previous run. The default save mode raises an
# error when the target path already exists, which breaks re-running the
# notebook from a fresh kernel.
df_sql.write.mode("overwrite").parquet("data/fhv_tripdata_between_2019-01-01_and_2019-01-10.parquet")

                                                                                

In [63]:
# Replace any output left by a previous run. The default save mode raises an
# error when the target path already exists, which breaks re-running the
# notebook from a fresh kernel.
df_sql.write.mode("overwrite").json("data/fhv_tripdata_between_2019-01-01_and_2019-01-10.json")

                                                                                