In [1]:
from pathlib import Path

BASE_DIR = Path("/")
DATA_PATH = BASE_DIR / "calories.csv"
print(DATA_PATH)

/calories.csv


In [3]:
import os
print(os.getcwd())

/home/ec2-user/SageMaker


In [6]:
#!/usr/bin/env python3
# S3 only feature store: write raw and standardized features to S3 in Parquet, partitioned by dt

REGION = "us-east-1"
S3_BUCKET = "myfeaturestore-000"
S3_PREFIX = "feature-store"
CSV_PATH = "/home/ec2-user/SageMaker/calories.csv"

FEATURE_SET = "cal_v1"
STD_VERSION = "v1"
LABEL_COL = "Calories"
ENTITY_KEY = "entity_id"
EVENT_TIME_COL = "event_time"

ENABLE_GLUE_ATHENA = False
GLUE_DB = "fs"
RAW_TABLE = "features_raw"
STD_TABLE = "features_std"

import os
import json
import uuid
import time
import boto3
import numpy as np
import pandas as pd
from datetime import datetime, timezone
from pathlib import Path
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
import pyarrow as pa
import pyarrow.parquet as pq
import s3fs

session = boto3.Session(region_name=REGION)
s3 = session.client("s3")
fs = s3fs.S3FileSystem()

def iso_now():
    return datetime.now(timezone.utc).isoformat()

def dt_today():
    return datetime.now(timezone.utc).strftime("%Y-%m-%d")

def to_parquet(df, bucket, base_key, dt):
    key = f"{base_key}/dt={dt}/part-{uuid.uuid4().hex}.parquet"
    with fs.open(f"{bucket}/{key}", "wb") as f:
        pq.write_table(pa.Table.from_pandas(df, preserve_index=False), f)
    return f"s3://{bucket}/{key}"

def athena_type(s):
    if pd.api.types.is_integer_dtype(s) or pd.api.types.is_bool_dtype(s):
        return "bigint"
    if pd.api.types.is_float_dtype(s):
        return "double"
    return "string"

def create_table_sql(table, location_s3, df_for_schema):
    cols = [f"  `{c}` {athena_type(df_for_schema[c])}" for c in df_for_schema.columns if c != "dt"]
    cols_sql = ",\n".join(cols)
    return f"""
CREATE EXTERNAL TABLE IF NOT EXISTS {table} (
{cols_sql}
)
PARTITIONED BY (dt string)
STORED AS PARQUET
LOCATION '{location_s3}'
TBLPROPERTIES ('parquet.compress'='SNAPPY');
""".strip()

def run_athena(sql, db, athena, results_bucket):
    out = f"s3://{results_bucket}/athena-results/"
    qid = athena.start_query_execution(
        QueryString=sql,
        QueryExecutionContext={"Database": db},
        ResultConfiguration={"OutputLocation": out},
    )["QueryExecutionId"]
    while True:
        st = athena.get_query_execution(QueryExecutionId=qid)["QueryExecution"]["Status"]["State"]
        if st in ("SUCCEEDED", "FAILED", "CANCELLED"):
            if st != "SUCCEEDED":
                reason = athena.get_query_execution(QueryExecutionId=qid)["QueryExecution"]["Status"].get("StateChangeReason", "")
                raise RuntimeError(f"Athena query failed: {reason}")
            break
        time.sleep(1)

