In [47]:
import numpy as np
import pandas as pd
import scipy.stats
import string

In [48]:
class SAX:
    def __init__(self, ts_length, sax_length, n_alphabets):
        if (n_alphabets < 3):
            raise ValueError('The alphabet size should be at least 3.')
        
        self.ts_length = ts_length
        self.sax_length = sax_length
        self.n_alphabets = n_alphabets
        self.alphabets = list(string.ascii_uppercase[0:n_alphabets])

        # Given that the normalized time series have highly Gaussian
        # distribution, we can simply determine the "breakpoints" that will
        # produce a equal-sized areas under Gaussian curve
        # Ref: http://www.cs.ucr.edu/~eamonn/SAX.pdf
        breakpoints = []
        breakpoints.append(-np.inf)
        for i in range(1, n_alphabets):
            breakpoints.append(scipy.stats.norm.ppf(i / n_alphabets))
        breakpoints.append(np.inf)
        self.breakpoints = breakpoints
    
    def transform(self, ts):
        ts_normalized = self.__z_normalize(ts)
        ts_paa = self.__reduce_dim_by_paa(ts_normalized)
        ts_discretized = self.__discretize(ts_paa)
        return ''.join(ts_discretized.ravel())
    
    def mindist(self, sax_a, sax_b):
        sax_a_index = np.array([ord(x) - ord('A') for x in list(sax_a)]) + 1
        sax_b_index = np.array([ord(x) - ord('A') for x in list(sax_b)]) + 1
        
        bps = self.breakpoints
        scaling_factor = float(self.ts_length / self.sax_length)
        dists = np.array([0 if np.abs(r - c) <= 1 else bps[np.amax([r, c]) - 1] - bps[np.amin([r, c])] for (r, c) in zip(sax_a_index, sax_b_index)])
        return np.sqrt(scaling_factor * np.sum(dists**2))
    
    def __z_normalize(self, ts):
        return scipy.stats.zscore(ts)
    
    def __reduce_dim_by_paa(self, ts):
        statistic, _, _ = scipy.stats.binned_statistic(np.arange(self.ts_length), ts, statistic='mean', bins=self.sax_length)
        return statistic
    
    def __discretize(self, ts):
        return pd.cut(ts, bins=self.breakpoints, labels=self.alphabets)

In [49]:
x = 7
y = 5
z = 6 # Number of alphabet for SAX string
n = 2**x # Time series length
w = 2**y # SAX string length
data_dir = './data/'

In [41]:
# Initialize two random time series of length n
# TS_A = np.around(np.random.uniform(0, x, n), decimals=2)
# TS_B = np.around(np.random.uniform(0, x, n), decimals=2)

In [50]:
# Load time series data from files
TS_A = np.loadtxt(data_dir + 'ts_a.txt', delimiter=' ')
TS_B = np.loadtxt(data_dir + 'ts_b.txt', delimiter=' ')

In [51]:
sax = SAX(ts_length=n, sax_length=w, n_alphabets=z)

In [52]:
# Transform to SAX
SAX_A = sax.transform(TS_A)
SAX_B = sax.transform(TS_B)

In [53]:
print('SAX_A: {0:s}'.format(SAX_A))
print('SAX_B: {0:s}'.format(SAX_B))

SAX_A: EDDCEDCCCBCDBBDECDDDCCDDDDECBDEC
SAX_B: CDCCCDFFCCBDCBDCDBEBECFBCECCCCDC


In [54]:
# Calculate a distance between SAX_A and SAX_B
mindist = np.around(sax.mindist(SAX_A, SAX_B), decimals=2)
print('Distance between SAX_A and SAX_B: {0:.2f}'.format(mindist))

Distance between SAX_A and SAX_B: 3.82


In [55]:
# Save data as files
np.savetxt(data_dir + 'ts_a.txt', TS_A, fmt='%.2f', delimiter=' ')
np.savetxt(data_dir + 'ts_b.txt', TS_B, fmt='%.2f', delimiter=' ')
np.savetxt(data_dir + 'ts_a_normalized.txt', sax._SAX__z_normalize(TS_A), fmt='%.2f', delimiter=' ')
np.savetxt(data_dir + 'ts_b_normalized.txt', sax._SAX__z_normalize(TS_B), fmt='%.2f', delimiter=' ')

fp1 = open(data_dir + 'sax_a.txt', 'w')
fp2 = open(data_dir + 'sax_b.txt', 'w')
fp1.write('{0:s}'.format(SAX_A))
fp2.write('{0:s}'.format(SAX_B))
fp1.close()
fp2.close()

In [58]:
fp3 = open(data_dir + 'result.txt', 'w')
fp3.write('5730329521\tParinthorn Saithong (Aof)\n')
fp3.write('X = {0:d}\n'.format(x))
fp3.write('Y = {0:d}\n'.format(y))
fp3.write('Z = {0:d}\n'.format(z))
fp3.write('n = {0:d}\n'.format(n))
fp3.write('w = {0:d}\n'.format(w))
fp3.write('SAX_A = {0:s}\n'.format(SAX_A))
fp3.write('SAX_B = {0:s}\n'.format(SAX_B))
fp3.write('Distance between SAX_A & SAX_B = {0:.2f}\n'.format(mindist))
fp3.close()