# ------------ TIM Python Client - KPI Driven Anomaly Detection ------------

# 0. Setup

In [1]:
import pandas as pd
import json
import datetime as dt
import numpy as np
import plotly as plt
import plotly.express as px
import plotly.graph_objects as go
import plotly.subplots as splt

In [2]:
import tim
# Create the TIM client used by every helper and API call below.
# Fill in email, password and server before running.
# NOTE(review): avoid committing real credentials with the notebook.
client = tim.Tim(email='',password='',server='')

In [3]:
def call_tim_function(action,object_type,arguments=None):
    """Dispatch a 'list' or 'create' request to the matching TIM client method.

    Args:
        action: Either 'list' or 'create'.
        object_type: Kind of TIM object, e.g. 'workspace' or 'experiment'.
            'dataset' and 'dataset_version' are only available for 'list'.
        arguments: Keyword arguments forwarded to the client method. ``None``
            (the default) is treated as "no arguments".

    Returns:
        Whatever the selected client method returns.

    Raises:
        KeyError: If ``action`` or ``object_type`` has no registered method.
    """
    functions = {
        'list':{
            'user_group':client.user_groups.list_user_group,
            'workspace':client.workspaces.list_workspace,
            'dataset':client.datasets.list_dataset,
            'dataset_version':client.datasets.list_dataset_versions,
            'use_case':client.use_cases.list_use_case,
            'experiment':client.experiments.list_experiment
        },
        'create':{
            'user_group':client.user_groups.create_user_group,
            'workspace':client.workspaces.create_workspace,
            'use_case':client.use_cases.create_use_case,
            'experiment':client.experiments.create_experiment
        }
    }
    # ``**None`` raises TypeError, so fall back to an empty mapping.
    keyword_arguments = {} if arguments is None else arguments
    return functions[action][object_type](**keyword_arguments)

def create_tim_object_configuration(pipeline,object_type,parameters,name):
    """Build the minimal creation payload for one TIM object.

    Args:
        pipeline: Pipeline description. Read for the optional
            ``pipeline['dataset_version']['name']`` (dataset uploads) and for
            ``pipeline['experiment']['create']['type']`` (experiments).
        object_type: One of 'user_group', 'workspace', 'dataset',
            'dataset_version', 'use_case' or 'experiment'.
        parameters: Ids of the parent objects; the keys that are read depend
            on ``object_type`` ('user_group_id', 'workspace_id', 'dataset_id',
            'use_case_id').
        name: Name (or version name) of the object to create.

    Returns:
        A dict that can be sent as the creation configuration.

    Raises:
        ValueError: If ``object_type`` is not one of the supported kinds.
        KeyError: If a required entry is missing from ``parameters`` or
            ``pipeline``.
    """
    if object_type == 'user_group':
        # The calling user becomes the owner of the new group.
        owner_id = client.users.details_user()['id']
        return {"name": name,"users": [{"id": owner_id,"isOwner": True}]}
    if object_type == 'workspace':
        return {"name": name,"userGroup": {"id": parameters['user_group_id']}}
    if object_type == 'dataset':
        # Use the configured version name when present, else a default label.
        try:
            version_name = pipeline['dataset_version']['name']
        except (KeyError, TypeError):
            version_name = 'initial upload'
        return {"name": name,"workspace": {"id": parameters['workspace_id']},"versionName":version_name}
    if object_type == 'dataset_version':
        return {"versionName": name}
    if object_type == 'use_case':
        return {"name": name,"workspace": {"id": parameters['workspace_id']},"dataset": {"id": parameters['dataset_id']}}
    if object_type == 'experiment':
        return {"name": name,"useCase": {"id": parameters['use_case_id']},"type": pipeline['experiment']['create']['type']}
    # Previously an unknown type fell through to an UnboundLocalError.
    raise ValueError(f"Unsupported object_type: {object_type!r}")

