# k means ||

In [None]:
# import the python libraries to create/connect to a Spark Session
from pyspark.sql import SparkSession

# build a SparkSession 
#   connect to the master node on the port where the master node is listening (7077)
#   declare the app name 
#   configure the executor memory to 512 MB
#   either *connect* or *create* a new Spark Context
spark = SparkSession.builder \
    .master("spark://10.67.22.124:7077")\
    .appName("prova iniziale")\
    .config("spark.executor.memory", "512m")\
    .getOrCreate()

In [None]:
spark

In [None]:
# create a spark context
sc = spark.sparkContext

# print its status
sc

# Import data from sklearn (load in the disk)

This is going to be removed in the Cloud Veneto version


In [None]:
sc.setLogLevel("ERROR") # To hide warnings, commented to debug

In [None]:
!pip install scikit-learn # to be run at the launch of "docker compose up" 

In [None]:
import numpy as np
import pandas as pd
import random
import sklearn.datasets #va installato
import time
import matplotlib.pyplot as plt

from pyspark.sql.functions import sum as spark_sum
from pyspark.sql.functions import col
from pyspark.sql.functions import mean
from pyspark.sql.functions import stddev
from pyspark.sql.functions import rand
from pyspark.sql.functions import least
from pyspark.sql import Row
from pyspark.ml.feature import StandardScaler
from pyspark.sql.types import StringType

from operator import add
from functools import reduce

In [None]:
# Hyperparameters
k = 30
G = 5 # multiplying factor
len_df = 50_000 # 494_021

In [None]:
# # Import dataframe directly from sklearn
# prova =  sklearn.datasets.fetch_kddcup99(percent10 = True, as_frame = True)
# X = prova.data

# n_cols = len(X.iloc[0])

# n_cols = n_cols - 3 #because we will remove the binary variables

# # Create the spark dataframe with a smaller chunk of data (just to work easily)
# X_smaller = X.iloc[random.sample(range(0, len(X.index)), len_df)]
# spark_X = spark.createDataFrame(X_smaller)

# spark_X = spark_X.persist()

# Create a Spark dataframe
And then acquire important informations about the dataframe: # columns, # rows, # partitions, the schema, ...

In [None]:
%%time

spark_X = spark.createDataFrame(sklearn.datasets.fetch_kddcup99(percent10 = True, as_frame = True)['frame'].iloc[random.sample(range(0, len_df), len_df)])
# spark_X = spark.createDataFrame(sklearn.datasets.fetch_kddcup99(percent10 = True, as_frame = True)['frame'])
spark_X = spark_X.persist()


In [None]:
len_df = spark_X.count()

In [None]:
# Check the number of partitions the DataFrame is divided into the three workers yet
spark_X.rdd.getNumPartitions()

In [None]:
# Define a function that is able to select the single row in the dataframe
def getrows(df, rownums=None):
    return df.rdd.zipWithIndex().filter(lambda x: x[1] in rownums).map(lambda x: x[0])

In [None]:
# Delete the non-numerical values in an easy way
# clean_X = spark_X.select('*').drop('protocol_type', 'service', 'flag')
# clean_X.printSchema()

# Delete the non-numerical values in a reusable way
col_type = np.array(spark_X.dtypes)
types = col_type[:,1] 
colnames = col_type[:,0]
clean_X = spark_X.select([col(colnames[i]) for i in range(len(colnames)) if not types[i] == 'binary'])

In [None]:
# Define the number of rows and columns in the dataframe
n_rows = len_df
n_cols = len(getrows(clean_X, rownums=[0]).collect()[0])

In [None]:
n_cols

In [None]:
spark_X.printSchema()

# Step 0: Useful functions for the initialization

In [None]:
# Define the function for the distance between the cluster centers and the pandas dataframe
def distance(xrow, centers, num_cols=n_cols):
    '''Distance between a dataframe.row
    and the broadcasted value list of centers
    in the form of dataframe.row.value:
    
    broadC = sc.broadcast(center_rows).value
    xrow = clean_X.collect()[1]
    '''
    x = np.array(xrow)[:num_cols]
    the_ds = np.zeros(len(centers))
    
    for c in range(len(centers)):
        c_array = np.array(centers[c])[:num_cols]
        dist2 = np.linalg.norm(x - c_array)**2
        the_ds[c] = dist2
        
    return np.min(the_ds)

# Formula to evaluate the oversamping factor with the pre-factor G
def evaluate_l(log_phi, k, G):
    return G * k/log_phi # G = over-oversampling factor

