In [1]:
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq

In [2]:
def load_data_from_api(*args, **kwargs):
    """
    Load green taxi data from API
    
    for the final quarter of 2020
    """

    # defining schema
    taxi_dtypes = {
        'VendorID': 'Int64',
        'store_and_fwd_flag': 'str',
        'RatecodeID': 'Int64',
        'PULocationID': 'Int64',
        'DOLocationID': 'Int64',
        'passenger_count': 'Int64',
        'trip_distance': 'float64',
        'fare_amount': 'float64',
        'extra': 'float64',
        'mta_tax': 'float64',
        'tip_amount': 'float64',
        'tolls_amount': 'float64',
        'ehail_fee': 'float64',
        'improvement_surcharge': 'float64',
        'total_amount': 'float64',
        'payment_type': 'float64',
        'trip_type': 'float64',
        'congestion_surcharge': 'float64'
    }
    green_taxi_dates = ['lpep_pickup_datetime', 'lpep_dropoff_datetime']

    green_taxi_df = pd.DataFrame() #create empty dataframe

    #read necessary data month by month and concat it 
    for month in range (10, 13):
        green_taxi_url = f"https://github.com/DataTalksClub/nyc-tlc-data/releases/download/green/green_tripdata_2020-{str(month)}.csv.gz"

        df = pd.read_csv(green_taxi_url, sep=',', compression='gzip', dtype=taxi_dtypes, parse_dates=green_taxi_dates)
        green_taxi_df = pd.concat([green_taxi_df, df])

    return green_taxi_df

In [3]:
df = load_data_from_api()
df.head()

Unnamed: 0,VendorID,lpep_pickup_datetime,lpep_dropoff_datetime,store_and_fwd_flag,RatecodeID,PULocationID,DOLocationID,passenger_count,trip_distance,fare_amount,extra,mta_tax,tip_amount,tolls_amount,ehail_fee,improvement_surcharge,total_amount,payment_type,trip_type,congestion_surcharge
0,2,2020-10-01 00:31:19,2020-10-01 00:34:55,N,1,7,7,1,0.79,5.0,0.5,0.5,1.58,0.0,,0.3,7.88,1.0,1.0,0.0
1,2,2020-10-01 00:42:12,2020-10-01 00:43:51,N,1,179,7,1,0.5,4.0,0.5,0.5,0.0,0.0,,0.3,5.3,2.0,1.0,0.0
2,2,2020-10-01 00:53:09,2020-10-01 00:55:39,N,1,179,223,1,0.6,4.0,0.5,0.5,1.06,0.0,,0.3,6.36,1.0,1.0,0.0
3,1,2020-10-01 00:12:29,2020-10-01 00:20:08,N,1,134,216,2,4.4,13.5,0.5,0.5,0.0,0.0,,0.3,14.8,2.0,1.0,0.0
4,1,2020-10-01 00:32:38,2020-10-01 00:43:02,N,1,82,7,1,2.9,10.5,0.5,0.5,0.0,0.0,,0.3,11.8,2.0,1.0,0.0


In [4]:
def transform(data, *args, **kwargs):
    """
    Transform data according requirements.

    1. Remove rows where the passenger count is equal to 0 and the trip distance is equal to zero.
    2. Create a new column lpep_pickup_date by converting lpep_pickup_datetime to a date.
    3. Rename columns in Camel Case to Snake Case.
    """
    #q4 answer
    print(list(data['VendorID'].unique()))

    print(list(data.columns))
    #  transformation 1
    cleaned_data = data[ (data['passenger_count'] > 0) & (data['trip_distance'] > 0) ]

    #  transformation 2
    cleaned_data['lpep_pickup_date'] = cleaned_data['lpep_pickup_datetime'].dt.date

    #  transformation 3

    snake_names = ['vendor_id', 'lpep_pickup_datetime', 'lpep_dropoff_datetime',

       'store_and_fwd_flag', 'ratecode_id', 'pu_location_id', 'do_location_id',

       'passenger_count', 'trip_distance', 'fare_amount', 'extra', 'mta_tax',

       'tip_amount', 'tolls_amount', 'ehail_fee', 'improvement_surcharge',

       'total_amount', 'payment_type', 'trip_type', 'congestion_surcharge', 'lpep_pickup_date']
    
    cleaned_data = cleaned_data.rename(columns=dict(zip(cleaned_data.columns, snake_names)))

    return cleaned_data

In [5]:
cleaned_df = transform(df)
cleaned_df.shape

[2, 1, <NA>]
['VendorID', 'lpep_pickup_datetime', 'lpep_dropoff_datetime', 'store_and_fwd_flag', 'RatecodeID', 'PULocationID', 'DOLocationID', 'passenger_count', 'trip_distance', 'fare_amount', 'extra', 'mta_tax', 'tip_amount', 'tolls_amount', 'ehail_fee', 'improvement_surcharge', 'total_amount', 'payment_type', 'trip_type', 'congestion_surcharge']


A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  cleaned_data['lpep_pickup_date'] = cleaned_data['lpep_pickup_datetime'].dt.date


(139370, 21)

In [6]:

# Convert the DataFrame to a pyarrow Table
table = pa.Table.from_pandas(cleaned_df)

# Specify the output folder where the Parquet files will be saved
output_folder = 'D:/dwh/green_taxi'

# Define the partitioning scheme
partition_cols = ['lpep_pickup_date']

# Create the output writer with the desired partitioning scheme
writer = pq.ParquetWriter(output_folder, table.schema, partition_cols=partition_cols)

# Write the DataFrame to Parquet files, partitioned by lpep_pickup_date
writer.write_table(table)

# Close the Parquet writer
writer.close()

PermissionError: [WinError 5] Failed to open local file 'D:/dwh/green_taxi'. Detail: [Windows error 5] Access is denied.
