In [1]:
from sklearn.linear_model import LinearRegression, LogisticRegressionCV
from aeon.datasets import  load_from_tsfile
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import StandardScaler
from aeon.transformations.collection.convolution_based import MiniRocketMultivariate
import numpy as np

# load regression and classification datasets

In [2]:
X_train_reg, y_train_reg = load_from_tsfile("./tsCaptum/data/AppliancesEnergy_TRAIN.ts")
X_test_reg, y_test_reg = load_from_tsfile("./tsCaptum/data/AppliancesEnergy_TEST.ts")
print("regression train and test",X_test_reg.shape, y_test_reg.shape)

CMJ = np.load("tsCaptum/data/CMJ_univariate.npy",allow_pickle=True).item()
CMJ_X_train =CMJ["train"]["X"]
CMJ_X_test = CMJ["test"]["X"]
CMJ_y_train =CMJ["train"]["y"]
CMJ_y_test = CMJ["test"]["y"]
print("univariate classification", CMJ_X_train.shape, CMJ_X_test.shape)

MP = np.load("./tsCaptum/data/MP_centered.npy",allow_pickle=True).item()
MP_X_train =MP["train"]["X"]
MP_X_test = MP["test"]["X"]
MP_y_train =MP["train"]["y"]
MP_y_test = MP["test"]["y"]
print("multivariate classification", MP_X_train.shape, MP_X_test.shape)

regression train and test (42, 24, 144) (42,)
univariate classification (419, 1, 500) (179, 1, 500)
multivariate classification (1426, 8, 161) (595, 8, 161)


# train same classifiers anc checkout how the library works
after you have trained a classifier, have your sample to explain it's just a 2 step process

In [3]:
regressor = make_pipeline(MiniRocketMultivariate(n_jobs=1),
                          StandardScaler(),LinearRegression(n_jobs=-1))

regressor.fit(X_train_reg, y_train_reg)
print("metric is", regressor.score(X_test_reg,y_test_reg) )

metric is 0.5914839837241505


we're explaining only 20 samples as a demo

In [4]:
n_to_explain =20
X_test_reg, y_test_reg = X_test_reg[:n_to_explain], y_test_reg[:n_to_explain]

# Feature Ablation
now we are explaining!
1) instantiate your attribution method, the constructor takes only one mandatory argument namely the predictor and one optional argument its type (classifier or regressor). In case the last one isn't provided it's inferred by tha availability of predict_proba in the predictor
2) one you have the object call the method explain which return the saliency map. Only one mandatory argument that is the samples to be explained 

In [5]:
from tsCaptum.explainers import Feature_Ablation
myFA = Feature_Ablation(regressor)
exp = myFA.explain(samples=X_test_reg)
print( "saliency map shape equal to input shape:", exp.shape, X_test_reg.shape,
       "\n attributions for first 5 time points in first 5 channel:\n", exp[0,:5,:5])

24it [00:08,  2.86it/s]                        

saliency map shape equal to input shape: (20, 24, 144) (20, 24, 144) 
 attributions for first 5 time points in first 5 channel:
 [[-0.12783432 -0.12783432 -0.12783432 -0.12783432 -0.12783432]
 [-1.3445663  -1.3445663  -1.3445663  -1.3445663  -1.3445663 ]
 [ 0.4471445   0.4471445   0.4471445   0.4471445   0.4471445 ]
 [ 0.19569397  0.19569397  0.19569397  0.19569397  0.19569397]
 [-0.5340557  -0.5340557  -0.5340557  -0.5340557  -0.5340557 ]]





apart from sample, the explain method has some additional parameters

		:param labels:      labels associated to samples in case of classification
		:param batch_size:  the batch_size to be used i.e. number of samples to be explained at the same time
		:param n_segments:  number of segments the timeseries is dived to. If you want to explain point-wise provide -1 as value
		:param normalise:   whether or not to normalise the result
		:param baseline:    the baseline which will substitute time series's values when ablated. It can be either a scalar (each time series's value is substituted by this scalar)  or a single time series

#TODO add option for normalisation?

