In [1]:
## import dependencies
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.mllib.clustering import StreamingKMeans
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.regression import LabeledPoint
import uuid
import os
# Have Spark launch its Python workers with the python3 interpreter.
os.environ.update(PYSPARK_PYTHON="python3")

In [2]:
## spark Context and Streaming Spark Context
# Unique identifier for this run: used as the app name here and also passed
# to the Parser instances further down.
job_uuid = 'job_{0}'.format(uuid.uuid4())

sc = SparkContext(master='local', appName=job_uuid)

In [3]:
# Shut down the exploratory context so the next cell can create a new
# SparkContext (PySpark allows only one active context per process).
sc.stop()

In [5]:
# streaming: fresh SparkContext wrapped in a StreamingContext with 10-second batches
sc = SparkContext(master='local', appName=' streaming-kmeans')
ssc = StreamingContext(sparkContext=sc, batchDuration=10)

In [6]:
# Stop streaming immediately; stopSparkContext=True also stops `sc`, which is
# what lets a later cell construct a new SparkContext.
ssc.stop(stopSparkContext=True, stopGraceFully=False)

In [7]:
# helper function to parse the data
class Parser():
    """Parse delimited text lines into ``LabeledPoint`` records.

    Two line formats are supported, selected by ``type``:

    * ``'train'`` -- every field is a feature; the label is set to -1.
    * ``'test'``  -- the first field is the numeric label, the remaining
      fields are the features.

    Lines that cannot be parsed, or whose field count differs from
    ``num_elements``, are appended to ``<log_dir>/<job_uuid>.txt`` and
    ``None`` is returned, so callers can drop them with ``filter``.

    Args:
        type: ``'train'`` or ``'test'``; anything else is logged and every
            line yields ``None``.
        delimiter: field separator.
        num_elements: expected number of fields per line, label included
            (e.g. 2 for 2-feature training rows, 3 for labelled test rows).
        job_uuid: used as the error-log file name.
        log_dir: directory that receives the error log.
    """

    def __init__(self,type='train',delimiter=',',num_elements=5, job_uuid='', log_dir='/error_events'):
        self.type=type
        self.delimiter=delimiter
        self.num_elements=num_elements
        self.job_uuid=job_uuid
        self.log_dir=log_dir

    def _log(self, message):
        """Append ``message`` as one line to this job's error log (best effort)."""
        path = os.path.join(self.log_dir, '{0}.txt'.format(self.job_uuid))
        try:
            with open(path, 'a') as f:
                f.write(message + '\n')
        except OSError:
            # parse() runs inside Spark tasks: an unwritable log location must
            # not fail the batch, so fall back to stderr instead of raising.
            import sys
            print(message, file=sys.stderr)

    def parse(self,l):
        """Convert one text line to a ``LabeledPoint``, or ``None`` if invalid."""
        if self.type not in ('train', 'test'):
            self._log('Unknown type: {0}'.format(self.type))
            return None
        try:
            fields = l.split(self.delimiter)
            # A wrong field count would yield a vector of the wrong dimension,
            # which the downstream model cannot consume.
            if len(fields) != self.num_elements:
                self._log('Expected {0} fields, got {1}: {2}'.format(
                    self.num_elements, len(fields), l))
                return None
            if self.type == 'test':
                category = float(fields[0])
                feature_vector = Vectors.dense([float(x) for x in fields[1:]])
            else:
                category = -1
                feature_vector = Vectors.dense([float(x) for x in fields])
            return LabeledPoint(category, feature_vector)
        except Exception:
            # Non-numeric fields, non-string input, etc.: log and skip the line.
            self._log('Error parsing line: {0}'.format(l))
            return None


In [9]:
# streaming: print each parsed training batch to check the Parser end-to-end
sc = SparkContext('local', ' streaming-kmeans')
ssc = StreamingContext(sc,10)

num_features = 2
training_parser = Parser('train',',',num_features,job_uuid)

# DStream.pprint() returns None, so keep the DStream in `trainingData` and
# register the print action as a separate statement. Lines the Parser could
# not handle come back as None and are dropped here.
trainingData = ssc.textFileStream("/training_data") \
    .map(training_parser.parse) \
    .filter(lambda point: point is not None)
trainingData.pprint()
ssc.start()

In [11]:
# Stop streaming immediately; stopSparkContext=True also stops `sc`, which is
# what lets a later cell construct a new SparkContext.
ssc.stop(stopSparkContext=True, stopGraceFully=False)

In [12]:
num_features = 2
num_clusters = 3

# streaming
sc = SparkContext('local', ' streaming-kmeans')
ssc = StreamingContext(sc,10)

# Training lines hold only features; test lines hold "label,feature,...".
training_parser = Parser('train',',',num_features,job_uuid)
test_parser = Parser('test',',',num_features+1,job_uuid)

# Drop unparseable lines (parse returns None) BEFORE accessing `.features`;
# otherwise a single bad line raises AttributeError and fails the batch.
trainingData = ssc.textFileStream("/training_data") \
    .map(training_parser.parse) \
    .filter(lambda point: point is not None) \
    .map(lambda point: point.features)
testData = ssc.textFileStream("/test_data") \
    .map(test_parser.parse) \
    .filter(lambda point: point is not None)

# setRandomCenters(dim, weight, seed): random initial centers with weight 0, seed 0.
streaming_clustering = StreamingKMeans(k=num_clusters, decayFactor=1.0) \
    .setRandomCenters(num_features, 0, 0)
streaming_clustering.trainOn(trainingData)
# Prints (true label, predicted cluster index) pairs per test batch; cluster
# indices are arbitrary and need not equal the label values.
streaming_clustering.predictOnValues(testData.map(lambda point: (point.label, point.features))) \
    .pprint()
ssc.start()

In [None]:
# Stop streaming immediately; stopSparkContext=True also stops `sc`.
ssc.stop(stopSparkContext=True, stopGraceFully=False)

In [None]:
# Inspect the cluster centers of the most recently updated model.
current_model = streaming_clustering.latestModel()
current_model.clusterCenters

In [None]:
import random

# One class label (1, 2 or 3) per synthetic sample. The original
# range(1, 100) produced only 99 samples. The seed makes the generated
# sample files reproducible across runs.
NUM_SAMPLES = 100
random.seed(42)
labels = [random.randint(1, 3) for _ in range(NUM_SAMPLES)]

In [None]:
import numpy as np

# Seed the global NumPy RNG so the generated sample files are reproducible.
np.random.seed(0)

# Per-class 2-D Gaussian samplers: each call returns [x, y] drawn with
# np.random.normal(loc, scale) for each coordinate.
dists = {
    1: lambda: [np.random.normal(1, 1), np.random.normal(1, 2)],
    2: lambda: [np.random.normal(3, 1), np.random.normal(7, 2)],
    3: lambda: [np.random.normal(7, 1), np.random.normal(-10, 1)],
}

# Training rows are "x,y" (features only). `with` guarantees the file is
# closed even if a write fails part-way.
with open('sample_train.txt', 'w') as f:
    for label in labels:
        f.write(",".join(str(n) for n in dists[label]()) + '\n')

# Test rows are "label,x,y"; features are drawn independently of the training rows.
with open('sample_test.txt', 'w') as f:
    for label in labels:
        f.write(",".join([str(label)] + [str(n) for n in dists[label]()]) + '\n')

# NOTE(review): the streaming cells watch /training_data and /test_data, so these
# files presumably need to be copied there (while the stream is running) to be consumed.

In [None]:
# Use the documented `clusterCenters` property rather than the model's
# undocumented `centers` attribute (both hold the same centers).
streaming_clustering.latestModel().clusterCenters