In [8]:
from pyspark.sql import SparkSession

Start pyspark session

In [10]:
spark = (SparkSession.builder
                     .master("local[*]")
                     .appName("test")
                     .getOrCreate())

Read csv

In [33]:
data_path = "../docker-datapipeline/yellow_tripdata_2021-01.csv"

In [24]:
df = (spark.read
           .option("header", "true")
           .csv(data_path))

Show csv data and schema. Everything is string for spark

In [25]:
# df.show()
# df.head(5)


df.schema

StructType(List(StructField(VendorID,StringType,true),StructField(tpep_pickup_datetime,StringType,true),StructField(tpep_dropoff_datetime,StringType,true),StructField(passenger_count,StringType,true),StructField(trip_distance,StringType,true),StructField(RatecodeID,StringType,true),StructField(store_and_fwd_flag,StringType,true),StructField(PULocationID,StringType,true),StructField(DOLocationID,StringType,true),StructField(payment_type,StringType,true),StructField(fare_amount,StringType,true),StructField(extra,StringType,true),StructField(mta_tax,StringType,true),StructField(tip_amount,StringType,true),StructField(tolls_amount,StringType,true),StructField(improvement_surcharge,StringType,true),StructField(total_amount,StringType,true),StructField(congestion_surcharge,StringType,true)))

Take 5 data and save to head.csv

In [26]:
# type command in git bash
# head docker-datapipeline/yellow_tripdata_2021-01.csv > head.csv | mv head.csv pyspark-datapipeline

Read csv with pandas and see the datatype. Pandas is infering the datatype.

In [28]:
import pandas as pd

df_pandas = pd.read_csv("head.csv")
df_pandas.dtypes

VendorID                   int64
tpep_pickup_datetime      object
tpep_dropoff_datetime     object
passenger_count            int64
trip_distance            float64
RatecodeID                 int64
store_and_fwd_flag        object
PULocationID               int64
DOLocationID               int64
payment_type               int64
fare_amount              float64
extra                    float64
mta_tax                  float64
tip_amount               float64
tolls_amount               int64
improvement_surcharge    float64
total_amount             float64
congestion_surcharge     float64
dtype: object

Create dataframe from pandas. We can see that dtypes is vary.

In [30]:
spark.createDataFrame(df_pandas).show()

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+
|       1| 2021-01-01 00:30:10|  2021-01-01 00:36:12|              1|          2.1|         1|                 N|         142|          43|           2|        8.0|  3.0|    0.5|       0.0|           0|                  0.3

Create schema from known datatype

In [31]:
from pyspark.sql import types

In [32]:
schema = types.StructType([
    types.StructField("VendorID", types.IntegerType(), True),
    types.StructField("tpep_pickup_datetime", types.TimestampType(), True),
    types.StructField("tpep_dropoff_datetime", types.TimestampType(), True),
    types.StructField("passenger_count", types.IntegerType(), True),
    types.StructField("trip_distance", types.FloatType(), True),
    types.StructField("RatecodeID", types.IntegerType(), True),
    types.StructField("store_and_fwd_flag", types.StringType(), True),
    types.StructField("PULocationID", types.IntegerType(), True),
    types.StructField("DOLocationID", types.IntegerType(), True),
    types.StructField("payment_type", types.IntegerType(), True),
    types.StructField("fare_amount", types.FloatType(), True),
    types.StructField("extra", types.FloatType(), True),
    types.StructField("mta_tax", types.FloatType(), True),
    types.StructField("tip_amount", types.FloatType(), True),
    types.StructField("tolls_amount", types.IntegerType(), True),
    types.StructField("improvement_surcharge", types.FloatType(), True),
    types.StructField("total_amount", types.FloatType(), True),
    types.StructField("congestion_surcharge", types.FloatType(), True),
])

Read csv again. Now we specify the schema

In [34]:
df = (spark.read
           .option("header", "true")
           .schema(schema)
           .csv(data_path))

The datatype is fit

In [36]:
df.head()

Row(VendorID=1, tpep_pickup_datetime=datetime.datetime(2021, 1, 1, 0, 30, 10), tpep_dropoff_datetime=datetime.datetime(2021, 1, 1, 0, 36, 12), passenger_count=1, trip_distance=2.0999999046325684, RatecodeID=1, store_and_fwd_flag='N', PULocationID=142, DOLocationID=43, payment_type=2, fare_amount=8.0, extra=3.0, mta_tax=0.5, tip_amount=0.0, tolls_amount=0, improvement_surcharge=0.30000001192092896, total_amount=11.800000190734863, congestion_surcharge=2.5)

Now, instead of read one large file. We can split the file into smaller parts to utilize spark clusters.

In [37]:
df = df.repartition(24)

Save the data into spark format

In [None]:
df.write.parquet("yellow_tripdata.parquet")

Restart the kernel and try to reopen the saved file

