In [1]:
pip install boto3 pandas pyarrow

Note: you may need to restart the kernel to use updated packages.


In [10]:
import io, boto3, pandas as pd

# S3 locations. Keys are case-sensitive, so RAW_KEY must match the object name exactly.
RAW_BUCKET = "bucket1-raw-stock"
RAW_KEY = "StockMarket.csv"

CURATED_BUCKET = "bucket2-curated-stock"
CURATED_PREFIX = "curated/stock_market/"
CSV_KEY = CURATED_PREFIX + "stock_clean.csv"  # -> curated/stock_market/stock_clean.csv

# Shared S3 client for the head/get/put calls made in this cell.
s3 = boto3.client(service_name="s3")

def head_size(bucket, key, client=None):
    """Return the size in bytes of ``s3://bucket/key`` using a HEAD request.

    This is a best-effort diagnostic helper. Any exception raised by
    ``head_object`` is printed together with the object location, and
    ``None`` is returned instead of re-raising.

    Args:
        bucket: S3 bucket name.
        key: Object key (case-sensitive).
        client: Optional S3 client exposing ``head_object``. Defaults to the
            module-level ``s3`` client.

    Returns:
        The response's ``ContentLength`` (0 if absent), or ``None`` if the
        request failed.
    """
    if client is None:
        client = s3
    try:
        h = client.head_object(Bucket=bucket, Key=key)
    except Exception as e:  # deliberately broad: this is a best-effort probe
        print(f"head_object error for s3://{bucket}/{key}:", e)
        return None
    return h.get("ContentLength", 0)

print("RAW object size (bytes):", head_size(RAW_BUCKET, RAW_KEY))

# -------- READ with robust delimiter detection ----------
def _read_csv_detect_delimiter(data: bytes) -> pd.DataFrame:
    """Parse delimited text ``data`` into a DataFrame.

    Attempts, in order:
      1. delimiter sniffing by the python engine (``sep=None``);
      2. a tab separator;
      3. a comma separator (pandas default).

    A tab parse that yields only one column is treated as a wrong guess and
    falls through to the comma attempt. On non-tab text, a tab parse
    typically does not raise; it returns each line as a single column. If
    the comma parse itself raises, the one-column tab result (if any) is
    returned, so the result is never worse than the tab attempt alone.
    """
    try:
        return pd.read_csv(io.BytesIO(data), engine="python", sep=None)
    except Exception:
        pass

    try:
        tab_df = pd.read_csv(io.BytesIO(data), sep="\t")
    except Exception:
        tab_df = None
    if tab_df is not None and tab_df.shape[1] > 1:
        return tab_df

    try:
        return pd.read_csv(io.BytesIO(data))  # default comma
    except Exception:
        if tab_df is not None:
            return tab_df
        raise


raw_obj = s3.get_object(Bucket=RAW_BUCKET, Key=RAW_KEY)
raw_bytes = raw_obj["Body"].read()
df_raw = _read_csv_detect_delimiter(raw_bytes)

print("Raw shape:", df_raw.shape)
print("Raw columns:", list(df_raw.columns))
print(df_raw.head(3))

# ---------- CLEAN: make parsing tolerant ----------
def _parse_dates(values: pd.Series) -> pd.Series:
    """Parse a raw date column into datetimes, coercing failures to NaT.

    Both strict formats are tried: ``dd-mm-YYYY`` and ``YYYY-mm-dd``.
    Whichever parses more values is kept; ties go to ``dd-mm-YYYY``.
    Pandas inference with ``dayfirst=True`` is used only when neither
    strict format matches a single value.
    """
    day_first = pd.to_datetime(values, format="%d-%m-%Y", errors="coerce")
    year_first = pd.to_datetime(values, format="%Y-%m-%d", errors="coerce")
    # Compare hit counts rather than accepting the first format with *any* hit.
    # A handful of dd-mm-YYYY matches must not discard a column that is mostly
    # YYYY-mm-dd; the unparsed rows would later be dropped as null dates.
    if day_first.notna().sum() >= year_first.notna().sum():
        best = day_first
    else:
        best = year_first
    if best.notna().sum() == 0:
        # Neither strict format matched anything: fall back to inference.
        best = pd.to_datetime(values, dayfirst=True, errors="coerce")
    return best


