In [2]:
from dotenv import load_dotenv
from bs4 import BeautifulSoup

import requests
import duckdb
import polars as pl
import os

In [None]:
# Path of the .env file to load. The original absolute path is kept as the
# default so existing runs are unchanged; set TLC_DOTENV_PATH to point at a
# different file when running on another machine.
# Presumably this file provides the minio_* variables read from os.environ
# in a later cell -- TODO confirm.
DOTENV_PATH = os.environ.get(
    'TLC_DOTENV_PATH',
    '/home/joao/projects/tlc-data-etl/src/config/.env',
)
load_dotenv(dotenv_path=DOTENV_PATH)

In [None]:
URL_BASE = 'https://www.nyc.gov/site/tlc/about/tlc-trip-record-data.page'

def _get_url_source_data_(source_name: str, timeout: float = 30.0) -> list[str]:
    """Collect the download links published for one data source.

    Fetches ``URL_BASE``, walks every ``<a>`` found inside a ``<table>`` and
    keeps the ``href`` of each link whose ``title`` attribute -- lower-cased,
    with spaces replaced by hyphens -- equals ``source_name``.

    Args:
        source_name: Slug to match against the normalised link title,
            e.g. ``'yellow-taxi-trip-records'``.
        timeout: Seconds to wait for the HTTP request before giving up.

    Returns:
        The matching ``href`` values, stripped of surrounding whitespace, in
        document order. Empty when nothing matches.

    Raises:
        requests.RequestException: If the request fails, times out, or the
            server answers with an HTTP error status.
    """
    # Without a timeout requests can block indefinitely on a stalled server.
    response = requests.get(URL_BASE, timeout=timeout)
    # Fail loudly on 4xx/5xx instead of parsing an error page into an empty list.
    response.raise_for_status()
    soup = BeautifulSoup(response.text, 'html.parser')

    records = []
    for table in soup.find_all('table'):
        for link in table.find_all('a'):
            title = link.get('title')
            href = link.get('href')
            # Anchors missing either attribute yield None from .get(); skip
            # them rather than crashing on None.replace / None.strip.
            if not title or not href:
                continue
            if title.replace(' ', '-').lower() == source_name:
                records.append(href.strip())

    return records

In [None]:
# Smoke-check the scraper against the yellow-taxi slug; the returned list is
# shown as the cell output.
_get_url_source_data_(source_name='yellow-taxi-trip-records')

In [None]:
# MinIO connection settings, read from the process environment.
# os.environ[...] raises KeyError when a variable is missing, so a
# misconfigured environment fails in this cell rather than later.
_MINIO_ENV_NAMES = ('minio_access_key', 'minio_secret_key', 'minio_endpoint')
ACCESS_KEY, SECRET_KEY, ENDPOINT_URL = (os.environ[name] for name in _MINIO_ENV_NAMES)

# Same settings packaged as a dict; the commented-out pl.read_parquet call in
# the next cell passes it as storage_options.
options = dict(key=ACCESS_KEY, secret=SECRET_KEY, endpoint_url=ENDPOINT_URL)

In [None]:
# df = pl.read_parquet('s3://tlc-data-raw/green-taxi-trip-records/*.parquet', storage_options=options)

In [None]:
# df.filter((pl.col('tpep_pickup_datetime').dt.year() >= 2021) & (pl.col('VendorID') == 1)).sample(50)
# df['VendorID'].unique()
# df.tail()

In [None]:
# Open a DuckDB connection with default arguments, then install and load the
# httpfs extension (presumably needed for the https:// and s3:// reads in the
# following cells -- confirm against the DuckDB docs).
HTTPFS_EXTENSION = 'httpfs'
conn = duckdb.connect()
conn.install_extension(HTTPFS_EXTENSION)
conn.load_extension(HTTPFS_EXTENSION)

In [None]:
def _sql_quote(value: str) -> str:
    """Render ``value`` as a single-quoted SQL string literal.

    Embedded single quotes are doubled so that a credential containing ``'``
    cannot terminate the literal early and break (or alter) the statement.
    """
    return "'" + str(value).replace("'", "''") + "'"


