# 1. Importing dependencies

In [None]:
import sys
import logging

from pyspark.sql import SparkSession

from awsglue.job import Job
from awsglue.transforms import *
from awsglue.context import GlueContext
from awsglue.utils import getResolvedOptions
# In the Python Glue library DynamicFrame lives in awsglue.dynamicframe
# (com.amazonaws.services.glue is the Scala/Java package and is not importable).
from awsglue.dynamicframe import DynamicFrame

In [None]:
# Creating logger object so it's easier to debug looking at CloudWatch logs.
# With no handler on the root logger, Python falls back to logging.lastResort,
# which only emits WARNING and above, so every logger.info() below would be
# silently dropped. basicConfig() attaches a stream handler, and is a no-op if
# the runtime has already configured one on the root logger.
logging.basicConfig(format="%(asctime)s %(levelname)s %(name)s: %(message)s")
logger = logging.getLogger()
logger.setLevel(logging.INFO)

I first wrote this code in plain PySpark in a Colab environment so I could do some exploratory analysis. That notebook is [here](https://github.com/luis-fnogueira/trips-data/blob/main/trip-data.ipynb). I then adapted it into a Glue job.

## 1.1 Creating GlueContext and loading data

First, I create a SparkSession. Its SparkContext is then used to build the GlueContext, which loads the data as a Glue DynamicFrame.

In [None]:
# Build (or reuse) the SparkSession. No .master() is set on purpose: "local[*]"
# would force Spark's local mode and run the whole job on the driver; the
# master URL should come from the runtime (Glue) configuration instead.
spark = SparkSession.builder.appName("trips_data").getOrCreate()

In [None]:
# Notebook-only: a bare expression displays the session in Jupyter; when this
# runs as a plain script the statement just evaluates the name and does nothing.
spark

In [None]:
# Wrap the session's SparkContext in a GlueContext, which provides the
# DynamicFrame read/write API used throughout this script.
sc = spark.sparkContext
glue_context = GlueContext(sc)

In [None]:
## params: [JOB_NAME]
# Resolve the job parameters passed on the command line; only JOB_NAME is needed.
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
job_name = args['JOB_NAME']

# Initialise the Glue Job with its name and the full argument map; the run is
# committed at the end of the script with job.commit().
job = Job(glue_context)
job.init(job_name, args)

In [None]:
# Load the Parquet trip records under the year=2021 prefix of the source bucket
# as a DynamicFrame.
source_paths = ['s3://903442739132-source-bucket-trips-data-01/year=2021/*']
df_dynamic = glue_context.create_dynamic_frame_from_options(
    connection_type='s3',
    connection_options={'paths': source_paths},
    format="parquet",
)

In [None]:
# printSchema() prints to stdout and returns None, so interpolating its result
# only ever logged "printSchema: None". Log the schema object itself instead
# (lazy %-formatting, so it is only rendered if INFO is enabled).
logger.info("Schema: %s", df_dynamic.schema())

# 2. Separating files by type of license

As described in the dataset documentation:

**Field name**: hvfhs_license_num

**Description**: The TLC license number of the HVFHS base or business. As of September 2019, the HVFHS licensees are: HV0002: Juno, HV0003: Uber, HV0004: Via, and HV0005: Lyft.

In [None]:
# Uber trips (license HV0003). Exact comparison: `value in "HV0003"` would be a
# substring test that also accepts "" or "HV000" and raises TypeError on None.
df_uber = df_dynamic.filter(f=lambda x: x["hvfhs_license_num"] == "HV0003")

In [None]:
# Via trips (license HV0004). Exact comparison: `value in "HV0004"` would be a
# substring test that also accepts "" or "HV000" and raises TypeError on None.
df_via = df_dynamic.filter(f=lambda x: x["hvfhs_license_num"] == "HV0004")

In [None]:
# Lyft trips (license HV0005). Exact comparison: `value in "HV0005"` would be a
# substring test that also accepts "" or "HV000" and raises TypeError on None.
df_lyft = df_dynamic.filter(f=lambda x: x["hvfhs_license_num"] == "HV0005")

In [None]:
# Write the Uber trips as CSV under their own prefix. Each license type needs a
# distinct prefix: writing all three frames to the same path mixes Uber, Via
# and Lyft rows together and defeats the split.
glue_context.write_dynamic_frame.from_options(
    frame=df_uber,
    connection_type='s3',
    connection_options={
        'path': 's3://903442739132-type-of-license-bucket/company=uber/year=2021/',
    },
    format='csv',
    format_options={
        'separator': ","
    }
)

In [None]:
# Plain literal: the message has no placeholders, so an f-string is unneeded (F541).
logger.info('Created df_uber file')

In [None]:
# Write the Via trips as CSV under their own prefix. Each license type needs a
# distinct prefix: writing all three frames to the same path mixes Uber, Via
# and Lyft rows together and defeats the split.
glue_context.write_dynamic_frame.from_options(
    frame=df_via,
    connection_type='s3',
    connection_options={
        'path': 's3://903442739132-type-of-license-bucket/company=via/year=2021/',
    },
    format='csv',
    format_options={
        'separator': ","
    }
)

In [None]:
# Plain literal: the message has no placeholders, so an f-string is unneeded (F541).
logger.info('Created df_via file')

In [None]:
# Write the Lyft trips as CSV under their own prefix. Each license type needs a
# distinct prefix: writing all three frames to the same path mixes Uber, Via
# and Lyft rows together and defeats the split.
glue_context.write_dynamic_frame.from_options(
    frame=df_lyft,
    connection_type='s3',
    connection_options={
        'path': 's3://903442739132-type-of-license-bucket/company=lyft/year=2021/',
    },
    format='csv',
    format_options={
        'separator': ","
    }
)

In [None]:
# Plain literal: the message has no placeholders, so an f-string is unneeded (F541).
logger.info('Created df_lyft file')

# 3. Setting aside only rides that were not shared

In [None]:
df_uber = df_dynamic.filter(f=lambda x: x["shared_match_flag "] not in "Y")

In [None]:
glue_context.write_dynamic_frame.from_options(
    frame=df_lyft,
    connection_type='s3',
    connection_options={
        'path': 's3://903442739132-trips-not-shared-bucket/year=2021/',
    },
    format='csv',
    format_options={
        'separator': ","
    }
)

In [None]:
# Plain literal: the message has no placeholders, so an f-string is unneeded (F541).
logger.info('Created trips not shared')

# 4. Selecting trips for which the driver received more than 10 dollars

In [None]:
from pyspark.sql.functions import floor

In [None]:
# Convert the DynamicFrame to a Spark DataFrame so the pyspark.sql column
# operations used below (floor, withColumn, filter) can be applied.
df = df_dynamic.toDF()

In [None]:
# Bucket the driver's pay into 10-dollar bins: floor(pay / 10) * 10,
# e.g. 23.5 -> 20.
pay_bucket = floor(df["driver_pay"] / 10) * 10
df = df.withColumn("grouped_driver_pay", pay_bucket)

In [None]:
# Keep trips where the driver was paid more than 10 dollars. Filter on the raw
# driver_pay: grouped_driver_pay is floored to a multiple of 10, so
# `grouped_driver_pay > 10` only keeps pay >= 20 and drops 10 < pay < 20.
df_over_10 = df.filter(df["driver_pay"] > 10)

In [None]:
# Convert the filtered Spark DataFrame back into a DynamicFrame for the Glue
# S3 writer. The name argument is required on older Glue versions (optional
# only since Glue 3.0), so pass it explicitly.
dyf_grouped = DynamicFrame.fromDF(df_over_10, glue_context, "dyf_grouped")

In [None]:
# Persist the trips paid more than 10 dollars as comma-separated CSV.
over_10_sink_options = {
    'path': 's3://903442739132-trips-more-than-10-dollars-bucket/year=2021/',
}
glue_context.write_dynamic_frame.from_options(
    frame=dyf_grouped,
    connection_type='s3',
    connection_options=over_10_sink_options,
    format='csv',
    format_options={'separator': ","},
)

In [None]:
# Commit the Glue job run that was initialised with job.init() earlier in the script.
job.commit()