# Instancing Spark

In [1]:
# Import the PySpark library and the `SparkSession` class

import pyspark
from pyspark.sql import SparkSession

In [2]:
# Instance a session

spark = SparkSession.builder \
    .master("local[*]") \
    .appName('test') \
    .getOrCreate()

24/03/02 13:40:15 WARN Utils: Your hostname, Desktop-Gar resolves to a loopback address: 127.0.1.1; using 172.25.243.204 instead (on interface eth0)
24/03/02 13:40:15 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/03/02 13:40:18 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/03/02 13:40:22 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


# Read data

In [None]:
# Download some large data

# !wget https://github.com/DataTalksClub/nyc-tlc-data/releases/download/yellow/yellow_tripdata_2020-01.csv.gz

In [3]:
# Read the data. Note that the datatypes are not inferred; everything is a string.

df = spark.read \
    .option("header", "true") \
    .csv('data/csv/yellow_tripdata_2020-01.csv.gz')

                                                                                

In [4]:
df.show()

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+
|       1| 2020-01-01 00:28:15|  2020-01-01 00:33:03|              1|         1.20|         1|                 N|         238|         239|           1|          6|    3|    0.5|      1.47|           0|                  0.3

In [5]:
df.head(5)

[Row(VendorID='1', tpep_pickup_datetime='2020-01-01 00:28:15', tpep_dropoff_datetime='2020-01-01 00:33:03', passenger_count='1', trip_distance='1.20', RatecodeID='1', store_and_fwd_flag='N', PULocationID='238', DOLocationID='239', payment_type='1', fare_amount='6', extra='3', mta_tax='0.5', tip_amount='1.47', tolls_amount='0', improvement_surcharge='0.3', total_amount='11.27', congestion_surcharge='2.5'),
 Row(VendorID='1', tpep_pickup_datetime='2020-01-01 00:35:39', tpep_dropoff_datetime='2020-01-01 00:43:04', passenger_count='1', trip_distance='1.20', RatecodeID='1', store_and_fwd_flag='N', PULocationID='239', DOLocationID='238', payment_type='1', fare_amount='7', extra='3', mta_tax='0.5', tip_amount='1.5', tolls_amount='0', improvement_surcharge='0.3', total_amount='12.3', congestion_surcharge='2.5'),
 Row(VendorID='1', tpep_pickup_datetime='2020-01-01 00:47:41', tpep_dropoff_datetime='2020-01-01 00:53:52', passenger_count='1', trip_distance='.60', RatecodeID='1', store_and_fwd_flag

In [6]:
# Read the schema

df.printSchema()

root
 |-- VendorID: string (nullable = true)
 |-- tpep_pickup_datetime: string (nullable = true)
 |-- tpep_dropoff_datetime: string (nullable = true)
 |-- passenger_count: string (nullable = true)
 |-- trip_distance: string (nullable = true)
 |-- RatecodeID: string (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: string (nullable = true)
 |-- DOLocationID: string (nullable = true)
 |-- payment_type: string (nullable = true)
 |-- fare_amount: string (nullable = true)
 |-- extra: string (nullable = true)
 |-- mta_tax: string (nullable = true)
 |-- tip_amount: string (nullable = true)
 |-- tolls_amount: string (nullable = true)
 |-- improvement_surcharge: string (nullable = true)
 |-- total_amount: string (nullable = true)
 |-- congestion_surcharge: string (nullable = true)



# Creating a schema with Pandas

In [7]:
from pyspark.sql import types

In [8]:
yellow_schema = types.StructType([
    types.StructField("VendorID", types.IntegerType(), True),
    types.StructField("tpep_pickup_datetime", types.TimestampType(), True),
    types.StructField("tpep_dropoff_datetime", types.TimestampType(), True),
    types.StructField("store_and_fwd_flag", types.StringType(), True),
    types.StructField("RatecodeID", types.IntegerType(), True),
    types.StructField("PULocationID", types.IntegerType(), True),
    types.StructField("DOLocationID", types.IntegerType(), True),
    types.StructField("passenger_count", types.IntegerType(), True),
    types.StructField("trip_distance", types.DoubleType(), True),
    types.StructField("fare_amount", types.DoubleType(), True),
    types.StructField("extra", types.DoubleType(), True),
    types.StructField("mta_tax", types.DoubleType(), True),
    types.StructField("tip_amount", types.DoubleType(), True),
    types.StructField("tolls_amount", types.DoubleType(), True),
    types.StructField("ehail_fee", types.DoubleType(), True),
    types.StructField("improvement_surcharge", types.DoubleType(), True),
    types.StructField("total_amount", types.DoubleType(), True),
    types.StructField("payment_type", types.IntegerType(), True),
    types.StructField("trip_type", types.IntegerType(), True),
    types.StructField("congestion_surcharge", types.DoubleType(), True)
])

In [11]:
# With this new schema, we can now create a dataframe with inferred datatypes.

df = spark.read \
    .option("header", "true") \
    .schema(yellow_schema) \
    .csv('data/csv/yellow_tripdata_2020-01.csv.gz')

In [12]:
df.show()

24/03/02 13:42:48 WARN CSVHeaderChecker: Number of column in CSV header is not equal to number of fields in the schema:
 Header length: 18, schema size: 20
CSV file: file:///mnt/e/projects/dezoomcamp/5_batch_processing/code/data/csv/yellow_tripdata_2020-01.csv.gz
[Stage 3:>                                                          (0 + 1) / 1]

+--------+--------------------+---------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------+---------------------+------------+------------+---------+--------------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|store_and_fwd_flag|RatecodeID|PULocationID|DOLocationID|passenger_count|trip_distance|fare_amount|extra|mta_tax|tip_amount|tolls_amount|ehail_fee|improvement_surcharge|total_amount|payment_type|trip_type|congestion_surcharge|
+--------+--------------------+---------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------+---------------------+------------+------------+---------+--------------------+
|       1| 2020-01-01 00:28:15|  2020-01-01 00:33:03|                 1|      null|           1|        null|            238|        239.0|        1.0|  6.0|    3.

                                                                                

In [13]:
df.printSchema()

root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- RatecodeID: integer (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- ehail_fee: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- trip_type: integer (nullable = true)
 |-- congestion_surcharge: double (nullable = true)



# Partitions

In [14]:
# We will now create 24 partitions in our dataframe

df = df.repartition(96)

In [15]:
# Let's parquetize the dataframe. This will create 24 smaller parquet files.
# This operation may take a while.

df.write.csv('data/raw/yellow/2020/01/', mode='overwrite')

24/03/02 13:43:49 WARN CSVHeaderChecker: Number of column in CSV header is not equal to number of fields in the schema:
 Header length: 18, schema size: 20
CSV file: file:///mnt/e/projects/dezoomcamp/5_batch_processing/code/data/csv/yellow_tripdata_2020-01.csv.gz
                                                                                

Check the created files

In [16]:
!ls -lh data/raw/yellow/2020/01/

total 704M
-rwxrwxrwx 1 root root    0 Mar  2 13:49 _SUCCESS
-rwxrwxrwx 1 root root 7.4M Mar  2 13:49 part-00000-0e74354d-a4f8-47cf-a2a9-85fd33ea1095-c000.csv
-rwxrwxrwx 1 root root 7.4M Mar  2 13:49 part-00001-0e74354d-a4f8-47cf-a2a9-85fd33ea1095-c000.csv
-rwxrwxrwx 1 root root 7.4M Mar  2 13:49 part-00002-0e74354d-a4f8-47cf-a2a9-85fd33ea1095-c000.csv
-rwxrwxrwx 1 root root 7.4M Mar  2 13:49 part-00003-0e74354d-a4f8-47cf-a2a9-85fd33ea1095-c000.csv
-rwxrwxrwx 1 root root 7.4M Mar  2 13:49 part-00004-0e74354d-a4f8-47cf-a2a9-85fd33ea1095-c000.csv
-rwxrwxrwx 1 root root 7.4M Mar  2 13:49 part-00005-0e74354d-a4f8-47cf-a2a9-85fd33ea1095-c000.csv
-rwxrwxrwx 1 root root 7.4M Mar  2 13:49 part-00006-0e74354d-a4f8-47cf-a2a9-85fd33ea1095-c000.csv
-rwxrwxrwx 1 root root 7.4M Mar  2 13:49 part-00007-0e74354d-a4f8-47cf-a2a9-85fd33ea1095-c000.csv
-rwxrwxrwx 1 root root 7.4M Mar  2 13:49 part-00008-0e74354d-a4f8-47cf-a2a9-85fd33ea1095-c000.csv
-rwxrwxrwx 1 root root 7.4M Mar  2 13:49 part-

# Spark Dataframes

In [17]:
# # With this new schema, we can now create a dataframe with inferred datatypes.

spark.read.csv('data/raw/yellow/2020/01/')



DataFrame[_c0: string, _c1: string, _c2: string, _c3: string, _c4: string, _c5: string, _c6: string, _c7: string, _c8: string, _c9: string, _c10: string, _c11: string, _c12: string, _c13: string, _c14: string, _c15: string, _c16: string, _c17: string, _c18: string, _c19: string]

In [18]:
# # With this new schema, we can now create a dataframe with inferred datatypes.

df.printSchema()

root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- RatecodeID: integer (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- ehail_fee: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- trip_type: integer (nullable = true)
 |-- congestion_surcharge: double (nullable = true)



# Functions and UDFs

In [19]:
from pyspark.sql import functions as F

In [20]:
df.show()

24/03/02 13:51:05 WARN CSVHeaderChecker: Number of column in CSV header is not equal to number of fields in the schema:
 Header length: 18, schema size: 20
CSV file: file:///mnt/e/projects/dezoomcamp/5_batch_processing/code/data/csv/yellow_tripdata_2020-01.csv.gz
[Stage 9:>                                                          (0 + 1) / 1]

+--------+--------------------+---------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------+---------------------+------------+------------+---------+--------------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|store_and_fwd_flag|RatecodeID|PULocationID|DOLocationID|passenger_count|trip_distance|fare_amount|extra|mta_tax|tip_amount|tolls_amount|ehail_fee|improvement_surcharge|total_amount|payment_type|trip_type|congestion_surcharge|
+--------+--------------------+---------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------+---------------------+------------+------------+---------+--------------------+
|       2| 2020-01-04 13:49:54|  2020-01-04 13:58:51|                 1|      null|           1|        null|            161|        113.0|        1.0|  8.5|    0.

                                                                                

In [21]:
# # With this new schema, we can now create a dataframe with inferred datatypes.

df \
    .withColumn('pickup_date', F.to_date(df.tpep_pickup_datetime)) \
    .withColumn('dropoff_date', F.to_date(df.tpep_dropoff_datetime)) \
    .select('pickup_date', 'dropoff_date', 'PULocationID', 'DOLocationID') \
    .show()

24/03/02 13:56:22 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: tpep_pickup_datetime, tpep_dropoff_datetime, RatecodeID, store_and_fwd_flag
 Schema: tpep_pickup_datetime, tpep_dropoff_datetime, PULocationID, DOLocationID
Expected: PULocationID but found: RatecodeID
CSV file: file:///mnt/e/projects/dezoomcamp/5_batch_processing/code/data/csv/yellow_tripdata_2020-01.csv.gz
[Stage 12:>                                                         (0 + 1) / 1]

+-----------+------------+------------+------------+
|pickup_date|dropoff_date|PULocationID|DOLocationID|
+-----------+------------+------------+------------+
| 2020-01-28|  2020-01-28|           1|        null|
| 2020-01-23|  2020-01-23|           1|        null|
| 2020-01-28|  2020-01-28|           1|        null|
| 2020-01-08|  2020-01-08|           1|        null|
| 2020-01-12|  2020-01-12|           1|        null|
| 2020-01-22|  2020-01-22|           1|        null|
| 2020-01-16|  2020-01-16|           1|        null|
| 2020-01-31|  2020-01-31|           1|        null|
| 2020-01-08|  2020-01-08|           1|        null|
| 2020-01-05|  2020-01-05|           1|        null|
| 2020-01-08|  2020-01-08|           1|        null|
| 2020-01-11|  2020-01-11|           1|        null|
| 2020-01-02|  2020-01-02|           1|        null|
| 2020-01-24|  2020-01-24|           1|        null|
| 2020-01-05|  2020-01-05|           1|        null|
| 2020-01-12|  2020-01-12|           1|       

                                                                                

In [23]:
# # With this new schema, we can now create a dataframe with inferred datatypes.

df \
    .withColumn('pickup_date', F.to_date(df.tpep_pickup_datetime)) \
    .withColumn('dropoff_date', F.to_date(df.tpep_dropoff_datetime)) \
    .select('VendorID', 'pickup_date', 'dropoff_date', 'PULocationID', 'DOLocationID') \
    .show()

24/03/02 13:58:55 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: VendorID, tpep_pickup_datetime, tpep_dropoff_datetime, RatecodeID, store_and_fwd_flag
 Schema: VendorID, tpep_pickup_datetime, tpep_dropoff_datetime, PULocationID, DOLocationID
Expected: PULocationID but found: RatecodeID
CSV file: file:///mnt/e/projects/dezoomcamp/5_batch_processing/code/data/csv/yellow_tripdata_2020-01.csv.gz
[Stage 15:>                                                         (0 + 1) / 1]

+--------+-----------+------------+------------+------------+
|VendorID|pickup_date|dropoff_date|PULocationID|DOLocationID|
+--------+-----------+------------+------------+------------+
|       1| 2020-01-24|  2020-01-24|           1|        null|
|       1| 2020-01-31|  2020-01-31|           1|        null|
|       2| 2020-01-01|  2020-01-01|           1|        null|
|       2| 2020-01-20|  2020-01-20|           1|        null|
|       2| 2020-01-29|  2020-01-29|           1|        null|
|       2| 2020-01-28|  2020-01-28|           1|        null|
|       2| 2020-01-30|  2020-01-30|           1|        null|
|       2| 2020-01-08|  2020-01-08|           1|        null|
|       2| 2020-01-15|  2020-01-15|           1|        null|
|       2| 2020-01-14|  2020-01-14|           1|        null|
|       2| 2020-01-05|  2020-01-05|           1|        null|
|       2| 2020-01-31|  2020-01-31|           1|        null|
|    null| 2020-01-10|  2020-01-10|        null|        null|
|       

                                                                                