# Time Series Classification and Clustering

### Spark re-implimentation of the Dynamic Time Warping based KNN Classification and Kmeans Clustering
#### Original Python Version: https://github.com/alexminnaar/time-series-classification-and-clustering

In [1]:
import pandas as pd
import numpy as np
from numpy import *

from pyspark.sql import *
import pyspark.sql.functions as F

# import matplotlib.pylab as plt

In [2]:
%load_ext autotime

In [3]:
def euclid_dist(t1,t2):
    return sqrt(sum((t1-t2)**2))

time: 1.11 ms


In [75]:
def DTWDistance(s1, s2,w):
    DTW={}
    
    w = max(w, abs(len(s1)-len(s2)))
    
    for i in range(-1,len(s1)):
        for j in range(-1,len(s2)):
            DTW[(i, j)] = float('inf')
    DTW[(-1, -1)] = 0
  
    for i in range(len(s1)):
        for j in range(max(0, i-w), min(len(s2), i+w)):
            dist= (s1[i]-s2[j])**2
            DTW[(i, j)] = dist + min(DTW[(i-1, j)],DTW[(i, j-1)], DTW[(i-1, j-1)])

    return sqrt(DTW[len(s1)-1, len(s2)-1])

time: 7.32 ms


In [5]:
def LB_Keogh(s1,s2,r):
    LB_sum=0
    for ind,i in enumerate(s1):
        
        lower_bound=min(s2[(ind-r if ind-r>=0 else 0):(ind+r)])
        upper_bound=max(s2[(ind-r if ind-r>=0 else 0):(ind+r)])
        
        if i>upper_bound:
            LB_sum=LB_sum+(i-upper_bound)**2
        elif i<lower_bound:
            LB_sum=LB_sum+(i-lower_bound)**2
    
    return np.sqrt(LB_sum)

time: 5.17 ms


## Classification

In [6]:
test = spark.read.csv('datasets/test.csv', sep='\t', inferSchema=True)
train = spark.read.csv('datasets/train.csv', sep='\t', inferSchema=True)
alld = train.union(test).drop('_c60')

time: 6.47 s


In [7]:
from pyspark.ml.feature import VectorAssembler
array_l = train.columns[0:-1]

assembler = VectorAssembler(
    inputCols=array_l,
    outputCol="array")
output = assembler.transform(train)
outputa = assembler.transform(alld).select('array')

time: 535 ms


In [9]:
test = np.genfromtxt('datasets/test.csv', delimiter='\t')
t = test[0][:-1]

time: 23.4 ms


In [10]:
tout = output.select('array').rdd.map(lambda s: euclid_dist(s, t))
print tout.count()
tout.take(10)

300


[12.151056351417187,
 10.453936939773451,
 10.086784735556732,
 10.999850734222914,
 9.7394675912265942,
 9.4993224425529927,
 11.656924528681955,
 11.182484985025257,
 9.8733924804416926,
 10.664275663625961]

time: 259 ms


In [479]:
# one test against all train

tout = output.select('array').rdd.map(lambda s: DTWDistance(s.array, t, 5))
print tout.count()
tout.take(10)

300


[5.2911970088692994,
 5.3557723186296249,
 5.0190014897404325,
 5.9517895331420334,
 6.7503511949119162,
 6.4298381043144257,
 6.1803614878149675,
 5.3019137634372209,
 5.740246450105837,
 5.1515622688035343]

time: 1.12 s


In [480]:
# all test against all train, no broadcast

def testdtw(train,test,w):
    l=[]
    for i in test:
        min_dist=float('inf')
        dist=DTWDistance(i[:-1],train,w)
        l.append(dist)
    return l
# testBroadcast
tout = output.select('array').repartition(8).rdd.map(lambda s: testdtw(s.array, test, 5))
print tout.count()
# tout.take(1)

300
time: 1min 46s


In [7]:
testBroadcast = sc.broadcast(test)

In [226]:
# all test against all train, broadcast

def testdtw(train,test,w):
    l=[]
    for i in test:
        min_dist=float('inf')
        dist=DTWDistance(i[:-1],train,w)
        l.append(dist)
    return l

tout = output.select('array').repartition(8).rdd.map(lambda s: testdtw(s.array, testBroadcast.value, 5))
print tout.count()
# tout.take(1)

300
time: 1min 39s


In [287]:
# all test against all train, select best
# from sklearn.metrics import classification_report

def knn(train,test,w):
    preds=[]
    dists=[]
    for i in test:
        min_dist=float('inf')
        closest_seq=[]
        j = train
        if LB_Keogh(i[:-1],j[:-1],5)<min_dist:
            dist=DTWDistance(i[:-1],j[:-1],w)
            if dist<min_dist:
                min_dist=dist
                closest_seq=j
        preds.append(closest_seq[-1])
        dists.append(min_dist)
    return preds, dists
