In [1]:
import findspark
import numpy as np
from pyspark.sql import SparkSession
from pyspark import RDD

In [2]:
# Per findspark's README, init() locates the Spark installation (SPARK_HOME)
# and makes its pyspark importable. NOTE(review): it is meant to be called
# *before* `import pyspark`; pyspark was already imported in the first cell,
# so consider moving this call into that cell, ahead of the pyspark imports.
findspark.init()

# Local Spark session using as many worker threads as logical cores; `sc` is
# the SparkContext that readFile below reads files through.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName('Exercise_2')
    .getOrCreate()
)
sc = spark.sparkContext

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
26/02/19 00:54:47 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


### Previous functions from Exercise 1

In [3]:
def readFile(filename: str) -> RDD:
    """Load a comma-separated text file as an RDD of ``(features, label)`` pairs.

    Every field of a line is parsed with ``float``. All fields but the last form
    the feature tuple; the last is converted to ``int`` and used as the label.
    Reads through the module-level SparkContext ``sc``. Evaluation is lazy.
    """
    def parse_line(line):
        values = tuple(float(field) for field in line.split(","))
        return values[:-1], int(values[-1])

    return sc.textFile(filename).map(parse_line)

def normalize(rdd_xy: RDD) -> RDD:
    """Standardize every feature column to zero mean and unit variance.

    Args:
        rdd_xy: RDD of ``(features, label)`` pairs, ``features`` being a
            sequence of numbers.

    Returns:
        A lazy RDD of ``(np.ndarray, label)`` pairs. Each feature is shifted by
        its column mean and divided by its population standard deviation
        (variance divided by the row count, not row count - 1). Labels are
        passed through unchanged.

    Constant columns (std == 0) are only centred. Their std is replaced by 1,
    because dividing by zero would turn the whole column into NaN.
    """
    rdd_X = rdd_xy.map(lambda x: np.array(x[0]))

    # First pass: column sums and row count.
    total, rows = rdd_X.map(lambda x: (x, 1)).reduce(lambda x, y: (x[0] + y[0], x[1] + y[1]))
    mu = total / rows

    # Second pass: population standard deviation around the mean.
    std = (rdd_X.map(lambda x: (x - mu) ** 2).reduce(lambda x, y: x + y) / rows) ** 0.5
    std = np.where(std == 0, 1.0, std)

    return rdd_xy.map(lambda x: ((np.array(x[0]) - mu) / std, x[1]))

