In [5]:
%idle_timeout 60
%glue_version 3.0
%worker_type G.1X
%number_of_workers 2

Welcome to the Glue Interactive Sessions Kernel
For more information on available magic commands, please type %help in any new cell.

Please view our Getting Started page to access the most up-to-date information on the Interactive Sessions kernel: https://docs.aws.amazon.com/glue/latest/dg/interactive-sessions.html
Installed kernel version: 0.38.1 
Current idle_timeout is 2800 minutes.
idle_timeout has been set to 60 minutes.
Setting Glue version to: 3.0
Previous worker type: G.1X
Setting new worker type to: G.1X
Previous number of workers: 5
Setting new number of workers to: 2


In [1]:
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)

Authenticating with environment variables and user-defined glue_role_arn: arn:aws:iam::176256382487:role/almighty
Trying to create a Glue session for the kernel.
Worker Type: G.1X
Number of Workers: 2
Session ID: 028c5944-bf45-4406-b44f-427cc3c7abe2
Job Type: glueetl
Applying the following default arguments:
--glue_kernel_version 0.38.1
--enable-glue-datacatalog true
Waiting for session 028c5944-bf45-4406-b44f-427cc3c7abe2 to get into ready status...
Session 028c5944-bf45-4406-b44f-427cc3c7abe2 has been created.



In [2]:
dyf = glueContext.create_dynamic_frame.from_catalog(
    database='taxitripsdb',
    table_name='raw_taxi_data',
    transformation_ctx='dyf',
)

df = dyf.toDF()




## Cleaning

In [3]:
from pyspark.sql.functions import when, col




**Remove rows with null in important columns**

In [4]:
df = df.na.drop(subset=['total_amount', 'tpep_pickup_datetime', 'tpep_dropoff_datetime', 'PULocationID', 'DOLocationID'])




**Standardize columns name**

In [5]:
columns_to_be_renamed = [
    ['VendorID', 'vendor_id'],
    ['tpep_pickup_datetime', 'pickup_datetime'],
    ['tpep_dropoff_datetime', 'dropoff_datetime'],
    ['RatecodeID', 'rate_code_id'],
    ['store_and_fwd_flag', 'store_and_forward'],
    ['PULocationID', 'pickup_location_id'],
    ['DOLocationID', 'dropoff_location_id'],
    ['extra', 'extra_amount'],
    ['Airport_fee', 'airport_fee_amount'],
    ['payment_type', 'payment_type_id']
]

for [old_name, new_name] in columns_to_be_renamed:
    df = df.withColumnRenamed(old_name, new_name)




**Make sure values are within range**

In [6]:
df = df.withColumn('vendor_id', when(col('vendor_id').isin([1, 2]), col('vendor_id')).otherwise(3))




In [7]:
df = df.withColumn('store_and_forward', when(col('store_and_forward') == 'Y', True).otherwise(False))




## Aggregation

In [8]:
def get_kilometers_from_miles(miles):
    try:
        return miles * 1.60934
    except:
        return 0

def get_payment_type(payment_type_id):
    _map = {
        1: 'Credit card',
        2: 'Cash',
        3: 'No charge',
        4: 'Dispute',
        6: 'Voided trip'
    }

    return _map[payment_type_id] if payment_type_id in _map else 'Unknown'

def get_rate_code(rate_code_id):
    _map = {
        1: 'Standard',
        2: 'Airport (JFK)',
        3: 'Newark',
        4: 'Nassau or Nassau or Westchester',
        5: 'Negotiated',
        6: 'Group ride'
    }

    return _map[rate_code_id] if rate_code_id in _map else 'Unknown'

def get_vendor(vendor_id):
    _map = {
        1: 'Creative Mobile Technologies',
        2: 'VeriFone'
    }

    return _map[vendor_id] if vendor_id in _map else 'Unknown'

def aggregator(df):
    row = df.asDict()
    row['payment_type'] = get_payment_type(row['payment_type_id'])
    row['rate_code'] = get_rate_code(row['rate_code_id'])
    row['vendor'] = get_vendor(row['vendor_id'])
    row['trip_distance'] = get_kilometers_from_miles(row['trip_distance'])

    return row
    

rdd = df.rdd.map(aggregator)
df = rdd.toDF()




**Add location data**

In [9]:
df.createOrReplaceTempView('taxi_trips')
df_taxi_zones = spark.read.format('csv').option('header', 'true').load('s3://raw-taxi-data/taxi_zone_lookup.csv')
df_taxi_zones.createOrReplaceTempView('taxi_zones')




In [10]:
no_ids_columns = filter(lambda column: '_id' not in column, df.columns)

df = spark.sql(f"""
SELECT {','.join(no_ids_columns)}, partition_0, partition_1,
taxi_zones_pickup.Zone as pickup_location_zone, 
taxi_zones_pickup.Borough as pickup_location_borough, 
taxi_zones_dropoff.Zone as dropoff_location_zone, 
taxi_zones_dropoff.Borough as dropoff_location_borough
FROM taxi_trips
JOIN taxi_zones AS taxi_zones_pickup on taxi_zones_pickup.LocationID = taxi_trips.pickup_location_id
JOIN taxi_zones AS taxi_zones_dropoff on taxi_zones_dropoff.LocationID = taxi_trips.dropoff_location_id
""")




In [11]:
df.printSchema()

root
 |-- airport_fee_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- extra_amount: double (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- partition_0: string (nullable = true)
 |-- partition_1: string (nullable = true)
 |-- passenger_count: double (nullable = true)
 |-- payment_type: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- rate_code: string (nullable = true)
 |-- store_and_forward: boolean (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- vendor: string (nullable = true)
 |-- partition_0: string (nullable = true)
 |-- partition_1: string (nullable = true)
 |-- pickup_location_zone: string (nullable = 

#### Load into transformed_taxi_trips S3

In [12]:
from awsglue.dynamicframe import DynamicFrame

dyf = DynamicFrame.fromDF(df, glueContext, "dyf")

transformed_taxi_data_bucket = glueContext.write_dynamic_frame.from_options(
    frame=dyf,
    connection_type='s3',
    format='csv',
    connection_options={'path': 's3://transformed-taxi-data', 'partitionKeys': ['partition_0', 'partition_1']},
    transformation_ctx='transformed_taxi_data_bucket',
)

job.commit()




In [13]:
dyf.show()

{"airport_fee_amount": 1.25, "congestion_surcharge": 0.0, "dropoff_datetime": 2023-03-25 18:54:48.0, "extra_amount": 0.0, "fare_amount": 63.9, "improvement_surcharge": 1.0, "mta_tax": 0.5, "partition_0": "2023", "partition_1": "03", "passenger_count": 1.0, "payment_type": "Credit card", "pickup_datetime": 2023-03-25 18:13:11.0, "rate_code": "Standard", "store_and_forward": false, "tip_amount": 3.0, "tolls_amount": 0.0, "total_amount": 69.65, "trip_distance": 25.9747476, "vendor": "VeriFone", "pickup_location_zone": "JFK Airport", "pickup_location_borough": "Queens", "dropoff_location_zone": "Long Island City/Hunters Point", "dropoff_location_borough": "Queens"}
{"airport_fee_amount": 0.0, "congestion_surcharge": 2.5, "dropoff_datetime": 2023-03-11 11:51:28.0, "extra_amount": 0.0, "fare_amount": 12.8, "improvement_surcharge": 1.0, "mta_tax": 0.5, "partition_0": "2023", "partition_1": "03", "passenger_count": 1.0, "payment_type": "Credit card", "pickup_datetime": 2023-03-11 11:40:55.0, "