In [147]:
import torch.utils.data as D
import torch
import numpy as np
import json
import csv
import glob
import concurrent.futures

from functools import lru_cache
from collections import OrderedDict
from types import SimpleNamespace
from collections.abc import Iterable
from tqdm import tqdm_notebook as tqdm
from pytorch_transformers import BertTokenizer

In [22]:
class Schemas(object):
    """Lookup table over the service schemas stored in a JSON schema file.

    The file is expected to contain a JSON list of schema objects, each with a
    "service_name" key; schemas are indexed by that name in ``self.index``.

    Lookups are memoized per instance. (Decorating the methods with
    ``functools.lru_cache`` would store ``self`` in a class-level cache and
    keep every instance alive for the lifetime of the process.)
    """

    def __init__(self, filepath):
        """Load all schemas from ``filepath`` and index them by service name."""
        with open(filepath) as f:
            self.index = {}
            for schema in json.load(f):
                service_name = schema["service_name"]
                self.index[service_name] = schema
        self._cache = {}

    def _memoized(self, key, compute):
        """Return the cached value for ``key``, computing and storing it on a miss."""
        # The cache is created lazily so that instances built without
        # __init__ (e.g. restored from an older pickle) keep working.
        cache = self.__dict__.setdefault("_cache", {})
        if key not in cache:
            # If compute() raises (unknown service), nothing is cached.
            cache[key] = compute()
        return cache[key]

    def _lookup_desc(self, service, section, name):
        """Return the description of the item called ``name`` in a schema section.

        Returns None when no item of that name exists in the section.
        """
        for item in self.index[service][section]:
            if item["name"] == name:
                return item["description"]
        return None

    def get_service_desc(self, service):
        """Return the description string of ``service`` (KeyError if unknown)."""
        return self.index[service]["description"]

    def get_slot_desc(self, service, slot):
        """Return the description of ``slot`` in ``service``, or None if absent."""
        return self._memoized(
            ("slot_desc", service, slot),
            lambda: self._lookup_desc(service, "slots", slot),
        )

    def get_intent_desc(self, service, intent):
        """Return the description of ``intent`` in ``service``, or None if absent."""
        return self._memoized(
            ("intent_desc", service, intent),
            lambda: self._lookup_desc(service, "intents", intent),
        )

    def get(self, service):
        """Return a column-oriented summary of the schema of ``service``.

        The result is a dict of parallel lists: one entry per slot for the
        ``slot_*`` keys and one entry per intent for the ``intent_*`` keys.
        The same dict object is returned on every call for a given service,
        so callers must not mutate it.
        """
        return self._memoized(("service", service), lambda: self._build(service))

    def _build(self, service):
        """Assemble the summary dict returned by :meth:`get`."""
        schema = self.index[service]
        result = dict(
            # service
            service_name=service,
            service_desc=schema["description"],

            # slots
            slot_name=[],
            slot_desc=[],
            slot_iscat=[],
            slot_vals=[],  # "possible_values" of every slot, as given by the schema

            # intents
            intent_name=[],
            intent_desc=[],
            intent_istrans=[],
            intent_reqslots=[],
            intent_optslots=[],
            intent_optvals=[],
        )

        for slot in schema["slots"]:
            result["slot_name"].append(slot["name"])
            result["slot_desc"].append(slot["description"])
            result["slot_iscat"].append(slot["is_categorical"])
            result["slot_vals"].append(slot["possible_values"])

        for intent in schema["intents"]:
            result["intent_name"].append(intent["name"])
            result["intent_desc"].append(intent["description"])
            result["intent_istrans"].append(intent["is_transactional"])
            result["intent_reqslots"].append(intent["required_slots"])
            # "optional_slots" is a mapping: its keys and values are stored
            # as two parallel lists.
            result["intent_optslots"].append(list(intent["optional_slots"].keys()))
            result["intent_optvals"].append(list(intent["optional_slots"].values()))

        return result

