In [10]:
%profile "cloudbend"

%glue_version "3.0"
%iam_role "arn:aws:iam::898546127587:role/GlueSessions"
%idle_timeout 60

%worker_type "G.1X"
%number_of_workers 10

Previous profile: None
Setting new profile to: cloudbend
Setting Glue version to: 3.0
Current iam_role is None
iam_role has been set to arn:aws:iam::898546127587:role/GlueSessions.
Current idle_timeout is 2880 minutes.
idle_timeout has been set to 60 minutes.


Previous worker type: G.1X
Setting new worker type to: G.1X
Previous number of workers: 5
Setting new number of workers to: 10
Additional python modules to be included:
tqdm


In [1]:
from pyspark.sql import SparkSession

spark = SparkSession.getActiveSession()

Authenticating with profile=cloudbend
glue_role_arn defined by user: arn:aws:iam::898546127587:role/GlueSessions
Trying to create a Glue session for the kernel.
Worker Type: G.1X
Number of Workers: 10
Session ID: 3ec187bb-7a86-4fb0-8dca-c2c7f39091c2
Job Type: glueetl
Applying the following default arguments:
--glue_kernel_version 0.38.1
--enable-glue-datacatalog true
--additional-python-modules tqdm
Waiting for session 3ec187bb-7a86-4fb0-8dca-c2c7f39091c2 to get into ready status...
Session 3ec187bb-7a86-4fb0-8dca-c2c7f39091c2 has been created.



In [2]:
import boto3

s3 = boto3.client("s3")

# list all files
response = s3.list_objects_v2(Bucket="nyc-tlc", Prefix="trip data/")
paths = [f"s3://nyc-tlc/{x['Key']}" for x in response["Contents"]]

# only include parquet files
paths = [x for x in paths if x.endswith(".parquet")]

len(paths)

431


In [21]:
from pyspark.sql.functions import col, lit

bucket = "pyicebergbenchmark-icebergbucket89dd3fa6-1qhkzajoxgpzo"

column_renames = {
    "pulocationid": "pu_location_id",
    "dolocationid": "do_location_id",
    "affiliated_base_number": "affiliated_base_num",
    "vendorid": "vendor_id",
    "ratecodeid": "rate_code_id",
}

column_dtypes = {
    "pickup_datetime": "timestamp",
    "dropoff_datetime": "timestamp",
    "pu_location_id": "string",
    "do_location_id": "string",
    "trip_type": "string",
    "dispatching_base_num": "string",
    "affiliated_base_num": "string",
    "rate_code_id": "string",
    "rate_code": "string",
    "improvement_surcharge": "double",
    "congestion_surcharge": "double",
    "ehail_fee": "double",
    "vendor_id": "string",
    "sr_flag": "string",
    "airport_fee": "double",
    "payment_type": "string",
    "passenger_count": "double",
    "wav_match_flag": "string",
}

def standardize_fragment(path: str, prefix: str) -> int:
    df = spark.read.parquet(path)

    # de-duplicate columns
    columns = [c.lower() for c in df.columns]
    df = df.select(*list(set(columns)))

    # rename columns
    columns = [c.lower() for c in df.columns]
    for old, new in column_renames.items():
        if old in columns:
            df = df.withColumnRenamed(old, new)

    # normalize types
    columns = [c.lower() for c in df.columns]
    for column, dtype in column_dtypes.items():
        if column in columns:
            df = df.withColumn(column, col(column).cast(dtype))

    total = df.count()

    df.write.mode("overwrite").parquet(f"s3://{bucket}/raw/{prefix}/")

    return total




In [22]:
from concurrent.futures import ThreadPoolExecutor

with ThreadPoolExecutor() as executor:
    totals = executor.map(
        lambda args: standardize_fragment(*args),
        [(path, str(i)) for i, path in enumerate(paths)]
    )

    sum(totals)

3318228997


In [23]:
df = (
    spark.read
        .option("mergeSchema", "true")
        .parquet(f"s3://{bucket}/raw/*/")
)

df.printSchema()

root
 |-- affiliated_base_num: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- pu_location_id: string (nullable = true)
 |-- dispatching_base_num: string (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- sr_flag: string (nullable = true)
 |-- do_location_id: string (nullable = true)
 |-- access_a_ride_flag: string (nullable = true)
 |-- airport_fee: double (nullable = true)
 |-- wav_match_flag: string (nullable = true)
 |-- sales_tax: double (nullable = true)
 |-- driver_pay: double (nullable = true)
 |-- hvfhs_license_num: string (nullable = true)
 |-- originating_base_num: string (nullable = true)
 |-- base_passenger_fare: double (nullable = true)
 |-- tips: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- on_scene_datetime: timestamp (nullable = true)
 |-- tolls: double (nullable = true)
 |-- shared_request_flag: string (nullable = true)
 |-- shared_match_flag: string (nullable = true)
 |-- requ

In [24]:
df.show()

+-------------------+-------------------+--------------+--------------------+-------------------+-------+--------------+------------------+-----------+--------------+---------+----------+-----------------+--------------------+-------------------+----+--------------------+-------------------+-----+-------------------+-----------------+-------------------+----------------+---------+----+----------+---------------------+---------+-----------+--------------------+------------+---------------+------------------+------------+------------+----------+---------+-------------+-----+-------+---------+------------+---------------------+---------+-----------------+--------+---------+-------+---------------------+-------+---------+---------+--------------------+-------+---------+---------+-----------+----------------+-----------------+---------------+----------------+-----------------+---------------------+--------------------+
|affiliated_base_num|    pickup_datetime|pu_location_id|dispatching_base

In [12]:
%stop_session

Stopping session: 3ec187bb-7a86-4fb0-8dca-c2c7f39091c2
Stopped session.
