In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum, when, min, max
from pyspark.sql.types import *
import pandas as pd

from sklearn.model_selection import train_test_split
import torch
import torch.nn as nn
import torch.utils.data as td
import torch.nn.functional as F

import mlflow
from mlflow.models.signature import infer_signature

from hyperopt import STATUS_OK, fmin, tpe, hp, Trials

# Reuse the already-active Spark session if there is one, otherwise create it.
# ChurnPredictionPipeline.reset_df reads the source table through this global.
spark = SparkSession.builder.getOrCreate()

In [0]:
def one_hot_encode(df, column_name):
    """Replace a categorical column with one 0/1 indicator column per distinct value.

    For every distinct value ``v`` of ``column_name`` a column named
    ``f"{column_name}_{v}"`` is added. It holds 1 where the original column
    equals ``v`` and 0 otherwise. A row whose original value is null gets 0
    in every indicator column, because a SQL equality against null is never
    true. The original column is then dropped.

    Args:
        df: Spark DataFrame containing ``column_name``.
        column_name: Name of the categorical column to expand.

    Returns:
        A new DataFrame with the indicator columns appended (in sorted level
        order) and ``column_name`` removed.
    """
    # Sort the levels so the indicator columns are always created in the same
    # order. ``distinct()`` alone gives no ordering guarantee, and the
    # downstream model consumes the feature columns positionally.
    levels = (
        df.select(column_name)
        .distinct()
        .orderBy(column_name)
        .collect()
    )
    distinct_values = [row[column_name] for row in levels]

    for value in distinct_values:
        df = df.withColumn(
            f"{column_name}_{value}",
            when(col(column_name) == value, 1).otherwise(0),
        )

    return df.drop(column_name)


def normalize_column(df, column_name):
    """Min-max scale ``column_name`` in place so its values fall in [0, 1].

    ``min`` and ``max`` here are the PySpark aggregate functions imported at
    the top of the file, not the Python builtins.

    Args:
        df: Spark DataFrame containing the numeric column ``column_name``.
        column_name: Name of the column to rescale.

    Returns:
        A DataFrame where ``column_name`` is replaced by
        ``(x - min) / (max - min)``. Edge cases:
        - A constant column maps every non-null value to 0.0.
        - A column with no non-null values is returned unchanged.
        - Null entries stay null.
    """
    # One aggregation job instead of two separate passes over the data.
    min_value, max_value = df.agg(
        min(col(column_name)), max(col(column_name))
    ).first()

    if min_value is None:
        # Empty DataFrame or all-null column: there is nothing to scale, and
        # ``None - None`` would raise a TypeError.
        return df

    value_range = max_value - min_value
    if value_range == 0:
        # Constant column: every numerator below is 0, so dividing by 1
        # yields 0.0. Dividing by 0 would produce null, or raise under Spark
        # ANSI mode.
        value_range = 1

    return df.withColumn(
        column_name, (col(column_name) - min_value) / value_range
    )


# Define the neural network architecture
class ChurnNet(nn.Module):
    """Three-layer fully connected classifier that outputs log-probabilities.

    Args:
        input_dim: Number of input features.
        hidden1: Width of the first hidden layer (default 15).
        hidden2: Width of the second hidden layer (default 8).
        num_classes: Number of output classes (default 2).

    ``forward`` returns ``log_softmax`` scores, so the matching criterion is
    ``nn.NLLLoss``. ``nn.CrossEntropyLoss`` would apply log-softmax a second
    time. That yields the same value, because log-softmax is idempotent, but
    the extra work is redundant.
    """

    def __init__(self, input_dim, hidden1=15, hidden2=8, num_classes=2):
        super().__init__()
        # Attribute names fc1/fc2/fc3 are kept so existing state_dicts still load.
        self.fc1 = nn.Linear(input_dim, hidden1)
        self.fc2 = nn.Linear(hidden1, hidden2)
        self.fc3 = nn.Linear(hidden2, num_classes)

    def forward(self, x):
        """Map a (batch, input_dim) tensor to (batch, num_classes) log-probabilities."""
        hidden = torch.relu(self.fc1(x))
        hidden = torch.relu(self.fc2(hidden))
        # Logits are cast to float32 before the class-wise log_softmax.
        return F.log_softmax(self.fc3(hidden).float(), dim=1)
    

