In [1]:
import numpy as np
import pandas as pd
from sklearn import datasets
import matplotlib.pyplot as plt
%matplotlib inline

In [2]:
def load_data_label(filename):
    '''
    Load data with label specified
    '''
    data = []
    with open(filename) as file:
        lines = file.readlines()
        for line in lines:
            line = line.strip().split()
            data_line = [float(i) for i in line]
            data.append(data_line)
    return data

In [3]:
def addKey(x, max_x, max_y, buffer):
    mid_x = max_x / 2
    mid_y = max_y / 2
    cur = x
    l = []
    if cur[0] <=mid_x + buffer and cur[1]<=mid_y + buffer:
        l.append((0,x))
    if cur[0] > mid_x - buffer and  cur[1] <=mid_y + buffer:
        l.append((1,x))
    if  cur[0] <=mid_x + buffer and cur[1]>mid_y-buffer:
        l.append((2,x))
    if cur[0] > mid_x - buffer and cur[1] >mid_y-buffer:
        l.append((3,x))
    return l

def reduce(x,y):
    if isinstance(x[0], np.float64) and isinstance(y[0], np.float64):
        return np.concatenate((x,y), axis=0).reshape((2,2))
    elif isinstance(x[0], np.ndarray) and isinstance(y[0], np.float64):
        return np.concatenate((y.reshape((1,2)),x), axis=0)
    elif isinstance(x[0], np.float64) and isinstance(y[0], np.ndarray):
        return np.concatenate((x.reshape((1,2)),y), axis=0)
    else:
        return np.concatenate((x,y), axis=0)
def topd(x):
    return (x[0], pd.DataFrame(x[1], columns = ['feature1','feature2']))
def dataProcessing(data):
    rdd = sc.parallelize(train_data,10)
    max_x = max(data[:,0])
    max_y = max(data[:,1])
    rdd = rdd.map(lambda x: addKey(x, max_x, max_y, 3)).reduce(lambda x, y: x+y)
    rdd = sc.parallelize(rdd).reduceByKey(lambda x, y: reduce(x,y)).map(topd)

    return rdd.partitionBy(4)


In [4]:
def Local_DBSCAN(df, distance=3, neighbours_cnt=3):
    # df1: give points id
    df1 = df.copy()
    df1 = df1.rename(columns={'feature1': 'feature1_1', 'feature2': 'feature2_1'})
    df2 = df1.copy()  # just points
    df1["id_1"] = range(1, len(df) + 1)
    # df_distance, svae the distance between two points
    df_distance = pd.DataFrame(columns=["id_1", "id_2", "distance_ab"])
    b = df2.loc[:, ["feature1_1", "feature2_1"]].values
    for i in range(len(df2)):
        a = df2.loc[i, ["feature1_1", "feature2_1"]].values
        dfpair = df1[['id_1']].copy()
        dfpair["id_2"] = i + 1
        dfpair["distance_ab"] = np.sqrt(np.sum((a - b) ** 2, axis=1))  # compute the distance
        df_distance = pd.concat([df_distance, dfpair])
    # dfnears: save every point with their neighbours set
    dfnears = df_distance[df_distance['distance_ab'] < distance]
    dfnears = dfnears.groupby("id_1").agg({"id_2": [len, set]})
    dfnears.columns = ["neighbours_cnt", "neighbours"]
    dfnears = dfnears[dfnears['neighbours_cnt'] >= neighbours_cnt]
    dfnears = dfnears.reset_index()
    # dfcores:save the neighbors which are also core points
    core_ids = set(dfnears["id_1"].values)
    dfcores = dfnears.copy()
    dfcores["neighbours"] = [x & core_ids for x in dfcores["neighbours"]]
    set_list = list(dfcores["neighbours"])
    # merge the neighboures set
    result = []
    while len(set_list) > 0:
        cur_set = set_list.pop(0)
        intersect_idxs = []
        for i in list(range(len(set_list) - 1, -1, -1)):
            if cur_set & set_list[i]:
                intersect_idxs.append(i)
        while intersect_idxs:
            for idx in intersect_idxs:
                cur_set = cur_set | set_list[idx]
            for idx in intersect_idxs:
                set_list.pop(idx)
            intersect_idxs = []
            for i in list(range(len(set_list) - 1, -1, -1)):
                if cur_set & set_list[i]:
                    intersect_idxs.append(i)
        result = result + [cur_set]
    # core_clusters, give every cluster id
    core_clusters = {i: s for i, s in enumerate(result)}
    # cluster_map, give every point id
    core_map = {}
    for k, v in core_clusters.items():
        core_map.update({vi: k for vi in v})
    cluster_map = {}
    for i in range(len(dfnears)):
        id_a = dfnears["id_1"][i]
        neighbours = dfnears["neighbours"][i]
        cluster_map.update({idx: core_map[id_a] for idx in neighbours})
    # cluster_list give every point cluster_id
    cluster_list = []
    for id_1 in df1["id_1"]:
        cluster_list.append(cluster_map.get(id_1, -1))
    # update df1, add the cluster_id
    df1["cluster_id"] = cluster_list

    # draw picture
    df1.plot.scatter('feature1_1', 'feature2_1', s=100,
                     c=list(df1['cluster_id']), cmap='rainbow', colorbar=False,
                     alpha=0.6, title='Hands DBSCAN Cluster Result ')
    return df1
    

