<a ><img src="https://upload.wikimedia.org/wikipedia/commons/thumb/b/b9/Logo_INSA_Lyon_%282014%29.svg/langfr-2560px-Logo_INSA_Lyon_%282014%29.svg.png"  width="200" align="left"> </a>
<div style="text-align: right"> <a ><img src="https://2024.debs.org/img/debs-logo_keep.png"  width="200" align="right"> </a> </div>

<br>
<br>
<br>


<h1><center>DEBS 2024 </center></h1>
<h2><center> <span style="font-weight:normal"><font color='#e42618'> 18TH ACM INTERNATIONAL CONFERENCE ON DISTRIBUTED AND EVENT-BASED SYSTEMS (June 25th–28th 2024, Lyon, France)</font>  </span></center></h2>


<h3><center><font color='gray'>ABNER | JONAS | KEVIN | BENJAMIN</font></center></h3>

As part of the _Cloud Computing and Big Data Application_ course, we are participating in the DEBS Grand Challenge:
    
    
> The 2024 DEBS Grand Challenge focuses on real-time complex event processing of real-world telemetry data provided by Backblaze (https://www.backblaze.com/). The dataset used for the Grand Challenge is based on fine-granular telemetry data about over 200k hard drives in data centers operated by Backblaze. The goal of the challenge is to identify models that exhibit comparable behavior with respect to a set of attributes best suited to predict failures. Further details on the dataset provided, the queries, non-functional requirements and the overall submission process can be found here: https://2024.debs.org/call-for-grand-challenge-solutions/.


This year’s DEBS Grand Challenge requires you to implement two queries:
- Query 1: Count of the recent number of failures detected for each vault (group by storage servers) (Continuous Querying)
- Query 2: Use this number to continuously compute a cluster of the drives (Streaming K-Means)

Input data consists of batches, and each batch contains:
- SMART readings for a list of drives (```s_1, ..., s_242```)
- ```vault_ids```: a list of vault identifiers of interest for this batch (used in Q1, see below)
- ```cluster_ids```: a list of cluster identifiers of interest for this batch (used in Q2, see below)
- ```day_end```: a flag that marks the end of one day of readings (we could not find it in the sample data, so we create it ourselves)
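
Because the sample data carries no ```day_end``` flag, one way to create it is to mark the last reading of each calendar day. The following is a minimal sketch in plain Python, assuming the readings arrive sorted by date. The helper `add_day_end` and the sample records are hypothetical and not part of the challenge API; the challenge itself attaches the flag to a batch rather than to a single reading.

```python
from datetime import datetime


def add_day_end(readings):
    """Return (reading, day_end) pairs; day_end is True for the last reading of a day.

    Assumption: `readings` is a list of dicts with a `date` key, sorted by date.
    """
    flagged = []
    for i, reading in enumerate(readings):
        is_last = i == len(readings) - 1
        # The day ends when the next reading belongs to a different calendar day
        next_day_differs = (
            not is_last
            and readings[i + 1]["date"].date() != reading["date"].date()
        )
        flagged.append((reading, is_last or next_day_differs))
    return flagged


# Hypothetical sample: two readings on one day, one on the next day
sample = [
    {"serial_number": "A", "date": datetime(2023, 3, 9, 1)},
    {"serial_number": "B", "date": datetime(2023, 3, 9, 1)},
    {"serial_number": "A", "date": datetime(2023, 3, 10, 1)},
]
flags = [day_end for _, day_end in add_day_end(sample)]
```

In a streaming setting the last reading of the stream is not known in advance, so the flag for the current day could only be emitted once the first reading of the following day has been seen.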

#### Requirements
- ```Python 3.9.18``` (conda env)
- Export the pip dependencies with ```pip freeze > requirements.txt```
- Export the conda environment with ```conda env export > environment.yml```

## Our approach

We opted for Apache Spark for three reasons: some of us had already given a short presentation on it, it offers APIs in several languages, and it ships with a broad set of libraries. It also lets us start a data stream directly from a folder of ```CSV``` files.

In [41]:
# Import necessary libraries
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, TimestampType
#from pyspark.sql.functions import *
from pyspark.sql.functions import window, sum as _sum

# Exploration

In [42]:
spark = SparkSession.builder.appName("StreamingFromCSV").getOrCreate()

# load df from csv
df = spark.read.csv("streaming_data_dir/drivedata-small.csv", header=True, inferSchema=True)



                                                                                

In [43]:
# show only rows where failure is not 0
df.where(df.failure != 0).show()


+-------------------+-------------+--------------------+-------+--------+------------------+-------------------------+---------------+-------------------+---------------------------+------------------+------------------------+-----------------+--------------------+---------------------+------------------------+--------------------------------+-------------------------+----------------------------------+--------------------+--------------------+----------------------------+-----------------------+----------------------------+----------------------------+------------------------+---------------------------+----------------------------+---------------------------+--------------------------+-------------------------+--------------------------+---------------+-----------------+---------------------+-----------------+----------------------+-----------------------+--------------------+
|               date|serial_number|               model|failure|vault_id|s1_read_error_rate|s2_throughput_pe

                                                                                

In [44]:
# show the column names and types
df.printSchema()


root
 |-- date: timestamp (nullable = true)
 |-- serial_number: string (nullable = true)
 |-- model: string (nullable = true)
 |-- failure: integer (nullable = true)
 |-- vault_id: integer (nullable = true)
 |-- s1_read_error_rate: integer (nullable = true)
 |-- s2_throughput_performance: integer (nullable = true)
 |-- s3_spin_up_time: integer (nullable = true)
 |-- s4_start_stop_count: integer (nullable = true)
 |-- s5_reallocated_sector_count: integer (nullable = true)
 |-- s7_seek_error_rate: long (nullable = true)
 |-- s8_seek_time_performance: integer (nullable = true)
 |-- s9_power_on_hours: integer (nullable = true)
 |-- s10_spin_retry_count: integer (nullable = true)
 |-- s12_power_cycle_count: integer (nullable = true)
 |-- s173_wear_leveling_count: long (nullable = true)
 |-- s174_unexpected_power_loss_count: integer (nullable = true)
 |-- s183_sata_downshift_count: integer (nullable = true)
 |-- s187_reported_uncorrectable_errors: integer (nullable = true)
 |-- s188_command_

In [45]:
# load cluster centers from csv
cluster_centers = spark.read.csv("cluster_data_dir/clusters.csv", header=True, inferSchema=True)

In [46]:
# show the column names and types
cluster_centers.printSchema()

root
 |-- cluster: integer (nullable = true)
 |-- s1_read_error_rate: double (nullable = true)
 |-- s2_throughput_performance: double (nullable = true)
 |-- s3_spin_up_time: double (nullable = true)
 |-- s4_start_stop_count: double (nullable = true)
 |-- s5_reallocated_sector_count: double (nullable = true)
 |-- s7_seek_error_rate: double (nullable = true)
 |-- s8_seek_time_performance: double (nullable = true)
 |-- s9_power_on_hours: double (nullable = true)
 |-- s10_spin_retry_count: double (nullable = true)
 |-- s12_power_cycle_count: double (nullable = true)
 |-- s173_wear_leveling_count: double (nullable = true)
 |-- s174_unexpected_power_loss_count: double (nullable = true)
 |-- s183_sata_downshift_count: double (nullable = true)
 |-- s187_reported_uncorrectable_errors: double (nullable = true)
 |-- s188_command_timeout: double (nullable = true)
 |-- s189_high_fly_writes: double (nullable = true)
 |-- s190_airflow_temperature_cel: double (nullable = true)
 |-- s191_g_sense_error_

In [47]:
# drop _c35: string (nullable = true) from cluster_centers
cluster_centers = cluster_centers.drop("_c35")

The ```_c35``` column (```string```, nullable) contains only empty cells, so we drop it.

We use the date and vault ID to cluster all features ```s1``` through ```s242``` and rewrite the centers.

In [48]:

from pyspark.sql.types import LongType

schema = StructType([
    StructField("date", TimestampType(), True),
    StructField("serial_number", StringType(), True),
    StructField("model", StringType(), True),
    StructField("failure", IntegerType(), True),
    StructField("vault_id", IntegerType(), True),
    StructField("s1_read_error_rate", IntegerType(), True),
    StructField("s2_throughput_performance", IntegerType(), True),
    StructField("s3_spin_up_time", IntegerType(), True),
    StructField("s4_start_stop_count", IntegerType(), True),
    StructField("s5_reallocated_sector_count", IntegerType(), True),
    # LongType: the inferred schema above reports this column as long
    StructField("s7_seek_error_rate", LongType(), True),
    StructField("s8_seek_time_performance", IntegerType(), True),
    StructField("s9_power_on_hours", IntegerType(), True),
    StructField("s10_spin_retry_count", IntegerType(), True),
    StructField("s12_power_cycle_count", IntegerType(), True),
    # LongType: the inferred schema above reports this column as long
    StructField("s173_wear_leveling_count", LongType(), True),
    StructField("s174_unexpected_power_loss_count", IntegerType(), True),
    StructField("s183_sata_downshift_count", IntegerType(), True),
    StructField("s187_reported_uncorrectable_errors", IntegerType(), True),
    StructField("s188_command_timeout", IntegerType(), True),
    StructField("s189_high_fly_writes", IntegerType(), True),
    StructField("s190_airflow_temperature_cel", IntegerType(), True),
    StructField("s191_g_sense_error_rate", IntegerType(), True),
    StructField("s192_power_off_retract_count", IntegerType(), True),
    StructField("s193_load_unload_cycle_count", IntegerType(), True),
    StructField("s194_temperature_celsius", IntegerType(), True),
    StructField("s195_hardware_ecc_recovered", IntegerType(), True),
    StructField("s196_reallocated_event_count", IntegerType(), True),
    StructField("s197_current_pending_sector", IntegerType(), True),
    StructField("s198_offline_uncorrectable", IntegerType(), True),
    StructField("s199_udma_crc_error_count", IntegerType(), True),
    StructField("s200_multi_zone_error_rate", IntegerType(), True),
    StructField("s220_disk_shift", IntegerType(), True),
    StructField("s222_loaded_hours", IntegerType(), True),
    StructField("s223_load_retry_count", IntegerType(), True),
    StructField("s226_load_in_time", IntegerType(), True),
    StructField("s240_head_flying_hours", IntegerType(), True),
    StructField("s241_total_lbas_written", IntegerType(), True),
    StructField("s242_total_lbas_read", IntegerType(), True)
])

#read from csv file
#drivedata = spark.readStream.csv("streaming_data_dir/drivedata-small.csv", schema=schema, header=True)

# Read the CSV files as a data stream
streamingData = spark.readStream.schema(schema).option("maxFilesPerTrigger", 1).csv("streaming_data_dir")
# Print True only if the schema matches and the DataFrame is a streaming one
print(streamingData.schema == schema and streamingData.isStreaming)

True


# First Query

- Size = 30 days
- Slide = 1 day
- For every vault $v$, count the number of failures $NF_v$ in a sliding window $W$
- For a given day $i$, $NF_v^i$ is the count of failures in the window that starts at $i-31$ (included) and ends at $i-1$ (included)
- The first window closes at day 0: you can assume 0 failures for every day $i <= 0$
- Each batch of input data will contain the identifiers of 5 vaults: you are to return the current value of $NF_v$ for those vaults
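
To make the window semantics concrete, here is a minimal sketch in plain Python. It assumes that days are integer indices and takes the 30-day size literally, so the window for day $i$ covers days $i-30$ through $i-1$; the description above can also be read as covering 31 days. The function `failures_in_window` and the sample counts are hypothetical.

```python
from collections import defaultdict

WINDOW_DAYS = 30  # window size in days; the window slides by one day


def failures_in_window(failures_per_day, vault_id, day):
    """Count the failures of `vault_id` in the WINDOW_DAYS days ending at day - 1."""
    start = day - WINDOW_DAYS
    # Days without an entry (including every day <= 0) contribute 0 failures
    return sum(failures_per_day[vault_id].get(d, 0) for d in range(start, day))


# Hypothetical failure counts: vault_id -> {day index: failures on that day}
failures_per_day = defaultdict(dict)
failures_per_day[1042] = {3: 1, 10: 2, 35: 1}
failures_per_day[1406] = {40: 1}

nf_1042_day_31 = failures_in_window(failures_per_day, 1042, 31)  # days 1..30
nf_1042_day_41 = failures_in_window(failures_per_day, 1042, 41)  # days 11..40
nf_1406_day_41 = failures_in_window(failures_per_day, 1406, 41)  # days 11..40
```

Recomputing the sum for every request keeps the sketch short; a streaming implementation would rather keep a running count per vault and subtract the day that leaves the window.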

In [None]:

# Count failures per vault over a 30-day window that slides by one day.
# We group by vault only: grouping by model as well would split a vault's count.
# Note: in "complete" output mode Spark keeps all window state, so the watermark
# does not purge old windows here.
windowedData = streamingData \
    .withWatermark("date", "31 days") \
    .groupBy(
        streamingData.vault_id,
        window(streamingData.date, "30 days", "1 day")
    ) \
    .agg(_sum("failure").alias("total_failures"))

# Keep the window start, the vault and its failure count; drop windows without failures
selectedData = windowedData.select("window.start", "vault_id", "total_failures")
filteredData = selectedData.filter(selectedData.total_failures > 0)

# Print the complete result table to the console for testing
query = filteredData \
    .writeStream \
    .outputMode("complete") \
    .format("console") \
    .queryName("filteredData") \
    .option("numRows", 50) \
    .option("truncate", "false") \
    .start()



24/01/31 15:37:41 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /private/var/folders/p0/qbnst5tj08g1z35zyllsm5vc0000gn/T/temporary-9889bdad-7c46-47b2-b432-896e8d914660. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
24/01/31 15:37:41 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.




In [50]:
query.awaitTermination()

                                                                                

-------------------------------------------
Batch: 0
-------------------------------------------
+-------------------+--------+--------------+
|start              |vault_id|total_failures|
+-------------------+--------+--------------+
|2023-03-31 02:00:00|1149    |1             |
|2023-06-05 02:00:00|1042    |1             |
|2023-06-01 02:00:00|1097    |1             |
|2023-05-24 02:00:00|1042    |1             |
|2023-06-17 02:00:00|1406    |1             |
|2023-03-13 01:00:00|1124    |1             |
|2023-06-06 02:00:00|1406    |1             |
|2023-06-02 02:00:00|1406    |1             |
|2023-05-31 02:00:00|1042    |1             |
|2023-04-09 02:00:00|1149    |1             |
|2023-06-11 02:00:00|1042    |1             |
|2023-05-27 02:00:00|1406    |1             |
|2023-06-01 02:00:00|1406    |1             |
|2023-04-03 02:00:00|1149    |1             |
|2023-06-13 02:00:00|1042    |1             |
|2023-03-09 01:00:00|1124    |1             |
|2023-03-10 01:00:00|1124    

ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/Users/jonas/Documents/Uni/Erasmus/Cloud Computing for Big Data/Cloud-Computing-and-Big-Data-Applications/.conda/lib/python3.9/site-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/Users/jonas/Documents/Uni/Erasmus/Cloud Computing for Big Data/Cloud-Computing-and-Big-Data-Applications/.conda/lib/python3.9/site-packages/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "/Users/jonas/Documents/Uni/Erasmus/Cloud Computing for Big Data/Cloud-Computing-and-Big-Data-Applications/.conda/lib/python3.9/socket.py", line 704, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt


KeyboardInterrupt: 

In [51]:
query.stop()

# Second query

- For each reading at day $i$ regarding a drive $d$ belonging to vault $v$, add $NF_v^i$ to the reading
- Rescale the SMART values within the provided ranges
- Compute dynamic K-means clustering:
    - Assign incoming readings to the nearest centroid
    - At the end of one day, which is marked by a ```day_end``` flag in the batch, update the centroid positions with the average coordinates of all the readings currently associated with that centroid
    - The initial coordinates of the centroids are provided
    - Each batch of input data will contain a list of cluster identifiers ($0 - 49$)
    - Return the number of drives associated with each of those clusters
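
The assignment and update steps listed above can be sketched in plain Python as follows. This is an illustration under simplifying assumptions (two dimensions, two centroids, made-up readings), not the challenge's reference implementation; `nearest_centroid` and `update_centroids` are hypothetical helper names.

```python
def nearest_centroid(point, centroids):
    """Index of the centroid with the smallest squared Euclidean distance to `point`."""
    distances = [
        sum((p - c) ** 2 for p, c in zip(point, centroid))
        for centroid in centroids
    ]
    return distances.index(min(distances))


def update_centroids(centroids, assignments):
    """Move each centroid to the mean of its readings; keep it if it has none."""
    updated = []
    for index, centroid in enumerate(centroids):
        members = assignments.get(index, [])
        if members:
            updated.append([sum(axis) / len(members) for axis in zip(*members)])
        else:
            updated.append(list(centroid))
    return updated


# Hypothetical initial centroids and one day of readings
centroids = [[0.0, 0.0], [10.0, 10.0]]
readings = [[1.0, 1.0], [2.0, 0.0], [9.0, 8.0]]

# Assign every incoming reading to its nearest centroid
assignments = {}
for reading in readings:
    assignments.setdefault(nearest_centroid(reading, centroids), []).append(reading)

# Number of drives per cluster, as requested for the cluster ids of a batch
counts = {index: len(members) for index, members in assignments.items()}

# Run once the day_end flag has been seen
new_centroids = update_centroids(centroids, assignments)
```

Keeping a centroid in place when no reading was assigned to it is a design choice of this sketch; the challenge description does not say how empty clusters should be handled.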

## Our preliminary approach
- Use data stream from folder with CSV file
- Batch by day, because there are no day_end flags
- Normalize the data within the given ranges
- Start KMeans Model initialized by CSV centers
- Return the assigned labels for the batched data 
- Count the drives (entries) by label ($0-49$)
- Return new centers after end of day and training

We use min-max scaling:
$$
x_{\text{scaled}}=\frac{x-x_{\min}}{x_{\max}-x_{\min}}
$$
and then map the result to the desired range given in ```norm.csv``` by multiplying it by (```upper_thresh``` - ```lower_thresh```) and adding ```lower_thresh```.
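
As a worked example of the two steps, here is a small sketch in plain Python. The function `rescale` is a hypothetical helper, and returning ```lower_thresh``` for a constant column (where the minimum equals the maximum) is an assumption of this sketch rather than something the challenge specifies.

```python
def rescale(x, x_min, x_max, lower_thresh, upper_thresh):
    """Min-max scale x to [0, 1], then map it into [lower_thresh, upper_thresh]."""
    if x_max == x_min:
        # Assumption: a constant column is mapped to the lower threshold
        return lower_thresh
    x_scaled = (x - x_min) / (x_max - x_min)
    return lower_thresh + (upper_thresh - lower_thresh) * x_scaled


# A value halfway between min and max lands in the middle of the target range
value = rescale(75, 50, 100, -1.0, 1.0)
```

Here `x_scaled` is 0.5, so the rescaled value is $-1 + 2 \cdot 0.5 = 0$.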

In [32]:
# Import necessary libraries
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType
#from pyspark.sql.functions import *
from pyspark.sql.functions import window, col, sum as _sum

from pyspark.mllib.linalg import Vectors
from pyspark.mllib.clustering import StreamingKMeans

from pyspark.sql.types import LongType

schema = StructType([
    StructField("date", TimestampType(), True),
    StructField("serial_number", StringType(), True),
    StructField("model", StringType(), True),
    StructField("failure", IntegerType(), True),
    StructField("vault_id", IntegerType(), True),
    StructField("s1_read_error_rate", IntegerType(), True),
    StructField("s2_throughput_performance", IntegerType(), True),
    StructField("s3_spin_up_time", IntegerType(), True),
    StructField("s4_start_stop_count", IntegerType(), True),
    StructField("s5_reallocated_sector_count", IntegerType(), True),
    # LongType: the schema inferred during exploration reports this column as long
    StructField("s7_seek_error_rate", LongType(), True),
    StructField("s8_seek_time_performance", IntegerType(), True),
    StructField("s9_power_on_hours", IntegerType(), True),
    StructField("s10_spin_retry_count", IntegerType(), True),
    StructField("s12_power_cycle_count", IntegerType(), True),
    # LongType: the schema inferred during exploration reports this column as long
    StructField("s173_wear_leveling_count", LongType(), True),
    StructField("s174_unexpected_power_loss_count", IntegerType(), True),
    StructField("s183_sata_downshift_count", IntegerType(), True),
    StructField("s187_reported_uncorrectable_errors", IntegerType(), True),
    StructField("s188_command_timeout", IntegerType(), True),
    StructField("s189_high_fly_writes", IntegerType(), True),
    StructField("s190_airflow_temperature_cel", IntegerType(), True),
    StructField("s191_g_sense_error_rate", IntegerType(), True),
    StructField("s192_power_off_retract_count", IntegerType(), True),
    StructField("s193_load_unload_cycle_count", IntegerType(), True),
    StructField("s194_temperature_celsius", IntegerType(), True),
    StructField("s195_hardware_ecc_recovered", IntegerType(), True),
    StructField("s196_reallocated_event_count", IntegerType(), True),
    StructField("s197_current_pending_sector", IntegerType(), True),
    StructField("s198_offline_uncorrectable", IntegerType(), True),
    StructField("s199_udma_crc_error_count", IntegerType(), True),
    StructField("s200_multi_zone_error_rate", IntegerType(), True),
    StructField("s220_disk_shift", IntegerType(), True),
    StructField("s222_loaded_hours", IntegerType(), True),
    StructField("s223_load_retry_count", IntegerType(), True),
    StructField("s226_load_in_time", IntegerType(), True),
    StructField("s240_head_flying_hours", IntegerType(), True),
    StructField("s241_total_lbas_written", IntegerType(), True),
    StructField("s242_total_lbas_read", IntegerType(), True)
])

#read from csv file
#drivedata = spark.readStream.csv("streaming_data_dir/drivedata-small.csv", schema=schema, header=True)
spark = SparkSession.builder.appName("StreamingFromCSV").getOrCreate()

# Read the CSV files as a data stream
streamingData = spark.readStream.schema(schema).option("maxFilesPerTrigger", 1).csv("streaming_data_dir")
# Print True only if the schema matches and the DataFrame is a streaming one
print(streamingData.schema == schema and streamingData.isStreaming)


True


In [33]:
# Load initial center points from a CSV file
centers_df = spark.read.csv("cluster_data_dir/clusters.csv", header=True, inferSchema=True)

# drop _c35: string (nullable = true) from cluster_centers
centers_df = centers_df.drop("_c35")
centers_df = centers_df.drop("cluster")
# Convert the DataFrame to a list of DenseVector
centers = [Vectors.dense(row) for row in centers_df.collect()]

# we load the norm data from csv
norm_data = spark.read.csv("cluster_data_dir/norm.csv", header=True, inferSchema=True)

# Define the model
#model = StreamingKMeans(k=len(centers), decayFactor=1.0)

In [34]:
from pyspark.sql.functions import col

# create list of weights for each center point
#weights = [1.0 for center in centers]

#model.setInitialCenters(centers, weights)

# List of columns to ignore
ignore_columns = ['date', 'serial_number', 'model', 'failure', 'vault_id']

# Get the list of columns to normalize
# (the loop variable is called name so that it does not shadow the imported col function)
columns_to_normalize = [name for name in streamingData.columns if name not in ignore_columns]


In [None]:
from pyspark.sql import functions as F
# Fallback min/max values from a small static sample, used if the current batch has no data
batchData = spark.read.format("csv").option("header", "true").load("cluster_data_dir/drivedata-tiny.csv")

# The columns are read as strings, so cast them to long before aggregating (a plain string
# max would compare lexicographically). One job computes all values instead of several per column.
stats = batchData.agg(
    *[F.max(F.col(c).cast("long")).alias(f"max_{c}") for c in columns_to_normalize],
    *[F.min(F.col(c).cast("long")).alias(f"min_{c}") for c in columns_to_normalize],
).first()
max_values = {c: stats[f"max_{c}"] if stats[f"max_{c}"] is not None else 1 for c in columns_to_normalize}
min_values = {c: stats[f"min_{c}"] if stats[f"min_{c}"] is not None else 0 for c in columns_to_normalize}


In [60]:
from pyspark.sql.functions import col

def min_max_normalization(df, column, norm_data, min_val, max_val):
    """Return a Column expression that rescales `column` into the range from norm_data.

    `df` is not used; it is kept so that existing calls keep working.
    """
    # Look up the target range (lower and upper threshold) for this column
    norm_row = norm_data.filter(norm_data['column'] == column).first()
    if norm_row is None:
        raise ValueError(f"No normalization data found for column {column}")
    lower_thresh = norm_row['min']
    upper_thresh = norm_row['max']
    scaler = upper_thresh - lower_thresh

    # Min-max scale to [0, 1], then stretch and shift into [lower_thresh, upper_thresh].
    # If max_val equals min_val, the division yields null with Spark's default settings.
    return lower_thresh + scaler * ((col(column) - min_val) / (max_val - min_val))


# Apply min_max_normalization on all columns, accumulating the result so that
# every column (not only the last one) ends up normalized
finalstreamingData = streamingData
for column in columns_to_normalize:
    normalized_column = min_max_normalization(streamingData, column, norm_data, min_values[column], max_values[column])
    finalstreamingData = finalstreamingData.withColumn(column, normalized_column)

# Start the streaming query
query = finalstreamingData \
    .writeStream \
    .outputMode("update") \
    .format("console") \
    .start()

query.awaitTermination()

24/01/31 15:50:02 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /private/var/folders/p0/qbnst5tj08g1z35zyllsm5vc0000gn/T/temporary-53f25016-b424-4dc9-b4c1-f6a665825f6d. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
24/01/31 15:50:02 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
                                                                                

-------------------------------------------
Batch: 0
-------------------------------------------


ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/Users/jonas/Documents/Uni/Erasmus/Cloud Computing for Big Data/Cloud-Computing-and-Big-Data-Applications/.conda/lib/python3.9/site-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/Users/jonas/Documents/Uni/Erasmus/Cloud Computing for Big Data/Cloud-Computing-and-Big-Data-Applications/.conda/lib/python3.9/site-packages/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "/Users/jonas/Documents/Uni/Erasmus/Cloud Computing for Big Data/Cloud-Computing-and-Big-Data-Applications/.conda/lib/python3.9/socket.py", line 704, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt


KeyboardInterrupt: 

+-------------------+--------------+--------------------+-------+--------+------------------+-------------------------+---------------+-------------------+---------------------------+------------------+------------------------+-----------------+--------------------+---------------------+------------------------+--------------------------------+-------------------------+----------------------------------+--------------------+--------------------+----------------------------+-----------------------+----------------------------+----------------------------+------------------------+---------------------------+----------------------------+---------------------------+--------------------------+-------------------------+--------------------------+---------------+-----------------+---------------------+-----------------+----------------------+-----------------------+--------------------+
|               date| serial_number|               model|failure|vault_id|s1_read_error_rate|s2_throughput_

In [61]:
query.stop()

## KMeans

### initial model

In [None]:
from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.functions import col
from pyspark.ml.clustering import KMeansModel


# Ensure all columns in centers_df are numeric and replace nulls with 0
for column in centers_df.columns:
    centers_df = centers_df.withColumn(column, col(column).cast("double")).na.fill(0)

# Assemble the features into a single vector
assembler = VectorAssembler(inputCols=centers_df.columns, outputCol="features")
data = assembler.transform(centers_df)

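# Fit an initial KMeans model on centers_df with one cluster per provided center.
# This is meant to reproduce the given centers, although the KMeans initialization
# does not strictly guarantee it.
# maxIter is passed as a parameter: assigning kmeans.maxIter = 1 would not set it.
kmeans = KMeans(k=centers_df.count(), seed=1, maxIter=1)
initial_model = kmeans.fit(data.select('features'))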

# Save the model
initial_model.save("/model/initial_model")

### return labels

In [53]:
from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.functions import col
from pyspark.ml.clustering import KMeansModel

initial_model = KMeansModel.load("/model/initial_model")

def kmeans_centers(df, initial_model):
    # Keep only the SMART feature columns so the vector matches the model's dimensions,
    # cast them to double and replace nulls with 0
    feature_columns = [c for c in df.columns if c not in ignore_columns]
    df = df.select([col(c).cast("double").alias(c) for c in feature_columns]).na.fill(0)

    # Assemble the features into a single vector
    assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
    data = assembler.transform(df)

    # Assign each reading to a cluster of the initial model
    # TBD: carry the updated model over from one batch to the next
    predictions = initial_model.transform(data)

    # Retrain on this batch to obtain updated centers
    kmeans = KMeans(k=len(initial_model.clusterCenters()), seed=1, maxIter=1)
    model = kmeans.fit(data.select('features'))

    # Save the new centers as CSV (the numpy arrays are converted to plain lists first)
    centers = [center.tolist() for center in model.clusterCenters()]
    centers_df = spark.createDataFrame(centers, schema=feature_columns)
    centers_df.write.csv("cluster_data_dir/new_clusters.csv", header=True, mode="overwrite")

    return predictions

# Apply the function on micro-batches of the data stream
query = streamingData \
    .writeStream \
    .foreachBatch(lambda df, epoch_id: kmeans_centers(df, initial_model)) \
    .start()

query.awaitTermination()

24/01/30 17:06:56 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /private/var/folders/p0/qbnst5tj08g1z35zyllsm5vc0000gn/T/temporary-ef222dd5-bc6f-485a-bd20-0c8e02c523e0. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
24/01/30 17:06:56 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/Users/jonas/Documents/Uni/Erasmus/Cloud Computing for Big Data/Cloud-Computing-and-Big-Data-Applications/.conda/lib/python3.9/site-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/Users/jonas/Documents/Uni/Erasmus/Cloud Computing for Big Data/Cloud-Computi

KeyboardInterrupt: 

In [6]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.clustering import KMeans
data = [(Vectors.dense([0.0, 0.0]), 2.0), (Vectors.dense([1.0, 1.0]), 2.0),
        (Vectors.dense([9.0, 8.0]), 2.0), (Vectors.dense([8.0, 9.0]), 2.0)]
df = spark.createDataFrame(data, ["features", "weighCol"])
kmeans = KMeans(k=2)
kmeans.setMaxIter(1)

model = kmeans.fit(df)

24/01/31 08:38:10 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS


In [15]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.clustering import KMeans
import random

# Function to generate a random vector
def random_vector(length):
    return Vectors.dense([random.randint(0, 10) for _ in range(length)])

# Create original data with random vectors
original_data = [(random_vector(39), 1.0) for _ in range(50)]

# Create DataFrame
df = spark.createDataFrame(original_data, ["features", "weighCol"])

In [16]:
df.show()

+--------------------+--------+
|            features|weighCol|
+--------------------+--------+
|[5.0,4.0,8.0,9.0,...|     1.0|
|[3.0,2.0,0.0,4.0,...|     1.0|
|[3.0,0.0,9.0,10.0...|     1.0|
|[4.0,1.0,10.0,5.0...|     1.0|
|[0.0,5.0,9.0,9.0,...|     1.0|
|[0.0,6.0,2.0,9.0,...|     1.0|
|[7.0,4.0,8.0,5.0,...|     1.0|
|[2.0,5.0,7.0,9.0,...|     1.0|
|[8.0,8.0,5.0,5.0,...|     1.0|
|[9.0,8.0,6.0,3.0,...|     1.0|
|[9.0,5.0,6.0,6.0,...|     1.0|
|[2.0,4.0,1.0,10.0...|     1.0|
|[7.0,10.0,4.0,10....|     1.0|
|[2.0,2.0,9.0,5.0,...|     1.0|
|[6.0,1.0,0.0,3.0,...|     1.0|
|[8.0,2.0,4.0,0.0,...|     1.0|
|[6.0,1.0,3.0,4.0,...|     1.0|
|[3.0,1.0,6.0,6.0,...|     1.0|
|[10.0,5.0,10.0,6....|     1.0|
|[0.0,7.0,0.0,7.0,...|     1.0|
+--------------------+--------+
only showing top 20 rows



In [19]:
kmeans = KMeans(k=50)
kmeans.setMaxIter(10)

model = kmeans.fit(df)

In [18]:
model.clusterCenters()

[array([ 8.,  0.,  1.,  8.,  7.,  1.,  7.,  3.,  3.,  0.,  3.,  6.,  7.,
         4.,  7., 10.,  4.,  7.,  3.,  6.,  8.,  1.,  2.,  0.,  7.,  1.,
         3.,  1.,  9.,  4.,  8.,  3.,  1.,  1.,  2.,  6.,  5.,  2.,  4.]),
 array([ 5.,  4.,  8.,  9.,  1.,  3.,  4.,  2.,  5.,  3.,  5.,  7.,  2.,
         8.,  3.,  1.,  6.,  7.,  6.,  3.,  4.,  0.,  1.,  4.,  0.,  6.,
         3.,  2., 10.,  5.,  6., 10., 10.,  6.,  2., 10.,  6.,  2.,  6.]),
 array([ 3.,  2.,  0.,  4.,  5.,  8.,  4.,  6., 10.,  3.,  9., 10.,  4.,
        10.,  1.,  2.,  7., 10.,  9.,  5.,  4.,  4., 10.,  4.,  5.,  0.,
         9.,  5.,  2.,  1.,  6.,  7.,  2.,  3., 10.,  2.,  5.,  3.,  1.]),
 array([ 3.,  0.,  9., 10.,  8.,  5., 10.,  2.,  2.,  9.,  8.,  7.,  0.,
         6.,  7.,  3.,  0.,  7.,  3.,  6.,  3.,  6.,  0.,  4.,  2.,  6.,
         4.,  7.,  9.,  9., 10.,  2.,  7.,  9.,  8.,  9.,  6.,  1.,  3.]),
 array([ 4.,  1., 10.,  5.,  9., 10.,  1.,  5., 10., 10.,  3.,  0.,  4.,
         8.,  5.,  7.,  3.,  9.,  2.,  0., 

In [81]:
predictions = model.transform(df)

In [83]:
predictions.show()

+---------+--------+----------+
| features|weighCol|prediction|
+---------+--------+----------+
|[0.0,0.0]|     2.0|         0|
|[1.0,1.0]|     2.0|         0|
|[9.0,8.0]|     2.0|         1|
|[8.0,9.0]|     2.0|         1|
+---------+--------+----------+



In [None]:
from pyspark.sql.functions import when, sum
from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import Vectors
from pyspark.ml.clustering import KMeansModel
from pyspark.sql.functions import col

# Load a small static sample and keep its first 5 rows
staticData_tiny = spark.read.csv("cluster_data_dir/drivedata-tiny.csv", header=True, inferSchema=True)
staticData = staticData_tiny.limit(5).cache()

# Keep only the feature columns, i.e. everything after the first 5 columns
# (date, serial_number, model, failure, vault_id)
staticData = staticData.select(staticData.columns[5:])

# Ensure all columns are numeric and replace nulls with 0
for column in staticData.columns:
    staticData = staticData.withColumn(column, col(column).cast("double")).na.fill(0)

# Assemble the features into a single vector.
# Note: the model below is still fitted on centers_df; staticData is prepared but not used yet.
assembler = VectorAssembler(inputCols=centers_df.columns, outputCol="features")
data = assembler.transform(centers_df)

# Train a KMeans model
kmeans = KMeans(k=(centers_df.count()), seed=1)
kmeans.setMaxIter(1)

model = kmeans.fit(data.select('features'))


Now we need to normalize the data based on the ranges in ```norm.csv```.

In [74]:
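import numpy as np

# Define the initial centroids as a 2-D array (one row per centroid) and broadcast them.
# Assumes df is the 2-dimensional toy DataFrame with the columns "features" and "weighCol".
centroids = spark.sparkContext.broadcast(np.array([[0.0, 0.0]]))

def closest_centroid(point, centroids):
    # Index of the centroid with the smallest squared Euclidean distance to the point
    closest_index = int(np.argmin(np.sum((centroids - point)**2, axis=1)))
    return (closest_index, (point, 1))

for _ in range(10):  # Number of iterations
    # Map every row to (centroid index, (point, 1)) and sum points and counts per centroid
    res = df.rdd.map(lambda row: row['features'].toArray()) \
                .map(lambda point: closest_centroid(point, centroids.value)) \
                .reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1])) \
                .collect()

    # Update centroids with the mean of their points (centroids without points are dropped)
    res = sorted(res, key=lambda kv: kv[0])
    centroids = spark.sparkContext.broadcast(np.array([x[1][0] / x[1][1] for x in res]))

# Print final centroids
print(centroids.value)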

0