In [2]:
# ============================================================
# Notebook: Batch + Spark Structured Streaming
# ============================================================
# Loads IPSA / COLCAP index levels from an uploaded Excel file, runs batch
# ADF and Engle-Granger cointegration tests, then simulates a streaming feed
# (JSON files dropped into a folder) processed with Spark Structured
# Streaming via foreachBatch.

# ---------------------------
# 0) Install dependencies
# ---------------------------
print("1) Instalando librerías necesarias...")
# IPython shell escape: only valid inside Jupyter/Colab, not in a plain .py file.
!pip install pyspark==3.5.1 statsmodels openpyxl pandas numpy matplotlib --quiet

# ---------------------------
# 1) Libraries
# ---------------------------
# NOTE(review): threading, matplotlib (plt) and statsmodels.api (sm) are not
# used anywhere in this cell — confirm before removing.
import os, time, uuid, threading
from datetime import timedelta
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, TimestampType, DoubleType
from pyspark.sql.functions import col, to_timestamp

# adfuller: Augmented Dickey-Fuller unit-root test; coint: Engle-Granger cointegration test.
from statsmodels.tsa.stattools import adfuller, coint
import statsmodels.api as sm

print("Librerías cargadas")

# ---------------------------
# 2) Ask for the file and read the Excel sheet
# ---------------------------
from google.colab import files
print("\n2) Por favor SUBE tu archivo 'Datos.xlsx' (columnas: Fecha, IPSA, COLCAP).")
uploaded = files.upload()  # opens the browser upload dialog in Colab
if not uploaded:
    # Fail with an explicit message instead of an IndexError when nothing was uploaded.
    raise ValueError("No se subió ningún archivo. Vuelve a ejecutar la celda y sube 'Datos.xlsx'.")
file_name = next(iter(uploaded))
print("Archivo subido:", file_name)

# Read with pandas.
df = pd.read_excel(file_name)
# Normalise header names; str() also copes with non-text headers (e.g. numeric cells),
# which would make a bare .strip() raise AttributeError.
df.columns = [str(c).strip() for c in df.columns]
expected = ["Fecha", "IPSA", "COLCAP"]
missing = [name for name in expected if name not in df.columns]
if missing:
    raise ValueError(f"El archivo debe contener las columnas: {expected}. Columnas encontradas: {list(df.columns)}")

df['Fecha'] = pd.to_datetime(df['Fecha'])
df = df.sort_values('Fecha').reset_index(drop=True)
print(f"Datos históricos: {len(df)} filas. Desde {df['Fecha'].min().date()} hasta {df['Fecha'].max().date()}")

# ---------------------------
# 3) Start the local SparkSession
# ---------------------------
print("\n3) Iniciando SparkSession...")
# Local mode on all cores; few shuffle partitions because the data set is tiny.
spark = (
    SparkSession.builder
    .appName("IPSA_COLCAP_Batch_Streaming")
    .master("local[*]")
    .config("spark.sql.shuffle.partitions", "4")
    .getOrCreate()
)
print("Spark version:", spark.version)

# ---------------------------
# 4) Initial BATCH analysis
# ---------------------------
print("\n4) Análisis batch (ADF en niveles y retornos, Engle-Granger)")

# Natural-log prices first, then first-difference log returns
# (two passes keep the original column order Log_IPSA, Log_COLCAP, rIPSA, rCOLCAP).
for index_name in ("IPSA", "COLCAP"):
    df[f"Log_{index_name}"] = np.log(df[index_name])
for index_name in ("IPSA", "COLCAP"):
    df[f"r{index_name}"] = df[f"Log_{index_name}"].diff()

# Drop every row containing a missing value (at least the first row, whose return is NaN).
df_clean = df.dropna().copy()