def check_tim_object(pipeline,object_type,parameters):
    """Resolve the id of a TIM object, creating the object when necessary.

    Resolution order:
      1. ``pipeline[object_type]['id']`` when an id is given explicitly.
      2. The first object returned by the matching 'list' call whose name
         equals ``pipeline[object_type]['name']``.
      3. A newly created object. The payload is the user-supplied
         ``pipeline[object_type]['create']['configuration']`` merged with the
         generated minimal configuration (generated keys win). When no such
         configuration exists, only the generated one is used, named after
         ``pipeline[object_type]['name']``.

    Args:
        pipeline: Pipeline description keyed by object type.
        object_type: Kind of TIM object to resolve.
        parameters: Keyword arguments for the 'list' call; they also carry
            the parent ids used to build the creation payload. For
            'dataset_version', ``parameters['id']`` is the dataset id.

    Returns:
        The id of the existing or newly created object.

    Raises:
        Exception: Whatever the final creation attempt raises, for example
            KeyError when neither a name nor a create section is available.
    """
    # ``except Exception`` rather than a bare ``except`` so KeyboardInterrupt
    # and SystemExit are no longer swallowed by the fallbacks.
    try:
        object_id = pipeline[object_type]['id']
        print(object_type,'id available.')
    except Exception:
        try:
            object_name = pipeline[object_type]['name']
            object_list = [f for f in call_tim_function('list',object_type,parameters) if f['name']==object_name]
            tim_object = object_list[0]
            object_id = tim_object['id']
            print(object_type,'found by name.')
        except Exception:
            try:
                add_to_configuration = pipeline[object_type]['create']['configuration']
                object_name = add_to_configuration['versionName'] if object_type == 'dataset_version' else add_to_configuration['name']
                create_configuration = create_tim_object_configuration(pipeline,object_type,parameters,object_name)
                # Generated keys override user-supplied keys of the same name.
                object_configuration = {**add_to_configuration, **create_configuration}
            except Exception:
                object_name = pipeline[object_type]['name']
                object_configuration = create_tim_object_configuration(pipeline,object_type,parameters,object_name)
            if object_type == 'dataset':
                # Datasets are created by uploading the file itself.
                tim_file = pipeline[object_type]['create']['file']
                tim_upload = client.upload_dataset(
                    dataset = tim_file,
                    configuration = object_configuration,
                    outputs = ['response'],
                    status_poll = print,
                    tries_left = 300
                )
                tim_object = tim_upload.response
            elif object_type == 'dataset_version':
                # A new version is created by updating the existing dataset.
                tim_file = pipeline[object_type]['create']['file']
                tim_update = client.update_dataset(
                    dataset_id = parameters['id'],
                    dataset_version = tim_file,
                    configuration = object_configuration,
                    outputs = ['response'],
                    status_poll = print,
                    tries_left = 300
                )
                tim_object = tim_update.response['version']
            else:
                tim_object = call_tim_function('create',object_type,{'configuration':object_configuration})
            object_id = tim_object['id']
            print(object_type,'created.')
    return object_id

