<a href="https://colab.research.google.com/github/jmcconne100/Pandas_Notebook_Project/blob/main/Integration_helpers.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# integration_helpers.py
from __future__ import annotations
import io
import json
import gzip
from typing import Optional, Dict, Any

import pandas as pd

# ---- S3 ----
import boto3
from botocore.config import Config
from botocore.exceptions import BotoCoreError, ClientError

# ---- SQL ----
from sqlalchemy import create_engine, text
from sqlalchemy.engine import Engine


# ---------- S3: Write DataFrame as JSON / JSON Lines ----------
def df_to_json_s3(
    df: pd.DataFrame,
    bucket: str,
    key: str,
    *,
    json_orient: str = "records",      # 'records' or 'table' (for whole JSON doc)
    json_lines: bool = False,          # True => JSON Lines (one object per line)
    gzip_compress: bool = False,
    content_type: Optional[str] = None,
    s3_client: Optional[Any] = None,
    extra_put_kwargs: Optional[Dict[str, Any]] = None,
    region_name: Optional[str] = None,
    max_attempts: int = 5,
    timeout_s: int = 30,
) -> Dict[str, Any]:
    """
    Serialize a DataFrame to JSON or JSON Lines and upload it to S3.

    Args:
        df: DataFrame to serialize.
        bucket: Destination bucket (used unless ``extra_put_kwargs`` supplies
            its own ``Bucket``).
        key: Destination object key (used unless ``extra_put_kwargs`` supplies
            its own ``Key``).
        json_orient: pandas ``orient`` for the whole-document form. Ignored
            when ``json_lines`` is true, which always uses ``"records"``.
        json_lines: Emit one JSON object per line instead of a single document.
        gzip_compress: Gzip the payload and set ``ContentEncoding: gzip``.
        content_type: Overrides the default ``ContentType``
            (``application/x-ndjson`` for JSON Lines, else ``application/json``).
        s3_client: Pre-built client exposing ``put_object``; when ``None`` a
            boto3 client is created with the retry/timeout settings below.
        extra_put_kwargs: Extra keyword arguments for ``put_object``. Entries
            given here take precedence over the values this function computes
            (``Bucket``, ``Key``, ``Body``, ``ContentType``, ``ContentEncoding``).
            The mapping itself is not modified.
        region_name: Region for the client created here.
        max_attempts: Retry attempts for the client created here.
        timeout_s: Connect and read timeout (seconds) for the client created here.

    Returns:
        ``{"bucket": ..., "key": ..., "etag": ...}`` where bucket/key are the
        values actually sent to ``put_object`` and etag is the response's
        ``ETag`` (``None`` if absent).

    Raises:
        RuntimeError: If the upload fails with ``BotoCoreError`` or ``ClientError``.

    Example:
        df_to_json_s3(df, "my-bucket", "exports/data.json.gz",
                      json_orient="records", json_lines=True, gzip_compress=True)
    """
    # Serialize
    if json_lines:
        # Each row -> one JSON object line
        buf = io.StringIO()
        df.to_json(buf, orient="records", lines=True)  # pandas handles newlines
        payload = buf.getvalue().encode("utf-8")
        default_ct = "application/x-ndjson"
    else:
        # Re-encode through the json module so non-ASCII text is written as
        # UTF-8 characters (ensure_ascii=False) rather than \u escapes.
        obj = json.loads(df.to_json(orient=json_orient))
        payload = json.dumps(obj, ensure_ascii=False).encode("utf-8")
        default_ct = "application/json"

    # Optional gzip; the content type stays JSON, ContentEncoding signals gzip.
    if gzip_compress:
        payload = gzip.compress(payload)

    # S3 client with retries/timeouts (only built when none was injected)
    if s3_client is None:
        s3 = boto3.client(
            "s3",
            region_name=region_name,
            config=Config(retries={"max_attempts": max_attempts, "mode": "standard"},
                          connect_timeout=timeout_s, read_timeout=timeout_s),
        )
    else:
        s3 = s3_client

    # Copy so the caller's dict is never mutated; setdefault keeps any value
    # the caller supplied explicitly in extra_put_kwargs.
    put_kwargs = dict(extra_put_kwargs) if extra_put_kwargs else {}
    put_kwargs.setdefault("Bucket", bucket)
    put_kwargs.setdefault("Key", key)
    put_kwargs.setdefault("Body", payload)

    # Headers
    put_kwargs.setdefault("ContentType", content_type or default_ct)
    if gzip_compress:
        put_kwargs.setdefault("ContentEncoding", "gzip")

    # Report the destination that is really used, which may differ from the
    # positional arguments when extra_put_kwargs overrides Bucket/Key.
    target_bucket = put_kwargs["Bucket"]
    target_key = put_kwargs["Key"]

    try:
        resp = s3.put_object(**put_kwargs)
    except (BotoCoreError, ClientError) as e:
        raise RuntimeError(
            f"S3 upload failed for s3://{target_bucket}/{target_key}: {e}"
        ) from e
    return {"bucket": target_bucket, "key": target_key, "etag": resp.get("ETag")}


