In [1]:
# pip install psycopg2-binary duckdb

# Create Mockdata: Bike shop and test landing ETL

In [2]:
import os
import shutil
import pandas as pd
import numpy as np
import random
import pyarrow as pa
import pyarrow.parquet as pq
from sqlalchemy import create_engine

# --- MOCK DATA GENERATION ---

def create_user_data(num_users=100):
    """Generate mock user data."""
    user_ids = np.arange(1, num_users + 1)
    names = [f"User_{i}" for i in user_ids]
    # Simulate mixed data types: age mostly numeric, with occasional 'N/A'
    ages = [random.randint(18, 70) if random.random() < 0.9 else "N/A" for _ in range(num_users)]
    emails = [f"user{i}@example.com" for i in user_ids]
    return pd.DataFrame({"user_id": user_ids, "name": names, "age": ages, "email": emails})

def create_bike_data(num_bikes=50):
    """Generate mock bike data."""
    bike_ids = np.arange(1, num_bikes + 1)
    models = [f"Model_{random.choice(['A', 'B', 'C'])}" for _ in bike_ids]
    # Price as float, but occasionally a text error.
    prices = [round(random.uniform(100.0, 1000.0), 2) if random.random() < 0.85 else "unknown" for _ in bike_ids]
    types = [random.choice(["road", "mountain", "hybrid"]) for _ in bike_ids]
    return pd.DataFrame({"bike_id": bike_ids, "model": models, "price": prices, "type": types})

def create_rent_data(num_rents=500, start_date='2025-02-14', end_date='2025-02-19'):
    """Generate mock rental transaction data."""
    rent_ids = np.arange(1, num_rents + 1)
    # Randomly assign user and bike IDs.
    user_ids = np.random.randint(1, 101, size=num_rents)
    bike_ids = np.random.randint(1, 51, size=num_rents)
    
    start_ts = pd.Timestamp(start_date)
    end_ts = pd.Timestamp(end_date)
    random.seed(42)
    np.random.seed(42)
    random_seconds = np.random.randint(
        start_ts.value // 10**9,
        end_ts.value // 10**9 + 1,
        size=num_rents
    )
    rent_start = pd.to_datetime(random_seconds, unit='s')
    # Rent end is rent_start plus between 30 and 120 minutes.
    rent_end = rent_start + pd.to_timedelta(np.random.randint(30, 121, size=num_rents), unit='m')
    # Cost: float, but occasionally an error string.
    costs = [round(random.uniform(5.0, 50.0), 2) if random.random() < 0.9 else "error" for _ in range(num_rents)]
    return pd.DataFrame({
        "rent_id": rent_ids,
        "user_id": user_ids,
        "bike_id": bike_ids,
        "rent_start": rent_start,
        "rent_end": rent_end,
        "rent_cost": costs
    })

# --- DATA INGESTION ---

def write_table_to_postgres(df, table_name, connection_str):
    """Write the given DataFrame to PostgreSQL."""
    engine = create_engine(connection_str)
    df.to_sql(table_name, engine, if_exists="replace", index=False)
    print(f"Data written to table '{table_name}'.")
    return engine

# --- ETL & JOIN MODULE ---

