### Raw Data Ingestion

Already uploaded athletes.csv to /workspace/default in the Catalog through Databricks UI (not able to do programatically).

In [0]:
import matplotlib.pyplot as plt
import numpy as np
from scipy import stats
import warnings
from scipy.stats import gaussian_kde
from databricks.feature_engineering import FeatureEngineeringClient
from sentence_transformers import SentenceTransformer
import pandas as pd
from sklearn.decomposition import PCA

In [0]:
warnings.filterwarnings("ignore")

In [0]:
%sql
SHOW TABLES IN workspace.default;


In [0]:
raw_athletes = spark.table("workspace.default.athletes_raw").toPandas()
raw_athletes.head(5)

### Feature Engineering (1)

In [0]:
# Create our target feature total_lift
raw_athletes = raw_athletes.dropna(subset=['athlete_id'])
raw_athletes['total_lift'] = raw_athletes['snatch'] + raw_athletes['deadlift'] + raw_athletes['backsq'] + raw_athletes['candj']
len(raw_athletes)

In [0]:
# Helper function to help remove outliers: removes extremes first to reduce standard deviation first, then apply normal 68-95-99 rule
def remove_outliers(df, col_name):
    Q1 = df[col_name].quantile(0.25)
    Q3 = df[col_name].quantile(0.75)
    IQR = Q3 - Q1

    # Define bounds
    lower_bound = Q1 - 1.5 * IQR
    upper_bound = Q3 + 1.5 * IQR

    # Filter out the extreme outliers
    df = df[(df[col_name] >= lower_bound) & (df[col_name] <= upper_bound)]

    # Now, remove the more moderate outliers (3 z-scores away)
    df['z_' + col_name] = np.abs(stats.zscore(df[col_name]))
    df = df[df['z_' + col_name] < 3]
    return df.drop(['z_' + col_name], axis=1)


# Imputation via KDE
def impute_with_kde(series):
    np.random.seed(24)
    # Extract non-null values for creating the kernel
    non_null = series.dropna().astype(float)
    kde = gaussian_kde(non_null)
    # Number of missing values; we'll sample this many times
    n_missing = series.isna().sum()
    sampled = []
    # Just make sure we're always sampling non-negative values
    while len(sampled) < n_missing:
        s = kde.resample(n_missing).flatten()
        s = s[s >= 0]
        sampled.extend(s.tolist())

    # Just in case we oversampled, truncate
    sampled = np.array(sampled[:n_missing])

    # Replace NA values with sampled values
    series.loc[series.isna()] = sampled

    return series

# Creates learning labels on a dataframe using the ratios passed
def create_ml_labels(df, train_ratio=0.7, val_ratio=0.2):
    total_rows = len(df)
    # Shuffle all the rows
    shuffled_indices = np.random.permutation(total_rows)

    # Create split for train set
    train_end = int(train_ratio * total_rows)
    # Create split for val set
    val_end = train_end + int(val_ratio * total_rows)

    # Create empty array for storing the labels
    split_labels = np.empty(total_rows, dtype=object)
    # Populate the array
    split_labels[shuffled_indices[:train_end]] = 'train'
    split_labels[shuffled_indices[train_end:val_end]] = 'val'
    split_labels[shuffled_indices[val_end:]] = 'test'
    return split_labels


In [0]:
# Remove NA values from our features
features = ['gender', 'age', 'height', 'weight', 'total_lift', 'helen', 'grace', 'fran']
df_cleaned = raw_athletes.dropna(subset=features)
len(df_cleaned)

### Create Additional Feature (Pullups/Lb)

In [0]:
df_cleaned['pullups'] = impute_with_kde(df_cleaned['pullups'])
df_cleaned['pullups_per_lb'] = df_cleaned['pullups'] / df_cleaned['weight']

### Additional Cleaning (Removing Outliers)

In [0]:
num_features = ['age', 'height', 'weight', 'total_lift', 'helen', 'grace', 'fran', 'pullups_per_lb']
for feature in num_features:
    df_cleaned = remove_outliers(df_cleaned, feature)
