In [None]:
# Zelle 1: Setup & Config
from google.cloud import bigquery
from google.api_core.exceptions import NotFound

import pandas as pd
import uuid
import datetime
import matplotlib.pyplot as plt

# sorgt dafür, dass Plots im Notebook angezeigt werden
%matplotlib inline

# Widen pandas output so wide/long exploration frames are not truncated.
# pd.set_option accepts several (option, value) pairs in a single call.
pd.set_option(
    "display.max_columns", 200,
    "display.max_rows", 200,
    "display.width", 140,
)

In [None]:
# Zelle 1b: Imports & Config
# (bigquery is also imported in the setup cell; repeated so this cell runs standalone)
from google.cloud import bigquery
import logging

# Logging setup
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger()

# --- CONFIGURATION ---
PROJECT_ID = "taxi-bi-project"
# Separation: where does the data live, where should it go?
SOURCE_DATASET = "staging"     # staging tables for yellow, green, fhv
TARGET_DATASET = "canonical"   # destination of the clean table
# Staging table names (taken from the exploration notebook).
# NOTE(review): the ETL cell currently hard-codes different source table names
# (staging_yellow_taxi, staging_green_taxi, fhv_unified) and never reads these
# constants -- confirm which set is correct and use one consistently.
STAGING_YELLOW = "yellow_staging_unified"
STAGING_GREEN = "green_staging_unified"
STAGING_FHV = "fhv_staging_unified"

# Fallback region, used only when the source dataset's region cannot be read
BQ_LOCATION = "EU"
# Tables
TARGET_TABLE = "canonical_unified_taxi"
ERROR_TABLE = "error_records"
LOG_TABLE = "etl_process_log"

# Client
client = bigquery.Client(project=PROJECT_ID)

# Dataset location check (original note: prevents 404 errors). The target dataset
# is created in the source dataset's region so both can be queried by one job.
# LOCATION is always defined afterwards; if the lookup fails we fall back to BQ_LOCATION.
LOCATION = BQ_LOCATION
try:
    LOCATION = client.get_dataset(f"{PROJECT_ID}.{SOURCE_DATASET}").location
    print(f"Region erkannt: {LOCATION}")
except Exception as e:
    print(f"Setup-Fehler: {e}")
    logger.warning("Region des Quell-Datasets unbekannt, verwende Fallback %s", BQ_LOCATION)

# Create/check the target dataset independently of the lookup above, so a failed
# lookup no longer silently skips this step.
try:
    tgt_ref = bigquery.Dataset(f"{PROJECT_ID}.{TARGET_DATASET}")
    tgt_ref.location = LOCATION
    client.create_dataset(tgt_ref, exists_ok=True)
    print(f"Ziel-Dataset '{TARGET_DATASET}' ist bereit.")
except Exception as e:
    print(f"Setup-Fehler: {e}")

# Fully qualified table references used by the following cells
table_ref = f"{PROJECT_ID}.{TARGET_DATASET}.{TARGET_TABLE}"
error_table_ref = f"{PROJECT_ID}.{TARGET_DATASET}.{ERROR_TABLE}"
log_table_ref = f"{PROJECT_ID}.{TARGET_DATASET}.{LOG_TABLE}"

Dataset 'canonical' ist bereit.
Ziel-Tabelle wird sein: taxi-bi-project.canonical.canonical_unified_taxi