def main():
    p = Path(CSV_PATH)
    if not p.exists():
        raise FileNotFoundError(f"not found: {CSV_PATH}")

    df0 = pd.read_csv(p)

    if "User_ID" in df0.columns:
        df0[ENTITY_KEY] = df0["User_ID"].astype(int)
        base = df0.drop(columns=["User_ID"]).copy()
    else:
        df0[ENTITY_KEY] = np.arange(1, len(df0) + 1, dtype=int)
        base = df0.copy()

    if LABEL_COL not in base.columns:
        raise ValueError(f"missing label column {LABEL_COL}")
    base[LABEL_COL] = base[LABEL_COL].astype(float)

    if "Gender" in base.columns:
        base = pd.get_dummies(base, columns=["Gender"], drop_first=True)

    train_df, _ = train_test_split(base, test_size=0.2, random_state=42)
    numeric_cols = [c for c in base.columns if c not in [LABEL_COL, ENTITY_KEY]]
    scaler = StandardScaler().fit(train_df[numeric_cols])

    df_std = base.copy()
    df_std[numeric_cols] = scaler.transform(df_std[numeric_cols])

    raw = base[[c for c in base.columns if c != LABEL_COL]].copy()
    std = df_std[[c for c in df_std.columns if c != LABEL_COL]].copy()

    ts = iso_now()
    raw[EVENT_TIME_COL] = ts
    std[EVENT_TIME_COL] = ts
    raw["feature_set"] = FEATURE_SET
    std["feature_set"] = FEATURE_SET
    std["std_version"] = STD_VERSION

    dt = dt_today()
    raw_base = f"{S3_PREFIX}/features_raw"
    std_base = f"{S3_PREFIX}/features_std"

    raw_uri = to_parquet(raw, S3_BUCKET, raw_base, dt)
    std_uri = to_parquet(std, S3_BUCKET, std_base, dt)

    params_key = f"{S3_PREFIX}/params/{FEATURE_SET}/std_{STD_VERSION}.json"
    s3.put_object(
        Bucket=S3_BUCKET,
        Key=params_key,
        Body=json.dumps(
            {
                "feature_set": FEATURE_SET,
                "std_version": STD_VERSION,
                "numeric_cols": numeric_cols,
                "mean": scaler.mean_.tolist(),
                "scale": scaler.scale_.tolist(),
                "saved_at": ts,
            }
        ).encode("utf-8"),
    )

    manifest = {
        "feature_set": FEATURE_SET,
        "dt": dt,
        "raw_s3_path": f"s3://{S3_BUCKET}/{raw_base}/dt={dt}/",
        "std_s3_path": f"s3://{S3_BUCKET}/{std_base}/dt={dt}/",
        "raw_example_file": raw_uri,
        "std_example_file": std_uri,
        "entity_key": ENTITY_KEY,
        "event_time_col": EVENT_TIME_COL,
        "std_version": STD_VERSION,
    }
    s3.put_object(
        Bucket=S3_BUCKET,
        Key=f"{S3_PREFIX}/manifests/{FEATURE_SET}/{dt}.json",
        Body=json.dumps(manifest, indent=2).encode("utf-8"),
    )

    if ENABLE_GLUE_ATHENA:
        glue = session.client("glue")
        athena = session.client("athena")
        try:
            glue.get_database(Name=GLUE_DB)
        except glue.exceptions.EntityNotFoundException:
            glue.create_database(DatabaseInput={"Name": GLUE_DB})
        raw_loc = f"s3://{S3_BUCKET}/{raw_base}/"
        std_loc = f"s3://{S3_BUCKET}/{std_base}/"
        run_athena(f"CREATE DATABASE IF NOT EXISTS {GLUE_DB}", "default", athena, S3_BUCKET)
        run_athena(create_table_sql(f"{GLUE_DB}.{RAW_TABLE}", raw_loc, raw), GLUE_DB, athena, S3_BUCKET)
        run_athena(create_table_sql(f"{GLUE_DB}.{STD_TABLE}", std_loc, std), GLUE_DB, athena, S3_BUCKET)
        run_athena(f"MSCK REPAIR TABLE {RAW_TABLE}", GLUE_DB, athena, S3_BUCKET)
        run_athena(f"MSCK REPAIR TABLE {STD_TABLE}", GLUE_DB, athena, S3_BUCKET)

    print("done")
    print("raw path", f"s3://{S3_BUCKET}/{raw_base}/dt={dt}/")
    print("std path", f"s3://{S3_BUCKET}/{std_base}/dt={dt}/")
    print("params json", f"s3://{S3_BUCKET}/{params_key}")

if __name__ == "__main__":
    main()

done
raw path s3://myfeaturestore-000/feature-store/features_raw/dt=2025-08-18/
std path s3://myfeaturestore-000/feature-store/features_std/dt=2025-08-18/
params json s3://myfeaturestore-000/feature-store/params/cal_v1/std_v1.json
