Before running this notebook ensure that pydruid, sqlalchemy and pycaret are installed in your python environment. This notebook was tested on pythin 3.8

In [1]:
from sqlalchemy import *
from sqlalchemy.engine import create_engine
from sqlalchemy.schema import *

In [4]:
import numpy as np
import pandas as pd
from pykafka import KafkaClient

create druid connection using sqlalchemy. The below assumes that basic auth is enabled on druid.  

In [3]:
engine2 = create_engine('druid+https://<user>:<password>@<host name>:<port>/druid/v2/sql/',
                       connect_args={"ssl_verify_cert": False}) 





execute druid sql query and convert result to data frame. This query brings in the last 10s of data

In [4]:
conn2=engine2.connect()
result2 = conn2.execute(text("select * from ml1 where __time > TIME_SHIFT(CURRENT_TIMESTAMP,'PT10S',-1)"))
df2=pd.DataFrame(result2.mappings())



the below dataset is the python iris dataset with a prediction using a classifier (random forest). 
The predictions along with the input are ingested into druid

In [5]:
df2.head()

Unnamed: 0,__time,col1,col2,col3,col4,predict
0,2021-08-30T05:20:31.003Z,5.9,4.1,2.5,1.1,0
1,2021-08-30T05:20:31.006Z,5.8,4.0,2.4,1.3,0
2,2021-08-30T05:20:31.010Z,5.4,4.0,2.3,1.2,0
3,2021-08-30T05:20:31.014Z,5.8,4.4,2.6,1.2,0
4,2021-08-30T05:20:31.017Z,7.7,4.3,6.7,3.1,1


setup pycaret and train anomaly model

In [6]:
import pycaret

In [7]:
from pycaret.anomaly import *

In [8]:
exp1=setup(df2,normalize = True,silent=True,session_id=123)

Unnamed: 0,Description,Value
0,session_id,123
1,Original Data,"(1839, 6)"
2,Missing Values,False
3,Numeric Features,4
4,Categorical Features,1
5,Ordinal Features,False
6,High Cardinality Features,False
7,High Cardinality Method,
8,Transformed Data,"(1839, 12)"
9,CPU Jobs,-1


In [9]:

iforest = create_model('iforest')

In [10]:
iforest_results = assign_model(iforest)

In [11]:
iforest_results[iforest_results['Anomaly']==1].shape

(86, 8)

In [12]:
iforest_results[iforest_results['Anomaly']==0].shape

(1753, 8)

In [13]:
iforest_results.head()

Unnamed: 0,__time,col1,col2,col3,col4,predict,Anomaly,Anomaly_Score
0,2021-08-30T05:20:31.003Z,5.9,4.1,2.5,1.1,0,0,-0.068035
1,2021-08-30T05:20:31.006Z,5.8,4.0,2.4,1.3,0,0,-0.068056
2,2021-08-30T05:20:31.010Z,5.4,4.0,2.3,1.2,0,0,-0.094751
3,2021-08-30T05:20:31.014Z,5.8,4.4,2.6,1.2,0,0,-0.119813
4,2021-08-30T05:20:31.017Z,7.7,4.3,6.7,3.1,1,0,-0.122804


In [14]:
save_model(iforest, model_name='iforest')
#save model

Transformation Pipeline and Model Successfully Saved


(Pipeline(memory=None,
          steps=[('dtypes',
                  DataTypes_Auto_infer(categorical_features=[],
                                       display_types=False, features_todrop=[],
                                       id_columns=[], ml_usecase='regression',
                                       numerical_features=[],
                                       target='UNSUPERVISED_DUMMY_TARGET',
                                       time_features=[])),
                 ('imputer',
                  Simple_Imputer(categorical_strategy='most frequent',
                                 fill_value_categorical=None,
                                 fill_value_numerical=Non...
                 ('fix_perfect', 'passthrough'),
                 ('clean_names', Clean_Colum_Names()),
                 ('feature_select', 'passthrough'), ('fix_multi', 'passthrough'),
                 ('dfs', 'passthrough'), ('pca', 'passthrough'),
                 ['trained_model',
                  IFo

In [15]:
loaded_iforest = load_model('iforest')
#load saved model

Transformation Pipeline and Model Successfully Loaded


In [16]:
result3 = conn2.execute(text("select * from ml1 where __time > TIME_SHIFT(CURRENT_TIMESTAMP,'PT10S',-1)"))
df3=pd.DataFrame(result3.mappings())

In [17]:
pred_new = predict_model(loaded_iforest , data=df3)

In [18]:
pred_new.head()

Unnamed: 0,__time,col1,col2,col3,col4,predict,Anomaly,Anomaly_Score
0,2021-08-30T05:21:04.006Z,6.8,3.7,4.9,2.2,1,0,-0.164129
1,2021-08-30T05:21:04.012Z,6.8,5.0,2.2,1.2,1,0,-0.022609
2,2021-08-30T05:21:04.021Z,7.2,3.2,5.5,2.5,2,0,-0.051693
3,2021-08-30T05:21:04.025Z,7.9,4.1,5.9,2.5,1,0,-0.138968
4,2021-08-30T05:21:04.035Z,6.7,3.6,4.5,2.0,1,0,-0.111961


In [19]:
len(pred_new)

496

In [20]:
pred_new.iloc[0]

__time           2021-08-27T07:42:12.004Z
col1                                  7.3
col2                                  4.3
col3                                  7.0
col4                                  3.5
predict                                 1
Anomaly                                 0
Anomaly_Score                   -0.028845
Name: 0, dtype: object

contunously query druid every 10s and run the results through the anomaly model and post the anomaly detetction output back into druid

In [None]:
client = KafkaClient(hosts='localhost:9092')
topic = client.topics['anomaly']
producer = topic.get_sync_producer()
while true:
    result3 = conn2.execute(text("select * from ml1 where __time > TIME_SHIFT(CURRENT_TIMESTAMP,'PT10S',-1)"))
    df3=pd.DataFrame(result3.mappings())
    pred_new = predict_model(loaded_iforest , data=df3)
    print(pred_new.head())
    for i in range(len(pred_new)):

        p=pred_new.iloc[i].to_json()
        print(p)
        producer.produce(p.encode('ascii'))
    #n=json.dumps(pred_new)
    time.sleep(10)