# Mahtab Nejati 98209434

# Problem 10

# NOTICE:
# DO NOT USE FOR GRAY SCALE PICS (Use code for Problem 09 instead)
# Download the "pics" folder from the google drive link and save it in the current directory as the input.

In [1]:
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.functions import *
from pyspark.sql.window import Window
MAX_MEMORY = "8g"
spark = SparkSession.builder.appName('App Name').master("local[*]").config("spark.executor.memory", MAX_MEMORY) \
    .config("spark.driver.memory", MAX_MEMORY).getOrCreate()
sqlc = SQLContext(sparkContext=spark.sparkContext, sparkSession=spark)

In [2]:
from PIL import Image
from numpy import sqrt, square, floor,random
import numpy as np
import pandas as pd
from matplotlib import pyplot as plt
from time import time
from os import listdir,makedirs
from os.path import isfile, join
import pickle

# Reading in an image file and converting to rdd

In [3]:
def getResizedImage(image):
    w = 250
    alpha = float(w)/image.size[0]
    h = int((float(image.size[1])*float(alpha)))
    img = image.resize((w,h), Image.ANTIALIAS)
    return img

def getRGBImage(image_path):
    img = Image.open(image_path).convert("RGB")
    img = getResizedImage(img)
    pix = img.load()
    img_w = img.size[0]
    img_h = img.size[1]
    to_rdd = []
    for i in range(img_w):
        for j in range(img_h):
            to_rdd.append([i,j,(pix[i,j])])

    img_rdd = sc.parallelize(to_rdd)
    
    return img,img_rdd

# Euclidean Distance

In [4]:
def getDistance(a,b):
    arrA = np.asanyarray(a)
    arrB = np.asanyarray(b)
    return sqrt(square(arrA-arrB).sum())

# Choosing the initial centroids
### I have chosen to initialize the K centorids by selecting pixels scattered on th image.

In [5]:
def initMeans(k,image):
    pix = image.load()
    img_w = image.size[0]
    img_h = image.size[1]
    
    f = int(floor(sqrt(k)))

    means = []

    w_step = int(floor(img_w/f))
    h_step = int(floor(img_h/f))
    for i in range(f):
        for j in range(f):
            w = int(floor((i+0.5)*w_step))
            h = int(floor((j+0.5)*h_step))
            new = pix[w,h]
            if new not in means:
                means.append(new)

    count = 0
    while(len(means) != k):
        count+=1
        w = random.randint(0,img_w)
        h = random.randint(0,img_h)
        new = pix[w,h]
        if new not in means or count <= 4*(k-square(f)):
            means.append(new)
    return means

# K-means algorithm

In [6]:
def assignToCluster(row,means):
    cluster = 0
    dist = getDistance(row[-1],means[0])
    for i in range(1,len(means)):
        if getDistance(row[-1],means[i]) < dist:
            cluster = i
            dist = getDistance(row[-1],means[i])
    return (cluster),tuple(row)


def kMeans(rdd,initMeans,k,limit=10):
    newMeans = initMeans
    done = False
    count = 0
    while not done or count<=limit:
        means = newMeans
        assigned = rdd.map(lambda row: assignToCluster(row,means))
        newMeans = assigned.map(
            lambda row: (row[0],np.asarray(list(row[1][-1])+[1]))).reduceByKey(
            lambda a,b: a.copy()+b.copy()).map(
            lambda row: tuple((row[1][:-1].copy().astype(np.float)/row[1][-1]).astype(np.int))).collect()
        done = set(means) == set (newMeans)
        count += 1
        
    newImg = assigned.map(lambda row : (row[1][0],row[1][1],row[0])).collect()
    return newImg,means

# Create compressed image

In [7]:
def getCompressedImage(w,h,newData):
    newImg = np.zeros((w,h))
    for d in newData:
        newImg[d[0],d[1]]=d[2]
    return newImg

# Recover compressed image

In [8]:
def getRecoveredImage(compressed,colors):
    h = compressed.shape[0]
    w = compressed.shape[1]
    recovered = []
    for i in range(w):
        for j in range(h):
            recovered.append(np.array(colors[int(compressed[j,i])]))
    recovered = np.reshape(recovered,(w,h,3))
    recovered = Image.fromarray(recovered.astype('uint8'), 'RGB')
    return recovered

