# Clustering times series data with SQL

**The data**

The data in the example cases have the `time` column and one or more columns representing `engine heat` in celcius degrees. 

An increase of 1 in time equals 1 hour. Each case 3 days of data.

This notebook is loosely inspired by an actual business need, but the data and examples are generalization of the problem.

**Clustering**

The value in the `egine heat` column increases by time when the engine is running. Once the temperature raises to a certain level, the system automatically switces to secondary engine. 

For the reporting purposes this data is needed:
* Identify the cluster id for each observation
* Count number engine switches (clusters) in the data
* Calculate number of observations in during each engine run

For one reason or another the engine temperature sensor is the only available information. Because of the system limitations SQL is the only available analytics tool.




## Initialize

In [220]:
#Import libraries
import sqlite3
import pandas as pd
import numpy as np

from plotly.offline import download_plotlyjs, init_notebook_mode, plot, iplot
import plotly.graph_objs as go

In [142]:
init_notebook_mode(connected=True)

In [473]:
#Constants
days_n = 3
observations_n = days_n * 24
clusters_n = 7
start_heat_min = 18
start_heat_max = 22

#Column names
col_time = "time"
col_cluster_rank = "cluster_rank"
col_cluster_id = "cluster_id"
col_start_heat = 'start_heat'

#Database settings
con = sqlite3.connect(":memory:")

In [594]:
#Generate time series from 0 ... observations_n
def generate_initial_data():

    #Generate the time column
    time_series = np.arange(observations_n)

    #Create cluster breakpoints
    np.random.seed(11)
    cluster_breakpoints = np.sort(np.random.randint(observations_n, size=clusters_n-1))
    cluster_breakpoints = np.insert(cluster_breakpoints, 0, 0)
    print("Cluster breakpoints: {}".format(cluster_breakpoints))
    
    #Create cluster partitions
    cluster_id = np.repeat(0, observations_n)
    cluster_id[cluster_breakpoints] = 1
    cluster_id = np.cumsum(cluster_id)
    
    #Create starting heat
    cluster_start_heat = np.repeat(np.nan, observations_n)
    cluster_start_heat[cluster_breakpoints] = np.random.randint(low=start_heat_min, high=start_heat_max, size=clusters_n)
    
    #Create the data frame
    df = pd.DataFrame({col_time: time_series, col_cluster_id: cluster_id, col_start_heat: cluster_start_heat})
    
    #Rank inside the cluster
    df[col_cluster_rank] = df.groupby(col_cluster_id)[col_cluster_id].rank().astype(int)
    
    #Initial temperature
    cluster_firsts = np.where(df[col_cluster_rank]==1)
    df[[col_start_heat]] = df[[col_start_heat]].fillna(method='ffill')
    
    return df

#Always increasing
def generate_engine_heat_1(df_arg, col_engine_heat='engine_heat'):
    
    df = df_arg.copy()
    
    col_heat_delta = 'heat_delta_temp'
    col_heat_cum = 'heat_cum_temp'
    
    #Create heat delta and give a cluster specific factor
    df[col_heat_delta] = np.random.random(size=df.shape[0]) * df[col_cluster_rank]**0.5
    
    #Cumulative sum of heat deltas by cluster id
    df[col_heat_cum] = df.groupby(col_cluster_id)[col_heat_delta].cumsum()
    
    #Engine heat at given moment is cumulative sum plus start heat
    df[col_engine_heat] = df[col_start_heat] + df[col_heat_cum]
    
    #Drop temporary columns
    df.drop([col_heat_delta, col_heat_cum], inplace=True, axis=1)
    
    return df

