# In [None]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler, StringIndexer, OneHotEncoder
from pyspark.ml.clustering import KMeans as KMeansSpark
from pyspark.sql.functions import col
import matplotlib.pyplot as plt
from pyspark.ml.feature import PCA
from pyspark.ml import Pipeline
from pyspark.ml.feature import StandardScaler
from pyspark.ml.linalg import SparseVector
from pyspark.sql.types import DoubleType
from minepy import MINE
import numpy as np
from scipy.spatial.distance import pdist, squareform
from pyspark.sql.functions import udf



# Define the KMeans class
class KMeans():
    """Driver-side k-means (Lloyd's algorithm) over rows with a "features" field.

    The data is pulled to the driver with ``collect()`` and clustered with
    NumPy, so this is only suitable for datasets that fit in driver memory.

    ``data`` arguments may be either an object exposing ``collect()`` (such as
    a Spark DataFrame) or an already-collected list/tuple of rows. Each row
    must support ``row["features"]`` and yield either a vector exposing
    ``toArray()`` (Spark dense/sparse vectors) or any array-like of numbers.

    Attributes:
        k: Number of clusters.
        max_iterations: Upper bound on assignment/update rounds in ``fit``.
        kmeans_centroids: ``(k, n_features)`` array set by ``fit``; ``None``
            until then.
    """

    def __init__(self, k=5, max_iterations=500, seed=None):
        """Configure the clusterer.

        Args:
            k: Number of clusters to form.
            max_iterations: Maximum number of iterations before giving up on
                convergence.
            seed: Optional seed for the random generator used to pick the
                initial centroids and to re-seed empty clusters.
        """
        self.k = k
        self.max_iterations = max_iterations
        self.kmeans_centroids = None
        self._rng = np.random.default_rng(seed)

    @staticmethod
    def _to_array(vector):
        """Return ``vector`` as a 1-D float ndarray (densifying Spark vectors)."""
        to_array = getattr(vector, "toArray", None)
        if callable(to_array):
            return np.asarray(to_array(), dtype=float)
        return np.asarray(vector, dtype=float)

    @staticmethod
    def _collect_rows(data):
        """Return ``data`` as a sequence of rows, collecting it if necessary."""
        if isinstance(data, (list, tuple)):
            return data
        return data.collect()

    @staticmethod
    def _nearest_index(point, centroid_matrix):
        """Return the row index of ``centroid_matrix`` closest to ``point``.

        Ties resolve to the lowest index, because ``argmin`` returns the first
        minimum.
        """
        distances = np.linalg.norm(centroid_matrix - point, axis=1)
        return int(np.argmin(distances))

    def _centroid_matrix(self, centroids):
        """Stack ``centroids`` into a ``(n_centroids, n_features)`` array."""
        return np.array([self._to_array(centroid) for centroid in centroids])

    def _init_random_centroids(self, data):
        """Pick ``k`` distinct rows uniformly at random as starting centroids.

        Raises:
            ValueError: If ``data`` holds fewer than ``k`` rows.
        """
        rows = self._collect_rows(data)
        if len(rows) < self.k:
            raise ValueError(
                f"Need at least k={self.k} rows to initialise centroids, "
                f"got {len(rows)}"
            )
        chosen = self._rng.choice(len(rows), size=self.k, replace=False)
        return np.array([self._to_array(rows[i]["features"]) for i in chosen])

    def _closest_centroid(self, sample, centroids):
        """Return the index of the centroid nearest to ``sample``.

        Distance is Euclidean. Returns ``None`` when ``centroids`` is empty.
        """
        if len(centroids) == 0:
            return None
        return self._nearest_index(
            self._to_array(sample), self._centroid_matrix(centroids)
        )

    def _create_clusters(self, centroids, data):
        """Assign every row to its nearest centroid.

        Returns:
            A list of ``k`` lists; list ``i`` holds the rows assigned to
            centroid ``i`` in their original relative order.
        """
        rows = self._collect_rows(data)
        # Convert the centroids once instead of once per row.
        centroid_matrix = self._centroid_matrix(centroids)
        clusters = [[] for _ in range(self.k)]
        for row in rows:
            point = self._to_array(row["features"])
            clusters[self._nearest_index(point, centroid_matrix)].append(row)
        return clusters

    def _calculate_centroids(self, clusters, data):
        """Return the mean of each cluster as a ``(k, n_features)`` array.

        An empty cluster is re-seeded with a randomly chosen row from ``data``
        so the number of centroids stays constant.
        """
        rows = None  # collected lazily; only needed when a cluster is empty
        centroids = []
        for cluster in clusters:
            if cluster:
                points = np.array(
                    [self._to_array(row["features"]) for row in cluster]
                )
                centroids.append(points.mean(axis=0))
            else:
                if rows is None:
                    rows = self._collect_rows(data)
                pick = self._rng.integers(len(rows))
                centroids.append(self._to_array(rows[pick]["features"]))
        return np.array(centroids)

    def fit(self, data):
        """Run k-means on ``data`` and store the resulting centroids.

        Iterates until the centroids stop moving (per ``np.allclose``) or
        ``max_iterations`` rounds have run.

        Returns:
            The final centroids, also stored in ``kmeans_centroids``.

        Raises:
            ValueError: If ``data`` holds fewer than ``k`` rows.
        """
        # Collect once; the helpers accept the collected list directly.
        rows = self._collect_rows(data)
        centroids = self._init_random_centroids(rows)
        for _ in range(self.max_iterations):
            clusters = self._create_clusters(centroids, rows)
            new_centroids = self._calculate_centroids(clusters, rows)
            converged = np.allclose(new_centroids, centroids)
            centroids = new_centroids
            if converged:
                break

        self.kmeans_centroids = centroids
        return centroids

    def predict(self, data):
        """Label every row of ``data`` with the index of its nearest centroid.

        Returns:
            A list of ``(cluster_index, features)`` tuples. The list is grouped
            by cluster index (all of cluster 0 first, then cluster 1, ...), so
            it is NOT in the input row order.

        Raises:
            RuntimeError: If ``fit`` has not been called yet.
        """
        if self.kmeans_centroids is None:
            raise RuntimeError("Run the fit function first")

        clusters = self._create_clusters(self.kmeans_centroids, data)
        return [
            (i, row["features"])
            for i, cluster in enumerate(clusters)
            for row in cluster
        ]

