In [2]:
from dbscan_parallel import *

24/05/15 16:28:51 WARN Utils: Your hostname, Yzeys-MacBook-Air.local resolves to a loopback address: 127.0.0.1; using 172.30.192.255 instead (on interface en0)
24/05/15 16:28:51 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


24/05/15 16:28:52 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [None]:
def _bounds_coordinates(bin_bounds):

    lower_cdnts = [[low] for low in  bin_bounds[0][:-1]]
    upper_cdnts = [[high] for high in bin_bounds[0][1:]]
    
    # super stupid implementation, optimization needed
    for bound in bin_bounds[1:]:
        lower_tmp = []
        upper_tmp = []
        
        for bc in bound[:-1]:
            lower_tmp.extend([lc + [bc] for lc in lower_cdnts])
        lower_cdnts = lower_tmp
        
        for bc in bound[1:]:
            upper_tmp.extend([uc + [bc] for uc in upper_cdnts])
        upper_cdnts = upper_tmp
        
    return np.array(lower_cdnts), np.array(upper_cdnts)

@timeit
def spatial_partition(dataset, par_each_dim, eps, withbuffer=True):
    """
    Partitions the given dataset into spatial bins based on the specified parameters.

    Args:
        dataset (numpy.ndarray): The input dataset to be partitioned.
        par_each_dim (tuple): A tuple specifying the number of partitions in each dimension.
        eps (float): A value representing the maximum distance between two samples.

    Returns:
        pyspark.rdd.RDD: An RDD containing the indexed data, where each element is a tuple
        of the partition ID and a list of point IDs belonging to that partition.
    """
    tp_par = par_each_dim
    num_par = np.prod(par_each_dim)
    
    # cut bins
    bounds_dim = np.concatenate(([np.min(dataset, axis=0)], [np.max(dataset, axis=0)]), axis=0).T   # (d, 2)
    
    bin_bounds = []
    for i in range(len(tp_par)):
        dim_bins = np.linspace(*bounds_dim[i], tp_par[i]+1, endpoint=True)
        bin_bounds.append(dim_bins)
    
    lower_bounds, upper_bounds = _bounds_coordinates(bin_bounds)
    if withbuffer:
        lower_bounds -= eps
        upper_bounds += eps
        if np.min(upper_bounds-lower_bounds) < 2*eps:
            raise Warning('Partitions Overlap too much')

    # scatter points into bins with eps
    indexed_data = []
    # double loop to ensure border points could be given multiple partition ID
    for id_pts in range(len(dataset)):     # index of point in dataset
        for id_ptt in range(num_par):
            if not (dataset[id_pts] > lower_bounds[id_ptt]).all():
                continue
            if not (dataset[id_pts] < upper_bounds[id_ptt]).all():
                continue
            indexed_data.append([id_ptt, id_pts])
            
    res = sc.parallelize(indexed_data).groupByKey().map(lambda x: [x[0], list(x[1])])
    return res


In [3]:
from sklearn.datasets import make_blobs

# Generate random dataset with 3 dimensions
n_samples = 100  # Number of samples
n_features = 3  # Number of dimensions
centers = 3  # Number of clusters
random_state = 42  # Random state for reproducibility

X, y = make_blobs(n_samples=n_samples, n_features=n_features, centers=centers, random_state=random_state)

In [4]:
def _get_dist(m, a, b) -> float:
    """
    for float comparison, set all distance value precision to 5
    :param: a: int; index of given point in data matrix
    :param: b: same as a
    """
    result = np.sqrt(np.power(m[b] - m[a], 2).sum())

    return round(result, 5)

def _get_distance_matrix(dataset):
    """
    Only once calculation will be on each point-pairs
    results will be stored in self.dist_m
    """
    num_p = dataset.shape[0]
    dist_m = np.zeros((num_p, num_p))
    for p_id in range(num_p):
        for q_id in range(p_id, num_p):
            dist = _get_dist(dataset, p_id, q_id)
            dist_m[q_id, p_id] = dist
            dist_m[p_id, q_id] = dist
    
    return dist_m

