# Database Upload Pipeline: Processed Vehicle Registration Data

This notebook uploads all processed German vehicle registration CSV files from
the analysis pipeline into a PostgreSQL database. It discovers the CSV files on
disk, lets pandas infer each table's column types, and reads the database
connection settings from an environment file.

## Workflow Overview
1. Load database connection parameters from environment file
2. Establish SQLAlchemy connection to PostgreSQL database
3. Process CSV files with appropriate delimiters and German character support
4. Create or replace database tables with standardized schemas
5. Upload each file and print a confirmation once its table is written

## Key Variables
- `config`: Database connection parameters from .env file
- `engine`: SQLAlchemy database engine for connection management
- `DATA_DIR`: Directory containing processed CSV files
- `pg_schema`: Target PostgreSQL schema for table creation

## Prerequisites
- .env file must contain valid PostgreSQL connection parameters
- Target database and schema must exist and be accessible
- CSV files must be properly formatted with UTF-8 encoding in processed data directory
- Required Python packages: pandas, SQLAlchemy, python-dotenv, and a PostgreSQL driver such as psycopg2 (SQLAlchemy's default for `postgresql://` URLs)

In [None]:
# === Import essential libraries for database upload operations ===
from pathlib import Path           # Modern path handling for cross-platform compatibility
from dotenv import dotenv_values   # Environment configuration loading
import pandas as pd               # Data manipulation and analysis framework
import re                          # Regular expression pattern matching
from sqlalchemy import create_engine    # Database connection management
from sqlalchemy.types import Text # Database column type specification

## Database Configuration

Load PostgreSQL connection parameters from environment variables.

In [None]:
# === Load environment configuration file as dictionary ===
config    = dotenv_values()  # Read .env file into string dictionary

# === Extract PostgreSQL connection parameters ===
pg_user   = config['POSTGRES_USER']    # Database username credential
pg_pass   = config['POSTGRES_PASS']    # Database password credential
pg_host   = config['POSTGRES_HOST']    # Database server hostname/IP
pg_port   = config['POSTGRES_PORT']    # Database server port number
pg_db     = config['POSTGRES_DB']      # Target database name
pg_schema = config['POSTGRES_SCHEMA']  # Target schema for table creation
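
Indexing `config` directly raises a bare `KeyError` on the first missing key. A minimal sketch of an up-front check is shown below. It assumes a hypothetical helper `missing_keys` (not part of python-dotenv) and the six key names used in the cell above.

```python
# Hypothetical helper: report every required key that is absent or empty.
REQUIRED_KEYS = (
    "POSTGRES_USER", "POSTGRES_PASS", "POSTGRES_HOST",
    "POSTGRES_PORT", "POSTGRES_DB", "POSTGRES_SCHEMA",
)

def missing_keys(config, required=REQUIRED_KEYS):
    """Return the required keys that are missing or have no value."""
    return [key for key in required if not config.get(key)]

# Stand-in dictionary for illustration; in the notebook this would be `config`.
example = {"POSTGRES_USER": "demo", "POSTGRES_HOST": "localhost", "POSTGRES_PORT": ""}
print(missing_keys(example))
```

Raising one error that lists all missing keys makes an incomplete `.env` file easier to fix in a single pass.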

In [None]:
# === Build PostgreSQL connection URL for SQLAlchemy ===
db_url = f"postgresql://{pg_user}:{pg_pass}@{pg_host}:{pg_port}/{pg_db}"

# === Create database engine for connection management ===
engine = create_engine(db_url)  # SQLAlchemy engine for database operations
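
The f-string above inserts the password verbatim, so characters such as `@`, `/`, or `:` in a credential would change how the URL is parsed. A sketch of one way to guard against that is shown below. It assumes a hypothetical `build_pg_url` helper that percent-encodes the user name and password with the standard library's `urllib.parse.quote_plus`; the sample credentials are made up.

```python
from urllib.parse import quote_plus

def build_pg_url(user, password, host, port, database):
    """Assemble a postgresql:// URL with percent-encoded credentials."""
    return (
        f"postgresql://{quote_plus(user)}:{quote_plus(password)}"
        f"@{host}:{port}/{database}"
    )

# Made-up values for illustration only.
print(build_pg_url("demo", "p@ss/word", "localhost", "5432", "vehicles"))
# postgresql://demo:p%40ss%2Fword@localhost:5432/vehicles
```

The returned string could be passed to `create_engine` in place of `db_url`. SQLAlchemy 1.4 and later also provide `URL.create`, which accepts the components as separate arguments.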

## CSV Processing and Upload

Convert the numeric columns of two specific CSV files and write the cleaned files back to disk, ready for the bulk upload further down.

In [None]:
# === Define data directory and target files for processing ===
DATA_DIR  = Path("../data/processed/,")                # "," subfolder: comma-delimited processed files
FILES     = ["_handelsnamen_pkw.csv", "_modellreihen.csv"]  # Specific files to process
KEEP_TEXT = 5                                          # Number of text columns to preserve

def _to_float(col: pd.Series) -> pd.Series:
    """
    Convert string column to float with German formatting and dash handling.
    
    Args:
        col (pd.Series): String column containing numeric data
        
    Returns:
        pd.Series: Numeric column with proper float64 dtype
    """
    # === Compile regex for various dash characters ===
    dash_rx = re.compile(r"^[-\u2013\u2014]$")
    # === Remove leading/trailing whitespace ===
    col = col.str.strip()
    # === Replace lone dashes and lone dots (missing-value placeholders) with NA ===
    col = col.mask(col.str.match(dash_rx, na=False) | (col == "."), pd.NA)
    # === Remove spaces and thousand separators ===
    col = col.str.replace(r"\s|\.", "", regex=True)
    # === Convert German decimal comma to dot ===
    col = col.str.replace(",", ".", regex=False)
    # === Convert to numeric, invalid values become NaN ===
    return pd.to_numeric(col, errors="coerce")

# === Process each target file ===
for fname in FILES:
    path = DATA_DIR / fname
    # === Check if file exists before processing ===
    if not path.exists():
        print(f"!! {fname} not found")
        continue

    # === Load CSV as string data ===
    df = pd.read_csv(path, dtype=str)
    # === Clean column names (remove whitespace) ===
    df.columns = df.columns.str.strip()

    # === Drop columns whose name contains 'ZS' next to whitespace (case-insensitive) ===
    zs_mask = df.columns.str.contains(r"ZS\s|\sZS", case=False, regex=True)
    if zs_mask.any():
        df = df.loc[:, ~zs_mask]

    # === Convert numeric columns (skip the first KEEP_TEXT text columns) ===
    numeric_cols = df.columns[KEEP_TEXT:]
    for col in numeric_cols:
        df[col] = _to_float(df[col])

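    # === Save processed data back to the same file (overwrites the input) ===
    # Run this cell once per raw file: a second pass would strip the decimal points written here.
    df.to_csv(path, index=False, encoding="utf-8")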
    # === Display DataFrame info for verification ===
    df.info()

print("\nReady.")
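
The conversion rules in `_to_float` can be checked on single values. The sketch below is a pure-Python restatement for illustration only. `parse_german_number` is a hypothetical name, and it returns `None` where the pandas version yields `NaN`.

```python
import re

DASH_RX = re.compile(r"^[-\u2013\u2014]$")   # lone hyphen, en dash, or em dash

def parse_german_number(text):
    """Parse one German-formatted number string; return None if not numeric."""
    text = text.strip()
    if DASH_RX.match(text) or text == ".":
        return None                           # placeholder for a missing value
    text = re.sub(r"\s|\.", "", text)         # drop spaces and thousands dots
    text = text.replace(",", ".")             # decimal comma -> decimal point
    try:
        return float(text)
    except ValueError:
        return None                           # mirrors errors="coerce"

for raw in ["1.234,5", " 12 345 ", "\u2013", ".", "n/a", "1234.5"]:
    print(repr(raw), "->", parse_german_number(raw))
```

The last input shows that a value already in dot-decimal form loses its decimal point under these rules (`"1234.5"` becomes `12345.0`). The conversion is therefore suited to raw German-formatted text and should not be applied twice to the same file.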

## Bulk Database Upload

Process all CSV files in subdirectories and upload to PostgreSQL database.


In [None]:
# === Define base directory for processed CSV files ===
data_dir = Path("../data/processed/")  # Directory containing CSV files for upload
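
The upload loop in the next cell takes the table name from the file stem and the delimiter from the parent folder name. A sketch of that rule as a standalone function follows, with an added check that the folder is one of the two expected delimiters. `table_and_sep`, `ALLOWED_SEPS`, and the sample file name are hypothetical.

```python
from pathlib import PurePosixPath

ALLOWED_SEPS = {";", ","}   # assumed folder names, one per delimiter

def table_and_sep(csv_path):
    """Return (table_name, delimiter) for a CSV stored under a delimiter-named folder."""
    sep = csv_path.parent.name
    if sep not in ALLOWED_SEPS:
        raise ValueError(f"unexpected delimiter folder: {sep!r}")
    # Same normalization as the loop: lowercase, hyphens to underscores.
    table_name = csv_path.stem.lower().replace("-", "_")
    return table_name, sep

# Made-up file name for illustration.
print(table_and_sep(PurePosixPath("../data/processed/;/FZ-Sample_2023.csv")))
# ('fz_sample_2023', ';')
```

Failing early on an unexpected folder name avoids passing an arbitrary string to `pd.read_csv` as the separator.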

In [None]:
# === Walk the ";" and "," subfolders to find CSV files for each delimiter ===
for csv_path in data_dir.glob("*/*.csv"):
    # === Derive table name from filename with normalization ===
    table_name = csv_path.stem.lower().replace("-", "_")  # Convert to SQL-safe naming

    # === Determine delimiter from parent directory name ===
    sep = csv_path.parent.name  # Directory name indicates delimiter (";" or ",")

    # === Read CSV with appropriate delimiter and encoding ===
    df = pd.read_csv(
        csv_path,                      # File path to process
        sep=sep,                       # Use directory-specified delimiter
        engine="python",               # Python engine for flexible parsing
        on_bad_lines="warn",           # Warn about malformed lines
        encoding="utf-8"               # UTF-8 encoding for German characters
    )
    
    # === Upload DataFrame to PostgreSQL database ===
    df.to_sql(
        name      = table_name,        # SQL table name derived from filename
        con       = engine,            # SQLAlchemy database connection
        schema    = pg_schema,         # Target schema for table creation
        if_exists = "replace",         # Replace existing table if present
        index     = False              # Don't include DataFrame index as column
    )
    
    # === Confirm successful upload with table identifier ===
    print(f"Uploaded: {pg_schema}.{table_name}")
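
As written, the loop stops at the first file that fails to parse or upload. One possible way to continue and report failures at the end is sketched below. `upload_all` and `upload_one` are hypothetical names, with `upload_one` standing in for the read-and-`to_sql` body of the loop.

```python
def upload_all(paths, upload_one):
    """Call upload_one(path) for every path; collect failures instead of stopping."""
    succeeded, failed = [], []
    for path in paths:
        try:
            upload_one(path)
        except Exception as exc:              # broad on purpose: log and continue
            failed.append((path, str(exc)))
            print(f"!! {path}: {exc}")
        else:
            succeeded.append(path)
            print(f"Uploaded: {path}")
    return succeeded, failed

# Stand-in uploader for demonstration: rejects one made-up file name.
def fake_upload(path):
    if "bad" in path:
        raise ValueError("malformed CSV")

ok, errors = upload_all(["a.csv", "bad.csv", "c.csv"], fake_upload)
```

Returning both lists lets the notebook print a short summary after the loop and makes it easy to retry only the files that failed.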