In [None]:
import duckdb
import boto3
import pandas as pd
import datetime
import re

# Anonymous access: empty credentials plus a no-op request signer mean no
# SigV4 signature is attached (presumably because the TfL bucket is public).
# NOTE(review): botocore's Config(signature_version=UNSIGNED) is the supported
# way to do this; the private _request_signer patch may break on upgrades.
s3 = boto3.client('s3', aws_access_key_id='', aws_secret_access_key='')
s3._request_signer.sign = (lambda *args, **kwargs: None)

# A single list_objects_v2 call returns at most 1,000 keys and silently
# truncates the rest, so page through the whole prefix.
paginator = s3.get_paginator('list_objects_v2')
contents = []
for page in paginator.paginate(
    Bucket='cycling.data.tfl.gov.uk',
    Prefix='usage-stats',
):
    # Pages with no matching objects omit the "Contents" key entirely.
    contents.extend(page.get("Contents", []))

# Keep the shape of a single list_objects_v2 response so later cells can
# continue to read response["Contents"].
response = {"Contents": contents}


In [None]:
# Tabular view of the raw bucket listing: one row per S3 object dict.
objects = response["Contents"]
df = pd.DataFrame(data=objects)

In [None]:
# Every object from the bucket listing (dicts with "Key", "LastModified", ...).
files_dict = response["Contents"]
# Journey-extract CSVs only. The escaped dot and the `$` anchor make sure the
# key really ends in ".csv" instead of merely containing "csv" somewhere.
pattern = re.compile(r'usage-stats/.*Journey.*Data.*\.csv$')
# NOTE(review): this filters on upload time (LastModified), not on the period
# the data covers — confirm that is the intended cut-off.
files = [
    f"s3://cycling.data.tfl.gov.uk/{obj['Key']}"
    for obj in files_dict
    if pattern.match(obj["Key"]) and obj["LastModified"].year > 2020
]

In [None]:
# Show the S3 URIs selected for ingestion.
files

In [None]:
# Persistent DuckDB database used by the exploration cells below.
# NOTE: the memory cap applies to this connection only; module-level calls
# such as duckdb.sql / duckdb.read_csv go through DuckDB's default connection.
conn = duckdb.connect(database="data/journeys.duckdb")
conn.execute("SET memory_limit = '3GB';")

In [None]:
# Collect the union of normalised header names across all selected CSVs, so
# the differing schemas between extracts can be inspected.
columns = set()
for s3_uri in files:
    header = conn.read_csv(s3_uri, normalize_names=True).columns
    columns.update(header)


In [None]:
# Spot-check renaming and duration scaling on a single old-format extract
# (Dec 2020), similar to what clean_file does for every file.
# NOTE(review): this renames bike_id -> bike_number, the opposite direction to
# columns_mapping (bike_number -> bike_id) — confirm which name is wanted.
df = duckdb.sql("select * RENAME (bike_id as bike_number, endstation_id as end_station_id, endstation_name as end_station, startstation_id as start_station_id, startstation_name as start_station, duration as total_duration_ms)  from read_csv('s3://cycling.data.tfl.gov.uk/usage-stats/246JourneyDataExtract23Dec2020-29Dec2020.csv', normalize_names=True)")
# The raw duration (now named total_duration_ms) is multiplied by 1000 to get
# milliseconds; `df` is resolved by DuckDB's replacement scan on the variable.
df_cleaned = duckdb.sql("select * REPLACE( total_duration_ms*1000 as total_duration_ms) from df")


In [None]:
# Preview the cleaned sample relation.
df_cleaned

In [None]:
# Normalised source column name -> column name in the unified journeys schema.
# Covers both header variants found in the extracts (e.g. `endstation_id`
# vs `end_station_number`); columns not listed keep their name.
columns_mapping = dict(
    bike_number="bike_id",
    duration="total_duration_ms",
    end_station_number="end_station_id",
    endstation_id="end_station_id",
    endstation_name="end_station",
    number="rental_id",
    start_station_number="start_station_id",
    startstation_id="start_station_id",
    startstation_name="start_station",
)

# Columns written to Parquet, in this order (those missing from a file are skipped).
final_cols = [
    "rental_id",
    "bike_id",
    "bike_model",
    "start_station_id",
    "start_station",
    "end_station_id",
    "end_station",
    "start_date",
    "end_date",
    "total_duration_ms",
]

In [None]:
import logging
import os

# Ingestion logger. A Logger constructed directly has no handlers and no
# parent, so records fall through to logging.lastResort, which only emits
# WARNING and above — INFO messages were silently dropped. Register it via
# getLogger and give it its own stderr handler instead.
logger = logging.getLogger("ingestion")
logger.setLevel(logging.INFO)
if not logger.handlers:  # don't stack handlers when the cell is re-run
    logger.addHandler(logging.StreamHandler())
