In [1]:
import gc

import numpy as np
import pandas as pd
from pyspark import SparkConf
from pyspark.context import SparkContext
from pyspark.sql import SparkSession
from sklearn.cluster import AgglomerativeClustering


# Extracting the dataset

In [None]:
# Load data/tracks.csv: the first column becomes the index and the first two
# rows form a two-level column header.
df = pd.read_csv('data/tracks.csv', index_col=0, header=[0, 1])
df

In [None]:
#df.columns

In [None]:
# Index labels of every row whose ('set', 'subset') value is 'small'
small_data = df.index[df[('set', 'subset')] == 'small'].tolist()

In [None]:
# Load data/features.csv: the first column becomes the index and the first three
# rows form a three-level column header.
data = pd.read_csv('data/features.csv', header=[0,1,2], index_col=0)
data.head()

In [None]:
# Keep only the feature rows whose index label was selected into small_data
small_dataset = data.loc[data.index.isin(small_data)]
small_dataset

In [None]:
# Release the full tables now that the subset has been extracted
del data, df, small_data
gc.collect()

# 1.1 - Hierarchical Clustering

In [None]:
# Per-cluster radius / diameter / density for k = 8..16 clusters.
# Every k contributes one row per cluster; the avg_* columns repeat the per-k
# averages on each of those rows so the different k can be compared directly.
feature_matrix = small_dataset.to_numpy()
stats_frames = []

for i in range(8, 17):
    # Hierarchical clustering of the dataset for k = 8 to k = 16
    clustering = AgglomerativeClustering(n_clusters=i).fit_predict(small_dataset)

    radius = {}
    density_r2 = {}
    density_d2 = {}
    diameter = {}

    # Calculate the radius, diameter and density of each cluster
    # and add it to a dictionary
    for x in range(i):
        # Select the members by label so the cluster id never leaks into the features
        points = feature_matrix[clustering == x]
        # Centroid assuming euclidean distance was used
        center = points.mean(axis=0)
        # Radius = largest euclidean distance from the centroid to a member
        radius[x] = np.linalg.norm(points - center, axis=1).max()
        # Diameter is approximated as twice the radius (not the max pairwise distance)
        diameter[x] = radius[x] * 2
        # A single-point cluster has radius 0 and therefore an infinite density
        with np.errstate(divide='ignore'):
            density_r2[x] = len(points) / (radius[x] ** 2)
            density_d2[x] = len(points) / (diameter[x] ** 2)

    # Calculate avg metrics
    avg_radius = np.mean(list(radius.values()))
    avg_diameter = np.mean(list(diameter.values()))
    avg_density_r2 = np.mean(list(density_r2.values()))
    avg_density_d2 = np.mean(list(density_d2.values()))

    # Collect the metrics of each number of clusters; the frames are concatenated
    # once after the loop instead of growing a DataFrame on every iteration
    stats_frames.append(pd.DataFrame({
        'num_clusters': i, 'radius': radius, 'diameter': diameter, 'density_r2': density_r2,
        'density_d2': density_d2, 'avg_radius': avg_radius, 'avg_diameter': avg_diameter,
        'avg_density_r2': avg_density_r2, 'avg_density_d2': avg_density_d2}))

clustering_stats = pd.concat(stats_frames, ignore_index=True)
    
    

In [None]:
# Show the metrics collected for every number of clusters
clustering_stats

In [None]:
# Show only the rows recorded for num_clusters == 8
clustering_stats.loc[clustering_stats.num_clusters == 8]

# 1.2 BFR Implementation

In [2]:
# Reuse the running SparkContext when this cell is executed again; constructing a
# second SparkContext in the same kernel raises a ValueError.
sc = SparkContext.getOrCreate(SparkConf().setAppName("Assignment2_E1"))

22/05/29 18:03:09 WARN Utils: Your hostname, Luiss-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 192.168.0.127 instead (on interface en0)
22/05/29 18:03:09 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/05/29 18:03:10 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
22/05/29 18:03:11 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [3]:
i = 0  # number of non-empty batches handed to process_batch so far


