In [None]:
from pyspark import SparkContext
import findspark
import numpy as np

# Reuse the running SparkContext when this cell is executed again: constructing a
# second SparkContext in the same kernel raises "Cannot run multiple SparkContexts
# at once". getOrCreate only builds a new context when none is active.
from pyspark import SparkConf

sc = SparkContext.getOrCreate(
    SparkConf().setMaster("local[*]").setAppName("Practica1")
)


## Ejercicio 1

In [None]:
def readFile(filename):
    '''Load the botnet CSV dataset into an RDD of (X, y) records.

    Arguments:
    filename -- path of the comma-separated dataset file. Every non-blank line
                holds the numeric feature columns followed by the label in the
                last column (12 columns in the botnet dataset: 11 features plus
                the label, 0 if normal traffic, 1 if botnet).

    Returns:
    An RDD with one record per non-blank line of the file. Each record is a
    tuple (X, y):
    "X" is a list with the feature values as floats
    "y" is the label as an integer (0/1)
    '''
    def parse_line(line):
        values = [float(field) for field in line.split(",")]
        # The label is parsed as a number like every other column; expose it
        # as an int so the records match the documented (X, y) contract.
        return (values[:-1], int(values[-1]))

    lines = sc.textFile(filename)
    # A blank line (for instance an empty last line) would make float("") raise
    # inside the executors, so drop those lines before parsing.
    non_blank = lines.filter(lambda line: line.strip() != "")
    return non_blank.map(parse_line)

In [None]:
# Smoke test: load the reduced botnet dataset and display its first two (X, y) records
data=readFile("./botnet_reduced_10k_l.csv")
data.take(2)

In [None]:
# Per-column statistics of the feature vectors: accumulate (sum, sum of squares,
# count) for every column index and derive (mean, population std) from them.
rows_rdd = data.map(lambda line: line[0])
print(rows_rdd.take(1))
cols_rdd = rows_rdd.flatMap(lambda row: [(i, (x, x * x, 1)) for i, x in enumerate(row)])
print(cols_rdd.take(5))
group_rdd = cols_rdd.reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1], a[2] + b[2]))
print(group_rdd.take(1))
# E[x^2] - E[x]^2 can come out slightly negative through floating-point
# cancellation (typically on constant columns); clamp it at 0 so the square
# root never yields NaN.
mean_rdd = group_rdd.mapValues(
    lambda s: (s[0] / s[2], np.sqrt(max((s[1] / s[2]) - (s[0] / s[2]) ** 2, 0.0)))
)
# Collect once and reuse the result: every collect() re-runs the whole job.
column_stats = mean_rdd.collect()
print(column_stats)

broadcast_var = sc.broadcast(dict(column_stats))

In [None]:
def normalize(RDD_Xy):
    '''Rescale every feature column of an RDD of (X, y) records to N(0,1).

    The per-column mean and (population) standard deviation are computed from
    RDD_Xy itself, so the result does not depend on statistics prepared in
    another cell for a possibly different dataset.

    Arguments:
    RDD_Xy -- RDD containing data examples. Each record is a tuple (X, y):
              "X" is a sequence with the feature values (floats) of an example
              "y" is the label of the example (integer 0/1)

    Returns:
    An RDD of (X_norm, y) records where X_norm is a list of floats rescaled so
    that every column has mean 0 and standard deviation 1. Columns with zero
    standard deviation are mapped to 0.0. An empty RDD is returned unchanged.
    '''
    if RDD_Xy.isEmpty():
        return RDD_Xy

    def feature_moments(xy):
        features = np.asarray(xy[0], dtype=np.float64)
        return (features, features * features, 1)

    total, total_sq, count = RDD_Xy.map(feature_moments).reduce(
        lambda a, c: (a[0] + c[0], a[1] + c[1], a[2] + c[2])
    )
    mean = total / count
    # Clamp at 0: E[x^2] - E[x]^2 may be slightly negative due to rounding.
    std = np.sqrt(np.maximum(total_sq / count - mean ** 2, 0.0))
    stats = RDD_Xy.context.broadcast((mean, std))

    def map_normalize(xy):
        features, label = xy
        col_mean, col_std = stats.value
        centered = np.asarray(features, dtype=np.float64) - col_mean
        # Divide by 1 where std is 0 to avoid a division warning; those
        # columns are forced to 0.0 by the outer np.where.
        safe_std = np.where(col_std > 0, col_std, 1.0)
        scaled = np.where(col_std > 0, centered / safe_std, 0.0)
        return scaled.tolist(), label

    return RDD_Xy.map(map_normalize)