df = df_raw.copy()
# str() keeps this working for non-string column labels (e.g. integer headers).
df.columns = [str(c).strip().lower() for c in df.columns]

if "date" in df.columns:
    df["date"] = _parse_dates(df["date"])

for c in ["open","high","low","close","volume"]:
    if c in df.columns:
        df[c] = pd.to_numeric(df[c], errors="coerce")

print("After type casting shape:", df.shape)
print(df.head(3))

# Report how many rows would survive each null filter before applying it.
def count_notna(cols, frame=None):
    """Count rows of ``frame`` with no missing values in ``cols``.

    Names in ``cols`` that are not columns of ``frame`` are ignored. If none
    of them are present, every row counts.

    Args:
        cols: Column names to check.
        frame: DataFrame to inspect. Defaults to the notebook-global ``df``
            (the original behaviour). Pass it explicitly to avoid relying on
            hidden kernel state.

    Returns:
        The number of rows that ``dropna(subset=...)`` would keep.
    """
    if frame is None:
        frame = df
    present = [c for c in cols if c in frame.columns]
    return len(frame.dropna(subset=present))

print("Rows with non-null date/symbol:", count_notna(["date","symbol"]))
print("Rows with non-null OHLCV:", count_notna(["open","high","low","close","volume"]))

# Apply *minimal* required filters first: drop rows missing any of these (present) columns.
required = [c for c in ["date","symbol","open","high","low","close"] if c in df.columns]
df = df.dropna(subset=required)

# Volume is also treated as required: when the column exists, rows without it are dropped.
if "volume" in df.columns:
    before_vol = df.shape[0]
    df = df.dropna(subset=["volume"])
    print(f"Dropped {before_vol - df.shape[0]} rows due to null volume")

# Sanity checks – but don’t be overly strict for synthetic data.
# Skipped for an empty frame, where "would remove all rows" would be a false alarm.
price_cols = {"low", "high", "open", "close"}
if not df.empty and price_cols.issubset(df.columns):
    mask_ok = (
        (df["low"] <= df["high"]) &
        (df["open"] >= df["low"]) & (df["open"] <= df["high"]) &
        (df["close"] >= df["low"]) & (df["close"] <= df["high"])
    )
    removed = (~mask_ok).sum()
    if removed > 0:
        print(f"Filter will remove {removed} rows failing price sanity checks")
    # If it removes *everything*, relax the rule
    if mask_ok.sum() == 0:
        print("Warning: sanity check would remove all rows – skipping this filter.")
    else:
        df = df[mask_ok]

# Sort only by the key columns that exist. A missing one would raise KeyError,
# unlike the column-presence checks used everywhere above.
sort_cols = [c for c in ["symbol", "date"] if c in df.columns]
df = df.drop_duplicates().sort_values(sort_cols).reset_index(drop=True)
print("Final cleaned shape:", df.shape)
print(df.head(5))

# Abort early if empty so you don’t write an empty CSV
if df.empty:
    raise RuntimeError("Cleaned DataFrame is empty – check date format, numeric coercion, and filters above.")

# -------- WRITE CSV ----------
# ContentType is set explicitly, matching the Parquet writer later in the
# notebook. Otherwise S3 stores the object with a generic default type.
buf = io.StringIO()
df.to_csv(buf, index=False)
s3.put_object(
    Bucket=CURATED_BUCKET,
    Key=CSV_KEY,
    Body=buf.getvalue().encode("utf-8"),
    ContentType="text/csv; charset=utf-8",
)