len(df_cleaned)

We end up having a dataset of roughly 10K data points, which is adequate enough for fitting a small/ simple model.

### Feature Correlation/ Visualization

In [0]:
feature1 = 'pullups_per_lb'
feature2 = 'total_lift'
plt.figure(figsize=(6, 6))
plt.scatter(df_cleaned[feature1], df_cleaned[feature2], alpha=0.6)
plt.xlabel(feature1)
plt.ylabel(feature2)
plt.title(f"Scatter Plot (Correlation = {df_cleaned[feature1].corr(df_cleaned[feature2]):.2f})")
plt.grid(True)
plt.tight_layout()
plt.show()

In [0]:
# Append learning labels to dataframe and convert to spark
df_cleaned['data_split'] = create_ml_labels(df_cleaned)
total_features1 = ['athlete_id', 'age', 'height', 'weight', 'gender', 'helen', 'grace', 'fran', 'pullups_per_lb', 'data_split']

# Convert problematic columns to numeric and boolean explicitly
df_cleaned['total_lift'] = pd.to_numeric(df_cleaned['total_lift'])
df_cleaned['gender'] = df_cleaned['gender'].map({'Male': True, 'Female': False})

feature_table1 = spark.createDataFrame(df_cleaned[total_features1])

### Feature Table 1 Creation

In [0]:
feature_table1_name = 'workspace.default.strengthFeatures'

# Create feature table
fe = FeatureEngineeringClient()
fe.create_table(
    name = feature_table1_name,
    primary_keys = ["athlete_id"],
    df = feature_table1,
    description = "Engineered strength features for total_lift prediction. This table does not contain total_lift values."
)

### Feature Engineering (2)

In [0]:
# Load model
embedder = SentenceTransformer('all-MiniLM-L6-v2')

In [0]:
# Cleans a string by converting to lower case and removing leading/trailing spaces
def clean_text(s):
    if pd.isna(s):
        return ""
    return str(s).lower().strip()

# Converts a feature column to embeddings by applying the embedding model
# Each embedded dimension becomes a column, and the model we are using produces a 384-dimensional embedding, resulting in 384 columns
def text_to_embedding(series, model=embedder):
    cleaned_texts = series.apply(clean_text).tolist()
    embeddings = model.encode(cleaned_texts, show_progress_bar=True)
    return pd.DataFrame(embeddings, index=series.index)

In [0]:
behavior_features = ['eat', 'train', 'background', 'experience', 'schedule']
# First drop NA values from our feature set and label
df_cleaned_2 = raw_athletes.dropna(subset=behavior_features + ['total_lift'])
len(df_cleaned_2)

In [0]:
num_features2 = ['total_lift']
for feature in num_features2:
    df_cleaned_2 = remove_outliers(df_cleaned_2, feature)
len(df_cleaned_2)

### Embedding Pipeline

In [0]:
# Create dataframes for each of our behavior features
train_embeddings = text_to_embedding(df_cleaned_2['train'])
background_embeddings = text_to_embedding(df_cleaned_2['background'])
experience_embeddings = text_to_embedding(df_cleaned_2['experience'])
schedule_embeddings = text_to_embedding(df_cleaned_2['schedule'])
eat_embeddings = text_to_embedding(df_cleaned_2['eat'])

# Rename columns to avoid clashes when joining
train_embeddings.columns = [f"train_emb_{i}" for i in range(train_embeddings.shape[1])]
background_embeddings.columns = [f"background_emb_{i}" for i in range(background_embeddings.shape[1])]
experience_embeddings.columns = [f"experience_emb_{i}" for i in range(experience_embeddings.shape[1])]
schedule_embeddings.columns = [f"schedule_emb_{i}" for i in range(schedule_embeddings.shape[1])]
eat_embeddings.columns = [f"eat_emb_{i}" for i in range(eat_embeddings.shape[1])]

