In [None]:
import json
import math
import os
import time
import zipfile
from datetime import datetime
from urllib.parse import urlparse, unquote

import boto3
import pandas as pd
import requests
from boto3.exceptions import S3UploadFailedError
from botocore.exceptions import NoCredentialsError, ClientError
from confluent_kafka import Producer

# Configuration for MinIO.
# Secrets are read from the environment instead of being hard-coded in the
# notebook. The keys that were previously committed here are exposed and
# should be rotated. Non-secret settings keep their old values as defaults.
MINIO_ENDPOINT = os.environ.get("MINIO_ENDPOINT", "m3g2.ldn.idrivee2-66.com")  # host only; scheme added below
MINIO_ACCESS_KEY = os.environ["MINIO_ACCESS_KEY"]  # KeyError here means the variable is not set
MINIO_SECRET_KEY = os.environ["MINIO_SECRET_KEY"]
MINIO_BUCKET = os.environ.get("MINIO_BUCKET", "battery-data")

# Kafka Producer config (Redpanda); override the broker via KAFKA_BOOTSTRAP_SERVERS.
KAFKA_BOOTSTRAP_SERVERS = os.environ.get("KAFKA_BOOTSTRAP_SERVERS", "localhost:9092")
producer = Producer({'bootstrap.servers': KAFKA_BOOTSTRAP_SERVERS})

# Set up MinIO client (S3-compatible API over HTTPS).
s3 = boto3.client(
    's3',
    endpoint_url=f"https://{MINIO_ENDPOINT}",
    aws_access_key_id=MINIO_ACCESS_KEY,
    aws_secret_access_key=MINIO_SECRET_KEY
)

# Ensure bucket exists or create it. A missing bucket normally surfaces as
# error code "404" from head_bucket; "NoSuchBucket" is accepted too in case the
# server reports it by name. Any other error (e.g. 403) is re-raised.
try:
    s3.head_bucket(Bucket=MINIO_BUCKET)
except ClientError as e:
    if e.response['Error']['Code'] in ("404", "NoSuchBucket"):
        s3.create_bucket(Bucket=MINIO_BUCKET)
        print(f"Created bucket: {MINIO_BUCKET}")
    else:
        print("Bucket access failed:", e)
        raise

# CALCE battery archives to ingest. Archive names are kept percent-encoded
# exactly as they appear in the download URL.
dataset_urls = [
    f"https://web.calce.umd.edu/batteries/data/{archive}"
    for archive in (
        "SP1_Initial%20capacity_10_16_2015.zip",
        "SP2_25C_FUDS.zip",
    )
]

def acked(err, msg):
    """Delivery-report callback handed to ``Producer.produce``.

    Args:
        err: ``None`` when the message was delivered, otherwise the error.
        msg: The delivered message; its bytes value is decoded and printed.
    """
    if err is None:
        print("Delivered:", msg.value().decode())
        return
    print("Delivery failed:", err)

# Stream and upload pipeline: for each archive in `dataset_urls`, download and
# extract it (both cached on disk), convert its first Excel workbook to CSV,
# upload the CSV to MinIO, and publish every row to Redpanda as JSON.

DATA_DIR = "battery_data"
KAFKA_TOPIC = "battery_topic"
PRODUCE_DELAY_S = 0.005  # pause between produce() calls to throttle the stream


def _download_once(url, dest_path):
    """Download *url* to *dest_path* unless that file already exists.

    The body is streamed into a ``.part`` file and renamed only after a
    complete, successful (2xx) response. An HTTP error or an interrupted
    transfer therefore never leaves a file that the skip-if-exists cache would
    later treat as a valid archive.
    """
    if os.path.exists(dest_path):
        print("ZIP already exists. Skipping download.")
        return
    part_path = dest_path + ".part"
    # NOTE(review): TLS verification stays disabled as in the original code;
    # switch to verify=True if the CALCE server's certificate validates.
    with requests.get(url, verify=False, stream=True, timeout=60) as resp:
        resp.raise_for_status()
        with open(part_path, "wb") as fh:
            for chunk in resp.iter_content(chunk_size=1 << 20):
                fh.write(chunk)
    os.replace(part_path, dest_path)
    print(f"️Downloaded to: {dest_path}")