In [6]:
exp = myFA.explain(samples=X_test_reg,batch_size=10, n_segments=5)
for i in range(5):
    print(i, ": min and max values", exp[i].min(), exp[i].max())
    
exp_normalized = myFA.explain(samples=X_test_reg,batch_size=10, n_segments=5, normalise=True)
for i in range(5):
    print(i,"min and max values", exp_normalized[i].min(), exp_normalized[i].max() )

100%|██████████| 20/20 [00:05<00:00,  3.95it/s]


0 : min and max values -8.276247 1.4897966
1 : min and max values -8.302887 1.4988918
2 : min and max values -7.8726864 1.1513119
3 : min and max values -8.319418 1.448576
4 : min and max values -8.67679 0.69781494


100%|██████████| 20/20 [00:04<00:00,  4.48it/s]

0 min and max values -1.0 0.18000872
1 min and max values -1.0 0.18052658
2 min and max values -1.0 0.1462413
3 min and max values -1.0 0.17411987
4 min and max values -1.0 0.08042317





labels parameter only make sense if you're using a classifier
let's switch to another dataset and classifier 

In [7]:
from aeon.classification.dictionary_based import WEASEL
clf = WEASEL(window_inc=4, support_probabilities=True)
clf.fit(CMJ_X_train, CMJ_y_train)
print ("QUANT accuracy is",clf.score(CMJ_X_test,CMJ_y_test),)


QUANT accuracy is 0.9720670391061452


In [8]:
n_to_explain = 20
CMJ_X_test, CMJ_y_test = CMJ_X_test[:n_to_explain], CMJ_y_test[:n_to_explain]

# SHAP

In [9]:
from tsCaptum.explainers import Shapley_Value_Sampling as SHAP
mySHAP = SHAP(clf)
exp = mySHAP.explain(CMJ_X_test, labels=CMJ_y_test)

24it [00:17,  1.41it/s]                        


# Kernel SHAP and LIME
for kernel SHAP and Lime the Captum framework suggests to use a batch size = 1, we are enforcing this propriety 

In [10]:
from tsCaptum.explainers import Kernel_Shap
myKernelSHAP = Kernel_Shap(clf)
exp = myKernelSHAP.explain(CMJ_X_test, labels=CMJ_y_test, batch_size=4)


100%|██████████| 20/20 [00:06<00:00,  3.29it/s]


In [11]:
from tsCaptum.explainers import  LIME
myLIME = LIME(clf)
exp = myLIME.explain(CMJ_X_test, labels=CMJ_y_test, batch_size=6)


100%|██████████| 20/20 [00:06<00:00,  3.21it/s]


another important optional argument is baseline i.e. the value(s) replacing the time series's ones when ablated by the attributions
two possible format for it:
 1) a scalar i.e. a single number replacing each value to be ablated (default value is 0)

In [12]:
mySHAP = SHAP(clf)
exp = mySHAP.explain(CMJ_X_test, labels=CMJ_y_test, baseline=0)

24it [00:16,  1.48it/s]                        


2) a time series having the same shape as the one to be explained, usually one item from the train set



In [13]:
exp = mySHAP.explain(CMJ_X_test, labels=CMJ_y_test, baseline=CMJ_X_train[0:1])

24it [00:17,  1.39it/s]                        


# Feature Permutation 
this is the last explainer, the only one that not accept a baseline as argument

In [14]:
from tsCaptum.explainers import Feature_Permutation
myFP = Feature_Permutation(clf,clf_type="classifier")
exp = myFP.explain(CMJ_X_test, labels=CMJ_y_test, baseline=42)

24it [00:00, 29.36it/s]                        


# finally we explain a multivariate dataset, first of all using the default arguments

In [15]:
clf_MTS = make_pipeline(MiniRocketMultivariate(n_jobs=-1),
                          StandardScaler(),LogisticRegressionCV(max_iter=200, n_jobs=-1))
clf_MTS.fit(MP_X_train,MP_y_train)
print("accuracy is", clf_MTS.score(MP_X_test,MP_y_test))

STOP: TOTAL NO. of ITERATIONS REACHED LIMIT.

Increase the number of iterations (max_iter) or scale the data as shown in:
    https://scikit-learn.org/stable/modules/preprocessing.html
