# Info
On the files contained in this repo
```bash
> # wc -l *.py
  163 02_pyspark/analysis.py      #< functionality of this notebook
   77 02_pyspark/main2.py         #< Spark program 
   68 02_pyspark/operatorImpl.py  #     - Operator used for spark
   97 02_pyspark/parser.py        #     - Parser
   95 02_pyspark/streamListImpl.py#     - Listener
   42 xx_migrate/generate_cluster_data.py     # irrelevant
   66 xx_migrate/streaming_k_means_example.py # 
  771 total
> # ipynb files
drwxr-xr-x 1 19:00 01_exploreData # contains scripts for explroing
                                  # different datasets
-rw-r--r-- 1 18:44 01_exploreData/unusedExploration/Crimes.ipynb
-rw-r--r-- 1 18:56 01_exploreData/unusedExploration/Phone.ipynb
-rw-r--r-- 1 18:44 01_exploreData/unusedExploration/Power.ipynb
-rw-r--r-- 1 14 12:32 01_exploreData/Sports exploration.ipynb
-rw-r--r-- 1 13:09 02_pyspark/AnalyzerTobemerged.ipynb
```

In [22]:
import numpy as np
import pandas as pd
from sklearn.decomposition import PCA

from matplotlib.widgets import Slider
import matplotlib.pyplot as plt
%matplotlib notebook
import numpy as np
import numpy as np
import pandas as pd

from mpl_toolkits.mplot3d import Axes3D


In [23]:
amountBatches = 60
amountClusters = 3
dim = 45

In [24]:
def makeSamplePlot():
    # sample plot
    samples_in_cluster = 100
    mean = np.array([-0, -0, -0])
    mean2 = mean + 2
    c0 = np.random.multivariate_normal(
        mean, np.identity(3)*.5, samples_in_cluster)
    c1 = np.random.multivariate_normal(
        mean2, np.identity(3)*.5, samples_in_cluster)
    fig = plt.figure(figsize=(10,5))
    plt.title("Spark Streaming K-Means:\n forgetfulness")
    ax = fig.add_subplot(121, projection='3d')
    ax.set_title("alpha = 0")
    ax.set_xlim((-5, 5))
    ax.set_ylim((-5, 5))
    ax.set_zlim((-5, 5))
    ax.scatter(mean[0], mean[1], mean[2], marker="x", s=100, c="red", label="Mean t = 0")
    ax.scatter(mean2[0], mean2[1], mean2[2], marker="x", s=100, c="orange", label="Mean t = 1")
    ax.scatter(c0[:,0], c0[:,1], c0[:,2], c=(.1, .4, .6, .5), label="data t = 0")
    ax.scatter(c1[:,0], c1[:,1], c1[:,2], c=(.1, .6, .4, .5), label="data t = 1")
    ax.scatter(mean[0], mean[1], mean[2], marker="x", s=100, c="red")
    ax.scatter(mean2[0], mean2[1], mean2[2], marker="x", s=100, c="orange")

    plt.legend()


    ax = fig.add_subplot(122, projection='3d')
    ax.set_xlim((-5, 5))
    ax.set_ylim((-5, 5))
    ax.set_zlim((-5, 5))
    ax.set_title("alpha = 1")
    ax.scatter(mean[0], mean[1], mean[2], marker="x", s=100, c="red", label="Mean t = 0")
    ax.scatter(mean2[0], mean2[1], mean2[2], marker="x", s=100, c="orange", label="Mean t = 1")
    ax.scatter(c0[:,0], c0[:,1], c0[:,2], c=(.1, .4, .6, .5), label="data t = 0")
    ax.scatter(c1[:,0], c1[:,1], c1[:,2], c=(.1, .6, .4, .5), label="data t = 1")
    ax.scatter(mean[0], mean[1], mean[2], marker="x", s=100, c="red")
    ax.scatter((mean2[0] + mean[0]) / 2, (mean2[0] + mean[0]) / 2, (mean2[0] + mean[0]) / 2, marker="x", s=100, c="orange")

makeSamplePlot()

<IPython.core.display.Javascript object>

