# Regression

## Description
This Python script is to do linear or logistic regression, using gradient descent or normal equation.

In [370]:
#imports
# Scientific Calculation Imports
import numpy as np
import pandas as pd

# Built-in Module Imports
import math
import time
import enum
import abc

In [371]:
# Regression Type
class RegressionType(enum.Enum):
    linear = 1
    logistic = 2

In [372]:
# Linear Regression Algorithm Class
class RegressionAlgorithm(enum.Enum):
    unspecified = 0
    gradient_descent = 1
    normal_equation = 2

In [373]:
class DataProcessor():
    
    @staticmethod
    def add_x0_column(A):
        return np.insert(A, obj=0, values=1, axis=1)

    @staticmethod
    def augmented_to_coefficient_and_b(A):
        return (A[:, :-1], A[:, -1])

    @staticmethod
    def partition(A, atInd):
        return (A[:atInd], A[atInd:])
    
    @staticmethod
    def get_unique_categories(output, case_sensitive=True):
        if not case_sensitive:
            output = [x.lower() if isinstance(x, str) else x for x in output]
        return np.unique(output)
    
    @staticmethod
    def get_unique_categories_and_binary_outputs(output, case_sensitive=True):
        unique_cat = DataProcessor.get_unique_categories(output, case_sensitive)

        if np.size(unique_cat) <= 2:
            outputs_b = np.zeros(np.size(output))
            mask_0 = (output != unique_cat[0])
            mask_1 = (output == unique_cat[0])
            outputs_b[mask_0] = 0
            outputs_b[mask_1] = 1
        else:
            outputs_b = np.tile(output, (np.size(unique_cat), 1))
            for i, cat in enumerate(unique_cat):
                row = outputs_b[i]
                mask_0 = (output != cat)
                mask_1 = (output == cat)
                row[mask_0] = 0
                row[mask_1] = 1
        
        return (unique_cat, outputs_b)
                

In [374]:
class DataScalar():
    
    def __init__(self, data, data_has_x0_column=False):
        self.__data = data
        self.__data_has_x0_column = data_has_x0_column 
        self.__scalars = np.ones(np.size(data, axis=1))
        self.__calculate_scalars()
    
    def scaled_data(self):
        return self.scale_new_data(self.__data, self.__data_has_x0_column)
    
    def scale_new_data(self, data, input_has_x0_column=False):
        if input_has_x0_column:
            avg = np.insert(self.__avg, obj=0, values=0)
            std = np.insert(self.__std, obj=0, values=1)
        else:
            avg = self.__avg
            std = self.__std
        return (data - avg) / std
        
    def __calculate_scalars(self):
        if self.__data_has_x0_column:
            self.__avg = np.average(self.__data[:, 1:], axis=0)
            self.__std = np.std(self.__data[:, 1:], axis=0)
        else:
            self.__avg = np.average(self.__data, axis=0)
            self.__std = np.std(self.__data, axis=0)

In [375]:
class RegressionTrainer(metaclass=abc.ABCMeta):
    
    def __init__(self, coefficient_matrix, outputs, regularization_lambda=0.0):
        self.__x = coefficient_matrix
        self.__y = outputs
        self.__regularization_lambda = regularization_lambda
        self.__setup_training()

    @property
    def weights(self):
        return self.__theta

    def start_training(self,
                      training_algorithm=RegressionAlgorithm.unspecified,
                      learning_rate=0.01,
                      print_cost_while_training=False):
        self.__reset_thetas()
        self.__print_start_training_message_and_log_time()
        
        self.__setup_training()
        if not training_algorithm:
            training_algorithm = self.__calculate_optimized_training_alg()
        
        self.__train(self,
                     training_algorithm,
                     learning_rate,
                     print_cost_while_training)
        
        self.__print_end_training_message()

    # Should override in subclass
    def __setup_training(self):
        print('Initializing trainer......')    

    @abc.abstractmethod
    def __calculate_optimized_training_alg(self):
        pass

    @abc.abstractmethod
    def __hypothesis(self):
        pass
    
    @abc.abstractmethod
    def __cost(self):
        pass
    
    @abc.abstractmethod
    def __derivative_of_cost(self):
        pass
    
    @abc.abstractmethod
    def __train(self,
                training_algorithm=RegressionAlgorithm.unspecified,
                learning_rate=0.01,
                print_cost_while_training=False):
        pass
    
    def __get_num_features(self):
        return np.size(self.__x, axis=1)
    
    def __get_num_samples(self):
        return np.size(self.__x, axis=0)
    
    def __print_start_training_message_and_log_time(self):
        print('Started training......')
        self.__training_start_time = time.time()
    
    def __print_end_training_message(self):
        end_time = time.time()
        print('Used {0:.10f} seconds to train model with {1} samples and {2} features.'.format\
             (end_time - start_time, self.__get_num_samples, self.__get_num_features - 1))
    
    def __reset_thetas(self):
        self.__theta.fill(0)