logger.propagate = False  # avoid duplicate lines if the root logger gets a handler


def clean_file(file_path: str, parquet_path: str) -> None:
    """Normalise one journey-extract CSV and write it out as Parquet.

    Reads ``file_path`` with DuckDB (header names normalised), multiplies a
    ``duration`` column by 1000 when present, renames source columns using the
    module-level ``columns_mapping``, keeps the ``final_cols`` present in this
    file (in ``final_cols`` order) and writes them to ``parquet_path``,
    overwriting any existing file.

    Args:
        file_path: CSV location readable by DuckDB (callers here pass s3:// URIs).
        parquet_path: destination Parquet file; its parent directory is created
            if it does not exist.

    Raises:
        ValueError: if the file contains none of the ``final_cols`` columns.
    """
    # NOTE: these module-level duckdb calls use DuckDB's default connection, so
    # a memory_limit set on a separately created connection does not apply here.
    logger.info("ingesting %s", file_path)
    raw_df = duckdb.read_csv(file_path, normalize_names=True)
    if "duration" in raw_df.columns:
        # `raw_df` in the query is resolved by DuckDB's replacement scan on the
        # local variable (the relation read above).
        raw_df = duckdb.sql("select * REPLACE (duration*1000 as duration) from raw_df")

    to_map = {src: dst for src, dst in columns_mapping.items() if src in raw_df.columns}
    if to_map:
        to_map_str = ", ".join(f"{src} as {dst}" for src, dst in to_map.items())
        df_cleaned = duckdb.sql(f"select * RENAME ({to_map_str}) from raw_df")
    else:
        # Nothing to rename; an empty `RENAME ()` list would be a syntax error.
        df_cleaned = raw_df

    cols_to_select = [col for col in final_cols if col in df_cleaned.columns]
    if not cols_to_select:
        raise ValueError(f"{file_path}: none of the expected columns {final_cols} found")

    logger.info("writing %s", parquet_path)
    # DuckDB does not create missing parent directories for the output file.
    os.makedirs(os.path.dirname(parquet_path) or ".", exist_ok=True)
    df_cleaned.select(",".join(cols_to_select)).write_parquet(parquet_path, overwrite=True)
    

In [None]:
def clean_and_write_files():
    """Convert each selected journey-extract CSV in the bucket listing to Parquet.

    Walks the module-level ``files_dict`` S3 listing. For every key matching the
    journey-extract pattern whose LastModified year is after 2020, it calls
    ``clean_file`` with the ``s3://`` URI and a local path ``./data/<key>``
    whose ``.csv`` suffix is swapped for ``.parquet``.
    """
    # Escaped dot + `$` anchor: only keys that really end in ".csv".
    pattern = re.compile(r'usage-stats/.*Journey.*Data.*\.csv$')
    csv_suffix = ".csv"
    for file in files_dict:
        logger.info(file)
        file_key = file["Key"]
        if pattern.match(file_key) and file["LastModified"].year > 2020:
            csv_path = f"s3://cycling.data.tfl.gov.uk/{file_key}"
            # Replace only the trailing suffix; str.replace would rewrite every
            # ".csv" occurrence anywhere in the key.
            parquet_path = f"./data/{file_key[:-len(csv_suffix)]}.parquet"
            clean_file(csv_path, parquet_path)
    

In [None]:
# Convert every selected journey CSV to Parquet under ./data/usage-stats/.
clean_and_write_files()

In [None]:
# Preview all converted Parquet files as one relation; union_by_name aligns
# columns by name because the files need not share the same column set.
parquet_glob = "./data/usage-stats/*.parquet"
duckdb.read_parquet(parquet_glob, union_by_name=True)

In [None]:

# Switch `conn` to the persistent dev database that the table cells below write to.
conn = duckdb.connect(database="data/dev.duckdb")
conn.execute("SET memory_limit = '3GB';")

In [None]:
# IF NOT EXISTS keeps the cell re-runnable against the persistent dev.duckdb file.
# NOTE(review): nothing in this notebook creates objects inside `bikes` —
# confirm whether the journeys table was meant to live in this schema.
conn.sql("CREATE SCHEMA IF NOT EXISTS bikes")

In [None]:
# Load every converted Parquet file into one table. CREATE OR REPLACE keeps the
# cell re-runnable; union_by_name aligns columns by name across files.
conn.sql("CREATE OR REPLACE TABLE journeys as (SELECT * from read_parquet('./data/usage-stats/*.parquet', union_by_name=True))")

In [None]:
# Inspect the journeys table. The CREATE TABLE cell creates it unqualified (in
# the default schema), and no `bike_rentals` schema is created in this notebook.
conn.sql("select * from journeys")