def process_batch(batch):
    """Process one batch of CSV rows that were already split into fields.

    Parameters
    ----------
    batch : list of list of str
        Rows of the file; each row is the list of its comma-separated fields.

    Returns
    -------
    None

    Notes
    -----
    The first batch (``i == 0``) still contains the CSV header rows and is only
    counted. Later batches are distributed with the module-level SparkContext
    ``sc``; the resulting RDD is not consumed yet (work in progress).
    """
    global i
    if len(batch) == 0:
        # The reader may flush an empty trailing batch; there is nothing to do.
        return

    chunk = np.array(batch)
    if i > 0:
        # Rows are lists of fields already, so no per-line split is applied here.
        chunk = sc.parallelize(chunk)

    # Advance the counter so only the very first batch is treated as the header batch.
    i += 1
    return

In [None]:
# Read the file in fixed-size batches of rows and hand each batch to process_batch
BATCH_SIZE = 8000

with open('data/features.csv') as f:
    batch = []
    for line in f:
        batch.append(line.rstrip('\n').split(','))
        if len(batch) == BATCH_SIZE:
            process_batch(batch)
            batch = []

# Flush the final partial batch; it is empty when the number of lines is an
# exact multiple of BATCH_SIZE, in which case there is nothing left to process.
if batch:
    process_batch(batch)


In [None]:
# Read the CSV as an RDD of field lists and drop the header rows
# (only rows whose first field is numeric are kept).
file = (
    sc.textFile('data/features.csv')
    .map(lambda line: line.split(","))
    .filter(lambda x: x[0].isnumeric())
)

In [None]:
# Draw a 5% sample without replacement; the fixed seed makes the sample
# reproducible across kernel restarts.
point_sample = file.sample(withReplacement=False, fraction=0.05, seed=42).collect()

In [None]:
# Drop the sampled rows from the RDD. Rows are matched on their first field using a
# set, so each lookup is O(1) instead of comparing the row against every sampled row.
# NOTE(review): assumes the first field uniquely identifies a row - confirm.
sampled_ids = {row[0] for row in point_sample}
file = file.filter(lambda x: x[0] not in sampled_ids)

In [None]:
# splitting the file into 10 chunks of equal size to emulate batch reading
# (collect() first materializes the remaining rows of the RDD as a Python list)
file = file.collect()

In [None]:
file = np.array(file)
# Split along axis 0 so every chunk is a batch of rows (axis=1 would cut the
# feature columns apart). array_split also accepts a row count that is not
# divisible by 10, where np.split would raise.
file = np.array_split(file, 10, axis=0)


In [None]:
# `file` is a list of arrays at this point, so report the shape of every chunk
[chunk.shape for chunk in file]

## Step 1

In [None]:
# Module-level state shared by the BFR cells below.
DISCARD_SET = []       # rebound by the draft chunk loop further down
COMPRESSION_SET = []   # NOTE(review): not referenced elsewhere in the visible notebook
RETAINED_SET = []      # process_chunk appends the points it could not assign to a cluster
clusters_indices = {}  # replaced by start_bfr with a per-cluster mapping

In [None]:

# Auxiliary functions
def centroid(stats):
    """Return the centroid of a cluster summary ``(N, SUM, SUMSQ)``, i.e. SUM / N."""
    count, total = stats[0], stats[1]
    return total / count

def variance(stats):
    """Return the variance of a cluster summary ``(N, SUM, SUMSQ)``.

    Computed as (SUMSQ / N) - (SUM / N)^2.
    """
    count, total, total_sq = stats[0], stats[1], stats[2]
    mean = total / count
    return total_sq / count - np.square(mean)

#std = np.sqrt(variance)

def calculate_malahanobis(point, centroid, std_dev):
    """Return the Mahalanobis (std-normalized euclidean) distance to a centroid.

    Parameters
    ----------
    point, centroid, std_dev : array-like of numbers
        The point, the cluster centroid and the per-dimension standard
        deviation of the cluster; all three must have the same length.

    Returns
    -------
    float
        sqrt(sum(((point - centroid) / std_dev) ** 2)). A dimension whose
        standard deviation is 0 contributes 0 when the point matches the
        centroid there (instead of the NaN produced by 0 / 0) and infinity
        otherwise.
    """
    diff = np.asarray(point, dtype=float) - np.asarray(centroid, dtype=float)
    std = np.asarray(std_dev, dtype=float)
    # The zero-variance dimensions are handled explicitly below, so silence the warnings
    with np.errstate(divide='ignore', invalid='ignore'):
        normalized = diff / std
    # 0 / 0: the point sits exactly on the centroid in a zero-variance dimension
    normalized = np.where((diff == 0) & (std == 0), 0.0, normalized)
    return np.sqrt(np.sum(np.square(normalized)))


