In [1]:
from pyspark.sql import SparkSession, functions as F

# Spark settings for this notebook, applied to the session builder in this order.
spark_settings = {
    "spark.sql.repl.eagerEval.enabled": True,
    "spark.sql.parquet.cacheMetadata": "true",
    "spark.sql.session.timeZone": "Etc/UTC",
    "spark.driver.memory": "4g",
    "spark.executor.memory": "2g",
}

# Get the running Spark session, or create one with the settings above;
# later cells submit their jobs through `spark`.
session_builder = SparkSession.builder.appName("MAST30034 Tutorial 2")
for setting, value in spark_settings.items():
    session_builder = session_builder.config(setting, value)
spark = session_builder.getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/08/20 23:14:10 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [6]:
# Load the parquet data under the raw yellow-taxi directory as one DataFrame.
yellow = spark.read.format("parquet").load("../data/raw/yellow_data/")

In [7]:
# Peek at a single record, one field per line, truncating values to 100 characters.
yellow.show(n=1, truncate=100, vertical=True)

-RECORD 0------------------------------------
 VendorID              | 1                   
 tpep_pickup_datetime  | 2022-10-01 00:03:41 
 tpep_dropoff_datetime | 2022-10-01 00:18:39 
 passenger_count       | 1.0                 
 trip_distance         | 1.7                 
 RatecodeID            | 1.0                 
 store_and_fwd_flag    | N                   
 PULocationID          | 249                 
 DOLocationID          | 107                 
 payment_type          | 1                   
 fare_amount           | 9.5                 
 extra                 | 3.0                 
 mta_tax               | 0.5                 
 tip_amount            | 2.65                
 tolls_amount          | 0.0                 
 improvement_surcharge | 0.3                 
 total_amount          | 15.95               
 congestion_surcharge  | 2.5                 
 airport_fee           | 0.0                 
only showing top 1 row



The data loads without errors, so we can copy each monthly file 1-1 into the landing layer.

In [8]:
# January 2022 only; its schema is used below as the reference schema for every month.
yellow_jan = spark.read.format("parquet").load("../data/raw/yellow_data/2022-01.parquet")

In [9]:
# Derive the reference schema from January, with every column name lower-cased
# so that all monthly copies share one consistent casing.
lowercase_names = [name.lower() for name in yellow_jan.columns]
yellow_jan = yellow_jan.toDF(*lowercase_names)

yellow_schema = yellow_jan.schema
yellow_schema

StructType([StructField('vendorid', LongType(), True), StructField('tpep_pickup_datetime', TimestampNTZType(), True), StructField('tpep_dropoff_datetime', TimestampNTZType(), True), StructField('passenger_count', DoubleType(), True), StructField('trip_distance', DoubleType(), True), StructField('ratecodeid', DoubleType(), True), StructField('store_and_fwd_flag', StringType(), True), StructField('pulocationid', LongType(), True), StructField('dolocationid', LongType(), True), StructField('payment_type', LongType(), True), StructField('fare_amount', DoubleType(), True), StructField('extra', DoubleType(), True), StructField('mta_tax', DoubleType(), True), StructField('tip_amount', DoubleType(), True), StructField('tolls_amount', DoubleType(), True), StructField('improvement_surcharge', DoubleType(), True), StructField('total_amount', DoubleType(), True), StructField('congestion_surcharge', DoubleType(), True), StructField('airport_fee', DoubleType(), True)])

In [10]:
# Copy each 2022 monthly file 1-1 from the raw directory into its own landing
# directory, reading every month with January's (lower-cased) schema.
# NOTE(review): the schema's column names are lower-case while the source files
# use mixed case (e.g. VendorID, PULocationID). This relies on Spark's default
# case-insensitive column resolution (spark.sql.caseSensitive=false); with
# case-sensitive resolution those columns would likely be read back as all-null.
for month in range(1, 13):
    month_tag = f"2022-{month:02d}"
    input_path = f"../data/raw/yellow_data/{month_tag}.parquet"
    output_path = f"../data/landing/yellow22/{month_tag}"

    # coalesce(1) -> a single partition, so each month is written as one part file.
    # DataFrameWriter.parquet returns None, so there is no DataFrame to keep a handle on.
    (
        spark.read
        .schema(yellow_schema)
        .parquet(input_path)
        .coalesce(1)
        .write
        .mode("overwrite")
        .parquet(output_path)
    )

                                                                                