print("Wrote CSV to:", f"s3://{CURATED_BUCKET}/{CSV_KEY}")
print("Curated object size (bytes):", head_size(CURATED_BUCKET, CSV_KEY))

RAW object size (bytes): 51841
Raw shape: (1000, 7)
Raw columns: ['Date', 'Symbol', 'Open', 'High', 'Low', 'Close', 'Volume']
         Date Symbol    Open    High     Low   Close    Volume
0  2024-01-01   AAPL  154.72  161.22  151.89  159.55   3494467
1  2024-01-02   MSFT  157.29  159.06  147.59  150.54   2138775
2  2024-01-03   AAPL  142.91  144.99  138.40  141.21  10960425
After type casting shape: (1000, 7)
        date symbol    open    high     low   close    volume
0 2024-01-01   AAPL  154.72  161.22  151.89  159.55   3494467
1 2024-01-02   MSFT  157.29  159.06  147.59  150.54   2138775
2 2024-01-03   AAPL  142.91  144.99  138.40  141.21  10960425
Rows with non-null date/symbol: 1000
Rows with non-null OHLCV: 1000
Dropped 0 rows due to null volume
Final cleaned shape: (1000, 7)
        date symbol    open    high     low   close    volume
0 2024-01-01   AAPL  154.72  161.22  151.89  159.55   3494467
1 2024-01-03   AAPL  142.91  144.99  138.40  141.21  10960425
2 2024-01-04   AAPL

In [11]:
import io
import uuid
import boto3
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq

# S3 client used by the writer helpers below; re-created so this cell also works on its own.
s3 = boto3.client(service_name="s3")

def write_csv_to_s3(df: pd.DataFrame, bucket: str, key: str, client=None) -> None:
    """Write a pandas DataFrame as CSV to S3 (single object).

    The CSV is rendered in memory without the index, UTF-8 encoded, and
    uploaded with ``put_object``. ContentType is set explicitly to
    ``text/csv``, consistent with the Parquet writers, which set theirs.

    Args:
        df: Frame to serialize.
        bucket: Destination bucket.
        key: Destination object key.
        client: Optional S3 client exposing ``put_object``. Defaults to the
            module-level ``s3`` client.
    """
    if client is None:
        client = s3
    buf = io.StringIO()
    df.to_csv(buf, index=False)
    client.put_object(
        Bucket=bucket,
        Key=key,
        Body=buf.getvalue().encode("utf-8"),
        ContentType="text/csv; charset=utf-8",
    )
    print(f"[CSV] s3://{bucket}/{key}")

def write_parquet_to_s3_single(df: pd.DataFrame, bucket: str, key: str, compression: str = "snappy") -> None:
    """Serialize ``df`` to one in-memory Parquet file and upload it to S3.

    The DataFrame index is not stored. ``compression`` is passed unchanged
    to ``pyarrow.parquet.write_table`` and defaults to ``"snappy"``.
    """
    sink = io.BytesIO()
    pq.write_table(
        pa.Table.from_pandas(df, preserve_index=False),
        sink,
        compression=compression,
    )
    payload = sink.getvalue()
    s3.put_object(
        Bucket=bucket,
        Key=key,
        Body=payload,
        ContentType="application/octet-stream",
    )
    print(f"[PARQUET] s3://{bucket}/{key}")

