In [134]:
# 1. Imports and Spark session initialization
import pyspark
from pyspark.sql import SparkSession
from PySpark_utils import *
from tqdm import tqdm

# Local Spark session using 4 worker threads; getOrCreate() reuses an
# existing session if this cell is re-run.
spark = (
    SparkSession.builder
    .master("local[4]")
    .appName('PySparkKMeansApp')
    .getOrCreate()
)

In [135]:
# 2. Load data
# Load the data points as a Spark RDD, and the initial centroids C1/C2 as plain Python lists

In [136]:
# Data

X_df = spark.read.text("../hw2-bundle/kmeans/data/data.txt")
# Each Row holds one text line in column 0; turn it into a list of floats.
# str.split() with no argument collapses repeated/trailing whitespace, whereas
# split(' ') would yield empty tokens that cannot be parsed as floats.
X_rdd = (
    X_df.rdd
    .map(lambda row: row[0])                 # Row -> raw text line
    .filter(lambda line: line.strip())       # skip blank lines
    .map(lambda line: [float(tok) for tok in line.split()])
    .cache()                                 # reused by every k-means iteration
)


In [137]:
# C1 and C2
def load_data(path):
    """Read a whitespace-separated numeric text file into a list of points.

    path: path to a text file with one point per line, coordinates separated
        by whitespace (e.g. c1.txt / c2.txt).

    Returns a list of lists of floats, one inner list per non-blank line.
    Raises ValueError if a token is not a valid float.
    """
    data = []
    with open(path, 'r') as f:
        for line in f:
            # split() with no argument ignores repeated/trailing whitespace and
            # the newline, so "1 2 \n" parses as two values.
            tokens = line.split()
            # Skip blank lines (e.g. a trailing empty line) rather than failing
            # on float('\n').
            if not tokens:
                continue
            data.append([float(tok) for tok in tokens])
    return data
# Two alternative initial-centroid sets for k-means (loaded left to right).
C1, C2 = (
    load_data("../hw2-bundle/kmeans/data/c1.txt"),
    load_data("../hw2-bundle/kmeans/data/c2.txt"),
)


In [140]:

def partition_and_calc_cost(rdd, C):
    """Assign a single point to its nearest centroid.

    rdd: one data point as a list of floats (despite the name, this is a single
        element of the RDD, e.g. [0.0, 0.64, 0.64, 0.0, ...]).
    C: list of centroids; each must be indexable at every coordinate of ``rdd``.

    Returns a tuple (point, best_partition_idx, min_cost) where min_cost is the
    squared Euclidean distance to the closest centroid. Ties go to the
    lowest-indexed centroid; if C is empty the result is (point, -1, inf).
    """
    best_idx, best_sq_dist = -1, float('inf')
    for centroid_idx, centroid in enumerate(C):
        sq_dist = 0.0
        for coord_idx, coord in enumerate(rdd):
            diff = coord - centroid[coord_idx]
            sq_dist += diff ** 2
        # Strict comparison keeps the first centroid on ties.
        if sq_dist < best_sq_dist:
            best_idx, best_sq_dist = centroid_idx, sq_dist
    return (rdd, best_idx, best_sq_dist)

def recalc_centroid(all_points):
    """Return the coordinate-wise mean of a list of points.

    all_points: non-empty list of points, e.g. [[x11, x12, ...], [x21, x22, ...]].
        The dimensionality is taken from the first point.

    Returns a list of floats with one mean per dimension.
    Raises IndexError if ``all_points`` is empty.
    """
    count = len(all_points)
    totals = [0.0] * len(all_points[0])
    # Accumulate point by point so each coordinate is summed in input order.
    for point in all_points:
        for dim in range(len(totals)):
            totals[dim] += point[dim]
    return [total / count for total in totals]

