# Notebook para consumir datos de Yahoo Finance

In [1]:
!pip install yfinance
!pip install python-dotenv
!pip install psycopg2-binary

Collecting yfinance
  Downloading yfinance-0.2.66-py2.py3-none-any.whl.metadata (6.0 kB)
Collecting multitasking>=0.0.7 (from yfinance)
  Downloading multitasking-0.0.12.tar.gz (19 kB)
  Preparing metadata (setup.py) ... [?25ldone
Collecting frozendict>=2.3.4 (from yfinance)
  Downloading frozendict-2.4.7-py3-none-any.whl.metadata (23 kB)
Collecting peewee>=3.16.2 (from yfinance)
  Downloading peewee-3.18.3.tar.gz (3.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m3.0/3.0 MB[0m [31m1.2 MB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0m0m
[?25h  Installing build dependencies ... [?25ldone
[?25h  Getting requirements to build wheel ... [?25ldone
[?25h  Preparing metadata (pyproject.toml) ... [?25ldone
Collecting curl_cffi>=0.7 (from yfinance)
  Downloading curl_cffi-0.13.0-cp39-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (13 kB)
Collecting websockets>=13.0 (from yfinance)
  Downloading websockets-15.0.1-cp311-cp311-manylinux_2_5_x86_64.manylin

In [2]:
import os, json, calendar
from datetime import datetime, date
import pandas as pd
import yfinance as yf
from pyspark.sql import SparkSession
import psycopg2
from dotenv import load_dotenv
load_dotenv()

True

In [3]:
# --- Destination warehouse settings ---
# The host is fixed in the notebook; the remaining connection settings are
# read from the environment (populated by load_dotenv in the imports cell).
POSTGRES_HOST = 'warehouses'
POSTGRES_PORT = os.getenv('POSTGRES_PORT', '5432')
POSTGRES_DB, POSTGRES_USER, POSTGRES_PASSWORD = (
    os.getenv(env_key)
    for env_key in ('POSTGRES_DB', 'POSTGRES_USER', 'POSTGRES_PASSWORD')
)

# Schema-qualified name of the table that receives the downloaded prices.
DEST_SCHEMA = os.getenv('RAW_SCHEMA', 'raw')
DEST_TABLE = os.getenv('RAW_TABLE', 'yf_prices')
FULL_TABLE = '.'.join((DEST_SCHEMA, DEST_TABLE))

# --- Spark session ---
# The PostgreSQL JDBC driver jar is passed to Spark via 'spark.jars'.
jar_path = '/home/jovyan/work/postgresql-42.2.5.jar'
spark = (
    SparkSession.builder
    .config('spark.jars', jar_path)
    .master('local')
    .appName('YF_Spark')
    .getOrCreate()
)

In [4]:
def create_table():
    """Create the destination prices table if it does not already exist.

    Connects to PostgreSQL with the module-level POSTGRES_* settings and runs
    ``CREATE TABLE IF NOT EXISTS`` for ``FULL_TABLE`` with a composite primary
    key on (date, ticker).

    Errors are reported with ``print`` and not re-raised, so the notebook keeps
    running. The cursor and connection are always released, including when the
    statement or the commit fails.
    """
    conn = None
    try:
        conn = psycopg2.connect(
            host=POSTGRES_HOST,
            port=POSTGRES_PORT,
            database=POSTGRES_DB,
            user=POSTGRES_USER,
            password=POSTGRES_PASSWORD
        )
        # The cursor context manager closes the cursor even if execute() raises.
        with conn.cursor() as cursor:
            cursor.execute(f"""
                CREATE TABLE IF NOT EXISTS {FULL_TABLE} (
                    date TIMESTAMP WITH TIME ZONE NOT NULL,
                    ticker VARCHAR(20) NOT NULL,
                    open DOUBLE PRECISION,
                    high DOUBLE PRECISION,
                    low DOUBLE PRECISION,
                    close DOUBLE PRECISION,
                    adj_close DOUBLE PRECISION,
                    volume BIGINT,
                    PRIMARY KEY (date, ticker)
                )
            """)
        conn.commit()
        print(f"Tabla {FULL_TABLE} verificada/creada")
    except Exception as e:
        print(f"Error creando tabla: {e}")
    finally:
        # Close the connection on every path so a failed DDL does not leak it.
        if conn is not None:
            conn.close()

create_table()

Tabla raw.prices_daily verificada/creada


In [5]:
def save_checkpoint(checkpoint_file, year, month):
    """Persist the last processed (year, month) as JSON in ``checkpoint_file``.

    The JSON is written to a sibling ``<checkpoint_file>.tmp`` file and then
    moved over the target with ``os.replace``. If serialisation or the write
    fails, the previous checkpoint is left untouched instead of being truncated.

    Args:
        checkpoint_file: Path of the checkpoint file to (over)write.
        year: Year of the month that has just been processed.
        month: Month number that has just been processed.

    Raises:
        OSError: If the temporary file cannot be written or moved.
        TypeError: If ``year`` or ``month`` is not JSON serialisable.
    """
    tmp_file = f"{checkpoint_file}.tmp"
    try:
        with open(tmp_file, 'w') as f:
            json.dump({'year': year, 'month': month}, f)
        # Swap the fully written file into place in a single step.
        os.replace(tmp_file, checkpoint_file)
    except BaseException:
        # Do not leave a stale temp file behind; the old checkpoint stays valid.
        if os.path.exists(tmp_file):
            os.remove(tmp_file)
        raise

def load_checkpoint(checkpoint_file):
    """Load the last processed (year, month) from ``checkpoint_file``.

    Args:
        checkpoint_file: Path of the JSON checkpoint file.

    Returns:
        The dict stored in the file (with at least the keys ``'year'`` and
        ``'month'``), or ``{'year': 0, 'month': 0}`` when the file does not
        exist, is empty/corrupt, or lacks those keys. Falling back to the
        default means "no checkpoint", instead of aborting the run with a
        JSON or key error.
    """
    default = {'year': 0, 'month': 0}
    if not os.path.exists(checkpoint_file):
        return default
    try:
        with open(checkpoint_file, 'r') as f:
            data = json.load(f)
    except ValueError as e:
        # json.JSONDecodeError is a ValueError: empty or truncated file.
        print(f"Checkpoint ilegible en {checkpoint_file}: {e}")
        return default
    if not isinstance(data, dict) or 'year' not in data or 'month' not in data:
        print(f"Checkpoint invalido en {checkpoint_file}: {data}")
        return default
    return data

In [6]:
def month_iter(start_yyyy_mm, end_yyyy_mm):
    """Yield (year, month) tuples from the start month to the end month, inclusive.

    Both arguments are 'YYYY-MM' strings. Nothing is yielded when the start
    month is later than the end month.
    """
    first_year, first_month = (int(piece) for piece in start_yyyy_mm.split('-'))
    last_year, last_month = (int(piece) for piece in end_yyyy_mm.split('-'))

    stop = (last_year, last_month)
    current = (first_year, first_month)
    # Tuple comparison orders by year first, then by month.
    while current <= stop:
        yield current
        year, month = current
        # Roll over to January of the next year once December has been emitted.
        current = (year + 1, 1) if month >= 12 else (year, month + 1)

def to_iso(raw):
    """Normalise a date string to 'YYYY-MM-DD'.

    Accepts 'DD-MM-YYYY' (reordered) or 'YYYY-MM-DD' (returned as is), decided
    by the length of the first dash-separated field. Surrounding whitespace is
    ignored.

    Args:
        raw: Date string, e.g. the value of an environment variable.

    Returns:
        The date as a 'YYYY-MM-DD' string.

    Raises:
        ValueError: If ``raw`` is not a string (e.g. ``None`` from an unset
            environment variable) or does not match either accepted layout.
    """
    # Fail with the function's own error instead of an AttributeError on None.
    if not isinstance(raw, str):
        raise ValueError(f"Formato de fecha invalido: {raw}")
    text = raw.strip()
    parts = text.split('-')
    if len(parts) == 3:
        if len(parts[0]) == 2:
            d, m, y = parts
            return f"{y}-{m}-{d}"
        elif len(parts[0]) == 4:
            return text
    raise ValueError(f"Formato de fecha invalido: {raw}")

In [7]:
# --- Run parameters (read from the environment) ---
# Check the date variables up front so an unset variable produces a clear
# error instead of failing inside the date parsing.
raw_start = os.getenv('START_DATE')
raw_end = os.getenv('END_DATE')
if not raw_start or not raw_end:
    raise RuntimeError('Faltan START_DATE o END_DATE')

START_DATE = to_iso(raw_start)
END_DATE = to_iso(raw_end)
start_dt = datetime.strptime(START_DATE, '%Y-%m-%d').date()
end_dt = datetime.strptime(END_DATE, '%Y-%m-%d').date()
# The load works at month granularity, so only year and month are kept.
start_yyyy_mm = f"{start_dt.year}-{start_dt.month:02d}"
end_yyyy_mm = f"{end_dt.year}-{end_dt.month:02d}"

# Default to '' so an unset TICKERS reaches the RuntimeError below instead of
# raising AttributeError on None.split.
TICKERS = [t.strip() for t in os.getenv('TICKERS', '').split(',') if t.strip()]
if not TICKERS or not POSTGRES_DB or not POSTGRES_USER or not POSTGRES_PASSWORD:
    raise RuntimeError('Faltan TICKERS o credenciales POSTGRES')

# Running count of rows written by the load loop.
total_insertadas = 0

In [8]:
# Monthly load: for each ticker, download one calendar month at a time from
# Yahoo Finance, replace that month's rows in FULL_TABLE, and record progress
# in a per-ticker checkpoint file so an interrupted run can resume.

# Reset the running total so re-running this cell on its own does not add to
# the count left over from a previous execution.
total_insertadas = 0

for ticker in TICKERS:
    CHECKPOINT_FILE = f"checkpointYF_{ticker}.json"
    checkpoint = load_checkpoint(CHECKPOINT_FILE)
    print(f'Ticker: {ticker}, checkpoint: {checkpoint}')

    months_list = list(month_iter(start_yyyy_mm, end_yyyy_mm))

    # Resume right after the checkpointed month when it falls inside the window.
    if checkpoint != {'year':0,'month':0} and (checkpoint['year'], checkpoint['month']) in months_list:
        idx = months_list.index((checkpoint['year'], checkpoint['month']))
        months_list = months_list[idx+1:]

    for year_i, month_i in months_list:
        print(f"Procesando {ticker}: {year_i}-{month_i:02d}")

        month_start = date(year_i, month_i, 1)
        month_end_day = calendar.monthrange(year_i, month_i)[1]
        month_end = date(year_i, month_i, month_end_day)
        # First day of the following month: used as the download end and as
        # the exclusive upper bound of the delete below.
        next_month_start = month_end + pd.Timedelta(days=1)

        try:
            df = yf.download(ticker, start=str(month_start), end=str(next_month_start), progress=False, auto_adjust=False)

            if df.empty:
                print(f"Sin datos para {ticker} en {year_i}-{month_i:02d}")
                save_checkpoint(CHECKPOINT_FILE, year_i, month_i)
                continue

            df = df.reset_index()

            print(f"Columnas originales: {df.columns.tolist()}")
            print(f"Tipos de columnas: {[type(c) for c in df.columns]}")

            # Flatten tuple (MultiIndex) column labels to their first non-empty part.
            new_columns = []
            for col in df.columns:
                if isinstance(col, tuple):
                    clean_col = next((item for item in col if item), '')
                    new_columns.append(clean_col)
                else:
                    new_columns.append(col)

            df.columns = new_columns
            print(f"Columnas limpiadas: {df.columns.tolist()}")

            # Map source column names to the destination table's column names.
            # 'close' is matched exactly so that 'Adj Close' falls through to
            # the 'adj' branch.
            rename_dict = {}
            for col in df.columns:
                col_lower = str(col).lower()
                if 'date' in col_lower or col == 'Datetime':
                    rename_dict[col] = 'date'
                elif 'open' in col_lower:
                    rename_dict[col] = 'open'
                elif 'high' in col_lower:
                    rename_dict[col] = 'high'
                elif 'low' in col_lower:
                    rename_dict[col] = 'low'
                elif 'close' == col_lower:
                    rename_dict[col] = 'close'
                elif 'adj' in col_lower:
                    rename_dict[col] = 'adj_close'
                elif 'volume' in col_lower:
                    rename_dict[col] = 'volume'

            df = df.rename(columns=rename_dict)
            print(f"Columnas renombradas: {df.columns.tolist()}")

            # Store timestamps as UTC. tz_localize raises on tz-aware input,
            # so convert instead when the source already carries a timezone.
            dates = pd.to_datetime(df['date'])
            if dates.dt.tz is None:
                dates = dates.dt.tz_localize('UTC')
            else:
                dates = dates.dt.tz_convert('UTC')
            df['date'] = dates
            df['ticker'] = ticker

            # Keep only the destination columns, in table order.
            final_df = pd.DataFrame()
            for col in ['date', 'ticker', 'open', 'high', 'low', 'close', 'adj_close', 'volume']:
                if col in df.columns:
                    final_df[col] = df[col]

            print(f"Descargados {len(final_df)} registros")

        except Exception as e:
            print(f"Error descargando datos: {e}")
            break

        # Remove any rows already stored for this ticker and month so the
        # append below does not collide with the (date, ticker) primary key.
        conn = None
        try:
            conn = psycopg2.connect(
                host=POSTGRES_HOST,
                port=POSTGRES_PORT,
                database=POSTGRES_DB,
                user=POSTGRES_USER,
                password=POSTGRES_PASSWORD
            )
            with conn.cursor() as cursor:
                # Half-open range with explicit UTC bounds: casting the
                # timestamptz column to date would depend on the session
                # TimeZone, while the rows are written as UTC instants.
                cursor.execute(
                    f"DELETE FROM {FULL_TABLE} WHERE ticker=%s "
                    "AND date >= (%s::timestamp AT TIME ZONE 'UTC') "
                    "AND date < (%s::timestamp AT TIME ZONE 'UTC')",
                    (ticker, str(month_start), str(next_month_start))
                )
            conn.commit()
            print("Registros anteriores eliminados")
        except Exception as e:
            # Best effort: the error is reported and the write is still attempted.
            print(f"Error eliminando registros: {e}")
        finally:
            # Release the connection on every path, including failures.
            if conn is not None:
                conn.close()

        try:
            sdf = spark.createDataFrame(final_df)

            print("Esquema de Spark DataFrame:")
            sdf.printSchema()

            sdf.write.format('jdbc') \
                .option('url', f"jdbc:postgresql://{POSTGRES_HOST}:{POSTGRES_PORT}/{POSTGRES_DB}") \
                .option('driver', 'org.postgresql.Driver') \
                .option('dbtable', FULL_TABLE) \
                .option('user', POSTGRES_USER) \
                .option('password', POSTGRES_PASSWORD) \
                .mode('append') \
                .save()

            cnt = len(final_df)
            total_insertadas += cnt
            print(f"Insertadas {cnt} filas")

        except Exception as e2:
            print(f"Error escribiendo a PostgreSQL: {e2}")
            print("Primeras filas del DataFrame:")
            print(final_df.head())
            break

        # Advance the checkpoint only after the month was written successfully.
        save_checkpoint(CHECKPOINT_FILE, year_i, month_i)

print(f"Total filas insertadas: {total_insertadas}")

Ticker: AAPL, checkpoint: {'year': 0, 'month': 0}
Procesando AAPL: 2020-01
Columnas originales: [('Date', ''), ('Adj Close', 'AAPL'), ('Close', 'AAPL'), ('High', 'AAPL'), ('Low', 'AAPL'), ('Open', 'AAPL'), ('Volume', 'AAPL')]
Tipos de columnas: [<class 'tuple'>, <class 'tuple'>, <class 'tuple'>, <class 'tuple'>, <class 'tuple'>, <class 'tuple'>, <class 'tuple'>]
Columnas limpiadas: ['Date', 'Adj Close', 'Close', 'High', 'Low', 'Open', 'Volume']
Columnas renombradas: ['date', 'adj_close', 'close', 'high', 'low', 'open', 'volume']
Descargados 21 registros
Registros anteriores eliminados
Esquema de Spark DataFrame:
root
 |-- date: timestamp (nullable = true)
 |-- ticker: string (nullable = true)
 |-- open: double (nullable = true)
 |-- high: double (nullable = true)
 |-- low: double (nullable = true)
 |-- close: double (nullable = true)
 |-- adj_close: double (nullable = true)
 |-- volume: long (nullable = true)

Insertadas 21 filas
Procesando AAPL: 2020-02
Columnas originales: [('Date', 

----------------------------------------
Exception occurred during processing of request from ('127.0.0.1', 55994)
Traceback (most recent call last):
  File "/opt/conda/lib/python3.11/socketserver.py", line 317, in _handle_request_noblock
    self.process_request(request, client_address)
  File "/opt/conda/lib/python3.11/socketserver.py", line 348, in process_request
    self.finish_request(request, client_address)
  File "/opt/conda/lib/python3.11/socketserver.py", line 361, in finish_request
    self.RequestHandlerClass(request, client_address, self)
  File "/opt/conda/lib/python3.11/socketserver.py", line 755, in __init__
    self.handle()
  File "/usr/local/spark/python/pyspark/accumulators.py", line 295, in handle
    poll(accum_updates)
  File "/usr/local/spark/python/pyspark/accumulators.py", line 267, in poll
    if self.rfile in r and func():
                           ^^^^^^
  File "/usr/local/spark/python/pyspark/accumulators.py", line 271, in accum_updates
    num_updates =