In [39]:
class Vocab:
    """Bidirectional token <-> index mapping with optional special tokens.

    Indices are assigned consecutively, in insertion order, starting at 0.
    Every special token is also exposed as an attribute holding its index
    (e.g. ``vocab.PAD``).
    """

    def __init__(self, specials=("PAD", "SOS", "EOS", "UNK")):
        """Create a vocabulary pre-populated with ``specials`` (any iterable of tokens)."""
        self.token2index = OrderedDict()
        self.index2token = OrderedDict()
        for s in specials:
            index = self.add(s)
            setattr(self, s, index)

    def __len__(self):
        return len(self.token2index)

    def __iter__(self):
        """Yield ``(index, token)`` pairs in insertion order."""
        for index, token in self.index2token.items():
            yield index, token

    def get_token(self, index):
        """Return the token stored at ``index``, or None if there is none."""
        return self.index2token.get(index)

    def get_index(self, token):
        """Return the index of ``token``, or None if it is unknown."""
        return self.token2index.get(token)

    def add(self, token):
        """Add ``token`` if it is new and return its index.

        The index is returned for new and for already-known tokens alike, so
        the result can always be used directly.
        """
        index = self.token2index.get(token)
        if index is None:
            index = len(self.token2index)
            self.token2index[token] = index
            self.index2token[index] = token
        return index

    def save(self, filename):
        """Write the vocabulary to ``filename`` as tab-separated ``index, token`` rows."""
        with open(filename, "w", newline="") as f:
            writer = csv.writer(f, delimiter="\t", quoting=csv.QUOTE_MINIMAL)
            for index, token in self:
                writer.writerow([index, token])

    @classmethod
    def load(cls, filename):
        """Rebuild a vocabulary from a file written by :meth:`save`.

        The loaded vocabulary is created without specials, so the special-token
        attributes (``PAD`` etc.) are not restored. Raises AssertionError if a
        stored index does not match the position of its row.
        """
        vocab = cls(specials=[])
        with open(filename, newline="") as f:
            reader = csv.reader(f, delimiter="\t", quoting=csv.QUOTE_MINIMAL)
            for index, token in reader:
                vocab.add(token)
                assert vocab.get_index(token) == int(index)
        return vocab

In [169]:
class Tokenizer:
    """Callable wrapper that tokenizes text with a BERT-style tokenizer.

    ``bert`` only needs to provide a ``tokenize(text)`` method returning a list.
    """

    def __init__(self, bert):
        self.bert = bert

    def __call__(self, text, include_sos=True):
        """Tokenize ``text``.

        When ``include_sos`` is true the token list is wrapped in the
        "[CLS]" ... "[SEP]" markers (both of them, not only the leading one).
        """
        pieces = self.bert.tokenize(text)
        if not include_sos:
            return pieces
        # Wrap in place: the list returned by the tokenizer is the one handed back.
        pieces.insert(0, "[CLS]")
        pieces.append("[SEP]")
        return pieces
    
    
class TokenIndexer:
    """Callable wrapper around ``bert.convert_tokens_to_ids``."""

    def __init__(self, bert):
        self.bert = bert

    def __call__(self, *args, **kw):
        """Forward every argument unchanged to ``convert_tokens_to_ids``."""
        convert = self.bert.convert_tokens_to_ids
        return convert(*args, **kw)
    
    
# Load the pretrained "bert-base-uncased" tokenizer once and share it between
# the two callable wrappers used by the dataset code below.
bert_ = BertTokenizer.from_pretrained("bert-base-uncased")
tokenizer = Tokenizer(bert_)  # text -> list of tokens (with [CLS]/[SEP] by default)
token_indexer = TokenIndexer(bert_)  # tokens -> ids, via bert_.convert_tokens_to_ids

In [41]:
def label_binarize(labels, classes):
    """One-hot encode ``labels`` against ``classes``.

    Args:
        labels: sequence of labels, one per output row.
        classes: sequence of known classes, one per output column.

    Returns:
        A float ``np.ndarray`` of shape ``[len(labels), len(classes)]`` where
        entry ``[i, j]`` is 1 when ``labels[i] == classes[j]``. A label that
        matches no class yields an all-zero row.
    """
    # Hand-rolled replacement for `sklearn.preprocessing.label_binarize`,
    # which returned [1] or [0] instead of a one-hot row when run from this
    # script.
    # One row per *label*: sizing the matrix by the number of classes raises
    # IndexError when there are more labels than classes and returns spurious
    # zero rows when there are fewer.
    vectors = np.zeros((len(labels), len(classes)))
    for i, label in enumerate(labels):
        for j, c in enumerate(classes):
            if c == label:
                vectors[i][j] = 1
    return vectors
    