In [None]:
# Zelle 2: Tabellen-DDL (Canonical & Error & Log)
def create_all_tables():
    """Create the canonical, error and run-log tables if they do not exist yet.

    Uses ``exists_ok=True``, so tables that already exist are left untouched.
    A failure is printed per table and does not stop the remaining tables from
    being created. Reads ``client``, ``table_ref``, ``error_table_ref`` and
    ``log_table_ref`` from the config cell.
    """
    # 1. Schema for trip rows (canonical and error table share it)
    base_schema = [
        bigquery.SchemaField("trip_id", "STRING", description="Unique ID für ALLE"),
        bigquery.SchemaField("source_system", "STRING"),
        bigquery.SchemaField("load_date", "TIMESTAMP"),
        bigquery.SchemaField("vendor_id", "STRING"),
        bigquery.SchemaField("pickup_datetime", "TIMESTAMP"),
        bigquery.SchemaField("dropoff_datetime", "TIMESTAMP"),
        bigquery.SchemaField("pickup_location_id", "INTEGER"),
        bigquery.SchemaField("dropoff_location_id", "INTEGER"),
        bigquery.SchemaField("passenger_count", "INTEGER"),
        bigquery.SchemaField("trip_distance", "FLOAT"),
        bigquery.SchemaField("store_and_fwd_flag", "STRING"),
        bigquery.SchemaField("fare_amount", "FLOAT"),
        bigquery.SchemaField("tip_amount", "FLOAT"),
        bigquery.SchemaField("total_amount", "FLOAT"),
        bigquery.SchemaField("payment_type", "INTEGER"),
        bigquery.SchemaField("is_shared_ride", "BOOLEAN"),
        bigquery.SchemaField("dq_issue_flag", "BOOLEAN"),  # soft validation result
    ]

    # The error table additionally records why a row was rejected
    error_schema = base_schema + [bigquery.SchemaField("rejection_reason", "STRING")]

    # One row per ETL run
    log_schema = [
        bigquery.SchemaField("run_id", "STRING"),
        bigquery.SchemaField("start_time", "TIMESTAMP"),
        bigquery.SchemaField("status", "STRING"),
        bigquery.SchemaField("valid_rows", "INTEGER"),
        bigquery.SchemaField("error_rows", "INTEGER"),
        bigquery.SchemaField("error_msg", "STRING"),
    ]

    month = bigquery.TimePartitioningType.MONTH
    day = bigquery.TimePartitioningType.DAY
    # (table, schema, partition column, granularity). The partition column is
    # listed explicitly instead of being guessed from the table name; a substring
    # test on "project.dataset.table" misfires as soon as the project or dataset
    # name contains the searched word. Trip tables use MONTH so a multi-year load
    # stays well below BigQuery's per-job limit on modified partitions.
    tables_to_create = [
        (table_ref, base_schema, "pickup_datetime", month),
        (error_table_ref, error_schema, "pickup_datetime", month),
        (log_table_ref, log_schema, "start_time", day),
    ]

    for t_ref, t_schema, partition_field, partition_type in tables_to_create:
        try:
            t = bigquery.Table(t_ref, schema=t_schema)
            t.time_partitioning = bigquery.TimePartitioning(
                type_=partition_type, field=partition_field
            )
            client.create_table(t, exists_ok=True)
            print(f"Tabelle bereit: {t_ref.split('.')[-1]}")
        except Exception as e:
            print(f"Fehler bei {t_ref}: {e}")

create_all_tables()

In [None]:
# Zelle: Reset - drop the canonical table before the ETL run.
# NOTE(review): this runs after Zelle 2 created the table, so it throws away that
# DDL (schema/partitioning); the ETL cell rebuilds it via CREATE OR REPLACE anyway.
from google.api_core.exceptions import NotFound

try:
    client.delete_table(table_ref)
except NotFound:
    # Nothing to delete -- not an error for a reset step
    print("Tabelle existierte noch nicht. Alles okay.")
else:
    print(f"Tabelle {table_ref} wurde gelöscht (falls sie existierte).")

Tabelle taxi-bi-project.canonical.canonical_unified_taxi wurde gelöscht (falls sie existierte).