def run_etl_join(engine, root_path, partition_col="rent_date", overwrite_date=None):
    """
    Run ETL by:
      1. Executing a join query across rent, user, and bike tables.
      2. Filtering on a specific day if overwrite_date is provided.
      3. Adding partition columns (year, month, day) based on rent_start.
      4. Clearing the contents of the target partition folder if necessary.
      5. Writing the joined result to a Hive-partitioned Parquet dataset.
    """
    if overwrite_date is not None:
        # Filter join query to a single day.
        query = f"""
            SELECT
                r.rent_id,
                r.user_id,
                r.bike_id,
                r.rent_start,
                r.rent_end,
                r.rent_cost,
                u.name as user_name,
                u.age,
                u.email,
                b.model as bike_model,
                b.price as bike_price,
                b.type as bike_type,
                to_char(r.rent_start, 'YYYY-MM-DD') AS {partition_col}
            FROM rent r
            JOIN "user" u ON r.user_id = u.user_id
            JOIN bike b ON r.bike_id = b.bike_id
            WHERE to_char(r.rent_start, 'YYYY-MM-DD') = '{overwrite_date}'
        """
    else:
        query = f"""
            SELECT
                r.rent_id,
                r.user_id,
                r.bike_id,
                r.rent_start,
                r.rent_end,
                r.rent_cost,
                u.name as user_name,
                u.age,
                u.email,
                b.model as bike_model,
                b.price as bike_price,
                b.type as bike_type,
                to_char(r.rent_start, 'YYYY-MM-DD') AS {partition_col}
            FROM rent r
            JOIN "user" u ON r.user_id = u.user_id
            JOIN bike b ON r.bike_id = b.bike_id
            WHERE 1=1
        """
    df_join = pd.read_sql_query(query, engine, dtype_backend="pyarrow")
    
    # Add additional partition columns for Hive-style partitioning.
    df_join["year"] = pd.to_datetime(df_join["rent_start"]).dt.strftime("%Y")
    df_join["month"] = pd.to_datetime(df_join["rent_start"]).dt.strftime("%m")
    df_join["day"] = pd.to_datetime(df_join["rent_start"]).dt.strftime("%d")
    
    # If overwrite_date is provided, clear the contents of the matching partition folder.
    if overwrite_date is not None:
        # Overwrite partition: use year, month, day from overwrite_date.
        y, m, d = overwrite_date.split("-")
        partition_dir = os.path.join(root_path, f"year={y}", f"month={m}", f"day={d}")
        if os.path.exists(partition_dir):
            print(f"Clearing contents in partition directory: {partition_dir}")
            for entry in os.scandir(partition_dir):
                if entry.is_file() or entry.is_symlink():
                    os.remove(entry.path)
                elif entry.is_dir():
                    shutil.rmtree(entry.path)
            print(f"Cleared contents in partition: {partition_dir}")
        else:
            print(f"Partition directory {partition_dir} does not exist. It will be created.")
    
    # Convert the joined DataFrame to a pyarrow Table.
    table = pa.Table.from_pandas(df_join, preserve_index=False)
    
    # Write to a Hive-partitioned Parquet dataset.
    partition_cols = ["year", "month", "day"]
    pq.write_to_dataset(
        table=table,
        root_path=root_path,
        partition_cols=partition_cols
    )
    print("ETL join complete. Data written to partitions under:", root_path)

# --- MAIN EXECUTION ---

if __name__ == "__main__":
    # Connection string for PostgreSQL.
    connection_str = "postgresql+psycopg2://testuser:testpassword@postgres:5432/testdb"
    
    # Create and write mock data for three tables.
    user_df = create_user_data(num_users=100)
    bike_df = create_bike_data(num_bikes=50)
    rent_df = create_rent_data(num_rents=500)
    
    engine = write_table_to_postgres(user_df, "user", connection_str)
    write_table_to_postgres(bike_df, "bike", connection_str)
    write_table_to_postgres(rent_df, "rent", connection_str)
    
    # Define the output directory for Parquet dataset.
    parquet_root = "l.fact_rentbike"
    
    # Run the ETL join process for a specific day, for example, '2025-02-17'
    run_etl_join(engine, parquet_root, overwrite_date="2025-02-17")
    
    # If you want to process all data, call without overwrite_date:
    # run_etl_join(engine, parquet_root)


Data written to table 'user'.
Data written to table 'bike'.
Data written to table 'rent'.
Partition directory l.fact_rentbike/year=2025/month=02/day=17 does not exist. It will be created.
ETL join complete. Data written to partitions under: l.fact_rentbike


## test query on the data base