In [25]:
def loadCenters(amountClusters):

    # fetch the batch
    batch = np.array(np.loadtxt("../01_exploreData/out/check/centers.csv", delimiter=','))
    val = batch[0:60*amountClusters]
    batch = batch[:amountClusters*60,:] #< discard batchest that are non-existent. 
    batch_times = batch.reshape([int(batch.shape[0]/amountClusters), amountClusters, batch.shape[1]])

    # u never know unless u think! or insert assertions!
    if 0:
        assert((val[0] == batch_times[0,0]).all())
        assert((val[1] == batch_times[0,1]).all())
        assert((val[2] == batch_times[0,2]).all())
        assert((val[3] == batch_times[1,0]).all())

    return batch_times, batch

def loadCenter(amountClusters, path="../01_exploreData/out/check/centers.csv" ):
    
    print(path)
    

    # fetch the batch
    batch = np.array(np.loadtxt(path, delimiter=','))
   
    val = batch[0:60*amountClusters]
    print("hier", batch.shape, val.shape, "amountclusters", amountClusters)
    
    batch = batch[:amountClusters*60,:] #< discard batchest that are non-existent. 
    batch = batch.reshape([int(batch.shape[0]/amountClusters), amountClusters, batch.shape[1]])

    return batch

In [27]:
def loadFeatures(path="../01_exploreData/out/check/testRaw.csv"):
    # fetch the batch
    batch = np.array(np.loadtxt(path, delimiter=','))
    line_per_batch = int(batch.shape[0] / amountBatches)
    print(line_per_batch)
    print(batch.shape)
    batch_times = batch.reshape([amountBatches, line_per_batch, batch.shape[1]])
    return batch_times, batch

In [28]:
def getObjective(t, features, labels, centers):
    """
    :features:   The feature vector of dimensionality [times x linesPerTime x Dim=45]
    :labels:   The feature vector of dimensionality   [times x linesPerTime]
    """
    
    features = features[t]
    labels = labels[t]
    centers = centers[t]
    
    means = [centers[l] for l in labels.astype(int)]
    obj = np.linalg.norm(means - features) / labels.shape[0]
    return obj

def getWorst(t, features, labels, centers):
    """
    :features:   The feature vector of dimensionality [times x linesPerTime x Dim=45]
    :labels:   The feature vector of dimensionality   [times x linesPerTime]
    """
    
    features = features[t]
    labels = labels[t]
    centers = centers[t]
    
    means = np.mean(features)
    obj = np.linalg.norm(means - features) / labels.shape[0]
    return obj

In [29]:
centers, centers_flat = loadCenters(amountClusters)
print(centers_flat.shape)
print(centers.shape)

(180, 45)
(60, 3, 45)


In [30]:
testFeatures, testFeatures_flat = loadFeatures()
print(testFeatures_flat.shape)
print(testFeatures.shape)

628
(37680, 45)
(37680, 45)
(60, 628, 45)


In [31]:
trainFeatures, trainFeatures_flat = loadFeatures("../01_exploreData/out/check/trainRaw.csv")
print(trainFeatures_flat.shape)
print(trainFeatures.shape)

18220
(1093200, 45)
(1093200, 45)
(60, 18220, 45)


In [32]:
testLabels, testLabels_flat = loadFeatures("../01_exploreData/out/check/testLabels.csv")
testLabels = testLabels[:,:,1]
print(testLabels_flat.shape)
print(testLabels.shape)

628
(37680, 2)
(37680, 2)
(60, 628)


In [None]:
def plot_reduced_dim_test(dim, batchnr):
    pca = PCA(n_components=dim)
    testFeatures_pca = pca.fit(testFeatures_flat)
    
    testFeatures_0_dim2 = testFeatures_pca.transform(testFeatures[batchnr])
    centers_0_dim2 = testFeatures_pca.transform(centers[batchnr])
    #print(testFeatures_0_dim2.shape)
    #print(centers_0_dim2.shape)

    plt.figure()
    plt.scatter(testFeatures_0_dim2[:,0], testFeatures_0_dim2[:,1], s=.2)
    plt.scatter(centers_0_dim2[:,0], centers_0_dim2[:,1], marker='x')