def label_inv_binarize(vectors, classes):
    """Map each row of ``vectors`` back to the class at its arg-max position.

    Args:
        vectors: np.array or tensor shaped [batch, classes].
        classes: list of classes, indexed by column.

    Returns:
        A list with one class per row.
    """
    # Mirrors sklearn's LabelBinarizer.inverse_transform(), for which sklearn
    # offers no functional API: an all-zero row decodes to the class at
    # index 0 rather than to None.
    return [classes[np.argmax(row)] for row in vectors]

In [99]:
def padded_array(array, value=0):
    """Turn a ragged nested sequence into a dense float array padded with ``value``.

    Every nesting level becomes one dimension whose size is the longest
    sub-sequence found at that level; positions not covered by the input keep
    ``value``. The input is expected to have a fixed number of dimensions
    (all leaves at the same depth).

    Args:
        array: nested sequence (lists, tuples, arrays) of numbers.
        value: fill value for the padding positions.

    Returns:
        ``np.ndarray`` of dtype float64.

    Raises:
        TypeError: if a string is found inside ``array``. A one-character
            string iterates to itself, so treating strings as nested
            sequences would never terminate.
    """
    # Pass 1: resolve the padded shape (longest length seen per depth).
    # The traversal order is irrelevant, so a stack with O(1) pops is used.
    dim_sizes = {}
    pending = [(array, 0)]
    while pending:
        subarr, dim = pending.pop()
        dim_sizes[dim] = max(dim_sizes.get(dim, -1), len(subarr))
        for x in subarr:
            if isinstance(x, str):
                raise TypeError(
                    "padded_array expects numeric leaves, got str: %r" % (x,)
                )
            if isinstance(x, Iterable):
                pending.append((x, dim + 1))
    shape = [dim_sizes[k] for k in range(max(dim_sizes) + 1)]

    # Pass 2: copy every leaf to its position; each leaf has a unique index,
    # so the order of the writes does not matter either.
    padded = np.full(shape, value, dtype=np.float64)
    pending = [(array, ())]
    while pending:
        subarr, prefix = pending.pop()
        for j, x in enumerate(subarr):
            if isinstance(x, Iterable):
                pending.append((x, prefix + (j,)))
            else:
                padded[prefix + (j,)] = x
    return padded

