In [2]:
import re

from joblib import load

from utils.preprocessing import preprocessing

# Tag set stored on EntitiesRecognizer.labels when `is_order` is True.
# These tags carry no B-/I- prefix; "O" marks tokens that are skipped when
# the order result is reformatted.
order_labels = ["Quantity", "Pizza", "Topping", "Size", "Crust", "O"]
# Tag set stored on EntitiesRecognizer.labels when `is_order` is False.
# "B-" opens an entity and "I-" continues it (see reformat_customer_result);
# note there is no "I-Phone" tag in this list.
customer_info_labels = [
    "B-Cus",
    "I-Cus",
    "B-Phone",
    "B-Address",
    "I-Address",
    "B-Payment",
    "I-Payment",
    "O",
]


class EntitiesRecognizer:
    """Token-level entity tagger built around a model loaded with joblib.

    The same class serves two models: an order tagger (flat tags such as
    "Pizza" or "Size") and a customer-info tagger (B-/I- prefixed tags).
    `is_order` selects the tag set and the output format of `predict`.
    """

    def __init__(self, model_path: str, is_order: bool):
        """Load the model at `model_path` and remember which tag set applies.

        Args:
            model_path: Path handed to `joblib.load`.
            is_order: True for the order tagger, False for the customer-info tagger.
        """
        self.model = self._load_model(model_path)
        self.labels = order_labels if is_order else customer_info_labels
        self.is_order = is_order

    def _load_model(self, model_path):
        """Deserialize the model from `model_path`."""
        # NOTE(review): joblib.load unpickles the file, which can execute
        # arbitrary code; only load model files from a trusted source.
        return load(model_path)

    @staticmethod
    def _neighbor_features(offset, word):
        """Features describing a neighbouring token; `offset` is "-1" or "+1"."""
        return {
            f"{offset}:word.lower()": word.lower(),
            f"{offset}:word.isupper()": word.isupper(),
            f"{offset}:word.isdigit()": word.isdigit(),
        }

    def word2features(self, sentence, i):
        """Build the feature dict for token `i` of `sentence`.

        Args:
            sentence: Sequence of token strings.
            i: Index of the token to describe.

        Returns:
            dict with features of the token itself, of its left and right
            neighbours when they exist, and "BOS"/"EOS" flags at the edges.
        """
        word = sentence[i]
        features = {
            "bias": 1.0,
            "word.lower()": word.lower(),
            "word[-3:]": word[-3:],
            "word[-2:]": word[-2:],
            "word.isupper()": word.isupper(),
            "word.isdigit()": word.isdigit(),
        }

        if i > 0:
            features.update(self._neighbor_features("-1", sentence[i - 1]))
        else:
            features["BOS"] = True  # first token of the sentence

        if i < len(sentence) - 1:
            features.update(self._neighbor_features("+1", sentence[i + 1]))
        else:
            features["EOS"] = True  # last token of the sentence

        return features

    def sentence_features(self, words):
        """Return one feature dict per token in `words`."""
        return [self.word2features(words, i) for i in range(len(words))]

    def sentence_labels(self, labels):
        """Return `labels` unchanged."""
        return labels

    def process_sentence(self, sentence):
        """Tokenize `sentence` into word/apostrophe runs and single punctuation marks.

        Returns:
            dict with "words" (the tokens) and "label" (an "O" placeholder per token).
        """
        tokens = re.findall(r"[\w']+|[.,!?;]", sentence)
        return {
            "words": tokens,
            "label": ["O"] * len(tokens),
        }

    def predict(self, text):
        """Tag `text` and return the entities in the format matching `is_order`.

        Returns:
            For the order tagger, `{tag: [(word, token_index), ...]}`;
            for the customer-info tagger, `{entity_type: [phrase, ...]}`.
            An empty dict when the preprocessed text yields no tokens.
        """
        text = preprocessing(text, True)
        words = self.process_sentence(text)["words"]
        if not words:
            # Nothing to tag: both formatters would return {} anyway, so
            # skip calling the model with an empty sequence.
            return {}

        features = self.sentence_features(words)
        # The model is given a batch of one sequence; take its single result.
        labels = self.model.predict([features])[0]

        result = {
            "words": words,
            "label": labels,
        }
        if self.is_order:
            return self.reformat_order_result(result)
        return self.reformat_customer_result(result)

    def reformat_order_result(self, predicted_result):
        """Group non-"O" tokens by tag, keeping each token's position.

        Returns:
            dict mapping tag -> list of `(word, token_index)` in sentence order.
        """
        grouped = {}
        pairs = zip(predicted_result["words"], predicted_result["label"])
        for index, (word, label) in enumerate(pairs):
            if label == "O":
                continue
            grouped.setdefault(label, []).append((word, index))
        return grouped

    def reformat_customer_result(self, predicted_result):
        """Merge B-/I- tagged tokens into space-joined phrases per entity type.

        A "B-X" tag opens a new X entity and a following "I-X" extends it.
        An "I-X" tag with no open X entity (orphan or type mismatch) also
        opens a new X entity, so results are always keyed by the bare type
        ("X") rather than leaking the raw "I-X" tag as a key.

        Returns:
            dict mapping entity type -> list of phrases in sentence order.
        """
        entities = {}
        span_type = None
        span_words = []

        def close_span():
            """Store the entity being built (if any) and reset the builder."""
            nonlocal span_type, span_words
            if span_type is not None:
                entities.setdefault(span_type, []).append(" ".join(span_words))
            span_type, span_words = None, []

        for word, label in zip(predicted_result["words"], predicted_result["label"]):
            is_begin = label.startswith("B-")
            is_inside = label.startswith("I-")

            if is_inside and span_type is not None and label[2:] == span_type:
                span_words.append(word)
                continue

            # Any other tag ends the entity currently being built.
            close_span()
            if is_begin or is_inside:
                span_type = label[2:]
                span_words = [word]
            elif label != "O":
                # Tag without a B-/I- prefix: keep the word under the tag itself.
                entities.setdefault(label, []).append(word)

        close_span()
        return entities


