In [0]:
%pip install databricks-feature-engineering
# Installs the Databricks Feature Engineering client, which the feature-table cell
# further down imports (databricks.feature_engineering.FeatureEngineeringClient).
# NOTE(review): the package version is unpinned — consider pinning it for reproducible runs.
# Restart the Python process so the newly installed package can be imported by later cells.
dbutils.library.restartPython()

In [0]:
# Notebook parameters: names of the raw Delta tables to create/read.
dbutils.widgets.text("train_table_raw", "dp_ml_titanic_train_raw")
dbutils.widgets.text("test_table_raw", "dp_ml_titanic_test_raw")

train_table_raw = dbutils.widgets.get("train_table_raw")
test_table_raw = dbutils.widgets.get("test_table_raw")

# Location of the raw data (adjust if needed). Prefixed RAW_ so these do not
# collide with the feature-table CATALOG/SCHEMA constants defined later.
RAW_CATALOG = "dp_ml_raw"
RAW_SCHEMA = "dp_ml_titanic"
RAW_VOLUME_DIR = f"dbfs:/Volumes/{RAW_CATALOG}/{RAW_SCHEMA}/titanic_raw_data"

# Set the session's current catalog and schema.
spark.sql(f"USE CATALOG {RAW_CATALOG}")
spark.sql(f"USE {RAW_CATALOG}.{RAW_SCHEMA}")


def raw_table_name(table: str) -> str:
    """Return the fully qualified name of a table in the raw schema.

    The table part comes from a notebook widget, so it is backtick-quoted
    (embedded backticks doubled) to keep it a single SQL identifier rather
    than letting arbitrary widget text be spliced into the SQL statement.
    """
    quoted = "`" + table.replace("`", "``") + "`"
    return f"{RAW_CATALOG}.{RAW_SCHEMA}.{quoted}"


def create_raw_table(table: str, csv_file: str) -> None:
    """Create a Delta table from a CSV file in the raw Volume if it does not exist.

    Args:
        table: Unqualified table name (e.g. a widget value).
        csv_file: File name inside RAW_VOLUME_DIR (e.g. "train.csv").

    Because of IF NOT EXISTS, re-running is a no-op for an existing table:
    it is neither replaced nor refreshed from the CSV.
    """
    spark.sql(f"""
        CREATE TABLE IF NOT EXISTS {raw_table_name(table)}
        USING DELTA
        CLUSTER BY (PassengerId)
        AS SELECT *
        FROM read_files(
          '{RAW_VOLUME_DIR}/{csv_file}',
          format => 'csv',
          header => true,
          inferSchema => true
        )
    """)


create_raw_table(train_table_raw, "train.csv")
create_raw_table(test_table_raw, "test.csv")

# Preview a few rows, then expose both raw tables as Spark DataFrames.
display(spark.table(raw_table_name(train_table_raw)).limit(10))
df_train = spark.table(raw_table_name(train_table_raw))
df_test = spark.table(raw_table_name(test_table_raw))


In [0]:
import pandas as pd
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder, FunctionTransformer
from sklearn.compose import ColumnTransformer
from sklearn.linear_model import LogisticRegression

# --- Config ---
TARGET = "Survived"   # label column; kept out of the feature matrix X
DROP_COLS = ['PassengerId', 'Name', 'Ticket', '_rescued_data', 'Cabin']

# Explicit categories (order defines output column order)
CUSTOM_CATS = {
    "Embarked": ["S", "C", "Q"],
    "Sex": ["male", "female"]
}

# --- Prepare X, y ---
# Collect the Spark training table to pandas for scikit-learn.
df = df_train.toPandas()
# Split the label off so it never becomes a (leaking) passthrough feature and
# is not a required input of the feature pipeline logged below. The guards keep
# this usable on unlabelled data (e.g. a split with no TARGET column).
y = df[TARGET] if TARGET in df.columns else None
X = df.drop(columns=[TARGET], errors="ignore")

# --- Step 1: pre-processing helper used as the first pipeline step ---
def drop_and_fill(X: pd.DataFrame) -> pd.DataFrame:
    """Drop the configured DROP_COLS and replace every missing value with 0.

    Names in DROP_COLS that are not columns of ``X`` are skipped silently.
    ``X`` itself is left untouched; a new DataFrame is returned.
    """
    trimmed = X.drop(columns=DROP_COLS, errors="ignore")
    return trimmed.fillna(value=0)

# Wrap the drop/fill helper so it runs as the first pipeline step.
dropper = FunctionTransformer(drop_and_fill)