#The combination is always increasing
def generate_engine_heat_2(df_arg, col_1='engine_heat_1', col_2='engine_heat_2'):
    
    df = df_arg.copy()
    
    delta_1 = 'heat_delta_temp_1'
    cum_1 = 'heat_cum_temp_1'
    delta_2 = 'heat_delta_temp_2'
    cum_2 = 'heat_cum_temp_2'
    
    #Create heat delta and give a cluster specific factor
    
    df[delta_1] = (np.random.random(size=df.shape[0])-0.3) * df[col_cluster_rank]**0.5
    df[delta_2] = (np.random.random(size=df.shape[0])-0.4)
    
    #Ensure that sum is always more than zero
    df[delta_2] = np.where((df[delta_2]+df[delta_1])<0, -1 * df[delta_1] + 0.05, df[delta_2])
    
    #Cumulative sum of heat deltas by cluster id
    df[cum_1] = df.groupby(col_cluster_id)[delta_1].cumsum()
    df[cum_2] = df.groupby(col_cluster_id)[delta_2].cumsum()
    
    #Engine heat at given moment is cumulative sum plus start heat
    df[col_1] = df[col_start_heat] + df[cum_1]
    df[col_2] = df[col_start_heat] + df[cum_2]
    
    #Drop temporary columns
    df.drop([delta_1, delta_2, cum_1, cum_2], inplace=True, axis=1)
    
    return df

def plot_heat_2d(df, col_y='engine_heat'):
    
    if col_cluster_rank in df.columns:
        label_text = "Cluster id: " + df[col_cluster_id].astype(str) + "<br>Cluster rank: " +  df[col_cluster_rank].astype(str)
    else:
        label_text = "Cluster id: " + df[col_cluster_id].astype(str)
    
    fig = go.Figure()
    fig.add_scatter(
        x=df[col_time],
        y=df[col_y],
        text=label_text,
        mode='markers',
        marker={
            'color': df[col_cluster_id],
            'colorscale': 'Rainbow',
        }
    )
    iplot(fig)
    
def plot_heat_3d(df, col_y='engine_heat_1', col_z='engine_heat_2'):
    
    series_1 = go.Scatter3d(
        x=df[col_time],
        y=df[col_y],
        z=df[col_z],
        text="Cluster id: " + df[col_cluster_id].astype(str) + "<br>Cluster rank: " +  df[col_cluster_rank].astype(str),
        mode='markers',
        marker={
            'size': 5,
            'color': df[col_cluster_id],
            'opacity': 0.8,
            'colorscale': 'Rainbow',
            'symbol': 'circle'
        }
              
    )
    
    layout = go.Layout(
        margin=dict(
            l=0,
            r=0,
            b=0,
            t=0
        ),
        scene = dict(
            xaxis = dict(title='Time'),
            yaxis = dict(title='Engine 1 Heat'),
            zaxis = dict(title='Engine 2 Heat'),
        ),
)
    
    fig = go.Figure(data=[series_1], layout=layout)
    
    iplot(fig)

## Generate initial data

In [573]:
#Display the data frame

df_init = generate_initial_data()

print("Rows in the data: {}".format(df_init.shape[0]))
display(df_init[7:17])

Cluster breakpoints: [ 0 13 25 33 55 63 71]
Rows in the data: 72


Unnamed: 0,time,cluster_id,start_heat,cluster_rank
7,7,1,20.0,7
8,8,1,20.0,7
9,9,1,20.0,7
10,10,1,20.0,7
11,11,1,20.0,7
12,12,1,20.0,7
13,13,2,18.0,6
14,14,2,18.0,6
15,15,2,18.0,6
16,16,2,18.0,6


## Case 1: Clustering a single variable
In this case we make an expectation that engine heat always increases. The clustering can thus be detected only by observing the previous data point.

### Generate data

In [574]:
case1_tbl = "case1_tbl"

df_case1 = df_init.copy()
df_case1 = generate_engine_heat_1(df_case1)
plot_heat_2d(df_case1)

### Write data to sqlite database table

In [575]:
#Write to sqlite database having only the time and heat columns
df_case1[[col_time, 'engine_heat']].to_sql(case1_tbl, con, if_exists="replace", index=False)

### Cluster by using SQL
* The innermost SQL creates a column for previous engine heat value
* The second query creates a column that indicates whether the row starts a new cluster
    * Starts a new cluster if previous value is greater than the previous
* The outermost SELECT uses cumulative sum to generate cluster id for each row

In [576]:
#The query to get all rows labled with cluster id
sql_1 = """
        SELECT *, SUM(is_new_cluster) OVER (ORDER BY {0} RANGE UNBOUNDED PRECEDING) AS cluster_id
        FROM(
            SELECT *, CASE WHEN ((engine_heat - engine_heat_prev) < 0) OR (engine_heat_prev IS NULL) THEN TRUE ELSE 0 END AS is_new_cluster
            FROM(
                SELECT {0}, {1}, 
                    LAG({1}, 1, null) OVER (ORDER BY {0}) AS engine_heat_prev
                FROM {2}
            )
        )
""".format(col_time, "engine_heat", case1_tbl)

