In [1]:
import time
from sklearn.metrics import precision_score, recall_score, f1_score, roc_auc_score,roc_curve, precision_recall_curve, confusion_matrix
import matplotlib.pyplot as plt
import seaborn as sns
import numpy as np
from pyspark.sql import SparkSession
from pyspark import StorageLevel
import pandas as pd


In [4]:


def predict(w, b, X):
    z = np.dot(X, w) + b
    return 1 if sigmoid(z) > 0.5 else 0

def sigmoid(z):
    # Use a numerically stable version
    return np.where(z >= 0, 1 / (1 + np.exp(-z)), np.exp(z) / (1 + np.exp(z)))



class ParallelLogisticRegression:
    
    #@staticmethod
    def readFile(self,filename):
        filename = 'botnet_tot_syn_l.csv'
        
        rdd = self.spark.sparkContext.textFile(filename)
        header = rdd.first()
        data_rdd = rdd.filter(lambda line: line != header) \
                     .map(lambda line: [float(x) for x in line.split(',')])
        return data_rdd.map(lambda x: (np.array(x[:-1]), x[-1]))

    @staticmethod
    def normalize(rdd_Xy):
        count = rdd_Xy.count()
        sums = rdd_Xy.map(lambda x: x[0]).reduce(lambda a, b: a + b)
        means = sums / count
        var_sums = rdd_Xy.map(lambda x: (x[0] - means) ** 2).reduce(lambda a, b: a + b)
        stds = np.sqrt(var_sums / count)
        
        # Normalize features and cache the RDD for reuse in training
        normalized_rdd = rdd_Xy.map(lambda x: ((x[0] - means) / stds, x[1]))
        normalized_rdd.persist(StorageLevel.MEMORY_ONLY)  # Caching normalized data
        return normalized_rdd

    def train(rdd_Xy, iterations, learning_rate, lambda_reg):

        n = len(rdd_Xy.first()[0])  # number of features
        w = np.zeros(n)
        b = 0
        m = rdd_Xy.count()
        costs = []
        

        for i in range(iterations):
            # Parallel computation of gradients and cost
            gradients = rdd_Xy.map(lambda x: {
                'pred': sigmoid(np.dot(x[0], w) + b), 
                'x': x[0],  
                'y': x[1]   
            }).map(lambda d: {
                'dw': (d['pred'] - d['y']) * d['x'],
                'db': d['pred'] - d['y'],
                'cost': -d['y'] * np.log(d['pred']) - (1 - d['y']) * np.log(1 - d['pred'])
            }).reduce(lambda a, b: {
                'dw': a['dw'] + b['dw'],
                'db': a['db'] + b['db'],
                'cost': a['cost'] + b['cost']
            })

            # Update weights and bias
            dw = gradients['dw'] / m + (lambda_reg / m) * w
            db = gradients['db'] / m
            cost = gradients['cost'] / m + (lambda_reg / (2 * m)) * np.sum(w ** 2)

            # Gradient descent update step
            w -= learning_rate * dw
            b -= learning_rate * db
            costs.append(cost)
            
            # Log cost every 10 iterations to reduce verbosity
            if i % 2 == 0:
                print(f"Iteration {i}, Cost: {cost}")
        
        # Unpersist RDD after training
        rdd_Xy.unpersist()
        
        return w, b, costs

    @staticmethod
    def accuracy(w, b, rdd_Xy):

        correct_predictions = rdd_Xy.map(lambda x: (predict(w, b, x[0]) == x[1])).reduce(lambda a, b: a + b)
        return correct_predictions / rdd_Xy.count()

    
    def cross_validate(self,normalized_rdd, k_folds, iterations, learning_rate, lambda_reg):
        if hasattr(self, 'spark') and self.spark is not None:
            self.spark.stop()
            self.spark=None
        
        self.spark = SparkSession.builder \
            .appName("LogisticRegression") \
            .getOrCreate()
        filename = 'botnet_tot_syn_l.csv'
        data_rdd = self.readFile(filename)
        
        partitions = data_rdd.randomSplit([1 / k_folds] * k_folds)
        accuracies = []
        
       
        
        for i in range(k_folds):
            # Split data into training and testing sets
            test_rdd = partitions[i]
            train_rdd = self.spark.sparkContext.union([partitions[j] for j in range(k_folds) if j != i])

            # Normalize training data and train model
            normalized_train_rdd = ParallelLogisticRegression.normalize(train_rdd)
            w, b, _ = ParallelLogisticRegression.train(normalized_train_rdd, iterations, learning_rate, lambda_reg)

            # Calculate accuracy on test set
            norm_test_rdd = ParallelLogisticRegression.normalize(test_rdd)
            accuracy = ParallelLogisticRegression.accuracy(w, b, norm_test_rdd)
            accuracies.append(accuracy)
            print(f"Fold {i}, Accuracy: {accuracy}")

        avg_accuracy = np.mean(accuracies)
        print(f"Cross-validation Average Accuracy: {avg_accuracy}")
        self.spark.stop()
        self.spark=None

        return avg_accuracy
    