def _find_excel_file(root):
    """Return the first Excel workbook (.xlsx/.xls) under *root*, or None.

    Searches recursively, because archives often wrap their files in a folder.
    Files in *root* itself are checked first. Directories and files are
    visited in sorted order so the pick is deterministic. macOS resource-fork
    entries (``__MACOSX``, ``._*``) and Office lock files (``~$*``) are
    skipped: they carry an Excel extension but are not workbooks.
    """
    for dirpath, dirnames, filenames in os.walk(root):
        dirnames[:] = sorted(d for d in dirnames if d != "__MACOSX")
        for name in sorted(filenames):
            if name.startswith(("._", "~$")):
                continue
            if name.lower().endswith((".xlsx", ".xls")):
                return os.path.join(dirpath, name)
    return None


def _json_safe(value):
    """Convert one cell value into a strict-JSON-serialisable Python value.

    Timestamps/datetimes become their ``str()`` form, as before. NaN and
    infinite floats become ``None``, so ``json.dumps`` never emits the
    non-standard ``NaN``/``Infinity`` tokens. numpy scalars are unboxed to
    plain Python scalars.
    """
    if isinstance(value, (pd.Timestamp, datetime)):
        return str(value)
    if hasattr(value, "item") and not isinstance(value, (str, bytes)):
        value = value.item()  # numpy scalar -> Python scalar
    if isinstance(value, float) and not math.isfinite(value):
        return None
    return value


def _produce_row(record):
    """Queue *record* on KAFKA_TOPIC. Return True if queued, False otherwise.

    ``Producer.produce`` raises BufferError when the local queue is full.
    Instead of dropping the row, serve delivery callbacks with ``poll(1)`` to
    free space and retry. Any other error is printed and the row is skipped,
    as before.
    """
    try:
        payload = json.dumps(record)
    except (TypeError, ValueError) as e:
        print(f"Kafka produce failed: {e}")
        return False
    while True:
        try:
            producer.produce(
                topic=KAFKA_TOPIC,
                key=str(record["id"]),
                value=payload,
                callback=acked
            )
            break
        except BufferError:
            producer.poll(1)
        except Exception as e:
            print(f"Kafka produce failed: {e}")
            return False
    producer.poll(0)  # serve delivery callbacks for earlier messages
    return True


os.makedirs(DATA_DIR, exist_ok=True)
upload_failures = []   # object keys that could not be uploaded to MinIO
produce_failures = 0   # rows that could not be queued for Kafka

for zip_url in dataset_urls:
    zip_filename = os.path.basename(unquote(urlparse(zip_url).path))
    stem = os.path.splitext(zip_filename)[0]
    zip_path = os.path.join(DATA_DIR, zip_filename)
    extract_dir = os.path.join(DATA_DIR, stem)

    print(f"\n Downloading: {zip_filename}")
    _download_once(zip_url, zip_path)

    print(f"Extracting to: {extract_dir}")
    if not os.path.exists(extract_dir):
        with zipfile.ZipFile(zip_path, "r") as zip_ref:
            zip_ref.extractall(extract_dir)
    else:
        print("Directory already exists. Skipping extraction.")

    target_file = _find_excel_file(extract_dir)
    if not target_file:
        raise FileNotFoundError(f"No Excel file found in {extract_dir}.")

    print(f"Reading Excel: {target_file}")
    # No explicit engine: pandas picks openpyxl for .xlsx and xlrd for .xls
    # (openpyxl cannot read legacy .xls). Only the first sheet is read.
    df = pd.read_excel(target_file)
    df = df.dropna(how='all').reset_index(drop=True)
    df["id"] = df.index  # restarts at 0 for each archive
    df["source_file"] = zip_filename
    df["streamed_at"] = pd.Timestamp.now()

    # Save as CSV
    temp_csv_path = os.path.join(DATA_DIR, f"{stem}.csv")
    df.to_csv(temp_csv_path, index=False)

    # Upload to MinIO. upload_file wraps S3 client errors in
    # S3UploadFailedError, so catching ClientError alone is not enough.
    object_key = os.path.basename(temp_csv_path)
    try:
        s3.upload_file(temp_csv_path, MINIO_BUCKET, object_key)
        print(f"Uploaded to MinIO: {object_key}")
    except NoCredentialsError:
        print("Failed to upload to MinIO: credentials error")
        upload_failures.append(object_key)
    except (ClientError, S3UploadFailedError) as e:
        print("MinIO upload error:", e)
        upload_failures.append(object_key)

    # Kafka Streaming
    print("Streaming rows to Redpanda...")
    for row in df.to_dict(orient="records"):
        record = {k: _json_safe(v) for k, v in row.items()}
        if not _produce_row(record):
            produce_failures += 1
        time.sleep(PRODUCE_DELAY_S)