In [None]:
# Zelle 3: ETL Pipeline (Split: Valid vs. Error)
def _build_etl_sql(start_date):
    """Return the multi-statement BigQuery script for one ETL run.

    ``start_date`` ('YYYY-MM-DD') is the earliest accepted pickup date; earlier
    rows are rejected. Source/target names come from the config cell
    (``PROJECT_ID``, ``SOURCE_DATASET``, ``table_ref``, ``error_table_ref``).
    """
    # NOTE(review): these source table names differ from STAGING_YELLOW /
    # STAGING_GREEN / STAGING_FHV in the config cell -- confirm which are correct.
    too_old_reason = f"Date too old (<{start_date[:4]})"
    return f"""
    BEGIN
        -- 1. Temp Tabelle mit ALLEN Daten (Roh-Harmonisierung)
        CREATE TEMP TABLE temp_all_trips AS
        WITH raw_combined AS (
            -- (A) YELLOW
            SELECT 
                'YELLOW' as source, CAST(VendorID AS STRING) as vid,
                CAST(tpep_pickup_datetime AS TIMESTAMP) as t_pick, CAST(tpep_dropoff_datetime AS TIMESTAMP) as t_drop,
                PULocationID as loc_pu, DOLocationID as loc_do,
                IFNULL(passenger_count, 1) as pax, trip_distance as dist, IFNULL(store_and_fwd_flag, 'N') as flag,
                GREATEST(IFNULL(fare_amount, 0), 0) as fare, GREATEST(IFNULL(tip_amount, 0), 0) as tip, GREATEST(IFNULL(total_amount, 0), 0) as total,
                IFNULL(payment_type, 0) as pay_type, FALSE as shared,
                GREATEST(IFNULL(airport_fee, 0), 0) as air_fee
            FROM `{PROJECT_ID}.{SOURCE_DATASET}.staging_yellow_taxi`
            
            UNION ALL
            
            -- (B) GREEN
            SELECT 
                'GREEN', CAST(VendorID AS STRING),
                CAST(lpep_pickup_datetime AS TIMESTAMP), CAST(lpep_dropoff_datetime AS TIMESTAMP),
                PULocationID, DOLocationID,
                IFNULL(passenger_count, 1), trip_distance, IFNULL(store_and_fwd_flag, 'N'),
                GREATEST(IFNULL(fare_amount, 0), 0), GREATEST(IFNULL(tip_amount, 0), 0), GREATEST(IFNULL(total_amount, 0), 0),
                IFNULL(payment_type, 0), FALSE, 0
            FROM `{PROJECT_ID}.{SOURCE_DATASET}.staging_green_taxi`

            UNION ALL

            -- (C) FHV
            SELECT 
                'FHV', dispatching_base_num,
                CAST(pickup_datetime AS TIMESTAMP), CAST(dropOff_datetime AS TIMESTAMP),
                PULocationID, DOLocationID,
                NULL, NULL, 'N', -- Pax & Dist NULL
                NULL, NULL, NULL, -- Finanzen NULL
                NULL, -- Payment Type
                IF(CAST(SR_Flag AS STRING)='1', TRUE, FALSE), -- Shared Ride Check
                NULL -- Air Fee
            FROM `{PROJECT_ID}.{SOURCE_DATASET}.fhv_unified`
        )
        
        -- Anreichern mit ID und DQ-Checks
        SELECT
            -- Deterministic ID over all harmonised trip attributes, cast to STRING
            -- to match the table schema. TO_JSON_STRING keeps NULL fields (CONCAT
            -- turned the whole key NULL), and the wider key avoids collisions of
            -- trips sharing pickup second + vendor. Exact duplicate rows still share an ID.
            CAST(FARM_FINGERPRINT(TO_JSON_STRING(STRUCT(
                source, vid, t_pick, t_drop, loc_pu, loc_do,
                pax, dist, flag, fare, tip, total, pay_type, shared
            ))) AS STRING) as trip_id,
            source as source_system,
            CURRENT_TIMESTAMP() as load_date,
            vid as vendor_id,
            t_pick as pickup_datetime,
            t_drop as dropoff_datetime,
            loc_pu as pickup_location_id,
            loc_do as dropoff_location_id,
            pax as passenger_count,
            dist as trip_distance,
            flag as store_and_fwd_flag,
            fare as fare_amount,
            tip as tip_amount,
            total as total_amount,
            pay_type as payment_type,
            shared as is_shared_ride,
            
            -- SOFT VALIDATION (DQ Issue Flag) -> Landet in Canonical, aber markiert
            CASE 
                WHEN dist > 500 THEN TRUE
                WHEN source IN ('YELLOW', 'GREEN') AND pax > 5 THEN TRUE
                WHEN pay_type = 2 AND tip > 10 THEN TRUE
                WHEN air_fee > 0 AND (loc_pu NOT IN (132, 138) AND loc_do NOT IN (132, 138)) THEN TRUE
                ELSE FALSE 
            END as dq_issue_flag,

            -- HARD VALIDATION (Rejection Logic) -> Bestimmt Ziel-Tabelle
            CASE
                WHEN t_pick IS NULL OR t_drop IS NULL THEN 'Missing Timestamps'
                WHEN t_pick >= t_drop THEN 'Negative Duration'
                WHEN t_pick < TIMESTAMP('{start_date}') THEN '{too_old_reason}'
                WHEN t_pick > CURRENT_TIMESTAMP() THEN 'Future Date'
                ELSE 'VALID'
            END as row_status

        FROM raw_combined;

        -- 2. Clean Table befüllen (Nur VALID)
        -- CTAS replaces the DDL-created table, so partitioning must be repeated
        -- here; MONTH keeps a multi-year rebuild under the per-job partition limit.
        CREATE OR REPLACE TABLE `{table_ref}`
        PARTITION BY TIMESTAMP_TRUNC(pickup_datetime, MONTH)
        AS
        SELECT * EXCEPT(row_status)
        FROM temp_all_trips
        WHERE row_status = 'VALID';

        -- 3. Error Table befüllen (Nur INVALID)
        CREATE OR REPLACE TABLE `{error_table_ref}`
        PARTITION BY TIMESTAMP_TRUNC(pickup_datetime, MONTH)
        AS
        SELECT * EXCEPT(row_status), row_status as rejection_reason
        FROM temp_all_trips
        WHERE row_status != 'VALID';

    END;
    """


