In [1]:
import os
import sys
import logging
from datetime import datetime, timedelta
from typing import List, Dict, Optional

import pandas as pd

from sqlalchemy import create_engine
from sqlalchemy.exc import SQLAlchemyError


In [2]:
df_yellow = pd.read_parquet(r"C:\Users\Robert\Documents\bootcamp\yellow_tripdata_2025-09.parquet")
print(df_yellow.columns.tolist())

df_green = pd.read_parquet(r"C:\Users\Robert\Documents\bootcamp\green_tripdata_2025-09.parquet")
print(df_green.columns.tolist())

['VendorID', 'tpep_pickup_datetime', 'tpep_dropoff_datetime', 'passenger_count', 'trip_distance', 'RatecodeID', 'store_and_fwd_flag', 'PULocationID', 'DOLocationID', 'payment_type', 'fare_amount', 'extra', 'mta_tax', 'tip_amount', 'tolls_amount', 'improvement_surcharge', 'total_amount', 'congestion_surcharge', 'Airport_fee', 'cbd_congestion_fee']
['VendorID', 'lpep_pickup_datetime', 'lpep_dropoff_datetime', 'store_and_fwd_flag', 'RatecodeID', 'PULocationID', 'DOLocationID', 'passenger_count', 'trip_distance', 'fare_amount', 'extra', 'mta_tax', 'tip_amount', 'tolls_amount', 'ehail_fee', 'improvement_surcharge', 'total_amount', 'payment_type', 'trip_type', 'congestion_surcharge', 'cbd_congestion_fee']


In [3]:
LOG_LEVEL = logging.INFO
logging.basicConfig(
    level=LOG_LEVEL,
    format="%(asctime)s [%(levelname)s] %(name)s - %(message)s",
    handlers=[logging.StreamHandler(sys.stdout)]
)

logger = logging.getLogger("data_extraction_pipeline")

In [4]:
# --- Konfigurasi sumber data (2 data source contoh) ---

DATA_SOURCES = [
    {
        "name": "yellow_trip",
        "path": r"C:\Users\Robert\Documents\bootcamp\yellow_tripdata_2025-09.parquet",  # ganti dengan path sebenarnya
        "format": "parquet",                        # atau "csv"
        "date_column": "tpep_pickup_datetime",           # ganti sesuai kolom di file
        "target_table": "public.yellow_trip"
    },
    {
        "name": "green_trip",
        "path": r"C:\Users\Robert\Documents\bootcamp\green_tripdata_2025-09.parquet",   # ganti dengan path sebenarnya
        "format": "parquet",
        "date_column": "lpep_pickup_datetime",           # ganti sesuai kolom di file
        "target_table": "public.green_trip"
    }
]

# --- Konfigurasi koneksi PostgreSQL (contoh) ---
# Format: postgresql://username:password@host:port/database
POSTGRES_CONN_STR = os.getenv(
    "POSTGRES_CONN_STR",
    "postgresql://postgres:obet@127.0.0.1:5432/my_database"  # ganti
)

USE_BIGQUERY = False


In [5]:
from sqlalchemy import create_engine, text
from sqlalchemy.exc import SQLAlchemyError

def get_postgres_engine(conn_str: str):
    try:
        engine = create_engine(conn_str)
        # Tes koneksi ringan
        with engine.connect() as conn:
            conn.execute(text("SELECT 1"))
        logger.info("Koneksi ke PostgreSQL berhasil.")
        return engine
    except SQLAlchemyError as e:
        logger.error(f"Gagal konek ke PostgreSQL: {e}")
        raise




def read_source_file(path: str, file_format: str) -> Optional[pd.DataFrame]:
    if not os.path.exists(path):
        logger.error(f"File tidak ditemukan: {path}")
        return None

    try:
        if file_format.lower() == "parquet":
            df = pd.read_parquet(path)
        elif file_format.lower() == "csv":
            df = pd.read_csv(path)
        else:
            logger.error(f"Format file tidak didukung: {file_format}")
            return None

        if df is None or df.empty:
            logger.warning(f"Dataframe kosong dari file: {path}")
            return None

        logger.info(f"Berhasil baca file {path} dengan {len(df)} baris.")
        return df
    except Exception as e:
        logger.error(f"Error saat membaca file {path}: {e}")
        return None


