# Exercise 3

In this exercise, we are going to implement the KMeans clustering algorithm using Spark RDD.

For parts marked with **[CODE CHANGE REQUIRED]**, you need to modify or complete the code before execution.
For parts without **[CODE CHANGE REQUIRED]**, you can just run the given code.


## KMeans clustering algorithm

The KMeans clustering algorithm is an unsupervised machine learning algorithm that groups data points into *clusters* (or groups) based on their similarity. For people, for example, similarity could be based on the products they like, the movies they like, the brands they follow, the TV programs and movies they have watched, or the universities and colleges they attended.

Assuming all the attributes of the subjects in the analysis can be represented as scalar values, we can conduct the analysis in the following steps.

1. The user/programmer specifies how many clusters the data points should be grouped into. Let's call this number `K`.
2. Randomly generate `K` data points, which we call *centroids*: `c1, c2, ..., cK`.
3. For each data point `p`, compute the distances between `p` and `c1`, `p` and `c2`, ..., `p` and `cK`. Find the centroid `ci` to which `p` is closest, and conclude that `p` is in cluster `i`.
4. For each cluster `i`, retrieve all the data points falling in this cluster and compute their mean. The mean becomes the new centroid for cluster `i`, say `ci'`.
5. Compare `c1` with `c1'`, `c2` with `c2'`, ..., `cK` with `cK'`. If all of them remain unchanged (or the difference is lower than a threshold), we are done. Otherwise, update `c1 = c1'`, ..., `cK = cK'` and go back to step 3.


Note that clusters might disappear, i.e. some centroid may end up with no data points assigned to it.
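
The steps above can be sketched in plain Python, without Spark, to make the control flow concrete. This is a minimal illustration and not the Spark implementation built in this exercise: the names `kmeans_plain`, `distance` and `centroid_of` are hypothetical, the points are small in-memory lists, and a centroid that ends up with no points is simply dropped.

```python
import math

def distance(p, q):
    """Euclidean distance between two (x, y) points."""
    return math.hypot(p[0] - q[0], p[1] - q[1])

def centroid_of(pts):
    """Mean of a non-empty list of (x, y) points."""
    return (sum(x for x, _ in pts) / len(pts), sum(y for _, y in pts) / len(pts))

def kmeans_plain(points, centroids, max_iters=100, tolerance=1e-9):
    """Illustrative K-Means on in-memory lists; returns the final centroids."""
    centroids = list(centroids)
    for _ in range(max_iters):
        # Step 3: assign every point to its closest current centroid.
        clusters = {c: [] for c in centroids}
        for p in points:
            closest = min(centroids, key=lambda c: distance(p, c))
            clusters[closest].append(p)
        # Step 4: the mean of each cluster becomes its new centroid;
        # clusters with no points disappear.
        moved = {c: centroid_of(pts) for c, pts in clusters.items() if pts}
        # Step 5: converged if no cluster vanished and no centroid moved
        # further than the tolerance.
        converged = len(moved) == len(centroids) and all(
            distance(old, new) <= tolerance for old, new in moved.items()
        )
        centroids = list(moved.values())
        if converged:
            break
    return centroids

points = [(0, 0), (0, 2), (10, 0), (10, 2)]
# The third starting centroid attracts no points and is dropped.
print(kmeans_plain(points, [(1, 1), (9, 1), (1000, 1000)]))
# [(0.0, 1.0), (10.0, 1.0)]
```

Starting with three centroids and ending with two shows the "disappearing cluster" case mentioned above.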

![](https://i.stack.imgur.com/ibYKU.png)

Instead of using real data, we use a Python script to generate the data. The script is already written for you. It can be found in the git-cloned repo at `sutd50043_student/lab13/data/ex3/data/generate.py`.

```python
import sys
import random
def gen(num_of_records, filename):
    with open(filename,'w') as f:
        for i in range(0,int(num_of_records)):
            x = random.uniform(-100,100)
            y = random.uniform(-100,100) 
            f.write("%.2f\t%.2f\n" % (x,y))
    f.close()

if __name__ == "__main__":
    if len(sys.argv) > 2:
        sys.exit(gen(sys.argv[1],sys.argv[2]))
    else:
        print("USAGE: python3 generate.py <number_of_records> <file_name>")
```

 **[CODE CHANGE REQUIRED]**
Modify the bash command in the next cell accordingly to execute the above script.



In [4]:
%sh

python3 /home/ec2-user/git/50043-labs/lab12/data/ex3/generate.py 1000 points.tsv
python3 /home/ec2-user/git/50043-labs/lab12/data/ex3/generate.py 10 centroids.tsv


## Exercise 3.1

 **[CODE CHANGE REQUIRED]**
 
Modify the following bash command and execute it to upload the points and centroids to HDFS.

In [6]:
%sh

export PATH=$PATH:/home/ec2-user/hadoop/bin/

namenode=ip-172-31-86-18 # TODO:change me

hdfs dfs -rm -r hdfs://$namenode:9000/lab12/ex3/
hdfs dfs -mkdir -p hdfs://$namenode:9000/lab12/ex3/input/points
hdfs dfs -put points.tsv hdfs://$namenode:9000/lab12/ex3/input/points/
hdfs dfs -mkdir -p hdfs://$namenode:9000/lab12/ex3/input/centroids/
hdfs dfs -put centroids.tsv hdfs://$namenode:9000/lab12/ex3/input/centroids/

 

We create a Spark session and obtain the Spark context from it.

In [8]:
%pyspark
from pyspark.sql import SparkSession
sparkSession = SparkSession.builder.appName("KMeans notebook").getOrCreate()
sc = sparkSession.sparkContext

 

## Exercise 3.2

 **[CODE CHANGE REQUIRED]**

Complete the definition of the following function `load_points_into_rdd`, which loads the TSV data into an RDD given the namenode and the HDFS path (e.g. `/lab12/ex3/input/points`).

Each item in the RDD is a tuple of two float values.
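
To make the target record shape concrete, here is a small plain-Python sketch of the per-line conversion, assuming each line holds two tab-separated numbers as written by `generate.py`. The helper name `parse_point` is hypothetical; a function like it could be passed to `map` on the RDD returned by `sc.textFile`.

```python
def parse_point(line):
    """Convert one tab-separated text line into an (x, y) tuple of floats."""
    x, y = line.strip().split("\t")
    return (float(x), float(y))

print(parse_point("61.40\t-33.87\n"))  # (61.4, -33.87)
```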

<style>
    div.hidecode + pre {display: none}
</style>
<script>
doclick=function(e) {
    e.nextSibling.nextSibling.style.display="block";
}
</script>

<div class="hidecode" onclick="doclick(this);">[Show Hint]</div>

```python
namenode = "127.0.0.1:9000"

def load_points_into_rdd(namenode, path): # TODO
    return sc.textFile("hdfs://%s/%s" % (namenode,path)).map(lambda ln:ln.strip().split("\t")).map(lambda l:(float(l[0]), float(l[1])))
```

In [10]:
%pyspark 

namenode = "127.0.0.1:9000"

def load_points_into_rdd(namenode, path): # TODO
    return None



### Sample answer

<style>
    div.hidecode + pre {display: none}
</style>
<script>
doclick=function(e) {
    e.nextSibling.nextSibling.style.display="block";
}
</script>

<div class="hidecode" onclick="doclick(this);">[Show Hint]</div>


```python

def load_points_into_rdd(namenode, path): # TODO
    return sc.textFile("hdfs://%s/%s" % (namenode,path)).map(lambda ln:ln.strip().split("\t")).map(lambda l:(float(l[0]), float(l[1])))

```


## Test case 3.2

Run the following; you should see output like this (the actual numeric values might differ, but the structure should be the same):
```python
[(61.4, -33.87), (19.57, -20.85), (22.95, -49.32), (42.81, 29.71), (-65.89, -75.57), (13.48, 71.92), (-17.28, -21.7), (1.79, 43.8), (11.58, -32.18), (1.73, -54.43)]
```


In [13]:
%pyspark

# test case 1
points = load_points_into_rdd(namenode, "/lab12/ex3/input/points")
points.take(10)

 

## Exercise 3.3
 **[CODE CHANGE REQUIRED]**
Complete the following function `euc_dist`, which computes the Euclidean distance between two points.


$$eucdist((x_1,y_1),(x_2,y_2)) = \sqrt{ (x_1 - x_2)^2 + (y_1 - y_2)^2  }$$



In [15]:
%pyspark
import math
def euc_dist(p1,p2): 
    '''
    input
    p1, p2: points
    output
    euclidean distance between p1 and p2
    '''
    # TODO
    return 0


### Sample answer

<style>
    div.hidecode + pre {display: none}
</style>
<script>
doclick=function(e) {
    e.nextSibling.nextSibling.style.display="block";
}
</script>

<div class="hidecode" onclick="doclick(this);">[Show Hint]</div>


```python
import math
def euc_dist(p1,p2): 
    '''
    input
    p1, p2: points
    output
    euclidean distance between p1 and p2
    '''
    # ANSWER
    return math.sqrt((p1[0]-p2[0])**2 + (p1[1] - p2[1])**2)

```


## Test case 3.3

Run the following; you should see:

```text
1.4142135623730951
```


In [18]:
%pyspark

p1 = (3.0, 1.0)
p2 = (2.0, 2.0)

euc_dist(p1,p2)
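
As an independent cross-check of the expected value, the standard library's `math.hypot` computes the same quantity from the coordinate differences. This is only a sanity check on the formula, not a replacement for `euc_dist`; the variable name `reference` is made up for this sketch.

```python
import math

p1 = (3.0, 1.0)
p2 = (2.0, 2.0)

# hypot(dx, dy) is sqrt(dx**2 + dy**2), i.e. the Euclidean distance.
reference = math.hypot(p1[0] - p2[0], p1[1] - p2[1])
print(reference)  # approximately 1.4142, the square root of 2
```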

 

## Exercise 3.4
 **[CODE CHANGE REQUIRED]**
 
Complete the following function, which computes the mean of an iterator/list of points.


The `mean()` function takes a plain Python iterator of points, not an RDD of points.
The mean of two points is defined as

$$
mean((x_1,y_1),(x_2,y_2)) = (( x_1 + x_2) / 2, (y_1 + y_2) / 2)
$$
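
Since `mean()` receives any number of points, it helps to state the general case. Assuming the natural extension of the two-point definition to `n` points (which is what "the mean of all the points" in the docstring suggests), it reads:

$$
mean((x_1,y_1),\ldots,(x_n,y_n)) = \left( \frac{1}{n}\sum_{i=1}^{n} x_i,\ \frac{1}{n}\sum_{i=1}^{n} y_i \right)
$$

For `n = 2` this reduces to the formula above.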



In [20]:
%pyspark

def mean(points):
    '''
    input
    an iterator of points
    output
    a point that is the mean of all the points in the input. If the input is empty, return None
    '''
    # TODO
    return None

### Sample answer

```python

def mean(points):
    '''
    input
    an iterator of points
    output
    a point that is the mean of all the points in the input. If the input is empty, return None
    '''
    # ANSWER
    pts = list(points)
    count = len(pts)
    if count == 0:
        return None
    else:
        xs = map(lambda p:p[0], pts)
        ys = map(lambda p:p[1], pts)
        return (sum(xs)/len(pts), sum(ys)/len(pts))
```


## Test case 3.4

Run the following; you should see the output below (under Python 3; Python 2's integer division would give `(4, 5)` instead):

```text
(4.5, 5.5)
```


In [23]:
%pyspark
points = ((x,y) for x in range(0, 10) for y in range(1,11))

mean(points)

## Exercise 3.5
 **[CODE CHANGE REQUIRED]**
Complete the following function, which finds the nearest centroid for each point in the RDD.
<style>
    div.hidecode + pre {display: none}
</style>
<script>
doclick=function(e) {
    e.nextSibling.nextSibling.style.display="block";
}
</script>

<div class="hidecode" onclick="doclick(this);">[Show Hint]</div>

```text
Let r1 and r2 be RDDs; r1.cartesian(r2) produces the cartesian product of r1 and r2.
When finding the nearest centroid to a point from a list of centroids, it is useful to think of it as a reduce operation.
```


In [25]:
%pyspark

def nearest(points, centroids):
    '''
    inputs
    points: an RDD of points
    centroids: an RDD of the current centroids
    
    output:
    point_and_nearcentroids : an RDD of pairs, each pair consisting of a point and its nearest centroid
    '''
    # TODO
    point_and_nearcentroids = None
    return point_and_nearcentroids

### Sample answer

```python
def nearest(points, centroids):
    '''
    inputs
    points: an RDD of points
    centroids: an RDD of the current centroids
    
    output:
    point_and_nearcentroids : an RDD of pairs, each pair consisting of a point and its nearest centroid
    '''
    # ANSWER
    point_cross_centroids = points.cartesian(centroids)
    point_cross_centroids_distance = point_cross_centroids.map(lambda pc: (pc[0],(pc[1],euc_dist(pc[0],pc[1]))))
    point_and_nearcentroids = point_cross_centroids_distance.reduceByKey(lambda cd1, cd2:  cd1 if cd1[1] < cd2[1] else cd2 ).map(lambda pc: (pc[0], pc[1][0]))
    return point_and_nearcentroids

```
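
To see what the `reduceByKey` step does for a single point, the same "keep the closer of two candidates" rule can be tried in plain Python with `functools.reduce`. This is a sketch for one point and an in-memory, non-empty list of centroids; the name `nearest_centroid` is hypothetical.

```python
import math
from functools import reduce

def nearest_centroid(point, centroids):
    """Return the centroid closest to `point`, found with a pairwise reduce."""
    def dist(c):
        return math.hypot(point[0] - c[0], point[1] - c[1])
    # Keep whichever of the two candidates is closer. On a tie the second one
    # wins, mirroring `cd1 if cd1[1] < cd2[1] else cd2` in the sample answer.
    return reduce(lambda c1, c2: c1 if dist(c1) < dist(c2) else c2, centroids)

print(nearest_centroid((0, 0), [(5, 5), (1, 1), (-3, 0)]))  # (1, 1)
```

In the Spark version the point itself is the key, so `reduceByKey` performs this reduction once per distinct point.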


## Test case 3.5

Run the following; you should see output like this (the actual numeric values might differ, but the structure should be the same):


```text

[((31.34, 39.54), (10.94, 64.76)), ((63.18, -54.98), (78.66, -91.58)), ((-96.53, 54.69), (-53.93, 76.38)), ((-89.91, 9.5), (-42.27, 24.16)), ((95.43, 57.26), (10.94, 64.76)), ((74.38, -61.52), (78.66, -91.58)), ((77.21, -69.08), (78.66, -91.58)), ((-84.04, -74.41), (11.81, -27.94)), ((7.44, 45.33), (10.94, 64.76)), ((30.63, 52.08), (10.94, 64.76))]
```

In [28]:
%pyspark

points = load_points_into_rdd(namenode, "/lab12/ex3/input/points")
centroids = load_points_into_rdd(namenode, "/lab12/ex3/input/centroids")

nearest(points, centroids).take(10)

## Exercise 3.6

 **[CODE CHANGE REQUIRED]**

Complete the following function which executes one iteration of the K-Means algorithm.

<style>
    div.hidecode + pre {display: none}
</style>
<script>
doclick=function(e) {
    e.nextSibling.nextSibling.style.display="block";
}
</script>

<div class="hidecode" onclick="doclick(this);">[Show Hint]</div>

```text
Recall the difference between reduceByKey and groupByKey
```

In [30]:
%pyspark

    
def iteration(points, centroids):
    '''
    inputs
    points: an RDD of points
    centroids: an RDD of the current centroids
    
    output
    current_and_newcentroids: an RDD of pairs, each pair consisting of a current centroid and its new centroid
    '''
    # Step a: for each point, compute the Euclidean distance to each centroid and find the nearest centroid
    point_and_nearcentroids = nearest(points, centroids)
    # Step b: flip the pairs from step a to create an RDD of (nearest centroid, point) pairs (note: the nearest centroids are still the current centroids)
    # TODO
    nearcentroid_and_points = None
    # Step c: compute the new centroids
    # TODO
    current_and_newcentroids = None
    return current_and_newcentroids
    

    

### Sample answer

<style>
    div.hidecode + pre {display: none}
</style>
<script>
doclick=function(e) {
    e.nextSibling.nextSibling.style.display="block";
}
</script>

<div class="hidecode" onclick="doclick(this);">[Show Hint]</div>


```python
def iteration(points, centroids):
    '''
    inputs
    points: an RDD of points
    centroids: an RDD of the current centroids
    
    output
    current_and_newcentroids: an RDD of pairs, each pair consisting of a current centroid and its new centroid
    '''
    # Step a: for each point, compute the Euclidean distance to each centroid and find the nearest centroid
    point_and_nearcentroids = nearest(points, centroids)
    # Step b: flip the pairs from step a to create an RDD of (nearest centroid, point) pairs (note: the nearest centroids are still the current centroids)
    nearcentroid_and_points = point_and_nearcentroids.map(lambda p: (p[1], p[0]))
    # Step c: compute the new centroids
    current_and_newcentroids = nearcentroid_and_points.groupByKey().map(lambda kvs: (kvs[0], mean(kvs[1])))
    return current_and_newcentroids


```
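
The sample answer collects every point of a cluster with `groupByKey` before averaging. An alternative in the spirit of `reduceByKey` keeps only a running `(sum_x, sum_y, count)` per centroid and divides at the end. The plain-Python sketch below illustrates that accumulation on an in-memory list; `means_by_key` is a hypothetical name, and this is not the exercise's required solution.

```python
def means_by_key(pairs):
    """Mean point per key, from an iterable of (key, (x, y)) pairs.

    Each key's points are folded into a running (sum_x, sum_y, count),
    the kind of value that reduceByKey could merge pairwise.
    """
    totals = {}
    for key, (x, y) in pairs:
        sx, sy, n = totals.get(key, (0.0, 0.0, 0))
        totals[key] = (sx + x, sy + y, n + 1)
    # Divide the sums by the count to obtain the mean for each key.
    return {key: (sx / n, sy / n) for key, (sx, sy, n) in totals.items()}

pairs = [("a", (0, 0)), ("a", (2, 4)), ("b", (1, 1))]
print(means_by_key(pairs))  # {'a': (1.0, 2.0), 'b': (1.0, 1.0)}
```

As with `groupByKey`, a key that has no points never appears in the result, which is how a cluster can disappear between iterations.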


## Test case 3.6

Run the following; you should see output like this (the actual numeric values might differ, but the structure should be the same):

```text
[((-42.27, 24.16), (-63.50617346938779, -4.715561224489797)), ((34.19, -5.47), (65.24037313432835, 4.253208955223882)), ((73.78, -94.16), (43.22766666666668, -84.53166666666665)), ((78.66, -91.58), (83.09037735849053, -62.727924528301884)), ((11.81, -27.94), (-18.85223809523809, -62.081285714285706)), ((-55.65, 97.37), (-42.820499999999996, 94.566)), ((11.1, 2.86), (8.047450980392156, 10.851764705882351)), ((10.94, 64.76), (38.894331550802164, 69.78598930481279)), ((-68.4, 93.57), (-83.12192307692308, 83.44)), ((-53.93, 76.38), (-56.9511111111111, 63.98825396825399))]
```

In [33]:
%pyspark

points = load_points_into_rdd(namenode, "/lab12/ex3/input/points/")
centroids = load_points_into_rdd(namenode, "/lab12/ex3/input/centroids")
r = iteration(points,centroids)
r.take(10)

 

## KMeans 

To define KMeans, we just need two more actions:

1. `forAll(rdd, p)`, which checks whether all elements in `rdd` satisfy the predicate `p`.
2. `no_change(centroid_and_newcentroids, tolerance)`, which applies a conditional check to all pairs of current and new centroids. It yields `True` if none of the new centroids is `None` and the Euclidean distance between each current centroid and its new centroid is less than the tolerance.
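
The convergence check in item 2 can be previewed on an ordinary list of `(current, new)` pairs. This is a plain-Python sketch with the hypothetical name `no_change_plain`; the notebook's own `no_change` works on an RDD and is given further below.

```python
import math

def no_change_plain(pairs, tolerance):
    """True when every new centroid exists and lies within `tolerance` of its current one."""
    return all(
        new is not None and math.hypot(cur[0] - new[0], cur[1] - new[1]) < tolerance
        for cur, new in pairs
    )

print(no_change_plain([((0, 0), (0.5, 0.5))], 2))  # True: the centroid moved about 0.71
print(no_change_plain([((0, 0), (3, 4))], 2))      # False: the centroid moved 5.0
print(no_change_plain([((0, 0), None)], 2))        # False: the new centroid is missing
```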




### Exercise 3.7

Complete the `forAll` function. You don't need to change the `no_change` function.

 **[CODE CHANGE REQUIRED]**

<style>
    div.hidecode + pre {display: none}
</style>
<script>
doclick=function(e) {
    e.nextSibling.nextSibling.style.display="block";
}
</script>

<div class="hidecode" onclick="doclick(this);">[Show Hint]</div>

```text
You can implement it using 
1. the aggregate function or
2. map and reduce functions.
```

In [36]:
%pyspark

def forAll(rdd, p):
    '''
    input:
    rdd : an RDD
    p : a predicate, a lambda function that takes a value and returns a boolean. p will be applied to all elements in rdd
    
    output:
    True or False
    '''
    # TODO:
    return False


def no_change(centroid_and_newcentroids,tolerance):
    return forAll(centroid_and_newcentroids, lambda p:  p[1] is not None and euc_dist(p[0], p[1]) < tolerance)


### Sample answer

<style>
    div.hidecode + pre {display: none}
</style>
<script>
doclick=function(e) {
    e.nextSibling.nextSibling.style.display="block";
}
</script>

<div class="hidecode" onclick="doclick(this);">[Show Hint]</div>


```python
def forAll(rdd, p):
    '''
    input:
    rdd : an RDD
    p : a predicate, a lambda function that takes a value and returns a boolean. p will be applied to all elements in rdd
    
    output:
    True or False
    '''
    # ANSWER
    return rdd.map(p).reduce(lambda x,y: x and y)

```
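
One edge case worth knowing: `RDD.reduce` raises an error on an empty RDD, so the map-and-reduce answer assumes at least one element. Folding from an initial value of `True` avoids this, and on an RDD the `fold` or `aggregate` methods play that role. The plain-Python sketch below shows the idea with `functools.reduce`; the name `for_all` is hypothetical.

```python
from functools import reduce

def for_all(values, p):
    """True when predicate `p` holds for every element of `values` (True if empty)."""
    # Starting from True means an empty input yields True instead of an error.
    return reduce(lambda acc, ok: acc and ok, map(p, values), True)

print(for_all([2, 4, 0, 6], lambda x: x % 2 == 0))  # True
print(for_all([2, 3], lambda x: x % 2 == 0))        # False
print(for_all([], lambda x: x % 2 == 0))            # True
```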


### Test case 3.7

Run the following; you should see `True`.


In [39]:
%pyspark

test_vs = sc.parallelize([2,4,0,6])
forAll(test_vs ,lambda x:x % 2 == 0)

Lastly, the `kmeans` function is defined by a for-loop in which

1. we call `iteration` to compute the new centroids,
2. then check whether there is any change between the current and new centroids via `no_change`.
    1. If there are changes, it goes back to the top of the loop, with the new centroids becoming the current centroids;
    2. otherwise it exits the loop and computes the membership between the points and the latest centroids.

Note the use of `.persist()`. Try re-running the code with the `.persist()` statements commented out; it will take longer to run.

The code is written for you; you don't need to change anything unless you want to experiment with and without `.persist()`.

In [41]:
%pyspark


def kmeans(points, centroids, num_iters, tolerance=2):
    points.persist()
    for i in range(0, num_iters):
        centroid_and_newcentroids = iteration(points,centroids)
        if no_change(centroid_and_newcentroids,tolerance):
            break
        centroids = centroid_and_newcentroids.map(lambda p:p[1]).filter(lambda c: c is not None)
        centroids.persist()
        # print(i,centroids.collect())
    
    return nearest(points, centroids)
    
    
        

In [42]:
%pyspark

points = load_points_into_rdd(namenode, "/lab12/ex3/input/points/")
centroids = load_points_into_rdd(namenode, "/lab12/ex3/input/centroids")

clusters = kmeans(points, centroids, 100, 2)

clusters.take(100)

## Modify and run the following to clean up the HDFS

 **[CODE CHANGE REQUIRED]**

In [44]:
%sh
export PATH=$PATH:/home/ec2-user/hadoop/bin/

namenode=ip-172-31-86-18 # TODO:change me

hdfs dfs -rm -r hdfs://$namenode:9000/lab12/ex3/

In [45]:
%pyspark
sc.stop()

# End of Exercise 3
