In [2]:
import pandas as pd 
import requests

## Loader

In [None]:

import io
import pandas as pd
import requests
if 'data_loader' not in globals():
    from mage_ai.data_preparation.decorators import data_loader
if 'test' not in globals():
    from mage_ai.data_preparation.decorators import test


@data_loader
def load_data_from_api(*args, **kwargs):
    """
    Load the NYC green-taxi trip files for 2020-10 .. 2020-12 into one DataFrame.

    Each monthly file is a gzip-compressed CSV on the DataTalksClub
    ``nyc-tlc-data`` GitHub release. Every file is read with explicit dtypes for
    the non-datetime columns, and the pickup/dropoff columns are parsed as
    datetimes. The monthly frames are then concatenated, in month order, with a
    fresh 0..n-1 index.

    Returns:
        pd.DataFrame: all rows from the three monthly files.
    """
    base_url = 'https://github.com/DataTalksClub/nyc-tlc-data/releases/download/green/'
    months = ['2020-10', '2020-11', '2020-12']

    # Explicit dtypes for the non-datetime columns. pd.Int64Dtype() is pandas'
    # nullable integer type, so integer columns can still hold missing values.
    taxi_dtypes = {
        'VendorID': pd.Int64Dtype(),
        'store_and_fwd_flag': str,
        'RatecodeID': pd.Int64Dtype(),
        'PULocationID': pd.Int64Dtype(),
        'DOLocationID': pd.Int64Dtype(),
        'passenger_count': pd.Int64Dtype(),
        'trip_distance': float,
        'fare_amount': float,
        'extra': float,
        'mta_tax': float,
        'tip_amount': float,
        'tolls_amount': float,
        'ehail_fee': float,
        'improvement_surcharge': float,
        'total_amount': float,
        'payment_type': float,
        'trip_type': float,
        'congestion_surcharge': float,
    }

    # Columns that read_csv should parse into datetime64 values.
    parse_dates = ['lpep_pickup_datetime', 'lpep_dropoff_datetime']

    monthly_frames = [
        pd.read_csv(
            f'{base_url}green_tripdata_{month}.csv.gz',
            sep=',',
            compression='gzip',
            dtype=taxi_dtypes,
            parse_dates=parse_dates,
        )
        for month in months
    ]
    return pd.concat(monthly_frames, ignore_index=True)

@test
def test_output(output, *args) -> None:
    """
    Sanity check for the loader block: fail if it produced no output at all.
    """
    failure_message = 'The output is undefined'
    assert output is not None, failure_message



## Transform

In [None]:
# import pyarrow as pa
# import pyarrow.parquet as pq
# import os

if 'transformer' not in globals():
    from mage_ai.data_preparation.decorators import transformer
if 'test' not in globals():
    from mage_ai.data_preparation.decorators import test


@transformer
def transform(data, *args, **kwargs):
    """
    Filter out invalid trips and normalise column names.

    Steps:
      1. Keep only rows with ``passenger_count > 0`` and ``trip_distance > 0``.
         A row whose comparison is missing (NA) is dropped.
      2. Add ``lpep_pickup_date``, the calendar date of ``lpep_pickup_datetime``.
      3. Rename ``VendorID`` to ``vendor_id``.
      4. Replace spaces with underscores and lower-case every column name.

    Args:
        data: DataFrame produced by the loader block.

    Returns:
        A new DataFrame; the input frame is not modified.
    """
    keep = (data['passenger_count'] > 0) & (data['trip_distance'] > 0)
    # Comparisons on a nullable Int64 column yield NA for missing values;
    # resolve NA to False explicitly so the mask is a plain bool Series.
    keep = keep.fillna(False).astype(bool)

    # .copy() gives an independent frame, so the column assignment below does
    # not trigger SettingWithCopyWarning or touch the caller's data.
    cleaned = data.loc[keep].copy()
    cleaned['lpep_pickup_date'] = cleaned['lpep_pickup_datetime'].dt.date
    cleaned = cleaned.rename(columns={'VendorID': 'vendor_id'})

    cleaned.columns = (
        cleaned.columns
        .str.replace(' ', '_')
        .str.lower()
    )
    return cleaned


@test
def test_output(output, *args) -> None:
    """
    Validate the transformer output.

    Checks that every remaining ride has a positive passenger count and a
    positive trip distance, and that ``VendorID`` was renamed to ``vendor_id``.
    """
    # Compare element-wise first, then reduce. Calling .all() first would only
    # test "non-zero", so negative counts would slip through.
    assert (output['passenger_count'] > 0).all(), 'There are rides with zero passengers'
    assert (output['trip_distance'] > 0).all(), 'There are rides with non-positive trip distance'
    assert 'vendor_id' in output.columns, 'Column vendor_id is missing'


## Exporter

In [None]:
import pyarrow as pa
import pyarrow.parquet as pq
import os

if 'data_exporter' not in globals():
    from mage_ai.data_preparation.decorators import data_exporter



# GCS credentials: point GOOGLE_APPLICATION_CREDENTIALS at the mounted
# service-account key file so pyarrow can find the credentials.
# setdefault keeps a value already provided by the environment instead of
# overwriting it with this hardcoded path.
os.environ.setdefault(
    'GOOGLE_APPLICATION_CREDENTIALS',
    '/home/src/de-project-datatalksclub-23996505fb10.json',
)

# Destination: the dataset is written under <bucket_name>/<table_name>.
bucket_name = 'mage-zoomcamp-saki-001'
project_id = 'de-project-datatalksclub'  # not referenced in this cell
table_name = 'green_taxi'

root_path = f'{bucket_name}/{table_name}'

@data_exporter
def export_data(data, *args, **kwargs):
    """
    Write the trips to GCS as a Parquet dataset partitioned by pickup date.

    A ``lpep_pickup_date`` column (the calendar date of
    ``lpep_pickup_datetime``) is derived and used as the partition key. pyarrow
    writes one directory per distinct date under the module-level ``root_path``.

    Args:
        data: DataFrame from the upstream block; must contain
            ``lpep_pickup_datetime`` as a datetime column. It is not modified.
    """
    # pyarrow.fs is a submodule. `import pyarrow as pa` alone does not reliably
    # expose `pa.fs`, so import it explicitly.
    from pyarrow import fs as pa_fs

    # Derive the partition column on a new frame so the caller's data is untouched.
    partitioned = data.assign(
        lpep_pickup_date=data['lpep_pickup_datetime'].dt.date
    )

    # preserve_index=False: a row-filtered frame (e.g. after the transformer)
    # has a non-contiguous index. pyarrow would otherwise store it as an extra
    # `__index_level_0__` column.
    table = pa.Table.from_pandas(partitioned, preserve_index=False)

    # The GCS filesystem authenticates via GOOGLE_APPLICATION_CREDENTIALS.
    gcs = pa_fs.GcsFileSystem()

    pq.write_to_dataset(
        table,
        root_path=root_path,
        partition_cols=['lpep_pickup_date'],  # must be a list
        filesystem=gcs,
    )


