``` shell
python3 bfr.py <input_path> <n_cluster> <out_file1> <out_file2>
```

In [1]:
import findspark
findspark.init()

In [2]:
import sys
import time
import json
import os
import random
from math import sqrt

from pyspark import SparkConf, SparkContext, StorageLevel

In [3]:
# Scratch cell: a throwaway dict used to try out random.choice below.
a = {1:2, 3:4, 5:6}

In [4]:
# Scratch: with seed 2, random.choice over the keys returned 1 (the recorded
# cell output below). Note that this reseeds Python's global RNG.
random.seed(2)
random.choice(list(a.keys()))

1

In [5]:
# Fraction of the first file's points used for the initial K-Means sample.
SMALL_SAMPLE_RATIO = 0.025
# Seed used by splitData's shuffle and by Spark's takeSample.
RANDOM_SEED = 1208
# Float sentinel meaning "farther than anything" in minimum-distance searches.
LARGE_NUMBER = sys.maxsize * 1.0
# Per-coordinate tolerance for deciding that K-Means centroids stopped moving.
EQUAL_THRESHOLD = 0.0001
# Multiplier in the Mahalanobis threshold: md_threshold = sqrt(d) * ALPHA_FOR_MD.
ALPHA_FOR_MD = 2

In [6]:
# Notebook stand-ins for the command-line arguments of
# `python3 bfr.py <input_path> <n_cluster> <out_file1> <out_file2>`.
# NOTE(review): input_path is an absolute, machine-specific path.
input_path = "/Users/markduan/duan/USC_course/USC_APDS/INF553/homework/hw5/data/test1"
n_cluster = 10
out_file1 = "./out_file1_jn.json"
out_file2 = "./out_file2_jn.json"

In [7]:
def getFilesPath(input_path):
    """
    Build a 'file://' URI for every entry directly inside input_path.

    Entries come from os.listdir, so their order is whatever the OS returns
    and sub-directories are included as well. Each name is joined onto
    input_path and prefixed with the 'file://' scheme.
    """
    return ['file://' + os.path.join(input_path, name)
            for name in os.listdir(input_path)]

In [8]:
# 'file://' URIs of every entry in the input directory (os.listdir order).
files_complete_path = getFilesPath(input_path)

In [9]:
# Start with the first file; a later cell moves on to files_complete_path[1].
file_path = files_complete_path[0]

In [10]:
# Local Spark context on all cores ("local[*]") with spark.driver.memory set
# to 4g; only ERROR-level Spark logs are shown.
conf = SparkConf() \
    .setAppName("task") \
    .setMaster("local[*]") \
    .set("spark.driver.memory","4g")
sc = SparkContext(conf=conf)
sc.setLogLevel("ERROR")

In [11]:
def loadFileRDD(sc, file_path):
    """
    Read a comma-separated text file into an RDD of (idx, attrs) pairs.

    Every line is split on ','; the first field becomes an int index and the
    remaining fields a tuple of floats.

    :return rdd - [(idx, (attrs...)), ...] idx: int attr: float
    """
    def parse(line):
        fields = line.split(',')
        return (int(fields[0]), tuple(float(f) for f in fields[1:]))

    return sc.textFile(file_path).map(parse)

In [12]:
# Pull every point of the current file onto the driver as a Python list.
data_points = loadFileRDD(sc,file_path).collect()

In [13]:
# d: dimensionality of the points, taken from the first point.
d = len(data_points[0][1])
# A point is added to a DS cluster when its Mahalanobis distance is below this.
md_threshold = sqrt(d) * ALPHA_FOR_MD

In [14]:
def splitData(data_points, ratio, seed=None):
    """
    Randomly split data_points into a small sample and the remainder.

    :param data_points: list - [(idx, (attrs...)), ...]; left unmodified.
    :param ratio: float - fraction of points placed in the sample; the sample
        size is int(len(data_points) * ratio), i.e. truncated.
    :param seed: int or None - shuffle seed; None means RANDOM_SEED.
    :return (small_sample, rest_data): two lists that together hold every
        element of data_points, in shuffled order.
    """
    if seed is None:
        seed = RANDOM_SEED
    # A private Random instance yields the same permutation as
    # random.seed(seed); random.shuffle(...), but it neither reseeds the
    # global RNG nor shuffles the caller's list in place.
    shuffled = list(data_points)
    random.Random(seed).shuffle(shuffled)
    num_small_sample = int(len(shuffled) * ratio)
    return shuffled[:num_small_sample], shuffled[num_small_sample:]

