# K-means and Intrinsic Dimension

In this notebook, we presente pyspark code to estimate the intrinsic dimension of a dataset using `K-means clustering`.

Given a sample set of a synthetically generated dataset, we use the Spark K-means API to estimate the centroids of the data points. After obtaining the centroids of the data, we calculate the mean-squared error of the dataset by calculating the mean-squared distance of each datapoint from it's respective representative centroid. This experiment is to be repeated for different values of K (K being the number of centroids), for the same sample of data. Using the information obtained from these experiments, we can estimate the intrinsic dimension of the sample set.

### Computing the intrinsic dimension of a data set:

Recall that given any $d$ dimensional dataset, we can divide it into $n$ cells of diameter $\epsilon$ each. The relationship between $n, d, \epsilon$ is then given by:
$$
n = \frac{C}{\epsilon^d}
$$
Where $C \in I\!R$

we may write this as:
$$
\log{n} = \log{C} + d \times \log{\frac{1}{\epsilon}}
$$

Given this expression, we can then compute the dimensionality of a dataset using:
$$
d = \frac{\log{n_2} - \log{n_1}}{\log{\epsilon_1} - \log{\epsilon_2}}
$$


### Using K-Means to estimate intrinsic dimension:
We can use K-Means to approximate the value of intrinsic dimension of a data-set. In this case, the K centers represent the cells, each with a diameter equal to the Mean Squared Distance of the entire dataset to their representative centers. The estimate for intrinsic dimension then becomes:
$$
d = \frac{\log{n_2} - \log{n_1}}{\log{\sqrt{\epsilon_1}} - \log{\sqrt{\epsilon_2}}} = 2 \times \frac{\log{n_2} - \log{n_1}}{\log{\epsilon_1} - \log{\epsilon_2}}
$$

To learn more about fractional dimensions: [here](https://www.youtube.com/watch?v=gB9n2gHsHN4)

### Spark Setup 

In [1]:
import findspark
findspark.init()

from pyspark.mllib.clustering import KMeans, KMeansModel
from pyspark.sql import Row, SparkSession
from pyspark.sql.types import *
from math import log
import pickle

In [2]:
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

### Helper functions

* The function `runKmeans` takes as input the complete dataset rdd, the sample of the rdd on which k-means needs to be run on,    the k value and the count of elements in the complete data-set. It outputs the Mean Squared Distance(MSD) of all the points in the dataset from their closest centers, where the centers are calculated using k-means. To do so we use  **K-Means API**: the initializationMode parameter is set to kmeans++. The computeCost function gives you the sum of squared distances. 

* The function `omputeIntrinsicDimension` takes as input a pair of values $(n1, e1)$, $(n2, e2)$ and computes the intrinsic dimension as output. $e1, e2$ are the mean squared distances of data-points from their closest center (we use the formula above).


In [4]:
def runKmeans(data, sample_dataset, k, count):
    clusters = KMeans.train(sample_dataset, k, maxIterations=1, initializationMode="kmeans++")
    cost = clusters.computeCost(data)
    return cost/count

def computeIntrinsicDimension(n1, e1, n2, e2):
    ans = 2 * (log(n2) - log(n1)) / (log(e1) - log(e2))
    return ans

### Run K-Means:

Now we run K-means for various values of k and use these to estimate the intrinsic dimension of the dataset.
we run Kmeans only on a subset of 10000 to avoid long time computations. We will be estimating the MSD for K values of 10, 200, 700, 2000. The input dataframe is a parquet file, in order to get the correct approximation of the dimension, we will transform data to array of dense vectors and convert columns to double before running KMeans.

In [5]:
from pyspark.mllib.linalg import Vectors

def run(df):
    
    result = dict()
    S = (10000, 20000) #Size of the sample kmeans runs on 
    list_K = [10, 200, 700, 2000] # k values(number of cells n)
    diameters = [] 
    
    rdd1 = df.rdd.map(lambda data: Vectors.dense([float(c) for c in data]))
    rdd1_subset = df.limit(S[0]).rdd.map(lambda data: Vectors.dense([float(c) for c in data]))
    count = rdd1.count()
    
    for k in list_K:
        d = runKmeans(rdd1, rdd1_subset, k, count)
        diameters.append(d)
    
    # Use the diameters values generated to calculate the intrinsic dimension
    # To do so we need just two scales (10-200, 200-700, 700-20000) and their respectives diameters
    for i in range(len(list_K)-1):
        interdim = computeIntrinsicDimension(list_K[i], diameters[i], list_K[i+1], diameters[i+1])
        key = 'ID_' + str(S[0]) + '_' + str(list_K[i]) + '_' + str(list_K[i+1])
        result[key] = interdim
    
    return result

In [6]:
# Data
file_path_small = "./hw5-small.parquet"
df = spark.read.parquet(file_path_small)
ans = run(df)

In [7]:
ans

{'ID_10000_10_200': 1.580327672205145,
 'ID_10000_200_700': 1.2730250878501679,
 'ID_10000_700_2000': 1.1274508742463538}