diff --git a/ChangeLog.rst b/ChangeLog.rst index 7a37b5a..5f07b64 100644 --- a/ChangeLog.rst +++ b/ChangeLog.rst @@ -1,6 +1,20 @@ ChangeLog ==================================================== +Release 0.6.2 - 2019/01/28 +-------------------------- + +* New Features + * Add Bandit service (#129, #131) + * Add support for clear_row method of Recommender (#136, #137) + +* Improvements + * Add visualize example using TensorBoard (#131) + +* Bug Fixes + * Fix schema not working with service specialized column definition (#134, #135) + * Fix Twitter Loader not working with latest tweepy (#138) + Release 0.6.1 - 2018/10/29 -------------------------- diff --git a/README.rst b/README.rst index cb54148..0d8ab22 100644 --- a/README.rst +++ b/README.rst @@ -22,7 +22,9 @@ Currently jubakit supports `Anomaly `_, `Recommender `_, `NearestNeighbor `_, -`Clustering `_ and +`Clustering `_, +`Burst `_, +`Bandit `_ and `Weight `_ engines. Install @@ -102,6 +104,8 @@ See the `example `_ dire +-----------------------------------+-----------------------------------------------+-----------------------+ | classifier_sklearn_grid_search.py | Grid Search example using scikit-learn wrapper| ✓ | +-----------------------------------+-----------------------------------------------+-----------------------+ +| classifier_tensorboard.py | Visualize a training process using TensorBoard| ✓ | ++-----------------------------------+-----------------------------------------------+-----------------------+ | regression_boston.py | Regression with toy dataset (boston) | ✓ | +-----------------------------------+-----------------------------------------------+-----------------------+ | regression_csv.py | Regression with CSV file | | @@ -116,6 +120,10 @@ See the `example `_ dire +-----------------------------------+-----------------------------------------------+-----------------------+ | clustering_2d.py | Clustering 2-dimensional dataset | | +-----------------------------------+-----------------------------------------------+-----------------------+ +| burst_dummy_stream.py | Burst detection with stream data | | ++-----------------------------------+-----------------------------------------------+-----------------------+ +| bandit_slot.py | Multi-armed bandit with slot machine example | | ++-----------------------------------+-----------------------------------------------+-----------------------+ | weight_shogun.py | Tracing fv_converter behavior using Weight | | +-----------------------------------+-----------------------------------------------+-----------------------+ | weight_model_extract.py | Extract contents of Weight model file | | diff --git a/example/bandit_slot.py b/example/bandit_slot.py new file mode 100644 index 0000000..e4d6f69 --- /dev/null +++ b/example/bandit_slot.py @@ -0,0 +1,97 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +from __future__ import absolute_import, division, \ + print_function, unicode_literals + +""" +Using Classifier and CSV file +======================================== + +This is an simple example of Bandit service. + +The player `Jubatun` tries to maximize the cumulative reward of +a sequence of slot machine plays by multi-armed bandit algorithm. + +You can try various simulation settings by modifying the slot machine setting. +Let's edit lines 67-72 and enjoy! +""" + +import random + +from jubakit.bandit import Bandit, Config + + +class Slot(object): + """Slot machine.""" + + def __init__(self, probability, average, stddev): + """ + Initialize slot machine. + + :param float probability: Hit probability. + :param float average: Average of a gaussian distribution. + :param float stddev: Standard deviation of a gaussian distribution. + :return: self + """ + self.probability = probability + self.average = average + self.stddev = stddev + + def hit(self): + """ + This slot machine hits with the given probability. + + :return bool: Whether this slot machine hits or not. + """ + if random.random() < self.probability: + return True + else: + return False + + def reward(self): + """ + A reward is determined based on + the given average and standard deviation. + + :return float: A reward. + """ + if self.hit(): + return random.gauss(self.average, self.stddev) + else: + return 0.0 + + +# Experimental config. +# Which slot machine should we choose? +iteration = 1000 +slots = { + 'bad': Slot(0.1, 50, 10), # E[R] = 5: bad arm + 'normal': Slot(0.01, 600, 100), # E[R] = 6: normal arm + 'good': Slot(0.001, 8000, 1000) # E[R] = 8: good arm +} + +# Launch bandit service. +player = 'Jubatan' +config = Config(method='epsilon_greedy', parameter={'epsilon': 0.1}) +bandit = Bandit.run(config) + +# Initialize bandit settings. +bandit.reset(player) +for name, slot in slots.items(): + bandit.register_arm(name) + +# Select arms and get rewards. +cumulative_reward = 0 +for i in range(iteration): + arm = bandit.select_arm(player) + reward = float(slots[arm].reward()) + bandit.register_reward(player, arm, reward) + cumulative_reward += reward + +# Show result. +arm_info = bandit.get_arm_info(player) +frequencies = {name: info.trial_count for name, info in arm_info.items()} + +print('cumulative reward: {0:.2f}'.format(cumulative_reward)) +print('slot frequencies: {0}'.format(frequencies)) diff --git a/example/classifier_tensorboard.py b/example/classifier_tensorboard.py new file mode 100644 index 0000000..80bf030 --- /dev/null +++ b/example/classifier_tensorboard.py @@ -0,0 +1,104 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +from __future__ import absolute_import, division, print_function, unicode_literals + +""" +Visualize training process with TensorBoard +=========================================== + +In this example, we show the training process of Jubatus with TensorBoard. + +TensorBoard syntax is little complicated and in this example we use tensorboardX library. +tensorboardX is a simple wrapper of TensorBoard that write events with simple function call. + +[How to Use] + +1. Install tensorboard. + + ``` + $ pip install tensorboardX + ``` + +2. Run this example. + +3. Check the training process using tensorboard. + + ``` + $ tensorboard --logdir runs/*** + ``` + +4. Enjoy! + +""" + +from sklearn.datasets import load_digits +from sklearn.metrics import ( + accuracy_score, f1_score, precision_score, recall_score, log_loss) + +from tensorboardX import SummaryWriter + +import jubakit +from jubakit.classifier import Classifier, Dataset, Config +from jubakit.model import JubaDump + +# Load the digits dataset. +digits = load_digits() + +# Create a dataset. +dataset = Dataset.from_array(digits.data, digits.target) +n_samples = len(dataset) +n_train_samples = int(n_samples * 0.7) +train_ds = dataset[:n_train_samples] +test_ds = dataset[n_train_samples:] + +# Create a classifier. +config = Config(method='AROW', + parameter={'regularization_weight': 0.1}) +classifier = Classifier.run(config) + +model_name = 'classifier_digits' +model_path = '/tmp/{}_{}_classifier_{}.jubatus'.format( + classifier._host, classifier._port, model_name) + +# show the feature weights of the target label. +target_label = 4 + +# Initialize summary writer. +writer = SummaryWriter() + +# train and test the classifier. +epochs = 100 +for epoch in range(epochs): + # train + for _ in classifier.train(train_ds): pass + + # test + y_true, y_pred = [], [] + for (_, label, result) in classifier.classify(test_ds): + y_true.append(label) + y_pred.append(result[0][0]) + + # save model to check the feature weights + classifier.save(model_name) + + model = JubaDump.dump_file(model_path) + weights = model['storage']['storage']['weight'] + for feature, label_values in weights.items(): + for label, value in label_values.items(): + if str(label) != str(target_label): + continue + writer.add_scalar('weights/{}'.format(feature), value['v1'], epoch) + + # write scores to tensorboardX summary writer. + acc = accuracy_score(y_true, y_pred) + prec = precision_score(y_true, y_pred, average='macro') + recall = recall_score(y_true, y_pred, average='macro') + f1 = f1_score(y_true, y_pred, average='macro') + writer.add_scalar('metrics/accuracy', acc, epoch) + writer.add_scalar('metrics/precision', prec, epoch) + writer.add_scalar('metrics/recall', recall, epoch) + writer.add_scalar('metrics/f1_score', f1, epoch) + +writer.close() +classifier.stop() diff --git a/jubakit/_version.py b/jubakit/_version.py index b32f773..04139be 100644 --- a/jubakit/_version.py +++ b/jubakit/_version.py @@ -1 +1 @@ -VERSION = (0, 6, 1) +VERSION = (0, 6, 2) diff --git a/jubakit/bandit.py b/jubakit/bandit.py new file mode 100644 index 0000000..e8ee38b --- /dev/null +++ b/jubakit/bandit.py @@ -0,0 +1,102 @@ +# -*- coding: utf-8 -*- + +from __future__ import absolute_import, division, print_function, unicode_literals + +import jubatus +import jubatus.embedded + +from .base import BaseService, GenericConfig + + +class Bandit(BaseService): + """ + Bandit service. + """ + + @classmethod + def name(cls): + return 'bandit' + + @classmethod + def _client_class(cls): + return jubatus.bandit.client.Bandit + + @classmethod + def _embedded_class(cls): + return jubatus.embedded.Bandit + + def register_arm(self, arm_id): + arm_id = str(arm_id) + return self._client().register_arm(arm_id) + + def delete_arm(self, arm_id): + arm_id = str(arm_id) + return self._client().delete_arm(arm_id) + + def select_arm(self, player_id): + player_id = str(player_id) + return self._client().select_arm(player_id) + + def register_reward(self, player_id, arm_id, reward): + arm_id = str(arm_id) + player_id = str(player_id) + reward = float(reward) + return self._client().register_reward(player_id, arm_id, reward) + + def get_arm_info(self, player_id): + player_id = str(player_id) + arm_info = self._client().get_arm_info(player_id) + # convert key object to string type. + return {str(name): info for name, info in arm_info.items()} + + def reset(self, player_id): + player_id = str(player_id) + return self._client().reset(str(player_id)) + + +class Config(GenericConfig): + """ + Configuration to run Bandit service. + """ + + @classmethod + def methods(cls): + return [ + 'epsilon_greedy', + 'epsilon_decreasing', + 'ucb1', + 'softmax', + 'exp3', + 'ts' + ] + + @classmethod + def _default_method(cls): + return 'epsilon_greedy' + + @classmethod + def _default_parameter(cls, method): + params = { + 'assume_unrewarded': False + } + if method in ('epsilon_greedy',): + params['epsilon'] = 0.1 + elif method in ('softmax',): + params['tau'] = 0.05 + elif method in ('exp3',): + params['gamma'] = 0.1 + elif method not in ('epsilon_decreasing', 'ucb1', 'ts'): + raise RuntimeError('unknown method: {0}'.format(method)) + return params + + @classmethod + def _default(cls, cfg): + cfg.clear() + + method = cls._default_method() + parameter = cls._default_parameter(method) + + if method is not None: + cfg['method'] = method + if parameter is not None: + cfg['parameter'] = parameter diff --git a/jubakit/base.py b/jubakit/base.py index aa733f4..13bd241 100644 --- a/jubakit/base.py +++ b/jubakit/base.py @@ -78,17 +78,7 @@ def __init__(self, mapping, fallback=None): loaders with the same configuration. """ self._fallback = fallback - self._key2type = {} - self._key2name = {} - - for (key, ent) in mapping.items(): - if isinstance(ent, (tuple, list, )): - (key_type, key_name) = ent - else: - (key_type, key_name) = (ent, key) - - self._key2type[key] = key_type - self._key2name[key] = key_name + self._key2type, self._key2name = BaseSchema._normalize_mapping(mapping) def transform(self, row): """ @@ -104,6 +94,25 @@ def predict(cls, row, typed): """ raise NotImplementedError() + @staticmethod + def _normalize_mapping(mapping): + """ + Normalizes the schema mapping. + """ + key2type = {} + key2name = {} + + for (key, ent) in mapping.items(): + if isinstance(ent, (tuple, list, )): + (key_type, key_name) = ent + else: + (key_type, key_name) = (ent, key) + + key2type[key] = key_type + key2name[key] = key_name + + return key2type, key2name + @staticmethod def _get_unique_mapping(mapping, fallback, key_type, name, optional=False): """ @@ -113,7 +122,8 @@ def _get_unique_mapping(mapping, fallback, key_type, name, optional=False): if fallback == key_type: raise RuntimeError('{0} key cannot be specified as fallback in schema'.format(name)) - keys = [k for k in mapping.keys() if mapping[k] == key_type] + key2type, _ = BaseSchema._normalize_mapping(mapping) + keys = [k for k in key2type.keys() if key2type[k] == key_type] if len(keys) == 0: if optional: return None raise RuntimeError('{0} key must be specified in schema'.format(name)) diff --git a/jubakit/loader/twitter.py b/jubakit/loader/twitter.py index b65e0c2..074be76 100644 --- a/jubakit/loader/twitter.py +++ b/jubakit/loader/twitter.py @@ -80,7 +80,7 @@ def __init__(self, auth=None, mode=SAMPLE, keys=STATUS_KEYS, count=None, **kwarg if start_stream is None: raise RuntimeError('unknown stream mode: {0}'.format(mode)) - kwargs['async'] = False + kwargs['is_async'] = False self._thread = threading.Thread(target=start_stream, kwargs=kwargs) self._thread.daemon = True diff --git a/jubakit/recommender.py b/jubakit/recommender.py index 26842a7..86fb4e4 100644 --- a/jubakit/recommender.py +++ b/jubakit/recommender.py @@ -55,6 +55,17 @@ def _client_class(cls): def _embedded_class(cls): return jubatus.embedded.Recommender + def clear_row(self, dataset): + """ + Removes the given rows from the recommendation table. + """ + cli = self._client() + for (idx, (row_id, d)) in dataset: + if row_id is None: + raise RuntimeError('dataset must have `id`.') + result = cli.clear_row(row_id) + yield (idx, row_id, result) + def update_row(self, dataset): """ Update data points to the recommender model using the given dataset. @@ -169,6 +180,7 @@ def decode_row(self, dataset): result = cli.decode_row(row_id) yield (idx, row_id, result) + class Config(GenericConfig): """ Configuration to run Recommender service. diff --git a/jubakit/test/test_bandit.py b/jubakit/test/test_bandit.py new file mode 100644 index 0000000..43cc615 --- /dev/null +++ b/jubakit/test/test_bandit.py @@ -0,0 +1,92 @@ +# -*- coding: utf-8 -*- + +from __future__ import absolute_import, division, print_function, unicode_literals + +from unittest import TestCase + +from jubakit.bandit import Bandit, Config + +from . import requireEmbedded + + +class BanditTest(TestCase): + + def test_simple(self): + Bandit() + + def test_simple_launch(self): + Bandit.run(Config()) + + @requireEmbedded + def test_embedded(self): + Bandit.run(Config(), embedded=True) + + def test_register_arm(self): + bandit = Bandit.run(Config()) + ret = bandit.register_arm(1) + self.assertIsInstance(ret, bool) + + def test_delete_arm(self): + bandit = Bandit.run(Config()) + bandit.register_arm(1) + ret = bandit.delete_arm(1) + self.assertIsInstance(ret, bool) + + def test_select_arm(self): + bandit = Bandit.run(Config()) + bandit.register_arm(1) + ret = bandit.select_arm('player') + self.assertEqual(ret, str(1)) + + def test_register_reward(self): + bandit = Bandit.run(Config()) + bandit.register_arm(1) + bandit.select_arm('player') + ret = bandit.register_reward('player', 1, 10) + self.assertIsInstance(ret, bool) + + def test_get_arm_info(self): + from jubatus.bandit.types import ArmInfo + bandit = Bandit.run(Config()) + bandit.register_arm(1) + bandit.select_arm('player') + ret = bandit.get_arm_info('player') + self.assertIsInstance(ret, dict) + for name, info in ret.items(): + self.assertIsInstance(name, str) + self.assertIsInstance(info, ArmInfo) + + def test_reset(self): + bandit = Bandit.run(Config()) + bandit.register_arm(1) + bandit.select_arm('player') + bandit.register_reward('player', 1, 10) + ret = bandit.reset('player') + self.assertIsInstance(ret, bool) + + +class ConfigTest(TestCase): + + def test_simple(self): + config = Config() + self.assertEqual('epsilon_greedy', config['method']) + + def test_methods(self): + config = Config() + self.assertIsInstance(config.methods(), list) + + def test_default(self): + config = Config.default() + self.assertEqual('epsilon_greedy', config['method']) + + def test_method_params(self): + for method in Config.methods(): + self.assertTrue( + 'assume_unrewarded' in Config(method=method)['parameter']) + self.assertTrue('epsilon' in Config(method='epsilon_greedy')['parameter']) + self.assertTrue('tau' in Config(method='softmax')['parameter']) + self.assertTrue('gamma' in Config(method='exp3')['parameter']) + + def test_invalid_method(self): + self.assertRaises( + RuntimeError, Config._default_parameter, 'invalid_method') diff --git a/jubakit/test/test_recommender.py b/jubakit/test/test_recommender.py index 342a960..dca0240 100644 --- a/jubakit/test/test_recommender.py +++ b/jubakit/test/test_recommender.py @@ -51,6 +51,26 @@ def test_simple(self): def test_embedded(self): recommender = Recommender.run(Config(), embedded=True) + def test_clear_row(self): + recommender = Recommender.run(Config()) + loader = StubLoader() + + # dataset must have id when execute `clear_row`. + schema = Schema({'v': Schema.NUMBER}) + dataset = Dataset(loader, schema) + def func(): + for _ in recommender.clear_row(dataset): pass + self.assertRaises(RuntimeError, lambda: func()) + + schema = Schema({'v': Schema.ID}) + dataset = Dataset(loader, schema) + + # expect to get False when table is empty. + for (idx, row_id, result) in recommender.clear_row(dataset): + self.assertEqual(result, True) + + recommender.stop() + def test_update_row(self): recommender = Recommender.run(Config()) loader = StubLoader() diff --git a/tools/run_examples.py b/tools/run_examples.py index 66606ae..91e5a84 100755 --- a/tools/run_examples.py +++ b/tools/run_examples.py @@ -21,7 +21,8 @@ from urllib.request import urlopen # Accept exceptions in following examples. -BLACK_LIST = ['classifier_twitter.py', 'classifier_hyperopt_tuning.py'] +BLACK_LIST = ['classifier_twitter.py', 'classifier_hyperopt_tuning.py', + 'classifier_tensorboard.py'] def download_bzip2(path, url): if os.path.exists(path): return