In [None]:
# Step 1
def start_bfr(initial_chunk, num_clusters):
    """Cluster the initial chunk and summarize every resulting cluster.

    Parameters
    ----------
    initial_chunk : numpy.ndarray
        2-D array whose column 0 holds the point id and whose remaining
        columns hold the features.
    num_clusters : int
        Number of clusters requested from AgglomerativeClustering.

    Returns
    -------
    dict
        ``{cluster: (N, SUM, SUMSQ)}`` as produced by ``calc_ds_stats``.

    Side effects
    ------------
    Rebinds the module-level ``clusters_indices`` to ``{cluster: [point ids]}``.
    The ids are stored in plain lists so that later steps can append to them.
    """
    global clusters_indices

    # removing the index before the clustering
    ids = initial_chunk[:, 0]
    points = initial_chunk[:, 1:]
    initial_clusters = AgglomerativeClustering(n_clusters=num_clusters).fit(points)
    labels = initial_clusters.labels_

    clusters_indices = {}
    clusters = {}
    for cluster in range(initial_clusters.n_clusters_):
        # One boolean mask per cluster instead of scanning every point in Python
        members = labels == cluster
        clusters_indices[cluster] = ids[members].tolist()
        clusters[cluster] = points[members]

    summarized_ds = calc_ds_stats(clusters)

    return summarized_ds


def calc_ds_stats(cluster):
    """Summarize every cluster as an ``(N, SUM, SUMSQ)`` tuple.

    ``cluster`` maps a cluster id to its points; the result maps the same ids to
    the number of points, the per-dimension sum and the per-dimension sum of
    squares.
    """
    def _summarize(members):
        # (N, SUM, SUMSQ) for one group of points
        return (
            len(members),
            np.sum(members, axis=0),
            np.sum(np.square(members), axis=0),
        )

    return {label: _summarize(members) for label, members in cluster.items()}

In [None]:
# Hold out 10% of the rows; the fixed random_state makes the split reproducible
b = small_dataset.sample(frac=0.1, replace=False, random_state=42).copy()

In [None]:
# Move the index into the first column so it is kept when `b` is converted to a NumPy array.
# NOTE(review): not idempotent - running this cell twice adds a second index column.
b = b.reset_index()

In [None]:
# Remove the held-out rows from small_dataset by their index label. `b` has already
# been through reset_index(), so its first column holds the original index labels;
# DataFrame.isin(b) would instead align on b's new 0..n-1 index and leave the
# sampled rows in place. No dropna() is needed because nothing is masked to NaN here.
held_out_ids = b.iloc[:, 0]
small_dataset = small_dataset.loc[~small_dataset.index.isin(held_out_ids)]
small_dataset.shape

In [None]:
# Move the index into the first column so it is kept when the frame is converted to a NumPy array.
# NOTE(review): not idempotent - running this cell twice adds a second index column.
small_dataset = small_dataset.reset_index()

In [None]:
# From here on small_dataset is a NumPy array rather than a DataFrame
small_dataset = np.array(small_dataset)

In [None]:
# From here on b is a NumPy array rather than a DataFrame
b = np.array(b)

In [None]:
# NOTE(review): duplicate of the previous cell; b is already an array, so this only copies it.
b = np.array(b)

In [None]:
# Run the initial clustering with 8 clusters and keep the per-cluster (N, SUM, SUMSQ) summaries
summarized_ds = start_bfr(np.array(small_dataset), 8)

In [None]:
# Inspect the per-cluster summaries returned by start_bfr
summarized_ds

In [None]:

# For every held-out point, print (distance, cluster id) of the nearest cluster.
for point in b:
    distances = [
        (calculate_malahanobis(point[1:], centroid(stats), np.sqrt(variance(stats))), cluster_id)
        for cluster_id, stats in summarized_ds.items()
    ]
    # NaN distances cannot be ordered, so drop them and take the smallest remaining
    # distance (using isnan as the sort key only returned the first non-NaN entry).
    valid = [pair for pair in distances if not np.isnan(pair[0])]
    e = min(valid, key=lambda pair: pair[0]) if valid else (np.nan, None)
    print(e)