In [15]:
# sample: a random SMALL_SAMPLE_RATIO share of the points for the initial
# K-Means; rest_data: every other point.
sample, rest_data = splitData(data_points, SMALL_SAMPLE_RATIO)

In [16]:
def computeDistance(v1, v2):
    """
    Return the Euclidean distance between two attribute vectors.

    Only the first len(v1) coordinates are compared, so v2 must be at least
    as long as v1.
    """
    squared_total = sum((v1[k] - v2[k]) ** 2 for k in range(len(v1)))
    return sqrt(squared_total)

In [17]:
def getClosestCentroid(x, centroids_list):
    """
    Measure how far a point lies from its nearest centroid.

    :param x: (idx, (attrs...))
    :param centroids_list: [(cluster_tag, (attrs...)), ...]
    :return: (1, (shortest_distance, x)). The constant key 1 lets every point
        be reduced together. shortest_distance stays LARGE_NUMBER when no
        centroid is strictly closer, e.g. when centroids_list is empty.
    """
    distances = [computeDistance(x[1], centroid[1]) for centroid in centroids_list]
    # min() replaces its candidate only on a strict "<", the same rule as
    # scanning with "if dis < shortest". Seeding it with LARGE_NUMBER keeps
    # the empty-list result.
    shortest = min([LARGE_NUMBER] + distances)
    return (1, (shortest, x))

In [18]:
def getInitialCentroids(data_rdd, size, method="random"):
    """
    Choose `size` initial centroids from data_rdd.

    :param data_rdd: rdd - [(key, (idx, (attrs...))), ...]; only the value
        (idx, (attrs...)) of each element is used.
    :param size: int - number of centroids wanted.
    :param method:
        "random"   - draw `size` distinct elements without replacement,
                     seeded with RANDOM_SEED.
        "kmeans++" - deterministic farthest-first seeding. The first centroid
                     is a seeded random element. Each following one is the
                     element whose distance to its nearest already-chosen
                     centroid is largest. (Textbook k-means++ would instead
                     sample it with probability proportional to D(x)^2.)
    :return: dict - {cluster_tag: (attrs...), ...}, where cluster_tag is the
        chosen element's idx. It can hold fewer than `size` entries when the
        RDD is smaller than `size`, or when an element is picked twice.
    :raises ValueError: if method is not "random" or "kmeans++".
    """
    points = data_rdd.map(lambda x: x[1])
    if method == "random":
        return dict(points.takeSample(False, size, RANDOM_SEED))

    if method == "kmeans++":
        # Every seeding round is a full pass over the points, so cache them
        # for the duration of the seeding and release them afterwards.
        points = points.persist(StorageLevel.MEMORY_AND_DISK)
        centroids_list = points.takeSample(False, 1, RANDOM_SEED) if size > 0 else []
        if centroids_list:
            for _ in range(size - 1):
                # Centroids chosen so far, frozen for this pass.
                chosen = list(centroids_list)
                # getClosestCentroid yields (1, (dist_to_nearest, point)).
                # RDD.max avoids the shuffle that a reduceByKey on the
                # constant key 1 would trigger. Ties keep the first
                # candidate, as the former reduce did.
                _, farthest = points \
                    .map(lambda p: getClosestCentroid(p, chosen)[1]) \
                    .max(key=lambda t: t[0])
                centroids_list.append(farthest)
        points.unpersist()
        return dict(centroids_list)

    raise ValueError("unknown method: %r (expected 'random' or 'kmeans++')" % (method,))
        

