In [1]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler, StringIndexer, OneHotEncoder
from pyspark.ml import Pipeline
from tabulate import tabulate
from pyspark.sql.functions import when, col, isnan, isnull
import numpy as np
import pandas as pd
from pyspark.sql import Row
from pyspark.ml.classification import OneVsRest, LinearSVC
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [2]:
spark = SparkSession.builder \
    .appName("BasalamAnalysis") \
    .getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/04/26 15:24:11 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


### Reading the Data and Replacing numerical nans with median for each column


In [6]:
df = spark.read.csv("/kaggle/input/basalam-comments-and-products/BaSalam.products.csv", header=True, inferSchema=True)

medians = {}
for col_name in ['sales_count_week', 'price', 'rating_average']:
    # Use approxQuantile to calculate median (0.5 quantile)
    median_val = df.approxQuantile(col_name, [0.5], 0.01)[0]
    medians[col_name] = median_val

# Handle null values in features columns by filling with calculated medians
df = df.fillna({
    'sales_count_week': medians['sales_count_week'],
    'price': medians['price'],
    'rating_average': medians['rating_average']
})

                                                                                                    

In [7]:
# Step 1: Assemble the features into a vector
feature_cols = ['sales_count_week', 'price', 'rating_average']

interaction_assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
df = interaction_assembler.transform(df)

# Step 2: Define the score ranges for each class
df = df.withColumn("label", 
                               when(df["_score"] < 150, 0)
                               .when((df["_score"] >= 150) & (df["_score"] < 300), 1)
                               .otherwise(2))

# Step 3: Splitting the data into train, test and validation
(train, validation, test) = df.randomSplit([0.6, 0.2, 0.2], seed=321)
training_df = train.select("label", "features")
validation_df = validation.select("label", "features")
test_df = test.select("label", "features")

# Step 4: Create RDD to be used in map reduce
training_rdd = training_df.rdd
validation_rdd = validation_df.rdd
test_rdd = test_df.rdd

#Step 5: Repartition the RDD for better performance
training_rdd = training_rdd.repartition(200)

#Step 6: collect RDD elements into a list
validation_list = validation_rdd.collect()
test_list = test_rdd.collect()

                                                                                                    

# KNN

In [14]:
def apply_k_nearest_neighbors(data_rdd, target_point, num_neighbors):
  def compute_cosine_sim(vector_a, vector_b):
    # Calculate dot product of vectors
    dot_prod = np.dot(vector_a, vector_b)

    # Determine vector magnitudes
    norm_a = np.sqrt(np.sum(vector_a ** 2))
    norm_b = np.sqrt(np.sum(vector_b ** 2))

    # Prevent division by zero
    if norm_a == 0 or norm_b == 0:
        return 0
    # Return cosine similarity value
    return dot_prod / (norm_a * norm_b)

  def partition_map(partition_data):
      """Partition mapping: Identify nearest neighbors in each partition."""
      neighbor_list = []
      for record in partition_data:
          label = record.label
          features = record.features
          # Transform PySpark vectors to NumPy arrays
          vec_a = np.array(target_point.toArray())
          vec_b = np.array(features.toArray())
          # Compute similarity score
          sim_score = compute_cosine_sim(vec_a, vec_b)
          neighbor_list.append((None, {'score': sim_score, 'label': label}))
      # Order neighbors by similarity score
      neighbor_list.sort(key=lambda x: x[1]['score'], reverse=True)
      # Select top k neighbors
      top_neighbors = neighbor_list[:num_neighbors]

      return [top_neighbors]

  def aggregate_neighbors(neighbors_a, neighbors_b):
      """Aggregation: Combine and select top k neighbors."""
      # Combine neighbor lists
      combined_neighbors = neighbors_a + neighbors_b
      # Sort by similarity score in descending order
      combined_neighbors.sort(key=lambda x: x[1]['score'], reverse=True)
      # Return top k neighbors
      return combined_neighbors[:num_neighbors]

  def predict_label(neighbor_data):
      # Extract labels and similarity scores
      labels = np.array([item[1]['label'] for item in neighbor_data])
      scores = np.array([item[1]['score'] for item in neighbor_data])
      
      # Calculate class weights (inverse of class frequency)
      unique_labels, counts = np.unique(labels, return_counts=True)
      class_weights = {label: 1.0/count for label, count in zip(unique_labels, counts)}
      
      # Initialize vote counters for each class
      vote_counts = {}
      
      # Apply weighted voting
      for i in range(len(labels)):
          label = labels[i]
          # Weight by similarity and class frequency
          weight = scores[i] * class_weights[label]
          
          if label not in vote_counts:
              vote_counts[label] = 0
          vote_counts[label] += weight
      
      # Return class with highest weighted vote
      return max(vote_counts.items(), key=lambda x: x[1])[0]

  # Mapping stage: Process partitions to find neighbors
  partitioned_neighbors = data_rdd.mapPartitions(partition_map)

  # Aggregation stage: Combine results to get final neighbors
  selected_neighbors = partitioned_neighbors.reduce(aggregate_neighbors)
  return predict_label(selected_neighbors)