def train(rdd_xy: RDD, iterations: int, learning_rate: float, lambda_reg: float, show_logs: bool = True) -> tuple[np.ndarray, float]:
    """Fit L2-regularized logistic regression with full-batch gradient descent.

    Args:
        rdd_xy: RDD of ``(features, label)`` pairs with 0/1 labels.
        iterations: number of gradient-descent steps.
        learning_rate: step size used for both ``w`` and ``b``.
        lambda_reg: L2 strength. The penalty is ``lambda_reg / (2k) * ||w||^2``,
            where ``k`` is the number of features (the same term ``cost`` adds).
            Its gradient is ``lambda_reg / k * w``. The bias is not regularized.
        show_logs: when True, print cost and training accuracy on roughly ten
            evenly spaced iterations plus the final one.

    Returns:
        ``(w, b)``: weight vector of length ``k`` and the bias.
    """
    k = len(rdd_xy.first()[0])
    m = rdd_xy.count()
    # Initial weights come from numpy's global RNG; seed it for reproducibility.
    w = np.random.rand(k)
    b = 0.0

    # max(1, ...) keeps the range step non-zero when iterations < 10 (a zero
    # step raises ValueError). `it` runs 0 .. iterations - 1, so iterations - 1
    # is the last step that can be logged.
    log_step = max(1, iterations // 10)
    its_to_print = set(range(0, iterations, log_step)) | {iterations - 1}

    def grad_terms(row, w_cur, b_cur):
        # Per-row contribution to the summed gradients: (err * x, err).
        err = predict_proba(w_cur, b_cur, row[0]) - row[1]
        return err * np.array(row[0]), err

    for it in range(iterations):
        # Gradients of the mean log-loss, both obtained in a single pass.
        dw_sum, db_sum = rdd_xy.map(lambda row: grad_terms(row, w, b)).reduce(
            lambda x, y: (x[0] + y[0], x[1] + y[1])
        )
        dw = dw_sum / m
        db = db_sum / m

        # Regularization (bias excluded)
        dw += lambda_reg / k * w

        # Update parameters
        w -= learning_rate * dw
        b -= learning_rate * db

        # Logging
        if show_logs and it in its_to_print:
            c = cost(w, b, rdd_xy, lambda_reg, k, m)
            acc = accuracy(w, b, rdd_xy)
            print(f"Iteration {it}/{iterations}, Cost: {c:.4f}, Accuracy: {acc:.4f}")

    return w, b


def sigmoid(logit: np.ndarray) -> np.ndarray:
    """Element-wise logistic function ``1 / (1 + exp(-logit))``.

    The input is first clipped to [-250, 250] so that ``np.exp`` cannot overflow.
    """
    bounded = np.clip(logit, -250, 250)
    return 1 / (1 + np.exp(-bounded))


def predict_proba(w: np.ndarray, b: float, X: np.ndarray) -> float:
    """Positive-class probability for one sample: ``sigmoid(w . X + b)``.

    ``w`` and ``X`` may be any array-like (e.g. tuples); they are converted to
    numpy arrays before the dot product.
    """
    logit = np.dot(np.asarray(w), np.asarray(X)) + b
    return sigmoid(logit)


def predict(w: np.ndarray, b: float, X: np.ndarray, threshold: float = 0.5) -> int:
    """Hard 0/1 class prediction for a single sample.

    Args:
        w: weight vector.
        b: bias.
        X: feature vector (array-like).
        threshold: probability above which the sample is labelled 1. The
            comparison is strict, so a probability exactly equal to the
            threshold maps to 0. Defaults to 0.5.

    Returns:
        1 if ``predict_proba(w, b, X) > threshold`` else 0.
    """
    return int(predict_proba(w, b, X) > threshold)


def accuracy(w: np.ndarray, b: float, rdd_xy: RDD) -> float:
    """Fraction of rows of ``rdd_xy`` whose predicted label equals the true label.

    Args:
        w: weight vector.
        b: bias.
        rdd_xy: RDD of ``(features, label)`` pairs. It must be non-empty,
            because ``RDD.reduce`` raises on an empty RDD.

    Returns:
        Number of correct predictions divided by the number of rows.
    """
    def score_row(row):
        # (1 if correct else 0, 1): summing both fields yields (hits, rows).
        hit = predict(w, b, row[0]) == row[1]
        return int(hit), 1

    def add_pairs(left, right):
        return left[0] + right[0], left[1] + right[1]

    hits, total = rdd_xy.map(score_row).reduce(add_pairs)
    return hits / total


def cost(w: np.ndarray, b: float, rdd_xy: RDD, lambda_reg: float, k: int, m: int, eps: float = 1e-15) -> float:
    """L2-regularized mean binary cross-entropy of ``(w, b)`` over ``rdd_xy``.

    cost = -(1/m) * sum(y*log(p) + (1-y)*log(1-p)) + lambda_reg * ||w||^2 / (2k)

    Args:
        w: weight vector.
        b: bias.
        rdd_xy: RDD of ``(features, label)`` pairs with 0/1 labels.
        lambda_reg: L2 strength.
        k: number of features (scales the penalty).
        m: number of rows (averages the log-likelihood).
        eps: predicted probabilities are clipped to ``[eps, 1 - eps]`` before
            taking logs. In float64, ``sigmoid`` rounds to exactly 1.0 for large
            positive logits. Then ``log(1 - p)`` is ``-inf``, and for a row with
            label 1 the term ``0 * -inf`` turns the whole sum into NaN.

    Returns:
        The scalar cost.
    """
    def row_log_likelihood(row):
        p = np.clip(predict_proba(w, b, row[0]), eps, 1 - eps)
        y = row[1]
        return y * np.log(p) + (1 - y) * np.log(1 - p)

    log_likelihood = rdd_xy.map(row_log_likelihood).reduce(lambda x, y: x + y)
    penalty = lambda_reg * (w ** 2).sum() / (2 * k)
    return log_likelihood / (-m) + penalty

### New functions: k-fold cross-validation helpers

In [4]:
def transform(rdd_xy: RDD, blocks: int, seed: "int | None" = None) -> RDD:
    """Tag every row with a random fold id in ``[0, blocks)`` for k-fold CV.

    Args:
        rdd_xy: RDD of rows (here ``(features, label)`` pairs).
        blocks: number of folds; must be >= 1.
        seed: optional base seed. When given, partition ``i`` draws its fold
            ids from ``np.random.default_rng([seed, i])``. The assignment is
            then reproducible across runs, and identical if Spark recomputes a
            partition (e.g. after cache eviction). If it changed on
            recomputation, rows could leak between train and test splits.
            With ``None`` every partition gets fresh OS entropy.

    Returns:
        A cached RDD of ``(row, fold_id)`` pairs with ``fold_id`` a Python int.

    Raises:
        ValueError: if ``blocks < 1``.
    """
    if blocks < 1:
        raise ValueError(f"blocks must be >= 1, got {blocks}")

    def assign_folds(partition_index, rows):
        # One explicit generator per partition instead of the executors'
        # unseeded global numpy RNG.
        rng = np.random.default_rng(None if seed is None else [seed, partition_index])
        for row in rows:
            yield row, int(rng.integers(0, blocks))

    return rdd_xy.mapPartitionsWithIndex(assign_folds).cache()

def get_block_data(rdd_blocked: RDD, block_id: int) -> tuple[RDD, RDD]:
    """Split a fold-tagged RDD into training and held-out rows for one fold.

    Args:
        rdd_blocked: RDD of ``(row, fold_id)`` pairs, as built by ``transform``.
        block_id: the fold to hold out.

    Returns:
        ``(train_rdd, test_rdd)``: the untagged rows whose fold id differs
        from ``block_id``, and those whose fold id equals it. Both are lazy.
    """
    def in_train(tagged):
        return tagged[1] != block_id

    def in_test(tagged):
        return tagged[1] == block_id

    def untag(tagged):
        return tagged[0]

    train_rdd = rdd_blocked.filter(in_train).map(untag)
    test_rdd = rdd_blocked.filter(in_test).map(untag)
    return train_rdd, test_rdd

In [None]:
path = "data/botnet_reduced_10k_l.csv"  # full dataset: "data/botnet_tot_syn_l.csv"
nIter = 20
learning_rate = 1.5
lambda_reg = 0
num_blocks_cv = 5
SEED = 42

# Seed the driver's global numpy RNG so the initial weights drawn inside
# `train` (np.random.rand) are reproducible across runs.
np.random.seed(SEED)

# Read data (cached: counted here and scanned twice by normalize)
data_raw = readFile(path).cache()
print(f"Total samples: {data_raw.count()}")

# Standardize (cached: reused by the sanity checks and the fold assignment)
data = normalize(data_raw).cache()
print("Data normalized.")

# Sanity check: every feature should now have mean ~0 and std ~1. Divide by n
# (population std) to match how normalize computes the std.
sums, n = data.map(lambda x: (x[0], 1)).reduce(lambda x, y: (x[0] + y[0], x[1] + y[1]))
medias = sums / n
print(f"Feature means after normalization: {np.array2string(medias, precision=4, suppress_small=True)}")
desvs = (data.map(lambda x: (x[0] - medias) ** 2).reduce(lambda x, y: x + y) / n) ** 0.5
print(f"Feature stds after normalization: {np.array2string(desvs, precision=4, suppress_small=True)}")

# Assign every row to one of num_blocks_cv random folds (transform caches its result)
data_cv = transform(data, num_blocks_cv)
print(f"Data transformed for {num_blocks_cv}-fold CV.")

# k-fold CV: train on the other folds, measure accuracy on the held-out one
fold_accs = []
for i in range(num_blocks_cv):
    train_data, test_data = get_block_data(data_cv, i)
    w, b = train(train_data, nIter, learning_rate, lambda_reg, show_logs=False)
    acc = accuracy(w, b, test_data)
    fold_accs.append(acc)
    print(f"  Block {i+1}, Accuracy: {acc:.4f}")

print(f"Average CV Accuracy: {np.mean(fold_accs):.4f}±{np.std(fold_accs):.4f}")

                                                                                

Total samples: 10000
Data normalized.
Feature means after normalization: [ 0.  0. -0.  0. -0. -0.  0. -0. -0.  0. -0.]
Feature stds after normalization: [1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1.]
Data transformed for 5-fold CV.
  Block 1, Accuracy: 0.9377
  Block 2, Accuracy: 0.9334
  Block 3, Accuracy: 0.9320
  Block 4, Accuracy: 0.9234
  Block 5, Accuracy: 0.9324
Average CV Accuracy: 0.9318±0.0047