In [3]:
# Instantiate one recognizer per task; the second argument is `is_order`.
# NOTE(review): the files carry an .h5 extension but are read with
# joblib.load (pickle-based), so only load them from a trusted location.
order_model = EntitiesRecognizer("../output/savedmodels/order_entity_v3_1.h5", True)
customer_info_model = EntitiesRecognizer("../output/savedmodels/customer_info_entity_v1.h5", False)

In [4]:
preprocessing("e cho t xin 3 xl pepperoni, 2 cái đế mỏng và 1 cái đế dày", True)

'xin 3 xl pepperoni 2 cái đế mỏng 1 cái đế dày'

In [12]:
# Tag a sample order utterance; the bare `a` on the last line displays the
# grouped entities. NOTE(review): `a` is reused by the create_cart_items
# demo call further down; a descriptive name would make that link clearer.
a = order_model.predict("ê cho 2 hawaiian, 1 dày lớn với nấm rơm và ớt xanh, 1 cái mỏng vừa")
a

{'Quantity': [('2', 1), ('1', 3), ('1', 8)],
 'Pizza': [('hawaiian', 2)],
 'Crust': [('dày', 4), ('mỏng', 10)],
 'Size': [('lớn', 5), ('vừa', 11)],
 'Topping': [('nấm_rơm', 6), ('ớt_xanh', 7)]}

