In [None]:
from multiprocessing import Pool as ProcessPool
from multiprocessing.util import Finalize

def init(tokenizer_class, db_class, db_opts):
    """Per-process setup hook: build this process's tokenizer and DB handle.

    Stores the created objects in the module-level globals ``PROCESS_TOK`` and
    ``PROCESS_DB`` and registers a ``Finalize`` callback for each one
    (``shutdown`` for the tokenizer, ``close`` for the DB).

    Args:
        tokenizer_class: zero-argument callable returning an object with ``shutdown()``.
        db_class: callable accepting ``**db_opts`` and returning an object with ``close()``.
        db_opts: keyword arguments forwarded to ``db_class``.
    """
    global PROCESS_TOK, PROCESS_DB

    # Tokenizer first; its finalizer is registered before the DB is opened.
    tokenizer = tokenizer_class()
    PROCESS_TOK = tokenizer
    Finalize(tokenizer, tokenizer.shutdown, exitpriority=100)

    # Then the DB connection, with its own clean-up callback.
    database = db_class(**db_opts)
    PROCESS_DB = database
    Finalize(database, database.close, exitpriority=100)

# 4 tokenizers to choose
tok_class = tokenizers.get_class(args.tokenizer)

# db_class / db_opts are otherwise only bound inside get_count_matrix, so this cell
# would raise NameError on a fresh kernel. Bind them here with the same values the
# notebook passes to get_count_matrix ('sqlite', {'db_path': args.db_path}).
db_class = retriever.get_class('sqlite')
db_opts = {'db_path': args.db_path}

# Every worker process calls init(tok_class, db_class, db_opts) once when it starts.
# Call workers.close() and workers.join() when this pool is no longer needed.
workers = ProcessPool(
    args.num_workers,
    initializer=init,
    initargs=(tok_class, db_class, db_opts)
)

In [None]:
def count(ngram, hash_size, doc_id, verbose=False):
    """Fetch the text of a document and compute hashed ngram counts.

    Args:
        ngram: n-gram size, passed as ``n`` to ``tokens.ngrams``.
        hash_size: bucket count passed to ``retriever.utils.hash``.
        doc_id: id of the document to process; must be a key of ``DOC2IDX``.
        verbose: when True, print the first ten ngrams and the number of
            ngrams before / after hashing. Off by default because this
            function is mapped over every document in worker processes.

    Returns:
        ``(row, col, data)`` lists of equal length in sparse-matrix triplet
        form: ``row`` holds the hashed ngram ids, ``col`` repeats the
        document's index from ``DOC2IDX``, and ``data`` holds the counts.
    """
    # Tokenize the normalized document text.
    tokens = tokenize(retriever.utils.normalize(fetch_text(doc_id)))

    # Get ngrams from tokens, with stopword/punctuation filtering.
    ngrams = tokens.ngrams(
        n=ngram, uncased=True, filter_fn=retriever.utils.filter_ngram
    )
    if verbose:
        print('----ngrams----')
        print(ngrams[:10])
        print('\n----number of grams before hash----')
        print(len(ngrams))

    # Hash ngrams and count occurrences; distinct ngrams may share a bucket.
    counts = Counter(retriever.utils.hash(gram, hash_size) for gram in ngrams)
    if verbose:
        print('\n----number of grams after hash----')
        print(len(counts))

    # Return in sparse matrix data format (one entry per distinct hash bucket).
    row = list(counts.keys())
    col = [DOC2IDX[doc_id]] * len(counts)
    data = list(counts.values())
    return row, col, data

In [None]:
def get_count_matrix(args, db, db_opts):
    """Form a sparse word to document count matrix (inverted index).

    M[i, j] = # times word i appears in document j.

    Args:
        args: options object; ``tokenizer``, ``num_workers``, ``ngram`` and
            ``hash_size`` are read from it.
        db: name handed to ``retriever.get_class`` to resolve the DB class.
        db_opts: keyword arguments used to construct the DB class, both here
            and in every worker (via ``init``).

    Returns:
        ``(count_matrix, (DOC2IDX, doc_ids))`` where ``count_matrix`` is a
        sparse CSR matrix with shape [hash_size, n_docs] ([hashed_ngram, docs]).

    Side effects:
        Rebinds the module-level ``DOC2IDX`` mapping that ``count`` reads.
    """
    # Map doc_ids to indexes
    global DOC2IDX
    db_class = retriever.get_class(db)  # class DocDB
    with db_class(**db_opts) as doc_db:
        doc_ids = doc_db.get_doc_ids()  # get all doc ids, (a list)
    # Bound before the pool is created -- presumably so workers started by fork
    # inherit it; verify on platforms that spawn worker processes instead.
    DOC2IDX = {doc_id: i for i, doc_id in enumerate(doc_ids)}  # {doc_id: numeric index}

    # Setup worker pool
    tok_class = tokenizers.get_class(args.tokenizer)   # 4 tokenizers to choose
    workers = ProcessPool(   # initialize Pool workers
        args.num_workers,
        initializer=init,
        initargs=(tok_class, db_class, db_opts)
    )

    # Compute the count matrix in steps (to keep in memory)
    logger.info('Mapping...')
    row, col, data = [], [], []
    step = max(len(doc_ids) // 10, 1)   # batch size: roughly a tenth of the docs, at least 1
    batches = [doc_ids[i:i + step] for i in range(0, len(doc_ids), step)]  # create batches
    _count = partial(count, args.ngram, args.hash_size)  # count(doc_id) with ngram/hash_size fixed
    try:
        for i, batch in enumerate(batches):
            logger.info('-' * 25 + 'Batch %d/%d' % (i + 1, len(batches)) + '-' * 25)
            for b_row, b_col, b_data in workers.imap_unordered(_count, batch):
                row.extend(b_row)
                col.extend(b_col)
                data.extend(b_data)
    except BaseException:
        # Do not leave worker processes behind when mapping fails or is interrupted.
        workers.terminate()
        raise
    else:
        workers.close()
    finally:
        # join() is valid after either close() or terminate().
        workers.join()

    logger.info('Creating sparse matrix...')
    count_matrix = sp.csr_matrix(
        (data, (row, col)), shape=(args.hash_size, len(doc_ids))  # sparse_matrix [hashed_ngram, docs]
    )
    count_matrix.sum_duplicates()
    return count_matrix, (DOC2IDX, doc_ids)

In [None]:
# Build the hashed n-gram count matrix from the SQLite document store.
count_matrix, doc_dict = get_count_matrix(args, 'sqlite', {'db_path': args.db_path})

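`Finalize` (from `multiprocessing.util`) registers a callback that runs when a worker process exits — here, shutting down the tokenizer and closing the DB connection. See:
https://stackoverflow.com/questions/24717468/context-managers-and-multiprocessing-pools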

In [None]:
def get_tfidf_matrix(cnts):
    """Turn a [hash_size, n_docs] count matrix into a tf-idf weighted one.

    tfidf = log(tf + 1) * log((N - Nt + 0.5) / (Nt + 0.5))
    * tf = term frequency in document
    * N = number of documents
    * Nt = number of documents the term appears in (from ``get_doc_freqs``)

    Negative idf values are clamped to zero.
    """
    n_docs = cnts.shape[1]
    doc_freqs = get_doc_freqs(cnts)  # one entry per row (term) of cnts

    # Inverse document frequency per term, floored at 0.
    idf = np.log((n_docs - doc_freqs + 0.5) / (doc_freqs + 0.5))
    idf = np.where(idf < 0, 0, idf)

    # A diagonal idf matrix times the tf matrix scales row i of tf by idf[i].
    idf_diag = sp.diags(idf, 0)      # [hash_size, hash_size]
    term_freqs = cnts.log1p()        # [hash_size, n_docs]
    return idf_diag.dot(term_freqs)  # [hash_size, n_docs]

In [9]:
import scipy.sparse as sp
# With offset 0 the given values are placed on the main diagonal.
sp.diags([1,2,3], 0).toarray()

array([[1., 0., 0.],
       [0., 2., 0.],
       [0., 0., 3.]])

In [16]:
# Toy check: a diagonal idf matrix times a dense tf matrix scales row i of tfs by idfs[i, i].
idfs = sp.diags([1,2,3], 0)
print(idfs.shape)

tfs = np.random.rand(3,5)
print(tfs.shape)

idfs.dot(tfs)

(3, 3)
(3, 5)


array([[0.37417581, 0.37549935, 0.20378499, 0.5638298 , 0.400142  ],
       [0.39012129, 0.80376494, 0.22833869, 0.91804787, 0.37156071],
       [1.66346211, 1.17767683, 2.08005168, 0.93570479, 1.85065382]])

In [21]:
# this can actually be achieved by element-wise multiplication
# Keep the 1-D vector under its own name: rebinding `idfs` to an ndarray here would
# break the later `idfs.toarray()` cell when the notebook is run top to bottom.
idf_vec = np.array([1, 2, 3])
tfs * idf_vec.reshape(-1, 1)  # column vector broadcasts across the columns of tfs

array([[0.37417581, 0.37549935, 0.20378499, 0.5638298 , 0.400142  ],
       [0.39012129, 0.80376494, 0.22833869, 0.91804787, 0.37156071],
       [1.66346211, 1.17767683, 2.08005168, 0.93570479, 1.85065382]])

In [18]:
# Dense view of the diagonal idf matrix.
# NOTE(review): requires `idfs` to be the sparse matrix built with sp.diags;
# a plain ndarray has no .toarray().
idfs.toarray()

array([[1., 0., 0.],
       [0., 2., 0.],
       [0., 0., 3.]])

In [19]:
# Show the 3x5 dense tf matrix used in the products above.
tfs

array([[0.37417581, 0.37549935, 0.20378499, 0.5638298 , 0.400142  ],
       [0.19506065, 0.40188247, 0.11416934, 0.45902393, 0.18578035],
       [0.55448737, 0.39255894, 0.69335056, 0.3119016 , 0.61688461]])

In [None]:
def get_doc_freqs(cnts):
    """Return word --> # of docs it appears in.

    Args:
        cnts: count matrix with one row per (hashed) term and one column per document.

    Returns:
        1-D array with one entry per row of ``cnts``: the number of columns
        (documents) in which that row has a value greater than zero.
    """
    binary = (cnts > 0).astype(int)            # presence: 1 where the term occurs in the doc
    # Sum across documents. ravel() rather than squeeze() so a single-row
    # matrix still yields shape (1,) instead of a 0-d array.
    freqs = np.asarray(binary.sum(1)).ravel()
    return freqs   # one dimension array [hash_size]

In [12]:
# Small sparse example for trying out the counting expression used in get_doc_freqs.
matrix = sp.diags([1,2,3], 0)
matrix

<3x3 sparse matrix of type '<class 'numpy.float64'>'
	with 3 stored elements (1 diagonals) in DIAgonal format>

In [14]:
# Same expression as in get_doc_freqs: each row of the diagonal matrix has
# exactly one positive entry, so every count is 1.
np.array((matrix>0).astype(int).sum(1)).squeeze()

array([1, 1, 1])

In [None]:
def save_sparse_csr(filename, matrix, metadata=None):
    """Write a CSR matrix and optional metadata to an ``.npz`` archive.

    The archive holds the three CSR component arrays (``data``, ``indices``,
    ``indptr``), the matrix ``shape`` and the ``metadata`` object, all passed
    to ``np.savez`` under those key names.
    """
    np.savez(
        filename,
        data=matrix.data,
        indices=matrix.indices,
        indptr=matrix.indptr,
        shape=matrix.shape,
        metadata=metadata,
    )


def load_sparse_csr(filename):
    """Load a CSR matrix (and optional metadata) written by ``save_sparse_csr``.

    Args:
        filename: path to the ``.npz`` archive.

    Returns:
        ``(matrix, metadata)`` where ``matrix`` is a ``scipy.sparse.csr_matrix``
        and ``metadata`` is the stored metadata object, or ``None`` when the
        archive has no ``metadata`` entry.
    """
    # The metadata entry is an object array, so it can only be read with
    # allow_pickle=True (NumPy >= 1.16.3 defaults to False).
    # SECURITY: unpickling can execute arbitrary code -- only load trusted files.
    with np.load(filename, allow_pickle=True) as loader:  # closes the archive handle
        matrix = sp.csr_matrix(
            (loader['data'], loader['indices'], loader['indptr']),
            shape=tuple(loader['shape']),
        )
        metadata = loader['metadata'].item(0) if 'metadata' in loader else None
    return matrix, metadata

In [None]:
# Step 1 -- count hashed n-grams per document.
count_matrix, doc_dict = get_count_matrix(args, 'sqlite', {'db_path': args.db_path})

# Step 2 -- re-weight the counts into tf-idf scores.
tfidf = get_tfidf_matrix(count_matrix)

# Step 3 -- per-term document frequencies, stored alongside the matrix.
freqs = get_doc_freqs(count_matrix)

# Output file name: <db file stem>-tfidf-ngram=<n>-hash=<size>-tokenizer=<name>
basename = os.path.splitext(os.path.basename(args.db_path))[0] + (
    '-tfidf-ngram=%d-hash=%d-tokenizer=%s'
    % (args.ngram, args.hash_size, args.tokenizer)
)
filename = os.path.join(args.out_dir, basename)

# Save the matrix plus the settings that produced it (written to <filename>.npz).
metadata = {
    'doc_freqs': freqs,
    'tokenizer': args.tokenizer,
    'hash_size': args.hash_size,
    'ngram': args.ngram,
    'doc_dict': doc_dict,
}
retriever.utils.save_sparse_csr(filename, tfidf, metadata)