In [19]:
def clusterPoint(point, centroids):
    """
    Tag a point with the cluster whose centroid is nearest to it.

    :param point: (idx, (x, (attrs...))); only point[0] and point[1][1] are read.
    :param centroids: {cluster_tag: (attrs...), ...}
    :return: (idx, (cluster_tag, (attrs...))). cluster_tag is None when no
        centroid is strictly closer than LARGE_NUMBER (e.g. centroids is
        empty). On ties, the first tag in iteration order wins.
    """
    attrs = point[1][1]
    best_tag, best_dist = None, LARGE_NUMBER
    for tag, centroid_attrs in centroids.items():
        dist = computeDistance(attrs, centroid_attrs)
        if dist < best_dist:
            best_tag, best_dist = tag, dist
    return (point[0], (best_tag, attrs))

In [20]:
def addList(l1, l2):
    """
    Return the element-wise sum of two sequences as a new list.

    The result has len(l1) entries; l2 must be at least as long as l1.
    """
    return [value + l2[pos] for pos, value in enumerate(l1)]

In [21]:
def computeCentroids(points):
    """
    Average the attributes of the points assigned to each cluster.

    :param points: rdd - [(idx, (cluster_tag, (attrs...))), ...]
    :return: dict - {cluster_tag: (mean attrs...), ...}; a cluster with no
        points does not appear.
    """
    # (tag, (attrs, 1)) so that one reduce sums both the vectors and the counts.
    tagged = points.map(lambda rec: (rec[1][0], (rec[1][1], 1)))
    totals = tagged.reduceByKey(lambda a, b: (addList(a[0], b[0]), a[1] + b[1]))
    means = totals.mapValues(lambda s: tuple(c / s[1] for c in s[0]))
    return means.collectAsMap()

In [22]:
def checkCentroidsChanged(new_centroids, old_centroids, equal_threshold=0.0001):
    """
    Tell whether K-Means should keep iterating.

    :param new_centroids: {cluster_tag: (attrs...), ...}
    :param old_centroids: same shape, or None before the first iteration.
    :param equal_threshold: per-coordinate tolerance.
    :return: True if old_centroids is None, if the two dicts have different
        cluster tags, or if any coordinate of any centroid moved by more than
        equal_threshold; False otherwise.
    """
    if old_centroids is None:
        return True
    # computeCentroids drops clusters that received no points, so the key
    # sets can legitimately differ. Indexing new_centroids[k] for such a
    # cluster would raise KeyError; treat it as "changed" instead.
    if set(old_centroids) != set(new_centroids):
        return True
    return any(
        abs(old_c - new_c) > equal_threshold
        for k, old_centroid in old_centroids.items()
        for old_c, new_c in zip(old_centroid, new_centroids[k])
    )

In [23]:
# Key each sampled point by its idx: elements become (idx, (idx, (attrs...))).
data_rdd = sc.parallelize(sample).map(lambda x: (x[0], x)).persist(StorageLevel.MEMORY_AND_DISK)

In [24]:
# Experiment with "kmeans++" seeding on the sample. The recorded run below
# ended in KeyboardInterrupt (apparently stopped by hand before finishing).
initial_centroids = getInitialCentroids(data_rdd, n_cluster, method='kmeans++')

KeyboardInterrupt: 

In [None]:
initial_centroids

In [None]:
def KMeansRDD(sc, data, k_clusters, max_iter=4, init_method="random"):
    """
    Run Lloyd's K-Means over `data` with Spark.

    :param sc: SparkContext used to distribute `data`.
    :param data: [(idx, (attrs...)), ...]
    :param k_clusters: number of initial centroids to pick.
    :param max_iter: maximum number of assign/recompute rounds (default 4,
        the previously hard-coded limit).
    :param init_method: seeding method passed to getInitialCentroids
        ("random" or "kmeans++").
    :return rdd: [(idx, (cluster_tag, (attrs...))), ...] - every point
        labelled with its nearest centroid from the last recomputation.
    """
    # Elements become (idx, (idx, (attrs...))); clusterPoint reads the
    # attributes at position [1][1].
    data_rdd = sc.parallelize(data).map(lambda x: (x[0], x)) \
        .persist(StorageLevel.MEMORY_AND_DISK)

    centroids = getInitialCentroids(data_rdd, k_clusters, method=init_method)
    for _ in range(max_iter):
        # Bind the current centroids as a default argument. A plain closure
        # over a variable that is reassigned later would make the lazily
        # evaluated RDD use whatever value the variable holds when an action
        # finally runs.
        assigned = data_rdd.map(lambda x, c=centroids: clusterPoint(x, c))
        new_centroids = computeCentroids(assigned)
        changed = checkCentroidsChanged(new_centroids, centroids,
                                        equal_threshold=EQUAL_THRESHOLD)
        centroids = new_centroids
        if not changed:
            break

    # Label every point with its nearest *final* centroid.
    return data_rdd.map(lambda x, c=centroids: clusterPoint(x, c))