In [3]:
partition_col="rent_date"
overwrite_date="2025-02-14"

query = f"""
            SELECT
                r.rent_id,
                r.user_id,
                r.bike_id,
                r.rent_start,
                r.rent_end,
                r.rent_cost,
                u.name as user_name,
                u.age,
                u.email,
                b.model as bike_model,
                b.price as bike_price,
                b.type as bike_type,
                to_char(r.rent_start, 'YYYY-MM-DD') AS {partition_col}
            FROM rent r
            JOIN "user" u ON r.user_id = u.user_id
            JOIN bike b ON r.bike_id = b.bike_id
            WHERE to_char(r.rent_start, 'YYYY-MM-DD') = '{overwrite_date}'
        """
df_join = pd.read_sql_query(query, engine, dtype_backend="pyarrow")
df_join.info()
df_join.head(10)

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 94 entries, 0 to 93
Data columns (total 13 columns):
 #   Column      Non-Null Count  Dtype                 
---  ------      --------------  -----                 
 0   rent_id     94 non-null     int64[pyarrow]        
 1   user_id     94 non-null     int64[pyarrow]        
 2   bike_id     94 non-null     int64[pyarrow]        
 3   rent_start  94 non-null     timestamp[ns][pyarrow]
 4   rent_end    94 non-null     timestamp[ns][pyarrow]
 5   rent_cost   94 non-null     string[pyarrow]       
 6   user_name   94 non-null     string[pyarrow]       
 7   age         94 non-null     string[pyarrow]       
 8   email       94 non-null     string[pyarrow]       
 9   bike_model  94 non-null     string[pyarrow]       
 10  bike_price  94 non-null     string[pyarrow]       
 11  bike_type   94 non-null     string[pyarrow]       
 12  rent_date   94 non-null     string[pyarrow]       
