In [4]:
from __future__ import print_function
import numpy as np
from collections import defaultdict
from scipy.spatial import distance_matrix
from pyspark.sql import SparkSession

#### Bonus Clustering

Implement the K-means algorithm (not ++: random initialisation) from scratch (without the library you used before). Are the clusters that you get similar to the ones you previously obtained?

<div style="text-align:center">
<img src="https://i2.wp.com/www.aprendemachinelearning.com/wp-content/uploads/2018/03/kmeans-3d-clusters.png"/> </div>

In [2]:
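class KMeans:
    def recomputeCentroid(self):
        # Keep centroid i aligned with cluster i; an empty cluster keeps its old centroid.
        self.centroids = np.array([
            np.mean(self.clusters[i], axis=0) if len(self.clusters[i]) > 0 else self.centroids[i]
            for i in range(len(self.centroids))
        ])

    def assignPoints(self, X):
        # Assign every point to its nearest centroid (Euclidean distance).
        dist = distance_matrix(X, self.centroids)
        self.clusters = defaultdict(list)
        for i, item in enumerate(dist):
            self.clusters[np.argmin(item)].append(X[i, :])

    def __init__(self, X, k):
        # Random initialisation: k distinct data points are the starting centroids.
        self.centroids = X[np.random.choice(X.shape[0], k, replace=False), :]
        self.clusters = defaultdict(list)
        self.assignPoints(X)
        self.run(X)
        print(self.centroids, self.clusters)

    def run(self, X):
        # A fixed number of iterations; there is no convergence check.
        for i in range(5):
            self.recomputeCentroid()
            self.assignPoints(X)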
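
For comparison, here is a minimal vectorised sketch of the same idea (random initialisation, then alternating assignment and mean update) that stops when the centroids no longer move. The function name `kmeans`, the parameters `max_iter`, `tol` and `seed`, and the toy data are assumptions made for this illustration, not part of the assignment.

```python
import numpy as np

def kmeans(X, k, max_iter=100, tol=1e-6, seed=0):
    """K-means with random initialisation (hypothetical helper)."""
    rng = np.random.default_rng(seed)
    # Random initialisation: k distinct data points become the first centroids.
    centroids = X[rng.choice(X.shape[0], size=k, replace=False)].astype(float)
    for _ in range(max_iter):
        # Squared Euclidean distance from every point to every centroid.
        d2 = ((X[:, None, :] - centroids[None, :, :]) ** 2).sum(axis=2)
        labels = d2.argmin(axis=1)
        # Mean of each cluster; an empty cluster keeps its previous centroid.
        new_centroids = np.array([
            X[labels == j].mean(axis=0) if np.any(labels == j) else centroids[j]
            for j in range(k)
        ])
        shift = np.sum((new_centroids - centroids) ** 2)
        centroids = new_centroids
        if shift < tol:
            break
    return centroids, labels

# Toy data (made up for illustration): two well-separated blobs.
rng = np.random.default_rng(42)
X = np.vstack([rng.normal(0.0, 0.5, size=(50, 2)),
               rng.normal(10.0, 0.5, size=(50, 2))])
centroids, labels = kmeans(X, k=2)
print(np.sort(centroids[:, 0]))
```

On data like this the two centroids should land close to the blob centres, which gives a quick sanity check before comparing against a library implementation.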

### K-means in PySpark!

<div style="text-align:center">
<img src="https://cdn-images-1.medium.com/max/902/1*Pa7PO1v7bANI7C-eHMS_PQ.png"/> </div>

Here is the K-means algorithm written from scratch in PySpark. The next cell defines two helper functions that the algorithm needs: one parses a line of text into a vector, the other finds the index of the center closest to a point.

In [6]:
def string_to_vector(line):
    return np.array([float(x) for x in line.split(' ')])


def find_closest_point(p, centers):
    bestIndex = 0
    closest = float("+inf")
    for i in range(len(centers)):
        tempDist = np.sum((p - centers[i]) ** 2)
        if tempDist < closest:
            closest = tempDist
            bestIndex = i
    return bestIndex
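
The loop in `find_closest_point` can also be written with NumPy broadcasting. The sketch below is an illustrative variant (the name `find_closest_point_vectorized` and the sample centers are assumptions); like the loop, it returns the first index in case of a tie.

```python
import numpy as np

def find_closest_point_vectorized(p, centers):
    # Stack the centers into a (k, d) array and subtract p from every row.
    diffs = np.asarray(centers) - p
    # Squared Euclidean distance to each center, then the index of the smallest.
    return int(np.argmin(np.sum(diffs ** 2, axis=1)))

centers = [np.array([0.0, 0.0, 0.0]), np.array([10.0, 10.0, 10.0])]
print(find_closest_point_vectorized(np.array([9.0, 8.0, 11.0]), centers))  # prints 1
```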

In [8]:
spark = SparkSession\
        .builder\
        .appName("PythonKMeans")\
        .getOrCreate()

lines_of_textfile = spark.read.text('points_in_the_space.txt').rdd.map(lambda r: r[0])
data = lines_of_textfile.map(string_to_vector).cache()
num_of_centers = 2  # number of centers
epsilon = 0.001  # convergence threshold on the total squared shift of the centers

kPoints = data.takeSample(False, num_of_centers, 1)
tempDist = 1.0

while tempDist > epsilon:
    closest_points = data.map(lambda p: (find_closest_point(p, kPoints), (p, 1)))\
                         .reduceByKey(lambda p1_c1, p2_c2: (p1_c1[0] + p2_c2[0], p1_c1[1] + p2_c2[1]))\
                         .map(lambda st: (st[0], st[1][0] / st[1][1])).collect()

    tempDist = sum(np.sum((kPoints[iK] - p) ** 2) for (iK, p) in closest_points)

    for (iK, p) in closest_points:
        kPoints[iK] = p

print("Final centers: " + str(kPoints))

spark.stop()

Final centers: [array([15.5, 16.5, 17.5]), array([100., 101., 102.])]
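
To see what one pass of the `map` / `reduceByKey` / `map` chain computes, the loop can be emulated locally in plain Python. This is only a sketch under assumptions: the helper names `closest` and `kmeans_step`, the four sample points and the starting centers are made up, and no Spark is involved.

```python
def closest(p, centers):
    # Index of the center with the smallest squared Euclidean distance to p.
    return min(range(len(centers)),
               key=lambda i: sum((a - b) ** 2 for a, b in zip(p, centers[i])))

def kmeans_step(points, centers):
    dim = len(centers[0])
    sums = {}  # center index -> (coordinate-wise sum, count), like reduceByKey
    for p in points:
        i = closest(p, centers)
        acc, n = sums.get(i, ([0.0] * dim, 0))
        sums[i] = ([a + b for a, b in zip(acc, p)], n + 1)
    # Final map: divide each sum by its count to get the new center.
    return {i: [a / n for a in acc] for i, (acc, n) in sums.items()}

points = [(1.0, 1.0), (2.0, 2.0), (9.0, 9.0), (10.0, 10.0)]
centers = [[1.0, 1.0], [2.0, 2.0]]  # deliberately poor starting centers
epsilon = 0.001
moved = 1.0
while moved > epsilon:
    new_centers = kmeans_step(points, centers)
    # Total squared shift of the centers, the same stopping quantity as tempDist.
    moved = sum(sum((a - b) ** 2 for a, b in zip(centers[i], c))
                for i, c in new_centers.items())
    for i, c in new_centers.items():
        centers[i] = c
print(centers)  # [[1.5, 1.5], [9.5, 9.5]]
```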


## Find the duplicates!

<div style="text-align:center">
<img src="http://www.cse.chalmers.se/edu/course/TDA352/assets/images/avatar.jpg"/> </div>

For this task we've decided to use the methods of circular hashing. <br>
In particular, we have to implement a hash function that represents a string with a number independently of the position of the characters inside the string. <br>
The important thing is that the operation combining the characters is commutative (and associative), because then we can run the hash function on an arbitrary permutation of the input string and get the same output. <br>
Another important thing is that the function is nonlinear. On this point we quote: <br>

citation: *"first, it usually does not matter that much in practice, but if you really care, you should know that it is a research subject by itself. There are thousand of papers about that. You can still get a PhD today by studying & designing hashing algorithms. second hash function might be slightly better, because it probably should separate the string "ab" from the string "ba". On the other hand, it is probably less quick than the first hash function. It may, or may not, be relevant for your application."*
<br>
    

So a closed form for the hash number associated with a string is: <br><br>


<div style="text-align:center">
$H(s) ~ = ~ \left(\bigoplus_{c\in s}{\big((ord(c) \cdot A) \ll B\big)}\right) \bmod 2^{C}$
</div>

<br>

Where:

$\bigoplus$ is the bitwise [XOR](https://en.wikipedia.org/wiki/Exclusive_or), applied over all the characters of the string. It is commutative and associative, so the result does not depend on the order of the characters, and it is nonlinear with respect to integer addition. <br>

$ord(c)$ maps a character to its integer code point, which for [ASCII](https://en.wikipedia.org/wiki/ASCII) characters is the ASCII code. <br>

A is a very big integer multiplier: in our case A = $7*37^{17}-27$. It helps to spread out the range of values and to reduce the false positives. Note that this particular value is even (an odd number minus an odd number), so it is not a prime.<br>

B is the number of bits of the bitwise ["left shift"](https://msdn.microsoft.com/it-it/library/336xbhcz.aspx); shifting left by B is the same as multiplying by $2^{B}$. In our case B = $2$.

C is the exponent of the modulus $2^{C}$: it must be big enough to contain all the values associated with the strings, but not a [large number](https://en.wikipedia.org/wiki/Large_numbers), so that the image set stays compact and an implementation with hash tables does not waste memory. In our case C = $128$, i.e. the modulus is $2^{128}$.

<br>

In [5]:
def hashnoposition(s):
    """
    XOR together (ord(c) * A) << 2 for every character c of s, modulo 2**128.
    This hash function maps a string into a number.
    The position of the characters in the string doesn't matter.
    """
    somme = 0
    for c in s:
        somme = (somme ^ (ord(c)*(7*37**17-27) << 2))
    # reduce the result modulo 2**128 and return it
    return somme % (2**128)

A brief demonstration of how it works:

In [3]:
print(hashnoposition('blue da ba dee da ba daa') == hashnoposition('da blbaue  dee da ba daa'))
print(hashnoposition('+TjZfQ%CKtz/iOG9$U1P') == hashnoposition('th9Sb;zk$$Bn.Y&VdpZyr'))

True
False
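
One property of an XOR-based hash is worth keeping in mind: since `x ^ x == 0`, a character that appears an even number of times drops out of the result, so strings that are not permutations of each other can still share a hash. The sketch below restates the formula in a small standalone function (the name `xor_hash` is an assumption for this example) to show the effect.

```python
A = 7 * 37**17 - 27   # multiplier from the formula
B = 2                 # left-shift amount
C = 128               # the result is reduced modulo 2**C

def xor_hash(s):
    h = 0
    for ch in s:
        # XOR in the shifted, multiplied code point of each character.
        h ^= (ord(ch) * A) << B
    return h % (2 ** C)

print(xor_hash("abc") == xor_hash("cab"))  # True: XOR ignores the order
print(xor_hash("aab") == xor_hash("b"))    # True: the two 'a' terms cancel
print(xor_hash("aa"))                      # 0
```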


Here we define a function to get the range of values to which our hash function maps the strings in __passwords2.txt__.

In [13]:
def find_the_range(hashfoo):
    lis = []
    # the with block closes the file automatically
    with open('passwords2.txt', 'r') as f:
        for line in f:
            lis.append(hashfoo(line))
    return (min(lis), max(lis))

minim, maxim = find_the_range(hashnoposition)
print(minim, maxim)

383353366233471895041246117920 995562401174542057826049359904


The next cell counts the false positives (collisions) and the duplicates that are in the file __passwords2.txt__. <br>

The strategy is to use a dictionary to store what we have already seen: the key is the hash code of a string and the value is the string itself. <br> When we try to insert a new element into this dictionary and its hash code is already present, we increment the counter of duplicates; if, in addition, the string stored under that hash code is different from the current one, the counter of collisions is incremented by 1. <br> For the collision check we compare the sorted characters of the two lines, so we don't get false positives due to the fact that the first hash function is [rotational invariant](https://en.wikipedia.org/wiki/Rotational_invariance), i.e. it doesn't care about the position of the characters. <br> The parameter __hashfoo__ is a reference to an arbitrary hash function; passing it in avoids the [anti pattern](https://en.wikipedia.org/wiki/Anti-pattern) of duplicating the code for each hash function.

In [19]:
def find_collision(hashfoo):
    founds = dict()
    coll = 0 #number of collisions
    dup = 0 #number of duplicates
    with open('passwords2.txt', 'r') as f:
        for line in f:
            hashval = hashfoo(line)
            if hashval in founds:
                dup += 1 #this hash code has been seen before
                # compare sorted characters so that permutations are not counted as collisions
                if sorted(founds[hashval]) != sorted(line):
                    coll += 1
            else:
                founds[hashval] = line
    return (coll,dup)

collisions, duplicates = find_collision(hashnoposition)
print("Collisions =", collisions, "Duplicates =", duplicates)

Collisions = 5773, Duplicates = 10005773
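
A possible way to cross-check the order-insensitive duplicate count without any hash collisions is to use the sorted characters of each line as the dictionary key. The sketch below assumes this alternative (the function name and the sample list are made up); it trades the compact integer keys for exact string keys, so it uses more memory.

```python
from collections import Counter

def count_order_insensitive_duplicates(lines):
    # Sorting the characters gives the same key for every permutation of a string.
    seen = Counter("".join(sorted(line)) for line in lines)
    # Every occurrence after the first one in a group counts as a duplicate.
    return sum(n - 1 for n in seen.values())

sample = ["abc", "bca", "cab", "aab", "b", "xyz"]
print(count_order_insensitive_duplicates(sample))  # 2
```

Here "aab" and "b" stay in separate groups, whereas an XOR-based hash would merge them.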


### Order matters!

Now we have to define another hash function, but this time we have to keep in mind that two strings are the same only if all their characters are equal position by position. <br>
After reading around on the internet, we decided to use two XOR operations, a left shift and, while the hash code is computed, a product between the hash code itself and a big prime number. <br>
Now we have a nonlinear, noncommutative and nonassociative function, so the result depends on the order of the characters. For related mathematical background, see [Nonlinear Invariant Galois Fields](http://www.singacom.uva.es/~iremarquez/CACTC2016/CACTC16-5.pdf).

In [11]:
def hashposition(s):
    """
    This function defines the hash code of a string.
    In this hash function the position of the characters matters.
    """
    x = ord(s[0]) << 7
    for c in s[1:]:
        # the parentheses spell out Python's operator precedence for this expression
        x = ((1000003 * x) ^ (~ord(c) & ~x)) | (37 >> ord(c))
    x = x ^ len(s)
    return abs(x)

Here is a brief demonstration that this hash function distinguishes two strings even when the characters of one (S1) are a permutation of the characters of the other (S), while identical strings still get the same hash code.

In [38]:
print(hashposition('blue da ba dee da ba daa') == hashposition('da blbaue  dee da ba daa'))
print(hashposition('+TjZfQ%CKtz/iOG9$U1P') == hashposition('th9Sb;zk$$Bn.Y&VdpZyr'))
print(hashposition('+TjZfQ%CKtfantaniz/iOG9$U1P') == hashposition('+TjZfQ%CKtfantaniz/iOG9$U1P'))

False
False
True
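
For comparison only, a common textbook order-sensitive hash is the polynomial hash evaluated with Horner's rule. It is not the function used in this notebook; the name `poly_hash` and the defaults `base=31` and `mod=2**64` are assumptions chosen for this sketch.

```python
def poly_hash(s, base=31, mod=2**64):
    h = 0
    for ch in s:
        # Horner's rule: multiply the running value by the base, then add the next character.
        h = (h * base + ord(ch)) % mod
    return h

print(poly_hash("ab"), poly_hash("ba"))  # 3105 3135
```

Because each character is weighted by a different power of the base, swapping two different characters changes the result.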


Now we run the function that finds the duplicates and the collisions over the same dataset, this time with the __hashposition__ function, so we expect the number of duplicates to be quite different from the one obtained with __hashnoposition__.

In [40]:
collisions, duplicates = find_collision(hashposition)
print("Collisions =", collisions, "Duplicates =", duplicates)

Collisions = 0 Duplicates = 5000000


### Bonus: Duplicate Finder in Spark!

Here we have an implementation in Spark of what we did before. <br> The important thing is to check that the Spark results are the same as those we obtained without it. <br> First of all we have to define the Spark session and read the text file.

In [2]:
from pyspark.sql import SparkSession

spark = SparkSession\
        .builder\
        .appName("PythonDuplicateFinder")\
        .getOrCreate()

txt = spark.read.text('passwords3.txt').rdd.map(lambda r: r[0])

With the entire text file in a variable named __txt__, we only have to define the MapReduce pipeline. <br>
First we emit each string together with its hash, so that every pair __<S, H(S)>__ can be recognized. <br>
Second we count how many times each pair __<S, H(S)>__ was emitted, then we filter out the pairs that appear only once. <br>
Finally we take all the strings that have matched as duplicates and put them into an RDD. The length of that RDD will be the number of duplicates.

In [9]:
duplicate_strings_with_position = txt.map(lambda x: ((hashposition(x),x ), 1))\
                                   .reduceByKey(lambda x,y: x+y)\
                                   .filter(lambda x: x[1] != 1)\
                                   .map(lambda x: (x[0][1]))

print("Number of duplicates in which the order matters =", duplicate_strings_with_position.count())

Number of duplicates in which the order matters = 5000000
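
The shape of this pipeline can be tried out locally without Spark. The sketch below emulates the map, reduceByKey, filter and map steps with a `Counter`; the function name `duplicated_strings`, the default `hashfoo=hash` (Python's built-in hash) and the sample list are assumptions made for the illustration.

```python
from collections import Counter

def duplicated_strings(lines, hashfoo=hash):
    # map: emit ((hash, string), 1); reduceByKey: add the ones per key.
    counts = Counter((hashfoo(s), s) for s in lines)
    # filter: keep keys seen more than once; map: project back to the string.
    return [s for (_, s), n in counts.items() if n != 1]

sample = ["abc", "abd", "abc", "xyz", "abd", "abc"]
print(sorted(duplicated_strings(sample)))  # ['abc', 'abd']
```

Note that the result has one entry per distinct repeated string, however many times that string occurs.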


As expected the number of duplicates is the same with and without Spark.

Here we have the same code but now the hash function is the one that does not take into account the position of the characters.

In [10]:
duplicate_strings_without_position = txt.map(lambda x: ((hashnoposition(x),x ), 1))\
                                      .reduceByKey(lambda x,y: x+y)\
                                      .filter(lambda x: x[1] != 1)\
                                      .map(lambda x: (x[0][1]))


print("Number of duplicates in which the order don't matters =", duplicate_strings_without_position.count())

Number of duplicates in which the order don't matters = 10005773


As expected the number of duplicates is the same with and without Spark. Of course you don't see exactly 10M duplicates, because of the collisions.