In [1]:
import sys, ast, time, pickle, redis
import numpy as np
from pykafka import KafkaClient
from pykafka.partitioners import hashing_partitioner
from scipy.spatial import distance


# Mapper

In [28]:
def get_data_from_kafka(window, source=None):
    """Consume messages for at most `window` seconds and parse them into arrays.

    Parameters
    ----------
    window : number
        Upper bound on the consumption time. It is compared against
        differences of ``time.time()``, so the unit is seconds.
    source : object, optional
        Object exposing ``consume(block=True)`` and returning either ``None``
        or a message with a ``value`` attribute. Defaults to the module-level
        ``consumer``.

    Returns
    -------
    list of numpy.ndarray
        One array per consumed message, in arrival order. Empty when nothing
        was consumed.
    """
    source = consumer if source is None else source

    start_consumption = time.time()
    data = []
    while time.time() - start_consumption < window:
        message = source.consume(block=True)
        if message is None:
            # No message was returned: stop instead of spinning until the
            # window expires.
            break

        payload = message.value
        if isinstance(payload, bytes):
            # ast.literal_eval rejects bytes on Python 3; on Python 2 this
            # simply turns the str into unicode, which it also accepts.
            payload = payload.decode('utf-8')
        data.append(np.array(ast.literal_eval(payload)))

    return data

In [175]:
def get_clusters():
    """Return the object unpickled from the value stored under the Redis key 'means'."""
    # NOTE: pickle.loads runs whatever the stored payload encodes, so the
    # Redis instance has to be trusted.
    return pickle.loads(r.get('means'))


def save_sums_to_redis(partial_sums):
    """Pickle `partial_sums` and append it to the Redis list 'partial_sums'."""
    payload = pickle.dumps(partial_sums)
    r.rpush('partial_sums', payload)

def calculate_distances(elements, centroids):
    """Compute the Euclidean distance between every element and every centroid.

    Returns the matrix produced by ``scipy.spatial.distance.cdist``: one row
    per element and one column per centroid.
    """
    pairwise = distance.cdist(elements, centroids, metric='euclidean')
    return pairwise


def find_partial_sums(dist, centroids, elements):
    """Accumulate, per centroid, the sum and the count of its closest elements.

    Parameters
    ----------
    dist : array
        Distance matrix with one row per element and one column per centroid.
    centroids : numpy.ndarray
        Current centroids; only its shape is used here.
    elements : sequence
        The elements, indexed in the same order as the rows of `dist`.

    Returns
    -------
    numpy.ndarray
        Structured array with a single record. Field 0 ('f0', float64, shape
        of `centroids`) holds the per-centroid sums of the assigned elements;
        field 1 ('f1', float32, shape (n_centroids, 1)) holds the number of
        elements assigned to each centroid.
    """
    n_centroids = centroids.shape[0]
    # Explicit dtype equivalent to the comma-separated string form
    # '<shape>float64,(<n>,1)float32', without depending on how a shape tuple
    # happens to print.
    record_dtype = np.dtype([
        ('f0', np.float64, centroids.shape),
        ('f1', np.float32, (n_centroids, 1)),
    ])
    sum_centroids = np.zeros(1, dtype=record_dtype)
    sums = sum_centroids['f0'][0]      # view: per-centroid element sums
    counts = sum_centroids['f1'][0]    # view: per-centroid element counts

    # index: element id - value: id of the closest centroid
    centroid_pos = np.argmin(dist, axis=1)

    for i in range(len(elements)):
        centroid = centroid_pos[i]
        sums[centroid] += elements[i]
        counts[centroid] += 1

    return sum_centroids


In [35]:
## settings: connection endpoints and batch parameters

zkKafka = 'localhost:2181'
broker = 'localhost:9092'  # passed to KafkaClient(hosts=...)
redis_host = 'localhost'  # passed to redis.StrictRedis(host=...)
kafka_messages = 10000
window = 10000  # NOTE(review): labelled milliseconds, but get_data_from_kafka compares it with time.time() differences (seconds) - confirm the intended unit

In [36]:
# Kafka: connect to the broker and open a simple consumer on the 'Throughput' topic.
client = KafkaClient(hosts=broker)
topic = client.topics['Throughput']
consumer = topic.get_simple_consumer(
    reset_offset_on_start=True,
    consumer_timeout_ms=10000,
)