Please also refer to the documentation for alternative solver options:
    https://scikit-learn.org/stable/modules/linear_model.html#logistic-regression
  n_iter_i = _check_optimize_result(


accuracy is 0.761344537815126


In [16]:
n_to_explain = 5
MP_X_test_samples, MP_y_test_samples = MP_X_test[:n_to_explain], MP_y_test[:n_to_explain]

In [17]:
myFA_MTS = Feature_Ablation(clf_MTS, clf_type="classifier")
exp = myFA_MTS.explain( samples= MP_X_test_samples, labels=MP_y_test_samples, batch_size=8, n_segments=10, normalise=False, baseline=0)

8it [00:00, 11.59it/s]               


then try different classifiers and different arguments for attribution


In [18]:
from aeon.classification.dictionary_based import MUSE
clf_MTS = MUSE(window_inc=4, use_first_order_differences=False, support_probabilities=True)
clf_MTS.fit(MP_X_train,MP_y_train)
print("accuracy is", clf_MTS.score(MP_X_test,MP_y_test))

accuracy is 0.6840336134453782


we can checkout the n_segments purpose. #TODO write a bit more about it?

In [19]:
my_explainer = Kernel_Shap(clf_MTS)
exps = my_explainer.explain( samples=MP_X_test_samples, labels=MP_y_test_samples, n_segments=10)
for i,exp in enumerate(exps):
    print( i , np.unique(exp).shape )


exps = my_explainer.explain( samples=MP_X_test_samples, labels=MP_y_test_samples, n_segments=5)
for i,exp in enumerate(exps):
	print( i , np.unique(exp).shape )

100%|██████████| 5/5 [00:08<00:00,  1.73s/it]


0 (71,)
1 (79,)
2 (73,)
3 (70,)
4 (78,)


100%|██████████| 5/5 [00:08<00:00,  1.69s/it]

0 (40,)
1 (39,)
2 (40,)
3 (40,)
4 (39,)





# QUANT 

In [20]:
from aeon.classification.interval_based import QUANTClassifier
clf_MTS = QUANTClassifier()
clf_MTS.fit(MP_X_train,MP_y_train)
print("accuracy is", clf_MTS.score(MP_X_test,MP_y_test))

accuracy is 0.6991596638655462


In [21]:
my_explainer = Feature_Permutation(clf_MTS)

exps = my_explainer.explain( samples=MP_X_test_samples, labels=MP_y_test_samples, n_segments=5, normalise=False)
print(" min and max attribution without normalisation:")
for i,exp in enumerate(exps):
	print( i , '{:.4f}'.format(exp.min()),"\t", '{:.4f}'.format(exp.max()) )

exps = my_explainer.explain( samples=MP_X_test_samples, labels=MP_y_test_samples, n_segments=5, normalise=True)
print(" min and max attribution with normalisation:")
for i,exp in enumerate(exps):
	print( i , '{:.4f}'.format(exp.min()),"\t", '{:.4f}'.format(exp.max()) )

8it [00:01,  4.78it/s]               


 min and max attribution without normalisation:
0 -0.0300 	 0.0750
1 -0.0700 	 0.0750
2 -0.0200 	 0.1800
3 -0.0100 	 0.0800
4 -0.0750 	 0.0150


8it [00:01,  5.11it/s]               

 min and max attribution with normalisation:
0 -0.2353 	 1.0000
1 -1.0000 	 0.8824
2 -0.1818 	 1.0000
3 0.0000 	 1.0000
4 -1.0000 	 0.6667





# Rocket classifier

In [22]:
from aeon.classification.convolution_based import RocketClassifier
clf_MTS = RocketClassifier(num_kernels=500,rocket_transform="rocket",n_jobs=-1)
clf_MTS.fit(MP_X_train,MP_y_train)
print( clf_MTS.score(MP_X_test,MP_y_test))

0.6722689075630253


In [23]:
my_explainer = Feature_Permutation(clf_MTS)
exp = my_explainer.explain(samples=MP_X_test_samples, labels=MP_y_test_samples, n_segments=5)

8it [00:00, 16.45it/s]               