In [None]:
# Initial K-Means on the small sample with n_cluster clusters; its clusters
# are summarised as the discard sets (DS) below.
k_clusters = n_cluster
sample_rdd = KMeansRDD(sc, sample, k_clusters).persist(StorageLevel.MEMORY_AND_DISK)
# sample_rdd: [(idx, (cluster_tag, (attrs...))), ...]

In [None]:
def simpleStatistics(X, pre_state=None):
    """
    Accumulate BFR summary statistics over a collection of points.

    :param X: iterable of attribute vectors; the dimension d is taken from
        the first one.
    :param pre_state: list - [N, SUM_vector, SUMSQ_vector] to continue from,
        or None to start from zeros. It is copied, never modified in place.
    :return: list - [N, SUM_vector, SUMSQ_vector]. For an empty X this is a
        copy of pre_state, or [0, [], []] when pre_state is None.
    """
    X = list(X)
    if pre_state is None:
        d0 = len(X[0]) if X else 0
        state = [0, [0] * d0, [0] * d0]
    else:
        # Copy so the caller's accumulator is not silently aliased/mutated.
        state = [pre_state[0], list(pre_state[1]), list(pre_state[2])]
    if not X:
        return state

    d = len(X[0])  # dimension
    sums, sumsqs = state[1], state[2]
    for x in X:
        state[0] += 1
        for i in range(d):
            sums[i] += x[i]
            sumsqs[i] += x[i] ** 2
    return state

In [None]:
def getDiscardSets(points):
    """
    Summarise every cluster of `points` as BFR statistics.

    :param points: rdd - [(idx, (cluster_tag, (attrs...))), ...]
    :return : dict - {tag: [N, SUM_vector, SUMSQ_vector], ...}
    """
    grouped_by_tag = points.map(lambda rec: rec[1]).groupByKey()
    summaries = grouped_by_tag.mapValues(lambda members: simpleStatistics(members))
    return summaries.collectAsMap()

In [None]:
# DS: {cluster_tag: [N, SUM_vector, SUMSQ_vector], ...} for the sample's clusters.
DS = getDiscardSets(sample_rdd)

In [None]:
def rearrangeResult(res_rdd, RS_idx=None):
    """
    Project clustered points to (idx, cluster_tag) pairs.

    :param res_rdd: rdd - [(idx, (cluster_tag, (attrs...))), ...]
    :param RS_idx: collection of idx to leave out (the retained-set points),
        or None to keep every point.
    :return : list - [(idx, tag), ...] idx: int tag: int
    """
    pairs = res_rdd
    if RS_idx is not None:
        # A set gives O(1) membership per element instead of scanning a list.
        excluded = frozenset(RS_idx)
        pairs = pairs.filter(lambda x: x[0] not in excluded)
    return pairs.map(lambda x: (x[0], x[1][0])).collect()

In [None]:
# (idx, cluster_tag) pairs for every sampled point; all of them belong to DS.
res_DS = rearrangeResult(sample_rdd, None)

In [None]:
# Cluster the remaining points into 5x as many clusters; getCSandRS below
# splits the result into compression sets (CS) and the retained set (RS).
k_clusters = n_cluster * 5
rest_rdd = KMeansRDD(sc, rest_data, k_clusters).persist(StorageLevel.MEMORY_AND_DISK)

