In [1]:
from pyspark.sql import SparkSession

# Create a spark session (which will run spark jobs)
spark = (
    SparkSession.builder.appName("DuythinhtProject1")
    .config("spark.sql.repl.eagerEval.enabled", True) 
    .config("spark.sql.parquet.cacheMetadata", "true")
    .config("spark.sql.session.timeZone", "Etc/UTC")
    .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/08/13 17:25:05 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
sdf_all = spark.read.parquet('../data/landing/')
sdf_schema = sdf_all.schema


In [3]:
print(sdf_schema)
schema_dict = {field.name.lower(): field.dataType for field in sdf_schema.fields}
schema_dict

StructType([StructField('VendorID', IntegerType(), True), StructField('tpep_pickup_datetime', TimestampNTZType(), True), StructField('tpep_dropoff_datetime', TimestampNTZType(), True), StructField('passenger_count', LongType(), True), StructField('trip_distance', DoubleType(), True), StructField('RatecodeID', LongType(), True), StructField('store_and_fwd_flag', StringType(), True), StructField('PULocationID', IntegerType(), True), StructField('DOLocationID', IntegerType(), True), StructField('payment_type', LongType(), True), StructField('fare_amount', DoubleType(), True), StructField('extra', DoubleType(), True), StructField('mta_tax', DoubleType(), True), StructField('tip_amount', DoubleType(), True), StructField('tolls_amount', DoubleType(), True), StructField('improvement_surcharge', DoubleType(), True), StructField('total_amount', DoubleType(), True), StructField('congestion_surcharge', DoubleType(), True), StructField('Airport_fee', DoubleType(), True)])


{'vendorid': IntegerType(),
 'tpep_pickup_datetime': TimestampNTZType(),
 'tpep_dropoff_datetime': TimestampNTZType(),
 'passenger_count': LongType(),
 'trip_distance': DoubleType(),
 'ratecodeid': LongType(),
 'store_and_fwd_flag': StringType(),
 'pulocationid': IntegerType(),
 'dolocationid': IntegerType(),
 'payment_type': LongType(),
 'fare_amount': DoubleType(),
 'extra': DoubleType(),
 'mta_tax': DoubleType(),
 'tip_amount': DoubleType(),
 'tolls_amount': DoubleType(),
 'improvement_surcharge': DoubleType(),
 'total_amount': DoubleType(),
 'congestion_surcharge': DoubleType(),
 'airport_fee': DoubleType()}

In [4]:
from pyspark.sql import SparkSession, functions as F

for month in range(7, 13):
    input_path = f'../data/landing/2023-{str(month).zfill(2)}-yellow_cab.parquet'
    output_path = f'../data/raw/2023-{str(month).zfill(2)}-yellow_cab'
    
    sdf_malformed = spark.read.parquet(input_path)
    
    # Convert column names to lowercase
    consistent_col_casing_malformed = [F.col(col_name).alias(col_name.lower()) for col_name in sdf_malformed.columns]
    sdf_malformed = sdf_malformed.select(*consistent_col_casing_malformed)
    
    # Cast columns to match the schema, only for columns present in the schema
    casted_columns = [
        F.col(c).cast(schema_dict[c]) 
        for c in sdf_malformed.columns 
        if c in schema_dict
    ]
    
    # Ensure all columns are in the schema
    for col_name in sdf_malformed.columns:
        if col_name not in schema_dict:
            raise ValueError(f"Column '{col_name}' in the DataFrame is not present in the schema.")

    sdf_malformed = sdf_malformed.select(*casted_columns)
    
    sdf_malformed \
    .coalesce(1) \
    .write \
    .mode('overwrite') \
    .parquet(output_path)



24/08/13 17:25:20 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors
                                                                                

In [5]:
sdf = spark.read.parquet('../data/raw/*')

sdf.show(1, truncate=100)
sdf.printSchema()

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|vendorid|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|ratecodeid|store_and_fwd_flag|pulocationid|dolocationid|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|airport_fee|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|       1| 2023-07-01 00:29:59|  2023-07-01 00:40:15|              1|          1.8|         1|                 N|         140|         263|           1|       12.1|  3.5|    0.5|       5.

In [6]:
sdf = sdf.withColumn(
    'store_and_fwd_flag',
    (F.col("store_and_fwd_flag") == 'Y').cast('BOOLEAN')
)

sdf.show(1, truncate=100)

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|vendorid|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|ratecodeid|store_and_fwd_flag|pulocationid|dolocationid|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|airport_fee|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|       1| 2023-07-01 00:29:59|  2023-07-01 00:40:15|              1|          1.8|         1|             false|         140|         263|           1|       12.1|  3.5|    0.5|       5.

In [7]:
summary = sdf.describe()
summary.show()

24/08/13 17:25:32 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.

+-------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+-----------------+------------------+-------------------+------------------+------------------+---------------------+------------------+--------------------+-------------------+
|summary|          vendorid|   passenger_count|     trip_distance|        ratecodeid|      pulocationid|      dolocationid|      payment_type|      fare_amount|             extra|            mta_tax|        tip_amount|      tolls_amount|improvement_surcharge|      total_amount|congestion_surcharge|        airport_fee|
+-------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+-----------------+------------------+-------------------+------------------+------------------+---------------------+------------------+--------------------+-------------------+
|  count|          18816606|          18

                                                                                

In [8]:
null_counts = sdf.select([F.sum(F.col(c).isNull().cast("int")).alias(c) for c in sdf.columns])
null_counts.show()




+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|vendorid|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|ratecodeid|store_and_fwd_flag|pulocationid|dolocationid|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|airport_fee|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|       0|                   0|                    0|         780804|            0|    780804|            780804|           0|           0|           0|          0|    0|      0|         

                                                                                

In [9]:
sdf_cleaned = sdf.dropna()
sdf_cleaned.count()

                                                                                

18035802

In [10]:
sdf = sdf.filter(F.col('passenger_count').isNotNull())
null_counts = sdf.select([F.sum(F.col(c).isNull().cast("int")).alias(c) for c in sdf.columns])
null_counts.show()

sdf.count()



                                                                                

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|vendorid|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|ratecodeid|store_and_fwd_flag|pulocationid|dolocationid|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|airport_fee|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|       0|                   0|                    0|              0|            0|         0|                 0|           0|           0|           0|          0|    0|      0|         

18035802

In [12]:
condition = (
    (F.col('trip_distance') > 0) &
    (F.col('passenger_count') > 0) &
    (F.col('fare_amount') > 0) &
    (F.col('extra') >= 0) &
    (F.col('mta_tax') == 0.5) &
    (F.col('tip_amount') >=  0) &
    (F.col('tolls_amount') >=  0) &
    (F.col('improvement_surcharge') >=  0) &
    (F.col('total_amount') > 0) &
    (F.col('congestion_surcharge') >=  0) &
    (F.col('airport_fee') >=  0) &
    (F.col('vendorid').isin([1, 2])) &
    (F.col('ratecodeid').isin([1, 2, 3, 4, 5, 6])) &
    (F.col('payment_type').isin([1, 2, 3, 4, 5, 6])) &
    (F.col('pulocationid') >= 1) &
    (F.col('pulocationid') <= 263)
)
sdf = sdf.withColumn(
    'is_valid_record',
    F.when(condition, True).otherwise(False)
) 
sdf_valid = sdf.filter(F.col('is_valid_record') == True)


print(sdf_valid.count())
sdf_valid.show(20, truncate=100)




16999413
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+---------------+
|vendorid|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|ratecodeid|store_and_fwd_flag|pulocationid|dolocationid|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|airport_fee|is_valid_record|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+---------------+
|       1| 2023-07-01 00:29:59|  2023-07-01 00:40:15|              1|          1.8|         1|             false|         140|    

                                                                                