In [5]:
# Perform cross-validation
print("\nPerforming cross-validation...")
learning_rates = 0.1
lambda_regs = 0.1

model=ParallelLogisticRegression()
acc =model.cross_validate(
        normalized_rdd=None,
        k_folds=4,
        iterations=10,
        learning_rate=learning_rates,
        lambda_reg=lambda_regs)
    



Performing cross-validation...


[Stage 4:>                                                          (0 + 1) / 1]

24/11/17 22:31:40 WARN BlockManager: Task 82 already completed, not releasing lock for rdd_10_0


                                                                                

Iteration 0, Cost: 0.6931471805597181


                                                                                

Iteration 2, Cost: 0.6397212987754329


                                                                                

Iteration 4, Cost: 0.5952658703260666


                                                                                

Iteration 6, Cost: 0.5580265839446013


                                                                                

Iteration 8, Cost: 0.5265667702637546


                                                                                

Fold 0, Accuracy: 0.8988173931213508


[Stage 24:>                                                         (0 + 1) / 1]

24/11/17 22:33:33 WARN BlockManager: Task 506 already completed, not releasing lock for rdd_34_0


                                                                                

Iteration 0, Cost: 0.6931471805597209


                                                                                

Iteration 2, Cost: 0.6398390848507236


                                                                                

Iteration 4, Cost: 0.5954649753004224


                                                                                

Iteration 6, Cost: 0.5582811763032497


                                                                                

Iteration 8, Cost: 0.52685895885351


                                                                                

Fold 1, Accuracy: 0.8998649631245456


[Stage 44:>                                                         (0 + 1) / 1]

24/11/17 22:35:14 WARN BlockManager: Task 930 already completed, not releasing lock for rdd_57_0


                                                                                

Iteration 0, Cost: 0.6931471805597209


                                                                                

Iteration 2, Cost: 0.6398266593605021


                                                                                

Iteration 4, Cost: 0.5954454143558198


                                                                                

Iteration 6, Cost: 0.5582578004278489


                                                                                

Iteration 8, Cost: 0.5268338034851011


                                                                                

Fold 2, Accuracy: 0.9000075966878441


[Stage 64:>                                                         (0 + 1) / 1]

24/11/17 22:36:56 WARN BlockManager: Task 1354 already completed, not releasing lock for rdd_80_0


                                                                                

Iteration 0, Cost: 0.693147180559722


                                                                                

Iteration 2, Cost: 0.6398787424451801


                                                                                

Iteration 4, Cost: 0.5955313140177895


                                                                                

Iteration 6, Cost: 0.5583653633256157


                                                                                

Iteration 8, Cost: 0.5269551934701853


                                                                                

Fold 3, Accuracy: 0.9004391160247437
Cross-validation Average Accuracy: 0.899782267239621
