# Data Engineering Project Instructions

## Preparation
* You will need to install `boto3`
* You will be using `pandas` and `configparser`
* You will need to install two additional libraries so that pandas can read from and write to S3:
```bash
  conda install s3fs -c conda-forge -y
  conda install fsspec -y
```
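If you are unsure whether these installs succeeded, a quick check like the one below can confirm that the modules are importable in the active environment. This is a sketch: the helper `missing_packages` is hypothetical, and it checks import names, which happen to match the package names for `boto3`, `pandas`, `s3fs`, and `fsspec`.

```python
import importlib.util


def missing_packages(names):
    """Return the names from `names` that cannot be imported in this environment."""
    return [name for name in names if importlib.util.find_spec(name) is None]


# Modules this project relies on (configparser ships with Python's standard library)
required = ["boto3", "pandas", "s3fs", "fsspec", "configparser"]
missing = missing_packages(required)
if missing:
    print("Missing packages:", ", ".join(missing))
else:
    print("All required packages are available")
```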

## Transformations

You will perform the same transformations you did in the previous class, including:

* __Calculate Trip Duration in Minutes:__ Use the time between pickup and drop-off. Call it `Trip_Duration`.
* __Calculate Total Trip Charge:__ Include fare amount, extra, MTA tax, tolls amount, improvement surcharge, congestion surcharge, airport fee, and tip amount. Call it `Total_Trip_Charge`.
* __Add Trip Date Components:__ Add `Trip_Date`, `Trip_Month`, `Trip_Day`, and `Trip_Year`.
* __Keep Specific Columns:__ `VendorID`, `passenger_count`, `trip_distance`, `store_and_fwd_flag`, `payment_type`, `Trip_Duration`, `Total_Trip_Charge`, `Trip_Date`, `Trip_Month`, `Trip_Year`, `Trip_Day`
* __Reorder the Columns:__ Start with `VendorID`, followed by all the date columns (`Trip_Date`, `Trip_Year`, `Trip_Month`, and `Trip_Day`), then the remaining columns.
* __Rename the Columns:__ Rename `VendorID` to `Vendor_ID`, `passenger_count` to `No_of_Passengers`, `store_and_fwd_flag` to `SF_Flag`, and `payment_type` to `Payment_Type`.
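To make the list above concrete, here is a minimal sketch that applies the transformations to a tiny in-memory DataFrame. It assumes the standard yellow-taxi column names used later in the notebook (for example `tpep_pickup_datetime` and `Airport_fee`); the helper name `transform_trips` and the sample values are made up for illustration. Following the notebook, `Trip_Day` holds the weekday name and the duration is floored to whole minutes.

```python
import pandas as pd


def transform_trips(df: pd.DataFrame) -> pd.DataFrame:
    """Apply the listed transformations to a yellow-taxi trip DataFrame (sketch)."""
    df = df.copy()

    # Trip duration in whole minutes (floor of elapsed seconds / 60)
    elapsed = df["tpep_dropoff_datetime"] - df["tpep_pickup_datetime"]
    df["Trip_Duration"] = elapsed.dt.total_seconds() // 60

    # Total charge: every fee component plus the tip; skipna=False mirrors
    # chained `+`, so a missing component makes the total NaN
    charge_cols = ["fare_amount", "extra", "mta_tax", "tolls_amount",
                   "improvement_surcharge", "congestion_surcharge",
                   "Airport_fee", "tip_amount"]
    df["Total_Trip_Charge"] = df[charge_cols].sum(axis=1, skipna=False)

    # Date components derived from the pickup timestamp
    df["Trip_Date"] = df["tpep_pickup_datetime"].dt.normalize()
    df["Trip_Year"] = df["Trip_Date"].dt.year
    df["Trip_Month"] = df["Trip_Date"].dt.month_name()
    df["Trip_Day"] = df["Trip_Date"].dt.day_name()

    # Keep and reorder: VendorID, the date columns, then the remaining columns
    cols = ["VendorID", "Trip_Date", "Trip_Year", "Trip_Month", "Trip_Day",
            "passenger_count", "trip_distance", "store_and_fwd_flag",
            "payment_type", "Trip_Duration", "Total_Trip_Charge"]
    return df[cols].rename(columns={
        "VendorID": "Vendor_ID",
        "passenger_count": "No_of_Passengers",
        "store_and_fwd_flag": "SF_Flag",
        "payment_type": "Payment_Type",
    })


# Tiny synthetic example (made-up values, for illustration only)
sample = pd.DataFrame({
    "VendorID": [1, 2],
    "tpep_pickup_datetime": pd.to_datetime(["2024-01-01 08:00:00", "2024-01-02 23:50:30"]),
    "tpep_dropoff_datetime": pd.to_datetime(["2024-01-01 08:12:45", "2024-01-03 00:05:30"]),
    "passenger_count": [1, 2],
    "trip_distance": [2.5, 4.0],
    "store_and_fwd_flag": ["N", "N"],
    "payment_type": [1, 2],
    "fare_amount": [10.0, 20.0],
    "extra": [1.0, 0.5],
    "mta_tax": [0.5, 0.5],
    "tolls_amount": [0.0, 0.0],
    "improvement_surcharge": [1.0, 1.0],
    "congestion_surcharge": [2.5, 2.5],
    "Airport_fee": [0.0, 0.0],
    "tip_amount": [3.0, 0.0],
})
result = transform_trips(sample)
print(result)
```

Pass `skipna=True` instead if you would rather treat a missing fee component as zero; note that this changes the totals (and the null statistics) compared with adding the columns with `+`.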

## Expectation Part 1

* You will create functions to support your ETL process.
* You will need to capture some statistics for each dataset you process, for reference.
* You will read from a source S3 bucket (raw) and write into a different S3 bucket (transformed):
```python
bucket_dest = 'techcatalyst-transformed'
bucket_source = 'techcatalyst-raw'
```
* Make sure that your files (objects) are stored under a folder with your name inside the `techcatalyst-transformed` bucket. For example:
  `s3://techcatalyst-transformed/tarek/yellow_tripdata_2024-01_transformed.parquet/Trip_Year=2024/Trip_Month=January/84e2f047dcff4f7183ae25518ecd486b-0.parquet`
* Create a function that generates an `s3://` URI. The function should take a bucket name and a file name, then construct an `s3://` URI that points to the object. Refer to [this link](https://repost.aws/questions/QUFXlwQxxJQQyg9PMn2b6nTg/what-is-s3-uri-in-simple-storage-service) for more details on S3 URIs. If you forget what an `s3://` URI looks like, log in to AWS, open the S3 console, and see how an object inside a bucket is referenced by its `s3://` URI.
* Create a **cleanup** function that takes three *parameters*: the DataFrame, the name of the file, and the destination bucket. The function will transform the data, then write out a Parquet file to the destination address.
* You may need to create additional functions as necessary.
* Finally, once you write the files to S3, you will also write your statistics file. For example, capture the number of rows, the number of columns, the number of columns with null values, and the date/time of processing. What else can you think of?
* While processing each file, it would be great to print a progress or status message to the screen. Here is an example:

```
processing yellow_tripdata_2024-01.parquet
...........
writing s3://techcatalyst-raw/yellow_tripdata_2024-01.parquet to techcatalyst-transformed bucket
...........
processing yellow_tripdata_2024-02.parquet
...........
writing s3://techcatalyst-raw/yellow_tripdata_2024-02.parquet to techcatalyst-transformed bucket
...........
processing yellow_tripdata_2024-03.parquet
...........
writing s3://techcatalyst-raw/yellow_tripdata_2024-03.parquet to techcatalyst-transformed bucket
...........
processing yellow_tripdata_2024-04.parquet
...........
writing s3://techcatalyst-raw/yellow_tripdata_2024-04.parquet to techcatalyst-transformed bucket
...........
```
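For the `s3://` URI function, a minimal sketch could pair a builder with its inverse so you can also recover the bucket and key from a URI. The names `make_s3_uri` and `split_s3_uri` are hypothetical; the format itself is `s3://<bucket>/<key>`, as shown in the S3 console.

```python
def make_s3_uri(bucket: str, key: str) -> str:
    """Build an s3:// URI that points to `key` inside `bucket`."""
    # strip a leading slash so the URI never contains "//" after the bucket
    return f"s3://{bucket}/{key.lstrip('/')}"


def split_s3_uri(uri: str) -> tuple[str, str]:
    """Split an s3:// URI back into (bucket, key)."""
    prefix = "s3://"
    if not uri.startswith(prefix):
        raise ValueError(f"not an s3:// URI: {uri!r}")
    bucket, _, key = uri[len(prefix):].partition("/")
    return bucket, key


uri = make_s3_uri("techcatalyst-raw", "yellow_tripdata_2024-01.parquet")
print(uri)                # s3://techcatalyst-raw/yellow_tripdata_2024-01.parquet
print(split_s3_uri(uri))  # ('techcatalyst-raw', 'yellow_tripdata_2024-01.parquet')
```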



## Expectation Part 2
* Once the data has been loaded into S3, you will use Athena to inspect it and make sure it is correct.
* Remember, before using Athena, you will need to set up the AWS Glue crawlers that catalog the data as tables, along with the other setup steps.
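To check correctness in Athena, one simple approach is to compare row counts against the statistics you captured in Part 1. The sketch below only builds the SQL text and does the comparison; the database and table names are hypothetical (use whatever your crawler created), and it assumes the crawler exposes the partition columns as lowercase `trip_year` and `trip_month`. Because the files are partitioned by pickup date, a month's partition count can differ from that month's source-file count if a file contains pickups dated outside its month, so comparing overall totals is the safer check.

```python
def row_count_query(database: str, table: str) -> str:
    """Athena SQL that counts rows per (trip_year, trip_month) partition."""
    return (
        "SELECT trip_year, trip_month, COUNT(*) AS n_rows\n"
        f'FROM "{database}"."{table}"\n'
        "GROUP BY trip_year, trip_month\n"
        "ORDER BY trip_year, trip_month"
    )


def totals_match(athena_counts, expected_records) -> bool:
    """Compare the total row count seen in Athena with the total from the stats file."""
    return sum(athena_counts) == sum(expected_records)


# Hypothetical names: replace with the database/table your crawler created
print(row_count_query("techcatalyst_db", "yellow_tripdata_transformed"))
```

After running the query in the Athena editor, `totals_match(<n_rows values>, stats_df['number_of_records'])` flags any difference between what Athena sees and what you wrote.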

```python
import boto3
```


```python
import configparser

config = configparser.ConfigParser()

# read the cfg file holding the AWS credentials
config.read('../datasets/aws.cfg')

AWS_ACCESS_KEY = config['AWSDEV']['aws_access_key']
AWS_SECRET_KEY = config['AWSDEV']['aws_secret_key']
```
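The notebook expects `../datasets/aws.cfg` to be an INI-style file with an `[AWSDEV]` section holding `aws_access_key` and `aws_secret_key`. One caveat: `ConfigParser.read()` silently skips files it cannot open and returns the list of files it did parse, so a wrong path only surfaces later as a `KeyError`. The sketch below fails early instead; the helper `load_aws_keys` is hypothetical and the values in the demo are placeholders, not real credentials.

```python
import configparser
import os
import tempfile


def load_aws_keys(path: str, section: str = "AWSDEV") -> tuple[str, str]:
    """Read the access key pair from an INI-style .cfg file.

    ConfigParser.read() returns the list of files it parsed, so an empty
    list means the path was wrong or unreadable.
    """
    config = configparser.ConfigParser()
    if not config.read(path):
        raise FileNotFoundError(f"could not read config file: {path}")
    return config[section]["aws_access_key"], config[section]["aws_secret_key"]


# Demo with a throwaway file holding placeholder values
with tempfile.TemporaryDirectory() as tmp:
    cfg_path = os.path.join(tmp, "aws.cfg")
    with open(cfg_path, "w") as fh:
        fh.write("[AWSDEV]\naws_access_key = EXAMPLE_KEY_ID\naws_secret_key = EXAMPLE_SECRET\n")
    key_id, secret = load_aws_keys(cfg_path)

print(key_id)  # EXAMPLE_KEY_ID
```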

```python
s3_client = boto3.client('s3',
                         aws_access_key_id=AWS_ACCESS_KEY,
                         aws_secret_access_key=AWS_SECRET_KEY)
```

```python
# list the buckets these credentials can see
response = s3_client.list_buckets()
buckets = [bucket['Name'] for bucket in response['Buckets']]

bucket_name = 'techcatalyst-raw'
```

```python
response_objs = s3_client.list_objects_v2(Bucket=bucket_name)
response_objs.keys()
```

```
dict_keys(['ResponseMetadata', 'IsTruncated', 'Contents', 'Name', 'Prefix', 'MaxKeys', 'EncodingType', 'KeyCount'])
```

```python
for obj in response_objs['Contents']:
    print(obj['Key'])
```

```
yellow_tripdata_2024-01.parquet
yellow_tripdata_2024-02.parquet
yellow_tripdata_2024-03.parquet
yellow_tripdata_2024-04.parquet
```


```python
files = [obj['Key'] for obj in response_objs['Contents']]
files
```

```
['yellow_tripdata_2024-01.parquet',
 'yellow_tripdata_2024-02.parquet',
 'yellow_tripdata_2024-03.parquet',
 'yellow_tripdata_2024-04.parquet']
```
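`list_objects_v2` returns at most 1,000 keys per call and sets `IsTruncated` to `True` when more remain; also, the `Contents` key is absent when a bucket (or prefix) is empty, so `response_objs['Contents']` would raise a `KeyError`. A more defensive sketch uses boto3's paginator and filters by suffix. The helper `keys_from_pages` is hypothetical; it takes the pages as input so it can be tried offline with response-shaped dictionaries.

```python
def keys_from_pages(pages, suffix: str = ".parquet") -> list[str]:
    """Collect object keys ending in `suffix` from list_objects_v2 response pages."""
    keys = []
    for page in pages:
        # 'Contents' is missing when a page (or the whole bucket) holds no objects
        for obj in page.get("Contents", []):
            if obj["Key"].endswith(suffix):
                keys.append(obj["Key"])
    return keys


# With a real client (requires AWS credentials):
# paginator = s3_client.get_paginator("list_objects_v2")
# files = keys_from_pages(paginator.paginate(Bucket=bucket_name))

# Offline demo using response-shaped dictionaries
fake_pages = [
    {"Contents": [{"Key": "yellow_tripdata_2024-01.parquet"}, {"Key": "notes.txt"}]},
    {"Contents": [{"Key": "yellow_tripdata_2024-02.parquet"}]},
    {},
]
print(keys_from_pages(fake_pages))
```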

```python
s3_urls = []
for file in files:
    url = f"s3://{bucket_name}/{file}"
    s3_urls.append(url)

s3_urls
```

```
['s3://techcatalyst-raw/yellow_tripdata_2024-01.parquet',
 's3://techcatalyst-raw/yellow_tripdata_2024-02.parquet',
 's3://techcatalyst-raw/yellow_tripdata_2024-03.parquet',
 's3://techcatalyst-raw/yellow_tripdata_2024-04.parquet']
```

```python
s3_urls[0].split('.')
```

```
['s3://techcatalyst-raw/yellow_tripdata_2024-01', 'parquet']
```

```python
s3_urls[0].split('.')[0] + '_transformed'
```

```
's3://techcatalyst-raw/yellow_tripdata_2024-01_transformed'
```
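Splitting on `'.'` works for these files, but it splits on every dot: S3 bucket names and object keys may themselves contain dots (for example a hypothetical `s3://my.bucket/data.v2.parquet`), and the result still points at the raw bucket. A sketch that strips only the final extension and targets your own folder in the destination bucket could look like this; `transformed_uri` is a hypothetical helper, and `tarek` stands in for your name, as in the example path in Part 1.

```python
import posixpath


def transformed_uri(dest_bucket: str, owner: str, file_name: str) -> str:
    """Destination URI for a transformed file under the owner's folder.

    posixpath.splitext removes only the last extension, so other dots in the
    name are preserved, unlike str.split('.').
    """
    stem, _ext = posixpath.splitext(file_name)
    return f"s3://{dest_bucket}/{owner}/{stem}_transformed.parquet"


print("s3://my.bucket/data.v2.parquet".split(".")[0])  # s3://my
print(transformed_uri("techcatalyst-transformed", "tarek", "yellow_tripdata_2024-01.parquet"))
# s3://techcatalyst-transformed/tarek/yellow_tripdata_2024-01_transformed.parquet
```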

```python
import pandas as pd

# Install once if needed (see Preparation):
# !conda install fsspec -y
# !conda install s3fs -c conda-forge -y

# Reading a single object straight from S3 with explicit credentials:
# pd.read_parquet('s3://techcatalyst-raw/yellow_tripdata_2024-01.parquet',
#                 storage_options={
#                     'key': AWS_ACCESS_KEY,
#                     'secret': AWS_SECRET_KEY
#                 })
```

## Putting it all together

```python
# Create a function that generates an s3:// URI.
# The function takes a bucket name and a file name, then constructs an s3:// URI that points to the object.

# https://repost.aws/questions/QUFXlwQxxJQQyg9PMn2b6nTg/what-is-s3-uri-in-simple-storage-service

# If you forget what an s3:// URI looks like, log in to AWS, open the S3 console,
# and see how an object inside a bucket is referenced by its s3:// URI.

def generate_url(bucket, file):
    # use the `bucket` argument (not the global bucket_name) so any bucket works
    url = f"s3://{bucket}/{file}"
    return url
```

```python
# Create a cleanup function that takes three parameters: the DataFrame, the name of the file, and the destination bucket.
# The function transforms the data, then writes out a Parquet file to the destination address.
from datetime import datetime

import pandas as pd


def cleanup(df, name, dest):
    print(f'processing {name}')
    print('...........')

    # Calculate Trip Duration in whole minutes
    df['Trip_Duration'] = (df['tpep_dropoff_datetime'] - df['tpep_pickup_datetime']).dt.total_seconds() // 60

    # Calculate Total Trip Charge to include: fare amount, extra, MTA tax, tolls amount,
    # improvement surcharge, congestion surcharge, airport fee, and tip amount
    # (a missing value in any component makes the total NaN)
    df['Total_Trip_Charge'] = (df['fare_amount'] + df['extra'] + df['mta_tax']
                               + df['tolls_amount'] + df['improvement_surcharge']
                               + df['congestion_surcharge'] + df['Airport_fee']
                               + df['tip_amount'])

    # Add Trip Date, Trip Month, Trip Day (weekday name), and Trip Year
    df['Trip_Date'] = pd.to_datetime(df['tpep_pickup_datetime'].dt.date)
    df['Trip_Month'] = df['Trip_Date'].dt.month_name()
    df['Trip_Day'] = df['Trip_Date'].dt.day_name()
    df['Trip_Year'] = df['Trip_Date'].dt.year

    # Keep the required columns: VendorID, then the date columns, then the rest
    cols = ['VendorID', 'Trip_Date', 'Trip_Year', 'Trip_Month', 'Trip_Day',
            'passenger_count', 'trip_distance', 'store_and_fwd_flag',
            'payment_type', 'Trip_Duration', 'Total_Trip_Charge']

    df = df[cols]
    df = df.rename(columns={
        'VendorID': 'Vendor_ID',
        'passenger_count': 'No_of_Passengers',
        'store_and_fwd_flag': 'SF_Flag',
        'payment_type': 'Payment_Type'
    })

    # note: `obj` is the source URI set by the calling loop (a notebook global),
    # so this line raises NameError if cleanup() is called outside that loop
    print(f'writing {obj} to {dest} bucket')
    print('...........')

    # statistics for this file
    number_of_records = df.shape[0]
    number_of_columns = df.shape[1]
    number_of_columns_with_na = int((df.isnull().sum() > 0).sum())

    stats = {
        'file_name': [name],
        'number_of_records': [number_of_records],
        'number_of_cols': [number_of_columns],
        'number_of_cols_with_na': [number_of_columns_with_na],
        'date_time': [f'{datetime.now()}']
    }

    # write under your own folder (here: tarek), partitioned by year and month
    file = f"s3://{dest}/tarek/{name.split('.')[0]}_transformed.parquet"

    df.to_parquet(file, partition_cols=['Trip_Year', 'Trip_Month'],
                  storage_options={
                      'key': AWS_ACCESS_KEY,
                      'secret': AWS_SECRET_KEY
                  })

    return pd.DataFrame(stats)
```
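The statistics item in Part 1 asks what else you could capture. One possible extension, sketched below as a standalone helper (`collect_stats` is a hypothetical name), adds the total number of missing cells, the covered date range, and a count of non-positive trip durations. It assumes the column names that `cleanup()` produces (`Trip_Date`, `Trip_Duration`); the demo DataFrame is synthetic.

```python
from datetime import datetime

import pandas as pd


def collect_stats(df: pd.DataFrame, name: str) -> pd.DataFrame:
    """One-row summary of a transformed DataFrame (column names as produced by cleanup())."""
    stats = {
        "file_name": name,
        "number_of_records": len(df),
        "number_of_cols": df.shape[1],
        "number_of_cols_with_na": int(df.isna().any().sum()),
        # Possible extras: total missing cells, date coverage, suspicious durations
        "number_of_null_cells": int(df.isna().sum().sum()),
        "min_trip_date": df["Trip_Date"].min(),
        "max_trip_date": df["Trip_Date"].max(),
        "non_positive_durations": int((df["Trip_Duration"] <= 0).sum()),
        "date_time": datetime.now().isoformat(),
    }
    return pd.DataFrame([stats])


# Synthetic demo data (not taken from the real trip files)
demo = pd.DataFrame({
    "Trip_Date": pd.to_datetime(["2024-01-01", "2024-01-31", "2023-12-31"]),
    "Trip_Duration": [12.0, 0.0, -3.0],
    "No_of_Passengers": [1.0, None, 2.0],
})
print(collect_stats(demo, "demo.parquet").T)
```

A date range that reaches outside the file's month, or a non-zero count of non-positive durations, is a quick hint that the raw data needs extra filtering.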


```python
%%time
bucket_dest = 'techcatalyst-transformed'
bucket_source = 'techcatalyst-raw'

# collect one stats row per file, then combine them once at the end
all_stats = []

for file in files:
    obj = generate_url(bucket_source, file)  # cleanup() also prints this global in its log line
    df = pd.read_parquet(obj,
                         storage_options={
                             'key': AWS_ACCESS_KEY,
                             'secret': AWS_SECRET_KEY
                         })
    all_stats.append(cleanup(df, file, bucket_dest))

stats_df = pd.concat(all_stats, ignore_index=True)
```

```
processing yellow_tripdata_2024-01.parquet
...........
writing s3://techcatalyst-raw/yellow_tripdata_2024-01.parquet to techcatalyst-transformed bucket
...........
processing yellow_tripdata_2024-02.parquet
...........
writing s3://techcatalyst-raw/yellow_tripdata_2024-02.parquet to techcatalyst-transformed bucket
...........
processing yellow_tripdata_2024-03.parquet
...........
writing s3://techcatalyst-raw/yellow_tripdata_2024-03.parquet to techcatalyst-transformed bucket
...........
processing yellow_tripdata_2024-04.parquet
...........
writing s3://techcatalyst-raw/yellow_tripdata_2024-04.parquet to techcatalyst-transformed bucket
...........
CPU times: user 16.8 s, sys: 3.35 s, total: 20.2 s
Wall time: 8min 23s
```


```python
stats_df.reset_index(drop=True, inplace=True)
stats_df
```

|   | file_name | number_of_records | number_of_cols | number_of_cols_with_na | date_time |
|---|---|---|---|---|---|
| 0 | yellow_tripdata_2024-01.parquet | 2964624 | 11 | 3 | 2024-06-28 13:25:48.855560 |
| 1 | yellow_tripdata_2024-02.parquet | 3007526 | 11 | 3 | 2024-06-28 13:26:40.752699 |
| 2 | yellow_tripdata_2024-03.parquet | 3582628 | 11 | 3 | 2024-06-28 13:27:39.263565 |
| 3 | yellow_tripdata_2024-04.parquet | 3514289 | 11 | 3 | 2024-06-28 13:28:25.298665 |
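The cells above build `stats_df` but do not yet write it out, which Part 1 asks for. A minimal sketch, assuming a CSV file is acceptable for the statistics (the helper `write_stats` and the S3 key in the comment are hypothetical): `DataFrame.to_csv` accepts an `s3://` URI together with `storage_options` when `s3fs` is installed, just like `read_parquet` and `to_parquet` above. The offline demo writes to a temporary local file instead, using two of the rows reported above.

```python
import os
import tempfile

import pandas as pd


def write_stats(stats_df: pd.DataFrame, target: str, storage_options=None) -> str:
    """Write the statistics table as CSV to `target` (a local path or an s3:// URI)."""
    stats_df.to_csv(target, index=False, storage_options=storage_options)
    return target


# On S3 (hypothetical key under your own folder; requires s3fs and credentials):
# write_stats(stats_df, "s3://techcatalyst-transformed/tarek/stats/processing_stats.csv",
#             storage_options={"key": AWS_ACCESS_KEY, "secret": AWS_SECRET_KEY})

# Offline demo with the first two rows reported above
demo_stats = pd.DataFrame({
    "file_name": ["yellow_tripdata_2024-01.parquet", "yellow_tripdata_2024-02.parquet"],
    "number_of_records": [2964624, 3007526],
    "number_of_cols": [11, 11],
    "number_of_cols_with_na": [3, 3],
})
with tempfile.TemporaryDirectory() as tmp:
    path = write_stats(demo_stats, os.path.join(tmp, "processing_stats.csv"))
    round_trip = pd.read_csv(path)

print(round_trip)
```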