def train(model, data_loader, optimizer, loss_criteria):
    """Run one training epoch and return the mean per-batch loss.

    Args:
        model: Module to train. It is moved to CUDA when available, else CPU,
            and put in training mode.
        data_loader: Iterable of ``(data, target)`` batches.
        optimizer: Optimizer over ``model``'s parameters.
        loss_criteria: Callable ``(output, target) -> scalar loss tensor``.

    Returns:
        The average of ``loss.item()`` over all batches, as a float.

    Raises:
        ValueError: If ``data_loader`` yields no batches.
    """
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    model.to(device)
    model.train()

    total_loss = 0.0
    num_batches = 0
    for data, target in data_loader:
        # Inputs must live on the same device as the model's parameters;
        # otherwise the forward pass fails whenever CUDA is available.
        data, target = data.to(device), target.to(device)
        optimizer.zero_grad()
        out = model(data)
        loss = loss_criteria(out, target)
        total_loss += loss.item()
        loss.backward()
        optimizer.step()
        num_batches += 1

    if num_batches == 0:
        raise ValueError("data_loader yielded no batches")
    return total_loss / num_batches


def test(model, data_loader, loss_criteria):
    """Evaluate ``model`` without gradients and return the mean per-batch loss.

    Args:
        model: Module to evaluate. It is moved to CUDA when available, else
            CPU, and put in eval mode.
        data_loader: Iterable of ``(data, target)`` batches.
        loss_criteria: Callable ``(output, target) -> scalar loss tensor``.

    Returns:
        The average of the per-batch loss values, as a float.

    Raises:
        ValueError: If ``data_loader`` yields no batches.
    """
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    model.to(device)
    model.eval()

    total_loss = 0.0
    num_batches = 0
    with torch.no_grad():
        for data, target in data_loader:
            # Keep inputs on the model's device (see train()).
            data, target = data.to(device), target.to(device)
            total_loss += loss_criteria(model(data), target).item()
            num_batches += 1

    if num_batches == 0:
        raise ValueError("data_loader yielded no batches")
    return total_loss / num_batches


# This helper is named ``test``, so stop pytest from collecting it as a test
# case if this code is ever imported into a test module.
test.__test__ = False


