In [1]:
import pandas as pd
import numpy as np
from sklearn.preprocessing import PolynomialFeatures
from sklearn.linear_model import LinearRegression
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error

In [2]:
df_load = pd.read_csv('customer_experience_data.csv')

# clean the data: fill the missing data with the mean
def clean(df):
    for col in df.columns:
        mean_ = df[col].mean()
        df[col] = df[col].fillna(mean_)
    return df

df_numerical = df_load[['Num_Interactions', 'Feedback_Score', 'Products_Purchased', 'Products_Viewed','Time_Spent_on_Site','Satisfaction_Score']].copy()
df = clean(df_numerical)

In [3]:
#split the data into training dataset and testing dataset
train,test = train_test_split(df, train_size = 0.7, random_state=42)

X_train = train[['Num_Interactions', 'Feedback_Score', 'Products_Purchased', 'Products_Viewed', 'Time_Spent_on_Site']].to_numpy()
X_test = test[['Num_Interactions', 'Feedback_Score', 'Products_Purchased', 'Products_Viewed', 'Time_Spent_on_Site']].to_numpy()
y_train = train['Satisfaction_Score'].to_numpy()
y_test = test['Satisfaction_Score'].to_numpy()

# normalize the data using z-score
mean = X_train.mean(axis = 0)
sd = X_train.std(axis = 0)
X_train_normalized = (X_train - mean) / sd
X_test_normalized = (X_test - mean) / sd

In [4]:
# Model 1 :linear regression model: build from scratch
def linear_w_b(X, y, alpha, threshold):
    # initialize the w and b
    w = np.ones(X.shape[1])
    b = 1

    cost_previous = float('Inf')
    improvement = threshold + 1
    
    while improvement > threshold:

         #get the y_hat using the current w and b
        y_hat = np.matmul(X, w)+ b
        
         # get the gradient of w and b
        m = len(y)
        X_T = np.transpose(X)
        w_grad = (1 / m) * X_T @ (y_hat - y)
        b_grad = (1 / m) * (np.sum(y_hat - y))
        
        w = w - alpha * w_grad
        b = b - alpha * b_grad
        
        # compute the loss function
        cost = (1 / (2 * m)) * (np.sum(np.square(y_hat - y)))
        improvement = cost_previous - cost
        
        # prevent the divergence
        if cost > cost_previous:
            return 'model is diverging'
        
        cost_previous = cost
        
    return w, b

In [5]:
#fit the model and get the test accracy
w, b = linear_w_b(X_train_normalized, y_train, alpha=0.001, threshold=1e-20)
prediction_linear = np.matmul(X_test_normalized, w) + b

In [6]:
# Model 2: k-Nearest Neighbors for regression: build from scratch
class KNN():
    def __init__(self, X_train, y_train, k = 3):
        self.k = k
        self.X_train = X_train
        self.y_train = y_train
    
    def predict(self, X_test):
        predictions = []
        
        for row_test in X_test:
            #Euclidean distances
            distance = []
            for row_train in self.X_train:
                difference_sum = np.sum((row_train - row_test) ** 2)
                distance_ = np.sqrt(difference_sum)
                distance.append(distance_)
            
            distance = np.array(distance)
            sorted_distance_index = np.argsort(distance)
            k_index = sorted_distance_index[:self.k]
            selected_y = self.y_train[k_index]
            prediction = np.mean(selected_y)
            predictions.append(prediction)
                
        return np.array(predictions)

In [7]:
#fit the model and get the test accracy
KNN_ = KNN(X_train_normalized, y_train, k=3)
prediction_knn = KNN_.predict(X_test_normalized)

In [8]:
# Model 3: Multivariant Polynomial Regression: use library
polynomial_feature = PolynomialFeatures(degree=2)

#transform the data to prepare for the fit
X_train_transformed = polynomial_feature.fit_transform(X_train_normalized)
X_test_transformed = polynomial_feature.transform(X_test_normalized)

#fit the polynomial model
polynomial = LinearRegression()
polynomial.fit(X_train_transformed, y_train)

#make prediction on test data
prediction_polynomial = polynomial.predict(X_test_transformed)

In [9]:
# determine the accuracy
def accuracy(y_real, y_predict):
    rmse = np.sqrt(mean_squared_error(y_real, y_predict))
    message = f'rmse = {rmse}'
    return message

print(f'linear regression {accuracy(y_test, prediction_linear)}')
print(f'knn {accuracy(y_test, prediction_knn)}')
print(f'polynomial {accuracy(y_test, prediction_polynomial)}')

linear regression rmse = 2.8658289792600367
knn rmse = 3.3302207690137258
polynomial rmse = 2.942011299575351


Conclusion:
1. Linear Regression is the best model for fitting the data.
2. The difference between the performace of Multivariant Polynomial Regression and Linear Regression is small.

In [10]:
# Node class: determine which independent variable is most important
class Node():
    def __init__(self, feature_index = None, threshold = None, left = None, right = None, variance_reduction = None, value = None):
        self.feature_index = feature_index
        self.threshold = threshold
        self.left = left
        self.right = right
        self.variance_reduction = variance_reduction
        self.value = value