def merge_features(inputdata, batchnr_start, batchnr_end):
    if(batchnr_end > batchnr_start):
        features = np.vstack((inputdata[batchnr_start], inputdata[batchnr_start+1]))
        if(batchnr_start+2 == batchnr_end):
            features = np.vstack((features, inputdata[batchnr_start+2]))
        if(batchnr_start+2 < batchnr_end):
            for i in range(batchnr_start+2, batchnr_end+1):
                features = np.vstack((features, inputdata[i]))
    elif(batchnr_start == batchnr_end):
        features = inputdata[batchnr_start]
    else:
        print("batch_end should be greater than batch_start!")
    #print(features.shape)
    return features
    

def plot_reduced_dim_cumulated_test(dim, batchnr_start, batchnr_end, name):
    pca = PCA(n_components=dim)
    testFeatures_pca = pca.fit(testFeatures_flat)
    
    plt.figure()
    plt.grid(True, linestyle='--')
    plt.xlim((-40, 80))
    plt.ylim((-60, 80))
    plt.xlabel('PCA dimension 1')
    plt.ylabel('PCA dimension 2')
    for i in range(batchnr_start, batchnr_end):
        features_dim2 = testFeatures_pca.transform(testFeatures[i])
        #print(features_dim2.shape)
        transparence = float((i+1.)/batchnr_end)
        plt.scatter(features_dim2[:,0], features_dim2[:,1], s=transparence*2, color=(0,0,1,transparence))
        #print((i+1.)*batchnr_end/10., batchnr_end, 10, i)

    centers_dim2 = testFeatures_pca.transform(centers[batchnr_end])
    print(centers_dim2)
    plt.scatter(centers_dim2[:,0], centers_dim2[:,1], marker='x', color = 'r')
    plt.savefig(name)
    
    """
    for i in range(batchnr_start, batchnr_end):

        features = merge_features(testFeatures, 0, i+1)
        features_dim2 = testFeatures_pca.transform(features)
        centers_dim2 = testFeatures_pca.transform(centers[i+1])

        plt.figure()
        plt.scatter(features_dim2[:,0], features_dim2[:,1], s=.2)
        plt.scatter(centers_dim2[:,0], centers_dim2[:,1], marker='x')
    """

plot_reduced_dim_cumulated_test(2, 0, 1, "../00_documentation/1")
plot_reduced_dim_cumulated_test(2, 0, 15, "../00_documentation/15")
plot_reduced_dim_cumulated_test(2, 0, 30, "../00_documentation/30")
plot_reduced_dim_cumulated_test(2, 0, 45, "../00_documentation/45")
plot_reduced_dim_cumulated_test(2, 0, 59, "../00_documentation/60")


In [34]:


# This is supposed to be an interactive plot for the pca.

#not ready yet!

pca = PCA(n_components=dim)
testFeatures_pca = pca.fit(testFeatures_flat)
def wrapper(batchnr_end): 
    for i in range(0, batchnr_end):
        features_dim2 = testFeatures_pca.transform(testFeatures[i])
        plt.scatter(features_dim2[:,0], features_dim2[:,1], s=float(i*batchnr_end/10))

    centers_dim2 = testFeatures_pca.transform(centers[batchnr_end])
    plt.scatter(centers_dim2[:,0], centers_dim2[:,1], marker='x')


"""
fig, ax = plt.subplots()
plt.subplots_adjust(left=0.25, bottom=0.25)
l, = plt.plot()

time = plt.axes([0.25, 0.1, 0.65, 0.03])
slider = Slider(time, 'time', 0, 9, valinit=0)
slider.on_changed(wrapper)

plt.show()
"""

"\nfig, ax = plt.subplots()\nplt.subplots_adjust(left=0.25, bottom=0.25)\nl, = plt.plot()\n\ntime = plt.axes([0.25, 0.1, 0.65, 0.03])\nslider = Slider(time, 'time', 0, 9, valinit=0)\nslider.on_changed(wrapper)\n\nplt.show()\n"

In [35]:
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.widgets import Slider, Button, RadioButtons

fig, ax = plt.subplots()
plt.subplots_adjust(left=0.25, bottom=0.25)
t = np.arange(0.0, 1.0, 0.001)
a0 = 5
f0 = 3
s = a0*np.sin(2*np.pi*f0*t)
l, = plt.plot(t, s, lw=2, color='red')

axcolor = 'lightgoldenrodyellow'
axfreq = plt.axes([0.25, 0.1, 0.65, 0.03], facecolor=axcolor)
sfreq = Slider(axfreq, 'Freq', 0.1, 30.0, valinit=f0)