def write_parquet_to_s3_sharded_by_symbol(
    df: pd.DataFrame,
    bucket: str,
    prefix: str,
    compression: str = "snappy"
) -> None:
    """
    Write multiple Parquet objects to S3, one per symbol, under:
    s3://<bucket>/<prefix>/symbol=<SYM>/part-<uuid>.parquet

    <SYM> is percent-encoded with ``urllib.parse.quote`` (no safe characters).
    Alphanumeric tickers and ``.``, ``-``, ``_`` are therefore unchanged,
    while a ``/`` in a symbol cannot create extra key levels. An empty prefix
    writes directly under ``symbol=<SYM>/`` instead of a leading ``/``.

    Every call writes new, uniquely named part files and deletes nothing.
    Re-running with the same data therefore leaves duplicate shards for any
    reader that scans the prefix.

    Raises:
        ValueError: if the ``symbol`` column is missing or contains nulls.
            ``groupby`` would otherwise drop null-symbol rows silently.
    """
    from urllib.parse import quote  # local import: only this writer needs it

    if "symbol" not in df.columns:
        raise ValueError("Column 'symbol' is required for sharded write.")
    null_symbols = int(df["symbol"].isna().sum())
    if null_symbols:
        raise ValueError(
            f"Column 'symbol' has {null_symbols} null value(s); cannot assign them to a shard."
        )

    base = prefix.rstrip("/")
    root = f"{base}/" if base else ""
    # observed=True: for a categorical 'symbol', skip unused categories instead of
    # writing empty shard files (no effect on non-categorical columns).
    for sym, g in df.groupby("symbol", observed=True):
        table = pa.Table.from_pandas(g.reset_index(drop=True), preserve_index=False)
        buf = io.BytesIO()
        pq.write_table(table, buf, compression=compression)
        key = f"{root}symbol={quote(str(sym), safe='')}/part-{uuid.uuid4().hex}.parquet"
        s3.put_object(Bucket=bucket, Key=key, Body=buf.getvalue(), ContentType="application/octet-stream")
        print(f"[PARQUET SHARD] s3://{bucket}/{key}")

def simple_stock_transforms(df: pd.DataFrame) -> pd.DataFrame:
    """
    Add a couple of useful columns:
      - range = high - low
      - daily_return_pct = (close / open - 1) * 100, NaN where open == 0
    Sort by symbol,date (whichever are present) for stable files.

    The input frame is not modified. A new frame with a fresh 0..n-1 index
    is returned.

    Raises:
        ValueError: if any of open/high/low/close is missing.
    """
    req = {"open", "high", "low", "close"}
    missing = req - set(df.columns)
    if missing:
        raise ValueError(f"Missing required columns for transforms: {missing}")

    out = df.copy()
    out["range"] = out["high"] - out["low"]
    # A zero open would give +/-inf (or NaN for 0/0). Mask it so the return is
    # reported as missing rather than infinite.
    open_nonzero = out["open"].where(out["open"] != 0)
    out["daily_return_pct"] = (out["close"] / open_nonzero - 1.0) * 100.0
    sort_cols = [c for c in ("symbol", "date") if c in out.columns]
    return out.sort_values(sort_cols).reset_index(drop=True)

In [13]:
# ====== after your cleaning code produced df (non-empty) ======

# Step 1 - derive the extra columns (range, daily_return_pct).
df_tx = simple_stock_transforms(df)

# Step 2 - CSV. This is the same key the cleaning cell wrote, so the transformed
# frame (with its derived columns) supersedes that earlier CSV.
CSV_KEY = CURATED_PREFIX + "stock_clean.csv"
write_csv_to_s3(df_tx, bucket=CURATED_BUCKET, key=CSV_KEY)

# Step 3 - one Parquet object next to the CSV.
PARQUET_KEY = CURATED_PREFIX + "stock_clean.parquet"
write_parquet_to_s3_single(df_tx, bucket=CURATED_BUCKET, key=PARQUET_KEY)

# Optional alternative layout: per-symbol shards such as
# curated/stock_market/symbol=AAPL/part-<hex>.parquet, produced by calling
# write_parquet_to_s3_sharded_by_symbol(df_tx, CURATED_BUCKET, CURATED_PREFIX).

print("Done writing CSV and Parquet to curated.")

[CSV] s3://bucket2-curated-stock/curated/stock_market/stock_clean.csv
[PARQUET] s3://bucket2-curated-stock/curated/stock_market/stock_clean.parquet
Done writing CSV and Parquet to curated.
