In [234]:
import re
import nltk
import functools
import copy
import typing as t
import pandas as pd
import numpy as np
from nltk.stem.snowball import FrenchStemmer

In [257]:
from tensorflow.keras.layers import Dense, Convolution1D, MaxPooling1D
from tensorflow.keras.models import Sequential

In [122]:
nltk.download('french')

[nltk_data] Error loading french: Package 'french' not found in index


False

In [273]:
def apply_for_each_key(data: dict, func: t.Callable) -> dict:
    return {k:func(data[k]) for k in data}

In [268]:
class DataLoader:
    def load(self) -> dict:
        raise Exception('load method not implemented')

In [448]:
class InputData:
    examples: t.Dict[str, t.List[str]]
    responses: t.Dict[str, t.List[str]]
        
    def __init__(self, examples, responses):
        self.examples = examples
        self.responses = responses

In [531]:
class MdLoader(DataLoader):
    path: str
        
    def __init__(self, path: str):
        self.path = path
        
    def get_if_match(self, regex, text):
        matcher = re.match(regex, text)
        if matcher is None:
            return None
        return matcher.group(1)
        
    def get_list_item(self, line):
        return self.get_if_match(r'- (.*)$', line)
    
    def get_intent_name(self, line):
        return self.get_if_match(r'^## (.*)$', line)
    
    def get_sub_part(self, line: str) -> str:
        return self.get_if_match(r'^### (.*)$', line)
    
    def load(self) -> InputData:
        with open(path, 'r', encoding='utf-8') as file:
            content = [re.sub('\\n', '', l) for l in file]    
        mega_part = ''
        current_intent = ''
        current_sub_part = ''
        intents = dict()
        response = dict()
        
        for line in content:
            if line == '# intents':
                mega_part = 'intents'
            if mega_part == 'intents':
                intent_name: str = self.get_intent_name(line)
                list_item: str = self.get_list_item(line)
                sub_part: str = self.get_sub_part(line)
                    
                if intent_name is not None and intent_name != '':
                    current_intent = intent_name
                    if intent_name != 'not_found':
                        intents[intent_name] =  []
                    response[intent_name] = []
                    
                if sub_part is not None:
                    current_sub_part = sub_part
                
                if current_sub_part == 'examples' and list_item is not None:
                    if current_intent != 'not_found':
                        intents[current_intent].append(list_item)
                
                if current_sub_part == 'response' and list_item is not None:
                    response[current_intent].append(list_item)
                    
        return InputData(intents, response)

In [509]:
class Preprocessing:
    def __init__(self, data_loader: DataLoader):
        self.tokenizer = nltk.RegexpTokenizer(r'\w+')
        self.stop_words = nltk.corpus.stopwords.words('french')
        self.stemmer = FrenchStemmer()
        
        # TODO compose it
        inputs: InputData = data_loader.load()
        data = inputs.examples
        self.responses = inputs.responses
        data = self.to_lower_case_all(data)
        data = self.tokenize_all_examples(data)
        data = self.remove_stop_words_for_all(data)
        data = self.lemmatize_all(data)
        self.data = data
        
    def to_lower_case_one(self, example: str):
        return example.lower()
        
    def to_lower_case_all(self, data):
        to_lower_case = lambda examples: [self.to_lower_case_one(ex) for ex in examples]
        return apply_for_each_key(data, to_lower_case)
    
    def tokenize_one_example(self, example):
        return self.tokenizer.tokenize(example)
    
    def tokenize_all_examples(self, data):
        tokenize = lambda data_list: [self.tokenize_one_example(d) for d in data_list]
        return apply_for_each_key(data, tokenize)
    
    def remove_stop_words(self, example):
        return [w for w in example if not w in self.stop_words]
    
    def remove_stop_words_for_all(self, data):
        remove_sw = lambda examples: [self.remove_stop_words(ex) for ex in examples]
        return apply_for_each_key(data, remove_sw)
    
    def lemmatize(self, example):
        return [self.stemmer.stem(w) for w in example]
    
    def lemmatize_all(self, data):
        get_lems = lambda examples: [self.lemmatize(ex) for ex in examples]
        return apply_for_each_key(data, get_lems)
    
    def process_sentence(self, sentence):
        # TODO compose
        data = self.to_lower_case_one(sentence)
        data = self.tokenize_one_example(data)
        data = self.remove_stop_words(data)
        data = self.lemmatize(data)
        return data

In [351]:
class IndexedRepresentation:
    index_columns: t.List[str] = ['lem', 'index']
    index: pd.DataFrame
    index_len: int
    data: t.Dict[str, np.array]
    data_merged: t.Dict[str, np.array]
    
    def __init__(self, data):
        self.index = self.build_index(data)
        self.index_len = self.index.shape[0]
        
    def build_index(self, data) -> pd.DataFrame:
        index = dict()
        idx = 0
        for intent in data:
            for example in data[intent]:
                for lem in example:
                    if index.get(lem) is None:
                        index[lem] = idx
                        idx += 1
        return pd.DataFrame([{'lem': k, 'index': index[k]} for k in index], columns=self.index_columns)
    
    def get_index_from_lem(self, lem) -> int:
        idx = self.index.loc[self.index['lem'] == lem, 'index']
        if idx.empty:
            return None
        else:
            return idx.values[0]
        
    def get_matrix_from_index(self, idx) -> np.array:
        matrix = np.zeros(self.index_len)
        matrix[idx] = 1
        return matrix
    
    def get_matrix_from_lem(self, lem) -> np.array:
        idx = self.get_index_from_lem(lem)
        if idx is None:
            return idx
        else:
            return self.get_matrix_from_index(idx)
    
    def process_words_matrix(self, example) -> np.array:
        return np.array(
            [self.get_matrix_from_lem(l) for l in example if not self.get_matrix_from_lem(l) is None])

