# Botnet Logistic Regression Classifier - Parallelized Version

This version parallelizes the `readFile`, `normalize`, `train` and `accuracy` functions with PySpark. The algorithm converges during training: the cost decreases at every iteration, from 1.619 at iteration 0 to 0.327 at iteration 9, with progressively smaller improvements. The final reported accuracy of 92.88%, measured on the same data used for training, suggests that the model fits the given dataset well.

### Preprocessing

The `readFile` function uses PySpark to read a comma-separated file and return its contents as an RDD. The path is resolved relative to the parent of the current working directory. Each row is transformed into a tuple whose first element is a list of 11 floating-point feature values and whose second element is an integer label (0 or 1).

In [2]:
import os

from pyspark import SparkContext

def readFile(filename):
    """
    Return an RDD containing the data of filename.
    Each example (row) of the file corresponds to one RDD record.
    Each record of the RDD is a tuple (X, y): "X" is a list containing the 11 features (floats) of an example,
    and "y" is the 12th column of that example (integer 0/1).
    """

    # The file is looked up relative to the parent of the working directory
    current_directory = os.getcwd()
    parent_directory = os.path.dirname(current_directory)
    sc = SparkContext.getOrCreate()
    data = sc.textFile(os.path.join(parent_directory, filename))
    processed_data = data.map(lambda line: line.split(",")).map(
        lambda cols: ([float(x) for x in cols[:11]], int(cols[11]))
    )
    return processed_data
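
To make the record layout concrete, here is a minimal sketch of the per-row parsing step in plain Python, without Spark. The helper name `parse_line` and the sample row are made up for illustration; only the 11-features-plus-label layout comes from the description above.

```python
def parse_line(line, num_features=11):
    """Split one CSV row into (features, label). Hypothetical helper."""
    cols = line.strip().split(",")
    features = [float(x) for x in cols[:num_features]]  # first 11 columns
    label = int(cols[num_features])                      # 12th column
    return features, label

# Made-up row: 11 feature values followed by the label
row = "0.5,1,2,3,4,5,6,7,8,9,10,1"
X, y = parse_line(row)
print(X, y)
```

The two chained `map` calls in `readFile` apply the same split-and-convert logic to every line of the RDD.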

### Normalizing

The `normalize` function, using PySpark, calculates the mean and standard deviation for each feature in an RDD and normalizes the features to have a mean of 0 and a standard deviation of 1. It involves computing sums and sums of squares of features, broadcasting the calculated mean and standard deviation to all nodes, and applying normalization to each record in the RDD.

In [4]:
from pyspark import SparkContext
import numpy as np

def normalize(RDD_Xy):
    sc = SparkContext.getOrCreate()

    # Function to compute sum and sum of squares for each feature
    def compute_sum_and_squares(record):
        X, _ = record
        return (np.array(X), np.array(X)**2, 1)

    # Aggregate the sum and sum of squares for each feature, and count the examples
    sum_squares_count = RDD_Xy.map(compute_sum_and_squares).reduce(
        lambda a, b: (a[0] + b[0], a[1] + b[1], a[2] + b[2]))

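    # Calculate the mean and variance for each feature: Var[x] = E[x^2] - E[x]^2
    mean = sum_squares_count[0] / sum_squares_count[2]
    variance = (sum_squares_count[1] / sum_squares_count[2]) - mean**2
    # Rounding can push a near-zero variance slightly below zero; clip before the sqrt
    std_dev = np.sqrt(np.maximum(variance, 0))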

    # Replace zeros in standard deviation with ones to avoid division by zero
    std_dev[std_dev == 0] = 1

    # Broadcast the mean and std_dev to all the nodes
    broadcast_mean = sc.broadcast(mean)
    broadcast_std_dev = sc.broadcast(std_dev)

    # Function to normalize features
    def normalize_features(record):
        X, y = record
        X_normalized = (np.array(X) - broadcast_mean.value) / broadcast_std_dev.value
        return (X_normalized, y)

    # Normalize each feature and return the new RDD
    return RDD_Xy.map(normalize_features)
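
The single-pass statistics can be checked on a small example. The sketch below mimics the map and reduce steps with `functools.reduce` on a plain list instead of an RDD; the three records and the helper names `to_sums` and `combine` are made up for illustration. The second feature is constant, which exercises the zero-standard-deviation case.

```python
from functools import reduce
import math

# Made-up records in the same (X, y) shape as the RDD, with 2 features
records = [([1.0, 10.0], 0), ([2.0, 10.0], 1), ([3.0, 10.0], 0)]

def to_sums(record):
    """Map step: (features, squared features, count of 1)."""
    X, _ = record
    return (list(X), [x * x for x in X], 1)

def combine(a, b):
    """Reduce step: element-wise sums plus the running count."""
    return ([p + q for p, q in zip(a[0], b[0])],
            [p + q for p, q in zip(a[1], b[1])],
            a[2] + b[2])

sums, squares, n = reduce(combine, map(to_sums, records))
mean = [s / n for s in sums]
variance = [max(sq / n - mu * mu, 0.0) for sq, mu in zip(squares, mean)]
# A constant feature has zero standard deviation; use 1 to avoid dividing by zero
std_dev = [math.sqrt(v) or 1.0 for v in variance]

normalized = [[(x - mu) / sd for x, mu, sd in zip(X, mean, std_dev)]
              for X, _ in records]
print(mean, std_dev, normalized)
```

Because sums, sums of squares, and counts all combine by addition, the reduction can run in any order across partitions.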

### Training

The `train` function, using PySpark, performs logistic regression training on an RDD dataset. It iterates a specified number of times, updating the model's weights and bias based on computed gradients. In each iteration, it calculates gradients using a distributed approach, updates the weights and bias, and prints the cost for monitoring. The function includes additional helper functions (`compute_gradients` and `compute_cost`) to compute gradients and the cost for each data point, respectively.
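
Read term by term, the helpers `compute_cost` and `compute_gradients` appear to implement the objective and gradients below. This is a reconstruction from the code rather than a formula stated in the source; the symbols $\alpha$ (for `learning_rate`), $\lambda$ (for `lambda_reg`), $x^{(i)}$, $y^{(i)}$ and $\sigma$ (the sigmoid) are notation introduced here.

$$
J(w, b) = -\frac{1}{m}\sum_{i=1}^{m}\left[ y^{(i)} \log \hat{y}^{(i)} + \left(1 - y^{(i)}\right) \log\left(1 - \hat{y}^{(i)}\right) \right] + \frac{\lambda}{2}\lVert w \rVert^{2},
\qquad \hat{y}^{(i)} = \sigma\left(w^{\top} x^{(i)} + b\right)
$$

$$
\frac{\partial J}{\partial w} = \frac{1}{m}\sum_{i=1}^{m}\left(\hat{y}^{(i)} - y^{(i)}\right) x^{(i)} + \lambda w,
\qquad
\frac{\partial J}{\partial b} = \frac{1}{m}\sum_{i=1}^{m}\left(\hat{y}^{(i)} - y^{(i)}\right)
$$

$$
w \leftarrow w - \alpha \frac{\partial J}{\partial w}, \qquad b \leftarrow b - \alpha \frac{\partial J}{\partial b}
$$

Each record contributes a regularization term scaled by $1/m$, and `reduce` sums over all $m$ records, so the factor in front of $\lVert w \rVert^{2}$ is $\lambda/2$ rather than $\lambda/(2m)$.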

In [5]:
from pyspark import SparkContext
import numpy as np

def train(RDD_Xy, iterations, learning_rate, lambda_reg):
    sc = SparkContext.getOrCreate()

    # Number of features (assuming all records have the same number of features)
    num_features = len(RDD_Xy.first()[0])
    m = RDD_Xy.count()  # Total number of examples

    np.random.seed(0)  # For reproducibility
    w = np.random.rand(num_features)  # Weight vector
    b = np.random.rand()  # Bias term

    for i in range(iterations):
        # Broadcast weights and bias
        broadcast_w = sc.broadcast(w)
        broadcast_b = sc.broadcast(b)

        # Compute gradients: each record contributes (dw, db) and reduce sums them
        gradients = RDD_Xy.map(lambda x: compute_gradients(x, broadcast_w.value, broadcast_b.value, m, lambda_reg)) \
            .reduce(lambda g1, g2: (g1[0] + g2[0], g1[1] + g2[1]))

        # Update weights and bias
        w -= learning_rate * gradients[0]
        b -= learning_rate * gradients[1]

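        # Optional: print the cost for monitoring. This triggers an extra pass over
        # the RDD per iteration, so it is not recommended for large datasets.
        # The broadcast values were captured before the update above, so the
        # printed value is the cost at the start of iteration i.
        cost = RDD_Xy.map(lambda x: compute_cost(x, broadcast_w.value, broadcast_b.value, lambda_reg, m)) \
            .reduce(lambda c1, c2: c1 + c2)
        print(f"Iteration {i}, Cost: {cost}")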

    return w, b

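def compute_gradients(point, w, b, m, lambda_reg):
    """Return one record's contribution (dw, db) to the gradient of the cost."""
    X, y = point
    z = np.dot(X, w) + b
    y_hat = 1 / (1 + np.exp(-z))  # Sigmoid
    # The regularization term is added once per record, so after summing over
    # all m records the effective L2 gradient is lambda_reg * w.
    dw = (1 / m) * np.asarray(X) * (y_hat - y) + (lambda_reg / m) * w
    db = (1 / m) * (y_hat - y)
    return dw, db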

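def compute_cost(point, w, b, lambda_reg, m):
    """Return one record's contribution to the regularized cross-entropy cost."""
    X, y = point
    z = np.dot(X, w) + b
    y_hat = 1 / (1 + np.exp(-z))  # Sigmoid
    cost = (-1 / m) * (y * np.log(y_hat) + (1 - y) * np.log(1 - y_hat))
    # Added once per record: the sum over m records is (lambda_reg / 2) * ||w||^2
    cost += (lambda_reg / (2 * m)) * np.sum(w ** 2)
    return cost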
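
One way to gain confidence that the per-record gradients match the per-record cost is a finite-difference check. The sketch below re-implements both sums in plain Python on made-up data and compares the analytic gradient with a central-difference estimate. The names `total_cost` and `total_gradient`, the data, and the parameter values are all illustrative assumptions, not part of the original code.

```python
import math

def sigmoid(z):
    return 1.0 / (1.0 + math.exp(-z))

def total_cost(data, w, b, lambda_reg):
    """Sum of the per-record costs, regularization added once per record."""
    m = len(data)
    total = 0.0
    for X, y in data:
        y_hat = sigmoid(sum(wi * xi for wi, xi in zip(w, X)) + b)
        total += (-1 / m) * (y * math.log(y_hat) + (1 - y) * math.log(1 - y_hat))
        total += (lambda_reg / (2 * m)) * sum(wi ** 2 for wi in w)
    return total

def total_gradient(data, w, b, lambda_reg):
    """Sum of the per-record gradients (dw, db)."""
    m = len(data)
    dw, db = [0.0] * len(w), 0.0
    for X, y in data:
        y_hat = sigmoid(sum(wi * xi for wi, xi in zip(w, X)) + b)
        for j, xj in enumerate(X):
            dw[j] += (1 / m) * xj * (y_hat - y) + (lambda_reg / m) * w[j]
        db += (1 / m) * (y_hat - y)
    return dw, db

# Made-up data and parameters
data = [([0.5, -1.2], 1), ([1.5, 0.3], 0), ([-0.7, 0.8], 1), ([0.1, -0.4], 0)]
w, b, lam, eps = [0.2, -0.1], 0.05, 0.05, 1e-6

dw, db = total_gradient(data, w, b, lam)
numeric = []
for j in range(len(w)):
    w_plus, w_minus = list(w), list(w)
    w_plus[j] += eps
    w_minus[j] -= eps
    numeric.append((total_cost(data, w_plus, b, lam)
                    - total_cost(data, w_minus, b, lam)) / (2 * eps))
numeric.append((total_cost(data, w, b + eps, lam)
                - total_cost(data, w, b - eps, lam)) / (2 * eps))

max_error = max(abs(a - n) for a, n in zip(dw + [db], numeric))
print("largest gradient mismatch:", max_error)
```

A mismatch close to zero indicates that the summed gradient is the derivative of the summed cost, including the regularization term.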

### Predict

The `predict` function calculates the linear combination of features and weights, adds bias, applies the sigmoid function to derive a probability, and then classifies the input as either class 0 or 1 based on this probability (threshold at 0.5). This function is used for making predictions on individual data examples using trained logistic regression parameters.

In [6]:
import numpy as np

def predict(w, b, X):
    # Compute the linear combination of the weights and the example
    z = np.dot(w, X) + b
    # Apply the sigmoid function to get the probability
    p = 1 / (1 + np.exp(-z))
    # Predict the class label (0 or 1) based on the probability
    if p >= 0.5:
        return 1
    else:
        return 0
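
Since the sigmoid is monotonic and equals 0.5 at zero, thresholding the probability at 0.5 is mathematically the same as checking the sign of `z`. The plain-Python sketch below illustrates this on made-up inputs; `predict_by_sigmoid` and `predict_by_sign` are hypothetical names, and the sign-based variant is an alternative shown for illustration, not the function used in this document.

```python
import math

def linear(w, b, X):
    return sum(wi * xi for wi, xi in zip(w, X)) + b

def predict_by_sigmoid(w, b, X):
    """Threshold the probability at 0.5, as in predict."""
    p = 1 / (1 + math.exp(-linear(w, b, X)))
    return 1 if p >= 0.5 else 0

def predict_by_sign(w, b, X):
    """Skip the exponential: sigmoid(z) >= 0.5 exactly when z >= 0."""
    return 1 if linear(w, b, X) >= 0 else 0

# Made-up weights and examples
w, b = [0.8, -0.5], 0.1
examples = [[1.0, 0.2], [-2.0, 1.0], [0.3, 3.0], [4.0, -1.0]]
by_sigmoid = [predict_by_sigmoid(w, b, X) for X in examples]
by_sign = [predict_by_sign(w, b, X) for X in examples]
print(by_sigmoid, by_sign)
```

The sign check also avoids evaluating the exponential for very negative `z`, where it can overflow.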

### Accuracy

The `accuracy` function, using PySpark, calculates the accuracy of a logistic regression model on an RDD dataset. It maps each record in the RDD to 1 or 0 based on whether the model's prediction matches the actual label, sums up these correct predictions using `reduce`, and then computes the accuracy by dividing the number of correctly classified records by the total number of records in the RDD. The function relies on an external `predict` function to make predictions based on the model's weights (`w`) and bias (`b`).

In [7]:
from predict import predict

def accuracy(w, b, RDD_xy):
    # Map each record to 1 if the prediction matches the label, 0 otherwise
    prediction_results = RDD_xy.map(lambda record: 1 if predict(w, b, record[0]) == record[1] else 0)

    # Sum the correct predictions
    correctly_classified = prediction_results.reduce(lambda c1, c2: c1 + c2)

    # Divide by the total number of records
    return correctly_classified / RDD_xy.count()
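
The counting logic can be tried without Spark. The sketch below runs the same map and reduce pattern on a plain list, and carries the record count alongside the number of correct predictions so that one reduction yields both. The data, the weights, and the stand-in `predict_stub` are made up for illustration.

```python
from functools import reduce

def predict_stub(w, b, X):
    """Stand-in for predict: class 1 when the linear score is non-negative."""
    z = sum(wi * xi for wi, xi in zip(w, X)) + b
    return 1 if z >= 0 else 0

# Made-up model and labelled records in (X, y) form
w, b = [1.0, -1.0], 0.0
records = [([2.0, 1.0], 1), ([0.0, 3.0], 0), ([1.0, 2.0], 1), ([3.0, 0.0], 1)]

# Map each record to (1 if correct else 0, 1), then sum both components
pairs = map(lambda r: (1 if predict_stub(w, b, r[0]) == r[1] else 0, 1), records)
correct, total = reduce(lambda a, c: (a[0] + c[0], a[1] + c[1]), pairs)
acc = correct / total
print("accuracy:", acc)
```

On an RDD, the same pair-wise reduction would avoid the separate `count()` pass, at the cost of a slightly larger intermediate value.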

### Testing

The following script runs the whole pipeline end to end. Accuracy is measured on the same data used for training:

In [9]:
from preprocess import readFile
from train import train
from normalize import normalize
from accuracy import accuracy

if __name__ == "__main__":
    # read data
    data = readFile("data/botnet_tot_syn_l.csv")
    # standardize
    data = normalize(data)
    # train
    weights, bias = train(data, 10, 1.5, 0.05)
    # accuracy
    acc = accuracy(weights, bias, data)  # separate name so the function is not shadowed
    print("accuracy:", acc)

Iteration 0, Cost: 1.6187735561216878
Iteration 1, Cost: 0.7508907786587498
Iteration 2, Cost: 0.47055519879066693
Iteration 3, Cost: 0.3873216955002531
Iteration 4, Cost: 0.3558542271726476
Iteration 5, Cost: 0.3414883097603333
Iteration 6, Cost: 0.33415582134112765
Iteration 7, Cost: 0.3301571193448983
Iteration 8, Cost: 0.32788337723046357
Iteration 9, Cost: 0.3265529557781806
accuracy: 0.928756
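
The convergence behaviour can be read off the printed costs. The short check below copies the ten values from the output above and verifies that every iteration lowers the cost and that each improvement is smaller than the previous one. The variable names are illustrative.

```python
# Costs copied from the training output
costs = [
    1.6187735561216878, 0.7508907786587498, 0.47055519879066693,
    0.3873216955002531, 0.3558542271726476, 0.3414883097603333,
    0.33415582134112765, 0.3301571193448983, 0.32788337723046357,
    0.3265529557781806,
]

# Improvement achieved between consecutive printed costs
decreases = [prev - cur for prev, cur in zip(costs, costs[1:])]

monotonic = all(d > 0 for d in decreases)
shrinking = all(later < earlier for earlier, later in zip(decreases, decreases[1:]))
print("cost always decreases:", monotonic)
print("improvements keep shrinking:", shrinking)
```

Shrinking improvements are what one would expect as gradient descent approaches a minimum, although ten iterations alone do not show how close the final cost is to that minimum.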