# K-means and Intrinsic Dimension

In this programming assignment, you will estimate the intrinsic dimension by calculating the Mean Squared Distance of the entire dataset to their representative centers. You will use the K-Means API in spark to find representative centers.
All of the necessary helper code is included in this notebook. However, we advise you to go over the relevant materials before attempting this Programming Assignment.

## Reviewing the Theory 

### Computing the intrinsic dimension of a data set

Recall from class that given any $d$ dimensional dataset, we can divide it into $n$ cells of diameter $\epsilon$ each. The relationship between $n, d, \epsilon$ is then given by:

$$
n = \frac{C}{\epsilon^d}
$$

where $C \in I\!R$.

Alternately, we may write this as:

$$
\log{n} = \log{C} + d \times \log{\frac{1}{\epsilon}}.
$$

Given this expression, we can then compute the dimensionality of a dataset using:

$$
d = \frac{\log{n_2} - \log{n_1}}{\log{\epsilon_1} - \log{\epsilon_2}}
$$

where $(n_1,\epsilon_1)$, $(n_2, \epsilon_2)$ are the number of cells and diameter of each cell at 2 different scales.

### Using K-Means to estimate the intrinsic dimension

We can use K-Means to approximate the value of the intrinsic dimension of a dataset. In this case, the K centers represent the cells, each with a diameter equal to the Mean Squared Distance of the entire dataset to their representative centers. The estimate for intrinsic dimension then becomes:

$$
d = \frac{\log{n_2} - \log{n_1}}{\log{\sqrt{\epsilon_1}} - \log{\sqrt{\epsilon_2}}} = 2 \times \frac{\log{n_2} - \log{n_1}}{\log{\epsilon_1} - \log{\epsilon_2}}
$$

where $(n_1, n_2)$ are number of clusters and $(\epsilon_1, \epsilon_2)$ are the Mean Squared Distances for $(n_1, n_2)$ respectively.

In [1]:
from pyspark.mllib.clustering import KMeans, KMeansModel
from pyspark.sql import Row, SparkSession
from pyspark.sql.types import *
from math import log
import pickle

import os
import sys
os.environ["PYSPARK_PYTHON"] = sys.executable
os.environ["PYSPARK_DRIVER_PYTHON"] = sys.executable

In [2]:
spark = SparkSession \
    .builder \
    .getOrCreate()
sc = spark.sparkContext

### Problem 1: runKmeans

The function `runKmeans` takes as input the complete dataset rdd, the sample of the rdd on which k-means needs to be run, the k value, and the count of elements in the complete dataset. It outputs the Mean Squared Distance (MSD) of all the points in the dataset from their closest centers, where the centers are calculated using k-means.

#### Task:

Finish `runKmeans` function. In this function, you should train k-means on `sample_dataset` with `k` clusters and 1 iteration. Then you should calculate and return the Mean Squared Distance (MSD) of all points in `data`, where the number of points in `data` is given as `count`.

**Hint:** 