In [163]:
class DialogueDataset(D.Dataset):
    """Dataset of dialogues converted to per-field dictionaries.

    Every dialogue of the JSON file is turned into a dict of *fields* by
    :meth:`text_to_fields`. Fields are produced by the ``field_*`` methods
    (called once per dialogue) and the ``field_turn_*`` methods (called once
    per turn); both kinds are discovered by name, so adding a method with one
    of these prefixes adds a field.

    A field method returns a dict that may contain "value", "tokens", "ids",
    "mask" and an optional "padding" entry (the fill value used when "ids" /
    "mask" are densified), or None when the field does not apply.

    Relies on the module-level helpers ``label_binarize`` and ``padded_array``.
    """

    def __init__(self, filename, schemas, tokenizer, token_indexer):
        """Load ``filename`` (a JSON list of dialogues) and precompute all fields."""
        with open(filename) as f:
            self.ds = json.load(f)
        self.schemas = schemas
        self.tokenizer = tokenizer
        self.token_indexer = token_indexer
        self.default_padding = 0
        self.dialogues = [self.text_to_fields(dial) for dial in self.ds]
        # These can't be pickled and are not needed once the fields are built.
        self.tokenizer = None
        self.token_indexer = None
        self.schemas = None

    def __getitem__(self, idx):
        return self.dialogues[idx]

    def __len__(self):
        return len(self.dialogues)

    # ------------------------------------------------------------------
    # helpers (names must NOT start with "field_", see text_to_fields)
    # ------------------------------------------------------------------

    def _encode(self, text):
        """Return ``(tokens, ids, mask)`` for one piece of text."""
        tokens = self.tokenizer(text)
        return tokens, self.token_indexer(tokens), [1] * len(tokens)

    def _schema_desc_field(self, dialogue, key):
        """Encode the descriptions stored under ``key`` of every dialogue service.

        Returns a field dict whose entries are nested [Service, Item(, Tokens)].
        """
        resp = dict(value=[], tokens=[], ids=[], mask=[])
        for service in dialogue["services"]:
            descs = list(self.schemas.get(service)[key])
            encoded = [self._encode(d) for d in descs]
            resp["value"].append(descs)
            resp["tokens"].append([tokens for tokens, _, _ in encoded])
            resp["ids"].append([ids for _, ids, _ in encoded])
            resp["mask"].append([mask for _, _, mask in encoded])
        return resp

    # ------------------------------------------------------------------
    # dialogue / turn meta data
    # ------------------------------------------------------------------

    def field_dialogue_id(self, dialogue):
        return {"value": dialogue["dialogue_id"]}

    def field_num_turns(self, dialogue):
        return {"value": len(dialogue["turns"])}

    def field_turn_num_frames(self, turnid, dialogue):
        return {"value": len(dialogue["turns"][turnid]["frames"])}

    def field_turn_speaker(self, turnid, dialogue):
        return {"value": dialogue["turns"][turnid]["speaker"]}

    # ------------------------------------------------------------------
    # utterances
    # ------------------------------------------------------------------

    def field_turn_utter(self, turnid, dialogue):
        """Tokenized utterance of the turn, whoever the speaker is."""
        text = dialogue["turns"][turnid]["utterance"]
        tokens, token_indices, token_mask = self._encode(text)
        return {"value": text, "tokens": tokens, "ids": token_indices, "mask": token_mask}

    def field_turn_sys_utter(self, turnid, dialogue):
        """Tokenized utterance for SYSTEM turns; None for any other speaker."""
        turn = dialogue["turns"][turnid]
        if turn["speaker"] == "SYSTEM":
            return self.field_turn_utter(turnid, dialogue)

    def field_turn_usr_utter(self, turnid, dialogue):
        """Tokenized utterance for USER turns; None for any other speaker."""
        turn = dialogue["turns"][turnid]
        if turn["speaker"] == "USER":
            return self.field_turn_utter(turnid, dialogue)

    # ------------------------------------------------------------------
    # services
    # ------------------------------------------------------------------

    def field_service(self, dialogue):
        return {"value": dialogue["services"]}

    def field_service_desc(self, dialogue):
        """Tokenized description of every service of the dialogue."""
        resp = dict(value=[], tokens=[], ids=[], mask=[])
        for service in dialogue["services"]:
            desc = self.schemas.get_service_desc(service)
            tokens, ids, mask = self._encode(desc)
            resp["value"].append(desc)
            resp["tokens"].append(tokens)
            resp["ids"].append(ids)
            resp["mask"].append(mask)
        return resp

    def field_turn_service_exist(self, turnid, dialogue):
        """Vector over ``dialogue["services"]`` counting the turn's frames per service."""
        turn = dialogue["turns"][turnid]
        services = dialogue["services"]
        # Order frames by the dialogue's service list, to establish one-to-one
        # mappings across fields.
        sorted_frames = sorted(turn["frames"], key=lambda x: services.index(x["service"]))
        exists_onehot = label_binarize([f["service"] for f in sorted_frames], classes=services)
        exists = np.sum(exists_onehot, axis=0)  # eg: [1, 0, 1, 0]
        return {"ids": exists, "padding": -1}

    # ------------------------------------------------------------------
    # intents
    # ------------------------------------------------------------------

    def field_intent(self, dialogue):
        return {"value": [self.schemas.get(s)["intent_name"] for s in dialogue["services"]]}

    def field_intent_desc(self, dialogue):
        """Tokenized intent descriptions, nested [Service, Intent, Tokens]."""
        return self._schema_desc_field(dialogue, "intent_desc")

    def field_turn_intent_exist(self, turnid, dialogue):
        """Per-service one-hot encoding of the active intent (USER turns only)."""
        turn = dialogue["turns"][turnid]
        if turn["speaker"] == "USER":
            # Maintain the order of services; one-hot per service.
            exists_onehot = OrderedDict()
            for service in dialogue["services"]:
                exists_onehot[service] = None

            # Fill the encodings of the services that have a frame in this turn.
            # An active intent that is not among the schema's intent names
            # produces an all-zero row.
            for frame in turn["frames"]:
                service = frame["service"]
                all_intents = self.schemas.get(service)["intent_name"]
                intent = frame["state"]["active_intent"]
                encoding = label_binarize([intent], classes=all_intents)[0]
                exists_onehot[service] = encoding

            # Fill with empty encodings for the remaining services.
            for service in exists_onehot:
                if exists_onehot[service] is None:
                    all_intents = self.schemas.get(service)["intent_name"]
                    encoding = np.array([0] * len(all_intents))
                    exists_onehot[service] = encoding

            return {"ids": list(exists_onehot.values()), "padding": -1}

    def field_turn_intent_changed(self, turnid, dialogue):
        """Per-service flag: 1 if the active intent differs from the previous user turn.

        Only defined for USER turns (None otherwise). Frames are matched by
        service name; a service that has a frame now but had none in the
        previous user turn counts as changed. Services without a frame in the
        current turn stay 0. For the first user turn the turn is compared with
        itself, so nothing is flagged.
        """
        turn = dialogue["turns"][turnid]
        if turn["speaker"] != "USER":
            return None

        # Assumes user and system turns strictly alternate, so the previous
        # user turn is two positions back.
        prev_user_turn = dialogue["turns"][turnid - 2] if turnid >= 2 else turn
        prev_frames = {f["service"]: f for f in prev_user_turn["frames"]}

        # Maintain the order of services: service -> changed
        intent_changed = OrderedDict((service, 0) for service in dialogue["services"])
        for frame in turn["frames"]:
            service = frame["service"]
            prev_frame = prev_frames.get(service)
            prev_intent = None if prev_frame is None else prev_frame["state"]["active_intent"]
            intent_changed[service] = int(frame["state"]["active_intent"] != prev_intent)

        return {"ids": list(intent_changed.values()), "padding": -1}

    # ------------------------------------------------------------------
    # slots
    # ------------------------------------------------------------------

    def field_slots(self, dialogue):
        slot_list = []
        for service in dialogue["services"]:
            slots = self.schemas.get(service)["slot_name"]
            slot_list.append(slots)
        return {"value": slot_list}

    def field_slots_desc(self, dialogue):
        """Tokenized slot descriptions, nested [Service, Slot, Tokens]."""
        return self._schema_desc_field(dialogue, "slot_desc")

    def field_slots_iscat(self, dialogue):
        """0/1 flag per slot telling whether the slot is categorical."""
        iscat_list = []
        for service in dialogue["services"]:
            iscat = [int(i) for i in self.schemas.get(service)["slot_iscat"]]
            iscat_list.append(iscat)
        return {"ids": iscat_list, "padding": -1}

    # ------------------------------------------------------------------
    # assembly
    # ------------------------------------------------------------------

    def text_to_fields(self, dialogue):
        """Build the field dictionary of one dialogue.

        Resulting fields (shapes given as they appear after batching)::

            dialogue_id     # [Batch,]
            num_turns       # [Batch,]
            num_frames      # [Batch, Turn] equals to number of services per turn

            # messages
            speaker         # [Batch, Turn]
            utter           # [Batch, Turn, Tokens]
            sys_utter       # [Batch, Turn, Tokens] only system utters
            usr_utter       # [Batch, Turn, Tokens] only user utters

            # services
            service         # [Batch, Service] all dialog services
            service_desc    # [Batch, Service, Tokens] service descriptions
            service_exist   # [Batch, Turn, Service] binarized

            # intents
            intent          # [Batch, Service, Intent]
            intent_desc     # [Batch, Service, Intent, Tokens]
            intent_exist    # [Batch, Turn, Service, Intent]
            intent_changed  # [Batch, Turn, Service]

            # state slots
            slots           # [Batch, Service, Slot]
            slots_desc      # [Batch, Service, Slot, Tokens]
            slots_iscat     # [Batch, Service, Slot]

        Turn-level fields collect one entry per turn *for which the field
        method returned something*; turns where it returned None contribute
        nothing. "ids" and "mask" entries are densified with ``padded_array``.
        """
        fields = {}

        # Discover the field methods by name. "field_turn_" must be tested
        # first because it also starts with "field_".
        dial_field_funcs = []
        turn_field_funcs = []
        for attr in dir(self):
            if attr.startswith("field_turn_"):
                turn_field_funcs.append(attr)
            elif attr.startswith("field_"):
                dial_field_funcs.append(attr)

        # Fill dialogue-level fields.
        for func in dial_field_funcs:
            name = func[len("field_"):]
            resp = getattr(self, func)(dialogue)
            resp.setdefault("padding", self.default_padding)
            fields[name] = resp

        # Fill turn-level fields.
        for turnid in range(len(dialogue["turns"])):
            for func in turn_field_funcs:
                # The name must come from `func`; using the leftover loop
                # variable of the discovery loop would file every turn field
                # under one and the same bogus key.
                name = func[len("field_turn_"):]
                resp = getattr(self, func)(turnid, dialogue) or {}
                entry = fields.setdefault(name, {"padding": self.default_padding})
                # Honour the padding requested by the method even when the
                # first turn(s) produced no response for this field.
                if "padding" in resp:
                    entry["padding"] = resp["padding"]
                for k, v in resp.items():
                    if k != "padding":
                        entry.setdefault(k, []).append(v)

        # Densify ids and masks with the padding value chosen for the field.
        for name, data in fields.items():
            padding_value = data["padding"]
            for key in ["ids", "mask"]:
                if key in data:
                    data[key] = padded_array(data[key], padding_value)

        return fields
    