# flush() returns how many messages are still queued. Per-message delivery
# failures are reported separately by the `acked` callback.
remaining = producer.flush()
if upload_failures or produce_failures or remaining:
    print(
        f"Finished with problems: failed uploads={upload_failures}, "
        f"rows not queued={produce_failures}, messages still queued={remaining}"
    )
else:
    print("All data streamed to Redpanda and uploaded to MinIO.")


In [None]:
# Load every CSV the ingestion step uploaded to MinIO, normalise column names,
# coerce numeric columns, and store the combined frame in MotherDuck as
# `battery_cleaned`. Imports and configuration are repeated so that this cell
# can run on a fresh kernel.
import os
from io import BytesIO

import boto3
import duckdb
import pandas as pd
import ace_tools_open as tools

# Configuration. Secrets come from the environment. The MinIO keys and the
# MotherDuck token that used to be hard-coded here are exposed and should be
# revoked and rotated.
MINIO_ENDPOINT = os.environ.get("MINIO_ENDPOINT", "m3g2.ldn.idrivee2-66.com")  # host only, as in the ingest cell
MINIO_ACCESS_KEY = os.environ["MINIO_ACCESS_KEY"]
MINIO_SECRET_KEY = os.environ["MINIO_SECRET_KEY"]
MINIO_BUCKET = os.environ.get("MINIO_BUCKET", "battery-data")
MOTHERDUCK_TOKEN = os.environ["MOTHERDUCK_TOKEN"]

# Set up MinIO client
s3 = boto3.client(
    's3',
    endpoint_url=f"https://{MINIO_ENDPOINT}",
    aws_access_key_id=MINIO_ACCESS_KEY,
    aws_secret_access_key=MINIO_SECRET_KEY
)


def _list_csv_keys(bucket):
    """Return every ``.csv`` object key in *bucket*, following pagination.

    list_objects_v2 returns at most 1000 keys per call, and it omits
    "Contents" entirely for an empty bucket, so a paginator is used and
    missing "Contents" is treated as "no objects".
    """
    paginator = s3.get_paginator("list_objects_v2")
    return [
        obj["Key"]
        for page in paginator.paginate(Bucket=bucket)
        for obj in page.get("Contents", [])
        if obj["Key"].endswith(".csv")
    ]


def _read_clean_csv(key):
    """Download one CSV from MinIO and apply light, column-wise cleaning.

    Column names are stripped, lower-cased and snake-cased. All-empty rows are
    dropped. Every column that parses fully as numeric is converted; the
    others are left unchanged.
    """
    body = s3.get_object(Bucket=MINIO_BUCKET, Key=key)["Body"]
    try:
        frame = pd.read_csv(body)
    finally:
        body.close()
    frame.columns = [str(col).strip().lower().replace(" ", "_") for col in frame.columns]
    frame = frame.dropna(how="all")
    for col in frame.columns:
        try:
            frame[col] = pd.to_numeric(frame[col])
        except (ValueError, TypeError):
            pass  # non-numeric column (e.g. source_file); keep as-is
    return frame