# Point DuckDB's S3 client at the MinIO deployment using path-style URLs.
conn.query("SET s3_url_style='path'")
conn.query(f"SET s3_access_key_id={_sql_quote(ACCESS_KEY)}")
conn.query(f"SET s3_secret_access_key={_sql_quote(SECRET_KEY)}")
# NOTE(review): the endpoint is hard-coded here while ENDPOINT_URL is read
# from the environment in an earlier cell -- confirm the two are meant to agree.
conn.query("SET s3_endpoint='minio.labserver.com.br:443'")
# IF NOT EXISTS keeps the cell re-runnable on a connection that already has
# the schema.
conn.query("CREATE SCHEMA IF NOT EXISTS raw")

In [None]:
# Materialise one month of yellow-taxi trips into a scratch table.
# CREATE OR REPLACE keeps the cell re-runnable: a plain CREATE TABLE raises
# once `test` already exists on this connection.
conn.query("CREATE OR REPLACE TABLE test AS SELECT * FROM read_parquet('https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2021-12.parquet')")

In [None]:
# Read the same December-2021 yellow-taxi file with polars; the resulting
# frame is the cell's displayed value.
sample_parquet_url = 'https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2021-12.parquet'
pl.read_parquet(sample_parquet_url)

In [None]:
# conn.query("COPY raw.green_tripdata_2023 TO 's3://tlc-data-raw/test/green_tripdata_2023-01.parquet' ( FORMAT PARQUET,OVERWRITE_OR_IGNORE true )")

In [None]:
# Clean up the scratch table. IF EXISTS makes the cell safe to run when the
# table was never created or has already been dropped.
conn.query("DROP TABLE IF EXISTS TEST")
# conn.query("DROP SCHEMA raw CASCADE")

In [None]:
# conn.query("from duckdb_settings() where name like '%s3%'")

In [None]:
# conn.query(f"SELECT * FROM read_parquet('s3://tlc-data-raw/yellow-taxi-trip-records/*.parquet')")

In [28]:
# Pipeline configuration, assembled from small shared tables.
#
# data_source    -- one entry per data set, each with the years to fetch.
# data_transform -- per data set, the columns to cast and their target dtype
#                   (every listed column is cast to pl.Int64).
# data_load      -- placeholder, currently empty.
_SOURCE_YEARS = (2021, 2022, 2023)
_INT64_CAST_COLUMNS = {
    "yellow-taxi-trip-records": ("RatecodeID", "passenger_count"),
    "green-taxi-trip-records": (
        "RatecodeID",
        "passenger_count",
        "ehail_fee",
        "payment_type",
        "trip_type",
    ),
    "for-hire-vehicle-trip-records": ("DOlocationID", "PUlocationID"),
}

config = {
    "data_source": [
        # list(...) gives every source its own independent list of years.
        {"source_name": source, "source_years": list(_SOURCE_YEARS)}
        for source in _INT64_CAST_COLUMNS
    ],
    "data_transform": {
        source: {"cast_columns": {column: pl.Int64 for column in columns}}
        for source, columns in _INT64_CAST_COLUMNS.items()
    },
    "data_load": {},
}

In [45]:
# Scratch copy of the "data_source" section, used to try out iteration.
test = {
    "data_source": [
        {"source_name": source, "source_years": [2021, 2022, 2023]}
        for source in (
            "yellow-taxi-trip-records",
            "green-taxi-trip-records",
            "for-hire-vehicle-trip-records",
        )
    ]
}

# Only the first entry is printed; the [:1] slice prints nothing when the
# list is empty.
for entry in test['data_source'][:1]:
    print(entry['source_name'], entry['source_years'])

yellow-taxi-trip-records [2021, 2022, 2023]


In [51]:
# Inclusive year bounds and the year list derived from them.
year_range = {"from": 2021, "to": 2023}
target_year = 2022

first_year, last_year = year_range["from"], year_range["to"]
# range() excludes its stop value, hence the +1 to include `last_year`.
years_in_range = list(range(first_year, last_year + 1))
years_in_range

[2021, 2022, 2023]