# Resolve column groups against the raw input frame X. Categorical columns are
# the configured CUSTOM_CATS that are present; every other column that is
# neither dropped nor the label is passed through unchanged. Excluding TARGET
# here guarantees the label never becomes a feature, even if X still carries it.
cat_cols = [c for c in CUSTOM_CATS if c in X.columns]
num_cols = [
    c for c in X.columns
    if c not in cat_cols and c not in DROP_COLS and c != TARGET
]
categories_in_order = [CUSTOM_CATS[c] for c in cat_cols]

print(cat_cols, num_cols, categories_in_order, TARGET)

# --- OneHotEncoder with fixed categories ---
# Fixed category lists keep the output columns stable across datasets; values
# outside them (including the 0 that drop_and_fill writes into missing
# categorical cells) are encoded as all zeros.
cat_ohe = OneHotEncoder(
    categories=categories_in_order,
    handle_unknown="ignore",
    sparse_output=False,
)

# Columns not listed in either transformer (e.g. TARGET) are dropped.
pre = ColumnTransformer(
    transformers=[
        ("cat", cat_ohe, cat_cols),
        ("num", "passthrough", num_cols),
    ],
    remainder="drop",
)

# --- Full pipeline: drop/fill, then encode ---
pipe = Pipeline(steps=[
    ("prep", dropper),
    ("encode", pre),
])

# Make every step emit pandas DataFrames so feature names are preserved.
pipe.set_output(transform="pandas")

X_transformed = pipe.fit_transform(X)

# Invariants: fillna(0) should leave no missing values, and the label must not
# appear among the passthrough features.
assert not X_transformed.isnull().any().any(), "transformed features contain nulls"
assert f"num__{TARGET}" not in X_transformed.columns, "label leaked into features"
display(X_transformed.head(2))
print(X_transformed.columns, df.columns)

# --- 1) Log the transformer pipeline to MLflow ---
import mlflow
import mlflow.sklearn
from mlflow.models.signature import infer_signature

# Experiment that receives the run (pick your own workspace path).
mlflow.set_experiment("/Shared/titanic-feature-pipeline")

with mlflow.start_run():
    # `pipe` is already fitted in the cell above, so it is logged as-is.

    # A few raw rows act as the signature example. Integer columns are widened
    # to float64 because integer columns cannot hold missing values, and MLflow
    # schema enforcement would reject float (NaN-carrying) input for them later.
    X_sig = X.head(5).copy()
    int_like = X_sig.select_dtypes(include=["int64", "int32", "Int64", "Int32"]).columns
    X_sig[int_like] = X_sig[int_like].astype("float64")

    # The signature maps the raw input columns to the engineered feature columns.
    sig = infer_signature(
        model_input=X_sig,
        model_output=pipe.transform(X_sig),
    )

    # NOTE(review): a Unity Catalog model registry expects a three-level
    # catalog.schema.model name — confirm which registry this workspace uses.
    run_model_info = mlflow.sklearn.log_model(
        sk_model=pipe,
        artifact_path="feature_transformer",
        signature=sig,
        registered_model_name="titanic_feature_pipeline",
    )

print("Logged to MLflow. Model URI:", run_model_info.model_uri)


In [0]:
from databricks.feature_engineering import FeatureEngineeringClient

CATALOG = "dp_ml_raw"              # Unity Catalog catalog
SCHEMA  = "features"               # schema holding the feature table
TABLE   = "titanic_features"       # feature table name
FULL    = f"{CATALOG}.{SCHEMA}.{TABLE}"
spark.sql(f"CREATE SCHEMA IF NOT EXISTS {CATALOG}.{SCHEMA}")

# Feature tables require a primary key. The transformed frame carries no ID
# column (PassengerId is in DROP_COLS), so the pandas row index is materialised
# as `rowId` and used as the key.
# NOTE(review): rowId is only meaningful relative to the row order of df_train;
# consider keeping PassengerId as a stable, joinable key instead.
X_features = X_transformed.reset_index().rename(columns={"index": "rowId"})
assert X_features["rowId"].is_unique, "rowId must be unique to serve as a primary key"
spark_df = spark.createDataFrame(X_features)

fe = FeatureEngineeringClient()

# Rebuild the table from scratch on every run. fe.drop_table removes the
# feature-table metadata and the underlying Delta table; failures (e.g. the
# table does not exist yet) are reported but not fatal. The plain DROP TABLE
# afterwards also covers a leftover table that was never a feature table.
try:
    fe.drop_table(name=FULL)
    print(f"[info] Dropped feature table metadata: {FULL}")
except Exception as e:
    print(f"[info] fe.drop_table skipped: {e}")
spark.sql(f"DROP TABLE IF EXISTS {FULL}")

# Create the Delta table plus its feature-table metadata in UC, writing the
# schema and data from spark_df.
ft = fe.create_table(
    name=FULL,
    primary_keys=["rowId"],
    df=spark_df,
    description="Titanic engineered features with fixed OHE"
)
