diff --git a/.gitignore b/.gitignore
index 0ef476e5..803c29b2 100644
--- a/.gitignore
+++ b/.gitignore
@@ -47,4 +47,4 @@ docs/_build/
 # Keras model files
 history.p
-keras_model.*
\ No newline at end of file
+keras_model*
\ No newline at end of file
diff --git a/README.md b/README.md
index f8eb5090..dfdd432c 100644
--- a/README.md
+++ b/README.md
@@ -61,19 +61,6 @@ knowledgehub -c /etc/ckan/default/production.ini db init
 sudo service apache2 reload
 ```
 
-6. Run the command for predictive search periodically( daily or weekly is recommended ).
-This command will start training the model:
-
-```
-knowledgehub -c /etc/ckan/default/production.ini predictive_search train
-```
-
-There is a action that can run CLI commands for Knowledge Hub.
-This example shows how to run the above command through the API action:
-```
-curl -v 'http://hostname/api/3/action/run_command' -H'Authorization: API-KEY' -d '{"command": "predictive_search train"}'
-```
-
 ### Config Settings
 
 These are the required configuration options used by the extension:
@@ -94,24 +81,24 @@ ckanext.knowledgehub.sub_themes_per_page = 20
 ckanext.knowledgehub.dashboards_per_page = 20
 ```
 4. Predictive Search
-    - Length of the seuqunce after which the model can start predict, recommended at least 15 chars long
+    - Length of the sequence after which the model can start predicting, recommended at least 10 chars long
    ```
    # (optional, default: 10)
    ckanext.knowledgehub.rnn.sequence_length = 12
    ```
    - Number of chars to be skipped in generation of next sentence
    ```
-    # (optional, default: 3)
+    # (optional, default: 1)
    ckanext.knowledgehub.rnn.sentence_step = 2
    ```
    - Number of predictions to return
    ```
    # (optional, default: 3)
-    ckanext.knowledgehub.rnn.number_prediction = 2
+    ckanext.knowledgehub.rnn.number_predictions = 2
    ```
    - Minimum length of the corpus after it should start to predict
    ```
-    # (optional, default: 3)
+    # (optional, default: 10000)
    ckanext.knowledgehub.rnn.min_length_corpus = 300
    ```
    - Maximum epochs to learn
@@ -119,10 +106,15 @@ ckanext.knowledgehub.dashboards_per_page = 20
    # (optional, default: 50)
    ckanext.knowledgehub.rnn.max_epochs = 30
    ```
-    - Full path to the RNN model
+    - Full path to the RNN weights model
+    ```
+    # (optional, default: ./keras_model_weights.h5)
+    ckanext.knowledgehub.rnn.model_weights = /home/user/model_weights.h5
    ```
-    # (optional, default: ./keras_model.h5)
-    ckanext.knowledgehub.rnn.model = /home/user/model.h5
+    - Full path to the RNN network model
+    ```
+    # (optional, default: ./keras_model_network.h5)
+    ckanext.knowledgehub.rnn.model_network = /home/user/model_network.h5
    ```
    - Full path to the model history
    ```
@@ -226,7 +218,7 @@ knowledgehub -c /etc/ckan/default/production.ini search-index rebuild --model da
 
 This would rebuild the index for dashboards.
 
-Avalilable model types are:
+Available model types are:
 * `ckan` - rebuilds the CKAN core (package) index,
 * `dashboard` - rebuilds the dasboards index,
 * `research-question` - rebuilds the research questions index and
@@ -264,4 +256,24 @@ The crontab should look something like this:
 
 Data Quality is measured across the six primary dimensions for data quality assessment.
 
-A lot more details are available in the dedicated [documentation section](docs/data-qualtiy-metrics.md).
\ No newline at end of file
+A lot more details are available in the dedicated [documentation section](docs/data-qualtiy-metrics.md).
+
+# Predictive search
+
+The predictive search functionality predicts the next n characters of the current word, or the next most likely word.
+The training data consists of the titles and descriptions of all entities on Knowledge Hub, including themes, sub-themes,
+research questions, datasets, visualizations and dashboards. The machine learning model has to be trained before it can start predicting.
+By default the user should type 10 characters into the search box on the home page before predictions appear.
+
+Run the command for predictive search periodically (daily or weekly is recommended).
+This command will start training the model:
+
+```
+knowledgehub -c /etc/ckan/default/production.ini predictive_search train
+```
+
+There is an action that can run CLI commands for Knowledge Hub.
+This example shows how to run the above command through the API action:
+```
+curl -v 'http://hostname/api/3/action/run_command' -H'Authorization: API-KEY' -d '{"command": "predictive_search train"}'
+```
\ No newline at end of file
diff --git a/ckanext/knowledgehub/cli/predictive_search.py b/ckanext/knowledgehub/cli/predictive_search.py
index abe7d976..1c66f965 100644
--- a/ckanext/knowledgehub/cli/predictive_search.py
+++ b/ckanext/knowledgehub/cli/predictive_search.py
@@ -4,7 +4,7 @@ import logging
 
 from ckanext.knowledgehub.cli import error_shout
-from ckanext.knowledgehub.rnn.worker import learn
+from ckanext.knowledgehub.lib.rnn import PredictiveSearchWorker
 
 log = logging.getLogger(__name__)
@@ -19,7 +19,8 @@ def train():
     u'''Initialising the Knowledgehub tables'''
     log.info(u"Initialize Knowledgehub tables")
     try:
-        learn()
+        worker = PredictiveSearchWorker()
+        worker.run()
     except Exception as e:
         error_shout(e)
     else:
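The same training run can also be triggered from Python rather than curl. A minimal sketch, assuming the third-party `ckanapi` client and a valid sysadmin API key; the `run_command` action and its `command` parameter are the ones documented in the README change above:

```python
# Sketch only: remote equivalent of
# `knowledgehub -c production.ini predictive_search train`.
from ckanapi import RemoteCKAN

kwh = RemoteCKAN('http://hostname', apikey='API-KEY')
kwh.action.run_command(command='predictive_search train')
```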
diff --git a/ckanext/knowledgehub/fanstatic/javascript/search_prediction.js b/ckanext/knowledgehub/fanstatic/javascript/search_prediction.js
index 5ac7033c..40fea859 100644
--- a/ckanext/knowledgehub/fanstatic/javascript/search_prediction.js
+++ b/ckanext/knowledgehub/fanstatic/javascript/search_prediction.js
@@ -50,6 +50,7 @@
                 x[i].parentNode.removeChild(x[i]);
             }
         }
+        currentFocus = -1;
     }
 
     $(document).ready(function () {
@@ -58,21 +59,21 @@
         searchInput
             .bind("change keyup", function (event) {
                 clearTimeout(timer)
-                if (!(event.keyCode >= 13 && event.keyCode <= 20) && !(event.keyCode >= 37 && event.keyCode <= 40)) {
+                if (!(event.keyCode >= 13 && event.keyCode <= 20) && !(event.keyCode >= 37 && event.keyCode <= 40) && event.keyCode != 27) {
                     // detect that user has stopped typing for a while
                     timer = setTimeout(function() {
                         var text = searchInput.val();
                         if (text !== '') {
                             api.get('get_predictions', {
-                                text: text
+                                query: text
                             }, true)
                                 .done(function (data) {
                                     if (data.success) {
                                         var a, b;
                                         var results = data.result;
-                                        closeAllLists()
+                                        closeAllLists();
 
                                         a = document.createElement("DIV");
                                         a.setAttribute("id", "autocomplete-list");
@@ -84,8 +85,9 @@
                                             b.innerHTML = text;
                                             b.innerHTML += "" + r + "";
                                             b.addEventListener("click", function (e) {
-                                                searchInput.val(text + r);
                                                 closeAllLists();
+                                                searchInput.val(text + r);
+                                                searchInput.trigger("change");
                                             });
                                             a.append(b)
                                         });
@@ -95,25 +97,24 @@
                                     console.log("Get predictions: " + error.statusText);
                                 });
                         }
-                    }, 500);
+                    }, 300);
                 }
             })
     });
 
     $('.search-input-group').on("mouseover", autocompleteItems, function(e){
-
         var activeItem = document.getElementsByClassName('autocomplete-active')[0];
         activeItem ? activeItem.classList.remove('autocomplete-active') : null;
         event.target !== input ? event.target.classList.add('autocomplete-active') : null;
         var p = e.target.parentElement;
         var index = Array.prototype.indexOf.call(p.children, e.target);
         activeItem ? currentFocus = index : currentFocus = -1
-
     });
 
     searchInput.on('keydown', function (e) {
         var x = document.getElementById("autocomplete-list");
         if (x) x = x.getElementsByTagName("div");
+
         if (e.keyCode == 40) {
             // The arrow DOWN key is pressed
             currentFocus++;
@@ -125,9 +126,12 @@
         } else if (e.keyCode == 13) {
             // ENTER key is pressed
             if (currentFocus > -1) {
+                e.preventDefault();
                 // simulate a click on the "active" item
                 if (x) x[currentFocus].click();
             }
+        } else if (e.keyCode == 27) {
+            closeAllLists();
         }
     });
})(ckan.i18n.ngettext, $);
\ No newline at end of file
diff --git a/ckanext/knowledgehub/fanstatic/javascript/user_query_result.js b/ckanext/knowledgehub/fanstatic/javascript/user_query_result.js
index 68693a17..9c76a4aa 100644
--- a/ckanext/knowledgehub/fanstatic/javascript/user_query_result.js
+++ b/ckanext/knowledgehub/fanstatic/javascript/user_query_result.js
@@ -37,10 +37,10 @@
                     result_id: result_id
                 })
                     .done(function (data) {
-                        console.log("User Quere Result: SAVED!");
+                        console.log("User Query Result: SAVED!");
                     })
                     .fail(function (error) {
-                        console.log("User Quere Result failed: " + error.statusText);
+                        console.log("User Query Result failed: " + error.statusText);
                     });
             }
         })
@@ -49,6 +49,19 @@
         });
     }
 
+    function saveKnowledgeHubData(query_text) {
+        api.post('kwh_data_create', {
+            type: 'search_query',
+            title: query_text
+        })
+            .done(function (data) {
+                console.log("User query added to kwh data");
+            })
+            .fail(function (error) {
+                console.log("Failed to add user query to kwh data: " + error.statusText);
+            });
+    }
+
     $(document).ready(function () {
         var save_user_query = function(callback) {
             var tab_content = $('.tab_content');
@@ -81,6 +94,7 @@
                 if (query_text) {
                     var user_id = $('#user-id').val();
                     saveUserQueryResult(query_text, result_type, result_id, user_id)
+                    saveKnowledgeHubData(query_text)
                 }
             }
         });
diff --git a/ckanext/knowledgehub/helpers.py b/ckanext/knowledgehub/helpers.py
index c38b596e..2d164c64 100644
--- a/ckanext/knowledgehub/helpers.py
+++ b/ckanext/knowledgehub/helpers.py
@@ -31,7 +31,6 @@
 from ckanext.knowledgehub.model import Dashboard
 from ckanext.knowledgehub.model import ResourceValidation
-from ckanext.knowledgehub.rnn import helpers as rnn_helpers
 
 log = logging.getLogger(__name__)
diff --git a/ckanext/knowledgehub/lib/rnn/__init__.py b/ckanext/knowledgehub/lib/rnn/__init__.py
new file mode 100644
index 00000000..f6cc90e2
--- /dev/null
+++ b/ckanext/knowledgehub/lib/rnn/__init__.py
@@ -0,0 +1,8 @@
+from ckanext.knowledgehub.lib.rnn.worker import PredictiveSearchWorker
+from ckanext.knowledgehub.lib.rnn.model import PredictiveSearchModel
+
+
+__all__ = [
+    'PredictiveSearchWorker',
+    'PredictiveSearchModel'
+]
diff --git a/ckanext/knowledgehub/lib/rnn/config.py b/ckanext/knowledgehub/lib/rnn/config.py
new file mode 100644
index 00000000..87b7d96d
--- /dev/null
+++ b/ckanext/knowledgehub/lib/rnn/config.py
@@ -0,0 +1,40 @@
+import os
+import time
+
+from ckan.common import config
+
+
+class PredictiveSearchConfig(object):
+    ''' Holds the configuration for the machine learning model and worker '''
+
+    def __init__(self):
+        self.corpus_length = int(config.get(
+            u'ckanext.knowledgehub.rnn.min_length_corpus', 10000))
+        self.sequence_length = int(config.get(
+            u'ckanext.knowledgehub.rnn.sequence_length', 10))
+        self.step = int(
+            config.get(u'ckanext.knowledgehub.rnn.sentence_step', 1))
+        self.epochs = int(
+            config.get(u'ckanext.knowledgehub.rnn.max_epochs', 50))
+        self.weights_path = config.get(
+            u'ckanext.knowledgehub.rnn.model_weights',
+            './keras_model_weights.h5'
+        )
+        self.network_path = config.get(
+            u'ckanext.knowledgehub.rnn.model_network',
+            './keras_model_network.h5'
+        )
+        self.history_path = config.get(
+            u'ckanext.knowledgehub.rnn.history',
+            './history.p'
+        )
+        self.temp_weights_path = os.path.join(
+            os.path.dirname(self.weights_path),
+            'keras_model_%s.h5' % time.time()
+        )
+        self.number_predictions = int(
+            config.get(
+                u'ckanext.knowledgehub.rnn.number_predictions',
+                3
+            )
+        )
diff --git a/ckanext/knowledgehub/lib/rnn/data_manager.py b/ckanext/knowledgehub/lib/rnn/data_manager.py
new file mode 100644
index 00000000..726d4396
--- /dev/null
+++ b/ckanext/knowledgehub/lib/rnn/data_manager.py
@@ -0,0 +1,57 @@
+import ckan.plugins.toolkit as toolkit
+
+
+class DataManager:
+    ''' Manages the training data '''
+
+    @staticmethod
+    def create_corpus(corpus):
+        ''' Store the machine learning corpus
+
+        :param corpus: the machine learning corpus
+        :type corpus: string
+
+        :returns: the stored corpus
+        :rtype: dict
+        '''
+        return toolkit.get_action('corpus_create')(
+            {'ignore_auth': True},
+            {'corpus': corpus}
+        )
+
+    @staticmethod
+    def get_corpus():
+        ''' Get the data in Knowledge Hub and create the corpus
+
+        :returns: the machine learning corpus
+        :rtype: string
+        '''
+        kwh_data = toolkit.get_action(
+            'kwh_data_list')({'ignore_auth': True}, {})
+
+        corpus = ''
+        if kwh_data.get('total'):
+            data = kwh_data.get('data', [])
+            for entry in data:
+                corpus += ' %s' % entry.get('title')
+                if entry.get('description'):
+                    corpus += ' %s' % entry.get('description')
+
+        return corpus
+
+    @staticmethod
+    def get_last_corpus():
+        ''' Return the corpus used in the last training of the model '''
+
+        return toolkit.get_action('get_last_rnn_corpus')(
+            {'ignore_auth': True}, {})
+
+    @staticmethod
+    def prepare_corpus(corpus):
+        ''' Find the unique chars in the corpus and index the characters '''
+
+        unique_chars = sorted(list(set(corpus)))
+        char_indices = dict((c, i) for i, c in enumerate(unique_chars))
+        indices_char = dict((i, c) for i, c in enumerate(unique_chars))
+
+        return (unique_chars, char_indices, indices_char)
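Since `prepare_corpus` drives both training and prediction, a quick illustration of what it returns for a toy corpus (plain Python, no CKAN required):

```python
# Illustration only: mirrors DataManager.prepare_corpus on a toy corpus.
corpus = 'aba c'
unique_chars = sorted(list(set(corpus)))                   # [' ', 'a', 'b', 'c']
char_indices = {c: i for i, c in enumerate(unique_chars)}  # {' ': 0, 'a': 1, 'b': 2, 'c': 3}
indices_char = {i: c for i, c in enumerate(unique_chars)}  # {0: ' ', 1: 'a', 2: 'b', 3: 'c'}
```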
diff --git a/ckanext/knowledgehub/lib/rnn/model.py b/ckanext/knowledgehub/lib/rnn/model.py
new file mode 100644
index 00000000..20ee913a
--- /dev/null
+++ b/ckanext/knowledgehub/lib/rnn/model.py
@@ -0,0 +1,109 @@
+import logging
+import os
+
+import heapq
+import numpy as np
+import tensorflow as tf
+from keras.models import load_model, model_from_json
+from filelock import FileLock
+
+from ckanext.knowledgehub.lib.rnn.data_manager import DataManager
+from ckanext.knowledgehub.lib.rnn.config import PredictiveSearchConfig
+
+
+class PredictiveSearchModel(PredictiveSearchConfig):
+    ''' Uses the machine learning model trained by the worker.
+
+    It can predict the next characters or the next word for a given text.
+    '''
+
+    def __init__(self):
+        super(PredictiveSearchModel, self).__init__()
+        self.model = None
+        self.unique_chars = None
+        self.char_indices = None
+        self.indices_char = None
+        self.logger = logging.getLogger('ckanext.PredictiveSearchModel')
+
+    def prepare_input(self, text):
+        x = np.zeros((1, self.sequence_length, len(self.unique_chars)))
+        for t, char in enumerate(text):
+            x[0, t, self.char_indices[char]] = 1.
+
+        return x
+
+    def sample(self, preds, top_n=3):
+        preds = np.asarray(preds).astype('float64')
+        preds = np.log(preds)
+        exp_preds = np.exp(preds)
+        preds = exp_preds / np.sum(exp_preds)
+
+        return heapq.nlargest(top_n, range(len(preds)), preds.take)
+
+    def predict_completion(self, text):
+        completion = ''
+        while True:
+            x = self.prepare_input(text)
+            preds = self.model.predict(x, verbose=0)[0]
+            next_index = self.sample(preds, top_n=1)[0]
+            next_char = self.indices_char[next_index]
+            if not next_char.isalnum():
+                return completion
+
+            text = text[1:] + next_char
+
+            completion += next_char
+            if (len(text + completion) + 2 > len(text) and next_char == ' '):
+                return completion
+
+    def predict(self, search_text):
+        self.unique_chars, self.char_indices, self.indices_char = \
+            DataManager.prepare_corpus(DataManager.get_last_corpus())
+
+        text = search_text[-self.sequence_length:].lower()
+        unique_chars_text = sorted(list(set(text)))
+        for char in unique_chars_text:
+            if char not in self.unique_chars:
+                return []
+
+        if self.sequence_length > len(text):
+            return []
+
+        if not os.path.isfile(self.weights_path):
+            self.logger.debug(
+                'Model weights %s does not exist!' % self.weights_path)
+            return []
+        if not os.path.isfile(self.network_path):
+            self.logger.debug(
+                'Model network %s does not exist!' % self.network_path)
+            return []
+
+        try:
+            lock_model_weights = FileLock('%s.lock' % self.weights_path)
+            lock_model_network = FileLock('%s.lock' % self.network_path)
+
+            with lock_model_network.acquire(timeout=1000):
+                with lock_model_weights.acquire(timeout=1000):
+                    graph = tf.Graph()
+                    with graph.as_default():
+                        session = tf.Session()
+                        with session.as_default():
+                            with open(self.network_path, 'r') as json_file:
+                                loaded_model_json = json_file.read()
+
+                            self.model = model_from_json(loaded_model_json)
+                            self.model.load_weights(self.weights_path)
+
+                            x = self.prepare_input(text)
+                            preds = self.model.predict(x, verbose=0)[0]
+                            next_indices = self.sample(
+                                preds, self.number_predictions)
+
+                            return [
+                                self.indices_char[idx] +
+                                self.predict_completion(
+                                    text[1:] + self.indices_char[idx]
+                                ) for idx in next_indices]
+        except Exception as e:
+            self.logger.debug('Error while prediction: %s' % str(e))
+            return []
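The `sample` helper above simply returns the indices of the `top_n` most probable characters; the log/exp round trip re-normalizes the distribution without changing the ranking. A standalone illustration:

```python
# Illustration only: top-n selection over a toy distribution,
# matching PredictiveSearchModel.sample above.
import heapq
import numpy as np

preds = np.asarray([0.1, 0.5, 0.15, 0.25]).astype('float64')
preds = np.exp(np.log(preds)) / np.sum(np.exp(np.log(preds)))  # ranking unchanged
top2 = heapq.nlargest(2, range(len(preds)), preds.take)
print(top2)  # [1, 3] -> indices of the two most likely next chars
```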
+ ''' + + def __init__(self): + super(PredictiveSearchWorker, self).__init__() + self.model = Sequential() + self.history = None + self.corpus = None + self.training_data = None + self.unique_chars = None + self.char_indices = None + self.x = None + self.y = None + self.logger = logging.getLogger('ckanext.PredictiveSearchWorker') + + def __check_if_paths_exist(self): + weights_dir = os.path.dirname(self.weights_path) + if not os.path.exists(weights_dir): + os.makedirs(weights_dir) + netwokr_dir = os.path.dirname(self.network_path) + if not os.path.exists(netwokr_dir): + os.makedirs(netwokr_dir) + history_dir = os.path.dirname(self.history_path) + if not os.path.exists(history_dir): + os.makedirs(history_dir) + + return self + + def __set_corpus(self): + self.corpus = DataManager.get_corpus() + self.training_data = self.corpus.lower() + self.training_data = ''.join( + ch for ch in self.training_data if ch.isalnum() or ch == ' ') + return self + + def __validate_corpus(self): + if self.corpus_length > len(self.training_data): + msg = ('The minimum length of the corpus is {}, ' + 'current corpus has {}').format( + int(self.corpus_length), len(self.training_data)) + self.logger.debug(msg) + raise df.Invalid(msg) + + if len(self.training_data) <= self.sequence_length: + msg = ( + 'The length of the RNN sequence %d, given %d' + % (int(sequence_length), len(self.training_data)) + ) + self.logger.debug(msg) + raise df.Invalid(msg) + + return self + + def __process_corpus(self): + self.unique_chars, self.char_indices, _ = \ + DataManager.prepare_corpus(self.training_data) + return self + + def __set_x_y(self): + sentences = [] + next_chars = [] + for i in range( + 0, len(self.training_data) - self.sequence_length, self.step): + sentences.append(self.training_data[i: i + self.sequence_length]) + next_chars.append(self.training_data[i + self.sequence_length]) + + self.logger.info('Number of training examples: %d ' % len(sentences)) + + self.x = np.zeros(( + len(sentences), + self.sequence_length, + len(self.unique_chars)), + dtype=np.bool + ) + self.y = np.zeros( + (len(sentences), len(self.unique_chars)), dtype=np.bool) + for i, sentence in enumerate(sentences): + for t, char in enumerate(sentence): + self.x[i, t, self.char_indices[char]] = 1 + self.y[i, self.char_indices[next_chars[i]]] = 1 + return self + + def __prepare_model(self): + self.model.add(LSTM(128, input_shape=( + self.sequence_length, len(self.unique_chars)))) + self.model.add(Dense(len(self.unique_chars))) + self.model.add(Activation('softmax')) + + optimizer = RMSprop(lr=0.01) + self.model.compile( + loss='categorical_crossentropy', + optimizer=optimizer, + metrics=['accuracy']) + + return self + + def __train_model(self): + earlyStopping = EarlyStopping( + monitor='val_loss', + patience=10, + verbose=0, + mode='min' + ) + mcp_save = ModelCheckpoint( + self.temp_weigths_path, + save_best_only=True, + save_weights_only=True, + monitor='val_loss', + mode='min' + ) + reduce_lr_loss = ReduceLROnPlateau( + monitor='val_loss', + factor=0.1, + patience=7, + verbose=1, + epsilon=1e-4, + mode='min' + ) + + try: + self.history = self.model.fit( + self.x, + self.y, + validation_split=0.05, + batch_size=128, + epochs=self.epochs, + shuffle=True, + callbacks=[ + earlyStopping, + mcp_save, + reduce_lr_loss + ] + ).history + except Exception as e: + self.logger.debug('Error while training the model: %s' % str(e)) + raise e + + return self + + def __save_model(self): + try: + lock_model_network = FileLock('%s.lock' % self.network_path) + 
lock_model_weigths = FileLock('%s.lock' % self.weights_path) + + with lock_model_network.acquire(timeout=1000): + with lock_model_weigths.acquire(timeout=1000): + if os.path.exists(self.weights_path): + os.remove(self.weights_path) + os.rename(self.temp_weigths_path, self.weights_path) + + with open(self.network_path, "w") as json_file: + model_json = self.model.to_json() + json_file.write(model_json) + + DataManager.create_corpus(self.training_data) + except Exception as e: + self.logger.debug('Error while saving RNN model: %s' % str(e)) + raise e + finally: + os.remove('%s.lock' % self.network_path) + os.remove('%s.lock' % self.weights_path) + + return self + + def __save_history(self): + try: + pickle.dump(self.history, open(self.history_path, "wb")) + except Exception as e: + self.logger.debug('Error while saving RNN history: %s' % str(e)) + raise e + + def run(self): + self.__check_if_paths_exist().\ + __set_corpus().\ + __validate_corpus().\ + __process_corpus().\ + __set_x_y().\ + __prepare_model().\ + __train_model().\ + __save_model().\ + __save_history() diff --git a/ckanext/knowledgehub/logic/action/create.py b/ckanext/knowledgehub/logic/action/create.py index ea63827d..7e8ef875 100644 --- a/ckanext/knowledgehub/logic/action/create.py +++ b/ckanext/knowledgehub/logic/action/create.py @@ -101,7 +101,23 @@ def theme_create(context, data_dict): session.add(theme) session.commit() - return _table_dictize(theme, context) + theme_data = _table_dictize(theme, context) + + # Add to kwh data + try: + data_dict = { + 'type': 'theme', + 'title': theme_data.get('title'), + 'description': theme_data.get('description'), + 'theme': theme_data.get('id') + } + logic.get_action('kwh_data_create')(context, data_dict) + except Exception as e: + log.debug('Unable to store theme {} to knowledgehub data: {}'.format( + theme_data.get('id'), str(e) + )) + + return theme_data @toolkit.side_effect_free @@ -139,7 +155,24 @@ def sub_theme_create(context, data_dict): st = SubThemes(**data) st.save() - return st.as_dict() + sub_theme_data = st.as_dict() + + # Add to kwh data + try: + data_dict = { + 'type': 'sub_theme', + 'title': sub_theme_data.get('title'), + 'description': sub_theme_data.get('description'), + 'sub_theme': sub_theme_data.get('id') + } + logic.get_action('kwh_data_create')(context, data_dict) + except Exception as e: + log.debug( + 'Unable to store sub-theme {} to knowledgehub data: {}'.format( + sub_theme_data.get('id'), str(e) + )) + + return sub_theme_data def research_question_create(context, data_dict): @@ -210,6 +243,20 @@ def research_question_create(context, data_dict): ResearchQuestion.delete(research_question.id) raise e + # Add to kwh data + try: + data_dict = { + 'type': 'research_question', + 'title': research_question_data.get('title'), + 'research_question': research_question_data.get('id') + } + logic.get_action('kwh_data_create')(context, data_dict) + except Exception as e: + log.debug( + 'Unable to store research question %s to knowledgehub data: %s' + % (research_question_data.get('id'), str(e)) + ) + return research_question_data @@ -326,6 +373,21 @@ def resource_view_create(context, data_dict): # Add to index Visualization.add_to_index(rv_data) + # Add to kwh data + try: + data_dict = { + 'type': 'visualization', + 'title': rv_data.get('title'), + 'description': rv_data.get('description'), + 'visualization': rv_data.get('id') + } + logic.get_action('kwh_data_create')(context, data_dict) + except Exception as e: + log.debug( + 'Unable to store visualization %s to 
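For clarity, this is the shape of the one-hot training tensors that `__set_x_y` builds, shown on a toy corpus (illustration only; the real worker reads the corpus from the `kwh_data_list` action):

```python
# Illustration only: one-hot training tensors as in __set_x_y,
# for a toy corpus with sequence_length=4 and step=1.
import numpy as np

corpus = 'refugees'
seq_len, step = 4, 1
chars = sorted(set(corpus))                  # ['e', 'f', 'g', 'r', 's', 'u']
char_idx = {c: i for i, c in enumerate(chars)}

sentences = [corpus[i:i + seq_len] for i in range(0, len(corpus) - seq_len, step)]
next_chars = [corpus[i + seq_len] for i in range(0, len(corpus) - seq_len, step)]

x = np.zeros((len(sentences), seq_len, len(chars)), dtype=bool)
y = np.zeros((len(sentences), len(chars)), dtype=bool)
for i, sentence in enumerate(sentences):
    for t, ch in enumerate(sentence):
        x[i, t, char_idx[ch]] = 1            # one-hot char at position t
    y[i, char_idx[next_chars[i]]] = 1        # one-hot next char

print(x.shape, y.shape)  # (4, 4, 6) (4, 6)
```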
diff --git a/ckanext/knowledgehub/logic/action/create.py b/ckanext/knowledgehub/logic/action/create.py
index ea63827d..7e8ef875 100644
--- a/ckanext/knowledgehub/logic/action/create.py
+++ b/ckanext/knowledgehub/logic/action/create.py
@@ -101,7 +101,23 @@ def theme_create(context, data_dict):
     session.add(theme)
     session.commit()
 
-    return _table_dictize(theme, context)
+    theme_data = _table_dictize(theme, context)
+
+    # Add to kwh data
+    try:
+        data_dict = {
+            'type': 'theme',
+            'title': theme_data.get('title'),
+            'description': theme_data.get('description'),
+            'theme': theme_data.get('id')
+        }
+        logic.get_action('kwh_data_create')(context, data_dict)
+    except Exception as e:
+        log.debug('Unable to store theme {} to knowledgehub data: {}'.format(
+            theme_data.get('id'), str(e)
+        ))
+
+    return theme_data
 
 @toolkit.side_effect_free
@@ -139,7 +155,24 @@ def sub_theme_create(context, data_dict):
     st = SubThemes(**data)
     st.save()
 
-    return st.as_dict()
+    sub_theme_data = st.as_dict()
+
+    # Add to kwh data
+    try:
+        data_dict = {
+            'type': 'sub_theme',
+            'title': sub_theme_data.get('title'),
+            'description': sub_theme_data.get('description'),
+            'sub_theme': sub_theme_data.get('id')
+        }
+        logic.get_action('kwh_data_create')(context, data_dict)
+    except Exception as e:
+        log.debug(
+            'Unable to store sub-theme {} to knowledgehub data: {}'.format(
+                sub_theme_data.get('id'), str(e)
+            ))
+
+    return sub_theme_data
 
 def research_question_create(context, data_dict):
@@ -210,6 +243,20 @@
         ResearchQuestion.delete(research_question.id)
         raise e
 
+    # Add to kwh data
+    try:
+        data_dict = {
+            'type': 'research_question',
+            'title': research_question_data.get('title'),
+            'research_question': research_question_data.get('id')
+        }
+        logic.get_action('kwh_data_create')(context, data_dict)
+    except Exception as e:
+        log.debug(
+            'Unable to store research question %s to knowledgehub data: %s'
+            % (research_question_data.get('id'), str(e))
+        )
+
     return research_question_data
 
@@ -326,6 +373,21 @@ def resource_view_create(context, data_dict):
     # Add to index
     Visualization.add_to_index(rv_data)
 
+    # Add to kwh data
+    try:
+        data_dict = {
+            'type': 'visualization',
+            'title': rv_data.get('title'),
+            'description': rv_data.get('description'),
+            'visualization': rv_data.get('id')
+        }
+        logic.get_action('kwh_data_create')(context, data_dict)
+    except Exception as e:
+        log.debug(
+            'Unable to store visualization %s to knowledgehub data: %s'
+            % (rv_data.get('id'), str(e))
+        )
+
     # this check is because of the unit tests
     if rv_data.get('__extras'):
         ext = rv_data['__extras']
@@ -402,11 +464,42 @@ def dashboard_create(context, data_dict):
     # Add to index
     Dashboard.add_to_index(dashboard_data)
 
+    # Add to kwh data
+    try:
+        data_dict = {
+            'type': 'dashboard',
+            'title': dashboard_data.get('title'),
+            'description': dashboard_data.get('description'),
+            'dashboard': dashboard_data.get('id')
+        }
+        logic.get_action('kwh_data_create')(context, data_dict)
+    except Exception as e:
+        log.debug(
+            'Unable to store dashboard %s to knowledgehub data: %s'
+            % (dashboard_data.get('id'), str(e))
+        )
+
     return dashboard_data
 
 def package_create(context, data_dict):
-    return ckan_package_create(context, data_dict)
+    dataset = ckan_package_create(context, data_dict)
+
+    try:
+        data_dict = {
+            'type': 'dataset',
+            'title': dataset.get('title'),
+            'description': dataset.get('notes'),
+            'dataset': dataset.get('id')
+        }
+        logic.get_action('kwh_data_create')(context, data_dict)
+    except Exception as e:
+        log.debug(
+            'Unable to store dataset %s to knowledgehub data: %s'
+            % (dataset.get('id'), str(e))
+        )
+
+    return dataset
 
 def resource_feedback(context, data_dict):
@@ -519,13 +612,43 @@ def resource_validation_create(context, data_dict):
     return _table_dictize(rv, context)
 
+@toolkit.side_effect_free
 def kwh_data_create(context, data_dict):
+    ''' Store Knowledge Hub data needed for predictive search.
+
+    It keeps the title and description of KWH entities: themes, sub-themes,
+    research questions, datasets, visualizations and dashboards.
+
+    :param type: the type of the entity, can be one of:
+        [
+            'search_query',
+            'theme',
+            'sub_theme',
+            'research_question',
+            'dataset',
+            'visualization',
+            'dashboard'
+        ]
+    :type type: string
+    :param title: the title of the entity
+    :type title: string
+    :param description: the description of the entity (optional)
+    :type description: string
+    :param theme: the ID of the theme (optional)
+    :type theme: string
+    :param sub_theme: the ID of the sub theme (optional)
+    :type sub_theme: string
+    :param research_question: the ID of the research question (optional)
+    :type research_question: string
+    :param dataset: the ID of the dataset (optional)
+    :type dataset: string
+    :param visualization: the ID of the visualization (optional)
+    :type visualization: string
+    :param dashboard: the ID of the dashboard (optional)
+    :type dashboard: string
+
+    :returns: the newly created data
+    :rtype: dict
     '''
-    Store Knowledge Hub data
-    :param type
-    :param content
-    '''
-
     check_access('kwh_data', context, data_dict)
 
     session = context['session']
@@ -537,20 +660,21 @@ def kwh_data_create(context, data_dict):
         raise ValidationError(errors)
 
     user = context.get('user')
-    user_data = model.User.by_name(user.decode('utf8'))
-    if user_data:
-        data['user'] = user_data.id
+    if user:
+        data['user'] = model.User.by_name(user.decode('utf8')).id
 
     kwh_data = KWHData.get(
         user=data.get('user'),
-        content=data.get('content'),
-        type=data.get('type')
+        type=data.get('type'),
+        title=data.get('title'),
+        description=data.get('description'),
     ).first()
 
     if not kwh_data:
         kwh_data = KWHData(**data)
         kwh_data.save()
-    return kwh_data.as_dict()
+
+    return kwh_data.as_dict()
 
 def corpus_create(context, data_dict):
diff --git a/ckanext/knowledgehub/logic/action/delete.py b/ckanext/knowledgehub/logic/action/delete.py
index bc37b330..734c8cd2 100644
--- a/ckanext/knowledgehub/logic/action/delete.py
+++ b/ckanext/knowledgehub/logic/action/delete.py
@@ -9,18 +9,21 @@ from ckan.plugins import toolkit
 from ckan.common import _
 from ckan.model import Session
 
-from ckanext.knowledgehub import helpers as plugin_helpers
+from ckan.logic.action.delete import package_delete as ckan_package_delete
 
-from ckanext.knowledgehub.model import Dashboard
-from ckanext.knowledgehub.model import Theme
-from ckanext.knowledgehub.model import SubThemes
-from ckanext.knowledgehub.model import ResearchQuestion
-from ckanext.knowledgehub.model import Visualization
-from ckanext.knowledgehub.model import UserIntents
-from ckanext.knowledgehub.model import ResourceValidate
-from ckanext.knowledgehub.model import Keyword
-from ckanext.knowledgehub import helpers as kwh_helpers
+from ckanext.knowledgehub import helpers as plugin_helpers
+from ckanext.knowledgehub.model import (
+    Dashboard,
+    Theme,
+    SubThemes,
+    ResearchQuestion,
+    Visualization,
+    UserIntents,
+    ResourceValidate,
+    Keyword,
+    KWHData
+)
 
 log = logging.getLogger(__name__)
@@ -217,7 +220,7 @@ def member_delete(context, data_dict=None):
         model.repo.commit()
 
         if obj_type == 'package':
-            kwh_helpers.views_dashboards_groups_update(data_dict.get('object'))
+            plugin_helpers.views_dashboards_groups_update(data_dict.get('object'))
 
 def keyword_delete(context, data_dict):
@@ -240,3 +243,23 @@ def keyword_delete(context, data_dict):
 
     Session.delete(Keyword.get(keyword['id']))
     Session.commit()
+
+
+def package_delete(context, data_dict):
+    '''Delete a dataset (package).
+
+    This makes the dataset disappear from all web & API views, apart from the
+    trash.
+
+    You must be authorized to delete the dataset.
+
+    :param id: the id or name of the dataset to delete
+    :type id: string
+    '''
+
+    ckan_package_delete(context, data_dict)
+    try:
+        KWHData.delete({'dataset': data_dict['id']})
+    except Exception as e:
+        log.debug('Cannot remove dataset from kwh data %s' % str(e))
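To make the new `kwh_data_create` contract concrete, a minimal sketch of registering one training-data entry from extension code (the fields come from the docstring above; the context values are illustrative):

```python
# Illustration only: storing one 'search_query' entry via the new action.
import ckan.plugins.toolkit as toolkit

toolkit.get_action('kwh_data_create')(
    {'ignore_auth': True},   # a real call would carry the acting user
    {
        'type': 'search_query',
        'title': 'refugees in syria'
    }
)
```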
diff --git a/ckanext/knowledgehub/logic/action/get.py b/ckanext/knowledgehub/logic/action/get.py
index e7ab8ec6..801570d6 100644
--- a/ckanext/knowledgehub/logic/action/get.py
+++ b/ckanext/knowledgehub/logic/action/get.py
@@ -1,4 +1,5 @@
 import logging
+import re
 import os
 import json
 from six import string_types
@@ -29,7 +30,7 @@
     UserProfile,
 )
 from ckanext.knowledgehub import helpers as kh_helpers
-from ckanext.knowledgehub.rnn import helpers as rnn_helpers
+from ckanext.knowledgehub.lib.rnn import PredictiveSearchModel
 from ckanext.knowledgehub.lib.solr import ckan_params_to_solr_args
 from ckan.lib import helpers as h
 from ckan.controllers.admin import get_sysadmins
@@ -714,10 +715,10 @@ def kwh_data_list(context, data_dict):
     rq = data_dict.get('rq', '')
     q = data_dict.get('q', '')
-    page_size = int(data_dict.get('pageSize', 1000000))
+    limit = int(data_dict.get('limit', 1000000))
     page = int(data_dict.get('page', 1))
     order_by = data_dict.get('order_by', None)
-    offset = (page - 1) * page_size
+    offset = (page - 1) * limit
 
     kwargs = {}
@@ -735,7 +736,7 @@ def kwh_data_list(context, data_dict):
         kwargs['rq'] = rq
 
     kwargs['q'] = q
-    kwargs['limit'] = page_size
+    kwargs['limit'] = limit
     kwargs['offset'] = offset
     kwargs['order_by'] = order_by
 
@@ -743,9 +744,10 @@
     try:
         db_data = KWHData.get(**kwargs).all()
-    except Exception:
+    except Exception as e:
+        log.debug('Knowledge hub data list: %s' % str(e))
         return {'total': 0, 'page': page,
-                'pageSize': page_size, 'data': []}
+                'limit': limit, 'data': []}
 
     for entry in db_data:
         kwh_data.append(_table_dictize(entry, context))
@@ -753,7 +755,7 @@ def kwh_data_list(context, data_dict):
     total = len(kwh_data)
 
     return {'total': total, 'page': page,
-            'pageSize': page_size, 'data': kwh_data}
+            'limit': limit, 'data': kwh_data}
 
 @toolkit.side_effect_free
@@ -770,20 +772,89 @@ def get_last_rnn_corpus(context, data_dict):
     return c.corpus
 
 
+def _get_predictions_from_db(query):
+    number_predictions = int(
+        config.get(
+            u'ckanext.knowledgehub.rnn.number_predictions',
+            3
+        )
+    )
+
+    text = ' '.join(query.split()[-3:])
+    data_dict = {
+        'q': text,
+        'limit': 10,
+        'order_by': 'created_at desc'
+    }
+    kwh_data = toolkit.get_action('kwh_data_list')({}, data_dict)
+
+    def _get_predict(text, query, index):
+        predict = ''
+        if index != -1:
+            index = index + len(query)
+            for i, ch in enumerate(text[index:]):
+                if ch.isalnum() or (i == 0 and ch == ' '):
+                    predict += ch
+                else:
+                    break
+
+        return predict
+
+    def _findall(pattern, text):
+        return [
+            match.start(0) for match in re.finditer(pattern, text)
+        ]
+
+    predictions = []
+    for data in kwh_data.get('data', []):
+        if len(predictions) >= number_predictions:
+            break
+
+        title = data.get('title').lower()
+        for index in _findall(query, title):
+            predict = _get_predict(title, query, index)
+            if predict != '' and predict not in predictions:
+                predictions.append(predict)
+
+        if data.get('description'):
+            description = data.get('description').lower()
+            for index in _findall(query, description):
+                predict = _get_predict(description, query, index)
+                if predict != '' and predict not in predictions:
+                    predictions.append(predict)
+
+    return predictions[:number_predictions]
+
+
 @toolkit.side_effect_free
 def get_predictions(context, data_dict):
-    ''' Returns a list of predictions
-    :param text: the text for which predictions have to be made
-    :type text: string
+    ''' Returns a list of predictions from the RNN model and the DB, based
+    on the data stored in Knowledge Hub
+
+    :param query: the search query for which predictions have to be made
+    :type query: string
 
     :returns: predictions
     :rtype: list
     '''
+    query = data_dict.get('query')
+    if not query:
+        raise ValidationError({'query': _('Missing value')})
+    if len(query) < int(config.get(
+            u'ckanext.knowledgehub.rnn.sequence_length', 10)):
+        return []
 
-    text = data_dict.get('text')
-    if not text:
-        raise ValidationError({'text': _('Missing value')})
+    if query.isspace():
+        return []
 
+    query = query.lower()
+
+    predictions = _get_predictions_from_db(query)
+
+    model = PredictiveSearchModel()
+    for p in model.predict(query):
+        if p not in predictions:
+            predictions.append(p)
 
-    return rnn_helpers.predict_completions(text)
+    return predictions
 
 def _search_entity(index, ctx, data_dict):
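A quick sketch of exercising the renamed action (the hostname and API key are placeholders; `query` replaces the old `text` parameter, matching the JavaScript change above, and must be at least `sequence_length` characters long):

```python
# Illustration only: calling get_predictions with the new `query` param,
# assuming the third-party `ckanapi` client and a running instance.
from ckanapi import RemoteCKAN

kwh = RemoteCKAN('http://hostname', apikey='API-KEY')
suggestions = kwh.action.get_predictions(query='refugees in sy')
print(suggestions)  # e.g. ['ria', ...] - completions, not full queries
```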
diff --git a/ckanext/knowledgehub/logic/action/update.py b/ckanext/knowledgehub/logic/action/update.py
index 7a76b662..e5e0b105 100644
--- a/ckanext/knowledgehub/logic/action/update.py
+++ b/ckanext/knowledgehub/logic/action/update.py
@@ -71,6 +71,8 @@ def theme_update(context, data_dict):
     # we need the old theme name in the context for name validation
     context['theme'] = theme.name
+    context['title'] = theme.title
+    context['description'] = theme.description
     session = context['session']
 
     data, errors = _df.validate(data_dict, knowledgehub_schema.theme_schema(),
@@ -92,7 +94,23 @@ def theme_update(context, data_dict):
     session.add(theme)
     session.commit()
 
-    return _table_dictize(theme, context)
+    theme_data = _table_dictize(theme, context)
+
+    # Update kwh data
+    try:
+        data_dict = {
+            'type': 'theme',
+            'entity_id': theme_data.get('id'),
+            'title': theme_data.get('title'),
+            'description': theme_data.get('description')
+        }
+        logic.get_action(u'kwh_data_update')(context, data_dict)
+    except Exception as e:
+        log.debug('Unable to update theme {} in knowledgehub data: {}'.format(
+            theme_data.get('id'), str(e)
+        ))
+
+    return theme_data
 
 @toolkit.side_effect_free
@@ -126,6 +144,8 @@ def sub_theme_update(context, data_dict):
         raise logic.NotFound(_('Sub-Theme was not found.'))
 
     context['sub_theme'] = sub_theme.name
+    context['title'] = sub_theme.title
+    context['description'] = sub_theme.description
 
     data, errors = _df.validate(data_dict,
                                 knowledgehub_schema.sub_theme_update(),
                                 context)
@@ -139,7 +159,24 @@ def sub_theme_update(context, data_dict):
     filter = {'id': id}
     st = SubThemes.update(filter, data_dict)
 
-    return st.as_dict()
+    sub_theme_data = st.as_dict()
+
+    # Update kwh data
+    try:
+        data_dict = {
+            'type': 'sub_theme',
+            'entity_id': sub_theme_data.get('id'),
+            'title': sub_theme_data.get('title'),
+            'description': sub_theme_data.get('description')
+        }
+        logic.get_action(u'kwh_data_update')(context, data_dict)
+    except Exception as e:
+        log.debug(
+            'Unable to update sub-theme {} in knowledgehub data: {}'.format(
+                sub_theme_data.get('id'), str(e)
+            ))
+
+    return sub_theme_data
 
 def research_question_update(context, data_dict):
@@ -200,6 +237,20 @@ def research_question_update(context, data_dict):
     # Update index
     ResearchQuestion.update_index_doc(rq_data)
 
+    # Update kwh data
+    try:
+        data_dict = {
+            'type': 'research_question',
+            'entity_id': rq_data.get('id'),
+            'title': rq_data.get('title')
+        }
+        logic.get_action(u'kwh_data_update')(context, data_dict)
+    except Exception as e:
+        log.debug(
+            'Unable to update research question {} in knowledgehub data: '
+            '{}'.format(rq_data.get('id'), str(e)))
+
     return rq_data
 
@@ -320,6 +371,21 @@ def resource_view_update(context, data_dict):
     # Update index
     Visualization.update_index_doc(resource_view_data)
 
+    # Update kwh data
+    try:
+        data_dict = {
+            'type': 'visualization',
+            'entity_id': resource_view_data.get('id'),
+            'title': resource_view_data.get('title'),
+            'description': resource_view_data.get('description')
+        }
+        logic.get_action(u'kwh_data_update')(context, data_dict)
+    except Exception as e:
+        log.debug(
+            'Unable to update visualization %s in knowledgehub data: %s'
+            % (resource_view_data.get('id'), str(e))
+        )
+
     # this check is done for the unit tests
     if resource_view_data.get('__extras'):
         ext = resource_view_data['__extras']
@@ -399,6 +465,21 @@ def dashboard_update(context, data_dict):
     # Update index
     Dashboard.update_index_doc(dashboard_data)
 
+    # Update kwh data
+    try:
+        data_dict = {
+            'type': 'dashboard',
+            'entity_id': dashboard_data.get('id'),
+            'title': dashboard_data.get('title'),
+            'description': dashboard_data.get('description')
+        }
+        logic.get_action(u'kwh_data_update')(context, data_dict)
+    except Exception as e:
+        log.debug(
+            'Unable to update dashboard %s in knowledgehub data: %s'
+            % (dashboard_data.get('id'), str(e))
+        )
+
     return dashboard_data
 
@@ -408,20 +489,48 @@ def package_update(context, data_dict):
     '''
     result = ckan_package_update(context, data_dict)
     schedule_data_quality_check(result['id'])
+
+    try:
+        data_dict = {
+            'type': 'dataset',
+            'entity_id': result.get('id'),
+            'title': result.get('title'),
+            'description': result.get('notes')
+        }
+        logic.get_action(u'kwh_data_update')(context, data_dict)
+    except Exception as e:
+        log.debug(
+            'Unable to update dataset {} in knowledgehub data: {}'.format(
+                result.get('id'), str(e)
+            ))
+
     return result
 
 def kwh_data_update(context, data_dict):
-    '''
-    Store Knowledge Hub data
-    :param type
-    :param old_content
-    :param new_content
+    ''' Update existing knowledgehub data or create new one
+
+    :param type: the type of the entity, can be one of:
+        [
+            'search_query',
+            'theme',
+            'sub_theme',
+            'research_question',
+            'dataset',
+            'visualization',
+            'dashboard'
+        ]
+    :type type: string
+    :param title: the title of the entity
+    :type title: string
+    :param description: the description of the entity (optional)
+    :type description: string
+    :param entity_id: the ID of the entity
+    :type entity_id: string
+
+    :returns: the updated data
+    :rtype: dict
     '''
     check_access('kwh_data', context, data_dict)
 
-    session = context['session']
-
     data, errors = _df.validate(data_dict,
                                 knowledgehub_schema.kwh_data_schema_update(),
                                 context)
@@ -430,24 +539,32 @@ def kwh_data_update(context, data_dict):
         raise ValidationError(errors)
 
     user = context.get('user')
-    data['user'] = model.User.by_name(user.decode('utf8')).id
+    if user:
+        data['user'] = model.User.by_name(user.decode('utf8')).id
 
-    kwh_data = KWHData.get(
-        user=data['user'],
-        content=data['old_content'],
-        type=data['type']
-    ).first()
+    data_filter = {data['type']: data['entity_id']}
+    kwh_data = KWHData.get(**data_filter).first()
 
     if kwh_data:
         update_data = _table_dictize(kwh_data, context)
         update_data.pop('id')
         update_data.pop('created_at')
-        update_data['content'] = data['new_content']
+        update_data['title'] = data.get('title')
+        update_data['description'] = data.get('description')
 
-        filter = {'id': kwh_data.id}
-        data = KWHData.update(filter, update_data)
+        kwh_data = KWHData.update(data_filter, update_data)
+    else:
+        data_dict = {
+            'user': data.get('user'),
+            'type': data.get('type'),
+            'title': data.get('title'),
+            'description': data.get('description'),
+            data['type']: data['entity_id']
+        }
+        kwh_data = KWHData(**data_dict)
+        kwh_data.save()
 
-    return data.as_dict()
+    return kwh_data.as_dict()
 
 def user_intent_update(context, data_dict):
@@ -815,15 +932,18 @@ def resource_validation_revert(context, data_dict):
 
 def tag_update(context, data_dict):
     ''' Update the tag name or vocabulary
+
     :param id: id or name of the tag
     :type id: string
     :param name: the name of the tag
     :type name: string
     :param vocabulary_id: the id of the vocabulary (optional)
     :type vocabulary_id: string
+
     :returns: the updated tag
     :rtype: dictionary
     '''
+
     model = context['model']
 
     try:
@@ -831,7 +951,7 @@ def tag_update(context, data_dict):
     except NotAuthorized:
         raise NotAuthorized(_(u'Need to be system '
                               u'administrator to administer'))
-
+
     schema = knowledgehub_schema.tag_update_schema()
     data, errors = _df.validate(data_dict, schema, context)
     if errors:
@@ -849,7 +969,7 @@ def tag_update(context, data_dict):
     session = context['session']
     tag.save()
     session.commit()
-
+
     return _table_dictize(tag, context)
diff --git a/ckanext/knowledgehub/logic/auth/create.py b/ckanext/knowledgehub/logic/auth/create.py
index 33633208..967e6010 100644
--- a/ckanext/knowledgehub/logic/auth/create.py
+++ b/ckanext/knowledgehub/logic/auth/create.py
@@ -69,8 +69,8 @@ def kwh_data(context, data_dict):
     '''
     Authorization check for storing KWH data
     '''
-    # sysadmins only
-    return {'success': False}
+    # all logged-in users
+    return {'success': True}
 
 def corpus_create(context, data_dict):
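A short sketch of how the reworked `kwh_data_update` is meant to be called after an entity changes (fields per the docstring above; the ID is a placeholder). Note that `entity_id` replaces the old `old_content`/`new_content` pair, and the action now upserts:

```python
# Illustration only: upserting the training-data entry for a theme.
import ckan.plugins.toolkit as toolkit

toolkit.get_action('kwh_data_update')(
    {'ignore_auth': True},
    {
        'type': 'theme',
        'entity_id': 'theme-id-placeholder',
        'title': 'Refugees in Syria',
        'description': 'Updated description'
    }
)
```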
diff --git a/ckanext/knowledgehub/logic/auth/delete.py b/ckanext/knowledgehub/logic/auth/delete.py
index 90f0a345..2af9401b 100644
--- a/ckanext/knowledgehub/logic/auth/delete.py
+++ b/ckanext/knowledgehub/logic/auth/delete.py
@@ -1,3 +1,5 @@
+from ckan.logic.auth.delete import package_delete as ckan_package_delete
+
 
 def theme_delete(context, data_dict):
     '''
@@ -54,4 +56,12 @@ def keyword_delete(context, data_dict=None):
     ''' Authorization check for deletion of a keyword. Sysadmin only. '''
-    return {'success': False}
\ No newline at end of file
+    return {'success': False}
+
+
+def package_delete(context, data_dict):
+    '''
+    This auth function must be overridden like this,
+    otherwise a recursion error is thrown.
+    '''
+    return ckan_package_delete(context, data_dict)
diff --git a/ckanext/knowledgehub/logic/schema.py b/ckanext/knowledgehub/logic/schema.py
index ad291d12..34069461 100644
--- a/ckanext/knowledgehub/logic/schema.py
+++ b/ckanext/knowledgehub/logic/schema.py
@@ -135,18 +135,23 @@ def resource_validation_schema():
 def kwh_data_schema():
     return {
         'type': [not_empty, validators.kwh_data_type_validator],
-        'content': [not_empty, unicode],
+        'title': [not_empty, unicode],
+        'description': [ignore_missing, unicode],
         'theme': [ignore_missing, unicode],
         'sub_theme': [ignore_missing, unicode],
-        'rq': [ignore_missing, unicode]
+        'research_question': [ignore_missing, unicode],
+        'dataset': [ignore_missing, unicode],
+        'visualization': [ignore_missing, unicode],
+        'dashboard': [ignore_missing, unicode]
     }
 
 def kwh_data_schema_update():
     return {
         'type': [not_empty, validators.kwh_data_type_validator],
-        'old_content': [not_empty, unicode],
-        'new_content': [not_empty, unicode]
+        'title': [not_empty, unicode],
+        'description': [ignore_missing, unicode],
+        'entity_id': [not_empty, unicode]
     }
diff --git a/ckanext/knowledgehub/logic/validators.py b/ckanext/knowledgehub/logic/validators.py
index c48d6ced..4329cbb4 100644
--- a/ckanext/knowledgehub/logic/validators.py
+++ b/ckanext/knowledgehub/logic/validators.py
@@ -177,14 +177,22 @@ def resource_feedbacks_type_validator(key, data, errors, context):
 def kwh_data_type_validator(key, data, errors, context):
     clean_data = df.unflatten(data)
-    rf_type = clean_data.get('type')
-
-    rf_types = ['theme', 'sub-theme', 'rq', 'search']
-
-    if rf_type not in rf_types:
+    entity_type = clean_data.get('type')
+
+    entity_types = [
+        'search_query',
+        'theme',
+        'sub_theme',
+        'research_question',
+        'dataset',
+        'visualization',
+        'dashboard'
+    ]
+
+    if entity_type not in entity_types:
         errors[key].append(
             p.toolkit._(
-                'Allowed KWH data types are: %s' % ', '.join(rf_types)
+                'Allowed KWH data types are: %s' % ', '.join(entity_types)
             )
         )
diff --git a/ckanext/knowledgehub/model/kwh_data.py b/ckanext/knowledgehub/model/kwh_data.py
index acb99184..921b4896 100644
--- a/ckanext/knowledgehub/model/kwh_data.py
+++ b/ckanext/knowledgehub/model/kwh_data.py
@@ -21,28 +21,25 @@ class KWHData(DomainObject):
 
     @classmethod
     def get(cls, id_or_name=None, **kwargs):
-        q = kwargs.get('q')
-        limit = kwargs.get('limit')
-        offset = kwargs.get('offset')
-        order_by = kwargs.get('order_by')
-
-        kwargs.pop('q', None)
-        kwargs.pop('limit', None)
-        kwargs.pop('offset', None)
-        kwargs.pop('order_by', None)
+        q = kwargs.pop('q', None)
+        limit = kwargs.pop('limit', None)
+        offset = kwargs.pop('offset', None)
+        order_by = kwargs.pop('order_by', None)
 
         query = Session.query(cls).autoflush(False)
         query = query.filter_by(**kwargs)
 
         if id_or_name:
             query = query.filter(
-                or_(cls.id == id_or_name, cls.content == id_or_name)
+                or_(cls.id == id_or_name, cls.title == id_or_name)
             )
 
         if q:
             query = query.filter(
-                or_(cls.content.contains(q),
-                    cls.content.ilike(r"%{}%".format(q)))
+                or_(cls.title.contains(q),
+                    cls.title.ilike(r"%{}%".format(q)),
+                    cls.description.contains(q),
+                    cls.description.ilike(r"%{}%".format(q)))
             )
 
         if order_by:
@@ -71,7 +68,7 @@ def delete(cls, filter):
             Session.delete(obj)
             Session.commit()
         else:
-            raise logic.NotFound
+            raise logic.NotFound('data not found')
 
 kwh_data_table = Table(
@@ -87,9 +84,12 @@
         types.UnicodeText,
         nullable=False),
     Column(
-        'content',
+        'title',
         types.UnicodeText,
         nullable=False),
+    Column(
+        'description',
+        types.UnicodeText),
     Column(
         'user',
         types.UnicodeText,
@@ -103,9 +103,21 @@
         types.UnicodeText,
         ForeignKey('sub_themes.id', ondelete='CASCADE')),
     Column(
-        'rq',
+        'research_question',
        types.UnicodeText,
        ForeignKey('research_question.id', ondelete='CASCADE')),
+    Column(
+        'dataset',
+        types.UnicodeText,
+        ForeignKey('package.id', ondelete='CASCADE')),
+    Column(
+        'visualization',
+        types.UnicodeText,
+        ForeignKey('resource_view.id', ondelete='CASCADE')),
+    Column(
+        'dashboard',
+        types.UnicodeText,
+        ForeignKey('ckanext_knowledgehub_dashboard.id', ondelete='CASCADE')),
     Column(
         'created_at',
         types.DateTime,
diff --git a/ckanext/knowledgehub/model/theme.py b/ckanext/knowledgehub/model/theme.py
index e28c1658..7503e9b3 100644
--- a/ckanext/knowledgehub/model/theme.py
+++ b/ckanext/knowledgehub/model/theme.py
@@ -44,6 +44,7 @@ class Theme(DomainObject):
+    doctype = 'theme'
 
     @classmethod
     def get(cls, reference):
@@ -106,6 +107,27 @@ def delete(cls, filter):
         else:
             raise logic.NotFound(_(u'Theme'))
 
+    @classmethod
+    def add_to_kwh_data(cls, ctx, theme):
+        data_dict = {
+            'type': cls.doctype,
+            'title': theme.get('title'),
+            'description': theme.get('description'),
+            'theme': theme.get('id')
+        }
+        logic.get_action('kwh_data_create')(ctx, data_dict)
+
+    @classmethod
+    def update_kwh_data(cls, ctx, theme):
+        data_dict = {
+            'type': cls.doctype,
+            'entity_id': theme.get('id'),
+            'title': theme.get('title'),
+            'description': theme.get('description')
+        }
+        logic.get_action(u'kwh_data_update')(ctx, data_dict)
+
 
 mapper(Theme, theme_table)
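Since `KWHData.get` now pops its query keywords (`q`, `limit`, `offset`, `order_by`) and treats the remaining kwargs as column filters, a small usage sketch with illustrative values:

```python
# Illustration only: querying the renamed columns on the KWHData model.
from ckanext.knowledgehub.model import KWHData

entries = KWHData.get(
    q='refugees',            # now matched against title and description
    limit=10,
    order_by='created_at desc'
).all()
```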
diff --git a/ckanext/knowledgehub/rnn/__init__.py b/ckanext/knowledgehub/rnn/__init__.py
deleted file mode 100644
index e69de29b..00000000
diff --git a/ckanext/knowledgehub/rnn/helpers.py b/ckanext/knowledgehub/rnn/helpers.py
deleted file mode 100644
index e6395233..00000000
--- a/ckanext/knowledgehub/rnn/helpers.py
+++ /dev/null
@@ -1,138 +0,0 @@
-import logging
-import os
-
-import heapq
-import numpy as np
-import tensorflow as tf
-from keras.models import load_model
-from filelock import FileLock
-
-from ckan.plugins import toolkit
-from ckan import logic
-from ckan.common import config
-
-
-log = logging.getLogger(__name__)
-
-
-def prepare_rnn_corpus(corpus):
-    unique_chars = sorted(list(set(corpus)))
-    char_indices = dict((c, i) for i, c in enumerate(unique_chars))
-    indices_char = dict((i, c) for i, c in enumerate(unique_chars))
-
-    return unique_chars, char_indices, indices_char
-
-
-def prepare_input(text, unique_chars, char_indices):
-    sequence_length = int(
-        config.get(u'ckanext.knowledgehub.rnn.sequence_length', 10)
-    )
-    x = np.zeros((1, sequence_length, len(unique_chars)))
-    for t, char in enumerate(text):
-        x[0, t, char_indices[char]] = 1.
-
-    return x
-
-
-def sample(preds, top_n=3):
-    preds = np.asarray(preds).astype('float64')
-    preds = np.log(preds)
-    exp_preds = np.exp(preds)
-    preds = exp_preds / np.sum(exp_preds)
-
-    return heapq.nlargest(top_n, range(len(preds)), preds.take)
-
-
-def predict_completion(model, text, unique_chars, char_indices, indices_char):
-    original_text = text
-    generated = text
-    completion = ''
-    while True:
-        x = prepare_input(text, unique_chars, char_indices)
-        preds = model.predict(x, verbose=0)[0]
-        next_index = sample(preds, top_n=1)[0]
-        next_char = indices_char[next_index]
-        text = text[1:] + next_char
-        if next_char in ['?', '!', '.', ',', ' ']:
-            return completion
-
-        completion += next_char
-        if (len(original_text + completion) + 2 > len(original_text) and
-                next_char == ' '):
-
-            return completion
-
-
-def predict_completions(text):
-    text = text.lower()
-    try:
-        c = toolkit.get_action('get_last_rnn_corpus')({}, {}).lower()
-        unique_chars, char_indices, indices_char = prepare_rnn_corpus(c)
-        unique_chars_text = sorted(list(set(text)))
-
-        for char in unique_chars_text:
-            if char not in unique_chars:
-                return []
-    except Exception as e:
-        log.debug('Error while preparing the RNN corpus: %s' % str(e))
-        return []
-
-    sequence_length = int(
-        config.get(u'ckanext.knowledgehub.rnn.sequence_length', 10)
-    )
-    if sequence_length > len(text):
-        return []
-
-    text = text[-sequence_length:].lower()
-
-    model_path = config.get(
-        u'ckanext.knowledgehub.rnn.model',
-        './keras_model.h5'
-    )
-    if not os.path.isfile(model_path):
-        log.debug('Error: RNN model does not exist!')
-        return []
-
-    try:
-        lock_model_path = '%s.lock' % model_path
-        lock = FileLock(lock_model_path)
-        with lock.acquire(timeout=1):
-            n = int(
-                config.get(u'ckanext.knowledgehub.rnn.number_prediction', 3)
-            )
-            graph = tf.Graph()
-            with graph.as_default():
-                session = tf.Session()
-                with session.as_default():
-                    model = load_model(model_path)
-                    x = prepare_input(text, unique_chars, char_indices)
-                    preds = model.predict(x, verbose=0)[0]
-                    next_indices = sample(preds, n)
-                    return [
-                        indices_char[idx] +
-                        predict_completion(
-                            model,
-                            text[1:] + indices_char[idx],
-                            unique_chars,
-                            char_indices,
-                            indices_char
-                        ) for idx in next_indices]
-    except Exception as e:
-        log.debug('Error while prediction: %s' % str(e))
-        return []
-
-
-def get_kwh_data():
-    corpus = ''
-    try:
-        kwh_data = toolkit.get_action('kwh_data_list')({}, {})
-    except Exception as e:
-        log.debug('Error while loading KnowledgeHub data: %s' % str(e))
-        return corpus
-
-    if kwh_data.get('total'):
-        data = kwh_data.get('data', [])
-        for entry in data:
-            corpus += ' %s' % entry.get('content')
-
-    return corpus
diff --git a/ckanext/knowledgehub/rnn/worker.py b/ckanext/knowledgehub/rnn/worker.py
deleted file mode 100644
index 95666afb..00000000
--- a/ckanext/knowledgehub/rnn/worker.py
+++ /dev/null
@@ -1,181 +0,0 @@
-import os
-import logging
-import pickle
-import time
-
-import numpy as np
-import tensorflow as tf
-from filelock import FileLock
-
-from keras.models import Sequential, load_model
-from keras.layers import Dense, Activation
-from keras.layers import LSTM, Dropout
-from keras.layers import TimeDistributed
-from keras.layers.core import Dense, Activation, Dropout, RepeatVector
-from keras.optimizers import RMSprop
-from keras.callbacks import EarlyStopping, ModelCheckpoint, ReduceLROnPlateau
-
-from ckan.common import config
-from ckan.plugins import toolkit
-from ckan.controllers.admin import get_sysadmins
-
-from ckanext.knowledgehub.rnn import helpers as rnn_h
-
-
-np.random.seed(42)
-tf.set_random_seed(42)
-log = logging.getLogger(__name__)
-
-
-def learn():
-    try:
-        original_data = rnn_h.get_kwh_data()
-        data = original_data.lower()
-    except Exception as e:
-        log.debug('Error while training the model: %s' % str(e))
-
-    unique_chars, char_indices, indices_char = rnn_h.prepare_rnn_corpus(data)
-    log.info('unique chars: %d' % len(unique_chars))
-
-    min_length_corpus = int(
-        config.get(u'ckanext.knowledgehub.rnn.min_length_corpus', 300)
-    )
-    if min_length_corpus > len(data):
-        log.info(
-            'The minimum length of the corpus is %s, current corpus has %d'
-            % (min_length_corpus, len(data)))
-        return
-
-    sequence_length = int(
-        config.get(u'ckanext.knowledgehub.rnn.sequence_length', 10)
-    )
-    if len(data) <= sequence_length:
-        log.info(
-            'The length of the RNN sequence %d, given %d'
-            % (sequence_length, len(data)))
-        return
-
-    step = int(config.get(u'ckanext.knowledgehub.rnn.sentence_step', 3))
-    sentences = []
-    next_chars = []
-    for i in range(0, len(data) - sequence_length, step):
-        sentences.append(data[i: i + sequence_length])
-        next_chars.append(data[i + sequence_length])
-
-    log.info('num training examples: %d ' % len(sentences))
-
-    X = np.zeros((
-        len(sentences),
-        sequence_length,
-        len(unique_chars)),
-        dtype=np.bool
-    )
-    y = np.zeros((len(sentences), len(unique_chars)), dtype=np.bool)
-    for i, sentence in enumerate(sentences):
-        for t, char in enumerate(sentence):
-            X[i, t, char_indices[char]] = 1
-        y[i, char_indices[next_chars[i]]] = 1
-
-    history = ''
-    try:
-        model = Sequential()
-        model.add(LSTM(128, input_shape=(sequence_length, len(unique_chars))))
-        model.add(Dense(len(unique_chars)))
-        model.add(Activation('softmax'))
-
-        optimizer = RMSprop(lr=0.01)
-        model.compile(
-            loss='categorical_crossentropy',
-            optimizer=optimizer,
-            metrics=['accuracy'])
-
-        model_path = config.get(
-            u'ckanext.knowledgehub.rnn.model',
-            './keras_model.h5'
-        )
-
-        model_dir = os.path.dirname(model_path)
-        train_module_path = os.path.join(
-            model_dir, 'keras_module_%s.h5' % time.time()
-        )
-        if not os.path.exists(model_dir):
-            try:
-                os.makedirs(model_dir)
-            except Exception as e:
-                log.debug('Error while creating RNN model DIR: %s' % str(e))
-                return
-
-        # earlyStopping, mcp_save and reduce_lr_loss are used to avoid
-        # overfitting and keep only best results
-        earlyStopping = EarlyStopping(
-            monitor='val_loss',
-            patience=10,
-            verbose=0,
-            mode='min'
-        )
-        mcp_save = ModelCheckpoint(
-            train_module_path,
-            save_best_only=True,
-            monitor='val_loss',
-            mode='min'
-        )
-        reduce_lr_loss = ReduceLROnPlateau(
-            monitor='val_loss',
-            factor=0.1,
-            patience=7,
-            verbose=1,
-            epsilon=1e-4,
-            mode='min'
-        )
-        history = model.fit(
-            X,
-            y,
-            validation_split=0.05,
-            batch_size=128,
-            epochs=int(config.get(u'ckanext.knowledgehub.rnn.max_epochs', 50)),
-            shuffle=True,
-            callbacks=[
-                earlyStopping,
-                mcp_save,
-                reduce_lr_loss
-            ]
-        ).history
-    except Exception as e:
-        log.debug('Error while creating RNN model: %s' % str(e))
-        return
-
-    try:
-        sysadmin = get_sysadmins()[0].name
-        context = {'user': sysadmin, 'ignore_auth': True}
-        # model.save(model_path)
-        toolkit.get_action('corpus_create')(context, {
-            'corpus': original_data
-        })
-
-        lock_model_path = '%s.lock' % model_path
-        lock = FileLock(lock_model_path)
-        with lock.acquire(timeout=1000):
-            if os.path.exists(model_path):
-                os.remove(model_path)
-            os.rename(train_module_path, model_path)
-    except Exception as e:
-        log.debug('Error while saving RNN model: %s' % str(e))
-        return
-
-    history_path = config.get(
-        u'ckanext.knowledgehub.rnn.history',
-        './history.p'
-    )
-    history_dir = os.path.dirname(history_path)
-    if not os.path.exists(history_dir):
-        try:
-            os.makedirs(history_dir)
-        except Exception as e:
-            log.debug('Error while creating RNN history DIR: %s' % str(e))
-            return
-
-    try:
-        pickle.dump(history, open(history_path, "wb"))
-    except Exception as e:
-        log.debug('Error while saving RNN history: %s' % str(e))
-        return
diff --git a/ckanext/knowledgehub/tests/test_rnn.py b/ckanext/knowledgehub/tests/test_rnn.py
index e9f0f09d..c5d11ec4 100644
--- a/ckanext/knowledgehub/tests/test_rnn.py
+++ b/ckanext/knowledgehub/tests/test_rnn.py
@@ -1,14 +1,10 @@
 """Tests for rnn/worker.py."""
 
 import os
-import mock
 import nose.tools
 
-from ckan.tests import factories
 from ckan import plugins
 from ckan.tests import helpers
-from ckan.plugins import toolkit
-from ckan import model
 from ckan.common import config
 
 from ckanext.knowledgehub.model.theme import theme_db_setup
@@ -23,12 +19,9 @@ setup as kwh_data_setup
 )
 from ckanext.knowledgehub.logic.action import create as create_actions
-from ckanext.knowledgehub.rnn.helpers import predict_completions
-from ckanext.knowledgehub.rnn.worker import learn
-from ckanext.knowledgehub.logic.action import get as get_actions
-from ckanext.knowledgehub.tests.helpers import (User,
-                                                create_dataset,
-                                                mock_pylons)
+from ckanext.knowledgehub.lib.rnn import PredictiveSearchModel
+from ckanext.knowledgehub.lib.rnn import PredictiveSearchWorker
+from ckanext.knowledgehub.tests.helpers import get_context
 
 assert_equals = nose.tools.assert_equals
 assert_raises = nose.tools.assert_raises
@@ -37,62 +30,77 @@ class ActionsBase(helpers.FunctionalTestBase):
     def setup(self):
-        # helpers.reset_db()
-        # theme_db_setup()
-        # sub_theme_db_setup()
-        # rq_db_setup()
-        # dashboard_db_setup()
-        # resource_feedback_setup()
-        # kwh_data_setup()
-        # rnn_corpus_setup()
+        helpers.reset_db()
+        theme_db_setup()
+        sub_theme_db_setup()
+        rq_db_setup()
+        dashboard_db_setup()
+        resource_feedback_setup()
+        kwh_data_setup()
+        rnn_corpus_setup()
         os.environ["CKAN_INI"] = 'subdir/test.ini'
-        learn()
+        config['ckanext.knowledgehub.rnn.min_length_corpus'] = 100
 
         if not plugins.plugin_loaded('knowledgehub'):
             plugins.load('knowledgehub')
 
-class TestRNN(ActionsBase):
-
-    def test_learn(self):
-        user = factories.Sysadmin()
-        context = {
-            'user': user.get('name'),
-            'auth_user_obj': User(user.get('id')),
-            'ignore_auth': True,
-            'model': model,
-            'session': model.Session
-        }
+class TestPredictiveSearchWorker(ActionsBase):
+
+    def test_run(self):
         data_dict = {
             'type': 'theme',
-            'content': ('Demo again Returns Resettlement Protection Social '
-                        'Network Displacement Trends Labor Market Social '
-                        'Cohesion Civil Documentation Demographics '
-                        'Reception/Asylum Conditions Conditions of Return '
-                        'What is the residential distribution of refugees in '
-                        'COA? What is the change in total population numbers '
-                        'before and after the crisis? What is the breakdown '
-                        'of refugees by place of origin at governorate level?'
-                        ' What are the monthly arrival trends by place of '
-                        'origin at governorate level? What is the average '
-                        'awaiting period in COA prior to registration? What '
-                        'are the demographic characteristics of the '
-                        'population?')
+            'title': 'Returns Resettlement Protection Social',
+            'description': (
+                'Network Displacement Trends Labor Market Social '
+                'Cohesion Civil Documentation Demographics '
+                'Reception/Asylum Conditions Conditions of Return '
+                'What is the residential distribution of refugees in '
+                'COA? What is the change in total population numbers '
+                'before and after the crisis? What is the breakdown '
+                'of refugees by place of origin at governorate level?'
+                ' What are the monthly arrival trends by place of '
+                'origin at governorate level? What is the average '
+                'awaiting period in COA prior to registration? What '
+                'are the demographic characteristics of the population?'
+            )
         }
-        kwh_data = create_actions.kwh_data_create(context, data_dict)
+        create_actions.kwh_data_create(get_context(), data_dict)
+
+        worker = PredictiveSearchWorker()
+        worker.run()
+
+        assert_equals(os.path.isfile(worker.weights_path), True)
+        assert_equals(os.path.isfile(worker.network_path), True)
 
-        learn()
-
-        model_path = config.get(
-            u'ckanext.knowledgehub.rnn.model',
-            './keras_model.h5'
-        )
+class TestPredictiveSearchModel(ActionsBase):
+
+    def test_predict(self):
+        data_dict = {
+            'type': 'theme',
+            'title': 'Returns Resettlement Protection Social',
+            'description': (
+                'Network Displacement Trends Labor Market Social '
+                'Cohesion Civil Documentation Demographics '
+                'Reception/Asylum Conditions Conditions of Return '
+                'What is the residential distribution of refugees in '
+                'COA? What is the change in total population numbers '
+                'before and after the crisis? What is the breakdown '
+                'of refugees by place of origin at governorate level?'
+                ' What are the monthly arrival trends by place of '
+                'origin at governorate level? What is the average '
+                'awaiting period in COA prior to registration? What '
+                'are the demographic characteristics of the population?'
+            )
+        }
+        create_actions.kwh_data_create(get_context(), data_dict)
 
-        assert_equals(os.path.isfile(model_path), True)
+        worker = PredictiveSearchWorker()
+        worker.run()
 
-    def test_predict_completions(self):
         text = 'Demo again Returns Resettlement'
-        completions = predict_completions(text)
+        model = PredictiveSearchModel()
+        predicts = model.predict(text)
 
-        assert_equals(len(completions), 3)
+        assert_equals(len(predicts), 3)
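Taken together, these tests document the new public API: training and prediction are now separate classes instead of the module-level `learn()` and `predict_completions()`, and the worker persists the weights and the network description as two files. A usage sketch grounded in the tests; the default paths shown in the comments assume the config options are left unset:

```
from ckanext.knowledgehub.lib.rnn import (
    PredictiveSearchModel,
    PredictiveSearchWorker,
)

worker = PredictiveSearchWorker()
worker.run()                    # trains and writes both model files

print(worker.weights_path)      # default: ./keras_model_weights.h5
print(worker.network_path)      # default: ./keras_model_network.h5

model = PredictiveSearchModel()
print(model.predict('Refugees in Sy'))   # list of predicted completions
```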
diff --git a/ckanext/knowledgehub/views/theme.py b/ckanext/knowledgehub/views/theme.py
index 77ba09e9..224178c5 100644
--- a/ckanext/knowledgehub/views/theme.py
+++ b/ckanext/knowledgehub/views/theme.py
@@ -189,18 +189,6 @@ def post(self):
                             errors,
                             error_summary)
 
-        try:
-            kwh_data = {
-                'type': 'theme',
-                'content': theme.get('title'),
-                'theme': theme.get('id')
-            }
-            logic.get_action(u'kwh_data_create')(
-                context, kwh_data
-            )
-        except Exception as e:
-            log.debug('Error while storing KWH data: %s' % str(e))
-
         return h.redirect_to(u'theme.read', name=theme['name'])
 
@@ -218,7 +206,6 @@ def _prepare(self, name):
                 context, data_dict)
             check_access(u'theme_update', context)
             context['id'] = theme['id']
-            context['old_theme'] = theme
         except NotAuthorized:
             return base.abort(403, _(u'Unauthorized'
                                      u' to update theme'))
@@ -273,19 +260,6 @@ def post(self, name):
             return self.get(name, data_dict, errors, error_summary)
 
-        try:
-            old_theme = context.get('old_theme')
-            kwh_data = {
-                'type': 'theme',
-                'old_content': old_theme.get('title'),
-                'new_content': theme.get('title')
-            }
-            logic.get_action(u'kwh_data_update')(
-                context, kwh_data
-            )
-        except Exception as e:
-            log.debug('Error while storing KWH data: %s' % str(e))
-
         return h.redirect_to(u'theme.read', name=theme['name'])
diff --git a/requirements.txt b/requirements.txt
index c91b5666..3ef6cecf 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,6 +1,6 @@
 pymssql==2.1.4
 functools32==3.2.3.post2
-tensorflow==1.13.1
+tensorflow==1.6.0
 keras==2.2.4
 h5py==2.9.0
 filelock==3.0.12
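Since the TensorFlow pin moves down to 1.6.0 while Keras stays at 2.2.4, a quick runtime check of the installed versions before kicking off training can save a confusing failure later. A minimal sketch, assuming only that both packages are importable:

```
# Sanity-check the pinned versions from requirements.txt at runtime.
import tensorflow as tf
import keras

assert tf.__version__ == '1.6.0', 'unexpected tensorflow %s' % tf.__version__
assert keras.__version__ == '2.2.4', 'unexpected keras %s' % keras.__version__
```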