In [None]:
import os
from urllib.parse import quote, quote_plus

import polars
from fastavro import parse_schema, writer
from sqlalchemy import Engine, create_engine, text


## Connection

In [41]:
# Keyword arguments forwarded verbatim to source_conn(**params).
# The key names must match that function's parameter names exactly
# (including the "istance_name" spelling).
params = dict(
    database="postgresql",
    istance_name="meteo",
    host="localhost",
    port=5433,
    user="meteo",
    password="meteo",
    ssl_args=dict(),
)

In [35]:
def source_conn(
    database: str,
    user: str,
    password: str,
    host: str,
    port: int,
    istance_name: str,
    cx_oracle: bool = False,
    ssl_args: dict | None = None,
    mssql_driver: str = "ODBC Driver 17 for SQL Server"
) -> Engine:
    """
    Create a SQLAlchemy engine for one of several database backends.

    Supported: PostgreSQL, MySQL/MariaDB, Oracle, SQL Server.

    Args:
        database: Backend selector, matched case-insensitively. Accepted
            values: 'postgresql' / 'pg' / 'postgres' / 'postgre',
            'mariadb' / 'mysql', 'oracle',
            'mssql' / 'ms-sql' / 'sqlserver' / 'sql server'.
        user: Login name.
        password: Login password.
        host: Server host name or address.
        port: Server TCP port.
        istance_name: Value placed in the database position of the URL
            (or in ``DATABASE=`` for SQL Server). The parameter name is kept
            as-is so existing keyword callers keep working.
        cx_oracle: For Oracle only; use the ``cx_oracle`` driver instead of
            ``oracledb``.
        ssl_args: Passed to ``create_engine`` as ``connect_args``; ``None``
            is treated as an empty dict.
        mssql_driver: ODBC driver name used in the SQL Server connection
            string.

    Returns:
        The engine returned by ``create_engine`` for the assembled URL.

    Raises:
        ValueError: If ``database`` is not one of the accepted selectors.
    """
    connect_args = ssl_args or {}
    backend = database.lower()

    # Percent-encode the credentials before embedding them in a URL: a raw
    # '@', ':', '/' or '%' in the user name or password would otherwise be
    # read as URL structure. For purely alphanumeric values this is a no-op.
    credentials = f"{quote(str(user), safe='')}:{quote(str(password), safe='')}"
    location = f"{host}:{port}/{istance_name}"

    # PostgreSQL
    if backend in ('postgresql', 'pg', 'postgres', 'postgre'):
        db_url = f"postgresql+psycopg2://{credentials}@{location}"

    # MariaDB / MySQL
    elif backend in ('mariadb', 'mysql'):
        db_url = f"{backend}+pymysql://{credentials}@{location}"

    # Oracle
    elif backend == 'oracle':
        py_module = 'oracledb' if not cx_oracle else 'cx_oracle'
        db_url = f"oracle+{py_module}://{credentials}@{location}"

    # SQL Server
    elif backend in ('mssql', 'ms-sql', 'sqlserver', 'sql server'):
        # The raw credentials go into the ODBC string; the whole string is
        # URL-encoded once below, so they must not be pre-encoded here.
        # NOTE(review): values containing ';' are not brace-quoted in this
        # ODBC string — confirm whether such credentials need to be supported.
        conn_str = (
            f"DRIVER={{{mssql_driver}}};"
            f"SERVER={host},{port};DATABASE={istance_name};UID={user};PWD={password}"
        )
        # Named so it does not shadow the notebook-level `params` dict.
        odbc_params = quote_plus(conn_str)
        db_url = f"mssql+pyodbc:///?odbc_connect={odbc_params}"

    else:
        raise ValueError(f"Unsupported database type: {database}")

    return create_engine(db_url, connect_args=connect_args)


In [None]:
# Build the engine from the notebook-level `params` dict.
eng: Engine = source_conn(**params)
# Open one connection on that engine.
# NOTE(review): nothing in the visible cells closes this connection, and the
# export cell further down rebinds `conn` — confirm it is still needed.
conn = eng.connect()

# Query Templates

# Watch out for the Avro schema

https://hackolade.com/help/Avroschema.html

https://chatgpt.com/share/68e15494-c200-8009-9e00-e2b12d40aa42

In [None]:
# --- Configuration ---
# Data query: every row of the fact table that gets exported.
QUERY = text("SELECT * FROM public.fct_meteo;")

# NOTE(review): this assignment originally had no right-hand side, which made
# the whole cell a SyntaxError. The column-metadata query below is an assumed
# intent (inspecting the table's columns to build the Avro schema) — replace
# it if something else was meant.
QUERY_SCHEMA = text(
    "SELECT column_name, data_type, is_nullable "
    "FROM information_schema.columns "
    "WHERE table_schema = 'public' AND table_name = 'fct_meteo' "
    "ORDER BY ordinal_position;"
)

# Destination of the exported Avro file.
OUTPUT_DIR = "avro_output"
OUTPUT_FILE = "fct_meteo.avro"

# Create the output directory up front; exist_ok makes the cell re-runnable.
os.makedirs(OUTPUT_DIR, exist_ok=True)

In [None]:
# --- Define Avro schema ---
# IMPORTANT: Adapt the schema fields to match your DB table columns and types!
# Here is a simple example with two columns; extend as needed.
# Record keys written below are the query's column names, so each field name
# here must correspond to a column returned by QUERY.

avro_schema = {
    "doc": "fct_meteo table schema",
    "name": "fct_meteo_record",
    "namespace": "example.avro",
    "type": "record",
    "fields": [
        {"name": "column1", "type": "string"},
        {"name": "column2", "type": "int"},
        # Add all your columns here
    ],
}

parsed_schema = parse_schema(avro_schema)

output_path = os.path.join(OUTPUT_DIR, OUTPUT_FILE)

# --- Streaming query and write Avro ---
# The engine is bound to `eng` earlier in the notebook (there is no `engine`).
with eng.connect() as conn:
    result = conn.execution_options(stream_results=True).execute(QUERY)
    columns = list(result.keys())

    with open(output_path, "wb") as out_file:
        # fastavro's `writer` is a one-shot function: it takes the record
        # iterable as its third argument and does not return an object with
        # a `.write()` method. Passing a generator means rows are pulled from
        # the result lazily instead of being collected into a list first.
        # If your schema expects non-nullable fields, handle nulls inside the
        # generator expression.
        writer(
            out_file,
            parsed_schema,
            (dict(zip(columns, row)) for row in result),
        )

print(f"Avro file written: {output_path}")