def run_adf(series, name):
    """Run an Augmented Dickey-Fuller test on ``series`` and print a summary line.

    Parameters
    ----------
    series : pandas.Series
        Series to test; NaN values are dropped before calling ``adfuller``.
    name : str
        Label shown in the printed summary.

    Returns
    -------
    tuple
        ``(stat, pval)``: the ADF test statistic and its p-value.
    """
    stat, pval, _, _, crit, _ = adfuller(series.dropna())
    # Cast the numpy scalars to plain floats: under NumPy >= 2 the dict repr
    # otherwise prints as "np.float64(...)" and clutters the log.
    crit_fmt = {level: round(float(value), 4) for level, value in crit.items()}
    print(f" ADF {name}: stat={stat:.4f}, p={pval:.6f}, crit={crit_fmt}")
    return stat, pval

# ADF on log-price levels, then on log returns.
for column, label in (
    ('Log_IPSA', "IPSA (nivel)"),
    ('Log_COLCAP', "COLCAP (nivel)"),
    ('rIPSA', "IPSA (retornos)"),
    ('rCOLCAP', "COLCAP (retornos)"),
):
    run_adf(df_clean[column], label)

# Engle-Granger cointegration test on the two log-price series.
y = df_clean['Log_IPSA'].values
x = df_clean['Log_COLCAP'].values
eg_stat, eg_p, eg_crit = coint(y, x)
print(f"\nEngle-Granger: stat={eg_stat:.4f}, p-value={eg_p:.6f}, crit={eg_crit}")
# Historical cointegration state at the 5% level; used as the initial state for streaming.
initial_cointegrated = eg_p < 0.05
print("¿Cointegración histórica? ", initial_cointegrated)

# ---------------------------
# 5) Prepare the streaming input folder (the simulated API writes JSON here)
# ---------------------------
print("\n5) Preparando carpeta para Spark Structured Streaming (simulación API)...")
stream_dir = "/content/stream_api/"
# Start from an empty folder so files from a previous run are not picked up again.
if os.path.exists(stream_dir):
    from shutil import rmtree
    rmtree(stream_dir)
os.makedirs(stream_dir, exist_ok=True)
print("Carpeta creada:", stream_dir)

# Schema of the JSON records written by the simulated API (all fields nullable).
schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in (
        ("Fecha", TimestampType()),
        ("IPSA", DoubleType()),
        ("COLCAP", DoubleType()),
    )
])

# ---------------------------
# 6) Driver-side buffer holding "historical + streamed" data
# ---------------------------
# Monitoring parameters
ROLLING_WINDOW = 60   # observations in the rolling correlation / recent-volatility window
COINT_WINDOW = 250    # trailing observations used when re-running the cointegration test
CORR_ALERT_LOW = 0.3  # alert when the rolling correlation falls below this
VOL_ALERT_MULT = 1.5  # alert when recent vol > VOL_ALERT_MULT * historical vol

# Seed the buffer with the cleaned historical observations.
df_buffer = (
    df_clean.loc[:, ['Fecha', 'IPSA', 'COLCAP', 'Log_IPSA', 'Log_COLCAP', 'rIPSA', 'rCOLCAP']]
    .copy()
    .reset_index(drop=True)
)

print(f"Buffer inicial: {len(df_buffer)} observaciones")

# ---------------------------
# 7) Función que procesa cada micro-batch
# ---------------------------
from pyspark.sql import DataFrame
from pyspark.sql.window import Window
from pyspark.sql.functions import lag, log as spark_log

# Mutable state shared with the foreachBatch callback (which runs on the driver):
#   df_buffer            - historical + streamed observations
#   initial_cointegrated - last known cointegration state; the callback updates it
global_state = dict(
    df_buffer=df_buffer,
    initial_cointegrated=initial_cointegrated,
)