def _get_neighbors(i, dist_m, eps):
    # get neighbors of point i
    return np.where(dist_m[i] < eps)[0]



In [5]:
# set parameters
min_pts = 18
eps = 2
dataset = X
b_dataset = sc.broadcast(dataset)
b_eps = sc.broadcast(eps)
b_min_pts = sc.broadcast(min_pts)
partition_each_dim = (2, 2, 2)


In [6]:
# partition data points
partitioned_rdd = spatial_partition(X, partition_each_dim, eps)


spatial_partition time cost: 413.2580757141113ms


In [7]:
# in each partition, create grid cells

# calculate grid number for each dimension in one partition
def cal_grid_num(data, eps):
    grid_side_len = eps/np.sqrt(data.shape[1])
    gridnum_each_dim = [int((np.max(data[:, i]) - np.min(data[:, i])) / grid_side_len) for i in range(data.shape[1])]
    return gridnum_each_dim

# test for one partition
p_ind = partitioned_rdd.take(1)[0][1]
p_data = X[p_ind]
gridnum_each_dim = cal_grid_num(p_data, eps)
grid_rdd = spatial_partition(p_data, gridnum_each_dim, eps) # key: grid index, value: data points in this grid

                                                                                

spatial_partition time cost: 124.79901313781738ms


In [8]:
# find core points in each partition

dist_m_par1 = _get_distance_matrix(p_data)
neighbors = [_get_neighbors(i, dist_m_par1, eps) for i in range(p_data.shape[0])]
is_corep = [1 if (len(p) >= min_pts) else 0 for p in neighbors]


In [11]:
# find core grid cells in each partition
def find_core_grid(grid_rdd, eps, min_pts):
    core_grid = []
    for grid, data_ind in grid_rdd.collect():
        data = X[data_ind]
        dist_m = _get_distance_matrix(data)
        neighbors = [_get_neighbors(i, dist_m, eps) for i in range(data.shape[0])]
        is_core = [1 if (len(p) >= min_pts) else 0 for p in neighbors]
        if sum(is_core) > 0:
            core_grid.append(grid)
    return core_grid

core_grid = find_core_grid(grid_rdd, eps, min_pts)

                                                                                

In [13]:
core_grid = []
for grid, data_ind in grid_rdd.collect():
    print(grid, data_ind)
    data = X[data_ind]
    dist_m = _get_distance_matrix(data)
    neighbors = [_get_neighbors(i, dist_m, eps) for i in range(data.shape[0])]
    is_core = [1 if (len(p) >= min_pts) else 0 for p in neighbors]
    if sum(is_core) > 0:
        core_grid.append(grid)


0 [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 24, 25, 26, 27, 28, 29, 30, 31, 32]
8 [1, 4, 5, 7, 8, 10, 11, 12, 14, 15, 17, 19, 20, 21, 23, 24, 25, 26, 29, 32]
16 [1, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 28, 29, 30, 31, 32]
1 [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 24, 25, 26, 27, 28, 29, 30, 31, 32]
9 [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 24, 25, 26, 27, 28, 29, 30, 31, 32]
17 [1, 4, 5, 7, 8, 10, 11, 12, 14, 15, 17, 19, 20, 21, 23, 24, 25, 26, 29, 32]
2 [0, 1, 2, 4, 5, 7, 8, 10, 11, 12, 14, 15, 17, 19, 20, 21, 24, 25, 26, 27, 29, 32]
10 [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 24, 25, 26, 27, 28, 29, 30, 31, 32]
3 [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32]
11 [0, 1, 2, 4, 5, 7, 8, 10, 11, 12, 14, 15, 

                                                                                

In [7]:
@staticmethod
def local_dbscan(partioned_rdd, method='matrix', metric='euclidian'):

    dataset = np.array([b_dataset.value[idp] for idp in partioned_rdd])
    if method == 'matrix':
        dbscan_obj = MatrixDBSCAN(dataset, b_eps.value, b_min_pts.value, metric) 
    else:
        dbscan_obj = NaiveDBSCAN(dataset, b_eps.value, b_min_pts.value, metric) 
    dbscan_obj.predict()
    is_core_list = dbscan_obj._find_core_pts()
    
    return list(zip(partioned_rdd, is_core_list, dbscan_obj.tags))


local_tags = rdd.mapValues(lambda x: local_dbscan(x, method='matrix', metric='euclidian')).collect()

predict time cost: 0.9989738464355469ms                             (0 + 8) / 8]
predict time cost: 0.16808509826660156ms
predict time cost: 2.4399757385253906ms
predict time cost: 11.595964431762695ms
predict time cost: 14.004945755004883ms
predict time cost: 3.5741329193115234ms
predict time cost: 0.10275840759277344ms
predict time cost: 8.532047271728516ms
predict time cost: 0.09703636169433594ms
predict time cost: 65.52505493164062ms>                             (4 + 4) / 8]
predict time cost: 34.03902053833008ms
predict time cost: 0.8440017700195312ms
predict time cost: 108.0329418182373ms
predict time cost: 109.83109474182129ms
predict time cost: 0.06985664367675781ms
predict time cost: 0.19621849060058594ms
predict time cost: 92.9727554321289ms
predict time cost: 11.018037796020508ms
predict time cost: 5.685091018676758ms
predict time cost: 33.27608108520508ms
                                                                                