# EVALUATION

In [17]:
def compute_confusion_matrix(actual_labels, predicted_labels, class_labels):
    class_count = len(class_labels)
    matrix = [[0] * class_count for _ in range(class_count)]
    label_indices = {lbl: idx for idx, lbl in enumerate(class_labels)}
    for actual, predicted in zip(actual_labels, predicted_labels):
        actual_idx = label_indices[actual]
        predicted_idx = label_indices[predicted]
        matrix[actual_idx][predicted_idx] += 1
    return matrix

def compute_accuracy(matrix):
    correct = sum(matrix[i][i] for i in range(len(matrix)))
    total = sum(sum(row) for row in matrix)
    return correct / total if total != 0 else 0

def compute_f1_score(prec, rec):
    return 2 * (prec * rec) / (prec + rec) if (prec + rec) != 0 else 0

def compute_precision(matrix, cls_idx):
    true_pos = matrix[cls_idx][cls_idx]
    pred_pos = sum(matrix[i][cls_idx] for i in range(len(matrix)))
    return true_pos / pred_pos if pred_pos != 0 else 0

def compute_recall(matrix, cls_idx):
    true_pos = matrix[cls_idx][cls_idx]
    actual_pos = sum(matrix[cls_idx])
    return true_pos / actual_pos if actual_pos != 0 else 0

def compute_macro_precision(matrix):
    class_count = len(matrix)
    precisions = [compute_precision(matrix, i) for i in range(class_count)]
    return sum(precisions) / class_count if class_count != 0 else 0

def compute_macro_recall(matrix):
    class_count = len(matrix)
    recalls = [compute_recall(matrix, i) for i in range(class_count)]
    return sum(recalls) / class_count if class_count != 0 else 0

def compute_micro_precision(matrix):
    class_count = len(matrix)
    true_pos = sum(matrix[i][i] for i in range(class_count))
    pred_pos = sum(sum(matrix[i][j] for j in range(class_count)) for i in range(class_count))
    return true_pos / pred_pos if pred_pos != 0 else 0

def compute_micro_recall(matrix):
    class_count = len(matrix)
    true_pos = sum(matrix[i][i] for i in range(class_count))
    actual_pos = sum(sum(row) for row in matrix)
    return true_pos / actual_pos if actual_pos != 0 else 0

def print_confusion_matrix(matrix, class_labels):
    print("Confusion Matrix:")
    table = [[''] + class_labels]
    for i, label in enumerate(class_labels):
        table.append([label] + matrix[i])
    print(tabulate(table, tablefmt='grid'))

def assess_knn_performance(test_data):
    predictions = []
    actuals = []
    for record in test_data:
        actual_label = record.label
        features = record.features
        # KNN classification
        knn_result = apply_k_nearest_neighbors(training_rdd, features, 3)
        predictions.append(knn_result)
        actuals.append(actual_label)
    
    class_labels = [0, 1, 2]  # Include all possible label values
    conf_matrix = compute_confusion_matrix(actuals, predictions, class_labels)
    print_confusion_matrix(conf_matrix, class_labels)

    accuracy = compute_accuracy(conf_matrix)
    macro_prec = compute_macro_precision(conf_matrix)
    micro_prec = compute_micro_precision(conf_matrix)
    macro_rec = compute_macro_recall(conf_matrix)
    micro_rec = compute_micro_recall(conf_matrix)
    f1_macro_score = compute_f1_score(macro_prec, macro_rec)
    f1_micro_score = compute_f1_score(micro_prec, micro_rec)

    metrics_table = [
        ['Accuracy', accuracy],
        ['Macro Precision', macro_prec],
        ['Micro Precision', micro_prec],
        ['Macro Recall', macro_rec],
        ['Micro Recall', micro_rec],
        ['Macro F1 Score', f1_macro_score],
        ['Micro F1 Score', f1_micro_score]
    ]
    print(tabulate(metrics_table, tablefmt='grid'))

In [20]:
assess_knn_performance(validation_list[:500])



Confusion Matrix:
+---+----+-----+----+
|   |  0 |   1 |  2 |
+---+----+-----+----+
| 0 | 28 |   2 |  0 |
+---+----+-----+----+
| 1 | 81 | 353 |  0 |
+---+----+-----+----+
| 2 |  0 |   0 | 36 |
+---+----+-----+----+
+-----------------+----------+
| Accuracy        | 0.834    |
+-----------------+----------+
| Macro Precision | 0.750416 |
+-----------------+----------+
| Micro Precision | 0.834    |
+-----------------+----------+
| Macro Recall    | 0.915566 |
+-----------------+----------+
| Micro Recall    | 0.834    |
+-----------------+----------+
| Macro F1 Score  | 0.824805 |
+-----------------+----------+
| Micro F1 Score  | 0.834    |
+-----------------+----------+


                                                                                                    