In [1]:
#| default_exp core

In [11]:
#| hide
# Housekeeping cell: star-imports nbdev's showdoc helpers and calls
# nbdev.nbdev_export() every time the cell runs.
# NOTE(review): there is no `#| export` directive here, so presumably this
# cell is not part of the exported module -- confirm.
from nbdev.showdoc import *
import nbdev; nbdev.nbdev_export()

In [4]:
#| export
import scipy.sparse as sp, numpy as np
from tqdm.auto import tqdm
from typing import Dict

In [13]:
#| export
def load_raw_txt(fname:str, encoding:str='utf-8'):
    """Read a raw-text file whose lines have the form `<id>-><text>`.

    Each line is split on the first `->` only, so the text part may itself
    contain `->`.

    Args:
        fname: Path of the file to read.
        encoding: Text encoding used to open the file.

    Returns:
        A pair `(ids, raw_txt)` of equally long lists, in file order.

    Raises:
        ValueError: If a line (including a blank one) has no `->` separator.
    """
    ids, raw_txt = [], []
    with open(fname, 'r', encoding=encoding) as file:
        for line_no, line in enumerate(file, start=1):
            # Drop only the line terminator. Unconditionally slicing off the
            # last character would truncate the final record of a file that
            # does not end with a newline.
            if line.endswith('\n'):
                line = line[:-1]
            k, sep, v = line.partition('->')
            if not sep:
                # Fail loudly rather than skip: skipping a record would
                # silently shift the position of every later id.
                raise ValueError(f"{fname}:{line_no}: expected '<id>-><text>', got {line!r}")
            ids.append(k); raw_txt.append(v)
    return ids, raw_txt
    

In [14]:
#| export
def get_all_ids(raw_dir, encoding='utf-8'):
    """Collect the identifiers of the train, test and label raw-text files.

    Reads `train.raw.txt`, `test.raw.txt` and `label.raw.txt` from `raw_dir`
    (in that order) with `load_raw_txt` and returns the union of their ids
    as a set. The text parts of the files are discarded.
    """
    paths = (f'{raw_dir}/train.raw.txt', f'{raw_dir}/test.raw.txt', f'{raw_dir}/label.raw.txt')
    all_ids = set()
    for path in paths:
        file_ids, _ = load_raw_txt(path, encoding=encoding)
        all_ids.update(file_ids)
    return all_ids
    

In [1]:
#| export
def filter_mapping(mapping, ids):
    """Return a new dict restricted to the entries of `mapping` whose key is in `ids`.

    Keys appear in the order in which `ids` yields them; ids that are not
    present in `mapping` are ignored. `mapping` itself is not modified.
    """
    kept = {}
    for key in ids:
        if key in mapping:
            kept[key] = mapping[key]
    return kept
    

In [17]:
#| export
def create_vocab_and_item2idx(mapping):
    """Index the distinct elements of `mapping`'s values and re-express each entry with those indices.

    Returns `(vocab, mapping_item2idx)`:
      * `vocab` maps every distinct element to an integer, numbered from 0 in
        order of first appearance while walking `mapping.items()`.
      * `mapping_item2idx` maps each key to the list of vocabulary indices of
        its elements (repeats are kept). A key whose value is empty gets no
        entry at all.
    """
    vocab, mapping_item2idx = {}, {}
    for key, elements in tqdm(mapping.items()):
        # `len(vocab)` is re-evaluated per element, so a new element receives
        # the next free index and a known one keeps its existing index.
        element_indices = [vocab.setdefault(element, len(vocab)) for element in elements]
        if element_indices:
            mapping_item2idx.setdefault(key, []).extend(element_indices)
    return vocab, mapping_item2idx
    

In [3]:
#| export
def save_raw_txt(fname, ids, raw_txt, encoding='utf-8'):
    """Write `ids` and `raw_txt` to `fname`, one `<id>-><text>` record per line.

    `ids` and `raw_txt` must have the same length (checked with an assert
    before the file is opened). The file is opened in write mode, so any
    existing content is replaced.
    """
    assert len(ids) == len(raw_txt), "Number of identifiers and elements in raw text should be the same."
    with open(fname, 'w', encoding=encoding) as file:
        file.writelines(f'{identifier}->{text}\n' for identifier, text in zip(ids, raw_txt))
            

In [11]:
#| export
def get_matrix_from_item2idx(mapping, vocab_size, ids=None):
    """Build a sparse CSR indicator matrix from a key -> column-indices mapping.

    Args:
        mapping: Maps each key to a sequence of column indices.
        vocab_size: Number of columns of the resulting matrix.
        ids: Keys that define the rows, in row order. Defaults to the keys
            of `mapping`. An id that is missing from `mapping` produces an
            empty row.

    Returns:
        `(matrix, ids)`: an int64 `scipy.sparse.csr_matrix` of shape
        `(len(ids), vocab_size)` holding a 1 for every listed column index,
        together with the ids that label its rows.
    """
    if ids is None:
        ids = list(mapping)
    col_indices, row_ptr = [], [0]
    for row_id in tqdm(ids):
        n_stored = row_ptr[-1]
        if row_id in mapping:
            cols = mapping[row_id]
            n_stored += len(cols)
            col_indices.extend(cols)
        # Every id closes one row, even when it contributed no entries.
        row_ptr.append(n_stored)
    ones = [1] * len(col_indices)
    matrix = sp.csr_matrix((ones, col_indices, row_ptr), shape=(len(ids), vocab_size), dtype=np.int64)
    return matrix, ids
    

In [2]:
#| export
def get_matrix_from_mapping(mapping, ids=None):
    """Turn a key -> elements mapping into a sparse matrix plus its row ids and vocabulary.

    When `ids` is given, `mapping` is first restricted to those keys with
    `filter_mapping`. The elements are then indexed with
    `create_vocab_and_item2idx` and the matrix is built with
    `get_matrix_from_item2idx`.

    Returns:
        `(matrix, ids, vocab)`. The returned ids are the keys of the index
        mapping produced by `create_vocab_and_item2idx`, not the `ids`
        argument: requested ids that are missing from `mapping`, or whose
        value is empty, do not get a row.
    """
    selected = mapping if ids is None else filter_mapping(mapping, ids)
    vocab, item2idx = create_vocab_and_item2idx(selected)
    matrix, row_ids = get_matrix_from_item2idx(item2idx, len(vocab))
    return matrix, row_ids, vocab
    