In [None]:
def getCSandRS(points):
    """
    Split clustered points into compression sets (CS) and a retained set (RS).

    A cluster that received exactly one point is not kept as a cluster; its
    point goes to RS. Every other cluster is summarised as
    [N, SUM_vector, SUMSQ_vector].

    :param points: rdd - [(idx, (cluster_tag, (attrs...))), ...]
    :return: (RS_idx, initial_CS, initial_RS)
        RS_idx: list - idx of the points that were alone in their cluster
        initial_CS: dict - {tag: [N, SUM_vector, SUMSQ_vector], ...}
        initial_RS: list - the elements of `points` whose idx is in RS_idx
    """
    RS_idx = points.map(lambda x: (x[1][0], x[0])) \
        .groupByKey() \
        .map(lambda x: (x[0], list(x[1]))) \
        .filter(lambda x: len(x[1]) == 1) \
        .flatMap(lambda x: x[1]) \
        .collect()

    # Membership is tested once per point in both filters below; a set makes
    # that O(1) instead of scanning the RS_idx list every time.
    rs_set = set(RS_idx)

    initial_CS = points.filter(lambda x: x[0] not in rs_set) \
        .map(lambda x: x[1]) \
        .groupByKey() \
        .mapValues(lambda X: simpleStatistics(X)) \
        .collectAsMap()

    initial_RS = points.filter(lambda x: x[0] in rs_set) \
        .collect()
    return RS_idx, initial_CS, initial_RS

In [None]:
# RS_idx: idx of points alone in their cluster; CS: statistics of the other
# clusters; RS: the singleton points themselves.
RS_idx, CS, RS = getCSandRS(rest_rdd)

In [None]:
# (idx, cluster_tag) pairs for the rest points that ended up in CS.
res_CS = rearrangeResult(rest_rdd, RS_idx)

In [None]:
# Move on to the second input file.
file_path = files_complete_path[1]

In [None]:
file_path

In [None]:
data_points = loadFileRDD(sc, file_path).collect()

In [None]:
# A single point used to prototype the per-point assignment in later cells.
point = data_points[0]

In [None]:
point

In [None]:
def calculateCentroidAndDeviation(cluster):
    """
    Compute the per-dimension mean and standard deviation of a cluster.

    :param cluster : list - [N, SUM_vector, SUMSQ_vector] could be any cluster in DS or CS
    :return : list - [avg_vector, sd_vector]
    :raises ZeroDivisionError: if N is 0 and the vectors are non-empty.
    """
    n = cluster[0]
    avg_v = []
    sd_v = []
    for su, sumsq in zip(cluster[1], cluster[2]):
        avg = su / n
        # Var = E[x^2] - E[x]^2 can come out slightly negative through
        # floating-point cancellation (e.g. when all values are equal). Clamp
        # at 0 so sqrt does not raise "math domain error".
        variance = max(sumsq / n - avg ** 2, 0.0)
        avg_v.append(avg)
        sd_v.append(sqrt(variance))
    return [avg_v, sd_v]

In [None]:
# Per-cluster [avg_vector, sd_vector] for every discard set.
DS_stt = {}
for ctag in DS:
    DS_stt[ctag] = calculateCentroidAndDeviation(DS[ctag])

In [None]:
DS_stt

In [None]:
def mahalanobisDistancePointCluster(p_attrs, cluster_stt):
    """
    Normalised (diagonal-covariance) Mahalanobis distance from a point to a cluster.

        distance = sqrt( sum_i ((p_i - avg_i) / sd_i) ** 2 )

    :param p_attrs: sequence of the point's attributes.
    :param cluster_stt: [avg_vector, sd_vector], as returned by
        calculateCentroidAndDeviation.
    :return: float. A dimension with sd == 0 contributes 0 when the point
        matches the centroid there, and makes the distance float('inf')
        otherwise (instead of dividing by zero).
    """
    avg_v = cluster_stt[0]
    sd_v = cluster_stt[1]
    vsq = 0.0
    for p, avg, sd in zip(p_attrs, avg_v, sd_v):
        diff = p - avg
        if sd == 0:
            if diff != 0:
                return float('inf')
            continue
        vsq += (diff / sd) ** 2
    return sqrt(vsq)