def tim_pipeline_setup(pipeline):
    """Resolve (or create) every TIM object of a pipeline, best effort.

    The objects are processed in the order user_group, workspace, dataset,
    dataset_version, use_case, experiment. A step is skipped when one of the
    objects it depends on could not be resolved; a step that fails is left
    out of the result and does not stop the remaining independent steps.

    Args:
        pipeline: Pipeline description keyed by object type, optionally with
            a 'name' entry.

    Returns:
        A dict with the pipeline 'name' (when given) and the id of every
        object that was resolved, keyed by object type.
    """
    import logging

    logger = logging.getLogger(__name__)

    response = {}
    # The name is optional; previously a missing name left ``response``
    # unbound and the final ``return`` raised UnboundLocalError.
    try:
        response['name'] = pipeline['name']
    except (KeyError, TypeError):
        pass

    # object type -> {parameter name passed to check_tim_object: object type it comes from}
    steps = (
        ('user_group', {}),
        ('workspace', {'user_group_id': 'user_group'}),
        ('dataset', {'workspace_id': 'workspace'}),
        ('dataset_version', {'id': 'dataset'}),
        ('use_case', {'workspace_id': 'workspace', 'dataset_id': 'dataset'}),
        ('experiment', {'use_case_id': 'use_case'}),
    )

    resolved = {}
    for object_type, dependencies in steps:
        if any(source not in resolved for source in dependencies.values()):
            # A parent object is missing, so this object cannot be resolved.
            logger.debug('Skipping %s: unresolved dependency.', object_type)
            continue
        parameters = {key: resolved[source] for key, source in dependencies.items()}
        try:
            object_id = check_tim_object(pipeline=pipeline,object_type=object_type,parameters=parameters)
        except Exception:
            # Deliberately best effort: record the failure and carry on.
            logger.debug('Could not resolve %s.', object_type, exc_info=True)
            continue
        resolved[object_type] = object_id
        response[object_type] = object_id
    return response

In [4]:
from concurrent.futures import ThreadPoolExecutor
from IPython.display import clear_output
import time
from copy import deepcopy

class parallel:
    """Namespace for running a batch of callables concurrently in threads."""

    @staticmethod
    def run(requests,poll = True,max_workers = 32):
        """Execute every request in a thread pool and collect the outcomes.

        Args:
            requests: List of dicts, each with a callable under 'func' and
                optionally a list under 'args' and a dict under 'kwargs'.
                The input is deep-copied and never modified.
            poll: When true, clear the notebook output and print a
                ``done / total`` counter after each collected result.
            max_workers: Maximum number of worker threads.

        Returns:
            The copied request dicts, each extended with 'exception' (the
            raised exception or None) and 'return_value' (the result or
            None), in the same order as ``requests``.

        Raises:
            ValueError: If a request is not a dict, or 'args'/'kwargs' has
                the wrong type.
            KeyError: If a request has no 'func' entry.
        """
        tasks = deepcopy(requests)

        # Validate everything up front so no work starts on a malformed batch.
        for i, task in enumerate(tasks):
            if not isinstance(task, dict):  # type: ignore
                raise ValueError(f'Request {i+1} must be a dict')

            if 'func' not in task:
                raise KeyError(f"Request {i+1} must contain key 'func'")

            if 'args' in task and not isinstance(task['args'], list):
                raise ValueError(f"Property 'args' of request {i+1} must be of type list")
            elif 'args' not in task:
                task['args'] = []

            if 'kwargs' in task and not isinstance(task['kwargs'], dict):
                raise ValueError(f"Property 'kwargs' of request {i+1} must be of type dict")
            elif 'kwargs' not in task:
                task['kwargs'] = {}

        # The context manager shuts the pool down, so worker threads are not
        # leaked across repeated calls.
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            futures = [
                executor.submit(task['func'], *task['args'], **task['kwargs'])  # type: ignore
                for task in tasks
            ]
            for count, (future, task) in enumerate(zip(futures, tasks), start=1):
                # ``exception()`` blocks until the future has finished.
                error = future.exception()
                task['exception'] = error
                task['return_value'] = None if error is not None else future.result()
                if poll:
                    clear_output(wait=True)
                    print(count,'/',len(futures))
        return tasks
    