def update(val):
    freq = sfreq.val
    l.set_ydata(amp*np.sin(2*np.pi*freq*t))
    fig.canvas.draw_idle()
sfreq.on_changed(update)


plt.show()

<IPython.core.display.Javascript object>

In [36]:
# Simple plot that shows the behavior of the objective 
# function over time.
j = [ getObjective(t, testFeatures, testLabels, centers) for t in np.arange(60)]
w = [ getWorst(t, testFeatures, testLabels, centers) for t in np.arange(60)]

plt.figure()
plt.xlabel('batch number')
plt.ylabel('objective function value')
plt.grid(True, linestyle='--')
plt.plot(j)
plt.plot(w, label="worst")
print(j)
plt.savefig("../00_documentation/jfunction_batches.png")

<IPython.core.display.Javascript object>

[0.87822770145718665, 0.84043617143753824, 0.97442046896450352, 0.98839684312920206, 0.82059735632707709, 0.85278997404781332, 0.8231663341481078, 0.82922090755732014, 0.81962143819514344, 0.79012182275296916, 0.67188632529702641, 0.6830480415797644, 0.64757814464168306, 0.67406453212654149, 0.67753585191684618, 0.6432879833691969, 0.65352723555594161, 0.65963105270162681, 0.61692797677765521, 0.65729562106097217, 0.6073859097832196, 0.60352257654780783, 0.66164713745046699, 0.6756724407424749, 0.62918670007864352, 0.64419017056936911, 0.70118902141661255, 0.61700891845245132, 0.65430041625897384, 0.64145927742861708, 0.64384832752271548, 0.69619213563176252, 0.66861526761461443, 0.66829420004637941, 0.64539271120620756, 0.65008333276452557, 0.64325371461937231, 0.6360425403339468, 0.65862755320572441, 0.63513733653795867, 0.62996370493384657, 0.68260448051104017, 0.60949507084389887, 0.64237339457436271, 0.64661122098789969, 0.6232112767804574, 0.64566251210669856, 0.63656629357782846

In [None]:
colors = {0:"red", 1:"green", 2:"blue"}
colors = np.array(["red", "green", "blue"])
def plotPerDimension(features, labels, centers, timeUnit=0):
    """
    :timeUnit:   Time Unit that is selected for the plot
    :features:   The feature vector of dimensionality [times x linesPerTime x Dim=45]
    :labels:   The feature vector of dimensionality   [times x linesPerTime]
    """
    plt.figure(figsize=(100,100))
    
    # only get the current timeUnit
    features = features[timeUnit]
    labels = labels[timeUnit].astype(int)
    centers = centers[timeUnit]

    #labels = np.dot(labels[:,np.newaxis], np.array([.3, .3, .3])[np.newaxis,:])
    for dim1 in np.arange(features.shape[1]):
        
        print(dim1)
        for dim2 in np.arange(dim1, features.shape[1]):
            plt.subplot(features.shape[1], features.shape[1], dim1 * features.shape[1] + dim2 + 1)
            ## Plot centers
            
            plt.scatter(features[:,dim1], features[:,dim2], color=[colors[i] for i in labels], s=.2) #
            if(centers.shape[0] == 0):
                plt.scatter(centers[:,dim1], centers[:,dim2], color=[colors[i] for i in range(centers.shape[0])], s=10, marker="x")
plotPerDimension(testFeatures, testLabels, centers)
plt.savefig("asdf.png")

<IPython.core.display.Javascript object>

0
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44


In [None]:
def loadFeature(path="../01_exploreData/out/check/testRaw.csv"):
    # fetch the batch
    batch = np.array(np.loadtxt(path, delimiter=','))
    line_per_batch = int(batch.shape[0] / amountBatches)
    print(line_per_batch)
    print(batch.shape)
    if (len(batch.shape) > 1):
        batch = batch.reshape([amountBatches, line_per_batch, batch.shape[1]])
    else:
        batch = batch.reshape([amountBatches, line_per_batch, 1])
    return batch

def getData(amountClusters, bulkExec="", timeUnitMax=10):
    """
    :bulkExex:   Identifier for the bulk execution. leave empty in case of normal execution.
    """
    center = loadCenter(amountClusters, path="../01_exploreData/out/check/" + bulkExec + "centers.csv" )
    testFeatures = loadFeature("../01_exploreData/out/check/" + bulkExec + "testRaw.csv")
    trainFeatures = loadFeature("../01_exploreData/out/check/" + bulkExec + "trainRaw.csv")
    testLabels = loadFeature("../01_exploreData/out/check/" + bulkExec + "testLabels.csv")
    testLabels = testLabels[:,:,1]
    trainLabels = loadFeature("../01_exploreData/out/check/" + bulkExec + "trainLabels.csv")
    
    #center = center[:timeUnitMax]
    testFeatures = testFeatures[:timeUnitMax]
    trainFeatures = trainFeatures[:timeUnitMax]
    testLabels = testLabels[:timeUnitMax]
    trainLabels = trainLabels[:timeUnitMax]
    
    return testFeatures, trainFeatures, testLabels, trainLabels, testFeatures, testLabels, center
    
    return testFeatures, trainFeatures, testLabels, trainLabels, trainFeatures, trainLabels, center
    
    
    print(testFeatures.shape, trainFeatures.shape)
    print(testLabels.shape, trainLabels.shape)
    features = np.empty([timeUnitMax, testFeatures.shape[1] + trainFeatures.shape[1], testFeatures.shape[2]])
    labels = np.empty([timeUnitMax, testLabels.shape[1] + trainLabels.shape[1], 1])
    for t in range(timeUnitMax):
        i = t
        features[t] = np.vstack((testFeatures[i], trainFeatures[i]))
        print(testLabels[i].shape, trainLabels[i,:,0].shape, labels[t].shape)
        labels[t] = np.vstack((testLabels[i][:,np.newaxis], trainLabels[i]))
    
    return testFeatures, trainFeatures, testLabels, trainLabels, features, labels, center
    
    
def getBulkObjective(bulkCount=10, amountBatches=1):
    """
    Returns objective function for bulk execution. 
    :bulkCount:   the amount of temporal batches to be processed.
    :return:      
    """
    
    js = np.empty([bulkCount, amountBatches])
    
    for k in np.arange(1, bulkCount+1):
        print("k = ", k)
        _, _, _, _, features, labels, centers= getData(k, "bulk" + str(k) + "/", timeUnitMax=amountBatches)
        
        js[k-1,:] = np.array([ getObjective(t, features, labels, centers) for t in np.arange(amountBatches)])

        #js[k-1,:] = np.array([ getObjective(t, testFeatures, testLabels, centers) for t in np.arange(amountBatches)])
    return js
buuu = getBulkObjective(10, 10)



k =  1
../01_exploreData/out/check/bulk1/centers.csv
hier (200, 45) (60, 45) amountclusters 1
628
(37680, 45)
18220
(1093200, 45)
628
(37680, 2)
18220
(1093200,)
k =  2
../01_exploreData/out/check/bulk2/centers.csv
hier (400, 45) (120, 45) amountclusters 2
628
(37680, 45)


In [None]:
buuu.shape

In [None]:
plt.figure()
plt.title("Objective function")
plt.xlabel("k")

plt.grid(True, linestyle='--')
plt.xlabel("objective function")
plt.plot(buuu[:,k])

In [None]:
plt.figure()
for k in range(10):
    
    plt.plot(buuu[:,k], label="batch = " + str(k))
plt.legend()

In [None]:
testFeatures, trainFeatures, testLabels, trainLabels, features, labels = getData(amountClusters)
plotPerDimension(features, labels, [], timeUnit=0)

In [None]:
X = np.linspace(0,10,10)
Y =  np.linspace(0,10,10)
print(X.shape)
X = np.array(np.meshgrid(X, Y))
X, Y, = X
Z = buuu

In [None]:

X, Y, Z = axes3d.get_test_data(0.05)

In [None]:
print(X.shape, Y.shape, Z.shape)

In [None]:
'''
=================
3D wireframe plot
=================

A very basic demonstration of a wireframe plot.
'''

from mpl_toolkits.mplot3d import axes3d
import matplotlib.pyplot as plt


fig = plt.figure()
ax = fig.add_subplot(111, projection='3d')

# Grab some test data.
#X, Y, Z = axes3d.get_test_data(0.05)

# Plot a basic wireframe.
ax.plot_wireframe(X, Y, Z, rstride=10, cstride=10)

plt.show()