In [None]:
# Normalize the loaded dataset and print the first two normalized records
rdd_norm = normalize(data)
print(rdd_norm.take(2))

In [None]:
# TODO(remove): sanity check that the normalization gives the expected value for every column index
import math
# verify that every column ends up with mean ~ 0 and std ~ 1
x_norm = rdd_norm.map(lambda xy: xy[0])

check = (
    x_norm
    .flatMap(lambda row: [(i, (v, v*v, 1)) for i, v in enumerate(row)])
    .reduceByKey(lambda a,b: (a[0]+b[0], a[1]+b[1], a[2]+b[2]))
    .mapValues(lambda t: (
        t[0]/t[2],  # mean
        math.sqrt(max((t[1]/t[2]) - (t[0]/t[2])**2, 0.0))  # std
    ))
    .collect()
)

print(sorted(check, key=lambda x: x[0]))
# means ~ 0, std ~ 1 (up to numerical rounding)

In [None]:
# PRIMERA PRUEBA TRAIN
import numpy as np
import math

def sigmoid(z):
    """Logistic function 1 / (1 + e^-z), evaluated without overflowing.

    The exponential is always taken of a non-positive number: exp(-z) for
    z >= 0 and exp(z) otherwise, so large |z| cannot overflow math.exp.
    """
    if z >= 0:
        return 1.0 / (1.0 + math.exp(-z))
    exp_z = math.exp(z)
    return exp_z / (1.0 + exp_z)

def _sample_grad(xy, w, b):
    """Logistic-loss gradient contributed by a single (X, y) example.

    Returns a tuple (grad_w, grad_b): grad_w is the prediction error times the
    feature vector (one entry per feature) and grad_b is the prediction error
    itself, where the error is sigmoid(w . X + b) - y.
    """
    features, label = xy
    features = np.asarray(features, dtype=np.float64)

    activation = float(np.dot(w, features) + b)
    error = sigmoid(activation) - float(label)

    return (error * features, error)

def train(RDD_Xy, iterations, learning_rate, lambda_reg=0.0):
    """Fit a logistic regression with batch gradient descent and L2 regularization.

    Arguments:
      RDD_Xy        -- RDD of (X, y) records (X: feature sequence, y: label 0/1)
      iterations    -- number of gradient-descent steps
      learning_rate -- step size of each update
      lambda_reg    -- L2 regularization rate (defaults to 0.0, i.e. no
                       regularization, so three-argument calls keep working)

    Returns:
      [w, b]  (w: np.array with one weight per feature, b: float)

    Raises:
      ValueError if RDD_Xy has no records.
    """
    spark_context = RDD_Xy.context

    # The data is scanned once per iteration, so keep it cached while training.
    # Only release the cache afterwards if this call created it; an RDD the
    # caller had already cached is left exactly as it was.
    cached_here = not RDD_Xy.is_cached
    data = RDD_Xy.cache() if cached_here else RDD_Xy

    try:
        m = data.count()
        if m == 0:
            raise ValueError("RDD_Xy vacío")

        k = len(data.first()[0])  # number of features

        # Initialization: small random weights (fixed seed), zero bias
        rng = np.random.default_rng(42)
        w = rng.normal(0, 0.01, size=k).astype(np.float64)
        b = 0.0

        for _ in range(iterations):
            bc_w = spark_context.broadcast(w)
            bc_b = spark_context.broadcast(b)

            # Sum of the per-example gradients over the whole dataset
            sum_grad_w, sum_grad_b = data.map(
                lambda xy: _sample_grad(xy, bc_w.value, bc_b.value)
            ).reduce(
                lambda a, c: (a[0] + c[0], a[1] + c[1])
            )

            bc_w.unpersist()
            bc_b.unpersist()

            # Average gradient plus the L2 term. The penalty is scaled by the
            # number of features k; NOTE(review): confirm this matches the
            # cost function required by the assignment.
            grad_w = (sum_grad_w / m) + (lambda_reg / k) * w
            grad_b = (sum_grad_b / m)

            # Gradient-descent update
            w = w - learning_rate * grad_w
            b = b - learning_rate * grad_b
    finally:
        if cached_here:
            data.unpersist()

    return [w, b]