In [376]:
class RegressionTrainerLinear(RegressionTrainer):
    
    def __get_feature_count_threshold(self):
        return 10000
    
    # Override
    def __setup_training(self):
        super().__setup_training()
        self.__theta = np.zeros(np.size(self.__data, axis=1))
    
    # Override (abstract)
    def __calculate_optimized_training_alg(self):
        feature_count_small = self.__get_num_features < self.__get_feature_count_threshold()
        if feature_count_small:
            return RegressionAlgorithm.normal_equation
        else:
            return RegressionAlgorithm.gradient_descent
    
    # Override (abstract)
    def __hypothesis(self):
        return self.__theta @ self.__x.transpose()
    
    # Override (abstract)
    def __cost(self):
        h_theta_x = self.__hypothesis()
        diff = self.__y - h_theta_x
        diff_squared = np.power(diff, 2)
        diff_squared_sum = np.sum(diff_squared)
        theta_squared = np.power(self.__theta, 2)
        theta_squared_sum = np.sum(theta_squared)
        total = diff_squared_sum + self.__regularization_lambda * theta_squared_sum
        return total / (2 * self.__get_num_samples())
    
    # Override (abstract)
    def __derivative_of_cost(self):
        h_theta_x = self.__hypothesis()
        diff = h_theta_x - self.__y
        diff_scaled_with_x = (self.__x.transpose() * diff).transpose()
        regularization_vector = self.__theta * self.__regularization_lambda / self.__get_num_samples()
        return np.average(diff_scaled_with_x, axis=0) + regularization_vector

    # Override (abstract)
    def __train(self,
                training_algorithm=RegressionAlgorithm.unspecified,
                learning_rate=0.01,
                print_cost_while_training=False):
        if training_algorithm == RegressionAlgorithm.gradient_descent:
            self.__train_with_gradient_descent(learning_rate, print_cost_while_training)
        elif training_algorithm == RegressionAlgorithm.normal_equation:
            self.__train_with_normal_equation()
        else:
            raise ValueError('Cannot start training, no linear regression algorithm specified.')

    def __train_with_gradient_descent(self,
                                      learning_rate=0.01,
                                      print_cost_while_training=False):
        last_cost = self.__cost_of_training_set()
        cost_not_change_count = 0
        cost_check_frequency = 10
        i = 1
        # If the cost hasn't changed in 20 (2 * 10) iterations, it converged.
        while cost_not_change_count < 2:
            self.__theta -= self.__derivative_of_cost() * self.__learning_rate
            # Check and print cost every 10 iterations
            if i == 1 or i % cost_check_frequency == 0:
                current_cost = self.__cost()
                if print_cost_while_training:
                    print('Cost of iteration {0}: {1:.2f}'.format(i, current_cost))
                if current_cost == last_cost:
                    cost_not_change_count += 1
                last_cost = current_cost
            i += 1

    def __train_with_normal_equation(self):
        x = self.__x
        x_trans = x.transpose()
        y = self.__y
        regularization_matrix = np.identity(self.__get_num_features())
        regularization_matrix[0][0] = 0
        regularization_matrix *= self.__regularization_lambda
        try:
            result = np.linalg.inv(x_trans @ x + regularization_matrix) @ x_trans @ y
        except ValueError as e:
            raise Exception('Cannot calculate weights with normal equation.') from e
        else:
            self.__theta = result

