# Data Engineering Project Instructions

## Preparation
* You will need to install `boto3`
* You will be using `pandas` and `configparser`
* You will need to install two additional libraries to support pandas in reading from S3:
```bash
  conda install s3fs -c conda-forge -y
  conda install fsspec -y
```

## Transformations

You will perform the same transformations you did in the previous class, including:

* __Calculate Trip Duration in Minutes:__ Call it Trip_Duration.
* __Calculate Total Trip Charge:__ Include fare amount, extra, MTA tax, tolls amount, improvement surcharge, congestion surcharge, airport fee, and tip amount. Call it Total_Trip_Charge.
* __Add Trip Date Components:__ Add `Trip_Date`, `Trip_Month`, `Trip_Day`, and `Trip_Year`.
* __Keep Specific Columns:__ `VendorID`, `passenger_count`, `trip_distance`, `store_and_fwd_flag`, `payment_type`, `Trip_Duration`, `Total_Trip_Charge`, `Trip_Date`, `Trip_Month`, `Trip_Year`, `Trip_Day`
* __Reorder the Columns:__ Start with VendorID, followed by all the date/time columns (Date, Year, Month, and Day), then the remaining columns.
* __Rename the Columns:__ Rename columns to Vendor_ID, No_of_Passengers, SF_Flag, Payment_Type.
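
As a minimal sketch of the duration, charge, and date-component steps, the snippet below applies them to a two-row toy DataFrame that uses the yellow taxi column names. It assumes a null charge component (for example a missing `congestion_surcharge`) should count as zero, which is a design choice rather than a requirement stated above, and it keeps fractional minutes.

```python
import pandas as pd

# Toy rows for illustration only; column names follow the yellow taxi schema
df = pd.DataFrame({
    "tpep_pickup_datetime": pd.to_datetime(["2024-01-01 00:57:55", "2024-01-31 23:45:59"]),
    "tpep_dropoff_datetime": pd.to_datetime(["2024-01-01 01:17:43", "2024-01-31 23:54:36"]),
    "fare_amount": [17.70, 15.77],
    "extra": [1.00, 0.00],
    "mta_tax": [0.5, 0.5],
    "tolls_amount": [0.00, 0.00],
    "improvement_surcharge": [1.0, 1.0],
    "congestion_surcharge": [2.5, None],
    "Airport_fee": [0.0, None],
    "tip_amount": [0.00, 2.00],
})

charge_cols = [
    "fare_amount", "extra", "mta_tax", "tolls_amount",
    "improvement_surcharge", "congestion_surcharge", "Airport_fee", "tip_amount",
]

# Duration in (fractional) minutes between pickup and dropoff
df["Trip_Duration"] = (
    df["tpep_dropoff_datetime"] - df["tpep_pickup_datetime"]
).dt.total_seconds() / 60

# Row-wise sum; sum(axis=1) skips nulls by default, so they count as zero
df["Total_Trip_Charge"] = df[charge_cols].sum(axis=1)

# Date components derived from the pickup timestamp
df["Trip_Date"] = df["tpep_pickup_datetime"].dt.normalize()
df["Trip_Month"] = df["Trip_Date"].dt.month_name()
df["Trip_Day"] = df["Trip_Date"].dt.day_name()
df["Trip_Year"] = df["Trip_Date"].dt.year

print(df[["Trip_Duration", "Total_Trip_Charge", "Trip_Month", "Trip_Day", "Trip_Year"]])
```

Adding the columns with the `+` operator instead would make the total null whenever any single component is null, so it is worth deciding which behavior you want before comparing results in Athena. Use floor division (`// 60`) if whole minutes are preferred.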

## Expectation Part 1

* You will create functions to support your ETL Process.
* You will need to capture some statistics for each data set you are processing for reference.
* You will read from a source S3 bucket (raw) and write into a different S3 bucket (transformed):
```python
bucket_dest = 'techcatalyst-transformed'
bucket_source = 'techcatalyst-raw'
```
* Make sure that your files (objects) are under your name inside the `techcatalyst-transformed` bucket. For example:
  `s3://techcatalyst-transformed/tarek/yellow_tripdata_2024-01_transformed.parquet/Trip_Year=2024/Trip_Month=January/84e2f047dcff4f7183ae25518ecd486b-0.parquet`
* Create a function that generates an `s3://` URI. The function should take a bucket name and a file name, then construct an `s3://` URI that points to the object. See https://repost.aws/questions/QUFXlwQxxJQQyg9PMn2b6nTg/what-is-s3-uri-in-simple-storage-service for more details on S3 URIs. If you forget what an `s3://` URI looks like, log in to AWS, navigate to the S3 service, and see how an object inside a bucket is referenced by its `s3://` URI.
* Create a **cleanup** function that takes three *parameters*: the DataFrame, the name of the file, and the destination bucket. The function will transform the data, then write out a Parquet file to the destination address.
* You may need to create additional functions as necessary.
* Finally, once you write the files to S3, you will also write your statistics file. For example, capture the number of rows, number of columns, number of columns with null values, and the date/time of processing. What else can you think of?
* While processing each file, it would be great to log something on the screen to show progress or status. Here is an example below:

```
processing yellow_tripdata_2024-01.parquet
...........
writing s3://techcatalyst-raw/yellow_tripdata_2024-01.parquet to techcatalyst-transformed bucket
...........
processing yellow_tripdata_2024-02.parquet
...........
writing s3://techcatalyst-raw/yellow_tripdata_2024-02.parquet to techcatalyst-transformed bucket
...........
processing yellow_tripdata_2024-03.parquet
...........
writing s3://techcatalyst-raw/yellow_tripdata_2024-03.parquet to techcatalyst-transformed bucket
...........
processing yellow_tripdata_2024-04.parquet
...........
writing s3://techcatalyst-raw/yellow_tripdata_2024-04.parquet to techcatalyst-transformed bucket
...........
```
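
The URI requirement above can be sketched as follows. The function names `build_s3_uri` and `transformed_name` and the optional `owner` parameter are hypothetical, introduced only for illustration; the bucket, owner, and file names come from the example path shown earlier.

```python
from pathlib import PurePosixPath


def build_s3_uri(bucket: str, file_name: str, owner: str = "") -> str:
    """Return an s3:// URI for file_name in bucket, optionally under an owner prefix."""
    # Drop empty parts and stray slashes so the pieces join cleanly
    parts = [p.strip("/") for p in (bucket, owner, file_name) if p]
    return "s3://" + "/".join(parts)


def transformed_name(file_name: str) -> str:
    """Insert '_transformed' before the file extension."""
    path = PurePosixPath(file_name)
    return f"{path.stem}_transformed{path.suffix}"


source_uri = build_s3_uri("techcatalyst-raw", "yellow_tripdata_2024-01.parquet")
dest_uri = build_s3_uri(
    "techcatalyst-transformed",
    transformed_name("yellow_tripdata_2024-01.parquet"),
    owner="tarek",
)
print(source_uri)
print(dest_uri)
```

Passing the owner as a parameter keeps the same function usable for both the raw bucket (no prefix) and the transformed bucket (your name as prefix).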



## Expectation Part 2
* Once the data has been loaded on AWS, you will need to use Athena to inspect the data to ensure correctness.
* Remember, before using Athena, you will need to go through the process of creating the crawlers and other setup steps.
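
One possible correctness check is a row count per partition, which can be compared with the statistics captured during processing. The sketch below builds such a query and submits it through a boto3 Athena client passed in by the caller (for example `boto3.client('athena', ...)`). The table name `yellow_tripdata`, the database name, and the results location are hypothetical placeholders, and the lowercase column names assume the crawler registered the partition columns in lowercase.

```python
def build_row_count_query(table: str) -> str:
    """Build a SQL query that counts rows per (trip_year, trip_month) partition."""
    return (
        "SELECT trip_year, trip_month, COUNT(*) AS row_count "
        f'FROM "{table}" '
        "GROUP BY trip_year, trip_month "
        "ORDER BY trip_year, trip_month"
    )


def submit_query(athena_client, sql: str, database: str, output_location: str) -> str:
    """Submit a query to Athena and return its execution ID."""
    response = athena_client.start_query_execution(
        QueryString=sql,
        QueryExecutionContext={"Database": database},            # hypothetical database name
        ResultConfiguration={"OutputLocation": output_location},  # hypothetical results bucket
    )
    return response["QueryExecutionId"]


# The table name is a placeholder; use whatever name your crawler created
sql = build_row_count_query("yellow_tripdata")
print(sql)
```

The same query can also be pasted directly into the Athena query editor; the helper is only useful if you want to run the check from the notebook.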

In [5]:
!conda install -c conda-forge boto3 -y

Channels:
 - conda-forge
 - defaults
Platform: linux-64
Collecting package metadata (repodata.json): done
Solving environment: done

## Package Plan ##

  environment location: /home/npatturi/miniconda3/envs/dev1

  added / updated specs:
    - boto3


The following packages will be downloaded:

    package                    |            build
    ---------------------------|-----------------
    boto3-1.34.134             |     pyhd8ed1ab_0          79 KB  conda-forge
    botocore-1.34.134          |pyge310_1234567_0         6.7 MB  conda-forge
    brotli-python-1.1.0        |  py311hb755f60_1         343 KB  conda-forge
    cffi-1.16.0                |  py311hb3a22ac_0         293 KB  conda-forge
    h2-4.1.0                   |     pyhd8ed1ab_0          46 KB  conda-forge
    hpack-4.0.0                |     pyh9f0ad1d_0          25 KB  conda-forge
    hyperframe-6.0.1           |     pyhd8ed1ab_0          14 KB  conda-forge
    jmespath-1.0.1             |     pyhd8ed1ab_0        

In [120]:
import boto3


In [121]:
import configparser

In [122]:
config = configparser.ConfigParser()

In [123]:
config.read('aws.cfg')

['aws.cfg']

In [124]:
AWS_ACCESS_KEY = config['AWS']['aws_access_key']
AWS_SECRET_KEY = config['AWS']['aws_secret_key']
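
The cells above assume an `aws.cfg` file with an `[AWS]` section holding the two keys. A minimal sketch of that layout, parsed from a string so that no real file or credentials are needed; the values are placeholders.

```python
import configparser

# Placeholder contents mirroring the expected layout of aws.cfg
sample_cfg = """
[AWS]
aws_access_key = YOUR_ACCESS_KEY_ID
aws_secret_key = YOUR_SECRET_ACCESS_KEY
"""

parser = configparser.ConfigParser()
parser.read_string(sample_cfg)

access_key = parser["AWS"]["aws_access_key"]
secret_key = parser["AWS"]["aws_secret_key"]

# Confirm the values loaded without echoing the secret into notebook output
print(f"access key loaded: {bool(access_key)}, secret key loaded: {bool(secret_key)}")
```

Note that `config.read` returns the list of files it managed to parse, so an empty list means `aws.cfg` was not found in the working directory.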

In [1]:
AWS_SECRET_KEY

NameError: name 'AWS_SECRET_KEY' is not defined

In [126]:
# establish your S3 client 
# https://boto3.amazonaws.com/v1/documentation/api/latest/guide/s3-example-creating-buckets.html
# make sure to pass your credentials: aws_access_key_id and aws_secret_access_key
s3client = boto3.client('s3', aws_access_key_id = AWS_ACCESS_KEY, aws_secret_access_key = AWS_SECRET_KEY)

In [127]:
# submit the query to get back all the buckets in the account and store in the response variable 
response = s3client.list_buckets()

In [128]:
response

{'ResponseMetadata': {'RequestId': '02H0N1RB7SB0TJ3Q',
  'HostId': 'EJnvvsea0XxIwl97JHge12O5e9BOg6K9CsGJPgrFw1ytvWnl1tH+9KVCATlHB8FD6zBWFIJ7frk=',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amz-id-2': 'EJnvvsea0XxIwl97JHge12O5e9BOg6K9CsGJPgrFw1ytvWnl1tH+9KVCATlHB8FD6zBWFIJ7frk=',
   'x-amz-request-id': '02H0N1RB7SB0TJ3Q',
   'date': 'Thu, 27 Jun 2024 17:30:02 GMT',
   'content-type': 'application/xml',
   'transfer-encoding': 'chunked',
   'server': 'AmazonS3'},
  'RetryAttempts': 0},
 'Buckets': [{'Name': 'amsu-aws-s3-assessment',
   'CreationDate': datetime.datetime(2023, 9, 8, 16, 49, 14, tzinfo=tzutc())},
  {'Name': 'amsu-s3-cf-website',
   'CreationDate': datetime.datetime(2023, 9, 7, 13, 50, 19, tzinfo=tzutc())},
  {'Name': 'andrea-lm-code-bucket',
   'CreationDate': datetime.datetime(2020, 2, 6, 17, 40, 27, tzinfo=tzutc())},
  {'Name': 'aryan-aws-glue-activity-raw',
   'CreationDate': datetime.datetime(2024, 6, 20, 17, 59, 50, tzinfo=tzutc())},
  {'Name': 'aryan-aws-glue-acti

In [129]:
# Use either a loop or a list comprehension to extract the bucket names into a list. This is mainly a reference for which buckets are available.

buckets = [bucket['Name'] for bucket in response['Buckets']]

In [130]:
buckets

['amsu-aws-s3-assessment',
 'amsu-s3-cf-website',
 'andrea-lm-code-bucket',
 'aryan-aws-glue-activity-raw',
 'aryan-aws-glue-activity-transformed',
 'austin-lambda-s3',
 'aws-athena-query-results-us-east-1-535146832369',
 'aws-athena-query-results-us-east-1-535146832369-andy',
 'aws-athena-query-results-us-east-1-535146832369nh',
 'aws-athena-query-results-us-east-1-aryan',
 'aws-athena-query-results-us-east-1-tarek',
 'aws-athena-query-results-us-east-2-535146832369',
 'aws-athena-query-results-us-east-2-535146832369-kb',
 'aws-athena-query-results-us-east-2-nsp',
 'aws-athena-query-results-us-east-2-sp',
 'aws-cloudtrail-logs-535146832369-01f575cd',
 'aws-cloudtrail-logs-535146832369-9d19bcae',
 'aws-glue-assets-535146832369-us-east-1',
 'aws-glue-assets-535146832369-us-east-2',
 'aws-sam-cli-managed-default-samclisourcebucket-1oz2tqbsdli3',
 'aws-sam-cli-managed-default-samclisourcebucket-mgdplk7mifi7',
 'aws-sam-cli-managed-default-samclisourcebucket-t2x2wb9il3p1',
 'aws-sam-cli-ma

In [131]:
# We only care about reading from the 'techcatalyst-raw' bucket
bucket_name = 'techcatalyst-raw'

In [132]:
# Get all the objects inside that bucket. Once you get the response, you can list all the keys. Remember, like a dictionary, the response holds key-value pairs. We care about the Contents key.



In [133]:
# Print out the objects available. These are the Parquet files.

In [134]:
# Store them into a Python list to use later 
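
The three steps above (get the objects, print them, store them in a list) can be sketched with a small helper. The helper name `extract_parquet_keys` is hypothetical, and a hand-written dictionary stands in for the real `s3client.list_objects_v2(Bucket=bucket_name)` response so the snippet runs without AWS access.

```python
def extract_parquet_keys(response: dict) -> list:
    """Return the keys of all .parquet objects in a list_objects_v2 response."""
    # 'Contents' is absent from the response when the bucket has no objects
    contents = response.get("Contents", [])
    return [obj["Key"] for obj in contents if obj["Key"].endswith(".parquet")]


# Stand-in for s3client.list_objects_v2(Bucket=bucket_name); values are illustrative
sample_response = {
    "KeyCount": 3,
    "Contents": [
        {"Key": "yellow_tripdata_2024-01.parquet", "Size": 1},
        {"Key": "yellow_tripdata_2024-02.parquet", "Size": 1},
        {"Key": "notes.txt", "Size": 1},
    ],
}

files = extract_parquet_keys(sample_response)
for key in files:
    print(key)
```

A single `list_objects_v2` call returns at most 1,000 keys, which is enough for a handful of monthly files; larger buckets would need pagination.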

In [135]:
import pandas as pd

In [114]:
# If you get "ImportError: Missing optional dependency 'fsspec'. Use pip or conda to install fsspec.", run the install below
!conda install fsspec -y

Retrieving notices: ...working... done
Channels:
 - defaults
 - conda-forge
Platform: linux-64
Collecting package metadata (repodata.json): done
Solving environment: done

# All requested packages already installed.



In [115]:
# Run this if you did not install s3fs
!conda install s3fs -c conda-forge -y

Channels:
 - conda-forge
 - defaults
Platform: linux-64
Collecting package metadata (repodata.json): done
Solving environment: done

# All requested packages already installed.



In [136]:
# run this example to make sure everything works

pd.read_parquet('s3://techcatalyst-raw/yellow_tripdata_2024-01.parquet',
               storage_options={
                   'key' : AWS_ACCESS_KEY,
                   'secret' : AWS_SECRET_KEY
               })

Unnamed: 0,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,Airport_fee
0,2,2024-01-01 00:57:55,2024-01-01 01:17:43,1.0,1.72,1.0,N,186,79,2,17.70,1.00,0.5,0.00,0.00,1.0,22.70,2.5,0.0
1,1,2024-01-01 00:03:00,2024-01-01 00:09:36,1.0,1.80,1.0,N,140,236,1,10.00,3.50,0.5,3.75,0.00,1.0,18.75,2.5,0.0
2,1,2024-01-01 00:17:06,2024-01-01 00:35:01,1.0,4.70,1.0,N,236,79,1,23.30,3.50,0.5,3.00,0.00,1.0,31.30,2.5,0.0
3,1,2024-01-01 00:36:38,2024-01-01 00:44:56,1.0,1.40,1.0,N,79,211,1,10.00,3.50,0.5,2.00,0.00,1.0,17.00,2.5,0.0
4,1,2024-01-01 00:46:51,2024-01-01 00:52:57,1.0,0.80,1.0,N,211,148,1,7.90,3.50,0.5,3.20,0.00,1.0,16.10,2.5,0.0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
2964619,2,2024-01-31 23:45:59,2024-01-31 23:54:36,,3.18,,,107,263,0,15.77,0.00,0.5,2.00,0.00,1.0,21.77,,
2964620,1,2024-01-31 23:13:07,2024-01-31 23:27:52,,4.00,,,114,236,0,18.40,1.00,0.5,2.34,0.00,1.0,25.74,,
2964621,2,2024-01-31 23:19:00,2024-01-31 23:38:00,,3.33,,,211,25,0,19.97,0.00,0.5,0.00,0.00,1.0,23.97,,
2964622,2,2024-01-31 23:07:23,2024-01-31 23:25:14,,3.06,,,107,13,0,23.88,0.00,0.5,5.58,0.00,1.0,33.46,,


## Putting it all together

In [144]:
# Create a function that generates an s3:// URI 
# The function takes a bucket name, a file name and then constructs an s3:// uri that points to the object  

# https://repost.aws/questions/QUFXlwQxxJQQyg9PMn2b6nTg/what-is-s3-uri-in-simple-storage-service

# If you forget what an s3:// URI looks like, log in to AWS, navigate to the S3 service, and see how an object inside a bucket is referenced by its s3:// URI

def generate_url(bucket, file):
    uri = f's3://{bucket}/Sriya/{file}'
    return uri 

In [145]:
# Create a cleanup function that takes three parameters: the DataFrame, the name of the file, the destination bucket

# the function will transform the data then write out a Parquet file to the destination address 
from datetime import datetime

def cleanup(df, name, dest):
    print(f'processing {name}')
    print("..........")

    # Trip duration in whole minutes (floor division drops leftover seconds)
    df['Trip_Duration'] = (df['tpep_dropoff_datetime'] - df['tpep_pickup_datetime']).dt.total_seconds() // 60
    # Sum of all charge components; a null in any component makes the total null
    df['Total_Trip_Charge'] = (
        df['fare_amount'] + df['extra'] + df['mta_tax'] + df['tolls_amount']
        + df['improvement_surcharge'] + df['congestion_surcharge']
        + df['Airport_fee'] + df['tip_amount']
    )

    # Date components derived from the pickup timestamp
    df['Trip_Date'] = pd.to_datetime(df['tpep_pickup_datetime'].dt.date)
    df['Trip_Month'] = df['Trip_Date'].dt.month_name()
    df['Trip_Day'] = df['Trip_Date'].dt.day_name()
    df['Trip_Year'] = df['Trip_Date'].dt.year

    # Required columns, in the required order
    cols = ['VendorID', 'Trip_Date', 'Trip_Year', 'Trip_Month', 'Trip_Day',
            'passenger_count', 'trip_distance', 'store_and_fwd_flag',
            'payment_type', 'Trip_Duration', 'Total_Trip_Charge']

    # Select and rename in one step. rename() without inplace=True returns a
    # new DataFrame, which avoids pandas' SettingWithCopyWarning on a slice.
    df = df[cols].rename(columns={
        'VendorID': 'Vendor_ID',
        'passenger_count': 'No_of_Passengers',
        'store_and_fwd_flag': 'SF_Flag',
        'payment_type': 'Payment_Type'
    })

    # Write the result to the destination bucket, partitioned by year and month
    uri = generate_url(dest, name)
    print(f'{name} to {dest} bucket')
    print("..........")
    df.to_parquet(uri, partition_cols=['Trip_Year', 'Trip_Month'],
                  storage_options={
                      'key': AWS_ACCESS_KEY,
                      'secret': AWS_SECRET_KEY
                  })

    # Return the transformed DataFrame so the caller can collect statistics
    return df



In [146]:
# processing yellow_tripdata_2024-01.parquet
# ...........
# writing s3://techcatalyst-raw/yellow_tripdata_2024-01.parquet to techcatalyst-transformed bucket
# ...........
# processing yellow_tripdata_2024-02.parquet
# ...........
# writing s3://techcatalyst-raw/yellow_tripdata_2024-02.parquet to techcatalyst-transformed bucket
# ...........
# processing yellow_tripdata_2024-03.parquet
# ...........
# writing s3://techcatalyst-raw/yellow_tripdata_2024-03.parquet to techcatalyst-transformed bucket
# ...........
# processing yellow_tripdata_2024-04.parquet
# ...........
# writing s3://techcatalyst-raw/yellow_tripdata_2024-04.parquet to techcatalyst-transformed bucket



bucket_dest = 'techcatalyst-transformed'
bucket_source = 'techcatalyst-raw'

objects = s3client.list_objects_v2(Bucket=bucket_source)
myfiles = [obj['Key'] for obj in objects['Contents']]
# print(myfiles)

# Read each source file, transform it, and write it to the destination bucket
for key in myfiles:
    imported_df = pd.read_parquet(f's3://{bucket_source}/{key}',
                                  storage_options={
                                      'key': AWS_ACCESS_KEY,
                                      'secret': AWS_SECRET_KEY
                                  })
    cleanup(imported_df, key, bucket_dest)

# # Write your code that leverages the functions you just created 
# # Your functions should perform the ETL but should also print out some status as shown

processing yellow_tripdata_2024-01.parquet
..........
yellow_tripdata_2024-01.parquet to techcatalyst-transformed bucket
..........
processing yellow_tripdata_2024-02.parquet
..........
yellow_tripdata_2024-02.parquet to techcatalyst-transformed bucket
..........
processing yellow_tripdata_2024-03.parquet
..........
yellow_tripdata_2024-03.parquet to techcatalyst-transformed bucket
..........
processing yellow_tripdata_2024-04.parquet
..........
yellow_tripdata_2024-04.parquet to techcatalyst-transformed bucket
..........


In [147]:
# Example of the statistics collected from each file before writing (loading). Can you think of other information you can capture? Maybe stats before and after?
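
A minimal sketch of such a statistics record, assuming one dictionary per file is enough. The function name `collect_stats` and the field names are hypothetical; the metrics are the ones suggested in the instructions (rows, columns, columns with nulls, processing time) plus a total null count.

```python
from datetime import datetime, timezone

import pandas as pd


def collect_stats(df: pd.DataFrame, name: str) -> dict:
    """Summarize a DataFrame as a flat dictionary of basic statistics."""
    null_counts = df.isna().sum()  # number of nulls per column
    return {
        "file_name": name,
        "row_count": len(df),
        "column_count": df.shape[1],
        "columns_with_nulls": int((null_counts > 0).sum()),
        "total_null_values": int(null_counts.sum()),
        "processed_at": datetime.now(timezone.utc).isoformat(),
    }


# Toy DataFrame for illustration only
sample = pd.DataFrame({
    "passenger_count": [1.0, None, 2.0],
    "trip_distance": [1.72, 3.18, 4.00],
})

stats = collect_stats(sample, "yellow_tripdata_2024-01.parquet")
print(stats)
```

Calling the function once on the raw DataFrame and once on the transformed one gives a before/after pair per file. A list of these dictionaries can be turned into a table with `pd.DataFrame(records)` and written next to the Parquet output.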