In [None]:
def train(RDD_Xy, iterations, learning_rate, lambda_reg=0.0, seed=None):
    '''Train a logistic regression with batch gradient descent and L2 regularization.

    Arguments:
    RDD_Xy -- RDD containing data examples. Each record of the RDD is a tuple
              (X, y):
              "X" is a sequence with the feature values (floats) of an example
              "y" is the label of the example (integer 0/1)
    iterations -- number of iterations of the optimization loop
    learning_rate -- learning rate of the gradient descent
    lambda_reg -- regularization rate (L2). Defaults to 0.0 (no regularization)
    seed -- optional seed for the random initialization; None draws a
            different initialization on every call

    Returns:
    A list [w, b] with the weights "w" (np.array, one entry per feature) and
    the bias "b" (float) at the end of the training process.

    Raises:
    ValueError if RDD_Xy has no records.
    '''
    spark_context = RDD_Xy.context

    # Every iteration scans the whole dataset, so keep it cached while
    # training; release the cache afterwards only if this call created it.
    cached_here = not RDD_Xy.is_cached
    examples = RDD_Xy.cache() if cached_here else RDD_Xy

    try:
        m = examples.count()
        if m == 0:
            raise ValueError("RDD_Xy is empty")

        # Take the number of features from the data instead of hard-coding 11.
        k = len(examples.first()[0])

        # Weights and bias start uniformly at random in [0, 1).
        rng = np.random.default_rng(seed)
        w = rng.random(k)
        b = float(rng.random())

        for _ in range(iterations):
            # Ship the current parameters to the executors once per iteration.
            params = spark_context.broadcast((w, b))

            def example_gradient(xy):
                weights, bias = params.value
                features = np.asarray(xy[0], dtype=np.float64)
                error = sigmoid(float(np.dot(weights, features) + bias)) - float(xy[1])
                # (derivative w.r.t. the weights, derivative w.r.t. the bias)
                return (error * features, error)

            sum_grad_w, sum_grad_b = examples.map(example_gradient).reduce(
                lambda a, c: (a[0] + c[0], a[1] + c[1])
            )
            params.unpersist()

            # Mean gradient plus the L2 term, scaled by the number of features
            # k as in the earlier train implementation of this notebook.
            # NOTE(review): confirm against the cost function of the assignment.
            grad_w = (sum_grad_w / m) + (lambda_reg / k) * w
            grad_b = sum_grad_b / m

            w = w - learning_rate * grad_w
            b = b - learning_rate * grad_b
    finally:
        if cached_here:
            examples.unpersist()

    return [w, b]