In [377]:
class RegressionTrainerLogistic(RegressionTrainer):
    
    def __init__(self,
                 coefficient_matrix,
                 outputs,
                 regularization_lambda=0.0,
                 output_case_sensitive=True):
        super().__init__(coefficient_matrix, outputs, regularization_lambda)
        self.__output_case_sensitive = output_case_sensitive
    
    @property
    def categories(self):
        return self.__categories

    def __get_num_categories(self):
        if self.__regression_type == RegressionType.logistic:
            return np.size(self.__categories)
        else:
            raise InterruptedError('Should not query number of categories in non-logistic regression.')
    
    # Override
    def __setup_training(self):
        super().__setup_training()
        unique_cat, b_output = DataProcessor.get_unique_categories_and_binary_outputs(self.__y, self.__output_case_sensitive)
        self.__categories = unique_cat
        self.__y = b_output
        feature_count = np.size(self.__data, axis=1)
        cat_count = np.size(self.__categories, axis=0)
        if cat_count < 2:
            raise ValueError('Cannot do logistic regression, there is only one kind of output.')
        elif cat_count == 2:
            self.__binary_classification = True
            self.__theta = np.zeros(feature_count)
        else:
            self.__binary_classification = False
            theta_shape = (cat_count, feature_count)
            self.__theta = np.zeros(shape=theta_shape)
            
    # Override (abstract)
    def __calculate_optimized_training_alg(self):
        return RegressionAlgorithm.gradient_descent
    
    # Override (abstract)
    def __hypothesis(self):
        theta_transpose_x = self.__theta @ self.__x.transpose()
        result = np.zeros(shape=(self.__get_num_categories, self.__get_num_samples))
        result.fill(math.e)
        result = result ** (-1 * theta_transpose_x)
        result = 1/ (1 + result)
        return result

    # Override (abstract)
    def __train(self,
                training_algorithm=RegressionAlgorithm.unspecified,
                learning_rate=0.01,
                print_cost_while_training=False):
        if training_algorithm == RegressionAlgorithm.gradient_descent:
            self.__train_with_gradient_descent(learning_rate, print_cost_while_training)
        else:
            raise ValueError('Cannot start training, no logistic regression algorithm specified.')
    
    def __train_with_gradient_descent(self,
                                      learning_rate=0.01,
                                      print_cost_while_training=False):
        pass # TODO

In [378]:
class RegressionPredictor(metaclass=abc.ABCMeta):
    def __init__(self, weights, data_scalar):
        self.__weights = weights
        self.__data_scalar = data_scalar
    
    @abc.abstractmethod
    def predict(self, data):
        pass

In [379]:
class RegressionPredictorLinear(RegressionPredictor):
    # Override (abstract)
    def predict(self, data):
        scaled_data = self.__data_scalar.scale_new_data(data, False)
        scaled_data = DataProcessor.add_x0_column(scaled_data)
        return self.__weights @ scaled_data.transpose()

In [380]:
class RegressionPredictorLogistic(RegressionPredictor):
    
    # Override
    def __init__(self, weights, data_scalar, categories):
        super().__init__(weights, data_scalar)
        self.__categories = categories

    # Override (abstract)
    def predict(self, data):
        scaled_data = self.__data_scalar.scale_new_data(data, False)
        scaled_data = DataProcessor.add_x0_column(scaled_data)
        hypothesis = self.__weights @ scaled_data.transpose()
        max_ind = np.argmax(hypothesis, axis=0)
        return [self.__categories[ind] for ind in max_ind]

In [381]:
class RegressionSetup(metaclass=abc.ABCMeta):
    def __init__(self,
                 data,
                 test_sample_ratio=0.05,
                 learning_rate=0.01,
                 regularization_lambda=0.0,
                 regression_algorithm=RegressionAlgorithm.unspecified):
        if data is None:
            raise ValueError('Cannot initialize regression setup, no data.')
        if not 0 <= test_sample_ratio < 1:
            raise ValueError('Cannot initialize regression setup, invaild test sample ratio.')

        self.data = data
        self.test_sample_ratio = test_sample_ratio
        self.learning_rate = learning_rate
        self.regularization_lambda = regularization_lambda
        self.regression_algorithm = regression_algorithm
        
    @abc.abstractproperty
    def regression_type(self):
        pass

In [382]:
class RegressionSetupLinear(RegressionSetup):
        
    # Override (abstract)
    @property
    def regression_type(self):
        return RegressionType.linear

In [383]:
class RegressionSetupLogistic(RegressionSetup):
    def __init__(self,
                 data,
                 test_sample_ratio=0.05,
                 learning_rate=0.01,
                 regularization_lambda=0.0,
                 regression_algorithm=RegressionAlgorithm.unspecified,
                 output_case_sensitive=True):
        super().__init__(data, test_sample_ratio, learning_rate, regularization_lambda, regression_algorithm)
        self.output_case_sensitive = output_case_sensitive

    # Override (abstract)
    @property
    def regression_type(self):
        return RegressionType.logistic

