In [1]:
from sklearn.neighbors import NearestNeighbors
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import matplotlib
import operator

### Define Parameters

In [2]:
# Spark parallelism: number of partitions used by textFile / parallelize.
PARTITIONS = 48

# OPTICS parameters.
#   Eps    : neighbourhood radius, also the default reachability value (inf = unbounded)
#   MinPts : neighbourhood size used for the kNN query (the point itself is included)
Eps = float('inf')
MinPts = 15

### Distance function, data loading and k-nearest neighbours

In [3]:
def distance(x,y):
    """
    Compute the Euclidean (L2) distance between two points.

    x, y : numpy feature vectors of the same length.
    Returns the L2 norm of ``x - y``.
    """
    difference = x - y
    return np.linalg.norm(difference)

In [4]:
path = "Data/a1.txt"  # input file: one whitespace-separated point per line

# Parse each text line into a float feature vector (one RDD element per point).
data = sc.textFile(path, PARTITIONS).map(
    lambda line: np.array(line.split()).astype(float)
)

# Driver-side copy of every point; this order defines the point indices 0..n-1.
points = data.collect()
n = len(points)

# Fit an exact k-NN index on the driver and ship it to the executors.
# (algorithm is left at sklearn's default "auto", which picks the search structure.)
neigh = NearestNeighbors(n_neighbors=MinPts, radius=float('inf'))
neigh.fit(points)
bc_knnobj = sc.broadcast(neigh)

# Query the MinPts nearest neighbours of every point in parallel. Each element is the
# (distances, indices) pair returned by kneighbors, both of shape (1, MinPts); because
# the query points are the training points, each point appears among its own neighbours.
kneighbor = data.map(
    lambda vec: bc_knnobj.value.kneighbors(vec.reshape(1, -1), MinPts, return_distance=True)
)
kneighbor.cache()

# Re-bind ``points`` as a list of (index, vector) pairs for the later cells.
points = list(enumerate(points))

                                                                                

### Compute Core points

In [5]:
# Key every kNN result by the index of the point that issued the query.
# zipWithIndex numbers the RDD elements in the same order ``data.collect()`` used, so the
# key equals the point's position in ``points``. Keying by the first returned neighbour
# (as before) breaks with duplicate points: a point's nearest neighbour can then be its
# duplicate, giving colliding keys and silently dropped points in collectAsMap.
indexed_knn = kneighbor.zipWithIndex().map(lambda item: (item[1], item[0]))

# (data index, indices of its MinPts nearest neighbours, the point itself included)
core_Points = indexed_knn.map(lambda kv: (kv[0], kv[1][1][0])).cache()
# (data index, distance to its MinPts-th nearest neighbour) as a driver-side dict
core_Distance = indexed_knn.map(lambda kv: (kv[0], kv[1][0][0][-1])).collectAsMap()

# Indices of the core points, in index order.
# NOTE(review): Eps is not applied here, so every point is treated as a core point
# (consistent with Eps = inf) — confirm if a finite Eps is ever used.
core_point = list(core_Distance.keys())

                                                                                

### Compute Reachability Distance

In [6]:
def reachability_distance(data):
    """
    Compute one row of the OPTICS reachability-distance matrix.

    For the reference point ``p`` given by ``data`` and every core point ``o``, the
    reachability distance of ``o`` w.r.t. ``p`` is ``max(core_distance(p), dist(p, o))``.
    The core distance is that of the *reference* point ``p``. The ordering step reads
    ``rd_Matrix[point][0][neighbor]`` with ``point`` as the reference. If ``p`` itself
    is not a core point, its reachability is undefined and the whole row stays at ``Eps``.

    data : tuple ``(index, features)`` with ``features`` a 1-D numpy array.

    Returns ``(index, row)``, where ``row`` is a ``(1, n)`` float array and
    ``row[0][o]`` is the reachability distance of point ``o`` from ``index``.

    Reads the globals ``n``, ``Eps``, ``core_point``, ``core_Distance`` and ``points``.
    """
    ref_index, ref_features = data

    rd_row = np.full((1, n), Eps)

    ref_core_distance = core_Distance.get(ref_index)
    if ref_core_distance is None:
        # Not a core point: reachability from it is undefined.
        return (ref_index, rd_row)

    # O(n) per row; the whole matrix is O(n^2) work and memory.
    for c_p in core_point:
        rd_row[0, c_p] = np.maximum(ref_core_distance, distance(ref_features, points[c_p][1]))

    return (ref_index, rd_row)

In [7]:
# Build the reachability-distance matrix on the cluster as a driver-side dict
# {point index: (1, n) row array}. Note this is O(n^2) memory on the driver.
#
# reachability_distance reads core_Distance / core_point / points as plain globals, which
# are shipped with the pickled function. The former `sc.broadcast(...)` calls created
# broadcast variables whose handles were discarded and never read, so they are removed.
rd_Matrix = sc.parallelize(points, PARTITIONS).map(reachability_distance).collectAsMap()