In [352]:
class MatrixRepresentation(IndexedRepresentation):
    data: t.Dict[str, np.array]
        
    def __init__(self, data):
        IndexedRepresentation.__init__(self, data)
        self.data = self.process_words_matrix_for_all(data)
    
    def process_words_matrix_for_all(self, data) -> t.Dict[str, np.array]:
        get_matrix = lambda examples: np.array(
            [self.process_words_matrix(ex) for ex in examples])
        return apply_for_each_key(data, get_matrix)
    
    def process_new_data(self, lems) -> np.array:
        return self.process_words_matrix(lems)

In [353]:
class MergedMatrixRepresentation(IndexedRepresentation):
    data: t.Dict[str, np.array]
        
    def __init__(self, data):
        IndexedRepresentation.__init__(self, data)
        self.data = self.process_merged_sentences_for_all(data)
        
    def process_words_merged_matrix(self, example) -> np.array:
        matrix = self.process_words_matrix(example)
        return sum(list(matrix))
    
    def process_merged_sentences_for_all(self, data) -> t.Dict[str, np.array]:
        get_matrix = lambda examples: np.array(
            [self.process_words_merged_matrix(ex) for ex in examples])
        return apply_for_each_key(data, get_matrix)
    
    def process_new_data(self, lems) -> np.array:
        return self.process_words_merged_matrix(lems)

In [289]:
class AbstractClassifier:
    def build(self, data):
        raise Exception('build mathod not implemented')
        
    def train(self):
        raise Exception('train mathod not implemented')
        
    def predict(self, value):
        raise Exception('predict method not implemented')

In [262]:
class ConvolutionClassifier(AbstractClassifier):
    data: t.Dict[str, np.array]
    input_shape: t.Tuple[int, int]
    model: Sequential
    
    def __init__(self, input_shape: t.Tuple[int, int]):
        AbstractClassifier.__init__(self)
        self.input_shape = input_shape
    
    def build(self, data):
        self.data = data
        self.model = Sequential([
            Convolution1D(
                filters=32, 
                kernel_size=3, 
                activation='relu', 
                input_shape=(self.input_shape)),
            MaxPooling1D(),
            Dense(units=100)
        ])
        
    def train(self):
        return 'train'
    
    def predict(self, value):
        return 'predict'

In [360]:
class ClassificationProcessor:
    training_set: t.Dict[str, np.array]
    classifier: AbstractClassifier
    
    def __init__(self, 
                 classifier: AbstractClassifier, 
                 training_set: t.Dict[str, np.array]):
        self.training_set = training_set
        self.classifier = classifier
        self.classifier.build(training_set)
        
    def train(self):
        classifier.train()
    
    def predict(self, data: np.array):
        return classifier.predict(data)

In [444]:
class NaiveBayseClassifier(AbstractClassifier):
    """besoin de la représentation mergée des exemples
    """
    data: t.Dict[str, np.array]
    proba_per_intent: t.Dict[str, np.array]
        
    def build(self, data: t.Dict[str, np.array]):
        self.data = data
    
    def get_proba_per_intent(self, examples):
        return sum(list(examples)) / len(examples)
    
    def train(self):
        self.proba_per_intent: t.Dict[str, np.array] = \
            {k:self.get_proba_per_intent(self.data[k]) for k in self.data}
    
    def predict(self, value: np.array) -> t.Tuple[str, float]:
        max_p = 0
        intents = list(self.data.keys())
        best_intent = 'not_found'
        for intent in intents:
            repres = value * self.proba_per_intent[intent]
            repres = [e for e in repres if e != 0]
            current_p = 0
            if len(repres) != 0:
                current_p = np.prod(repres)
            if current_p > max_p:
                max_p = current_p
                best_intent = intent
        return best_intent, max_p

In [540]:
loader = MdLoader('./training.md')
processor = Preprocessing(loader)
merged_matrix_repr = MergedMatrixRepresentation(processor.data)

classifier = NaiveBayseClassifier()
classifier_processor = ClassificationProcessor(classifier, merged_matrix_repr.data)
classifier_processor.train()

In [520]:
def get_random_message(messages: t.List[str]) -> str:
    i = np.random.randint(low=0, high=len(messages))
    return messages[i]

In [554]:
message = "j'aimerai voir un bon film"
message = processor.process_sentence(message)
message = merged_matrix_repr.process_new_data(message)
res = classifier_processor.predict(message)
get_random_message(processor.responses[res[0]])

"j'en sais rien moi !"