## Demo - Build a Feature Engineering Pipeline with Embeddings

In this demo, we will build a feature engineering pipeline that performs data loading, imputation, transformation, and embedding generation for categorical features. The pipeline will be applied to training and testing datasets, ensuring consistency in data preprocessing. Finally, we will save the pipeline for future reuse, allowing efficient and reproducible data preparation for machine learning.

### Learning Objectives:

**By the end of this demo, you will be able to:**

- Build a structured feature engineering pipeline that includes multiple preprocessing steps.
- Create a pipeline with tasks for data imputation and numerical feature scaling.
- Generate embeddings for categorical features to represent categorical data effectively.
- Assemble transformed numerical and embedded categorical features into a single feature vector.
- Apply the feature engineering pipeline to both training and test datasets.
- Display the results of the transformation.
- Save a data preparation and feature engineering pipeline to Unity Catalog for potential future use.


## Requirements

Please review the following requirements before starting the lesson:

- To run this notebook, you need to use one of the following Databricks runtime(s): **16.0.x-cpu-ml-scala2.12**

---

## REQUIRED - SELECT CLASSIC COMPUTE

Before executing cells in this notebook, please select your classic compute cluster in the lab. Be aware that **Serverless** is enabled by default. Follow these steps to select the classic compute cluster:

1. Navigate to the top-right of this notebook and click the drop-down menu to select your cluster. By default, the notebook will use **Serverless**.
2. If your cluster is available, select it and continue to the next cell. If the cluster is not shown:

   - In the drop-down, select **More**.
   - In the **Attach to an existing compute resource** pop-up, select the first drop-down. You will see a unique cluster name in that drop-down. Please select that cluster.

**NOTE:** If your cluster has terminated, you might need to restart it in order to select it. To do this:

1. Right-click on **Compute** in the left navigation pane and select _Open in new tab_.
2. Find the triangle icon to the right of your compute cluster name and click it.
3. Wait a few minutes for the cluster to start.
4. Once the cluster is running, complete the steps above to select your cluster.

---

## Classroom Setup

Before starting the demo, run the provided classroom setup script. This script will define configuration variables necessary for the demo. Execute the following cell:


```python
%run ../Includes/Classroom-Setup-2.2
```


### Other Conventions:

Throughout this demo, we’ll refer to the object `DA`. This object, provided by Databricks Academy, contains **variables such as your username, catalog name, schema name, working directory, and dataset locations**. Run the code block below to view these details:


```python
print(f"Username: {DA.username}")
print(f"Catalog Name: {DA.catalog_name}")
print(f"Schema Name: {DA.schema_name}")
print(f"Working Directory: {DA.paths.working_dir}")
print(f"Dataset Location: {DA.paths.datasets}")
```


## Data Preparation

Before constructing the feature engineering pipeline, we need to ensure the dataset is consistent and properly formatted. This includes handling data types, addressing missing values, and preparing the dataset for further transformations. The _Telco Customer Churn_ dataset will be used for this process.

### Steps in Data Preparation:

1. Load the dataset into a Spark DataFrame.
2. Split the dataset into training and testing sets.
3. Convert Integer and Boolean columns to Double to ensure compatibility with Spark ML.
4. Handle missing values by identifying and imputing them in:
   - Numeric columns
   - String columns


## Loading the Dataset

We start by loading the dataset from the specified file path using Spark.

> This step ensures that only relevant columns are included for feature engineering and model training.


```python
from pyspark.sql.functions import when, col

# Load dataset with spark
shared_volume_name = 'telco'  # From Marketplace
csv_name = 'telco-customer-churn-missing'  # CSV file name

dataset_path = f"{DA.paths.datasets.telco}/{shared_volume_name}/{csv_name}.csv"  # Full path

telco_df = spark.read.csv(dataset_path, header="true", inferSchema="true", multiLine="true", escape='"')

# Select columns of interest
telco_df = telco_df.select(
    "gender", "SeniorCitizen", "Partner", "tenure", "InternetService",
    "Contract", "PaperlessBilling", "PaymentMethod", "TotalCharges", "Churn"
)
```


```python
# Manually recreate a small sample of the data (transcribed from a screenshot) as a DataFrame
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType, BooleanType

# Create (or reuse) the Spark session
spark = SparkSession.builder.appName("ChurnData").getOrCreate()

# Create the data manually (based on the screenshot)
data = [
    (None, False, "Yes", 1, "DSL", "Month-to-month", "Yes", "Electronic check", 29.85, "No"),
    ("Male", False, "No", 34, "DSL", "One year", "No", "Mailed check", 1889.5, "Yes"),
    ("Male", False, "Yes", 2, "DSL", "Month-to-month", "Yes", "Mailed check", 108.15, "No"),
    ("Male", False, "No", 45, "DSL", "One year", "No", "Bank transfer (automatic)", 1840.75, "No"),
    ("Female", False, "No", None, "Fiber optic", "Month-to-month", "Yes", "Electronic check", 151.65, "Yes"),
    ("Female", False, "No", 8, "Fiber optic", "Month-to-month", "Yes", "Electronic check", 820.5, "Yes"),
    ("Male", False, "No", None, "Fiber optic", "Month-to-month", "Yes", "Credit card (automatic)", 1949.4, "Yes"),
    ("Female", False, "No", 10, "DSL", "Month-to-month", "No", "Mailed check", 301.9, "No"),
    (None, False, "Yes", 28, "Fiber optic", "Month-to-month", "Yes", "Electronic check", 3046.05, "Yes"),
    ("Male", False, "No", 62, "DSL", "One year", "No", "Bank transfer (automatic)", 3487.95, "No"),
    ("Male", False, "Yes", 13, None, "Month-to-month", "Yes", "Mailed check", 587.45, "No"),
    ("Male", False, "No", 16, "No", "Two year", "No", "Credit card (automatic)", 326.8, "No"),
    (None, False, "Yes", 58, "Fiber optic", "One year", "No", "Credit card (automatic)", 5681.1, "No"),
    ("Male", False, None, 49, "Fiber optic", "Month-to-month", "Yes", None, None, "Yes"),
]

# Define the schema
schema = StructType([
    StructField("gender", StringType(), True),
    StructField("SeniorCitizen", BooleanType(), True),
    StructField("Partner", StringType(), True),
    StructField("tenure", IntegerType(), True),
    StructField("InternetService", StringType(), True),
    StructField("Contract", StringType(), True),
    StructField("PaperlessBilling", StringType(), True),
    StructField("PaymentMethod", StringType(), True),
    StructField("TotalCharges", FloatType(), True),
    StructField("Churn", StringType(), True),
])

# Create the DataFrame
df = spark.createDataFrame(data, schema=schema)

# Show the DataFrame
df.show(truncate=False)
```


## Preprocessing the Dataset

Some columns require cleaning and type conversion to maintain consistency and compatibility with machine learning models.

### Handling Null Values

- In some datasets, missing values might be stored as the string `"null"`, which needs to be properly converted to `NULL` values.


```python
# Replace string "null" with actual NULL values
for column in telco_df.columns:
    telco_df = telco_df.withColumn(column, when(col(column) == "null", None).otherwise(col(column)))
```


### Converting Data Types

- The `SeniorCitizen` column is a binary categorical variable (0 or 1) and should be converted to **Boolean**.
- The `TotalCharges` column should be cast to a **double** type since it represents a numerical feature.

> These conversions help in maintaining proper data representation and ensure compatibility with Spark ML.


```python
# Clean up columns
# Map SeniorCitizen 1 -> True and 0 -> False; NULLs stay NULL so they can be imputed later
telco_df = telco_df.withColumn("SeniorCitizen", col("SeniorCitizen") == 1)
telco_df = telco_df.withColumn("TotalCharges", col("TotalCharges").cast("double"))

display(telco_df)
```
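With ANSI mode disabled (the Spark 3.x default, assumed here), casting `TotalCharges` to double does not fail on values that are not numeric. The plain-Python sketch below mimics that cast: NULLs stay NULL, and strings that cannot be parsed as a number, such as a blank string, become NULL instead of raising an error. `to_double` is a hypothetical helper used only for illustration, not a Spark API.

```python
from typing import Optional

def to_double(value: Optional[str]) -> Optional[float]:
    """Hypothetical helper mimicking col(...).cast("double") with ANSI mode off:
    NULLs and unparseable strings both become None (NULL) instead of failing."""
    if value is None:
        return None
    try:
        return float(value)
    except ValueError:
        # e.g. a blank string: Spark (non-ANSI) returns NULL rather than an error
        return None

raw_total_charges = ["29.85", " ", None, "1889.5"]
print([to_double(v) for v in raw_total_charges])  # [29.85, None, None, 1889.5]
```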


## Splitting the Dataset into Training and Testing Sets

Once the data has been cleaned, we split it into training and testing sets using an 80-20 split.

- Since `telco_df` is a PySpark DataFrame, we will use `randomSplit()`.


```python
train_df, test_df = telco_df.randomSplit([0.8, 0.2], seed=42)
```
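Note that `randomSplit` does not produce exact 80/20 row counts. The plain-Python sketch below is a simplified model of the idea, not Spark's implementation (which samples within each partition): every row draws one uniform random number and is assigned to the split whose cumulative-weight interval contains it. The splits are therefore disjoint and together cover every row, but their sizes only approximate the weights. `random_split` is a hypothetical helper.

```python
import random
from typing import List, Sequence

def random_split(rows: Sequence, weights: Sequence[float], seed: int) -> List[list]:
    """Simplified model of DataFrame.randomSplit: one uniform draw per row,
    assigned to the split whose cumulative-weight interval contains it."""
    total = sum(weights)
    # Normalized cumulative boundaries, e.g. [0.8, 0.2] -> [0.0, 0.8, 1.0]
    bounds = [0.0]
    for w in weights:
        bounds.append(bounds[-1] + w / total)
    rng = random.Random(seed)
    splits: List[list] = [[] for _ in weights]
    for row in rows:
        u = rng.random()
        for i in range(len(weights)):
            if bounds[i] <= u < bounds[i + 1]:
                splits[i].append(row)
                break
        else:
            # Guard against floating-point rounding leaving u above the last bound
            splits[-1].append(row)
    return splits

train, test = random_split(list(range(1000)), [0.8, 0.2], seed=42)
print(len(train), len(test))  # sizes are close to 800 and 200, not necessarily exact
```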


## Transforming the Dataset

To ensure that all numerical and categorical features are compatible with machine learning algorithms, we perform several transformations.

### Convert Integer and Boolean Columns to Double

- Many machine learning algorithms require numeric input, so we convert all **integer** and **boolean** columns to **double**.

> This ensures numerical consistency in the dataset.


```python
from pyspark.sql.types import IntegerType, BooleanType, StringType, DoubleType
from pyspark.sql.functions import col, count, when

# Get a list of integer & boolean columns
integer_cols = [column.name for column in train_df.schema.fields 
                if column.dataType == IntegerType() or column.dataType == BooleanType()]

# Loop through the integer and boolean columns to cast each one to double
for column in integer_cols:
    train_df = train_df.withColumn(column, col(column).cast("double"))
    test_df = test_df.withColumn(column, col(column).cast("double"))
```


## Identifying Missing Values

Handling missing data is crucial to prevent errors and bias in machine learning models. We first check for missing values in numerical and categorical columns.

- **Find Numeric Columns with Missing Values**


```python
from pyspark.sql.functions import count, when

# Identify numeric columns
num_cols = [c.name for c in train_df.schema.fields if c.dataType == DoubleType()]

# Count missing values in numeric columns
num_missing_values_logic = [count(when(col(column).isNull(), column)).alias(column) for column in num_cols]
row_dict_num = train_df.select(num_missing_values_logic).first().asDict()
num_missing_cols = [column for column in row_dict_num if row_dict_num[column] > 0]

print(f"Numeric columns with missing values: {num_missing_cols}")
```


- **Find String Columns with Missing Values**

```python
# Identify string columns
string_cols = [c.name for c in train_df.schema.fields if c.dataType == StringType()]

# Count missing values in string columns
string_missing_values_logic = [count(when(col(column).isNull(), column)).alias(column) for column in string_cols]
row_dict_string = train_df.select(string_missing_values_logic).first().asDict()
string_missing_cols = [column for column in row_dict_string if row_dict_string[column] > 0]

print(f"String columns with missing values: {string_missing_cols}")
```
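Both cells rely on the fact that `count()` ignores NULLs: `when(col(c).isNull(), c)` has no `otherwise`, so it yields a non-NULL value only for missing entries and NULL for everything else, and `count` therefore returns the number of missing values per column. The plain-Python analogue below shows the same logic; `columns_with_missing` is a hypothetical helper and the sample rows are made up for illustration.

```python
from typing import Dict, List, Optional

def columns_with_missing(rows: List[Dict[str, Optional[object]]], columns: List[str]) -> List[str]:
    """Plain-Python analogue of count(when(col(c).isNull(), c)) per column,
    keeping only the columns whose NULL count is greater than zero."""
    null_counts = {c: sum(1 for r in rows if r.get(c) is None) for c in columns}
    return [c for c in columns if null_counts[c] > 0]

sample = [
    {"tenure": 1.0, "TotalCharges": 29.85, "gender": None},
    {"tenure": None, "TotalCharges": 151.65, "gender": "Female"},
    {"tenure": 34.0, "TotalCharges": None, "gender": "Male"},
]
print(columns_with_missing(sample, ["tenure", "TotalCharges"]))  # ['tenure', 'TotalCharges']
print(columns_with_missing(sample, ["gender"]))  # ['gender']
```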


## Creating a Feature Engineering Pipeline

To efficiently preprocess and transform data for machine learning, we construct a **Spark ML pipeline**. This pipeline automates key preprocessing steps, ensuring **consistency** and **reproducibility** in data preparation. The pipeline processes the _Telco Customer Churn_ dataset by performing:

### Key Feature Engineering Steps:

#### Generating Embeddings for Categorical Features
- Instead of traditional categorical encoding techniques such as one-hot encoding, we generate **dense vector representations** (embeddings) using SparkML’s **Word2Vec**.
- Word2Vec learns these embeddings from which categories appear together in the same records, so they can capture **semantic relationships** between categories and may improve model performance.

> We are interested in the differences between, for example, a **senior citizen with fiber optic internet** and a **senior citizen with DSL**. Therefore, rather than embedding each categorical column separately, we embed each row's combination of categories as a single sequence. Otherwise, **fiber optic** and **senior citizen status** would each only have their own independent embeddings.

#### Handling Missing Values
- Missing values in **numerical columns** (e.g., `tenure`, `TotalCharges`) are imputed using the **mean strategy** to ensure completeness.
- Missing **categorical values** are replaced with the placeholder token `"unknown"`, so they are encoded as a separate category.

#### Standardizing Numerical Features
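- SparkML’s `VectorAssembler` **combines numerical columns** into a single feature vector.
- SparkML’s `StandardScaler` **scales numerical features** to unit standard deviation (with its default settings it does not subtract the mean), so that features measured on large scales, such as `TotalCharges`, do not dominate features measured on small scales.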
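With its default settings (`withStd=True`, `withMean=False`), Spark ML's `StandardScaler` divides each feature by its corrected sample standard deviation and does not subtract the mean, so the scaled values have unit standard deviation but are not centered at zero. The plain-Python sketch below shows that computation for one feature; `unit_std_scale` is a hypothetical helper, its `with_mean` flag mirrors the `withMean` parameter, and the `tenure` values are made up.

```python
import statistics
from typing import List

def unit_std_scale(values: List[float], with_mean: bool = False) -> List[float]:
    """Sketch of StandardScaler on a single feature: divide by the corrected
    sample standard deviation (n - 1 denominator). with_mean=True also centers
    the values first, like StandardScaler(withMean=True)."""
    std = statistics.stdev(values)
    mean = statistics.fmean(values) if with_mean else 0.0
    if std == 0.0:
        # Guard against division by zero for a constant feature
        return [0.0 for _ in values]
    return [(v - mean) / std for v in values]

tenure = [2.0, 4.0, 4.0, 4.0, 5.0, 5.0, 7.0, 9.0]
scaled = unit_std_scale(tenure)
print(round(statistics.stdev(scaled), 6))  # 1.0
```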

#### Combining Features into a Final Vector
- The **scaled numerical features** and **embedded categorical representations** are **combined** into a single feature vector.
- This ensures a **structured and uniform format** for machine learning models.
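A `VectorAssembler` concatenates its input columns in the order they are listed: scalar columns contribute one element each and vector columns are flattened, so the output length is the sum of the input sizes. The plain-Python sketch below models this for a single row; `assemble` is a hypothetical helper and the values are made up. It also raises on a NULL input, mirroring the default `handleInvalid="error"` setting, which is one reason missing values are imputed before assembly.

```python
from typing import List, Optional, Sequence, Union

Feature = Optional[Union[float, Sequence[float]]]

def assemble(features: List[Feature]) -> List[float]:
    """Sketch of VectorAssembler for one row: scalars are appended as-is and
    vector-valued inputs are flattened, preserving the input column order."""
    out: List[float] = []
    for f in features:
        if f is None:
            # Mirrors the default handleInvalid="error" behavior on NULL inputs
            raise ValueError("NULL input value")
        if isinstance(f, (int, float)):
            out.append(float(f))
        else:
            out.extend(float(x) for x in f)
    return out

# One row: a 3-element scaled numeric vector followed by a 5-element embedding
row = assemble([[0.0, 0.04, 1.2], [0.1, -0.2, 0.3, 0.0, 0.5]])
print(len(row))  # 8
```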

#### Encapsulating Steps into a Pipeline
- The imputation, scaling, and assembly steps are encapsulated in a **Spark ML pipeline**, making these transformations **modular and reusable**.


## Generating Embeddings for Categorical Features

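To give models a compact, dense representation of the categorical data, we create **embedding vectors** for categorical columns using **Word2Vec**. Because Word2Vec learns from which categories appear together in the same rows, these embeddings can capture relationships between categories.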


```python
from pyspark.ml.feature import Word2Vec
from pyspark.sql.functions import split, concat_ws, col
from pyspark.ml.linalg import DenseVector, VectorUDT  # used in the DenseVector conversion cell below
import pyspark.sql.functions as F

def generate_categorical_embeddings(df, categorical_cols, vector_size=5, model=None):
    """
    Generate a row-level embedding for categorical columns using Word2Vec.

    Parameters:
    - df (DataFrame): Input Spark DataFrame
    - categorical_cols (list): List of categorical column names
    - vector_size (int): Size of the embedding vectors (used only when fitting a new model)
    - model (Word2VecModel or None): An already fitted model to reuse. If None,
      a new model is fit on `df`, which should only be done for the training data.

    Returns:
    - Tuple of (DataFrame with an "embedding_struct" vector column, fitted Word2VecModel)
    """
    # Replace NULL categorical values with "unknown"
    for col_name in categorical_cols:
        df = df.withColumn(col_name, F.when(F.col(col_name).isNull(), "unknown").otherwise(F.col(col_name)))

    # Combine all categorical columns into a single space-separated text column
    df = df.withColumn("categorical_sequence", concat_ws(" ", *categorical_cols))

    # Tokenize on spaces (multi-word values such as "Fiber optic" become several tokens)
    df = df.withColumn("categorical_tokens", split(col("categorical_sequence"), " "))

    # Fit Word2Vec only when no fitted model is supplied
    if model is None:
        word2vec = Word2Vec(vectorSize=vector_size, minCount=0, inputCol="categorical_tokens", outputCol="embedding_struct")
        model = word2vec.fit(df)

    # Each row's embedding is the average of its token vectors
    df = model.transform(df)

    return df, model
```


## Applying Embeddings to Categorical Features

We apply the embedding function to the categorical columns. The Word2Vec model is fit on the training set only, and the same fitted model is reused for the test set, so both datasets are embedded in the same vector space and no information from the test set leaks into training:


```python
# Define categorical feature columns ("Churn" is the label, so it is excluded to avoid target leakage)
categorical_columns = ["gender", "Partner", "InternetService", "Contract", "PaperlessBilling", "PaymentMethod"]

# Fit Word2Vec on the training data, then reuse the fitted model for the test data
train_df, w2v_model = generate_categorical_embeddings(train_df, categorical_columns)
test_df, _ = generate_categorical_embeddings(test_df, categorical_columns, model=w2v_model)

display(train_df)
```
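Spark's `Word2VecModel.transform` represents each row by averaging the learned vectors of the tokens in that row, which is why a single embedding describes the row's whole combination of categories. The plain-Python sketch below reproduces only that averaging step; training the token vectors is not shown. The 2-dimensional `token_vectors` are made up and stand in for a trained model, and `row_embedding` is a hypothetical helper.

```python
from typing import Dict, List

def row_embedding(tokens: List[str], token_vectors: Dict[str, List[float]]) -> List[float]:
    """Average the vectors of a row's tokens (the averaging step of Word2VecModel.transform)."""
    dim = len(next(iter(token_vectors.values())))
    total = [0.0] * dim
    for tok in tokens:
        for i, x in enumerate(token_vectors[tok]):
            total[i] += x
    return [x / len(tokens) for x in total]

# Made-up vectors; a real model learns them from co-occurrence in the training rows
token_vectors = {
    "Fiber": [1.0, 0.0], "optic": [0.8, 0.2],
    "DSL": [0.0, 1.0], "Month-to-month": [0.5, 0.5],
}
fiber_row = row_embedding(["Fiber", "optic", "Month-to-month"], token_vectors)
dsl_row = row_embedding(["DSL", "Month-to-month"], token_vectors)
print(dsl_row)  # [0.25, 0.75]
```

Because the tokens come from splitting on spaces, identical values from different columns (for example, `Yes` in both `Partner` and `PaperlessBilling`) become the same token and share one vector. Prefixing each value with its column name before tokenizing is one way to keep them distinct.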


## Converting Embeddings into Dense Vectors

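Word2Vec already outputs a Spark ML vector column (`embedding_struct`), so there is no list to convert. The cell below copies that vector into one `<column>_embedding` column per categorical feature as a **DenseVector**, substituting a zero vector if the value is missing. Because the embedding is computed from each row's full sequence of categories, all of these `_embedding` columns contain the same row-level vector.


```python
# Word2Vec's output is already a vector; copy it as a DenseVector,
# falling back to a zero vector (length 5, the default vector_size) if it is missing
DenseVector_udf = F.udf(lambda v: DenseVector(v.toArray()) if v is not None else DenseVector([0.0] * 5), VectorUDT())

# Create one <column>_embedding column per categorical feature
# (each receives the same row-level embedding from "embedding_struct")
for col_name in categorical_columns:
    train_df = train_df.withColumn(col_name + "_embedding", DenseVector_udf(F.col("embedding_struct")))
    test_df = test_df.withColumn(col_name + "_embedding", DenseVector_udf(F.col("embedding_struct")))

# Drop intermediate columns after the embeddings are extracted
train_df = train_df.drop("categorical_sequence", "categorical_tokens", "embedding_struct")
test_df = test_df.drop("categorical_sequence", "categorical_tokens", "embedding_struct")

display(train_df)
```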


## Preview the Embedded Features

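We can inspect a sample of a categorical column next to its embedding column. Because each embedding represents the row's whole combination of categories, rows with the same `PaymentMethod` value can still have different embeddings.


```python
# Show a sample of a categorical column alongside its embedding
train_df.select("PaymentMethod", "PaymentMethod_embedding").show(truncate=False)
```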


## Feature Engineering and Pipeline Initialization

Now that categorical columns have been transformed into embeddings, we finalize the feature engineering steps.

- **Handle Missing Values in Numerical Columns**
  - Impute missing numerical values with the mean of each column.

- **Standardize Numerical Features**
  - Use `StandardScaler` to scale numerical features to unit standard deviation so that they are on comparable scales.

- **Assemble the Final Feature Vector**
  - Combine the scaled numerical features and the categorical embeddings into a single feature vector.

- **Initialize the Spark ML Pipeline**
  - Encapsulate these transformations into a Spark ML Pipeline for structured data processing.


```python
from pyspark.ml.feature import Imputer, VectorAssembler, StandardScaler
from pyspark.ml import Pipeline

# Define numerical columns for imputation
numerical_cols = ["SeniorCitizen", "tenure", "TotalCharges"]

# Impute missing numerical features (Imputer's default strategy is "mean")
imputer = Imputer(
    inputCols=numerical_cols,
    outputCols=[c + "_imputed" for c in numerical_cols]
)

# Assemble the imputed numerical columns into a single vector
numerical_assembler = VectorAssembler(
    inputCols=[c + "_imputed" for c in numerical_cols],
    outputCol="numerical_assembled"
)

# Scale numerical features to unit standard deviation
numerical_scaler = StandardScaler(
    inputCol="numerical_assembled",
    outputCol="numerical_scaled"
)

# Assemble all features (numerical + categorical embeddings) into a single feature vector
feature_cols = ["numerical_scaled"] + [c + "_embedding" for c in categorical_columns]
vector_assembler = VectorAssembler(
    inputCols=feature_cols,
    outputCol="all_features"
)

# Define the sequence of transformations (Word2Vec is applied before, not inside, this pipeline)
stages_list = [imputer, numerical_assembler, numerical_scaler, vector_assembler]

# Instantiate the pipeline
pipeline = Pipeline(stages=stages_list)
```


## Fit the Pipeline

In Spark ML, *fitting* a pipeline means calling `fit()` on the training data so that each stage that learns from data (an estimator such as `Imputer` or `StandardScaler`) computes its parameters. The result is a `PipelineModel` that can transform new data.

In the previous step we created a pipeline. Now we will fit it on the training data. The fitted pipeline will impute missing numerical values, scale the numerical columns, and combine them with the precomputed categorical embeddings into a single feature vector for modeling.


### What Happens During Fitting?

When we call `.fit(train_df)`, the pipeline applies the following transformations:

- **Imputation of Missing Values**
  - The `Imputer` calculates the **mean** for numerical columns in `train_df` and replaces missing values accordingly.

- **Scaling of Numerical Features**
  - The `StandardScaler` computes the standard deviation of each (imputed) numerical feature in `train_df`.
  - These same scaling factors are then applied to every dataset the pipeline transforms, so training and test features are on the same scale.

- **Categorical Embeddings (Computed Before the Pipeline)**
  - `Word2Vec` is not one of the pipeline stages in this demo. The embeddings were generated earlier, so the pipeline receives the `<column>_embedding` columns as ready-made inputs.
  - Any new dataset therefore needs the same Word2Vec embedding step applied before it is passed to the pipeline.

- **Combining Features into a Single Vector**
  - The `VectorAssembler` consolidates:

    - **Scaled numerical features**: standardized values from `StandardScaler`.
    - **Categorical embeddings**: dense embeddings generated by `Word2Vec`.

This results in a **final feature vector**, ready for input into machine learning models.
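The key property of fitting is that statistics are learned once, from the training data, and then reused unchanged by `.transform()` on any dataset. The plain-Python sketch below is a simplified model of that behavior, not Spark's implementation: `MeanImputer` and `UnitStdScaler` are hypothetical stand-ins for `Imputer` and `StandardScaler`, each stage is fit on the output of the stages before it (so the scaler sees imputed values), and the test set's missing `tenure` is filled with the *training* mean. The values are made up.

```python
import statistics
from typing import List, Optional

class MeanImputer:
    """Hypothetical stand-in for Spark's Imputer with strategy="mean"."""
    def fit(self, values: List[Optional[float]]) -> "MeanImputer":
        self.mean_ = statistics.fmean([v for v in values if v is not None])
        return self

    def transform(self, values: List[Optional[float]]) -> List[float]:
        return [self.mean_ if v is None else v for v in values]

class UnitStdScaler:
    """Hypothetical stand-in for StandardScaler with its defaults (withStd=True, withMean=False)."""
    def fit(self, values: List[float]) -> "UnitStdScaler":
        self.std_ = statistics.stdev(values)
        return self

    def transform(self, values: List[float]) -> List[float]:
        return [v / self.std_ for v in values]

def fit_pipeline(stages, train):
    """Fit stages in order; each stage is fit on the output of the previous ones."""
    data = train
    for stage in stages:
        data = stage.fit(data).transform(data)
    return stages

def transform_pipeline(stages, data):
    """Apply the already fitted stages, in order, to any dataset."""
    for stage in stages:
        data = stage.transform(data)
    return data

train_tenure = [1.0, None, 3.0]
test_tenure = [None, 10.0]
stages = fit_pipeline([MeanImputer(), UnitStdScaler()], train_tenure)
print(transform_pipeline(stages, test_tenure))  # [2.0, 10.0]
```

The missing test value becomes 2.0 (the training mean), not 10.0 (what the test set's own mean would give), which is exactly the consistency the fitted pipeline provides.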

```python
# Fit the Pipeline
pipeline_model = pipeline.fit(train_df)
```


## Apply the Feature Engineering Pipeline

Once the pipeline is **fitted** to the training data, it can be **applied to any dataset** with the same input columns by calling `.transform()`.

We apply the pipeline to both:

- **Train Dataset (`train_df`)** → Generates **transformed training features**.
- **Test Dataset (`test_df`)** → Ensures that the same transformations are applied consistently.

The output is a **transformed dataset** with the **final feature vector** ready for modeling.


```python
# Transform both train_df and test_df
train_transformed_df = pipeline_model.transform(train_df)
test_transformed_df = pipeline_model.transform(test_df)
```


```python
# Show transformed features
train_transformed_df.select("all_features").show(3, truncate=False)
```


## Save and Reuse the Pipeline

Preserving the fitted feature engineering pipeline for the Telco Customer Churn use case, including its stages, fitted parameters, and metadata, is important for reproducibility, version control, and collaboration among team members. It also provides a detailed record of the machine learning workflow. In this section, we will follow these steps:

1. **Save the Pipeline**: Save the fitted pipeline model, including its fitted stages, to the working directory. The saved pipeline is organized within the `spark_pipelines` folder for clarity. The Word2Vec embedding step is not part of this pipeline, so it is not included in the saved model.
2. **Explore Loaded Pipeline Stages**: Upon loading the pipeline, inspect the stages to reveal key transformations and understand the sequence of operations applied during the pipeline's execution.


```python
# Save the pipeline model with overwrite mode
pipeline_model.write().overwrite().save(f"{DA.paths.working_dir}/spark_pipelines")
print(f"Saved model to: {DA.paths.working_dir}/spark_pipelines")
```


## Load and Use Saved Model


```python
# Load and use the saved model
from pyspark.ml import PipelineModel

loaded_pipeline = PipelineModel.load(f"{DA.paths.working_dir}/spark_pipelines")

# Show pipeline stages
loaded_pipeline.stages
```


```python
# Use the loaded pipeline to transform the test dataset
test_transformed_df = loaded_pipeline.transform(test_df)
display(test_transformed_df)
```


## Conclusion

In this demo, we built a feature engineering pipeline to streamline data preparation. We loaded and cleaned the data, generated embeddings for categorical variables using `Word2Vec`, and built a Spark ML pipeline that imputed missing values, scaled numerical features, and assembled all features into a single vector.

By applying the pipeline to both training and test sets, we ensured a consistent and reproducible feature transformation process. Finally, saving the pipeline allows for future reuse, enabling efficient and standardized data preprocessing for machine learning tasks.