def process_microbatch(microbatch_df, batch_id):
    """Process one Structured Streaming micro-batch on the driver.

    Merges the new observations into ``global_state['df_buffer']``, fills in
    their log returns from the merged buffer, then prints the rolling
    correlation, the IPSA volatility comparison, a rolling Engle-Granger test
    and any alerts.

    Parameters
    ----------
    microbatch_df : pyspark.sql.DataFrame
        Rows of the current micro-batch (columns Fecha, IPSA, COLCAP).
    batch_id : int
        Micro-batch id supplied by ``foreachBatch``.

    Side effects
    ------------
    Replaces ``global_state['df_buffer']``. It may also flip
    ``global_state['initial_cointegrated']`` when the rolling cointegration
    state changes.
    """
    if microbatch_df.isEmpty():
        print(f"[batch {batch_id}] microbatch vacío")
        return

    pdf = microbatch_df.toPandas()
    # The schema is nullable, so malformed records arrive as nulls. Also drop
    # non-positive prices, whose log is undefined.
    pdf = pdf.dropna(subset=['Fecha', 'IPSA', 'COLCAP'])
    pdf = pdf[(pdf['IPSA'] > 0) & (pdf['COLCAP'] > 0)]
    if pdf.empty:
        print(f"[batch {batch_id}] microbatch sin registros válidos")
        return
    pdf = pdf.sort_values("Fecha").reset_index(drop=True)

    # Log prices and within-batch returns (the first row of the batch is NaN here).
    pdf['Log_IPSA'] = np.log(pdf['IPSA'])
    pdf['Log_COLCAP'] = np.log(pdf['COLCAP'])
    pdf['rIPSA'] = pdf['Log_IPSA'].diff()
    pdf['rCOLCAP'] = pdf['Log_COLCAP'].diff()

    # Append to the buffer; on duplicate dates the existing row wins.
    buf = pd.concat(
        [global_state['df_buffer'],
         pdf[['Fecha', 'IPSA', 'COLCAP', 'Log_IPSA', 'Log_COLCAP', 'rIPSA', 'rCOLCAP']]],
        ignore_index=True,
    )
    buf = buf.drop_duplicates(subset=['Fecha']).sort_values('Fecha').reset_index(drop=True)

    # Fill missing returns against the previous row of the merged buffer. The
    # simulated API writes one record per file and maxFilesPerTrigger=1, so a
    # within-batch diff() is always NaN. Without this step, streamed rows would
    # never enter the correlation/volatility windows. Existing returns are kept
    # as-is.
    for price in ('IPSA', 'COLCAP'):
        buf[f'r{price}'] = buf[f'r{price}'].fillna(buf[f'Log_{price}'].diff())

    # Cap the buffer length so memory does not grow without bound.
    maxlen = max(COINT_WINDOW * 2, 1500)
    if len(buf) > maxlen:
        buf = buf.iloc[-maxlen:].reset_index(drop=True)

    # Rolling correlation of returns over the last ROLLING_WINDOW valid pairs.
    returns = buf[['rIPSA', 'rCOLCAP']].dropna()
    if len(returns) >= ROLLING_WINDOW:
        recent = returns.iloc[-ROLLING_WINDOW:]
        rolling_corr = recent['rIPSA'].corr(recent['rCOLCAP'])
    else:
        rolling_corr = np.nan

    # IPSA volatility: std over the whole buffer vs the last ROLLING_WINDOW returns.
    ipsa_returns = buf['rIPSA'].dropna()
    hist_vol = ipsa_returns.std()
    recent_vol = ipsa_returns.iloc[-ROLLING_WINDOW:].std() if len(ipsa_returns) >= ROLLING_WINDOW else np.nan

    # Engle-Granger test on the last COINT_WINDOW log prices.
    statc, pc = np.nan, np.nan
    if len(buf) >= COINT_WINDOW:
        Yc = buf['Log_IPSA'].values[-COINT_WINDOW:]
        Xc = buf['Log_COLCAP'].values[-COINT_WINDOW:]
        try:
            statc, pc, _ = coint(Yc, Xc)
        except Exception as e:  # best effort: log and keep the stream alive
            print("Error cointegration test:", e)
            statc, pc = np.nan, np.nan

    # Summary
    print(f"\n[batch {batch_id}] Procesadas {len(pdf)} observaciones. Buffer total: {len(buf)}")
    print(f" RollingCorr({ROLLING_WINDOW}) = {rolling_corr:.4f}")
    vol_available = not (np.isnan(recent_vol) or np.isnan(hist_vol))
    if vol_available:
        print(f" Volatility IPSA recent/std = {recent_vol:.6f} (hist={hist_vol:.6f})")
    print(f" Cointegration last {COINT_WINDOW}: stat={statc}, p={pc}")

    # Alerts
    if not np.isnan(rolling_corr) and rolling_corr < CORR_ALERT_LOW:
        print("!!! ALERTA: Correlación rolling baja <", CORR_ALERT_LOW)

    if vol_available and recent_vol > VOL_ALERT_MULT * hist_vol:
        print(f"!!! ALERTA: Volatilidad reciente > {VOL_ALERT_MULT} * volatilidad histórica")

    # Compare the rolling cointegration state with the last known state.
    prev_state = global_state.get('initial_cointegrated', False)
    if not np.isnan(pc):
        has_cointegr = pc < 0.05
        if has_cointegr and not prev_state:
            print("+++ ALERTA: Cointegración APARECIÓ en la ventana reciente")
            global_state['initial_cointegrated'] = True
        elif not has_cointegr and prev_state:
            print("--- ALERTA: Cointegración DESAPARECIÓ en la ventana reciente")
            global_state['initial_cointegrated'] = False

    global_state['df_buffer'] = buf