del core_Distance, points  # not used by the later cells

                                                                                

### Update: build the OPTICS ordering

In [8]:
def update_part_1(neighbor,point):
    """
    Map step of the distributed seed update, for one neighbour.

    A call only ever concerns the seed entry keyed by ``neighbor``. The calls are
    therefore independent, can run as a Spark map, and their results can be
    gathered into a dict on the driver.

    neighbor : index of the neighbouring point
    point : index of the reference point (selects the ``rd_Matrix`` row)

    Returns ``(neighbor, value)`` with the seed value to keep for ``neighbor``.
    Returns ``None`` when the neighbour is already ordered and has no seed entry;
    such results are filtered out by the caller.
    Reads the globals ``Orderedlist``, ``seed`` and ``rd_Matrix``.
    """
    already_ordered = neighbor in Orderedlist
    in_seed = neighbor in seed.keys()

    if already_ordered:
        # Ordered points keep their existing seed value unchanged (if they have one).
        return (neighbor, seed[neighbor]) if in_seed else None

    candidate = rd_Matrix[point][0][neighbor]
    if in_seed:
        candidate = np.minimum(candidate, seed[neighbor])
    return (neighbor, candidate)
        
def update_part_2():
    """
    Function to update (Part II): drain the global ``seed`` in order.

    Repeatedly pops the first entry of ``seed``, which ``update`` keeps sorted by
    reachability distance. A point not yet in ``Orderedlist`` is appended there, and
    its reachability distance is appended to ``Reach_distance``. If the point has an
    entry in the ``core_Points`` RDD, the ordering is expanded from it by calling
    ``update`` with its neighbours.

    Requires ``core_Points`` to still be the (index, neighbours) RDD. A later cell
    rebinds that name to a dict, after which this function no longer works.
    NOTE(review): ``update`` calls back into this function, so the recursion depth
    grows with the number of expanded points and can hit Python's recursion limit
    on larger datasets.
    """
    global seed,Orderedlist,Reach_distance

    while seed:
        current_point = next(iter(seed))
        dis = seed.pop(current_point)

        if current_point not in Orderedlist:
            Orderedlist.append(current_point)
            Reach_distance.append(dis)

            # One Spark job: lookup() returns [] for a missing key. This replaces the
            # former filter().collect() existence check, which scanned the RDD once
            # more before the lookup itself.
            matches = core_Points.lookup(current_point)
            if matches:
                # note that current point should be the reference point
                update(matches[0], current_point)

def update(neighbors,point):
    """
    Distributed seed update followed by the drain/expand loop.

    Steps:
    - Runs ``update_part_1`` for every neighbour on the cluster.
    - Merges the returned ``(index, reachability distance)`` pairs into the global
      ``seed``. Pending entries for points that are not neighbours of ``point`` are
      kept.
    - Re-sorts ``seed`` by reachability distance.
    - Hands over to ``update_part_2``.

    neighbors : list of index of point
    point : index of the reference point
    """
    global seed

    # update_part_1 reads seed / Orderedlist / rd_Matrix as globals shipped with the
    # pickled closure. The previous `sc.broadcast(...)` handles were discarded and unused.
    # (The previous code also referenced an undefined PARTITIONS_1.)
    updates = (
        sc.parallelize(neighbors, PARTITIONS)
        .map(lambda x: update_part_1(x, point))
        .filter(lambda x: x is not None)
        .collectAsMap()
    )

    # Merge instead of replace: replacing dropped every pending seed entry that was not
    # a neighbour of ``point``. update_part_1 already returns the minimum for keys that
    # exist in both, so a plain dict update is correct.
    seed.update(updates)
    seed = dict(sorted(seed.items(), key=lambda kv: kv[1]))

    update_part_2()

In [9]:
def update_single(neighbors,point):
    """
    Expand the OPTICS ordering from ``point`` on a single machine (no Spark).

    Each round does the following:
    - Merges into the global ``seed`` the reachability distances of the unordered
      ``neighbors``, measured from the reference ``point``.
    - Sorts ``seed`` by distance.
    - Pops entries in that order, appending each unordered point to ``Orderedlist``
      and ``Reach_distance``.
    - When a popped point is a core point, starts the next round from it.

    Returns once ``seed`` is empty.

    neighbors : iterable of point indices (the nearest neighbours of ``point``)
    point : index of the reference point (selects the ``rd_Matrix`` row)

    Reads the globals ``rd_Matrix``, ``core_Points`` (dict) and ``core_point``.
    Mutates ``seed``, ``Orderedlist`` and ``Reach_distance``.

    The previous version recursed once per expanded core point. Its depth could
    therefore reach the number of points and exceed Python's recursion limit.
    This loop produces the same ordering without recursion.
    """
    global seed

    # Set mirrors for O(1) membership tests. ``ordered`` is kept in sync with every
    # append to ``Orderedlist`` made below.
    ordered = set(Orderedlist)
    core_set = set(core_point)

    while True:
        # Merge reachability distances (w.r.t. ``point``) of not-yet-ordered neighbours.
        row = rd_Matrix[point][0]
        for neighbor in neighbors:
            if neighbor not in ordered:
                if neighbor not in seed:
                    seed[neighbor] = row[neighbor]
                else:
                    seed[neighbor] = min(row[neighbor], seed[neighbor])

        # Order the seed by reachability distance (stable: ties keep insertion order).
        seed = dict(sorted(seed.items(), key=lambda item: item[1]))

        next_point = None
        while seed:
            current_point = next(iter(seed))
            dis = seed.pop(current_point)

            if current_point not in ordered:
                Orderedlist.append(current_point)
                ordered.add(current_point)
                Reach_distance.append(dis)
                if current_point in core_set:
                    # Continue the expansion from this core point.
                    next_point = current_point
                    break

        if next_point is None:
            return
        neighbors, point = core_Points[next_point], next_point