# Calculate cost

In [9]:
def getCost(rdd,means):
    cost = rdd.map(
        lambda row: assignToCluster(row,means)).map(
        lambda row: getDistance(row[1][2],means[row[0]])).sum()
    return cost

# Execution on images

In [10]:
def executeOnImages(ks,images):
    results = {}
    for image,img_path in images.items():
        img, rdd = getRGBImage(img_path)
        results[image] = {'img':img,
                          'k':[],
                          'exec_time':[],
                          'cost':[],
                          'comp_data':[],
                          'rec_img':[]}
        for k in ks:
            results[image]['k'].append(k)
            start = time()
            means = initMeans(k,img)
            imgData , centroids = kMeans(rdd,means,k)
            end = time()
            results[image]['exec_time'].append(end-start)
            compressedImg = getCompressedImage(img.size[0],img.size[1],imgData)
            results[image]['comp_data'].append(compressedImg)
            results[image]['rec_img'].append(getRecoveredImage(compressedImg,centroids))
            results[image]['cost'].append(getCost(rdd,centroids))
    return results

# NOTICE:
### Runing the cell below will execute the algorithm for all the RGB images in directory './pics/rgb/' (6 images in total) with 10 different numbers of clusters ( k=1,2,...,9,10). 
### This will take quite some time to run (and produce enough heat to keep your room warm throughout winter. Thus, not recommended for execution during spring time.)
### I have pickled the results into the file 'HW2_P10_MahtabNejti_98209434_Pickled' for further use. 
### You can customize the list 'ks' to test the algorithm with values of your own choice for k. 
### Also you can limit the number of images by setting value of n to the number of images you wish to work with.
### Skip the following cell and execute the next one to load data in or uncomment the code in the cell and execute.

In [11]:
# # set the variable below to a list contaning all the values of k you with to test the algorithm with
# ks = list(range(1,11))

# # set the variable below to the number of images (<7) you wish to test the algorithm with
# m = 6
# images = sorted([f for f in listdir('./pics/rgb/') if isfile(join('./pics/rgb/', f))][:m])
# imagesDict = dict(zip(images,['./pics/rgb/'+f for f in images]))
# results = executeOnImages(ks,imagesDict)
# with open('HW2_P10_MahtabNejti_98209434_Pickled','wb') as f:
#     pickle.dump(results,f)

In [12]:
images = sorted([f for f in listdir('./pics/rgb/') if isfile(join('./pics/rgb/', f))])
with open('HW2_P10_MahtabNejti_98209434_Pickled','rb') as f:
    results = pickle.load(f)

# Saving the outputs

In [13]:
# for j in range(len(images)):
#     info = results[images[j]]
#     dir_name = './HW2_P10_MahtabNejti_98209434_Results/pic_'+str(j+1)+'/'
#     makedirs(dir_name,exist_ok=True)
#     info['img'].save(dir_name+'orig.png')
#     w=250
#     h= info['img'].size[1]
#     if w-h > 50:
#         size = (12,12*(h/w))
#     elif h-w >50:
#         size = (12*(w/h),12)
#     else:
#         size = (12,12)
#     columns = 3
#     rows = 3
#     fig = plt.figure(figsize = size)
#     for i in range(1,len(info['k'])):
#         res = info['rec_img'][i]
#         fig.add_subplot(rows, columns,i)
#         plt.imshow(res)
#         plt.axis('off')
#         plt.title('K='+str(i+1))
#         fig.tight_layout()
#         plt.savefig(dir_name+'all_k.png')
#     plt.show()
#     fig = plt.figure(figsize=(10,4))
#     columns = 2
#     rows = 1
#     fig.add_subplot(rows,columns,1)
#     plt.plot(info['k'],info['cost'])
#     plt.xlabel('Num of Clusters (K)')
#     plt.ylabel('Cost')
#     plt.title('Cost to K')
#     fig.add_subplot(rows,columns,2)
#     plt.plot(info['k'],info['exec_time'])
#     plt.xlabel('Num of Clusters (K)')
#     plt.ylabel('Execution Time (sec)')
#     plt.title('Time to K')
#     fig.tight_layout()
#     plt.savefig(dir_name+'cost_time.png')
#     plt.show()