# Tree class
class DecisionTree():
    def __init__(self, min_samples_split = 2, max_depth = 2):
        self.min_samples_split = min_samples_split
        self.max_depth = max_depth
        self.root = None
        
    def build_tree(self, X, y, current_depth = 0):
        nr_samples = X.shape[0]
        nr_features = X.shape[1]
        
        # split further if the limits are not met
        if nr_samples >= self.min_samples_split and current_depth <= self.max_depth:
            best_split = self.get_best_split(X, y, nr_samples, nr_features)
            
            # split further if the variance reduces
            if best_split['variance_reduction'] > 0:
                left_subtree = self.build_tree(best_split['subset_left_X'], best_split['subset_left_y'],current_depth + 1)
                right_subtree = self.build_tree(best_split['subset_right_X'], best_split['subset_right_y'],current_depth + 1)
                
                node = Node(best_split['feature_index'], best_split['threshold'], left_subtree, right_subtree, best_split['variance_reduction'])
                return node
        
        # in other cases: create a leaf node
        else:
            leaf_value = self.get_leaf_value(y)
            node = Node(value = leaf_value)
            return node
        
    def variance_reduction(self, parents, left_child, right_child):
        weight_left = len(left_child) / len(parents)
        weight_right = len(right_child) / len(parents)

        variance_reduction_ = np.var(parents) - (weight_left * np.var(left_child) + weight_right * np.var(right_child))
        return variance_reduction_

    def get_leaf_value(self, y):
        mean_y = np.mean(y)
        return mean_y

    def split_left_right(self, X, y, feature_index, threshold):
        boolean_left = X[:, feature_index] <= threshold
        boolean_right = X[:, feature_index] > threshold
        
        left_X = X[boolean_left]
        left_y = y[boolean_left]
        right_X = X[boolean_right]
        right_y = y[boolean_right]

        return left_X, left_y, right_X, right_y
    
    def get_best_split(self, X, y, nr_samples, nr_features):
        best_split = {}
        max_variance_reduction = -float('inf')
        
        # iterate over features to find best split
        for i in range(nr_features):
            feature_values = X[:, i]
            potential_thresholds = np.unique(feature_values)
            
            # calculate variance reduction for every threshold in each feature
            for threshold in potential_thresholds:
                left_X, left_y, right_X, right_y = self.split_left_right(X, y, i, threshold)
                if len(left_X) > 0 and len(right_X) > 0:
                    current_variance_reduction = self.variance_reduction(y, left_y, right_y)
                    
                    # update the best split based on variance reduction
                    if current_variance_reduction > max_variance_reduction:
                        best_split['variance_reduction'] = current_variance_reduction
                        best_split['feature_index'] = i
                        best_split['threshold'] = threshold
                        best_split['subset_left_X'] = left_X
                        best_split['subset_left_y'] = left_y
                        best_split['subset_right_X'] = right_X
                        best_split['subset_right_y'] = right_y
                        
                        max_variance_reduction = current_variance_reduction
            
        return best_split
    
    def print_tree(self, tree = None, indent = ' '):
        if not tree:
            tree = self.root
        if tree.value is not None:
            print(tree.value)
            
        else:
            
            print(indent + "X_" + 
                  str(tree.feature_index) 
                  + " <= " + str(tree.threshold) 
                  + " (var_red: " 
                  + str(tree.variance_reduction) 
                  + ")")
            
            print(f"{indent}    Left:")
            self.print_tree(tree.left, indent + '       ')
            print(f"{indent}    Right:")
            self.print_tree(tree.right, indent + '       ')
            
    def fit(self, X, y):
        self.root = self.build_tree(X, y)
        
    def get_y_hat(self, single_x, tree):
        if tree.value is not None:
            return tree.value
        feature_value = single_x[tree.feature_index]
        
        if feature_value <= tree.threshold:
            y_hat = self.get_y_hat(single_x, tree.left)
            return y_hat
        else: 
            y_hat = self.get_y_hat(single_x, tree.right)
            return y_hat
    
    def predict(self, X):
        prediction = []
        for single_x in X:
            prediction.append(self.get_y_hat(single_x, self.root))
        
        return prediction
        
    def get_feature_importance(self):
        importance = {}
        
        def variance_accelerator(node):
            if node is None or node.value is not None:  
                return
            
            # accumulate the variance reduction for the feature
            feature_index = node.feature_index
            if feature_index in importance:
                importance[feature_index] += node.variance_reduction
            else:
                importance[feature_index] = node.variance_reduction
                
            #go to each of the left and right branches to visit every node
            variance_accelerator(node.left)
            variance_accelerator(node.right)
        
        #call from the beginning of the tree
        variance_accelerator(self.root)
        
        total = sum(importance.values())
        for f in importance:
            importance[f] = importance[f]/total
        return importance


In [11]:
# apply the model
decision_tree = DecisionTree(min_samples_split = 4, max_depth = 6)
decision_tree.fit(X_train, y_train)
# decision_tree.print_tree()

decision_tree.get_feature_importance()

{0: 0.103729187548726,
 3: 0.2061139724114159,
 4: 0.21552791102752097,
 2: 0.22849904912264762,
 1: 0.24612987988968943}

In [12]:
# test the model
predict_train = decision_tree.predict(X_train)
print(np.sqrt(mean_squared_error(y_train, predict_train)))

predict_test = decision_tree.predict(X_test)
print(np.sqrt(mean_squared_error(y_test, predict_test)))

2.457866745968065
3.316548895830722


Conclusion: 
1. The model fit is good, without big overfitting
2. The most important variable to predict Satisfaction Score: 'Feedback_Score'.