#     return preds, dists

tout = output.select('array').repartition(8).rdd.map(lambda train: knn(train.array, testBroadcast.value, 5))
print tout.count()
# tout.take(2)

300
time: 1min 31s


In [None]:
# pick the val with the min_dist
def min_dist(v, d):
    md = float('inf')
    mv = v[0]
    for i in range(len(v)):
        if d[i] < md:
            mv = v[i]
#         s = v[i]
    return mv
tout.map(lambda s: min_dist(s[0], s[1])).take(10)

## Classification and Clustering

In [93]:
# find the closest centroid
def assign_cluster(row,target,w):
    min_dist = float('inf')
#     closest_seq = 
    for ind, t in enumerate(target):
        tt = t[0]
        if LB_Keogh(row, tt, 5) < min_dist:
            dist = DTWDistance(row, tt, w)
            if dist < min_dist:
                min_dist = dist
                closest_seq = ind
        
    return closest_seq

def k_means_clust(data,num_clust,num_iter,w=5):
    centroids = data.rdd.takeSample(False,num_clust, 20)
    data = data.withColumn('assignment', F.lit(0))
    return data.rdd.map(lambda row: assign_cluster(row.array,centroids,w))
#     return centroids
centroids = k_means_clust(outputa,4,10,4)
centroids.take(5)

[0, 0, 0, 3, 1]

time: 295 ms


In [209]:
# find the closest centroid
# then recalculate the centroid
from operator import add

def assign_cluster(row,target,w):
    min_dist = float('inf')
    for ind, t in enumerate(target):
        tt = t[0]
        if LB_Keogh(row, tt, 5) < min_dist:
            dist = DTWDistance(row, tt, w)
            if dist < min_dist:
                min_dist = dist
                closest_seq = ind
    
    return closest_seq, row

def assign_cluster1(row,target,w):
    min_dist = float('inf')
    for ind, t in enumerate(target):
        tt = t
        if LB_Keogh(row, tt, 5) < min_dist:
            dist = DTWDistance(row, tt, w)
            if dist < min_dist:
                min_dist = dist
                closest_seq = ind
    
    return closest_seq, row

def recalc_centroids():
    return 

def k_means_clust(data,num_clust,num_iter,w=5):
    centroids = data.takeSample(False,num_clust, 20)
    counter=1
    print counter
    
    assignments = data.map(lambda row: assign_cluster(row.array,centroids,w))
    s = assignments.reduceByKey(add)
    c = assignments.countByKey()
    centroids = s.map(lambda r: r[1] / c[r[0]]).collect()
    
    for n in range(num_iter-1):
        counter+=1
        print counter
        assignments = data.map(lambda row: assign_cluster1(row.array,centroids,w))
        s = assignments.reduceByKey(add)
        c = assignments.countByKey()
        centroids = s.map(lambda r: r[1] / c[r[0]]).collect()
    
    return centroids
centroids = k_means_clust(outputa.repartition(8).rdd,4,10,4)
centroids#.take(5)

1
2
3
4
5
6
7
8
9
10


[DenseVector([1.2201, 1.1526, 1.1703, 1.0669, 1.098, 1.0569, 1.0416, 0.9862, 0.898, 0.9894, 0.91, 0.916, 0.8827, 0.8768, 0.7797, 0.7493, 0.7435, 0.7039, 0.7179, 0.7681, 0.7363, 0.4702, 0.4849, 0.4058, 0.3431, 0.2023, 0.2516, 0.2002, 0.1787, 0.1124, 0.0346, -0.075, -0.1665, -0.2304, -0.4065, -0.3782, -0.5491, -0.5627, -0.6443, -0.6365, -0.604, -0.6871, -0.7075, -0.7392, -0.8173, -0.8561, -0.889, -0.9099, -0.852, -0.8898, -0.9137, -0.9274, -0.9887, -1.0916, -1.0502, -1.0818, -1.0725, -1.1176, -1.1896, -1.1139]),
 DenseVector([-0.9327, -1.1417, -1.0052, -0.9593, -1.0831, -0.9687, -0.9774, -1.0009, -1.0369, -0.9926, -1.028, -0.935, -1.0734, -0.9192, -1.1266, -1.0083, -1.0249, -0.9998, -1.0568, -1.0423, -0.7946, -0.6769, -0.5499, -0.2253, -0.1365, 0.0565, 0.1291, 0.2323, 0.4263, 0.4854, 0.4594, 0.561, 0.7353, 0.7145, 0.6026, 0.7072, 0.7772, 0.7465, 0.5241, 0.7285, 0.6539, 0.7938, 0.7617, 0.7524, 0.7293, 0.7945, 0.7457, 0.6879, 0.7195, 0.7079, 0.6965, 0.7462, 0.6892, 0.7832, 0.7587, 0.769, 0

time: 31.7 s