# Load the training schemas and build the dataset of a single dialogue file.
# NOTE(review): the name `ds` is rebound later by the loop that builds all
# datasets in parallel.
schemas = Schemas("../data/train/schema.json")
ds = DialogueDataset("../data/train/dialogues_001.json", schemas, tokenizer, token_indexer)

In [164]:
def dialogue_mini_batcher(dialogues):
    """Merge per-dialogue field dicts into one batch dict.

    Args:
        dialogues: iterable of ``{field: {attr: value}}`` dicts, as produced
            by ``DialogueDataset``.

    Returns:
        ``{field: {attr: ...}}`` where every attribute holds the list of the
        per-dialogue values, except

        * "padding", which keeps the value of the last dialogue seen, and
        * "ids" / "mask", which are padded with that value and converted to
          CPU tensors.
    """
    batch = {}

    # Collect the values of every field attribute across the dialogues.
    for dial in dialogues:
        for field, data in dial.items():
            merged = batch.setdefault(field, {})
            for attr, val in data.items():
                if attr != "padding":
                    merged.setdefault(attr, []).append(val)
                else:
                    merged[attr] = val

    # Densify the numeric attributes.
    for data in batch.values():
        for attr in ("ids", "mask"):
            if attr not in data:
                continue
            dense = padded_array(data[attr], data["padding"])
            data[attr] = torch.tensor(dense, device="cpu")

    return batch