In [5]:
def plotResult(data,x='feature1',y='feature2'):
    clusters_num = set(data['cluster_id'])
    colors = ['#ddff95','#f1ccb8','#cf8878','#f1f1b8','#b8f1cc','#f1707d','#E0EEEE','#66CDAA','#66CDAA']
    count=0
    for c in clusters_num:
        data.loc[data.cluster_id==c, 'colors'] = colors[count]
        count+=1
    scatter = plt.scatter(list(data[x]), list(data[y]), c=list(data['colors']))
#     plt.legend(*scatter.legend_elements(),
#                     loc="lower left", title="Classes")
    plt.show()
    return data

# plotResult(res_0, 'feature1_a', 'feature2_a')
# plotResult(res_1,'feature1_a', 'feature2_a')
# plotResult(res_2,'feature1_a', 'feature2_a')
# plotResult(res_3,'feature1_a', 'feature2_a')
# plotResult(res)

In [6]:


def clustering(x):
    l = []
    global eps
    global minpts
    
    for e in x:
        df = e
        l.append(df[1])
    dfdata = l[0]
    output = Local_DBSCAN(dfdata,eps.value, minpts.value)
    
    return [output]


# res.count()
def merging(res):
    res_0 = res.take(4)[0]
    res_1 = res.take(4)[1]
    res_1['cluster_id'] += (max(res_0['cluster_id'])+1)
    res_2 = res.take(4)[2]
    res_2['cluster_id'] += (max(res_1['cluster_id'])+1)
    res_3 = res.take(4)[3]
    res_3['cluster_id'] += (max(res_2['cluster_id'])+1)
    res = pd.concat([res_0,res_1,res_2,res_3])
    res = res.rename(columns ={'id_a': 'id', 'feature1_a': 'feature1', 'feature2_a':'feature2'})
    res['coord'] = res.apply(lambda x: (x.feature1,x.feature2), axis=1)
    output = res.copy()
    coords = set(output['coord'])
    for coord in coords:
        clusters = set(output.loc[output.coord==coord, 'cluster_id'])
        min_cluster = min(clusters)
        output.loc[output.cluster_id.isin(clusters),'cluster_id'] = min_cluster
    output = output.drop(columns=['id'])
    output = output.drop_duplicates()
    return output

In [5]:
import time
before = time.time()
origin_data = np.array(load_data_label('./spiral.txt'))
train_data = origin_data[:,:2]

preprocess_rdd = dataProcessing(train_data)
preprocess_rdd.glom().collect()
eps = 3
minpts = 2
eps = sc.broadcast(eps)
minpts = sc.broadcast(minpts)
res = preprocess_rdd.mapPartitions(clustering)
output = merging(res)
print(time.time()-before)

[[(0,
       feature1  feature2
   0      18.85     11.10
   1       8.30     18.75
   2       8.05     18.15
   3       7.80     17.55
   4       7.70     16.95
   ..       ...       ...
   93     17.20     11.95
   94     17.50     11.70
   95     17.85     11.50
   96     18.45     11.25
   97     18.15     11.35
   
   [98 rows x 2 columns])],
 [(1,
        feature1  feature2
   0       13.40      4.75
   1       14.15      4.40
   2       14.85      4.10
   3       15.70      3.70
   4       16.55      3.50
   ..        ...       ...
   132     17.50     11.70
   133     17.85     11.50
   134     18.15     11.35
   135     18.85     11.10
   136     18.45     11.25
   
   [137 rows x 2 columns])],
 [(2,
        feature1  feature2
   0        8.05     18.15
   1        7.80     17.55
   2        7.70     16.95
   3        7.55     16.35
   4        7.35     15.75
   ..        ...       ...
   102     15.80     13.50
   103     15.90     13.40
   104     15.95     13.25
   105     

In [None]:

t =plotResult(output)

In [None]:
plt.scatter(list(train_data[:,0]), list(train_data[:,1]) )
#     plt.legend(*scatter.legend_elements(),
#                     loc="lower left", title="Classes")
max_x = max(train_data[:,0])
max_y = max(train_data[:,1])
x1 = [max_x / 2 for i in range(round(max_y)+2)]
y1 = [i for i in range(-1, round(max_y)+1)]
y2 = [max_y / 2 for i in range(round(max_x)+2)]
x2 = [i for i in range(-1, round(max_x)+1)]

x3 = [max_x / 2 + 3 for i in range(round(max_y)+2)]
y3 = [i for i in range(-1, round(max_y)+1)]
y4 = [max_y / 2 + 3 for i in range(round(max_x)+2)]
x4 = [i for i in range(-1, round(max_x)+1)]

x5 = [max_x / 2 - 3 for i in range(round(max_y)+2)]
y5 = [i for i in range(-1, round(max_y)+1)]
y6 = [max_y / 2 - 3 for i in range(round(max_x)+2)]
x6 = [i for i in range(-1, round(max_x)+1)]
plt.plot(x1, y1,color='red')
plt.plot(x2, y2,color='red')
plt.plot(x3, y3,color='black', linestyle=':')
plt.plot(x4, y4,color='black', linestyle=':')
plt.plot(x5, y5,color='black', linestyle=':')
plt.plot(x6, y6,color='black', linestyle=':')
plt.show()
print(time.time()-before)

In [None]:
f = open("time.txt", "a")
f.write(f"{stop-start}")
f.close()