In [1]:
from os import environ as env

from pyspark import SparkContext
from pyspark.conf import SparkConf
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.functions import col, dayofmonth
from pyspark.sql.types import *

### Setup Spark Session

In [2]:
conf = SparkConf() \
        .setMaster("local[*]") \
        .setAppName("pyspark-playground") \
        .set("spark.cores.max", 4) \
        .set("spark.driver.memory", "2g") \
        .set("spark.executor.memory", "8g") \
        .set("spark.sql.execution.arrow.pyspark.enabled", "true") \
        .set("spark.jars", "/Users/iobruno/Vault/data-engineering-zoomcamp/week_5_batch_processing/jars/gcs-connector-latest-hadoop2.jar")

In [3]:
sc = SparkContext(conf=conf)
sc.setLogLevel("ERROR")

hadoop_conf = sc._jsc.hadoopConfiguration()
hadoop_conf.set("fs.AbstractFileSystem.gs.impl",  "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS")
hadoop_conf.set("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
hadoop_conf.set("fs.gs.auth.service.account.enable", "true")
hadoop_conf.set("fs.gs.auth.service.account.json.keyfile", env["GOOGLE_APPLICATION_CREDENTIALS"])

23/11/27 00:49:45 WARN Utils: Your hostname, magi.local resolves to a loopback address: 127.0.0.1; using 192.168.15.29 instead (on interface en0)
23/11/27 00:49:45 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
23/11/27 00:49:46 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [4]:
spark = SparkSession.builder.config(conf=conf).getOrCreate()

## Load Datasets from GCS

#### FHV Dataset

In [5]:
fhv = spark.read.parquet("gs://iobruno-lakehouse-raw/nyc_trip_record_data/fhv/*.parquet")

                                                                                

In [6]:
fhv = fhv.select(
    col('dispatching_base_num'), 
    col('Affiliated_base_number').alias('affiliated_base_num'),
    col('pickup_datetime').cast('timestamp'),
    col('dropOff_datetime').cast('timestamp').alias('dropoff_datetime'),
    col('PUlocationID').alias('pickup_location_id'),
    col('DOlocationID').alias('dropoff_location_id')
)

In [7]:
fhv.createOrReplaceTempView("fhv")

In [8]:
fhv.printSchema()

root
 |-- dispatching_base_num: string (nullable = true)
 |-- affiliated_base_num: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- pickup_location_id: long (nullable = true)
 |-- dropoff_location_id: long (nullable = true)



#### Taxi Zone Lookup Dataset

In [9]:
zone_lookup_schema = StructType([
    StructField("LocationID", IntegerType(), True),
    StructField("Borough", StringType(), True),
    StructField("Zone", StringType(), True),
    StructField("service_zone", StringType(), True)
])

In [10]:
zone_lookup = spark.read\
                .option("header", True)\
                .schema(zone_lookup_schema)\
                .csv("gs://iobruno-lakehouse-raw/nyc_trip_record_data/zone_lookup/taxi_zone_lookup.csv.gz")

In [11]:
zone_lookup = zone_lookup.select(
    col('LocationID').alias('location_id'),
    col('Borough').alias('borough'),
    col('Zone').alias('zone'),
    col('service_zone')
)

In [12]:
zone_lookup.createOrReplaceTempView("zones")

In [13]:
zone_lookup.printSchema()

root
 |-- location_id: integer (nullable = true)
 |-- borough: string (nullable = true)
 |-- zone: string (nullable = true)
 |-- service_zone: string (nullable = true)



### SparkSQL - Joining DataFrames

In [14]:
sdf = spark.sql("""    
    SELECT 
        f.dispatching_base_num,
        f.affiliated_base_num,

        -- Pickup Data
        f.pickup_datetime,
        pu.zone as pickup_zone,
        pu.service_zone as pickup_service_zone,
        
        -- Dropoff Data
        f.dropoff_datetime,
        do.zone as dropoff_zone,
        do.service_zone as dropoff_service_zone
    FROM 
        fhv f    
    INNER JOIN 
        zones pu ON f.pickup_location_id  = pu.location_id
    INNER JOIN 
        zones do ON f.dropoff_location_id = do.location_id
""")

In [15]:
sdf.show(10, 25, False)

                                                                                

+--------------------+-------------------+-------------------+-------------------------+-------------------+-------------------+-------------------------+--------------------+
|dispatching_base_num|affiliated_base_num|    pickup_datetime|              pickup_zone|pickup_service_zone|   dropoff_datetime|             dropoff_zone|dropoff_service_zone|
+--------------------+-------------------+-------------------+-------------------------+-------------------+-------------------+-------------------------+--------------------+
|              B00254|             B02356|2019-01-01 00:33:03|          Lenox Hill East|        Yellow Zone|2019-01-01 01:37:24|              Cobble Hill|           Boro Zone|
|              B00254|             B00254|2019-01-01 00:03:00|          Lenox Hill West|        Yellow Zone|2019-01-01 00:34:25|    Upper East Side South|         Yellow Zone|
|              B00254|             B00254|2019-01-01 00:45:48|    Upper East Side South|        Yellow Zone|2019-01-01 0

### Spark DataFrame API - Joining DataFrames

In [16]:
df = fhv.alias("f")\
        .join(zone_lookup.alias("pu"), col("f.pickup_location_id") == col("pu.location_id"), how="inner")\
        .join(zone_lookup.alias("do"), col("f.pickup_location_id") == col("do.location_id"), how="inner")\
        .select(
            col("f.dispatching_base_num"),
            col("f.affiliated_base_num"),

            # Pickup Data
            col("f.pickup_datetime"),
            col("pu.zone").alias("pickup_zone"),
            col("pu.service_zone").alias("pickup_service_zone"),
            
            # Dropoff Data
            col("f.dropoff_datetime"),
            col("do.zone").alias("dropoff_zone"),
            col("do.service_zone").alias("dropoff_service_zone")                    
        )

In [17]:
df.show(10, 25, False)

[Stage 4:>                                                          (0 + 1) / 1]

+--------------------+-------------------+-------------------+-------------------------+-------------------+-------------------+-------------------------+--------------------+
|dispatching_base_num|affiliated_base_num|    pickup_datetime|              pickup_zone|pickup_service_zone|   dropoff_datetime|             dropoff_zone|dropoff_service_zone|
+--------------------+-------------------+-------------------+-------------------------+-------------------+-------------------+-------------------------+--------------------+
|              B00254|             B02356|2019-01-01 00:33:03|          Lenox Hill East|        Yellow Zone|2019-01-01 01:37:24|          Lenox Hill East|         Yellow Zone|
|              B00254|             B00254|2019-01-01 00:03:00|          Lenox Hill West|        Yellow Zone|2019-01-01 00:34:25|          Lenox Hill West|         Yellow Zone|
|              B00254|             B00254|2019-01-01 00:45:48|    Upper East Side South|        Yellow Zone|2019-01-01 0

                                                                                