# Redis: client used for the 'means' key and the 'partial_sums' list.
r = redis.StrictRedis(host=redis_host, port=6379, db=0)

In [191]:
%%time
## main function
# NOTE(review): `data_batch` is not created in this cell; it has to exist in the
# kernel already (e.g. by running the commented line below beforehand).
#data_batch = get_data_from_kafka(window)

#process
centroids = get_clusters()
# Stack every consumed message into one (n_points, n_dims) matrix whose column
# count matches the centroids, so that distances and partial sums are computed
# over the same full set of points.
elements = np.concatenate(data_batch, axis=0).reshape(-1, centroids.shape[1])
dist = calculate_distances(elements, centroids)
partial_sums = find_partial_sums(dist, centroids, elements)
save_sums_to_redis(partial_sums)

CPU times: user 3.84 ms, sys: 1.47 ms, total: 5.3 ms
Wall time: 4.05 ms


## Reducer

In [199]:
import numpy as np
import pickle
import sys
import redis



def get_and_aggregate_partial_sums_from_redis(clusters, client=None):
    """Aggregate the partial sums that the workers pushed to Redis.

    Every entry of the Redis list 'partial_sums' is a pickled structured array
    with a single record: field 0 holds the per-centroid sums of elements and
    field 1 the per-centroid element counts. All entries are added up and the
    consumed entries are removed from the list.

    Parameters
    ----------
    clusters : numpy.ndarray
        Current centroids; only its shape is used, to size the accumulators.
    client : object, optional
        Redis client exposing ``lrange`` and ``ltrim``. Defaults to the
        module-level ``r``.

    Returns
    -------
    tuple
        ``(aggregated_sums_of_elements, n_elements_per_cluster)`` with shapes
        ``clusters.shape`` and ``(clusters.shape[0], 1)``.
    """
    client = r if client is None else client

    aggregated_sums_of_elements = np.zeros(clusters.shape)
    n_elements_per_cluster = np.zeros((clusters.shape[0], 1))

    # Fetch all entries in one round trip instead of one LINDEX per entry.
    serialized_values = client.lrange('partial_sums', 0, -1)

    for serialized_value in serialized_values:
        # NOTE: pickle.loads runs whatever the payload encodes, so the Redis
        # instance has to be trusted.
        record = pickle.loads(serialized_value)[0]
        aggregated_sums_of_elements += record[0]   # field 0: sums of elements
        n_elements_per_cluster += record[1]        # field 1: element counts

    # Drop only the entries that were read; anything pushed after the read
    # stays in the list for the next aggregation.
    client.ltrim('partial_sums', len(serialized_values), -1)

    return (aggregated_sums_of_elements, n_elements_per_cluster)



def find_new_centers(data):
    """Divide the aggregated sums by the element counts.

    `data` is indexable: ``data[0]`` is the dividend (aggregated sums) and
    ``data[1]`` the divisor (element counts). Returns ``np.divide`` of the two.
    """
    totals = data[0]
    counts = data[1]
    return np.divide(totals, counts)


def save_clusters_to_redis(clusters):
    """Pickle `clusters` and store the result under the Redis key 'means'."""
    serialized_clusters = pickle.dumps(clusters)
    r.set('means', serialized_clusters)


def get_clusters():
    """Read the value stored under the Redis key 'means' and unpickle it."""
    # NOTE: unpickling executes whatever the payload encodes; only use this
    # against a trusted Redis instance.
    stored = r.get('means')
    clusters = pickle.loads(stored)
    return clusters


In [192]:
previous_centroids = get_clusters()
par_sums = get_and_aggregate_partial_sums_from_redis(previous_centroids)

# Clusters that received no elements divide by zero; silence the warning here
# and handle those rows explicitly below.
with np.errstate(divide='ignore', invalid='ignore'):
    candidate_centroids = np.nan_to_num(find_new_centers(par_sums))

# Keep the previous centroid wherever the divisor is 0 instead of letting
# nan_to_num move that cluster to the origin.
centroids = np.where(np.asarray(par_sums[1]) > 0, candidate_centroids, previous_centroids)
save_clusters_to_redis(centroids)