In [6]:
def validate_dataframe(
    df: Optional[pd.DataFrame],
    required_columns: List[str],
    source_name: str
) -> Optional[pd.DataFrame]:
    """Validasi dasar dataframe."""
    if df is None:
        logger.error(f"[{source_name}] Dataframe adalah None.")
        return None

    if df.empty:
        logger.error(f"[{source_name}] Dataframe kosong.")
        return None

    missing_cols = [c for c in required_columns if c not in df.columns]
    if missing_cols:
        logger.error(f"[{source_name}] Kolom hilang: {missing_cols}")
        return None

    return df


def filter_by_period(
    df: pd.DataFrame,
    date_col: str,
    run_date: datetime,
    mode: str = "monthly_first_day"
) -> pd.DataFrame:

    if date_col not in df.columns:
        raise ValueError(f"Kolom tanggal '{date_col}' tidak ditemukan di dataframe.")

    df = df.copy()

    # Pastikan kolom bertipe datetime
    if not pd.api.types.is_datetime64_any_dtype(df[date_col]):
        df[date_col] = pd.to_datetime(df[date_col], errors="coerce")

    df = df.dropna(subset=[date_col])

    if mode == "all":
        return df

    if mode == "daily":
        mask = df[date_col].dt.date == run_date.date()
    elif mode == "weekly":
        mask = (
            (df[date_col].dt.isocalendar().year == run_date.isocalendar().year) &
            (df[date_col].dt.isocalendar().week == run_date.isocalendar().week)
        )
    elif mode == "monthly":
        mask = (
            (df[date_col].dt.year == run_date.year) &
            (df[date_col].dt.month == run_date.month)
        )
    elif mode == "monthly_first_day":
        # Hanya baris yang tanggalnya = 1 di bulan yang sama
        mask = (
            (df[date_col].dt.year == run_date.year) &
            (df[date_col].dt.month == run_date.month) &
            (df[date_col].dt.day == 1)
        )
    else:
        raise ValueError(f"Mode filter tidak dikenal: {mode}")

    filtered = df[mask]

    logger.info(
        f"Filter mode='{mode}' untuk tanggal run {run_date.date()} "
        f"menghasilkan {len(filtered)} baris dari {len(df)} baris."
    )

    return filtered


In [7]:
def save_to_postgres(
    df: pd.DataFrame,
    engine,
    table_name: str,
    if_exists: str = "append",
    chunksize: int = 10_000
):
    if df is None or df.empty:
        logger.warning(f"Tidak ada data yang disimpan ke PostgreSQL untuk tabel {table_name}.")
        return

    try:
        df.to_sql(
            name=table_name.split(".")[-1],    # jika "schema.table"
            schema=table_name.split(".")[0] if "." in table_name else None,
            con=engine,
            if_exists=if_exists,
            index=False,
            chunksize=chunksize
        )
        logger.info(f"Berhasil simpan {len(df)} baris ke PostgreSQL tabel {table_name}.")
    except SQLAlchemyError as e:
        logger.error(f"Error saat menyimpan ke PostgreSQL ({table_name}): {e}")
    except Exception as e:
        logger.error(f"Error umum saat menyimpan ke PostgreSQL ({table_name}): {e}")


def save_to_bigquery(
    df: pd.DataFrame,
    table_name: str,
    project_id: str,
    if_exists: str = "append"
):
    if df is None or df.empty:
        logger.warning(f"Tidak ada data yang disimpan ke BigQuery untuk tabel {table_name}.")
        return

    if not USE_BIGQUERY:
        logger.info("USE_BIGQUERY=False, skip upload ke BigQuery.")
        return

    if "." not in table_name:
        logger.error(
            f"Nama tabel BigQuery harus dalam format 'dataset.table', "
            f"didapat: {table_name}"
        )
        return

    dataset, table = table_name.split(".", 1)
    try:
        from pandas_gbq import to_gbq
        to_gbq(
            df,
            destination_table=f"{dataset}.{table}",
            project_id=project_id,
            if_exists=if_exists
        )
        logger.info(f"Berhasil simpan {len(df)} baris ke BigQuery tabel {dataset}.{table}.")
    except Exception as e:
        logger.error(f"Error saat menyimpan ke BigQuery ({table_name}): {e}")


