# In [None]:
import pandas as pd
import pyodbc
from datetime import datetime
from getpass import getpass
import numpy as np
import warnings
warnings.filterwarnings("ignore", category=UserWarning, message=".*pandas only supports SQLAlchemy connectable.*")
# --- Step 1: Read connection string ---
# The ODBC connection string (which carries credentials) is read from
# 'sql_cred.txt' in the current working directory so that no secret is kept in
# source. The file may hold the string on one line or as one "KEY=value;"
# attribute per line; lines are stripped and concatenated in order.
try:
    with open('sql_cred.txt', encoding='utf-8') as cred_file:
        conn_str = "".join(line.strip() for line in cred_file)
except FileNotFoundError:
    raise FileNotFoundError("Connection string file 'sql_cred.txt' not found!")

# Fail early with a clear message instead of a driver error at connect time.
if not conn_str:
    raise ValueError("Connection string file 'sql_cred.txt' is empty!")

# --- Step 2: Connect to SQL Server ---
# Open the connection described by conn_str and keep a single cursor for the
# later writes; fast_executemany is switched on for that cursor before any
# executemany() call is made.
conn = pyodbc.connect(conn_str)
cursor = conn.cursor()
cursor.fast_executemany = True

# --- Step 3: Load staging data ---
# Pull only the staging rows that have not been flagged as loaded yet.
query = (
    "SELECT * FROM GreenEnergy_DBP.dbo.energy_data_STG "
    "WHERE ETL_LOAD_FLAG = 0"
)
df = pd.read_sql(sql=query, con=conn)

# --- Step 4: Data enrichment ---
# Parse the 'datetime' column once and derive both grouping keys from the
# parsed values: the calendar date and the hour of day.
timestamps = pd.to_datetime(df['datetime'])
df['day'] = timestamps.dt.date
df['hour'] = timestamps.dt.hour

# --- Step 5: Optional local backup ---
# Dump the enriched staging rows to a CSV in the working directory (without
# the DataFrame index) before anything is written to the database.
df.to_csv('nightly_energy_dump.csv', index=False)
print("ETL ingestion completed.")

# --- Step 6: Aggregation logic ---
# Collapse the readings into one row per (day, hour) bucket. Each keyword
# below names an output column and pairs it with the source column and the
# reduction applied to it, so no separate renaming pass is required.
hourly_groups = df.groupby(['day', 'hour'])
agg = hourly_groups.agg(
    avg_consumption_kwh=('consumption_kwh', 'mean'),
    peak_consumption_kwh=('consumption_kwh', 'max'),
    solar_output_kwh=('solar_generation_kwh', 'sum'),
    avg_temperature=('temperature_C', 'mean'),
    avg_humidity=('humidity_percent', 'mean'),
    avg_wind_speed=('wind_speed_kmph', 'mean'),
).reset_index()  # bring 'day' and 'hour' back as ordinary leading columns

# --- Step 7: Prepare insert data ---
# Build one parameter tuple per aggregated (day, hour) row. Field order is:
# bucket start timestamp, hour, day, average and peak consumption, average
# temperature, humidity and wind speed, and finally the solar output.
bulk_data = [
    (
        # Bucket start: the row's date combined with HH:00:00 for its hour.
        datetime.combine(bucket.day, pd.Timestamp(bucket.hour, unit='h').time()),
        bucket.hour,
        bucket.day,
        float(bucket.avg_consumption_kwh),
        float(bucket.peak_consumption_kwh),
        float(bucket.avg_temperature),
        float(bucket.avg_humidity),
        float(bucket.avg_wind_speed),
        float(bucket.solar_output_kwh),
    )
    for bucket in agg.itertuples(index=False)
]

# --- Step 8: Sanity check for nulls ---
# Stop before anything is written if a prepared record contains a missing
# value (None, NaN or NaT); the offending record is printed first.
for i, record in enumerate(bulk_data):
    fully_populated = all(
        val is not None and not pd.isnull(val) for val in record
    )
    if fully_populated:
        continue
    print(f"⚠️ Null detected in row {i}: {record}")
    raise ValueError("Null value detected in insert data.")

# --- Step 9: Insert in chunks ---
def chunks(data, size):
    """Yield consecutive slices of *data* holding at most *size* items each.

    Args:
        data: A sequence supporting ``len()`` and slicing (list, tuple, str).
        size: Maximum number of items per slice; must be at least 1.

    Yields:
        Slices of *data* in order; the last one may be shorter than *size*.
        Nothing is yielded when *data* is empty.

    Raises:
        ValueError: If *size* is smaller than 1. Without this check a negative
            size would produce an empty ``range`` and silently yield nothing,
            dropping every item.
    """
    if size < 1:
        raise ValueError(f"chunk size must be a positive integer, got {size!r}")
    for start in range(0, len(data), size):
        yield data[start:start + size]

chunk_size = 5000
total_rows = len(bulk_data)
print(f"\n🚀 Starting insert of {total_rows} rows in chunks of {chunk_size}...")

# Parameter order must match the tuples held in bulk_data.
insert_query = """
    INSERT INTO GreenEnergy_DBP.dbo.energy_features
    (datetime, hour, day, avg_consumption_kwh, peak_consumption_kwh,
     avg_temperature, avg_humidity, avg_wind_speed, solar_output_kwh)
    VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
"""

# --- Step 10: Update ETL flag in staging table ---
# NOTE(review): this flags every staging row that still has ETL_LOAD_FLAG = 0,
# which includes rows inserted into staging after the SELECT in step 3 ran.
# Such rows would be marked as loaded without having been aggregated; consider
# restricting the UPDATE to the keys of the rows that were actually read.
update_query = """
    UPDATE GreenEnergy_DBP.dbo.energy_data_STG
    SET ETL_LOAD_FLAG = 1
    WHERE ETL_LOAD_FLAG = 0
    """

# The inserts and the flag update share ONE transaction: nothing is committed
# until both have succeeded. A failure at any point therefore rolls back every
# chunk, so a re-run cannot insert duplicates of a partially loaded batch and
# the flag can never be set without the feature rows being present.
try:
    try:
        for i, chunk in enumerate(chunks(bulk_data, chunk_size)):
            cursor.executemany(insert_query, chunk)
            # Sent to the server, but still pending until the commit below.
            print(f"✅ Chunk {i+1} inserted ({len(chunk)} rows).")
    except Exception as e:
        print(f"❌ Insert failed: {e}")
        conn.rollback()
        raise

    try:
        cursor.execute(update_query)
        conn.commit()
        print("✅ ETL_LOAD_FLAG updated successfully.")
    except Exception as e:
        print(f"❌ Failed to update ETL flag: {e}")
        conn.rollback()
        raise
finally:
    # --- Step 11: Cleanup ---
    # Release the connection on success and on failure alike.
    conn.close()

print("🎉 All tasks completed successfully!")