In [None]:
def accuracy(w, b, RDD_Xy):
    '''Fraction of the examples in RDD_Xy that the model classifies correctly.

    Arguments:
    w -- weights
    b -- bias
    RDD_Xy -- RDD containing the examples to be predicted; each record is a
              tuple (X, y)

    Returns:
    accuracy -- the number of predictions that are correct divided by the
    number of records (examples) in RDD_Xy; 0.0 when RDD_Xy is empty.
    The predict function is used for predicting every single example.
    '''
    def score(xy):
        hit = 1 if predict(w, b, xy[0]) == int(xy[1]) else 0
        # Carry a 1 next to every hit so hits and records are counted together.
        return (hit, 1)

    # fold (unlike reduce) accepts an empty RDD, and a single pass yields both
    # totals instead of a reduce followed by a separate count.
    correct, total = RDD_Xy.map(score).fold(
        (0, 0), lambda a, c: (a[0] + c[0], a[1] + c[1])
    )
    return correct / total if total > 0 else 0.0

In [None]:
def predict(w, b, X, threshold=0.5):
    '''Predict the label of a single example with the logistic model.

    Arguments:
    w -- weights
    b -- bias
    X -- example to be predicted (sequence of feature values)
    threshold -- minimum predicted probability for answering 1
                 (defaults to 0.5)

    Returns:
    Y_pred -- a value (0/1) corresponding to the prediction of X: 1 when
    sigmoid(w . X + b) >= threshold, otherwise 0.
    '''
    weights = np.asarray(w, dtype=float)
    features = np.asarray(X, dtype=float)
    z = float(np.dot(weights, features) + float(b))
    probability = sigmoid(z)
    return 1 if probability >= threshold else 0

In [None]:
# Run configuration. These names are not defined anywhere else in the notebook;
# the values below are starting points and are meant to be tuned.
path = "./botnet_reduced_10k_l.csv"
nIter = 10
learningRate = 1.5
lambdaReg = 0.0

# read data
data = readFile(path)

# standardize
data = normalize(data)

# train returns [w, b]; accuracy takes the weights and the bias separately,
# followed by the RDD to evaluate.
ws = train(data, nIter, learningRate, lambdaReg)
w, b = ws
acc = accuracy(w, b, data)
print("acc:", acc)

## Ejercicio 2

In [None]:
def transform(RDD_Xy, num_blocks, seed=42):
    """Randomly assign every record to one of num_blocks cross-validation blocks.

    Returns a cached RDD of (block_id, (X, y)) records with block_id in
    [0, num_blocks). The random generator is seeded per partition, so a
    recomputation of the RDD reproduces the same assignment as long as the
    partitioning of RDD_Xy does not change.
    """
    def tag_partition(partition_index, records):
        rng = np.random.default_rng([seed, partition_index])
        for record in records:
            yield (int(rng.integers(num_blocks)), record)

    return RDD_Xy.mapPartitionsWithIndex(tag_partition).cache()


def get_block_data(data_cv, block_index):
    """Split the block-tagged RDD into (training RDD, test RDD) for one fold.

    The test RDD holds the records tagged with block_index; the training RDD
    holds the records of every other block. Both contain plain (X, y) records.
    """
    tr_data = data_cv.filter(lambda rec: rec[0] != block_index).values()
    test_data = data_cv.filter(lambda rec: rec[0] == block_index).values()
    return tr_data, test_data


# Run configuration. These names are not defined anywhere else in the notebook;
# the values below are starting points and are meant to be tuned.
path = "./botnet_reduced_10k_l.csv"
nIter = 10
learningRate = 1.5
lambdaReg = 0.0

# read data
data = readFile(path)

# standardize
data = normalize(data)
num_blocks_cv = 10

# shuffle rows and transform data, specifying the number of blocks
data_cv = transform(data, num_blocks_cv)

fold_accs = []
for i in range(num_blocks_cv):
    tr_data, test_data = get_block_data(data_cv, i)

    # Fit on the training blocks only and evaluate on the held-out block.
    ws = train(tr_data, nIter, learningRate, lambdaReg)
    w, b = ws
    acc = accuracy(w, b, test_data)
    fold_accs.append(acc)
    print("acc:", acc)

avg_acc = sum(fold_accs) / len(fold_accs)
print("average acc:", avg_acc)

In [None]:
# Shut down the SparkContext held in `sc`; cells that use `sc` cannot run after this
sc.stop()