In [1]:
import os
from datetime import datetime, timezone
from urllib.parse import quote

import pandas as pd
import yfinance as yf
from sqlalchemy import create_engine, text

In [2]:
def parse_tickers(raw):
    """Turn a comma-separated ticker string into a clean list of symbols.

    Each entry is stripped of surrounding whitespace, empty entries (from
    trailing or doubled commas) are dropped, and repeated symbols are removed
    while keeping the order of first appearance.

    Args:
        raw: Comma-separated symbols, e.g. "AAPL, MSFT".

    Returns:
        A list of unique, non-empty symbols; empty when `raw` holds none.
    """
    stripped = (item.strip() for item in raw.split(","))
    # dict.fromkeys de-duplicates while preserving insertion order.
    return list(dict.fromkeys(item for item in stripped if item))


# Run parameters, all read from environment variables.
TICKERS = parse_tickers(os.getenv("TICKERS", "AAPL"))
START_DATE = os.getenv("START_DATE")
END_DATE = os.getenv("END_DATE")
DATA_PROVIDER = os.getenv("DATA_PROVIDER")
RUN_ID = os.getenv("RUN_ID")

# PostgreSQL connection settings and target schema.
PG_HOST = os.getenv("PG_HOST")
PG_PORT = os.getenv("PG_PORT")
PG_DB = os.getenv("PG_DB")
PG_USER = os.getenv("PG_USER")
PG_PASSWORD = os.getenv("PG_PASSWORD")
PG_SCHEMA_RAW = os.getenv("PG_SCHEMA_RAW")

# Echo the run parameters (credentials are deliberately not printed).
print("TICKERS:", TICKERS)
print("START_DATE:", START_DATE)
print("END_DATE:", END_DATE)
print("RUN_ID:", RUN_ID)

TICKERS: ['AAPL']
START_DATE: 2018-01-01
END_DATE: 2025-11-28
RUN_ID: run_001


In [3]:
# Percent-encode the credentials before placing them in the URL: characters
# such as "@", "/", ":" or "#" in a user name or password would otherwise be
# read as URL delimiters. safe="" makes "/" get encoded as well.
pg_user_encoded = quote(PG_USER or "", safe="")
pg_password_encoded = quote(PG_PASSWORD or "", safe="")

PG_URL = (
    f"postgresql+psycopg2://{pg_user_encoded}:{pg_password_encoded}"
    f"@{PG_HOST}:{PG_PORT}/{PG_DB}"
)
engine = create_engine(PG_URL)

engine

Engine(postgresql+psycopg2://root:***@postgres:5432/trading_db)

In [4]:
# DDL for the target schema and the daily prices table. Both statements use
# IF NOT EXISTS, so re-running this cell does not fail when they already exist.
create_schema_sql = f"CREATE SCHEMA IF NOT EXISTS {PG_SCHEMA_RAW};"

create_table_sql = f"""
CREATE TABLE IF NOT EXISTS {PG_SCHEMA_RAW}.prices_daily (
    date DATE NOT NULL,
    ticker TEXT NOT NULL,
    
    open FLOAT,
    high FLOAT,
    low FLOAT,
    close FLOAT,
    adj_close FLOAT,
    volume BIGINT,

    run_id TEXT,
    ingested_at_utc TIMESTAMP,
    source_name TEXT,

    PRIMARY KEY (date, ticker)
);
"""

# Execute the schema statement first, then the table statement, on a single
# connection, and commit once at the end.
with engine.connect() as ddl_conn:
    for ddl_statement in (create_schema_sql, create_table_sql):
        ddl_conn.execute(text(ddl_statement))
    ddl_conn.commit()

print("Tabla raw.prices_daily verificada/creada")

Tabla raw.prices_daily verificada/creada


In [5]:
# Mapping from the downloaded column labels to the prices_daily column names.
COLUMN_RENAMES = {
    "Date": "date",
    "Open": "open",
    "High": "high",
    "Low": "low",
    "Close": "close",
    "Adj Close": "adj_close",
    "Volume": "volume",
}

# Columns written to the database, in the order of the table definition.
OUTPUT_COLUMNS = [
    "date", "ticker", "open", "high", "low",
    "close", "adj_close", "volume",
    "run_id", "ingested_at_utc", "source_name",
]

# One ingestion timestamp for the whole run, so every row of this load shares
# it. Kept naive (no tzinfo) because the target column is a plain TIMESTAMP.
ingested_at_utc = datetime.now(timezone.utc).replace(tzinfo=None)

all_data = []

for ticker in TICKERS:
    print(f"Descargando {ticker}...")

    df = yf.download(
        ticker,
        start=START_DATE,
        end=END_DATE,
        auto_adjust=False,
        progress=False
    )

    if df.empty:
        print(f"No se descargaron datos para {ticker}")
        continue

    df = df.reset_index()

    # When the columns come back as a MultiIndex, keep only the first level
    # so the labels match the keys of COLUMN_RENAMES.
    if isinstance(df.columns, pd.MultiIndex):
        df.columns = [col[0] for col in df.columns]

    df = df.rename(columns=COLUMN_RENAMES).assign(
        ticker=ticker,
        run_id=RUN_ID,
        ingested_at_utc=ingested_at_utc,
        source_name=DATA_PROVIDER,
    )

    all_data.append(df[OUTPUT_COLUMNS])

# Fail with an explicit message instead of pd.concat's "No objects to
# concatenate" when every download came back empty.
if not all_data:
    raise ValueError(f"No se descargaron datos para ningún ticker: {TICKERS}")

df_prices = pd.concat(all_data, ignore_index=True)

# Only replace rows for tickers that actually returned data; a ticker whose
# download was empty keeps whatever is already stored for it.
loaded_tickers = df_prices["ticker"].unique().tolist()

# Delete and insert inside ONE transaction: engine.begin() commits on success
# and rolls back on any error, so a failed insert does not leave the table
# with the previous rows already removed.
with engine.begin() as conn:
    delete_sql = f"""
        DELETE FROM {PG_SCHEMA_RAW}.prices_daily
        WHERE ticker = ANY(:tickers)
    """
    conn.execute(text(delete_sql), {"tickers": loaded_tickers})

    df_prices.to_sql(
        "prices_daily",
        conn,
        schema=PG_SCHEMA_RAW,
        if_exists="append",
        index=False,
        method="multi",
        chunksize=1000
    )

print("Registros previos eliminados para los tickers:", loaded_tickers)
print("Datos insertados en raw.prices_daily")

Descargando AAPL...
Registros previos eliminados para los tickers: ['AAPL']
Datos insertados en raw.prices_daily


In [6]:
# Display the dtype of every column in df_prices.
df_prices.dtypes

date               datetime64[ns]
ticker                     object
open                      float64
high                      float64
low                       float64
close                     float64
adj_close                 float64
volume                      int64
run_id                     object
ingested_at_utc    datetime64[us]
source_name                object
dtype: object

In [7]:
# Verification query: for each ticker stored in prices_daily, count its rows
# and report the earliest and latest date present.
query = f"""
SELECT
    ticker,
    COUNT(*) AS total_registros,
    MIN(date) AS fecha_minima,
    MAX(date) AS fecha_maxima
FROM {PG_SCHEMA_RAW}.prices_daily
GROUP BY ticker
ORDER BY ticker;
"""

# Read the result into a DataFrame and show it as the cell output.
df_check = pd.read_sql(sql=query, con=engine)
df_check

Unnamed: 0,ticker,total_registros,fecha_minima,fecha_maxima
0,AAPL,1988,2018-01-02,2025-11-26