# ---------- S3: Read CSV into DataFrame ----------
def read_s3_csv(
    bucket: str,
    key: str,
    *,
    s3_client: Optional[Any] = None,
    pandas_read_csv_kwargs: Optional[Dict[str, Any]] = None,
    region_name: Optional[str] = None,
    max_attempts: int = 5,
    timeout_s: int = 30,
) -> pd.DataFrame:
    """
    Read a CSV object from S3 into a DataFrame (auto-handling bytes → pandas).

    The whole object is downloaded into memory and handed to ``pd.read_csv``.
    A gzip-compressed body (detected by its leading magic bytes) is
    decompressed first, unless the caller passed an explicit ``compression``
    other than ``"infer"`` in ``pandas_read_csv_kwargs``.

    Args:
        bucket: Source bucket.
        key: Source object key.
        s3_client: Pre-built client exposing ``get_object``; when falsy a boto3
            client is created with the retry/timeout settings below.
        pandas_read_csv_kwargs: Extra keyword arguments forwarded to
            ``pd.read_csv``.
        region_name: Region for the client created here.
        max_attempts: Retry attempts for the client created here.
        timeout_s: Connect and read timeout (seconds) for the client created here.

    Returns:
        The parsed DataFrame.

    Raises:
        RuntimeError: If the download fails with ``BotoCoreError`` or ``ClientError``.
        ValueError: If the body cannot be decompressed or parsed as CSV.

    Example:
        df = read_s3_csv("my-bucket", "landing/sales_2025-10-17.csv",
                         pandas_read_csv_kwargs={"dtype": {"id": "Int64"}})
    """
    gzip_magic = b"\x1f\x8b"  # first two bytes of every gzip stream

    s3 = s3_client or boto3.client(
        "s3",
        region_name=region_name,
        config=Config(retries={"max_attempts": max_attempts, "mode": "standard"},
                      connect_timeout=timeout_s, read_timeout=timeout_s),
    )
    try:
        obj = s3.get_object(Bucket=bucket, Key=key)
        body = obj["Body"].read()
    except (BotoCoreError, ClientError) as e:
        raise RuntimeError(f"S3 download failed for s3://{bucket}/{key}: {e}") from e

    kwargs = pandas_read_csv_kwargs or {}
    # pandas cannot infer compression from an in-memory buffer, so handle gzip
    # here. An explicit caller choice (including None) is left to pandas.
    auto_decompress = (
        body[:2] == gzip_magic
        and kwargs.get("compression", "infer") == "infer"
    )
    try:
        if auto_decompress:
            body = gzip.decompress(body)
        return pd.read_csv(io.BytesIO(body), **kwargs)
    except Exception as e:
        # Boundary wrapper: any decompress/parse failure is reported with the
        # object's location, keeping the original exception as the cause.
        raise ValueError(f"Failed to parse CSV from s3://{bucket}/{key}: {e}") from e


# ---------- SQL: Write DataFrame to Database ----------
def write_sql(
    df: pd.DataFrame,
    conn_str: str | Engine,
    table: str,
    *,
    schema: Optional[str] = None,
    if_exists: str = "replace",   # 'fail' | 'replace' | 'append'
    index: bool = False,
    chunksize: Optional[int] = 10_000,
    method: Optional[str] = None, # e.g., 'multi' for faster inserts on some DBs
    dtype: Optional[Dict[str, Any]] = None,
    create_table_sql: Optional[str] = None,  # optional DDL to run before write
) -> None:
    """
    Persist a DataFrame into a SQL table through SQLAlchemy.

    ``conn_str`` may be either a SQLAlchemy URL string, e.g.
      - 'postgresql+psycopg2://user:pass@host:5432/dbname'
      - 'sqlite:///local.db'
    or an existing Engine returned by create_engine(...).

    The optional ``create_table_sql`` statement and the ``DataFrame.to_sql``
    call run on the same connection inside one ``engine.begin()`` block.
    The remaining keyword arguments are forwarded to ``DataFrame.to_sql``
    unchanged. Note that ``if_exists`` defaults to ``"replace"``.

    An engine built here from a URL string is disposed before returning;
    an Engine passed in by the caller is left open.

    Example:
        write_sql(df, "sqlite:///demo.db", "sales", if_exists="append", chunksize=5000)
    """
    # We only own (and must clean up) an engine that we create ourselves.
    owns_engine = not isinstance(conn_str, Engine)
    engine: Engine = create_engine(conn_str) if owns_engine else conn_str

    to_sql_options = {
        "name": table,
        "if_exists": if_exists,
        "index": index,
        "schema": schema,
        "chunksize": chunksize,
        "method": method,
        "dtype": dtype,
    }

    try:
        with engine.begin() as conn:
            # Run caller-provided DDL first so the table exists before the write.
            if create_table_sql:
                conn.execute(text(create_table_sql))
            df.to_sql(con=conn, **to_sql_options)
    finally:
        if owns_engine:
            engine.dispose()