print(sql_1)


        SELECT *, SUM(is_new_cluster) OVER (ORDER BY time RANGE UNBOUNDED PRECEDING) AS cluster_id
        FROM(
            SELECT *, CASE WHEN ((engine_heat - engine_heat_prev) < 0) OR (engine_heat_prev IS NULL) THEN TRUE ELSE 0 END AS is_new_cluster
            FROM(
                SELECT time, engine_heat, 
                    LAG(engine_heat, 1, null) OVER (ORDER BY time) AS engine_heat_prev
                FROM case1_tbl
            )
        )



In [577]:
df_case1_clustered = pd.read_sql(sql_1, con)
df_case1_clustered.head(15)

Unnamed: 0,time,engine_heat,engine_heat_prev,is_new_cluster,cluster_id
0,0,20.245559,,1,1
1,1,22.599505,20.245559,0,1
2,2,24.487942,22.599505,0,1
3,3,25.858173,24.487942,0,1
4,4,27.936116,25.858173,0,1
5,5,29.490934,27.936116,0,1
6,6,31.440419,29.490934,0,1
7,7,33.219714,31.440419,0,1
8,8,35.78628,33.219714,0,1
9,9,38.401815,35.78628,0,1


### Get report for each cluster

In [578]:
#Another query to get aggregated results for each cluster
sql_1_agg = """
    SELECT {0}, COUNT({0}) AS rows_n
    FROM ({1})
    GROUP BY {0}
""".format(col_cluster_id, sql_1)

print(sql_1_agg)


    SELECT cluster_id, COUNT(cluster_id) AS rows_n
    FROM (
        SELECT *, SUM(is_new_cluster) OVER (ORDER BY time RANGE UNBOUNDED PRECEDING) AS cluster_id
        FROM(
            SELECT *, CASE WHEN ((engine_heat - engine_heat_prev) < 0) OR (engine_heat_prev IS NULL) THEN TRUE ELSE 0 END AS is_new_cluster
            FROM(
                SELECT time, engine_heat, 
                    LAG(engine_heat, 1, null) OVER (ORDER BY time) AS engine_heat_prev
                FROM case1_tbl
            )
        )
)
    GROUP BY cluster_id



In [579]:
df_case1_clustered_agg = pd.read_sql(sql_1_agg, con)
display(df_case1_clustered_agg)

Unnamed: 0,cluster_id,rows_n
0,1,13
1,2,12
2,3,8
3,4,22
4,5,8
5,6,8
6,7,1


### Visualize the clusters detected by SQL
Looks correct!

In [580]:
plot_heat_2d(df_case1_clustered)

## Case 2: Clustering a multiple variables with more variation
Even the machine learning backed clustering algorithms struggle with unclear boundaries. Let's add some noise to the data and see how the SQL clustering manages this challenge. This is done by making the `engine heat` go randomly up or down.

In this example there will be two features for the clustering: `engine heat 1` and `engine heat 2`. Now we want to detect the increase of the combined heat. 

In [595]:
case2_tbl = "case2_tbl"

df_case2 = df_init.copy()
df_case2 = generate_engine_heat_2(df_case2, col_1="engine_heat_1", col_2="engine_heat_2")

### Plot both engines in 2D

In [596]:
plot_heat_2d(df_case2, col_y="engine_heat_1")
plot_heat_2d(df_case2, col_y="engine_heat_2")

### Plot 3D

In [597]:
plot_heat_3d(df_case2)

### Write data to sqlite database table

In [598]:
#Write to sqlite database having only the time and heat columns
df_case2[[col_time, 'engine_heat_1', 'engine_heat_2']].to_sql(case2_tbl, con, if_exists="replace", index=False)

### Cluster by using SQL
The most of the logic is similar compared to the simple case 1.

These are the biggest adjustments for this query:
* Difference is calculated for both 2 engines
* Difference is calculated fot both previous and next observation
* A new cluster starts if
    * Total heat increases AND 
    * The difference to previous value is greater than difference to the next

In [601]:
#con.create_function("md5", 1, md5sum)
def sqlite_power(b, e):
    if b is None:
        return np.nan
    else:
        return b**e

