Re-architected evaluation and visualization, better scaling capabilities
guimatsumoto committed Jun 28, 2017
1 parent 1864e2c commit 9109c72
Showing 4 changed files with 227 additions and 239 deletions.
6 changes: 3 additions & 3 deletions skmultiflow/demos/_test_prequential.py
@@ -26,11 +26,11 @@ def demo(output_file=None, instances=40000):
pipe = Pipeline([('Classifier', classifier)])

# Setup the evaluator
eval = EvaluatePrequential(show_performance=True, pretrain_size=1000, show_kappa=True, max_instances=instances, batch_size=1,
show_scatter_points=False, n_wait=200, max_time=1000, output_file=output_file, track_global_kappa=True)
eval = EvaluatePrequential(pretrain_size=10000, max_instances=instances, batch_size=1, n_wait=200, max_time=1000,
output_file=output_file, task_type='classification', show_plot=True, plot_options=['kappa', 'performance'])

# Evaluate
eval.eval(stream=stream, classifier=pipe)

if __name__ == '__main__':
demo('log1.csv', 40000)
demo('log1.csv', 400000)
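The demo change above tracks the evaluator's new interface: the per-metric booleans (show_performance, show_kappa, show_scatter_points, track_global_kappa) collapse into a task_type string plus a plot_options list. A minimal usage sketch under the new signature (parameter names are taken from this commit's diff; the values are illustrative):

```python
from skmultiflow.evaluation.evaluate_prequential import EvaluatePrequential

# New-style construction: task_type picks sensible metric defaults,
# plot_options overrides them explicitly.
evaluator = EvaluatePrequential(pretrain_size=10000, max_instances=400000,
                                batch_size=1, n_wait=200, max_time=1000,
                                output_file='log1.csv',
                                task_type='classification', show_plot=True,
                                plot_options=['kappa', 'performance'])
# evaluator.eval(stream=stream, classifier=pipe)  # stream/pipe as in the demo
```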
179 changes: 80 additions & 99 deletions skmultiflow/evaluation/evaluate_prequential.py
@@ -10,17 +10,17 @@
from skmultiflow.visualization.evaluation_visualizer import EvaluationVisualizer
from skmultiflow.core.utils.utils import dict_to_tuple_list
from skmultiflow.core.utils.data_structures import FastBuffer
from skmultiflow.evaluation.measure_collection import WindowClassificationMeasurements, ClassificationMeasurements
from timeit import default_timer as timer



class EvaluatePrequential(BaseEvaluator):
def __init__(self, n_wait=200, max_instances=100000, max_time=float("inf"), output_file=None,
show_performance=False, batch_size=1, pretrain_size=200, show_kappa = False, track_global_kappa=False, show_scatter_points=False):
batch_size=1, pretrain_size=200, task_type='classification', show_plot=False, plot_options=None):
"""
Parameter show_scatter_points should only be used for small datasets and non-intensive evaluations, as it
will drastically slow down the evaluation process.
:param n_wait: int. Number of samples processed between metric updates, including plot points
:param max_instances: int. Maximum number of samples to be processed
:param max_time: int. Maximum amount of time, in seconds, that the evaluation can take
@@ -32,54 +32,66 @@ def __init__(self, n_wait=200, max_instances=100000, max_time=float("inf"), output_file=None,
:param track_global_kappa: If True, keeps track of a global kappa statistic. Consumes more memory and is plotted if show_kappa is True.
:param show_scatter_points: boolean. If True, the visualization module displays a scatter of true labels vs. predictions.
"""
PLOT_TYPES = ['performance', 'kappa', 'scatter', 'hamming_score']
TASK_TYPES = ['classification', 'Classification', 'CLASSIFICATION',
'regression', 'Regression', 'REGRESSION',
'multi_output', 'Multi_output', 'Multi_Output', 'MULTI_OUTPUT']
super().__init__()
# default values
self.n_wait = n_wait
self.max_instances = max_instances
self.max_time = max_time
self.batch_size = batch_size
self.pretrain_size = pretrain_size
self.show_performance = show_performance
self.show_kappa = show_kappa
self.show_scatter_points = show_scatter_points
self.track_global_kappa = track_global_kappa
self.classifier = None
self.stream = None
self.output_file = output_file
self.visualizer = None
# performance stats
self.global_correct_predicts = 0
self.partial_correct_predicts = 0

#plotting configs
self.task_type = task_type
if self.task_type not in TASK_TYPES:
raise ValueError('Task type not supported.')
self.show_plot = show_plot
self.plot_options = None
if self.show_plot is True and plot_options is None:
if self.task_type == 'classification':
self.plot_options = ['performance', 'kappa']
elif self.task_type == 'regression':
self.plot_options = ['performance', 'scatter']
elif self.task_type == 'multi_output':
self.plot_options = ['hamming_score']
elif self.show_plot is True and plot_options is not None:
self.plot_options = plot_options
for i in range(len(self.plot_options)):
if self.plot_options[i] not in PLOT_TYPES:
raise ValueError('Plot type not supported.')

#metrics
self.global_classification_metrics = None
self.partial_classification_metrics = None
self.global_classification_metrics = ClassificationMeasurements()
self.partial_classification_metrics = WindowClassificationMeasurements(window_size=self.n_wait)
self.global_sample_count = 0
self.partial_sample_count = 0
self.global_accuracy = 0
# kappa stats
self.global_kappa = 0.0
self.all_labels = []
self.all_predicts = []
self.kappa_count = 0
self.kappa_predicts = FastBuffer(n_wait)
self.kappa_true_labels = FastBuffer(n_wait)

warnings.filterwarnings("ignore", ".*invalid value encountered in true_divide.*")

def eval(self, stream, classifier):
if self.show_performance or self.show_kappa or self.show_scatter_points:
if self.show_plot:
self.start_plot(self.n_wait, stream.get_plot_name())
self.classifier = classifier
self.stream = stream
self.classifier = self.train_and_test(stream, classifier)
if self.show_performance or self.show_kappa or self.show_scatter_points:
if self.show_plot:
self.visualizer.hold()
return self.classifier

def train_and_test(self, stream = None, classifier = None):
def train_and_test(self, stream=None, classifier=None):
logging.basicConfig(format='%(message)s', level=logging.INFO)
init_time = timer()
end_time = timer()
self.classifier = classifier
self.stream = stream
self._reset_partials()
self._reset_globals()
prediction = None
logging.info('Generating %s targets.', str(self.stream.get_num_targets()))
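For readers skimming the diff, the plot-option defaulting added to __init__ reduces to this standalone sketch (a distillation of the logic above, not the committed code):

```python
# Default plot_options per task type, as resolved in __init__ when
# show_plot is True and no explicit plot_options are supplied.
def default_plot_options(task_type):
    defaults = {
        'classification': ['performance', 'kappa'],
        'regression': ['performance', 'scatter'],
        'multi_output': ['hamming_score'],
    }
    if task_type not in defaults:
        raise ValueError('Task type not supported.')
    return defaults[task_type]
```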
@@ -98,7 +110,8 @@ def train_and_test(self, stream = None, classifier = None):
f.write("\n# " + self.classifier.get_info())
f.write("\n# " + self.get_info())
f.write("\n# SETUP END")
f.write("\nx_count,global_performance,partial_performance,global_kappa,sliding_window_kappa,true_label,prediction")
f.write(
"\nx_count,global_performance,partial_performance,global_kappa,sliding_window_kappa,true_label,prediction")

if (self.pretrain_size > 0):
logging.info('Pretraining on %s samples.', str(self.pretrain_size))
@@ -116,47 +129,39 @@
if X is not None and y is not None:
prediction = self.classifier.predict(X)
self.global_sample_count += self.batch_size
self.partial_sample_count += self.batch_size
self.kappa_predicts.add_element(np.ravel(prediction))
self.kappa_true_labels.add_element(np.ravel(y))
for i in range(len(prediction)):
self.global_classification_metrics.add_result(y[i], prediction[i])
self.partial_classification_metrics.add_result(y[i], prediction[i])
nul_count = self.global_sample_count - self.batch_size
if ((prediction[i] == y[i]) and not (self.global_sample_count > self.max_instances)):
self.partial_correct_predicts += 1
self.global_correct_predicts += 1
if ((nul_count + i + 1) % (rest/20)) == 0:
logging.info('%s%%', str(((nul_count+i+1) // (rest / 20)) * 5))
#if self.show_scatter_points:
#self.visualizer.on_new_scatter_data(self.global_sample_count - self.batch_size + i, y[i],
#prediction[i])
self.all_labels.extend(y)
self.all_predicts.extend(prediction)
if ((nul_count + i + 1) % (rest / 20)) == 0:
logging.info('%s%%', str(((nul_count + i + 1) // (rest / 20)) * 5))
# if self.show_scatter_points:
# self.visualizer.on_new_scatter_data(self.global_sample_count - self.batch_size + i, y[i],
# prediction[i])
self.classifier.partial_fit(X, y)

if ((self.global_sample_count % self.n_wait) == 0 | (self.global_sample_count >= self.max_instances) |
if ((self.global_sample_count % self.n_wait) == 0 | (
self.global_sample_count >= self.max_instances) |
(self.global_sample_count / self.n_wait > before_count + 1)):
before_count += 1
self.kappa_count += 1
self.update_metrics(y[-1], prediction[-1])
self.update_metrics()
end_time = timer()
except BaseException as exc:
if exc is KeyboardInterrupt:
self.kappa_count += 1
if self.show_scatter_points:
self.update_metrics(y[-1], prediction[-1])
self.update_metrics()
else:
self.update_metrics()
break

if (end_time-init_time > self.max_time):
if (end_time - init_time > self.max_time):
logging.info('\nTime limit reached. Evaluation stopped.')
logging.info('Evaluation time: %s s', str(self.max_time))
else:
logging.info('\nEvaluation time: %s s', str(round(end_time - init_time, 3)))
logging.info('Total instances: %s', str(self.global_sample_count))
logging.info('Global accuracy: %s', str(round(self.global_correct_predicts/self.global_sample_count, 3)))
if self.track_global_kappa:
logging.info('Global kappa: %s', str(round(self.global_kappa, 3)))
logging.info('Global accuracy: %s', str(round(self.global_classification_metrics.get_performance(), 3)))
logging.info('Global kappa: %s', str(round(self.global_classification_metrics.get_kappa(), 3)))

return self.classifier
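Stripped of logging, file output, and interrupt handling, train_and_test above is the classic prequential test-then-train loop. A self-contained sketch, assuming a stream exposing next_instance (the actual fetch happens in lines elided from this view) and an add_result-style metrics object; treat the names as assumptions:

```python
# Minimal prequential skeleton (illustrative; not the committed code).
def prequential(stream, classifier, metrics, max_instances, batch_size=1):
    count = 0
    while count < max_instances:
        X, y = stream.next_instance(batch_size)   # assumed stream API
        if X is None or y is None:                # stream exhausted
            break
        y_pred = classifier.predict(X)            # test first ...
        for true_label, pred in zip(y, y_pred):
            metrics.add_result(true_label, pred)  # update confusion-matrix stats
        classifier.partial_fit(X, y)              # ... then train on the same data
        count += batch_size
    return classifier
```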

@@ -174,78 +179,57 @@ def predict(self, X):
else:
return self

def update_plot(self, partial_accuracy=None, num_instances=None, y=None, prediction=None):
def update_plot(self, current_x, new_points_dict):
if self.output_file is not None:
line = str(num_instances)
line = str(current_x)
i = 0
if self.show_performance:
line += ',' + str(round(self.global_accuracy, 3))
line += ',' + str(round(partial_accuracy[i],3))
if 'classification' in self.plot_options:
line += ',' + str(round(self.global_classification_metrics.get_performance(), 3))
line += ',' + str(round(self.partial_classification_metrics.get_performance(), 3))
i += 1
else:
line += ',' + str(np.nan) + ',' + str(np.nan)
if self.show_kappa:
if self.track_global_kappa:
line += ',' +str(round(self.global_kappa, 3))
else:
line += ',' + str(np.nan)
line += ',' + str(round(partial_accuracy[i], 3))
if 'kappa' in self.plot_options:
line += ',' + str(round(self.global_classification_metrics.get_kappa(), 3))
line += ',' + str(np.nan)
line += ',' + str(round(self.partial_classification_metrics.get_kappa(), 3))
else:
line += ',' + str(np.nan) + ',' + str(np.nan)
if self.show_scatter_points:
line += ',' + str(y) + ',' + str(prediction)
if 'scatter' in self.plot_options:
line += ',' + str(new_points_dict['scatter'][0]) + ',' + str(new_points_dict['scatter'][1])
else:
line += ',' + str(np.nan) + ',' + str(np.nan)
with open(self.output_file, 'a') as f:
f.write('\n'+line)
self.visualizer.on_new_train_step(partial_accuracy, num_instances, y, prediction, self.global_kappa)
f.write('\n' + line)
self.visualizer.on_new_train_step(current_x, new_points_dict)
pass

def update_metrics(self, y=None, prediction=None):
def update_metrics(self):
""" Updates the metrics of interest.
It's possible that cohen_kappa_score will return a NaN value, which happens when the predictions
and the true labels are in perfect agreement, causing pe = 1 and therefore a division by zero.
If this is detected, the plot assumes the value to be 1.
:return: No return.
"""
self.global_accuracy = ((self.global_sample_count - self.partial_sample_count) / self.global_sample_count) * \
self.global_accuracy + (self.partial_sample_count / self.global_sample_count) * \
(self.partial_correct_predicts/self.partial_sample_count)
partial_kappa = 0.0
partial_kappa = cohen_kappa_score(self.kappa_predicts.get_queue(), self.kappa_true_labels.get_queue())
self.global_kappa = cohen_kappa_score(self.all_labels, self.all_predicts)
#logging.info('%s', str(round(partial_kappa, 3)))
if math.isnan(partial_kappa):
partial_kappa = 1.0

partials = None
if self.show_kappa or self.show_performance:
partials = []
if self.show_performance:
partials.append(self.partial_correct_predicts/self.partial_sample_count)
if self.show_kappa:
partials.append(partial_kappa)

self.update_plot(partials, self.global_sample_count, y, prediction)

self._reset_partials()
new_points_dict = {}

def _reset_partials(self):
self.partial_sample_count = 0
self.partial_correct_predicts = 0
if 'performance' in self.plot_options:
new_points_dict['performance'] = [self.global_classification_metrics.get_performance(), self.partial_classification_metrics.get_performance()]
if 'kappa' in self.plot_options:
new_points_dict['kappa'] = [self.global_classification_metrics.get_kappa(), self.partial_classification_metrics.get_kappa()]
if 'scatter' in self.plot_options:
true, pred = self.global_classification_metrics.get_last()
new_points_dict['scatter'] = [true, pred]
self.update_plot(self.global_sample_count, new_points_dict)

def _reset_globals(self):
self.global_sample_count = 0
self.global_correct_predicts = 0
self.global_accuracy = 0.0

def start_plot(self, n_wait, dataset_name):
self.visualizer = EvaluationVisualizer(n_wait=n_wait, dataset_name=dataset_name,
show_performance=self.show_performance, show_kappa= self.show_kappa,
track_global_kappa=self.track_global_kappa,
show_scatter_points=self.show_scatter_points)
self.visualizer = EvaluationVisualizer(n_wait=n_wait, dataset_name=dataset_name, plots=self.plot_options)
pass

def set_params(self, dict):
@@ -271,14 +255,11 @@ def set_params(self, dict):
self.show_scatter_points = value

def get_info(self):
plot = 'True' if self.show_performance else 'False'
kappa = 'True' if self.show_kappa else 'False'
scatter = 'True' if self.show_scatter_points else 'False'
plot = 'True' if self.show_plot else 'False'
return 'Prequential Evaluator: n_wait: ' + str(self.n_wait) + \
' - max_instances: ' + str(self.max_instances) + \
' - max_time: ' + str(self.max_time) + \
' - batch_size: ' + str(self.batch_size) + \
' - pretrain_size: ' + str(self.pretrain_size) + \
' - show_performance: ' + plot + \
' - show_kappa: ' + kappa + \
' - show_scatter_points: ' + scatter
' - show_plot: ' + plot + \
' - plot_options: ' + str(self.plot_options)
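With this commit the evaluator and the visualizer communicate through a plain dict keyed by plot type: update_metrics builds [global, sliding-window] value pairs ([last_true_label, last_prediction] for 'scatter') and hands them to on_new_train_step. A sketch of the payload shape, reconstructed from update_metrics above (the numbers are made up):

```python
# Payload consumed by EvaluationVisualizer.on_new_train_step.
current_x = 20000  # global sample count at this checkpoint
new_points_dict = {
    'performance': [0.871, 0.845],  # [global accuracy, window accuracy]
    'kappa': [0.742, 0.701],        # [global kappa, window kappa]
    'scatter': [1, 0],              # [last true label, last prediction]
}
# visualizer.on_new_train_step(current_x, new_points_dict)
```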
20 changes: 18 additions & 2 deletions skmultiflow/evaluation/measure_collection.py
@@ -16,7 +16,8 @@ def __init__(self, targets=None, dtype=np.int64):
else:
self.n_targets = 0
self.confusion_matrix = ConfusionMatrix(self.n_targets, dtype)
self.last_class = None
self.last_true_label = None
self.last_prediction = None
self.sample_count = 0
self.targets = targets

@@ -29,12 +30,17 @@ def reset(self, targets=None):
pass

def add_result(self, sample, prediction):
self.last_true_label = sample
self.last_prediction = prediction
true_y = self._get_target_index(sample, True)
pred = self._get_target_index(prediction, True)
self.confusion_matrix.update(true_y, pred)
self.sample_count += 1
pass

def get_last(self):
return self.last_true_label, self.last_prediction

def get_majority_class(self):
""" Get the true majority class
@@ -94,7 +100,8 @@ def get_kappa(self):
sum_column = np.sum(column) / self.sample_count

pc += sum_row * sum_column

if pc == 1:
return 1
return (p0 - pc) / (1.0 - pc)
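The pc == 1 guard added here covers the degenerate case described in update_metrics' docstring: when a window contains a single class, the expected agreement pc is 1 and kappa = (p0 - pc) / (1 - pc) divides by zero. A minimal reproduction with scikit-learn (behavior observed in versions contemporary with this commit; treat it as an assumption for newer releases):

```python
import math
from sklearn.metrics import cohen_kappa_score

# All labels identical: p0 = 1 and pc = 1, so (p0 - pc) / (1 - pc) is 0/0.
# cohen_kappa_score then yields NaN (the evaluator filters the associated
# "invalid value encountered in true_divide" warning); the guard above
# returns 1 instead.
print(math.isnan(cohen_kappa_score([1, 1, 1, 1], [1, 1, 1, 1])))  # True
```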

@property
@@ -123,6 +130,8 @@ def __init__(self, targets=None, dtype=np.int64, window_size=200):
self.true_labels = FastBuffer(window_size)
self.predictions = FastBuffer(window_size)
self.temp = 0
self.last_prediction = None
self.last_true_label = None
pass

def reset(self, targets=None):
@@ -134,6 +143,8 @@ def reset(self, targets=None):
pass

def add_result(self, sample, prediction):
self.last_true_label = sample
self.last_prediction = prediction
true_y = self._get_target_index(sample, True)
pred = self._get_target_index(prediction, True)

@@ -150,6 +161,9 @@ def add_result(self, sample, prediction):
self.confusion_matrix.update(true_y, pred)
pass

def get_last(self):
return self.last_true_label, self.last_prediction

def get_majority_class(self):
""" Get the true majority class
@@ -210,6 +224,8 @@ def get_kappa(self):

pc += sum_row * sum_column

if pc == 1:
return 1
return (p0 - pc) / (1.0 - pc)

@property