In [17]:
from pyspark.sql import SparkSession
import pyspark.pandas as ps



In [2]:
spark = (SparkSession.builder
                     .master("local[*]")
                     .appName("test")
                     .getOrCreate())

In [3]:
df = spark.read.option("header", True).parquet("yellow_tripdata.parquet")

In [8]:
df.head()

Row(VendorID=1, tpep_pickup_datetime=datetime.datetime(2021, 1, 10, 12, 32, 3), tpep_dropoff_datetime=datetime.datetime(2021, 1, 10, 12, 43, 36), passenger_count=1, trip_distance=3.5, RatecodeID=1, store_and_fwd_flag='N', PULocationID=79, DOLocationID=236, payment_type=1, fare_amount=12.0, extra=2.5, mta_tax=0.5, tip_amount=3.049999952316284, tolls_amount=0, improvement_surcharge=0.30000001192092896, total_amount=18.350000381469727, congestion_surcharge=2.5)

In [16]:
df.groupBy(df.VendorID).sum().head()

Row(VendorID=None, sum(VendorID)=None, sum(passenger_count)=None, sum(trip_distance)=2917624.442051178, sum(RatecodeID)=None, sum(PULocationID)=14892901, sum(DOLocationID)=13331869, sum(payment_type)=None, sum(fare_amount)=2565865.076589197, sum(extra)=79782.82001562417, sum(mta_tax)=48730.0, sum(tip_amount)=161767.29994092882, sum(tolls_amount)=426, sum(improvement_surcharge)=29501.701172292233, sum(total_amount)=3051485.7303704023, sum(congestion_surcharge)=51804.75)

Convert to pandas

In [18]:
psdf = ps.DataFrame(df)

In [20]:
psdf.head()

Unnamed: 0,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge
0,1,2021-01-10 12:32:03,2021-01-10 12:43:36,1,3.5,1,N,79,236,1,12.0,2.5,0.5,3.05,0,0.3,18.35,2.5
1,2,2021-01-05 10:26:12,2021-01-05 10:42:45,1,3.7,1,N,234,33,2,14.5,0.0,0.5,0.0,0,0.3,17.799999,2.5
2,2,2021-01-06 13:25:45,2021-01-06 13:34:09,1,2.74,1,N,170,263,1,9.5,0.0,0.5,2.0,0,0.3,14.8,2.5
3,2,2021-01-04 08:19:46,2021-01-04 08:30:33,1,2.25,1,N,237,239,2,10.0,0.0,0.5,0.0,0,0.3,13.3,2.5
4,2,2021-01-10 11:14:50,2021-01-10 11:47:14,1,6.93,1,N,113,37,1,26.5,0.0,0.5,5.96,0,0.3,35.759998,2.5


Convert to SQL Table

In [22]:
df.createOrReplaceTempView("yellow_taxi")
spark.sql("SELECT count(*) from yellow_taxi LIMIT 5").show()

+--------+
|count(1)|
+--------+
| 1369765|
+--------+



SQL alternative of `.nunique()`

In [31]:
spark.sql("SELECT count(DISTINCT tpep_pickup_datetime), count(DISTINCT VendorID) FROM yellow_taxi").show()

+------------------------------------+------------------------+
|count(DISTINCT tpep_pickup_datetime)|count(DISTINCT VendorID)|
+------------------------------------+------------------------+
|                              939018|                       2|
+------------------------------------+------------------------+



In [44]:
spark.sql("SELECT DISTINCT VendorID, tpep_pickup_datetime FROM yellow_taxi").show()

+--------+--------------------+
|VendorID|tpep_pickup_datetime|
+--------+--------------------+
|       2| 2021-01-09 13:28:04|
|       2| 2021-01-07 08:17:14|
|       1| 2021-01-04 16:12:44|
|       2| 2021-01-07 21:22:01|
|       2| 2021-01-01 17:20:53|
|       1| 2021-01-08 08:44:50|
|       2| 2021-01-06 11:40:29|
|       2| 2021-01-06 11:25:51|
|       1| 2021-01-03 12:23:20|
|       1| 2021-01-04 09:08:34|
|       2| 2021-01-04 07:22:22|
|       1| 2021-01-01 22:04:01|
|       1| 2021-01-02 10:39:49|
|       2| 2021-01-05 22:16:08|
|       2| 2021-01-06 16:00:12|
|       2| 2021-01-02 17:53:51|
|       2| 2021-01-06 14:31:16|
|       2| 2021-01-06 11:51:16|
|       2| 2021-01-07 20:46:19|
|       2| 2021-01-05 17:49:28|
+--------+--------------------+
only showing top 20 rows



SQL alternative of `.value_counts()`

In [42]:
spark.sql("SELECT VendorID, count(*) as count FROM yellow_taxi GROUP BY VendorID").show()

+--------+------+
|VendorID| count|
+--------+------+
|    null| 98352|
|       1|410762|
|       2|860651|
+--------+------+