# List CSV files from MinIO
csv_files = _list_csv_keys(MINIO_BUCKET)
if not csv_files:
    raise FileNotFoundError(f"No .csv objects found in MinIO bucket {MINIO_BUCKET!r}.")

# Load, clean and concatenate all CSVs. `id` restarts at 0 for each source
# file, so (source_file, id) identifies a row rather than id alone.
all_dfs = [_read_clean_csv(key) for key in csv_files]
df_cleaned = pd.concat(all_dfs, ignore_index=True)

# Connect to MotherDuck and replace the cleaned table.
conn = duckdb.connect(f"md:battery_db?motherduck_token={MOTHERDUCK_TOKEN}")
conn.execute("DROP TABLE IF EXISTS battery_cleaned")
conn.register("battery_cleaned_view", df_cleaned)
conn.execute("CREATE TABLE battery_cleaned AS SELECT * FROM battery_cleaned_view")
conn.unregister("battery_cleaned_view")  # the temporary view is no longer needed

# Preview cleaned data (first 50 rows only; for display, not for analysis)
df_preview = conn.execute("SELECT * FROM battery_cleaned LIMIT 50").fetchdf()
tools.display_dataframe_to_user(name="Preview Cleaned Battery Data from MinIO", dataframe=df_preview)


In [None]:
# Clean the FULL warehouse table. This cell previously started from
# `df_preview`, the 50-row LIMIT sample, which silently shrank every
# downstream result to at most 50 rows. Reading from the table keeps the cell
# idempotent on re-run.
battery_full = conn.execute("SELECT * FROM battery_cleaned").fetchdf()

phys_cols = battery_full.columns
print(phys_cols)

# Drop every row that has a missing value in any column.
df_cleaned = battery_full.dropna(subset=phys_cols)
print(df_cleaned.isna().sum())
assert not df_cleaned.empty, (
    "dropna removed every row - check for columns that are empty in some source files"
)

# Drop constant columns (e.g. all 0s or same value)
n_unique = df_cleaned.nunique()
constant_cols = n_unique[n_unique <= 1].index.tolist()

print("Dropping constant columns:", constant_cols)
df_cleaned = df_cleaned.drop(columns=constant_cols)

In [None]:
# DataFrame.info() prints its own report and returns None, so it is called
# directly; wrapping it in print() added a stray "None" line.
df_cleaned.info()
# Last expression -> rich table display of the summary statistics.
df_cleaned.describe()

In [None]:
# Quick sanity summary of what survived cleaning.
print(f"Cleaned shape: {df_cleaned.shape}")
print(f"Remaining columns: {df_cleaned.columns.tolist()}")

In [None]:
# Independent working copy: later edits to `df` leave `df_cleaned` untouched.
df = df_cleaned.copy(deep=True)
df

In [None]:
import os

import duckdb

# Connect to MotherDuck. The token is read here so the cell does not depend on
# a variable left over from another cell (fresh-kernel NameError otherwise).
MOTHERDUCK_TOKEN = os.environ["MOTHERDUCK_TOKEN"]
conn = duckdb.connect(f"md:battery_db?motherduck_token={MOTHERDUCK_TOKEN}")

# Drop existing object regardless of type. `battery_ts` may currently be a
# VIEW or a TABLE, and DuckDB raises a catalog error when the DROP kind does
# not match the object. Both kinds are attempted and only database errors are
# ignored, so unrelated failures are no longer swallowed.
for object_kind in ("VIEW", "TABLE"):
    try:
        conn.execute(f"DROP {object_kind} IF EXISTS battery_ts")
    except duckdb.Error:
        pass

# Recreate as a physical table from the cleaned version.
# NOTE(review): `battery_ts_cleaned` is not created anywhere in this notebook;
# it must already exist in battery_db. Confirm this, or build it from `df`
# before running this cell.
conn.execute("CREATE TABLE battery_ts AS SELECT * FROM battery_ts_cleaned")

print("✅ Final warehouse table 'battery_ts' created successfully.")