def poll_status(status_requests,nb_checks=100):
    """Poll a batch of job-status requests until no job is pending.

    All requests are executed through ``parallel.run`` every two seconds. A
    summary line with the check number, the count per status and the mean
    progress is printed after each round. Polling stops once no job reports
    'Registered', 'Queued' or 'Running', or after ``nb_checks`` rounds.

    Args:
        status_requests: Request dicts as accepted by ``parallel.run``; each
            call is expected to return a dict with a 'status' entry and
            optionally a 'progress' entry.
        nb_checks: Maximum number of polling rounds.

    Returns:
        None.

    Raises:
        Exception: The original exception of any status request that failed.
    """
    if not status_requests:
        # Nothing to poll; this also avoids dividing by zero below.
        return
    pending_states = {'Registered','Queued','Running'}
    still_pending = True
    idx = 0
    while still_pending and idx<nb_checks:
        clear_output(wait=True)
        status_response = parallel.run(status_requests,False)
        for task in status_response:
            # Surface the real failure instead of an AttributeError on None.
            if task['exception'] is not None:
                raise task['exception']
        status_jobs = [task['return_value'] for task in status_response]
        value_counter = {}
        for job in status_jobs:
            status = job.get('status')
            value_counter[status] = value_counter.get(status, 0) + 1
        still_pending = any(status in pending_states for status in value_counter)
        idx += 1
        # Jobs without a progress value count as zero in the mean.
        known_progress = [job.get('progress') for job in status_jobs if job.get('progress') is not None]
        progress = round(sum(known_progress)/len(status_jobs),2)
        print('check status:',idx,value_counter,progress)
        if still_pending: time.sleep(2)

# 1. Data Preparation

In [5]:
# Load the raw production-line data and list its columns.
csv_df = pd.read_csv('production_line.csv')
print(csv_df.columns)