# Initialize Spark session
spark = SparkSession.builder \
    .appName("KMeans with PySpark") \
    .getOrCreate()

# Load CSV data into a DataFrame. The Adult (census income) dataset is used
# here as an example; any other CSV can be clustered the same way after
# adjusting the column lists below.
df = spark.read.csv("./adult.csv", header=True, inferSchema=True)

# Updated column names
column_names = ['age', 'workclass', 'fnlwgt', 'education', 'education-num', 'marital-status', 'occupation',
         'relationship', 'race', 'sex', 'capital-gain', 'capital-loss', 'hours-per-week', 'native-country', 'income']

# Rename the columns
df = df.toDF(*column_names)

# Drop rows with missing values
df = df.dropna()

# Define numeric and categorical features
numeric_features = ['age', 'fnlwgt', 'education-num', 'capital-gain', 'capital-loss', 'hours-per-week']
categorical_features = ['workclass', 'education', 'marital-status', 'occupation', 'relationship', 'race', 'sex', 'native-country']

# Index categorical columns
indexers = [StringIndexer(inputCol=column, outputCol=column+"_index", handleInvalid="keep") for column in categorical_features]
indexers_pipeline = Pipeline(stages=indexers)
indexed_data = indexers_pipeline.fit(df).transform(df)

# Assemble the unscaled features into a single vector
assembler = VectorAssembler(
    inputCols=numeric_features + [column+"_index" for column in categorical_features],
    outputCol="raw_features",
)
assembled_data = assembler.transform(indexed_data)

# Standardize to unit variance and write the result to "features". The KMeans
# class and the PCA step both read the "features" column, so the scaled vector
# has to live there; otherwise clustering runs on raw values and large-valued
# columns such as fnlwgt dominate the distances.
scaler = StandardScaler(inputCol="raw_features", outputCol="features", withStd=True, withMean=False)
scaler_model = scaler.fit(assembled_data)
scaled_data = scaler_model.transform(assembled_data)


kmeans_custom = KMeans(k=5)

# Fit the data
kmeans_custom.fit(scaled_data)

# Predict labels
predicted_labels = kmeans_custom.predict(scaled_data)

print("Predicted Labels:")
for label, _ in predicted_labels:
    print(label)

from pyspark.ml.feature import PCA
import matplotlib.pyplot as plt

# Print the fitted centroid of each cluster
print("Clusters:")
for i, cluster in enumerate(kmeans_custom.kmeans_centroids):
    print(f"Cluster {i}: {cluster}")

# Apply PCA to project the clustered feature vectors onto two dimensions
num_dimensions = 2  # Set the number of dimensions for PCA
pca = PCA(k=num_dimensions, inputCol="features", outputCol="pca_features")
model = pca.fit(scaled_data)
result = model.transform(scaled_data)

# Collect the original features together with their projection so that each
# plotted point can be labelled with its own cluster.
projected_rows = result.select("features", "pca_features").collect()
x_values_pca = [row["pca_features"][0] for row in projected_rows]
y_values_pca = [row["pca_features"][1] for row in projected_rows]

# Colour each point by the index of its nearest fitted centroid. Colouring by
# the x coordinate would only show a gradient along the first component.
cluster_labels = [
    kmeans_custom._closest_centroid(row["features"], kmeans_custom.kmeans_centroids)
    for row in projected_rows
]

# Plot clusters using PCA components
plt.scatter(x_values_pca, y_values_pca, c=cluster_labels, cmap='viridis')
plt.title('KMeans Clustering with PCA')
plt.xlabel('Principal Component 1')
plt.ylabel('Principal Component 2')
plt.show()

# Stop Spark session
spark.stop()