# ---------------------------
# 8) Create the streaming DataFrame and start the foreachBatch query
# ---------------------------
print("\n8) Configurando Spark Structured Streaming (leyendo JSON desde carpeta)...")

# One input file per trigger, so each simulated tick is its own micro-batch.
streaming_df = spark.readStream.schema(schema).option("maxFilesPerTrigger", 1).json(stream_dir)

# The input folder is wiped at the start of every run. A checkpoint left by a
# previous run would still record (and may try to re-read) files that no longer
# exist, and would continue the old batch ids. Start from a clean checkpoint.
import shutil
checkpoint_dir = "/content/checkpoint_ipsc_colcap"
shutil.rmtree(checkpoint_dir, ignore_errors=True)

query = (
    streaming_df.writeStream
    .foreachBatch(process_microbatch)
    .option("checkpointLocation", checkpoint_dir)
    .trigger(processingTime='5 seconds')
    .start()
)
print("Streaming iniciado. Esperando archivos en:", stream_dir)

# ---------------------------
# 9) Simulated API: write N_SIM JSON files into stream_dir
# ---------------------------
N_SIM = 10          # number of simulated "days"
interval_secs = 5   # pause between ticks (same as the 5-second trigger)
print(f"\n9) Iniciando simulación de API que escribe nuevos ticks cada {interval_secs}s ({N_SIM} iteraciones).")

# Take one snapshot of the buffer's last row. The streaming callback may
# replace global_state['df_buffer'] while this cell runs.
seed_row = global_state['df_buffer'].iloc[-1]
last_date = seed_row['Fecha']
last_ipsa = seed_row['IPSA']
last_col = seed_row['COLCAP']

# Files are written to a staging folder first, then moved into stream_dir, so
# Spark never lists a half-written file.
staging_dir = stream_dir.rstrip("/") + "_staging"
os.makedirs(staging_dir, exist_ok=True)