- Use [K-Means API](https://spark.apache.org/docs/2.2.0/mllib-clustering.html#k-means). 
- The `computeCost` function gives you the sum of squared distances. 
- **Ensure you set the argument `maxIterations` to 1** since we only need approximate values for the K centers to estimate intrinsic dimension. Setting this to higher values will lead to large execution times while barely affecting the calculation of the intrinsic dimension. 
- **Use default parameters for other optional arguments.**

---

**<font color="magenta" size=2>Example Code</font>**
``` python
rdd = sc.parallelize([(1,1),(2,2),(3,3),(4,4),(5,5)])
runKmeans(rdd, sc.parallelize([(1,1),(2,2),(3,3)]), 3, rdd.count())
```

**<font color="blue" size=2>Example Output</font>**
``` python
2.0
```

In [3]:
def runKmeans(data, sample_dataset, k, count):
    ###
    ### YOUR CODE HERE
    ###
    
    clusters = KMeans.train(sample_dataset, k, maxIterations=1)
    
    sum_squared_distances = clusters.computeCost(data)
    
    mean_squared_distances = sum_squared_distances/count
    
    return mean_squared_distances


In [4]:
KMeans

pyspark.mllib.clustering.KMeans

In [5]:
rdd = sc.parallelize([(1,1),(2,2),(3,3),(4,4),(5,5)])

In [6]:
runKmeans(rdd, sc.parallelize([(1,1),(2,2),(3,3)]), 3, rdd.count())

2.0

### Problem 2: computeIntrinsicDimension

The function `computeIntrinsicDimension` takes as input a pair of values $(n1, e1)$, $(n2, e2)$ and computes the intrinsic dimension as output. $e1, e2$ are the mean squared distances of data points from their closest center

#### Task:

Finish `runKmeans` function.

**Hint:** Use the last formula in the theory section.

---

**<font color="magenta" size=2>Example Code</font>**
``` python
n1 = 10
n2 = 100
e1 = 10000
e2 = 100
computeIntrinsicDimension(n1, e1, n2, e2)
```

**<font color="blue" size=2>Example Output</font>**
``` python
1.0
```

In [8]:
def computeIntrinsicDimension(n1, e1, n2, e2):
    ###
    ### YOUR CODE HERE
    ###

    intrinsic_dim = 2*(log(n2)-log(n1))/(log(e1)-log(e2))
    
    return intrinsic_dim

### Problem 3: Putting it all together

Now we run K-means for various values of k and use these to estimate the intrinsic dimension of the dataset. Since the dataset might be very large, running kmeans on the entire dataset to find k representative centers may take a very long time. To overcome this, we sample a subset of points from the complete dataset and run Kmeans only on these subsets. We will run Kmeans on 2 different subset sizes: **10000, 20000 points**. We will be estimating the MSD for **K values of 10, 200, 700, 2000**.

#### Task:

Finish `run` function, which takes a Spark Dataframe `df` containing the complete data-set as input and needs to do the following:

* For each sample size $S$
    * Take the first $S$ elements from the dataframe
    * For each value of K (number of centroids)
        * Call `runKmeans(data, S, K, data_count)`
* Use the MSD values generated to calculate the intrinsic dimension where $(n_1, n_2) \in \{(10,200),(200,700),(700,2000)\}$.

**Hint:** Review the last part of the theory section to figure out what the values should be for $(\epsilon_1, \epsilon_2)$ for each $(n_1, n_2)$.

**NOTE: Ensure you the format of your output is identical to the one given below, i.e. the keys have to be of the format:**
```python
ID_<Subset_size>_<K-Value-1>_<K-Value-2>
```

---

**<font color="magenta" size=2>Example Code</font>**
``` python

df = spark.read.parquet(file_path)
run(df)
```

**<font color="blue" size=2>Example Output</font>**
``` python
{'ID_10000_10_200': 1.5574966096390015, 'ID_10000_200_700': 1.3064513902343675, 'ID_10000_700_2000': 1.091310378488035, 'ID_20000_10_200': 1.518279780870003, 'ID_20000_200_700': 1.2660237819996782, 'ID_20000_700_2000': 1.0151131917703071}
```
**Note: The output here is the output of the below function, i.e., the value stored in the variable where the 'run' function is called**

In [51]:
def run(df):
    ###
    ### YOUR CODE HERE
    ###
    
    output = {}
    for sample_size in [10000,20000]:
        last_k,last_msd = -1,-1
        for k in [10,200,700,2000]:
            msd = runKmeans(df,sc.parallelize(df.take(sample_size)),k,sample_size)
            if last_k == -1:
                last_k, last_msd = k, msd
                continue
            else:
                key = 'ID_%d_%d_%d'%(sample_size,last_k,k)
                val = computeIntrinsicDimension(last_k, last_msd, k, msd)
                output[key] = val
                last_k, last_msd = k, msd
                
    return output


In [12]:
# this function allows a +/-15% tolerance margin 
# while comparing student's answer against the solution
def within_tolerance(required_answer, student_answer, tolerance = 0.15):
            tolerance_value = required_answer * tolerance
            return required_answer - tolerance_value <= student_answer <= required_answer + tolerance_value

In [50]:
file_path_small = "./resource/asnlib/publicdata/hw5-small.parquet"
df = spark.read.parquet(file_path_small)

file_path_small = "./resource/asnlib/publicdata/hw5-small.parquet"
df = spark.read.parquet(file_path_small)
res = run(df)

In [15]:
df.head()

Row(col0='0.45152887379472995', col1='0.2260514832989532', col2='0.34592042250000005', col3='0.20345309649337506', col4='0.542974354273149', col5='0.2260514832989532', col6='0.34592042250000005', col7='0.20345309649337506')

In [16]:
computeIntrinsicDimension(700, 700, 200, 20000)

0.7473811426956143

In [20]:
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.functions import col

cols = df.columns

expr = [col(c).cast("Double").alias(c) for c in cols]

df2 = df.select(*expr)
vectorAss = VectorAssembler(inputCols=cols, outputCol="features")
vdf = vectorAss.transform(df2)

data = vdf.rdd.map(lambda row: [x for x in row['features']])

In [21]:
data = df.rdd.map(lambda line: [float(x) for x in line])

In [22]:
print(data.getNumPartitions())

2


In [23]:
res = run(data)

17.6 s ± 326 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [24]:
data_five = data.repartition(5)
print(data_five.getNumPartitions())

5


In [25]:
res = run(data_five)

14.9 s ± 225 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [26]:
data_ten = data.repartition(10)
print(data_ten.getNumPartitions())

10


In [31]:
res = run(data_ten)

In [32]:

res


{'ID_10000_10_200': 1.6003851471866564,
 'ID_10000_200_700': 1.206294046358526,
 'ID_10000_700_2000': 1.116044433830574,
 'ID_20000_10_200': 1.5583397701561041,
 'ID_20000_200_700': 1.274790796855719,
 'ID_20000_700_2000': 1.1351265085743112}

In [33]:
res['ID_20000_10_200']

1.5583397701561041

In [34]:
res['ID_10000_200_700']

1.206294046358526

In [35]:
res['ID_10000_700_2000']

1.116044433830574

In [36]:
assert within_tolerance(1.5528485676876376, res['ID_10000_10_200'])

In [37]:
assert within_tolerance(1.2563867109654632, res['ID_10000_200_700'])

In [38]:
assert within_tolerance(1.2041956732161911, res['ID_10000_700_2000'])

In [39]:
###
### AUTOGRADER TEST - DO NOT REMOVE
###


In [40]:
###
### AUTOGRADER TEST - DO NOT REMOVE
###


In [41]:
###
### AUTOGRADER TEST - DO NOT REMOVE
###


In [42]:
###
### AUTOGRADER TEST - DO NOT REMOVE
###


In [43]:
###
### AUTOGRADER TEST - DO NOT REMOVE
###


In [44]:
###
### AUTOGRADER TEST - DO NOT REMOVE
###


In [45]:
###
### AUTOGRADER TEST - DO NOT REMOVE
###


In [46]:
###
### AUTOGRADER TEST - DO NOT REMOVE
###


In [47]:
###
### AUTOGRADER TEST - DO NOT REMOVE
###


In [48]:
###
### AUTOGRADER TEST - DO NOT REMOVE
###