In [None]:
# NOTE(review): this cell is effectively a no-op. `file` is rebound to an empty
# string, so the loop below never executes (and the chunks built earlier are lost).
# NOTE(review): the argument passed to filter() looks like a generator expression of
# lambdas rather than a predicate, and `x` is used outside the comprehension that
# defines it. Presumably an early draft of the per-point assignment - confirm and remove.
file = ''
for chunk in file:
    
    DISCARD_SET = chunk.filter(lambda point: (point, x) for point, x in 
                               min([calculate_malahanobis(point[1:], centroid(x), np.sqrt(variance(x))) for x in summarized_ds]) < 2*np.sqrt(variance(x))
                               )

In [None]:
# step 3
def process_chunk(chunk, threshold=None):
    """Assign every point of a chunk to its nearest summarized cluster, or retain it.

    Parameters
    ----------
    chunk : iterable of array-like
        Points whose element 0 is the point id and whose remaining elements
        are the numeric features.
    threshold : float, optional
        Largest Mahalanobis distance at which a point is still absorbed by a
        cluster. Defaults to ``2 * sqrt(d)`` with ``d`` the number of features,
        i.e. two standard deviations per dimension on average.

    Side effects
    ------------
    Updates the module-level ``summarized_ds`` and ``clusters_indices`` for
    absorbed points and appends every other point to ``RETAINED_SET``.
    """
    # iterating through the chunk
    for point in chunk:
        # point[1:] removes the id from the point
        features = np.asarray(point[1:], dtype=float)
        limit = 2 * np.sqrt(features.size) if threshold is None else threshold

        min_distance = np.inf
        label = None

        # calculating the minimum distance to a cluster
        for cluster, stats in summarized_ds.items():
            # Clip tiny negative variances caused by floating point cancellation
            std = np.sqrt(np.maximum(variance(stats), 0))
            distance = calculate_malahanobis(features, centroid(stats), std)
            # A NaN distance never compares as smaller, so such clusters are skipped
            if distance < min_distance:
                min_distance = distance
                label = cluster

        # checking if the nearest cluster is closer than the threshold
        if label is not None and min_distance < limit:
            # saving the point id to the clusters dictionary
            members = clusters_indices.get(label, [])
            if not isinstance(members, list):
                # e.g. an np.where() result: flatten it into a list that can grow
                members = np.asarray(members).ravel().tolist()
            members.append(point[0])
            clusters_indices[label] = members

            # updating the statistics: (N, SUM, SUMSQ)
            N, SUM, SUMSQ = summarized_ds[label]
            summarized_ds[label] = (N + 1, SUM + features, SUMSQ + np.square(features))
        else:
            RETAINED_SET.append(point)
    return

In [None]:
def process_retained():
    """Cluster the retained points into candidate groups.

    Reads the module-level ``RETAINED_SET``, whose points carry their id in
    element 0 followed by the features. The id is excluded from the
    clustering so that it does not act as a feature.

    Returns
    -------
    tuple of (dict, dict)
        ``clusters_idx`` maps each cluster label to the list of point ids and
        ``clusters`` maps it to the 2-D array of the corresponding features.
        Both are empty when fewer than two points are retained, because the
        agglomerative clustering needs at least two samples.
    """
    if len(RETAINED_SET) < 2:
        return {}, {}

    retained = np.asarray(RETAINED_SET)
    ids = retained[:, 0]
    features = retained[:, 1:]

    # distance threshold = 2 x sqrt(number of feature dimensions)
    distance_threshold = 2 * np.sqrt(features.shape[1])
    cs_clusters = AgglomerativeClustering(
        n_clusters=None, distance_threshold=distance_threshold
    ).fit(features)

    labels = cs_clusters.labels_
    unique_labels = np.unique(labels)
    clusters_idx = {cluster: ids[labels == cluster].tolist() for cluster in unique_labels}
    clusters = {cluster: features[labels == cluster] for cluster in unique_labels}

    return clusters_idx, clusters


In [None]:
def calc_cs_stats(grouping):
    """Placeholder: not implemented yet; ignores ``grouping`` and returns None."""
    return

In [None]:
def merge_cs():
    """Placeholder: not implemented yet; returns None."""
    return