In [0]:
class ChurnPredictionPipeline:
    """Churn model pipeline: Spark preprocessing, PyTorch training, MLflow
    logging and hyperopt search.

    NOTE(review): the current step order still leaks evaluation data.
    Address these before trusting the reported test loss:
      * ``data_cleasing`` oversamples minority rows *before* the train/test
        split, so duplicated rows can land in both sets.
      * ``feature_engineering`` computes min/max over the full dataset.
      * hyperopt selects on the same test split whose loss it reports.
    """

    def __init__(self, df):
        # Initial DataFrame. Note that ``objective`` replaces it via ``reset_df``.
        self.df = df

    def reset_df(self):
        """Reload the raw churn table into ``self.df`` using the global ``spark`` session."""
        self.df = spark.sql("SELECT * FROM hive_metastore.default.telco_customer_churn")

    def data_cleasing(self):
        """Clean, de-duplicate and oversample the raw data held in ``self.df``.

        Steps:
        - Drop rows containing nulls.
        - Cast ``TotalCharges`` to double.
        - Encode ``Churn`` as 1/0.
        - De-duplicate on (customerID, TotalCharges).
        - Replace the senior-citizen / no-phone-service rows with a sample
          drawn with replacement at fraction 15.0 (seed 42).
        """
        self.df = self.df.na.drop()
        self.df = self.df.withColumn("TotalCharges", col("TotalCharges").cast(DoubleType()))
        self.df = self.df.withColumn("Churn", when(col("Churn") == "Yes", 1).otherwise(0))
        self.df = self.df.dropDuplicates(['customerID', "TotalCharges"])
        df_minority = self.df.filter((col("SeniorCitizen") == 1) & (col("PhoneService") == "No"))
        df_minority = df_minority.sample(withReplacement=True, fraction=15.0, seed=42)
        self.df = df_minority.union(self.df.filter((col("SeniorCitizen") == 0) | (col("PhoneService") == "Yes")))
        # Presumably removes nulls introduced by the double cast above.
        self.df = self.df.na.drop()

    # Correctly spelled alias; ``data_cleasing`` is kept for existing callers.
    data_cleansing = data_cleasing

    def feature_engineering(self):
        """One-hot encode the categorical columns and min-max scale the numeric ones.

        Also drops ``customerID``.
        """
        self.df = self.df.drop("customerID")
        categorical_columns = ["gender", 'SeniorCitizen', "Partner", "Dependents", "PhoneService", "MultipleLines", "InternetService", "OnlineSecurity", "OnlineBackup", "DeviceProtection", "TechSupport", "StreamingTV", "StreamingMovies", "Contract", "PaperlessBilling", "PaymentMethod"]
        numeric_columns = ["tenure", "MonthlyCharges", "TotalCharges"]
        for col_name in categorical_columns:
            self.df = one_hot_encode(self.df, col_name)
        for col_name in numeric_columns:
            self.df = normalize_column(self.df, col_name)

    def modelling_split_data(self):
        """Split ``self.df`` 70/30 (random_state=0) into numpy train/test arrays.

        Returns:
            ``(x_train, x_test, y_train, y_test)``. The features are every
            column except ``Churn``, in ``self.df.columns`` order.
        """
        label = "Churn"
        features = [name for name in self.df.columns if name != label]
        # Collect once. Two separate toPandas() calls run two Spark jobs whose
        # row order is not guaranteed to match, which can misalign X and y.
        pdf = self.df.toPandas()
        x_train, x_test, y_train, y_test = train_test_split(
            pdf[features].values,
            pdf[label].values,
            test_size=0.30,
            random_state=0)
        print('\nTraining Set: %d rows, Test Set: %d rows \n' % (len(x_train), len(x_test)))
        # Seeds torch's RNG. objective() builds the model right after this call,
        # so weight initialisation and loader shuffling are reproducible.
        torch.manual_seed(0)
        return x_train, x_test, y_train, y_test

    def modelling_create_dataloaders(self, x_train, x_test, y_train, y_test, batch_size=20):
        """Wrap numpy splits in DataLoaders.

        Features become float32 tensors and labels become int64 tensors.

        Returns:
            ``(train_loader, test_loader, input_dim)``. Only the training
            loader is shuffled.
        """
        train_x = torch.Tensor(x_train).float()
        train_y = torch.Tensor(y_train).long()
        train_ds = td.TensorDataset(train_x, train_y)
        train_loader = td.DataLoader(train_ds, batch_size=batch_size, shuffle=True, num_workers=1)

        test_x = torch.Tensor(x_test).float()
        test_y = torch.Tensor(y_test).long()
        test_ds = td.TensorDataset(test_x, test_y)
        test_loader = td.DataLoader(test_ds, batch_size=batch_size, shuffle=False, num_workers=1)
        return train_loader, test_loader, train_x.shape[1]

    def objective(self, params):
        """Hyperopt objective: rebuild the data, train one ChurnNet and log it to MLflow.

        Args:
            params: Dict with ``epochs``, ``learning_rate`` and ``batch_size``.
                hyperopt's ``quniform`` yields floats, hence the ``int`` casts.

        Returns:
            ``{'loss': <final-epoch test loss>, 'status': STATUS_OK}``.

        Raises:
            ValueError: If ``epochs`` is less than 1, because no loss would
                be produced.
        """
        epochs = int(params['epochs'])
        learning_rate = params['learning_rate']
        batch_size = int(params['batch_size'])
        if epochs < 1:
            raise ValueError(f"epochs must be >= 1, got {epochs}")

        self.reset_df()
        self.data_cleasing()
        self.feature_engineering()
        x_train, x_test, y_train, y_test = self.modelling_split_data()
        train_loader, test_loader, input_dim = self.modelling_create_dataloaders(
            x_train, x_test, y_train, y_test, batch_size=batch_size)
        model = ChurnNet(input_dim)
        print(f"Below is the defined architecture: \n{model}\n")

        # ChurnNet already outputs log-probabilities, so NLLLoss is the matching
        # criterion. CrossEntropyLoss would re-apply log_softmax redundantly.
        loss_criteria = nn.NLLLoss()
        optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

        with mlflow.start_run():
            mlflow.log_param("epochs", epochs)
            mlflow.log_param("learning_rate", learning_rate)
            mlflow.log_param("batch_size", batch_size)
            for epoch in range(1, epochs + 1):
                train_loss = train(model, train_loader, optimizer, loss_criteria)
                test_loss = test(model, test_loader, loss_criteria)
                mlflow.log_metric("train_loss", train_loss, step=epoch)
                mlflow.log_metric("test_loss", test_loss, step=epoch)

            # train()/test() may have moved the model to GPU. Run the signature
            # forward pass on the model's device without autograd, then bring
            # the predictions back to CPU before converting to numpy.
            device = next(model.parameters()).device
            with torch.no_grad():
                predictions = model(torch.as_tensor(x_test, dtype=torch.float32, device=device))
            signature = infer_signature(pd.DataFrame(x_test), predictions.cpu().numpy())
            mlflow.pytorch.log_model(model, "model", signature=signature)
        return {'loss': test_loss,
                'status': STATUS_OK}

    def hyperparameter_tuning(self, max_evals=3):
        """Run a TPE search over epochs, learning rate and batch size.

        Returns:
            hyperopt's best raw parameter values, which are also printed.
        """
        search_space = {
            'epochs': hp.quniform('epochs', 5, 10, 5),
            'learning_rate': hp.loguniform('learning_rate', -5, -3),
            'batch_size': hp.quniform('batch_size', 5, 30, 5),
        }
        trials = Trials()
        argmin = fmin(
            fn=self.objective,
            space=search_space,
            algo=tpe.suggest,
            max_evals=max_evals,
            trials=trials)

        print("Best param values: ", argmin)
        return argmin