Index(['Datetime', 'Output', 'AmbientHumidity', 'AmbientTemperature',
       'M1_RawMaterial_1', 'M1_RawMaterial_2', 'M1_RawMaterial_3',
       'M1_RawMaterial_4', 'M1_RawMaterialFeeder', 'M1_Zone1_Temperature',
       'M1_Zone2_Temperature', 'M1_MotorAmperage', 'M1_MotorRPM',
       'M1_MaterialPressure', 'M1_MaterialTemperature',
       'M1_ExitZoneTemperature', 'M2_RawMaterial_1', 'M2_RawMaterial_2',
       'M2_RawMaterial_3', 'M2_RawMaterial_4', 'M2_RawMaterialFeeder',
       'M2_Zone1_Temperature', 'M2_Zone2_Temperature', 'M2_MotorAmperage',
       'M2_MotorRPM', 'M2_MaterialPressure', 'M2_MaterialTemperature',
       'M2_ExitZoneTemperature', 'M3_RawMaterial_1', 'M3_RawMaterial_2',
       'M3_RawMaterial_3', 'M3_RawMaterial_4', 'M3_RawMaterialFeeder',
       'M3_Zone1_Temperature', 'M3_Zone2_Temperature', 'M3_MotorAmperage',
       'M3_MotorRPM', 'M3_MaterialPressure', 'M3_MaterialTemperature',
       'M3_ExitZoneTemperature', 'Stage_1_Temperature1',
       'Stage_1_Temperature2'

In [6]:
# Column roles: the timestamp, the KPI to monitor, and all other columns as predictors.
timestamp = 'Datetime'
target = 'Output'
tim_dataset = csv_df.copy()
predictors = [column for column in tim_dataset.columns if column not in (timestamp, target)]
# Reorder to timestamp, KPI, predictors and reset the row index.
tim_dataset = tim_dataset[[timestamp, target, *predictors]].reset_index(drop=True)

In [None]:
# Plot the KPI in the upper panel and every predictor in the lower panel,
# both on a shared time axis.
v_data = tim_dataset.copy()
fig = splt.make_subplots(rows=2, cols=1, shared_xaxes=True, vertical_spacing=0.02)
fig.add_trace(go.Scatter(x=v_data[timestamp], y=v_data[target], name=target,connectgaps=True), row=1, col=1)
for p in predictors:
    fig.add_trace(go.Scatter(x=v_data[timestamp], y=v_data[p], name=p,connectgaps=True), row=2, col=1)
fig.update_layout(height=600, width=1200, title_text="Data visualization")
fig.show()

# 2. TIM

## 2.1 Workflow

In [8]:
# One batch is a tenth of the rows (integer division).
batch_size = len(tim_dataset)//10
# Only the first batch is uploaded when the dataset is created; the remaining
# rows are appended batch by batch in section 2.2.
initial_upload_df = tim_dataset[:batch_size]

In [9]:
# Pipeline description consumed by tim_pipeline_setup / check_tim_object.
# For every object, check_tim_object first uses 'id', then looks the object up
# by 'name', and finally creates it from the 'create' section. Uncomment the
# entry that matches how the object should be resolved.
upload_dataset_configuration = {
    # "timestampColumn": timestamp,
    # "groupKeys": group_keys,
    "name": "production_line"
}
# -----------------------------------------------------------------------------------------------------------------------
user_group = {
    # 'id':'135c50cd-e6ea-423e-ae2b-581564cb9cbc',
    'name':'POV',
    # 'create':{'configuration':create_user_group_configuration}
}
workspace = {
    # 'id':'e3e34c8f-3864-4199-af4b-70366c6a79db',
    'name':'Templates',
    # 'create':{'configuration':create_workspace_configuration}
}
# No id or name is given, so the dataset is always created from the first batch.
dataset = {
    # 'id':'0c217702-6d7c-4349-9345-a5d8c9120881',
    # 'name':upload_dataset_configuration['name'],
    'create':{'configuration':upload_dataset_configuration,'file':initial_upload_df}
}
# Left empty: the dataset_version step cannot be resolved and is omitted from
# the pipeline response.
dataset_version = {
    # 'id':'03ea3953-3956-4155-8ad8-f3c95daadd5f',
    # 'name':'panel_data_demo',
    # 'create':{'configuration':update_dataset_configuration,'file':tim_dataset.tail(28)}
}
use_case = {
    # 'id':'3b8ad8ce-8516-4ed4-a9e7-e891ed5e176c',
    'name':'tag_subselection',
    # 'create':{'configuration':create_use_case_configuration}
}
# 'type' is required by create_tim_object_configuration when the experiment is created.
experiment = {
    # 'id':'bc82c706-be70-4726-8976-dbadf75e7385',
    'name':'KPI Driven Anomaly Detection',
    'create':{
        # 'configuration':create_use_case_configuration,
        'type':'AnomalyDetection'
    }
}
# -----------------------------------------------------------------------------------------------------------------------
pipeline_input = {
    'name':'pipeline_1',
    'user_group':user_group,
    'workspace':workspace,
    'dataset':dataset,
    'dataset_version':dataset_version,
    'use_case':use_case,
    'experiment':experiment
}
# -----------------------------------------------------------------------------------------------------------------------
# Resolve or create every object and show the collected ids.
pipeline_response = tim_pipeline_setup(pipeline=pipeline_input)
pipeline_response

user_group found by name.
workspace found by name.
{'status': 'Running', 'progress': 80.0, 'createdAt': '2023-10-18T11:54:25.217Z'}
dataset created.
use_case created.
experiment created.


{'name': 'pipeline_1',
 'user_group': '4f2bbd5d-4b40-473f-9cae-7c03508ae11c',
 'workspace': '7f5a354a-026e-431c-85e9-261ec543329c',
 'dataset': 'b225034c-d0ce-475d-95aa-642300a75dd6',
 'use_case': '46bc82a5-4fe0-4ec4-b018-fc66af8b5b8b',
 'experiment': 'd76a6521-731b-4385-981e-40820793c881'}

In [10]:
# Keep the ids needed by the following sections. A KeyError here means the
# corresponding object could not be resolved by tim_pipeline_setup.
dataset_id = pipeline_response['dataset']
use_case_id = pipeline_response['use_case']
experiment_id = pipeline_response['experiment']

## 2.2 Dataset Management

In [12]:
# Configuration sent with every dataset update below; intentionally empty.
update_dataset_configuration = {}

In [13]:
# Append the rows after the initial upload, one batch per dataset update.
# Batch 10 holds the remainder rows left over by the integer division.
for i in range(1,11):
    dataset = tim_dataset[batch_size*i:batch_size*(i+1)]
    if dataset.empty:
        # No rows left, e.g. when the row count is an exact multiple of
        # batch_size; do not send an empty update.
        continue
    response = client.extended_datasets.update_dataset(
        dataset_id = dataset_id,
        dataset_version = dataset,
        configuration = update_dataset_configuration,
        wait_to_finish = True,
        outputs = [
            'response',
            # 'logs',
            # 'details'
        ],
        status_poll = print,
        tries_left = 300,
    )
    print(i)

{'status': 'Running', 'progress': 85.0, 'createdAt': '2023-10-18T11:55:00.812Z'}
1
{'status': 'Running', 'progress': 85.0, 'createdAt': '2023-10-18T11:55:05.079Z'}
2
{'status': 'Running', 'progress': 85.0, 'createdAt': '2023-10-18T11:55:09.296Z'}
3
{'status': 'Running', 'progress': 85.0, 'createdAt': '2023-10-18T11:55:13.479Z'}
4
{'status': 'Running', 'progress': 85.0, 'createdAt': '2023-10-18T11:55:17.602Z'}
{'status': 'Finished', 'progress': 100.0, 'createdAt': '2023-10-18T11:55:17.602Z'}
5
{'status': 'Running', 'progress': 85.0, 'createdAt': '2023-10-18T11:55:21.666Z'}
6
{'status': 'Running', 'progress': 80.0, 'createdAt': '2023-10-18T11:55:25.930Z'}
7
{'status': 'Running', 'progress': 80.0, 'createdAt': '2023-10-18T11:55:29.958Z'}
8
{'status': 'Running', 'progress': 80.0, 'createdAt': '2023-10-18T11:55:33.541Z'}
9
{'status': 'Running', 'progress': 80.0, 'createdAt': '2023-10-18T11:55:37.012Z'}
{'status': 'Finished', 'progress': 100.0, 'createdAt': '2023-10-18T11:55:37.012Z'}
10


## 2.3 Subselection

### 2.3.1 Configuration

In [14]:
# Job configuration for building the KPI-driven anomaly detection model.
# Only the 'Residual' perspective (all sensitivities set to 0) and the
# 'Identity' feature are set explicitly. The commented entries are optional
# settings kept as a template.
# NOTE(review): omitted settings presumably fall back to TIM defaults — confirm.
detection_build_kpi_model_configuration = {
    "name": "Subselection model",
    "useCase": {"id": use_case_id},
    "experiment": {"id": experiment_id},
    "configuration": {
        "domainSpecifics": [
            {
                "perspective": "Residual",
                "sensitivity": 0,
                "minSensitivity": 0,
                "maxSensitivity": 0
            }
        ],
        "normalBehaviorModel": {
#             "useNormalBehaviorModel": True,
#             "normalization": True,
#             "maxModelComplexity": 50,
            "features": [
                "Identity",
            ],
#             "dailyCycle": true,
#             "useKPIoffsets": true,
#             "allowOffsets": true,
#             "offsetLimit": {"type": "Explicit","value": 0}
#         },
#         "anomalousBehaviorModel": {
#             "maxModelComplexity": 15,
#             "detectionIntervals": [
#                 {"type": "Hour","value": "8-16"}
#             ]
        }
    },
    # "data": {
#         "version": {"id": "a74ae716-a86e-47f0-8a50-d8b21d6d7dd6"},
        # "rows": {"type":"First","baseUnit": "Sample","value": in_sample_rows}, #{"type":"Last","baseUnit": "Sample","value": 1} or [{"from": "yyyy-mm-dd HH:MM:SS","to": "yyyy-mm-dd HH:MM:SS"}]
#         "columns": [
#             1,
#             3,
#             "wind_speed"
#         ],
#         "KPIColumn": "rotor_speed",
#         "holidayColumn": "PH",
#         "labelColumn": "LABEL",
#         "imputation": {"type": "LOCF","maxGapLength": 6},
#         "timeScale": {"baseUnit": "Hour","value": 1},
#         "aggregation": "Mean",
#         "updates": [
#             {
#                 "column": "wind_speed",
#                 "updateTime": [
#                     {"type": "Hour","value": "1,12,23"}
#                 ],
#                 "updateUntil": {"baseUnit": "Hour","offset": -2}
#             }
#         ]
    # }
}
# Show the effective configuration.
detection_build_kpi_model_configuration

{'name': 'Subselection model',
 'useCase': {'id': '46bc82a5-4fe0-4ec4-b018-fc66af8b5b8b'},
 'experiment': {'id': 'd76a6521-731b-4385-981e-40820793c881'},
 'configuration': {'domainSpecifics': [{'perspective': 'Residual',
    'sensitivity': 0,
    'minSensitivity': 0,
    'maxSensitivity': 0}],
  'normalBehaviorModel': {'features': ['Identity']}}}

### 2.3.2 API Call

In [15]:
# Submit the model-building job with the configuration above and request every
# output that is unpacked in the next cell. Status updates are printed as they
# arrive. The commented arguments are optional and left unset.
detection_build_kpi_model = client.detection_build_kpi_model(
    configuration = detection_build_kpi_model_configuration,
    # dataset_id = dataset_id,
    # execute = True,
    # wait_to_finish = True,
    outputs = [
        'id',
        'details',
        'logs',
        'status',
        'table',
        'model',
        'accuracies'
    ],
    status_poll = print,
    # tries_left = 300
 )

{'status': 'Running', 'progress': 6.5, 'CPU': 0.11, 'memory': 4209.0, 'createdAt': '2023-10-18T11:57:19.528Z'}
{'status': 'Running', 'progress': 60.0, 'CPU': 0.11, 'memory': 4203.0, 'createdAt': '2023-10-18T11:57:20.847Z'}
{'status': 'Running', 'progress': 60.0, 'CPU': 0.11, 'memory': 4203.0, 'createdAt': '2023-10-18T11:57:20.847Z'}
{'status': 'Running', 'progress': 100.0, 'CPU': 0.27, 'memory': 4217.0, 'createdAt': '2023-10-18T11:57:26.738Z'}
{'status': 'Finished', 'progress': 100.0, 'CPU': 0.27, 'memory': 4217.0, 'createdAt': '2023-10-18T11:57:26.738Z'}


In [16]:
# Unpack each requested output of the job into its own variable.
detection_build_kpi_model_id = detection_build_kpi_model.id
detection_build_kpi_model_details = detection_build_kpi_model.details
detection_build_kpi_model_logs = detection_build_kpi_model.logs
detection_build_kpi_model_status = detection_build_kpi_model.status
detection_build_kpi_model_table = detection_build_kpi_model.table
detection_build_kpi_model_model = detection_build_kpi_model.model
detection_build_kpi_model_accuracies = detection_build_kpi_model.accuracies

# 3. Results

In [17]:
# Derive the property and feature tables from the model, and put the job logs
# into a DataFrame ordered by creation time.
properties_df = client.post_process.properties(detection_build_kpi_model_model)
features_df = client.post_process.features(detection_build_kpi_model_model)
model_logs_df = pd.DataFrame(detection_build_kpi_model_logs).sort_values(by='createdAt').reset_index(drop=True)

## 3.1 Visual

In [None]:
# Upper panel: KPI, in-sample normal behavior, and the points flagged as anomalies.
# Lower panel: every anomaly_indicator column with a reference line at 1.
fig = splt.make_subplots(rows=2, cols=1, shared_xaxes=True, vertical_spacing=0.02)
fig.add_trace(
    go.Scatter(
        x=tim_dataset[timestamp],
        y=tim_dataset[target],
        name=target,
        line={'color': 'black'},
    ),
    row=1, col=1,
)
fig.add_trace(
    go.Scatter(
        x=detection_build_kpi_model_table['timestamp'],
        y=detection_build_kpi_model_table['normal_behavior'],
        name='InSample Normal Behavior',
        line={'color': 'goldenrod'},
    ),
    row=1, col=1,
)
for ai in filter(lambda f: 'anomaly_indicator' in f, detection_build_kpi_model_table.columns):
    fig.add_trace(
        go.Scatter(
            x=detection_build_kpi_model_table['timestamp'],
            y=detection_build_kpi_model_table[ai],
            name=ai.replace('anomaly_indicator_','')+' InSample',
        ),
        row=2, col=1,
    )
    # Rows whose indicator reaches the threshold are drawn as markers on the KPI.
    va = detection_build_kpi_model_table[detection_build_kpi_model_table[ai]>=1]
    fig.add_trace(
        go.Scatter(
            x=va['timestamp'],
            y=va['kpi'],
            name=ai.replace('anomaly_indicator_','')+' anomaly inSample',
            mode='markers',
            line={'color': 'red'},
        ),
        row=1, col=1,
    )
fig.add_hline(y=1, line_color="orange", row=2, col=1)
fig.update_layout(height=700, width=1400, title_text="Results")
fig.show()

## 3.2 Insights

In [None]:
# Bar chart of the relative importance of each property in the model, plus a
# list of the input columns that do not appear in it.
fig1 = go.Figure(
    go.Bar(
        x=properties_df['name'],
        y=properties_df['rel_importance'],
        text=round(properties_df['rel_importance'],2),
        textposition='auto',
    )
)
fig1.update_layout(
    height=500,
    width=1200,
    title_text='Predictor Importances',
    xaxis_title='name',
    yaxis_title='rel_importance',
)
print(f"Predictors not used:{list(set(predictors+[target])-set(list(properties_df['name'])))}")
fig1.show()

In [None]:
# Sunburst of feature importances, grouped by the 'Model' column and then the 'Feature' column.
fig = px.sunburst(features_df, path=['Model','Feature'], values='importance',color='Feature')
fig.update_layout(height=700,width=700,title_text='Feature Importances')
fig.show()

In [21]:
# Collect the messages of all log entries of type "Warning".
# NOTE(review): the name shadows the stdlib `warnings` module if that is ever imported here.
warnings = list(model_logs_df[model_logs_df['messageType'] == "Warning"]['message'])
warnings

[]

In [22]:
# Display the complete job log, ordered by creation time.
model_logs_df

Unnamed: 0,message,messageType,createdAt,origin
0,The job is categorized as light.,Info,2023-10-18T11:57:18.090Z,Registration
1,Expected result table size is 1.3 MiB.,Info,2023-10-18T11:57:18.090Z,Registration
2,Job waiting in the queue mlJobs_Light with pri...,Info,2023-10-18T11:57:18.837Z,Execution
3,Executing job.,Info,2023-10-18T11:57:18.910Z,Execution
4,"Detection job, type: model building, approach:...",Info,2023-10-18T11:57:18.941Z,Execution
5,"Getting data from dataset version ""4c153b7f-f3...",Info,2023-10-18T11:57:18.968Z,Execution
6,Used sampling period 1 second.,Info,2023-10-18T11:57:19.457Z,Execution
7,Validation successful.,Info,2023-10-18T11:57:19.513Z,Execution
8,Building the normal behavior model.,Info,2023-10-18T11:57:19.524Z,Execution
9,Parameter useKPIoffsets is set to true. Reason...,Info,2023-10-18T11:57:19.562Z,Execution