In [19]:
def create_cart_items(entities):
    """Turn position-tagged order entities into a list of cart items.

    The utterance is cut into segments at "anchor" token positions: the
    positions of the Quantity entities or, when there are none, of the
    non-Topping entity type that occurs most often. Every entity is assigned
    to the segment its position falls in, and each segment becomes one item.
    Entities located before the first anchor belong to no segment and are
    ignored.

    A segment without a Pizza inherits the Pizza (and any Size/Crust it
    lacks) from the closest earlier segment that names one; when quantities
    are known, the inherited quantity is subtracted from that earlier
    segment, so "2 X, 1 thick, 1 thin" ends up as two one-pizza items.

    Args:
        entities: dict mapping entity type -> list of `(value, token_index)`
            pairs, as produced by `EntitiesRecognizer.reformat_order_result`.
            Index lists are assumed to be in ascending order.

    Returns:
        list of dicts with keys "Quantity" (int, or None when the utterance
        had no quantity), "Size", "Crust" (str or None), "Topping" (list of
        str) and, when one could be determined, "Pizza". Items whose
        quantity dropped to zero or below are omitted. An empty list is
        returned when there is nothing to anchor on.

    Raises:
        ValueError: if a Quantity value cannot be parsed by `int`.
    """

    def find_key_with_max_elements(data):
        """Pick the non-Topping entity type to anchor on when no quantity exists.

        The type with the most occurrences wins; ties favour Pizza, then
        Size, then Crust. When Pizza, Size and Crust are all present and
        Size and Crust share the maximum count (but Pizza does not), the
        one that appears first in the utterance is chosen.
        """
        priority_keys = ["Pizza", "Size", "Crust"]
        max_key = None
        max_length = 0

        for key, values in data.items():
            if key != "Topping":
                current_length = len(values)
                if current_length > max_length or (current_length == max_length and key in priority_keys):
                    if current_length > max_length or (max_key not in priority_keys or priority_keys.index(key) < priority_keys.index(max_key)):
                        max_length = current_length
                        max_key = key

        if "Size" in data and "Crust" in data and "Pizza" in data:
            size_length = len(data["Size"])
            crust_length = len(data["Crust"])
            pizza_length = len(data["Pizza"])
            if size_length == max_length and crust_length == max_length and pizza_length == max_length:
                return "Pizza"
            if size_length == max_length and crust_length == max_length:
                size_min_index = min(item[1] for item in data["Size"])
                crust_min_index = min(item[1] for item in data["Crust"])
                return "Size" if size_min_index < crust_min_index else "Crust"

        return max_key

    def split_entities_by_quantity_index(entities):
        """Bucket entities into segments; return `(segments, has_quantity)`."""
        anchors = [index for _, index in entities.get("Quantity", [])]
        has_quantity = bool(anchors)
        if not has_quantity:
            anchor_key = find_key_with_max_elements(entities)
            if anchor_key is not None:
                anchors = [index for _, index in entities[anchor_key]]
        if not anchors:
            # Empty input or toppings only: there is nothing to build items from.
            return [], has_quantity

        # Segment k spans [anchors[k], anchors[k + 1]); the last one is open-ended.
        ranges = list(zip(anchors, anchors[1:] + [None]))
        result = [{} for _ in ranges]

        for entity_type, values in entities.items():
            for value, index in values:
                for i, (start, end) in enumerate(ranges):
                    if (end is None and index >= start) or (end is not None and start <= index < end):
                        if entity_type == "Topping":
                            # A segment may hold several toppings.
                            result[i].setdefault(entity_type, []).append(value)
                        else:
                            # Other types keep one value per segment (last one wins).
                            result[i][entity_type] = value
                        break

        return result, has_quantity

    split_entities, has_quantity = split_entities_by_quantity_index(entities)
    current_pizza_index = None
    for split_entity_index, split_entity in enumerate(split_entities):
        if has_quantity:
            # Each segment starts on a Quantity token, so the key is present.
            # Without quantities the key is absent and must not be read.
            split_entity["Quantity"] = int(split_entity["Quantity"])

        if "Pizza" in split_entity:
            current_pizza_index = split_entity_index
        elif current_pizza_index is not None:
            pizza_owner = split_entities[current_pizza_index]
            split_entity["Pizza"] = pizza_owner["Pizza"]
            if has_quantity:
                # This segment is a share of the earlier pizza's total.
                pizza_owner["Quantity"] -= split_entity["Quantity"]
            # The owner was normalised on an earlier iteration, so it always
            # has Size and Crust keys (possibly None).
            if "Size" not in split_entity:
                split_entity["Size"] = pizza_owner["Size"]
            if "Crust" not in split_entity:
                split_entity["Crust"] = pizza_owner["Crust"]

        if not has_quantity:
            split_entity["Quantity"] = None
        split_entity.setdefault("Size", None)
        split_entity.setdefault("Crust", None)
        split_entity.setdefault("Topping", [])

    # Drop items whose quantity was fully redistributed; keep items whose
    # quantity is simply unknown (None).
    return [
        cart_item
        for cart_item in split_entities
        if cart_item["Quantity"] is None or cart_item["Quantity"] > 0
    ]

create_cart_items(a)

[{'Quantity': 1,
  'Crust': 'dày',
  'Size': 'lớn',
  'Topping': ['nấm_rơm', 'ớt_xanh'],
  'Pizza': 'hawaiian'},
 {'Quantity': 1,
  'Crust': 'mỏng',
  'Size': 'vừa',
  'Pizza': 'hawaiian',
  'Topping': []}]

In [19]:
customer_info_model.predict("ship den pho thanh xuan cho t")

{'Address': ['thanh_xuân']}

In [16]:
import torch
import torch.nn as nn
import numpy as np
from transformers import AutoModel, AutoTokenizer
from utils.preprocessing import preprocessing

# Minimum softmax probability the top intent must reach to be returned;
# below it IntentsRecognizer.predict returns None.
THRESHOLD = 0.99
# Passed to the tokenizer as `max_length` when encoding the input text.
MAX_LEN = 128
# Use the GPU when one is available, otherwise fall back to the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")