In [384]:
class Regression():
    def __init__(self, setup):
        self.__trained = False
        self.__raw_data = setup.data
        self.__reg_type = setup.regression_type
        self.__test_sample_ratio = setup.test_sample_ratio
        self.__learning_rate = setup.learning_rate
        self.__regularization_lambda = setup.regularization_lambda
        if self.__reg_type == RegressionType.logistic:
            self.__output_case_sensitive = setup.output_case_sensitive
        elif self.__reg_type == RegressionType.linear:
            self.__reg_alg = setup.regression_algorithm
        self.__setup_samples()
        
        if self.__reg_type == RegressionType.linear:
            self.__trainer = RegressionTrainerLinear(coefficient_matrix=self.__x_training,
                                                     outputs=self.__y_training,
                                                     regularization_lambda=self.__regularization_lambda)
            print('Linear trainer setup.')
        elif self.__reg_type == RegressionType.logistic:
            self.__trainer = RegressionTrainerLogistic(coefficient_matrix=self.__x_training,
                                                       outputs=self.__y_training,
                                                       regularization_lambda=self.__regularization_lambda,
                                                       output_case_sensitive=self.__output_case_sensitive)
            print('Logistic trainer setup.')
            
    def train(self):
        self.__trained = False
        if not self.__trainer:
            raise AttributeError('Cannot start training, trainer not found.')
        self.__trainer.start_training(training_algorithm=self.__reg_alg,
                                      learning_rate=self.__learning_rate,
                                      print_cost_while_training=False)
        self.__trained = True
        self.__setup_predictor()
        self.__print_error_rate()

    def predict(self, data):
        if not self.__predictor:
            raise Exception('Cannot predict, no predictor found.')
        return self.__predictor.predict(data)

    def __setup_samples(self):
        if self.__raw_data is None:
            raise ValueError('Cannot setup samples, no data.')
        num_training_sample = self.__get_training_sample_count()
        self.__x_training = self.__raw_data[:num_training_sample, :-1]
        self.__y_training = self.__raw_data[:num_training_sample, -1]
        self.__x_testing = self.__raw_data[num_training_sample:, :-1]
        self.__y_testing = self.__raw_data[num_training_sample:, -1]
        self.__preprocess_training_set_features()
    
    def __setup_predictor(self):
        if not self.__trained:
            raise Exception('Cannot setup predictor, model has not been trained.')
        if self.__reg_type == RegressionType.linear:
            self.__predictor = RegressionPredictorLinear(weights=self.__trainer.weights,
                                                         data_scalar=self.__data_scalar)
        elif self.__reg_type == RegressionType.logistic:
            self.__predictor = RegressionPredictorLogistic(weights=self.__trainer.weights,
                                                           data_scalar=self.__data_scalar,
                                                           categories=self.__trainer.categories)

    def __print_error_rate(self):
        error_rate = self.__get_testing_set_error_rate()
        print('Error rate is {0:.2f}%.'.format(error_rate * 100))

    def __get_training_sample_count(self):
        total_sample_count = np.size(self.__raw_data, axis=0)
        return math.ceil((1.0 - self.__test_sample_ratio) * total_sample_count)
    
    def __preprocess_training_set_features(self):
        if self.__x_training is None:
            raise ValueError('Cannot preprocess training set features, no data.')
        self.__data_scalar = DataScalar(self.__x_training)
        self.__x_training = self.__data_scalar.scaled_data()
        self.__x_training = DataProcessor.add_x0_column(self.__x_training)
    
    def __get_testing_set_error_rate(self):
        if not self.__predictor:
            raise Exception('Cannot get error rate, no predictor found.')
        testing_sample_predictions = self.predict(self.__x_testing)
        return self.__get_error_rate(testing_sample_predictions, self.__y_testing)
            
    def __get_error_rate(self, prediction, actual):
        if self.__reg_type == RegressionType.linear:
            diff = np.abs((prediction - actual)/actual)
            diff = diff[~np.isnan(diff)]
            return np.average(diff)
        elif self.__reg_type == RegressionType.logistic:
            match = prediction == actual
            match_count = np.count_nonzero(match)
            total_count = np.size(actual)
            return (total_count - match_count) / total_count

## Linear Regression Testing

In [385]:
# Get Data
df = pd.read_csv('housing_data/housing.data', header=None, delim_whitespace=True)
data_linear_reg = df.as_matrix()
if data_linear_reg is not None:
    print('Successfully queried data.')

Successfully queried data.


### Gradient Descent Testing

In [386]:
# Initialize Setup
setup = RegressionSetupLinear(data=data_linear_reg,
                              test_sample_ratio=0.05,
                              learning_rate=0.01,
                              regularization_lambda=0.0,
                              regression_algorithm=RegressionAlgorithm.gradient_descent)

In [387]:
# Initialize Regression
regression = Regression(setup)
regression.train()

TypeError: Can't instantiate abstract class RegressionTrainerLinear with abstract methods _RegressionTrainer__calculate_optimized_training_alg, _RegressionTrainer__cost, _RegressionTrainer__derivative_of_cost, _RegressionTrainer__hypothesis, _RegressionTrainer__train