In [166]:
# dial_sets = []
# for filename in tqdm(glob.glob("../data/train/dialogues*.json")):
#     ds = DialogueDataset(filename, schemas, tokenizer, token_indexer)
#     dial_sets.append(ds)

In [172]:
# One DialogueDataset per training dialogue file is collected here.
dial_sets = []
# glob returns its matches in arbitrary order; sort them so the order of the
# datasets (and of the examples in the concatenated dataset) is reproducible.
dial_files = sorted(glob.glob("../data/train/dialogues*.json"))

def create_dataset(filename):
    """Build the DialogueDataset of one dialogue file.

    Uses the module-level ``schemas``, ``tokenizer`` and ``token_indexer``;
    taking only the filename lets it be mapped over a list of files.
    """
    dataset = DialogueDataset(filename, schemas, tokenizer, token_indexer)
    return dataset

# Build the datasets in parallel, one task per dialogue file, using up to 10
# worker processes. executor.map yields the results in the order of
# `dial_files`; tqdm only wraps the iterator to show progress.
# NOTE(review): the loop variable rebinds the module-level name `ds`.
with concurrent.futures.ProcessPoolExecutor(max_workers=10) as executor:
    for ds in tqdm(executor.map(create_dataset, dial_files), total=len(dial_files)):
        dial_sets.append(ds)

HBox(children=(IntProgress(value=0, max=127), HTML(value='')))

In [173]:
# Chain the per-file datasets into a single training dataset and persist it.
# NOTE(review): torch.save pickles the whole object, so the file should only
# be loaded from a trusted location.
train_ds = D.ConcatDataset(dial_sets)
torch.save(train_ds, "../data/preprocessed/train_ds2.pkl")