con.create_function("POWER", 2, sqlite_power)

#The query to get all rows labled with cluster id
sql_2 = """
        SELECT *, SUM(is_new_cluster) OVER (ORDER BY {0} RANGE UNBOUNDED PRECEDING) AS cluster_id
        FROM(
            SELECT *, CASE WHEN (tot_diff_prev < 0) OR (tot_diff_prev IS NULL) THEN TRUE ELSE 0 END AS is_new_cluster
            FROM(
                SELECT *,
                    (engine_heat_1 - eh_1_prev) + (engine_heat_2 - eh_2_prev) AS tot_diff_prev,
                    (engine_heat_1 - eh_1_next) + (engine_heat_2 - eh_2_next) AS tot_diff_next
                FROM(
                    SELECT {0}, {1}, {2},
                        LAG({1}, 1, null) OVER (ORDER BY {0}) AS eh_1_prev,
                        LAG({2}, 1, null) OVER (ORDER BY {0}) AS eh_2_prev,
                        LAG({1}, -1, null) OVER (ORDER BY {0}) AS eh_1_next,
                        LAG({2}, -1, null) OVER (ORDER BY {0}) AS eh_2_next
                    FROM {3}
                )
            )
        )
""".format(col_time, "engine_heat_1", "engine_heat_2", case2_tbl)

print(sql_2)


        SELECT *, SUM(is_new_cluster) OVER (ORDER BY time RANGE UNBOUNDED PRECEDING) AS cluster_id
        FROM(
            SELECT *, CASE WHEN (tot_diff_prev < 0) OR (tot_diff_prev IS NULL) THEN TRUE ELSE 0 END AS is_new_cluster
            FROM(
                SELECT *,
                    (engine_heat_1 - eh_1_prev) + (engine_heat_2 - eh_2_prev) AS tot_diff_prev,
                    (engine_heat_1 - eh_1_next) + (engine_heat_2 - eh_2_next) AS tot_diff_next
                FROM(
                    SELECT time, engine_heat_1, engine_heat_2,
                        LAG(engine_heat_1, 1, null) OVER (ORDER BY time) AS eh_1_prev,
                        LAG(engine_heat_2, 1, null) OVER (ORDER BY time) AS eh_2_prev,
                        LAG(engine_heat_1, -1, null) OVER (ORDER BY time) AS eh_1_next,
                        LAG(engine_heat_2, -1, null) OVER (ORDER BY time) AS eh_2_next
                    FROM case2_tbl
                )
            )
        )



In [602]:
df_case2_clustered = pd.read_sql(sql_2, con)
df_case2_clustered.head(30)

Unnamed: 0,time,engine_heat_1,engine_heat_2,eh_1_prev,eh_2_prev,eh_1_next,eh_2_next,tot_diff_prev,tot_diff_next,is_new_cluster,cluster_id
0,0,21.407242,19.790616,,,22.685591,19.795451,,-1.283183,1,1
1,1,22.685591,19.795451,21.407242,19.790616,24.418797,20.07785,1.283183,-2.015606,0,1
2,2,24.418797,20.07785,22.685591,19.795451,23.945488,20.557494,2.015606,-0.006334,0,1
3,3,23.945488,20.557494,24.418797,20.07785,25.454083,20.90994,0.006334,-1.861041,0,1
4,4,25.454083,20.90994,23.945488,20.557494,26.755612,20.85313,1.861041,-1.244719,0,1
5,5,26.755612,20.85313,25.454083,20.90994,27.612627,20.592492,1.244719,-0.596377,0,1
6,6,27.612627,20.592492,26.755612,20.85313,28.826667,20.825435,0.596377,-1.446983,0,1
7,7,28.826667,20.825435,27.612627,20.592492,28.213707,21.488395,1.446983,-0.05,0,1
8,8,28.213707,21.488395,28.826667,20.825435,28.194108,21.700641,0.05,-0.192646,0,1
9,9,28.194108,21.700641,28.213707,21.488395,29.060443,22.194329,0.192646,-1.360024,0,1


In [603]:
plot_heat_2d(df_case2_clustered, col_y="engine_heat_1")
plot_heat_2d(df_case2_clustered, col_y="engine_heat_2")