# Combine embeddings into final feature set (include athlete_id)
behavior_features_df = pd.concat([
    df_cleaned_2[['athlete_id']],
    train_embeddings,
    background_embeddings,
    experience_embeddings,
    schedule_embeddings,
    eat_embeddings
], axis=1)


In [0]:
# Append learning labels to dataframe and convert to spark
behavior_features_df['data_split'] = create_ml_labels(behavior_features_df)
feature_table2 = spark.createDataFrame(behavior_features_df)

### Feature Table 2 Creation

In [0]:
feature_table2_name = 'workspace.default.behaviorFeaturesEnhanced'

# Create feature table
fe.create_table(
    name = feature_table2_name,
    primary_keys = ["athlete_id"],
    df = feature_table2,
    description = "Engineered behavior features for total_lift prediction. This table does not contain total_lift values. May need to perform additional engineering before ML training."
)

### PCA Analysis

In [0]:
behavior_features = spark.read.table('workspace.default.behaviorFeaturesEnhanced').toPandas()
behavior_features.shape


In [0]:
# Let's see if we can reduce the features but retain variability explained using PCA
embedding_data = behavior_features.drop(columns=['athlete_id', 'data_split'])

# Start with 50 components
pca = PCA(n_components=50, random_state=24)
embedding_pca = pca.fit_transform(embedding_data)

# Convert back to DataFrame
embedding_pca_df = pd.DataFrame(embedding_pca, columns=[f'pca_emb_{i}' for i in range(50)])

embedding_pca_df.shape


In [0]:
explained_variance = pca.explained_variance_ratio_
cumulative_variance = np.cumsum(explained_variance)

threshold = 0.95
components_needed = np.argmax(cumulative_variance >= threshold) + 1

# Plot
plt.figure(figsize=(10, 5))
plt.plot(cumulative_variance, marker='o', label='Cumulative Explained Variance')
plt.axvline(
    x=components_needed - 1,
    color='red',
    linestyle='--',
    linewidth=1,
    label=f'95% Variance at {components_needed} Components'
)

plt.xlabel('Number of Components')
plt.ylabel('Cumulative Explained Variance')
plt.title('Explained Variance vs. Number of Components')
plt.grid(True)
plt.legend()
plt.show()

From the graph, we can see the first 27 components of the PCA retain ~95% of the variance from the embeddings generated. As a result, we can just use this smaller subset of features for our ML model, resulting in faster training and smaller complexity.

In [0]:
# Let's cut to 27 features and upload to Feature Store
# First, extract the PCA columns
# pca_columns = [f'pca_emb_{i}' for i in range(components_needed)]


# # Cut the DataFrame and add back relevant columns (athlete_id and data_split)
# final_pca_df = embedding_pca_df[pca_columns]
# final_pca_df['athlete_id'] = behavior_features['athlete_id']
# final_pca_df['data_split'] = behavior_features['data_split']

# # Upload to Feature Store
# feature_table3 = spark.createDataFrame(final_pca_df)
feature_table3_name = 'workspace.default.behaviorFeaturesModel'

fe.create_table(
    name = feature_table3_name,
    primary_keys = ["athlete_id"],
    df = final_df,
    description = "Engineered behavior features for total_lift prediction. This table does not contain total_lift values, and only contains a subset of features, based on PCA analysis. Refer to behaviorFeaturesEnhanced for full embeddings.",
)

### Feature Table for Labels

In [0]:
# Lastly, let's create a feature table for our labels. We'll use the raw table so it includes all athletes
# Remember to include the athlete_id column (to combine with other features)
label_table = spark.createDataFrame(raw_athletes[['athlete_id', 'total_lift']])

label_table_name = 'workspace.default.total_lift_labels'

# Create feature table
fe.create_table(
    name = label_table_name,
    primary_keys = ["athlete_id"],
    df = label_table,
    description = "This table contains total_lift values. Use athlete_id to join with features found in other tables."
)