In [8]:
def process_single_run(
    run_date: datetime,
    data_sources: List[Dict],
    filter_mode: str = "monthly_first_day"
):

    logger.info("=" * 80)
    logger.info(f"Mulai proses running untuk tanggal: {run_date.date()} "
                f"dengan mode filter='{filter_mode}'")
    logger.info("=" * 80)

    engine = get_postgres_engine(POSTGRES_CONN_STR)

    for src in data_sources:
        name = src.get("name")
        path = src.get("path")
        fmt = src.get("format", "parquet")
        date_col = src.get("date_column")
        target_table_pg = src.get("target_table")


        logger.info(f"--- Proses source: {name} ---")

        df_raw = read_source_file(path, fmt)
        if df_raw is None:
            logger.error(f"[{name}] Gagal baca file, skip source ini.")
            continue

        df_valid = validate_dataframe(df_raw, required_columns=[date_col], source_name=name)
        if df_valid is None:
            logger.error(f"[{name}] Validasi gagal, skip source ini.")
            continue

        try:
            df_filtered = filter_by_period(df_valid, date_col=date_col, run_date=run_date, mode=filter_mode)
        except Exception as e:
            logger.error(f"[{name}] Error saat filter_by_period: {e}")
            continue

        if df_filtered is None or df_filtered.empty:
            logger.warning(f"[{name}] Tidak ada data setelah filter, tidak akan disimpan.")
            continue

        save_to_postgres(df_filtered, engine=engine, table_name=target_table_pg)

    logger.info(f"Selesai proses running untuk tanggal: {run_date.date()}")


In [9]:
def generate_first_day_months(
    start_year: int,
    start_month: int,
    num_months: int
) -> List[datetime]:
    dates = []
    year = start_year
    month = start_month

    for _ in range(num_months):
        dates.append(datetime(year, month, 1))

        month += 1
        if month > 12:
            month = 1
            year += 1

    return dates


run_dates = generate_first_day_months(start_year=2024, start_month=1, num_months=7)
for d in run_dates:
    logger.info(f"Scheduled run date: {d.date()}")


2025-11-16 13:45:47,997 [INFO] data_extraction_pipeline - Scheduled run date: 2024-01-01
2025-11-16 13:45:47,999 [INFO] data_extraction_pipeline - Scheduled run date: 2024-02-01
2025-11-16 13:45:48,000 [INFO] data_extraction_pipeline - Scheduled run date: 2024-03-01
2025-11-16 13:45:48,002 [INFO] data_extraction_pipeline - Scheduled run date: 2024-04-01
2025-11-16 13:45:48,004 [INFO] data_extraction_pipeline - Scheduled run date: 2024-05-01
2025-11-16 13:45:48,005 [INFO] data_extraction_pipeline - Scheduled run date: 2024-06-01
2025-11-16 13:45:48,005 [INFO] data_extraction_pipeline - Scheduled run date: 2024-07-01


In [10]:
if __name__ == "__main__":
    # Mode filter bisa kamu ganti: 'daily', 'weekly', 'monthly', 'monthly_first_day', 'all'
    FILTER_MODE = "monthly_first_day"

    # Contoh: gunakan run_dates yang sudah dibuat di block sebelumnya
    for run_date in run_dates:
        try:
            process_single_run(
                run_date=run_date,
                data_sources=DATA_SOURCES,
                filter_mode=FILTER_MODE
            )
        except Exception as e:
            # Error handling global per running
            logger.error(f"Terjadi error fatal pada run tanggal {run_date.date()}: {e}")


2025-11-16 13:45:48,031 [INFO] data_extraction_pipeline - Mulai proses running untuk tanggal: 2024-01-01 dengan mode filter='monthly_first_day'
2025-11-16 13:45:48,516 [INFO] data_extraction_pipeline - Koneksi ke PostgreSQL berhasil.
2025-11-16 13:45:48,517 [INFO] data_extraction_pipeline - --- Proses source: yellow_trip ---
2025-11-16 13:45:49,332 [INFO] data_extraction_pipeline - Berhasil baca file C:\Users\Robert\Documents\bootcamp\yellow_tripdata_2025-09.parquet dengan 4251015 baris.
2025-11-16 13:45:49,868 [ERROR] data_extraction_pipeline - [yellow_trip] Error saat filter_by_period: Unable to allocate 422. MiB for an array with shape (13, 4251015) and data type float64
2025-11-16 13:45:50,077 [INFO] data_extraction_pipeline - --- Proses source: green_trip ---
2025-11-16 13:45:50,099 [INFO] data_extraction_pipeline - Berhasil baca file C:\Users\Robert\Documents\bootcamp\green_tripdata_2025-09.parquet dengan 48893 baris.
2025-11-16 13:45:50,191 [INFO] data_extraction_pipeline - Filt