In [0]:
# Load the raw Telco churn table and run a 3-trial hyperopt search. Each trial
# retrains a ChurnNet and logs it to MLflow.
# NOTE(review): objective() reloads the table itself via reset_df(), so this
# df is only the pipeline's initial value.
df = spark.sql("SELECT * FROM hive_metastore.default.telco_customer_churn")
pipeline = ChurnPredictionPipeline(df)
# hyperopt returns raw sampled values, e.g. floats such as epochs=10.0.
Best_params = pipeline.hyperparameter_tuning(max_evals=3)

  0%|          | 0/3 [00:00<?, ?trial/s, best loss=?]                                                     
Training Set: 5968 rows, Test Set: 2559 rows 

  0%|          | 0/3 [00:14<?, ?trial/s, best loss=?]                                                     Below is the defined architecture: 
ChurnNet(
  (fc1): Linear(in_features=46, out_features=15, bias=True)
  (fc2): Linear(in_features=15, out_features=8, bias=True)
  (fc3): Linear(in_features=8, out_features=2, bias=True)
)

  0%|          | 0/3 [00:14<?, ?trial/s, best loss=?]




Uploading artifacts:   0%|          | 0/6 [00:00<?, ?it/s]

 33%|███▎      | 1/3 [01:20<02:40, 80.01s/trial, best loss: 0.3987602092784073]                                                                               
Training Set: 5968 rows, Test Set: 2559 rows 

 33%|███▎      | 1/3 [01:35<02:40, 80.01s/trial, best loss: 0.3987602092784073]                                                                               Below is the defined architecture: 
ChurnNet(
  (fc1): Linear(in_features=46, out_features=15, bias=True)
  (fc2): Linear(in_features=15, out_features=8, bias=True)
  (fc3): Linear(in_features=8, out_features=2, bias=True)
)

 33%|███▎      | 1/3 [01:35<02:40, 80.01s/trial, best loss: 0.3987602092784073]

Uploading artifacts:   0%|          | 0/6 [00:00<?, ?it/s]

 67%|██████▋   | 2/3 [02:14<01:04, 64.83s/trial, best loss: 0.3987602092784073]                                                                               
Training Set: 5968 rows, Test Set: 2559 rows 

 67%|██████▋   | 2/3 [02:28<01:04, 64.83s/trial, best loss: 0.3987602092784073]                                                                               Below is the defined architecture: 
ChurnNet(
  (fc1): Linear(in_features=46, out_features=15, bias=True)
  (fc2): Linear(in_features=15, out_features=8, bias=True)
  (fc3): Linear(in_features=8, out_features=2, bias=True)
)

 67%|██████▋   | 2/3 [02:28<01:04, 64.83s/trial, best loss: 0.3987602092784073]

Uploading artifacts:   0%|          | 0/6 [00:00<?, ?it/s]

100%|██████████| 3/3 [02:55<00:00, 54.04s/trial, best loss: 0.3987602092784073]100%|██████████| 3/3 [02:55<00:00, 58.47s/trial, best loss: 0.3987602092784073]
Best param values:  {'batch_size': 5.0, 'epochs': 10.0, 'learning_rate': 0.008313049444266815}