def _write_run_log(run_id, start_ts, status, valid_rows, error_rows, error_msg):
    """Append one record to the run-log table. Best effort: never raises.

    ``insert_rows_json`` reports rejected rows through its return value rather
    than an exception, so that list is checked before claiming success.
    """
    row = {
        "run_id": run_id,
        "start_time": start_ts.isoformat(),
        "status": status,
        "valid_rows": valid_rows,
        "error_rows": error_rows,
        "error_msg": error_msg,
    }
    try:
        insert_errors = client.insert_rows_json(log_table_ref, [row])
    except Exception as e:
        print(f"⚠️ Log konnte nicht geschrieben werden: {e}")
        return
    if insert_errors:
        print(f"⚠️ Log konnte nicht geschrieben werden: {insert_errors}")
    else:
        print("Log geschrieben.")


def run_etl_split_logic():
    """Rebuild the clean and error tables from staging and log the run.

    Runs one BigQuery script that harmonises yellow, green and FHV staging rows,
    assigns a ``trip_id``, sets ``dq_issue_flag`` (soft checks) and routes each
    row to ``table_ref`` (valid) or ``error_table_ref`` (with a
    ``rejection_reason``). Both targets are fully replaced on every run.
    A failing ETL query is caught and logged with status ``FAILURE`` and zero
    row counts; afterwards exactly one row is appended to ``log_table_ref``.
    """
    print("Starte ETL mit Error-Handling & Unique IDs...")

    run_id = str(uuid.uuid4())
    start_ts = datetime.datetime.now(datetime.timezone.utc)

    # Earliest accepted pickup date; the upper bound is CURRENT_TIMESTAMP() in SQL
    START_DATE = '2015-01-01'
    query = _build_etl_sql(START_DATE)

    error_msg = None
    try:
        client.query(query).result()  # block until the script has finished

        # Row counts of the freshly replaced tables
        cnt_valid = client.get_table(table_ref).num_rows
        cnt_error = client.get_table(error_table_ref).num_rows
        status = "SUCCESS"
        print(f"✅ Fertig! Valid: {cnt_valid} | Errors: {cnt_error}")
    except Exception as e:
        print(f"❌ Fehler: {e}")
        status = "FAILURE"
        cnt_valid = 0
        cnt_error = 0
        error_msg = str(e)

    _write_run_log(run_id, start_ts, status, cnt_valid, cnt_error, error_msg)

# Execute one ETL run: replaces the clean and error tables and appends a run-log row
run_etl_split_logic()

In [None]:
# Zelle 4: Quality Check
# Shows a few clean rows (transposed for readability) and the rejection reasons.
print("--- CLEAN DATA SAMPLE ---")
print(client.query(f"SELECT * FROM `{table_ref}` LIMIT 3").to_dataframe().T)

print("\n--- ERROR DATA SAMPLE (Warum abgelehnt?) ---")
# An empty error table returns an empty frame (no exception), so "no errors"
# is detected via .empty. Only a missing table is caught here; any other
# failure (auth, SQL) surfaces instead of being misreported as "no errors".
# NotFound is imported in the setup cell.
try:
    err_df = client.query(
        f"SELECT rejection_reason, COUNT(*) AS cnt FROM `{error_table_ref}` "
        "GROUP BY 1 ORDER BY cnt DESC"
    ).to_dataframe()
except NotFound:
    print(f"Fehler-Tabelle {error_table_ref} existiert nicht.")
else:
    if err_df.empty:
        print("Keine Fehler gefunden (Tabelle leer).")
    else:
        print(err_df)