dtypes: int64[pyarrow](3), string[pyarrow](8), timestamp[

Unnamed: 0,rent_id,user_id,bike_id,rent_start,rent_end,rent_cost,user_name,age,email,bike_model,bike_price,bike_type,rent_date
0,51,69,2,2025-02-14 08:45:51,2025-02-14 10:12:51,49.83,User_69,33.0,user69@example.com,Model_A,250.54,mountain,2025-02-14
1,378,44,2,2025-02-14 13:42:57,2025-02-14 15:28:57,error,User_44,34.0,user44@example.com,Model_A,250.54,mountain,2025-02-14
2,482,17,2,2025-02-14 00:50:51,2025-02-14 02:23:51,40.95,User_17,34.0,user17@example.com,Model_A,250.54,mountain,2025-02-14
3,48,96,3,2025-02-14 18:43:55,2025-02-14 20:04:55,9.93,User_96,64.0,user96@example.com,Model_A,847.08,road,2025-02-14
4,408,37,3,2025-02-14 18:15:26,2025-02-14 18:49:26,9.19,User_37,49.0,user37@example.com,Model_A,847.08,road,2025-02-14
5,20,26,3,2025-02-14 18:00:20,2025-02-14 19:32:20,32.83,User_26,63.0,user26@example.com,Model_A,847.08,road,2025-02-14
6,195,87,4,2025-02-14 07:12:19,2025-02-14 09:11:19,16.46,User_87,61.0,user87@example.com,Model_B,unknown,road,2025-02-14
7,428,48,4,2025-02-14 15:49:18,2025-02-14 16:54:18,19.48,User_48,38.0,user48@example.com,Model_B,unknown,road,2025-02-14
8,471,97,5,2025-02-14 15:26:49,2025-02-14 16:13:49,16.25,User_97,,user97@example.com,Model_B,669.77,road,2025-02-14
9,456,25,5,2025-02-14 19:27:11,2025-02-14 21:10:11,45.65,User_25,40.0,user25@example.com,Model_B,669.77,road,2025-02-14


## test reading parquert files in landing zone

In [4]:
import duckdb
import pandas as pd

# Connect to an in-memory DuckDB instance.
con = duckdb.connect(database=':memory:')

# Use DuckDB's read_parquet() function to read the dataset.
# Assume the dataset is stored in the directory "output_parquet_dataset" with hive partitions:
# output_parquet_dataset/year=2025/month=02/day=17/...
query = """
SELECT *
FROM read_parquet('l.fact_rentbike/**/*.parquet')
WHERE year = '2025'
  AND month = '02'
  AND day = '17'
"""

df_duck = con.execute(query).df()

print("DuckDB Parquet Query Result:")
df_duck.info()
df_duck.head(20)

DuckDB Parquet Query Result:
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 84 entries, 0 to 83
Data columns (total 16 columns):
 #   Column      Non-Null Count  Dtype         
---  ------      --------------  -----         
 0   rent_id     84 non-null     int64         
 1   user_id     84 non-null     int64         
 2   bike_id     84 non-null     int64         
 3   rent_start  84 non-null     datetime64[ns]
 4   rent_end    84 non-null     datetime64[ns]
 5   rent_cost   84 non-null     object        
 6   user_name   84 non-null     object        
 7   age         84 non-null     object        
 8   email       84 non-null     object        
 9   bike_model  84 non-null     object        
 10  bike_price  84 non-null     object        
 11  bike_type   84 non-null     object        
 12  rent_date   84 non-null     object        
 13  day         84 non-null     int64         
 14  month       84 non-null     object        
 15  year        84 non-null     int64         
dtyp

Unnamed: 0,rent_id,user_id,bike_id,rent_start,rent_end,rent_cost,user_name,age,email,bike_model,bike_price,bike_type,rent_date,day,month,year
0,49,30,1,2025-02-17 16:37:10,2025-02-17 17:16:10,40.64,User_30,52.0,user30@example.com,Model_B,unknown,road,2025-02-17,17,2,2025
1,314,12,1,2025-02-17 06:37:56,2025-02-17 08:24:56,17.43,User_12,18.0,user12@example.com,Model_B,unknown,road,2025-02-17,17,2,2025
2,349,25,2,2025-02-17 14:44:12,2025-02-17 16:03:12,40.58,User_25,40.0,user25@example.com,Model_A,250.54,mountain,2025-02-17,17,2,2025
3,117,30,3,2025-02-17 01:31:52,2025-02-17 03:29:52,37.33,User_30,52.0,user30@example.com,Model_A,847.08,road,2025-02-17,17,2,2025
4,427,2,3,2025-02-17 10:45:21,2025-02-17 11:40:21,29.45,User_2,,user2@example.com,Model_A,847.08,road,2025-02-17,17,2,2025
5,500,36,4,2025-02-17 23:48:14,2025-02-18 00:20:14,48.49,User_36,33.0,user36@example.com,Model_B,unknown,road,2025-02-17,17,2,2025
6,86,97,5,2025-02-17 15:32:19,2025-02-17 17:22:19,15.3,User_97,,user97@example.com,Model_B,669.77,road,2025-02-17,17,2,2025
7,452,77,5,2025-02-17 01:28:32,2025-02-17 02:12:32,error,User_77,44.0,user77@example.com,Model_B,669.77,road,2025-02-17,17,2,2025
8,444,9,5,2025-02-17 03:46:31,2025-02-17 04:29:31,36.17,User_9,45.0,user9@example.com,Model_B,669.77,road,2025-02-17,17,2,2025
9,113,95,7,2025-02-17 15:02:38,2025-02-17 15:43:38,40.28,User_95,28.0,user95@example.com,Model_B,166.45,mountain,2025-02-17,17,2,2025


In [5]:
import pyarrow as pa

def arrow_info(table: pa.Table):
    """Print basic information about a pyarrow Table similar to pandas.DataFrame.info()."""
    print("PyArrow Table Info:")
    print("Number of columns:", table.num_columns)
    print("Number of rows:", table.num_rows)
    # print("Schema:")
    # print(table.schema)
    print("\nDetailed Column Info:")
    for i, field in enumerate(table.schema):
        print(f"  Column {i} - {field.name}: {field.type}")

def arrow_head(table: pa.Table, n=5):
    """Return a new pyarrow Table containing only the first n rows, similar to pandas.DataFrame.head()."""
    return table.slice(0, n)

df_arrow= con.execute(query).arrow()

arrow_info(df_arrow)

df2=df_arrow.to_pandas(types_mapper=pd.ArrowDtype)
df2.info()
df2.head(20)

PyArrow Table Info:
Number of columns: 16
Number of rows: 84

Detailed Column Info:
  Column 0 - rent_id: int64
  Column 1 - user_id: int64
  Column 2 - bike_id: int64
  Column 3 - rent_start: timestamp[ns]
  Column 4 - rent_end: timestamp[ns]
  Column 5 - rent_cost: string
  Column 6 - user_name: string
  Column 7 - age: string
  Column 8 - email: string
  Column 9 - bike_model: string
  Column 10 - bike_price: string
  Column 11 - bike_type: string
  Column 12 - rent_date: string
  Column 13 - day: int64
  Column 14 - month: string
  Column 15 - year: int64
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 84 entries, 0 to 83
Data columns (total 16 columns):
 #   Column      Non-Null Count  Dtype                 
---  ------      --------------  -----                 
 0   rent_id     84 non-null     int64[pyarrow]        
 1   user_id     84 non-null     int64[pyarrow]        
 2   bike_id     84 non-null     int64[pyarrow]        
 3   rent_start  84 non-null     timestamp[ns][pyar

Unnamed: 0,rent_id,user_id,bike_id,rent_start,rent_end,rent_cost,user_name,age,email,bike_model,bike_price,bike_type,rent_date,day,month,year
0,49,30,1,2025-02-17 16:37:10,2025-02-17 17:16:10,40.64,User_30,52.0,user30@example.com,Model_B,unknown,road,2025-02-17,17,2,2025
1,314,12,1,2025-02-17 06:37:56,2025-02-17 08:24:56,17.43,User_12,18.0,user12@example.com,Model_B,unknown,road,2025-02-17,17,2,2025
2,349,25,2,2025-02-17 14:44:12,2025-02-17 16:03:12,40.58,User_25,40.0,user25@example.com,Model_A,250.54,mountain,2025-02-17,17,2,2025
3,117,30,3,2025-02-17 01:31:52,2025-02-17 03:29:52,37.33,User_30,52.0,user30@example.com,Model_A,847.08,road,2025-02-17,17,2,2025
4,427,2,3,2025-02-17 10:45:21,2025-02-17 11:40:21,29.45,User_2,,user2@example.com,Model_A,847.08,road,2025-02-17,17,2,2025
5,500,36,4,2025-02-17 23:48:14,2025-02-18 00:20:14,48.49,User_36,33.0,user36@example.com,Model_B,unknown,road,2025-02-17,17,2,2025
6,86,97,5,2025-02-17 15:32:19,2025-02-17 17:22:19,15.3,User_97,,user97@example.com,Model_B,669.77,road,2025-02-17,17,2,2025
7,452,77,5,2025-02-17 01:28:32,2025-02-17 02:12:32,error,User_77,44.0,user77@example.com,Model_B,669.77,road,2025-02-17,17,2,2025
8,444,9,5,2025-02-17 03:46:31,2025-02-17 04:29:31,36.17,User_9,45.0,user9@example.com,Model_B,669.77,road,2025-02-17,17,2,2025
9,113,95,7,2025-02-17 15:02:38,2025-02-17 15:43:38,40.28,User_95,28.0,user95@example.com,Model_B,166.45,mountain,2025-02-17,17,2,2025