class IntentsRecognizer(nn.Module):
    """Intent classifier: PhoBERT encoder followed by dropout and a linear head."""

    def __init__(self):
        """Download/load the pretrained PhoBERT encoder and tokenizer and build the head."""
        super().__init__()
        self.phobert = AutoModel.from_pretrained("vinai/phobert-base")
        self.dropout = nn.Dropout(p=0.3)
        # 768 input features feed 9 outputs, one per entry of `intent_labels`.
        self.linear = nn.Linear(768, 9)
        # Output index i of the linear head corresponds to intent_labels[i].
        self.intent_labels = [
            "view_menu",
            "view_cart",
            "add_to_cart",
            "remove_from_cart",
            "modify_cart_item",
            "confirm_order",
            "track_order",
            "cancel_order",
            "provide_info",
        ]
        self.intent_tokenizer = AutoTokenizer.from_pretrained("vinai/phobert-base", use_fast=False)

    def forward(self, input_ids, attn_mask, token_type_ids):
        """Return the raw (pre-softmax) intent scores for an encoded batch."""
        output = self.phobert(input_ids, attention_mask=attn_mask, token_type_ids=token_type_ids)
        output_dropout = self.dropout(output.pooler_output)
        output = self.linear(output_dropout)
        return output

    def predict(self, text):
        """Classify the intent of a single utterance.

        Puts the module in evaluation mode (it stays in that mode afterwards)
        so that dropout is disabled and repeated calls give the same result.

        Args:
            text: Raw user utterance; it is passed through `preprocessing` first.

        Returns:
            The intent label whose softmax probability is highest, or None
            when that probability is below `THRESHOLD`.
        """
        text = preprocessing(text, False)
        encoded_text = self.intent_tokenizer.encode_plus(
            text,
            max_length=MAX_LEN,
            add_special_tokens=True,
            return_token_type_ids=True,
            # `pad_to_max_length` is deprecated; this is its documented
            # replacement. Truncation is requested explicitly so inputs
            # longer than MAX_LEN are cut rather than relying on a default.
            padding="max_length",
            truncation=True,
            return_attention_mask=True,
            return_tensors="pt",
        )

        # Send the inputs to wherever the weights live, so the call works
        # even if the model was not moved to the notebook-level device.
        model_device = next(self.parameters()).device
        input_ids = encoded_text["input_ids"].to(model_device)
        attention_mask = encoded_text["attention_mask"].to(model_device)
        token_type_ids = encoded_text["token_type_ids"].to(model_device)

        # A freshly constructed nn.Module is in training mode, which keeps
        # the dropout layer active; switch it off for inference.
        self.eval()
        with torch.no_grad():  # no gradients are needed for prediction
            output = self(input_ids, attention_mask, token_type_ids)
        probabilities = torch.softmax(output, dim=-1).cpu().numpy().flatten()

        max_prob = np.max(probabilities)
        max_prob_index = np.argmax(probabilities)

        # NOTE(review): debugging output kept because the notebook's
        # recorded cell output shows it; consider removing.
        print(probabilities)
        if max_prob >= THRESHOLD:
            return self.intent_labels[max_prob_index]
        else:
            return None

In [17]:
intent_model = IntentsRecognizer()
# map_location remaps the stored tensors onto the selected device, so a
# checkpoint saved on a GPU machine also loads on a CPU-only one.
# NOTE(review): torch.load unpickles the file; only load trusted checkpoints.
intent_model.load_state_dict(
    torch.load("../output/savedmodels/intents_v2.bin", map_location=device)
)
# Inference only: switch off dropout so predictions are deterministic.
intent_model.eval()
intent_model.to(device)

IntentsRecognizer(
  (phobert): RobertaModel(
    (embeddings): RobertaEmbeddings(
      (word_embeddings): Embedding(64001, 768, padding_idx=1)
      (position_embeddings): Embedding(258, 768, padding_idx=1)
      (token_type_embeddings): Embedding(1, 768)
      (LayerNorm): LayerNorm((768,), eps=1e-05, elementwise_affine=True)
      (dropout): Dropout(p=0.1, inplace=False)
    )
    (encoder): RobertaEncoder(
      (layer): ModuleList(
        (0-11): 12 x RobertaLayer(
          (attention): RobertaAttention(
            (self): RobertaSelfAttention(
              (query): Linear(in_features=768, out_features=768, bias=True)
              (key): Linear(in_features=768, out_features=768, bias=True)
              (value): Linear(in_features=768, out_features=768, bias=True)
              (dropout): Dropout(p=0.1, inplace=False)
            )
            (output): RobertaSelfOutput(
              (dense): Linear(in_features=768, out_features=768, bias=True)
              (LayerNorm): L

In [27]:
intent_model.predict("e xoa 1 cai pepperoni na")

[5.91382050e-05 1.05170882e-04 1.97371322e-04 9.99201596e-01
 1.00157326e-04 7.44513309e-05 8.43253147e-05 1.04384693e-04
 7.33972411e-05]


'remove_from_cart'