In [10]:
Orderedlist = []
seed = {}
Reach_distance = []

# update_single needs a driver-side dict {point index: neighbour indices}. Collect only
# while core_Points is still the RDD, so that re-running this cell does not fail on a dict.
if not isinstance(core_Points, dict):
    core_Points = core_Points.collectAsMap()

# NOTE(review): last_point is never updated, so the reachability recorded for each new
# expansion start is measured from point 0. Standard OPTICS leaves it undefined (inf).
# It is kept as-is because draw() splits clusters on values > Eps — TODO confirm intent.
last_point = 0

for point in core_point:  # core_point holds the indices of the core points
    if point not in Orderedlist:
        neighbors = core_Points[point]
        Reach_distance.append(rd_Matrix[point][0][last_point])
        Orderedlist.append(point)

        # Expand the ordering starting from this point.
        update_single(neighbors, point)

### Draw (2-D visualization only)

In [11]:
def draw(Eps,MinPts,Orderedlist,Reach_distance,data):
    """
    Cut an OPTICS ordering into flat clusters and scatter-plot them (2-D only).

    The ordering is split at every position whose reachability distance is greater
    than ``Eps``. Segments shorter than ``MinPts`` have their start recorded in
    ``Noise``.

    Eps : reachability threshold that separates clusters
    MinPts : minimum number of ordered points a segment needs to be a cluster
    Orderedlist : point indices in OPTICS order
    Reach_distance : reachability distance for each position of ``Orderedlist``
    data : the points themselves; converted to a DataFrame that must have exactly
        two columns (renamed 'x' and 'y')

    Returns the DataFrame with an added 'label' column (-1 = no label). As a side
    effect, draws a new matplotlib figure.

    NOTE(review): the noise handling below looks fragile — see the inline notes.
    """
    ind = [0]  # segment boundaries, as positions in the ordering
    data_split = []
    Noise = []
    
    # Every position whose reachability exceeds Eps starts a new segment.
    for i in range(len(Reach_distance)):
        if Reach_distance[i]>Eps:
            ind.append(i)
            
    # Close the last segment at the end of the ordering.
    if ind[-1]!=len(Reach_distance):
        ind.append(len(Reach_distance))
    
    # Segments shorter than MinPts: remember their start position as noise.
    for i in range(len(ind)-1):
        if ind[i+1]-ind[i] < MinPts:
            Noise.append(ind[i])
            # NOTE(review): this shifts the boundary by one. The value just stored in
            # Noise is then no longer at ind[i], so the removal loop below may remove
            # a different boundary (or none) — confirm intent.
            ind[i]=ind[i]-1
            if ind[i+1] == len(Reach_distance):
                Noise.append(ind[i+1])
         #   ind.remove(ind[i])
    # Drop noise positions from the boundary list (list.remove: first occurrence only).
    for i in Noise:
        if i in ind:
            ind.remove(i)
    
    
    # Each pair of consecutive boundaries delimits one cluster of point indices.
    # NOTE(review): a boundary decremented to -1 above makes this slice start at
    # Orderedlist[-1] — TODO confirm.
    for i in range(len(ind)-1):
        data_split.append(Orderedlist[ind[i]:ind[i+1]])
    
    data = pd.DataFrame(data)
    data.columns=['x','y']
    data['label'] = -1
    
    # Label each point (DataFrame row position = point index) with its segment number;
    # column 2 is 'label'.
    for x in range(len(data_split)):
        for y in data_split[x]:
            data.iloc[y,2]=x
            
    
    # NOTE(review): Noise holds positions in the ordering, but they are used here as
    # DataFrame row positions (point indices) without mapping through Orderedlist.
    # Also, `x` is the last value of the loop above and is undefined if data_split
    # is empty. Presumably noise should get its own label — verify.
    for i in Noise:
        data.iloc[i-1,2] = x+1
        
    fig = plt.figure(figsize=(12,8),dpi=200)
    plt.scatter(data['x'], data['y'], c=data['label'])
    
    return data

In [12]:
# Visualization for testing only (2-D data).
# Keep it commented out when running OPTICS on Spark.
# draw(3,5,Orderedlist,Reach_distance,data.collect())