In [None]:
# Find the discard set with the smallest Mahalanobis distance to `point`.
idx = point[0]
attrs = point[1]

closest_one = [None, LARGE_NUMBER]
for ctag in DS_stt:
    md = mahalanobisDistancePointCluster(attrs, DS_stt[ctag])
    if md < closest_one[1]:
        closest_one = [ctag, md]
# ds_tag stays None if no cluster was strictly closer than LARGE_NUMBER.
ds_tag = closest_one[0]

In [None]:
def updateCluster(tag, p_attrs, clusters, clusters_stt):
    """
    Add one point to cluster `tag` and refresh that cluster's derived statistics.

    :param tag: key of the cluster in both dicts.
    :param p_attrs: the point's attribute vector.
    :param clusters: {tag: [N, SUM_vector, SUMSQ_vector], ...}; updated in place.
    :param clusters_stt: {tag: [avg_vector, sd_vector], ...}; updated in place.
    :return: (clusters, clusters_stt) - the same dict objects passed in.
    """
    old = clusters[tag]
    new_cluster = [
        old[0] + 1,
        [s + x for s, x in zip(old[1], p_attrs)],
        [sq + x ** 2 for sq, x in zip(old[2], p_attrs)],
    ]
    clusters[tag] = new_cluster
    # Key by `tag`, the cluster that actually received the point (not any
    # outer-scope variable).
    clusters_stt[tag] = calculateCentroidAndDeviation(new_cluster)
    return clusters, clusters_stt

In [None]:
# Close enough to the nearest DS cluster: record the assignment and fold the
# point into that cluster's statistics.
if closest_one[1] < md_threshold:
    res_DS.append((idx, ds_tag))
    DS, DS_stt = updateCluster(ds_tag, attrs, DS, DS_stt)
else:
    # check cs
    # TODO: not implemented yet; presumably the point should be tested
    # against the CS clusters next and otherwise go to RS.
    pass

In [None]:
# Scratch: two random samples of 4 numbers from range(10).
a = random.sample(range(10),4)
b = random.sample(range(10),4)

In [None]:
# Scratch: two set literals with the same elements in a different order.
a = {1,2,3}
b = {3,2,1}

In [None]:
# Scratch: load a JSON file (cluster1.json; presumably a reference clustering).
# NOTE(review): absolute, machine-specific path.
with open('/Users/markduan/duan/USC_course/USC_APDS/INF553/homework/hw5/data/cluster1.json', 'r') as fp:
    a = json.load(fp)

In [None]:
# Scratch: dicts whose values are sets / lists, compared in the next cell.
a = {1:set([1,2,3]), 2:[4,5,7.4]}
b = {1:set([3,2,1]), 2:[4,5,7.4]}

In [None]:
a == b

In [None]:
# Scratch: seeded shuffle of a small list.
lc = [1,2,3,4,5,6,7,8,9]
random.seed(1)
random.shuffle(lc)
lc

In [None]:
# Pseudocode outline of the full driver loop; not valid Python.
for file in input_path:
    data_points = load(file)
    if first round:
        run K-Means for initialization
    else:
        run BFR(data_points)

In [None]:
def loadFile(file_path):
    """
    Read a comma-separated data file into a list of rows without Spark.

    Each non-blank line "idx,a1,a2,..." becomes [idx, a1, a2, ...]. idx is
    kept as the original string and every attribute is converted to float.
    Line endings are stripped and blank lines are skipped.

    :return lines: [['0', 1.2, -4.5, ...], ...]
    """
    rows = []
    with open(file_path, 'r', encoding='utf-8') as fp:
        for line in fp:  # stream lines instead of readlines()
            line = line.rstrip('\r\n')
            if not line:
                continue
            idx, *attrs = line.split(',')
            rows.append([idx] + [float(a) for a in attrs])
    return rows

In [None]:
# Scratch arithmetic: 3.0 - 9.0 == -6.0, a negative "E[x^2] - E[x]^2"-style value.
(6 / 2) - (9 / 3) ** 2