# Function to select a row based on its probability
def select_row(x):
    if x > np.random.uniform(low = 0, high = 1):
        return True
    else:
        return False


### This is NOT $\Phi$. This is $log(\Phi)$. Pay attention budeo.

# Step 1: Choose a random sample from the dataset
This is the required step to begin the algorithm (doesn't need to be parallelized, since it is a select task)

In [None]:
# Choose the first sample randomly: select the random row
random_n = [np.random.randint(0, n_rows)]
random_sample = getrows(clean_X, random_n).collect()

# Step 2: Initial cost
Then we evaluate the cost function (sum of the squares after the first selection)

In [None]:
colnames = list(clean_X.dtypes[i][0] for i in range(len(clean_X.dtypes)))

In [None]:
from pyspark.sql.functions import lit

clean_X = clean_X.withColumn("minimum_cost", lit(0))
for colname in colnames:
    clean_X = clean_X.withColumn("minimum_cost", clean_X['minimum_cost'] + (clean_X[colname]-random_sample[0][colname])**2)

In [None]:
# Add the colunm to keep the minimum distance between each point and the closest center
clean_X = (clean_X.select('*')
           .withColumn('minimum_cost', sum((col(colname)-random_sample[0][colname])**2 for colname in colnames)))

In [None]:
prova = sum((col(colname)-random_sample[0][colname])**2 for colname in colnames)

In [None]:
# Initial cost
initial_cost = np.log(clean_X.agg({"minimum_cost": "sum"}).collect()[0][0])
initial_cost

In [None]:
# Add the newly introduced value to the row that I selected in the first place
temp = random_sample[0].asDict()
temp["minimum_cost"] = 0.
random_sample[0] = Row(**temp)

In [None]:
# Broadcasting the row over the workers
bCent = sc.broadcast(random_sample)

# To show the content of the broadcast: b.value

## Step 2.5: evaluate number of iterations

After having evaluated $\log \phi$, we find a number of iterations which is of the same order of magnitude.

In [None]:
n_iter = int(initial_cost)
n_iter

# Step 3: For loop
Implement the for loop in order to evaluate probabilities and choose new centers



In [None]:
phi_iter = initial_cost
l = evaluate_l(phi_iter, k, G)
l

In [None]:
%%time

i = 0
last_centers = 1
start_time = time.time()

while i < n_iter:   # or len(bCent.value) < k: # added as a safety condition (useless for very long datasets)

        '''
        Nel ciclo for:
            - Evaluate for each row l * d()^2 / phi
            - Sample with that probability
            - Broadcast centers to nodes
            - Evaluate new cost

        Ricordiamoci che distance è già al quadrato e che phi è il logaritmo
        '''

        # Evaluate the probability and select the new rows
        mod_phi = l/np.exp(phi_iter)
        new_rows = clean_X.select('*').withColumn('random_number', rand(seed=int(time.time())))\
                          .filter(col('random_number') < col('minimum_cost')*mod_phi).drop('random_number').collect()

        print("--- %s seconds ---" % (time.time() - start_time))
        start_time = time.time()
        
        # Update the broadcast in a non-usual way (newest at the beginning)
        bCent = sc.broadcast(new_rows + bCent.value)
        
        print("--- %s seconds ---" % (time.time() - start_time))
        start_time = time.time()

        # Update the minimum distance
        if len(new_rows) == 1:
            clean_X = clean_X.select('*').\
                      withColumn('minimum_cost', least('minimum_cost', reduce(add, [(col(colname)-bCent.value[0][colname])**2 for colname in colnames]))).cache()

        elif len(new_rows) > 1:
            clean_X = clean_X.select('*').withColumn('dummy', least(*[reduce(add, [(col(colname)-bCent.value[center][colname])**2 for colname in colnames]) for center in range(len(new_rows))] )).\
                      withColumn('minimum_cost', least('minimum_cost', 'dummy')).\
                      drop('dummy').cache()

        last_centers = len(bCent.value)

        # Evaluate new cost
        phi_iter = np.log(clean_X.agg({"minimum_cost": "sum"}).collect()[0][0])

        print("--- %s seconds ---" % (time.time() - start_time))
        start_time = time.time()

        i += 1

        print("\n at iteration", i ,", #values: ",len(bCent.value), " \n")

for each row evaluate distance from new points --> if it's less than mindistance substitute

In [None]:
clean_X.printSchema()

In [None]:
bCent.value

# Step 4: Select a subset of the possible centroids using k-means ++

Using kmeans ++:

Algorithm 1 k-means++(k) initialization.
1: C ← sample a point uniformly at random from X
2: while |C| < k do
2
3:
Sample x ∈ X with probability dφX(x,C)
(C)
4:
C ← C ∪ {

#### Function to find closest center among list

In [None]:
# Define the function to find the closest center from any point in order to find the weights
def find_closest_center(xrow, broadC, num_cols=n_cols):
    '''Find the center in the broadcasted value 
    list whose distance from a dataframe.row 
    is the lowest:
    
    broadC = sc.broadcast(center_rows)
    xrow = clean_X.collect()[1]
    '''
    x = np.array(xrow)[:num_cols]
    centers = broadC.value
    the_ds = np.zeros(len(centers))
    
    for c in range(len(centers)):
        c_array = np.array(centers[c])[:num_cols]
        dist2 = np.linalg.norm(x - c_array)**2
        the_ds[c] = dist2
        
    return np.argmin(the_ds)

In [None]:
dfCent = spark.createDataFrame(bCent.value) 

In [None]:
over_sampled_centers = dfCent.count()

In [None]:
dfCent.rdd.getNumPartitions()

In [None]:
len(bCent.value)

## Reduce size of dfCent to have only `k` centroids 

#### Step 1: calc weights $w_x$
set wx to be the number of points in X closer to x than any other point in C

In [None]:
%%time

wx = clean_X.rdd.map(lambda row: (find_closest_center(row,bCent), 1)).\
        reduceByKey(lambda x,y: x+y).\
        takeOrdered(over_sampled_centers)

In [None]:
wx

In [None]:
type(wx)

In [None]:
wx = (np.array(wx)[:,1]).astype(float)
wx

In [None]:
wx /= np.sum(wx)
wx

In [None]:
# Maybe we need to get rid of bCent ??
# bCent.destroy()
# bCent.unpersist()

#### Step 2: Use weighted `k-means++`
    2.1 Draw 1 random center
    2.2 update cost function
    2.3 repeat 1-2 until happy

In [None]:
# 2.1 Draw 1 random center

first_index = np.random.choice(a=range(wx.shape[0]), size=1, p=wx) 
#first point sampled uniformly wrt distance

first_index

In [None]:
first_center = getrows(dfCent, first_index).collect()
first_center

In [None]:
# Adds the first center to the ultimate center
bCent_ultimate = sc.broadcast(first_center)
bCent_ultimate.value

In [None]:
# Define the total distance from other centers (minimum cost) over the chosen center
dfCent = (dfCent.select('*')
         .withColumn('minimum_cost', sum((col(colname)-first_center[0][colname])**2 for colname in colnames)))

In [None]:
# Initialize the total cost value
phi0 = np.log(dfCent.agg({"minimum_cost": "sum"}).collect()[0][0])
phi0

In [None]:
wx

In [None]:
%%time

# 2.2 Update cost function

ultimate_sample_n = len(bCent_ultimate.value)

while ultimate_sample_n < k: #n_iter or len(bCent.value) < k:
    '''
    Nel ciclo for:
        - Evaluate for each row wx * d()^2 / phi
        - Sample with that probability
        - Broadcast centers to nodes
        - Evaluate new cost
        
    Ricordiamoci che distance è già al quadrato e che phi è il logaritmo
    '''
    
    # Evaluate the probability and select the new rows
    rows_prob = np.array(dfCent.rdd\
                .map(lambda row: distance(row,bCent_ultimate.value)).collect())
    # Forse pensare ad un modo alternativo di fare questo conto qui.... crescita lineare di bCent_ultimate
    
    # Sample new weighted random center
    another_index = np.random.choice(a=range(wx.shape[0]), size=1, p = rows_prob*wx/(rows_prob@wx) ) 
    another_center = getrows(dfCent, another_index).collect()
    
    # Update the broadcast
    bCent_ultimate = sc.broadcast(another_center + bCent_ultimate.value)

    # For consistency reason:    
    # Update minimum cost
    dfCent = dfCent.select('*').\
    withColumn('minimum_cost', least('minimum_cost', sum((col(colname)-another_center[0][colname])**2 for colname in colnames) )).cache()
    
    # Evaluate new cost
    phi0 = np.log(dfCent.agg({"minimum_cost": "sum"}).collect()[0][0])
    
    ultimate_sample_n = len(bCent_ultimate.value)
    print("i: ",ultimate_sample_n)
    

## k means

In [None]:
from pyspark.sql.functions import row_number
from pyspark.sql import Window
from pyspark.sql.functions import monotonically_increasing_id
from pyspark.sql.functions import when


In [None]:
%%time

final_df = clean_X.withColumn("closest", lit(None).cast(StringType()))

for centro in range(k):
    print(centro)
    final_df = final_df.withColumn("closest",
            when((sum((col(colname)-bCent_ultimate.value[centro][colname])**2 for colname in colnames) == col("minimum_cost")) & (final_df["closest"].isNull()), str(centro))
            .otherwise(final_df["closest"])).cache()



In [None]:
%%time

# Update new centers:
new_centers = final_df.groupBy('closest').agg(*[mean(c).alias(c) for c in colnames]).drop('closest').collect()
bCent_ultimate = sc.broadcast(new_centers)

In [None]:
%%time
# Update the minimum cost function
clean_X = clean_X.select('*').withColumn('minimum_cost', least(*[reduce(add, [(col(colname)-new_centers[center][colname])**2 for colname in colnames]) for center in range(len(new_centers))] )).cache()


In [None]:
%%time

# Add indexes to clean_X
clean_X = clean_X.withColumn("index", row_number().over(Window.orderBy(monotonically_increasing_id())))

# Loop
n = 0
phi1 = phi0
still_different = True
eps = 1e-4

while still_different:

    # # Create the weights and switch to indexed dataframe
    # weight_X = clean_X.rdd.map(lambda row: find_closest_center(row,bCent_ultimate)).collect()
    # dfw = spark.createDataFrame(pd.DataFrame({'closest':weight_X})) #?ocio che sarà lungo con tutto il dataset in uso
    # dfw = dfw.withColumn("index",row_number().over(Window.orderBy(monotonically_increasing_id())))

    # # Join with the initial clean_X
    # final_df = clean_X.join(dfw, on="index").drop('index')

    final_df = clean_X.withColumn("closest", lit(None).cast(StringType()))

    for centro in range(k):
        print(centro)
        final_df = final_df.withColumn("closest",
                when((sum((col(colname)-bCent_ultimate.value[centro][colname])**2 for colname in colnames) == col("minimum_cost")) & (final_df["closest"].isNull()), str(centro))
                .otherwise(final_df["closest"])).cache()

    # Update new centers:
    new_centers = final_df.groupBy('closest').agg(*[mean(c).alias(c) for c in colnames]).drop('closest').collect()
    bCent_ultimate = sc.broadcast(new_centers)

    # Update the minimum cost function
    clean_X = clean_X.select('*').withColumn('minimum_cost', least(*[reduce(add, [(col(colname)-new_centers[center][colname])**2 for colname in colnames]) for center in range(len(new_centers))] )).cache()

    # Evaluate new cost
    phi1 = np.log(clean_X.agg({"minimum_cost": "sum"}).collect()[0][0])
    print('Cost variation:', abs(phi1 - phi0)/phi0)
    if abs(phi1 - phi0)/phi0 < eps:
        still_different = False
    else:
        phi0 = phi1

    n += 1
    
print('It took', n, 'iterations to converge')

In [None]:
src_np = clean_X.select("src_bytes").collect()

In [None]:
dst_np = clean_X.select("dst_bytes").collect()

In [None]:
plt.scatter(src_np[1:], dst_np[1:])

for i in range(len(bCent_ultimate.value)):
   plt.scatter(bCent_ultimate.value[i]["src_bytes"],bCent_ultimate.value[i]["dst_bytes"],color='black', alpha = 0.2)


In [None]:
'''
- Prendere la lista dei centroidi e farla diventare un distributed dataset
- Fare una NUOVA lista di centroidi 
- for i in range(k):
    pesca un singolo dato con probabilità d(x)/phi --> come fare?
    aggiungilo alla lista

'''

# TO DO:
    - Capire come scegliere l
    - Fare la normalizzazione dei dati (se la facciamo)
    - Capire se abbiamo ottimizzato nel modo giusto
    - Implementare tutto k means (optional)
    - Creare un file gemello con un dataframe visualizzabile e vedere se ha senso
    - Iniziare a fare i benchmark su cloud veneto
    - Grafici etc...

# Stop worker and master
Stop the running Spark context (sc) and Spark session (spark)

In [None]:
# sc.stop()
# spark.stop()

Finally, use `docker compose down` to stop and clear all running containers.