In [32]:
import requests
import pandas as pd
import pyarrow.parquet as pq
from bs4 import BeautifulSoup

url = 'https://www.nyc.gov/site/tlc/about/tlc-trip-record-data.page'

taxi_data_dtypes = {
    'VendorID': pd.Int64Dtype(),
    'lpep_pickup_datetime': 'datetime64[ns]',
    'lpep_dropoff_datetime': 'datetime64[ns]',
    'store_and_fwd_flag': str,
    'RatecodeID': pd.Int64Dtype(),
    'PULocationID': pd.Int64Dtype(),
    'DOLocationID': pd.Int64Dtype(),
    'passenger_count': pd.Int64Dtype(),
    'trip_distance': float,
    'fare_amount': float,
    'extra': float,
    'mta_tax': float,
    'tip_amount': float,
    'tolls_amount': float,
    'improvement_surcharge': float,
    'total_amount': float,
    'payment_type': pd.Int64Dtype(),
    'congestion_surcharge': float
}


def stream_download_parquet(url):
    response = requests.get(url).text
    soup = BeautifulSoup(response, features='html.parser')  # Raise an HTTPError for bad responses

    for i, link in enumerate(soup.findAll('a')):
        href = link.get('href')

        if 'fhv_tripdata_2019' in href:
            parquet_file = pd.read_parquet(href)
            
            yield parquet_file.astype(taxi_data_dtypes)

In [33]:
import dlt

# define the connection to load to.
# We now use duckdb, but you can switch to Bigquery later
generators_pipeline = dlt.pipeline(
    destination='duckdb',
    dataset_name='generators'
)

# we can load the next generator to the same or to a different table.
info = generators_pipeline.run(
    stream_download_parquet(url),
    table_name="stream_download",
    write_disposition="replace"
)

print(info)

PipelineStepFailed: Pipeline execution failed at stage extract when processing package 1709008993.709861 with exception:

<class 'dlt.extract.exceptions.ResourceExtractionError'>
In processing pipe stream_download: extraction of resource stream_download in generator stream_download_parquet caused an exception: "Only a column name can be used for the key in a dtype mappings argument. 'VendorID' not found in columns."

In [3]:
import duckdb

conn = duckdb.connect(f"{generators_pipeline.pipeline_name}.duckdb")

In [4]:
# show tables
conn.sql(f"SET search_path = '{generators_pipeline.dataset_name}'")
print('Loaded tables:')
display(conn.sql("show tables"))

Loaded tables:


┌─────────────────────┐
│        name         │
│       varchar       │
├─────────────────────┤
│ _dlt_loads          │
│ _dlt_pipeline_state │
│ _dlt_version        │
│ stream_download     │
└─────────────────────┘

In [5]:
# show data
print("stream_download table below:")
display(conn.sql("SELECT * FROM stream_download"))

stream_download table below:


┌───────────┬──────────────────────┬──────────────────────┬───┬──────────────┬───────────┬──────────────────────┐
│ vendor_id │ lpep_pickup_datetime │ lpep_dropoff_datet…  │ … │ payment_type │ trip_type │ congestion_surcharge │
│   int64   │ timestamp with tim…  │ timestamp with tim…  │   │    int64     │  double   │        double        │
├───────────┼──────────────────────┼──────────────────────┼───┼──────────────┼───────────┼──────────────────────┤
│         2 │ 2018-12-21 08:17:2…  │ 2018-12-21 08:18:5…  │ … │            2 │       1.0 │                 NULL │
│         2 │ 2018-12-31 17:10:1…  │ 2018-12-31 17:16:3…  │ … │            2 │       1.0 │                 NULL │
│         2 │ 2018-12-31 17:27:1…  │ 2018-12-31 17:31:3…  │ … │            1 │       1.0 │                 NULL │
│         2 │ 2018-12-31 17:46:2…  │ 2018-12-31 18:04:5…  │ … │            1 │       1.0 │                 NULL │
│         2 │ 2018-12-31 17:19:0…  │ 2018-12-31 17:39:4…  │ … │            2 │       1.0

In [6]:
# count records
print("stream_download count of records:")
display(conn.sql("SELECT COUNT(*) FROM stream_download"))

stream_download count of records:


┌──────────────┐
│ count_star() │
│    int64     │
├──────────────┤
│      6300985 │
└──────────────┘

In [7]:
query_group_by_date = '''
    SELECT
        lpep_pickup_datetime::DATE AS lpep_pickup_date,
        COUNT(*) AS count
        
    FROM
        stream_download
        
    GROUP BY
        lpep_pickup_datetime::DATE
        
    ORDER BY
        lpep_pickup_datetime::DATE
'''

duck_df = conn.sql(query_group_by_date).df()
duck_df

Unnamed: 0,lpep_pickup_date,count
0,2008-10-21,1
1,2008-12-31,98
2,2009-01-01,11
3,2009-01-04,1
4,2009-01-05,1
...,...,...
383,2020-03-29,1
384,2020-04-01,1
385,2020-04-22,2
386,2035-09-02,1


In [8]:
# for d in duck_df['lpep_pickup_date']:
#     df = conn.sql(f"SELECT '{d.date()}' AS date, COUNT(*) AS count FROM stream_download WHERE lpep_pickup_datetime::DATE = '{d.date()}'").df()

In [28]:
import boto3
# import pyarrow as pa
from io import BytesIO

# Load to S3
def df_to_s3(df: pd.DataFrame, bucket: str, object_key: str) -> None:
    session = boto3.session.Session(profile_name='dez')
    s3_client = session.client('s3')

    # s3_url = f"s3://{bucket}/{prefix}_{d.strftime('%Y%m%d')}.parquet"
    # df.to_parquet(s3_url)

    # https://stackoverflow.com/questions/53416226/how-to-write-parquet-file-from-pandas-dataframe-in-s3-in-python
    out_buffer = BytesIO()
    df.to_parquet(out_buffer, compression='gzip', index=False)

    s3_client.put_object(Bucket=bucket, Key=object_key, Body=out_buffer.getvalue())

In [29]:
BUCKET = 'dez2024-dez-dlt'
PREFIX = 'ny_taxi/green/2019'

for d in duck_df['lpep_pickup_date']:
    df_to_s3(
        df=conn.sql(f"SELECT * FROM stream_download WHERE lpep_pickup_datetime::DATE = '{d.date()}'").df(),
        bucket=BUCKET,
        object_key=f"{PREFIX}/{d.strftime('%Y%m%d')}.parquet.gz"
    )

You are setting values through chained assignment. Currently this works in certain cases, but when using Copy-on-Write (which will become the default behaviour in pandas 3.0) this will never work to update the original DataFrame or Series, because the intermediate object on which we are setting values will behave as a copy.
A typical example is when you are setting values in a column of a DataFrame, like:

df["col"][row_indexer] = value

Use `df.loc[row_indexer, "col"] = values` instead, to perform the assignment in a single step and ensure this keeps updating the original `df`.

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy

  df=conn.sql(f"SELECT * FROM stream_download WHERE lpep_pickup_datetime::DATE = '{d.date()}'").df(),