# 3./4. Run k-means starting from the C1 centroids.
# all_cost_list[0] is the cost of the initial centroids C1; entry i (i >= 1) is
# the cost of the centroids obtained after i updates.
NUM_ITERATIONS = 20


def kmeans_step(points_rdd, centroids):
    """Run one k-means (Lloyd) iteration over ``points_rdd``.

    points_rdd: RDD of points (lists of floats).
    centroids: list of current centroids (lists of floats).

    Returns (cost, new_centroids): cost is the summed squared distance of every
    point to its closest centroid in ``centroids``; new_centroids holds the mean
    of the points assigned to each cluster. Clusters that received no points
    produce no centroid, and the order follows Spark's collect() order.
    """
    # ``centroids`` is a local parameter, so the Spark closure cannot observe a
    # later reassignment of a notebook-level variable.
    assigned_rdd = points_rdd.map(
        lambda x: partition_and_calc_cost(x, centroids)
    ).cache()  # used by two actions below; avoid recomputing distances
    try:
        cost = assigned_rdd.map(lambda x: x[2]).sum()
        new_centroids = (
            assigned_rdd.map(lambda x: (x[1], x[0]))   # (cluster idx, point)
            .groupByKey()
            .mapValues(lambda pts: recalc_centroid(list(pts)))
            .values()
            .collect()
        )
    finally:
        assigned_rdd.unpersist()
    return cost, new_centroids


# Iteration 0: cost of C1 itself, plus the first centroid update.
init_cost, new_C1 = kmeans_step(X_rdd, C1)
all_cost_list = [init_cost]

for it in tqdm(range(NUM_ITERATIONS)):
    tmp_cost, new_C1 = kmeans_step(X_rdd, new_C1)
    all_cost_list.append(tmp_cost)

100%|█████████████████████████████████████████████████████████████████████████████████████████████| 20/20 [00:25<00:00,  1.25s/it]


In [139]:
# 5. Inspect the cost per iteration (entry 0 = initial centroids).
# Bare last expression -> Jupyter rich display instead of print().
all_cost_list

[623660345.3064234, 509862908.29754597, 485480681.8720084, 463997011.6850107, 460969266.572994, 460537847.98277014, 460313099.65354246, 460003523.8894068, 459570539.3177353, 459021103.3422901, 458490656.1919807, 457944232.5879742, 457558005.1986797, 457290136.3523032, 457050555.0595638, 456892235.6153574, 456703630.7370357, 456404203.0189769, 456177800.54199505, 455986871.0273484, 455729268.35514736]


In [118]:

def plot_cost(all_cost1, all_cost2, path="test.png"):
    """Plot k-means cost versus iteration for the C1 and C2 initializations.

    all_cost1: costs for the run started from c1.txt; entry i is the cost at
        iteration i.
    all_cost2: same for the run started from c2.txt. The two lists may have
        different lengths.
    path: file the two-panel figure is saved to (default "test.png").

    NOTE(review): ``plt`` is not imported in any visible cell; presumably it
    comes from ``from PySpark_utils import *`` — an explicit
    ``import matplotlib.pyplot as plt`` in the imports cell would be clearer.
    """
    fig, axs = plt.subplots(2)
    fig.suptitle('Cost vs Iteration')

    # One tick every 5 iterations across the longest run
    # (0, 5, 10, 15, 20 for 21 recorded costs).
    num_points = max(len(all_cost1), len(all_cost2))
    ticks = list(range(0, num_points, 5))
    plt.setp(axs, xticks=ticks, xticklabels=[str(t) for t in ticks])

    panels = ((axs[0], all_cost1, 'C1.txt'), (axs[1], all_cost2, 'C2.txt'))
    for ax, costs, title in panels:
        # Each panel uses its own x range so runs of different lengths plot.
        ax.plot(range(len(costs)), costs)
        ax.set_title(title)
        ax.set_xlabel('Iteration')
        ax.set_ylabel('Cost')

    fig.tight_layout()
    fig.savefig(path)