In [8]:
local_tags

[(24,
  [(0, 0, 2),
   (1, 1, 1),
   (2, 1, 1),
   (7, 0, 1),
   (15, 0, 2),
   (19, 1, 1),
   (20, 1, 1),
   (22, 1, 2),
   (24, 1, 1),
   (26, 1, 1),
   (27, 0, 2),
   (29, 1, 1),
   (33, 1, 2),
   (34, 0, 2),
   (35, 0, 2),
   (36, 1, 1),
   (38, 1, 1),
   (39, 1, 1),
   (40, 0, 2),
   (42, 1, 1),
   (43, 1, 1),
   (45, 0, 1),
   (47, 0, 2),
   (49, 1, 1),
   (52, 1, 1),
   (53, 0, 2),
   (55, 0, 2),
   (58, 0, 2),
   (59, 1, 1),
   (61, 0, 2),
   (63, 1, 2),
   (64, 1, 2),
   (65, 1, 1),
   (68, 1, 1),
   (71, 1, 1),
   (72, 0, 2),
   (74, 0, 1),
   (75, 0, 2),
   (76, 1, 1),
   (77, 1, 1),
   (80, 1, 1),
   (82, 1, 1),
   (83, 1, 2),
   (84, 1, 1),
   (85, 1, 1),
   (92, 1, 1),
   (93, 1, 1),
   (94, 0, 1),
   (96, 0, 2),
   (97, 0, 2),
   (98, 0, 1),
   (99, 1, 1)]),
 (16,
  [(2, 0, -2),
   (3, 1, 1),
   (8, 1, 1),
   (13, 1, 1),
   (15, 1, 1),
   (17, 1, 1),
   (19, 0, -2),
   (21, 1, 1),
   (22, 1, 1),
   (25, 1, 1),
   (27, 1, 1),
   (33, 1, 1),
   (34, 1, 1),
   (35, 1, 1),
 

In [9]:
@timeit
def merge(local_tags, dataset):
    global_tags = [UNKNOWN] * len(dataset)
    is_tagged = [0] * len(dataset)
    last_max_label = 0
    for local in local_tags:
        np_local = np.array(local[-1])
        np_local[:, -1] += last_max_label

        last_max_label = np.max(np_local[:, -1])
        
        # check and merge overlapped points
        tagged_indices = np.nonzero(is_tagged)[0]
        for tmp_i in range(len(np_local)):
            # should do tag check
            p_id, is_core, label = np_local[tmp_i]
            if p_id in tagged_indices and is_core==1:
                np_local[-1][np_local[-1]==label] = global_tags[p_id]
        
        # update global tags
        for p_id, is_core, label in np_local:
            if is_tagged[p_id]==1:
                continue
            global_tags[p_id] = label
            is_tagged[p_id] = 1
    return global_tags

result_tags = merge(local_tags, dataset)


merge time cost: 7.142066955566406ms


In [10]:
len(result_tags)

100

In [11]:
rdd.collect()

[[24,
  [0,
   1,
   2,
   7,
   15,
   19,
   20,
   22,
   24,
   26,
   27,
   29,
   33,
   34,
   35,
   36,
   38,
   39,
   40,
   42,
   43,
   45,
   47,
   49,
   52,
   53,
   55,
   58,
   59,
   61,
   63,
   64,
   65,
   68,
   71,
   72,
   74,
   75,
   76,
   77,
   80,
   82,
   83,
   84,
   85,
   92,
   93,
   94,
   96,
   97,
   98,
   99]],
 [16,
  [2,
   3,
   8,
   13,
   15,
   17,
   19,
   21,
   22,
   25,
   27,
   33,
   34,
   35,
   36,
   38,
   40,
   45,
   47,
   49,
   52,
   53,
   55,
   56,
   63,
   64,
   68,
   71,
   72,
   75,
   76,
   78,
   80,
   81,
   82,
   83,
   84,
   85,
   87,
   89,
   91,
   93,
   94,
   96,
   97]],
 [25,
  [0,
   2,
   3,
   8,
   12,
   13,
   15,
   17,
   19,
   21,
   22,
   25,
   27,
   33,
   34,
   35,
   36,
   38,
   40,
   41,
   45,
   47,
   49,
   51,
   52,
   53,
   55,
   56,
   58,
   61,
   63,
   64,
   68,
   71,
   72,
   75,
   76,
   78,
   80,
   81,
   82,
   83,
   84,
   85,
  

In [16]:
from sklearn.cluster import DBSCAN

# Create an instance of the DBSCAN algorithm
dbscan = DBSCAN(eps=eps, min_samples=min_pts, metric='euclidean')

# Fit the DBSCAN algorithm to the data
labels = dbscan.fit_predict(X)


In [18]:
from sklearn.metrics import adjusted_mutual_info_score, adjusted_rand_score


# Calculate the Mutual Information (MI) score
mi_score = adjusted_mutual_info_score(result_tags, labels)

# Calculate the Adjusted Rand Index (ARI)
ari_score = adjusted_rand_score(result_tags, labels)

print("Mutual Information (MI) score:", mi_score)
print("Adjusted Rand Index (ARI):", ari_score)


Mutual Information (MI) score: 0.8014438842685137
Adjusted Rand Index (ARI): 0.7146010924553108


In [19]:
labels

array([0, 1, 1, 0, 2, 2, 2, 1, 0, 2, 2, 2, 0, 0, 2, 0, 2, 0, 2, 1, 1, 0,
       0, 2, 1, 0, 1, 0, 2, 1, 2, 2, 2, 0, 0, 0, 1, 2, 1, 1, 0, 0, 1, 1,
       2, 1, 2, 0, 2, 1, 2, 0, 1, 0, 2, 0, 0, 2, 0, 1, 2, 0, 2, 0, 0, 1,
       2, 2, 1, 2, 2, 1, 0, 2, 1, 0, 1, 1, 0, 2, 1, 0, 1, 0, 1, 1, 2, 0,
       2, 0, 2, 0, 1, 1, 1, 2, 0, 0, 1, 1])

In [21]:
np.array(result_tags)

array([2, 1, 1, 3, 6, 8, 6, 1, 3, 6, 6, 6, 4, 3, 6, 2, 6, 3, 8, 1, 1, 3,
       2, 6, 1, 3, 1, 2, 8, 1, 8, 6, 6, 2, 2, 2, 1, 6, 1, 1, 2, 4, 1, 1,
       8, 1, 6, 2, 6, 1, 6, 4, 1, 2, 8, 2, 3, 6, 2, 1, 6, 2, 6, 2, 2, 1,
       8, 8, 1, 6, 8, 1, 2, 8, 1, 2, 1, 1, 3, 6, 1, 3, 1, 2, 1, 1, 6, 3,
       6, 3, 6, 3, 1, 1, 1, 6, 2, 2, 1, 1])