for i in range(N_SIM):
    new_date = last_date + timedelta(days=1 + i)
    # Multiplicative random-walk step from the previous tick's prices.
    new_ipsa = float(last_ipsa * (1 + np.random.normal(0, 0.002)))
    new_col = float(last_col * (1 + np.random.normal(0, 0.002)))
    rec = pd.DataFrame([{"Fecha": new_date, "IPSA": new_ipsa, "COLCAP": new_col}])

    file_name_json = f"tick_{i}_{uuid.uuid4().hex}.json"
    staging_path = os.path.join(staging_dir, file_name_json)
    file_path = os.path.join(stream_dir, file_name_json)
    rec.to_json(staging_path, orient='records', date_format='iso')
    os.replace(staging_path, file_path)  # atomic rename on the same filesystem
    print(f"Sim API -> creado: {file_path}  [{new_date.date()}] (IPSA={new_ipsa:.4f}, COLCAP={new_col:.4f})")

    # Carry prices forward so consecutive ticks form a walk rather than
    # independent noise around the same seed.
    last_ipsa, last_col = new_ipsa, new_col
    time.sleep(interval_secs)

# Block until every file already in the input folder has been processed,
# instead of sleeping a fixed 15s. With maxFilesPerTrigger=1 each file needs
# its own micro-batch, so slow batches could leave files unprocessed at stop().
print("\nSimulación completada. Esperando procesamiento final...")
try:
    query.processAllAvailable()
finally:
    # Stop the query even if a micro-batch failed (the failure propagates).
    print("\nDeteniendo streaming query...")
    query.stop()
    print("Streaming detenido.")

# ---------------------------
# 10) Final result: show the last buffer rows and re-run the cointegration test
# ---------------------------
final_buf = global_state['df_buffer'].copy()
print("\nÚltimas 8 observaciones del buffer (incluye histórico + stream):")
print(final_buf[['Fecha','IPSA','COLCAP']].tail(8))

# tail() returns the whole frame when it has fewer than COINT_WINDOW rows,
# i.e. "last COINT_WINDOW observations, or all of them if fewer".
final_window = final_buf.tail(COINT_WINDOW)
Yf, Xf = final_window['Log_IPSA'].values, final_window['Log_COLCAP'].values

statf, pf, _ = coint(Yf, Xf)
print(f"\nTest final Engle-Granger sobre ventana usada: stat={statf:.4f}, p-value={pf:.6f}")
print("Estado cointegración final:", pf < 0.05)

# ---------------------------
# 11) Shut down Spark
# ---------------------------
spark.stop()
print("\nSpark detenido. ANÁLISIS COMPLETO FINALIZADO.")


1) Instalando librerías necesarias...
Librerías cargadas

2) Por favor SUBE tu archivo 'Datos.xlsx' (columnas: Fecha, IPSA, COLCAP).


Saving Datos.xlsx to Datos (1).xlsx
Archivo subido: Datos (1).xlsx
Datos históricos: 4162 filas. Desde 2008-05-23 hasta 2025-12-01

3) Iniciando SparkSession...
Spark version: 3.5.1

4) Análisis batch (ADF en niveles y retornos, Engle-Granger)
 ADF IPSA (nivel): stat=-2.0961, p=0.246095, crit={'1%': np.float64(-3.4319282328371266), '5%': np.float64(-2.8622373766412523), '10%': np.float64(-2.567141219855403)}
 ADF COLCAP (nivel): stat=-1.5563, p=0.505484, crit={'1%': np.float64(-3.431923675236044), '5%': np.float64(-2.8622353632982076), '10%': np.float64(-2.5671401480435727)}
 ADF IPSA (retornos): stat=-17.0172, p=0.000000, crit={'1%': np.float64(-3.431927852028984), '5%': np.float64(-2.862237208417471), '10%': np.float64(-2.5671411303007297)}
 ADF COLCAP (retornos): stat=-40.1289, p=0.000000, crit={'1%': np.float64(-3.4319232966238484), '5%': np.float64(-2.8622351960442405), '10%': np.float64(-2.5671400590052276)}

Engle-Granger: stat=-1.9503, p-value=0.554288, crit=[-3.89907461 -3.337