In [1]:
!pip install pymongo
!pip install pycld2



In [2]:
from __future__ import print_function

import string
import re
import sys

import pycld2 as cld2

from multiprocessing.pool import Pool
from tqdm._tqdm_notebook import tqdm

from IPython.display import display, clear_output

import numpy as np
import pandas as pd

from os import getcwd, pardir
from os.path import join, basename

from utils.literature import DataLoader, get_document_title, is_english, get_section, get_sections
from utils.preprocessing import NLPPipeline, Tokenizer, Stemmer, ToLowercase, Lemmatizer, StopwordRemover, CitationRemover, SymbolRemover, ContentInBracketsRemover
from glob import glob

import nltk
from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize, RegexpTokenizer
from nltk.stem.porter import PorterStemmer
from nltk.stem import WordNetLemmatizer #TODO: Try lemmatizer instead of stemmer

import pymongo
from pymongo import MongoClient as DBClient

nltk.download()

## Specification of the document paths

- download from: [Kaggle](https://www.kaggle.com/allen-institute-for-ai/CORD-19-research-challenge)
- extract folder content to `{ProjectDir}/dataset`

In [3]:
root_dir = join(pardir, 'dataset')

json_paths = [
    join(root_dir, 'arxiv', 'arxiv', 'pdf_json'),
    join(root_dir, 'arxiv', 'arxiv', 'pdf_json'),
    join(root_dir, 'comm_use_subset', 'comm_use_subset', 'pdf_json'),
    join(root_dir, 'noncomm_use_subset', 'noncomm_use_subset', 'pdf_json'),
    join(root_dir, 'custom_license', 'custom_license', 'pdf_json'),
    join(root_dir, 'biorxiv_medrxiv', 'biorxiv_medrxiv', 'pdf_json'),
]

files = []
[files.extend(glob(join(path, '*.json'))) for path in json_paths];

## Building the document index

Each document gets stored in the document_index collection with the following layout: 
```json
{
    _id: ..., 
    document_title: ...
}
```

## Building the inverted index

```json
{
    'stem': ...,
    'doc_ids': [
        {
            {'doc_id': ..., 
             'count': {
                 'title': ...,
                 'abstract': ...,
                 'body_text': ...}
             },
             {'doc_id': ..., 
             'count': {
                 'title': ...,
                 'abstract': ...,
                 'body_text': ...}
             }
        }
    ]
}
```

In [4]:
with DBClient('localhost', 27017, w=0) as client:
    db = client['covid_19']
    document_index_collection = db['document_index']
    inverted_index_collection = db['inverted_index']

    if '_id' not in document_index_collection.index_information():
        document_index_collection.create_index('_id')

    inverted_index_collection.create_index([("_id", pymongo.ASCENDING)], background=True)
    inverted_index_collection.create_index([("doc_ids.doc_id", pymongo.ASCENDING)], background=True, name='doc_ids.doc_id')

In [5]:
pipeline = NLPPipeline([
    ToLowercase(),
    CitationRemover(),
    ContentInBracketsRemover(),
    Tokenizer(),
    SymbolRemover(),
    StopwordRemover(),
    Stemmer()
])

In [6]:
def create_post(stem, doc_id, count_object):
    return {'_id': stem, 'doc_ids': [{'doc_id': doc_id, 'count': count_object}]}

In [7]:
count_object = {
    'title': {'title': 1, 'abstract': 0, 'body_text': 0},
    'abstract': {'title': 0, 'abstract': 1, 'body_text': 0},
    'body_text': {'title': 0, 'abstract': 0, 'body_text': 1}
}

In [8]:
def update_document_index(doc_id, document_title, document_index_collection):
    doc_id_entry = document_index_collection.find({'_id': doc_id}).limit(1)
    if doc_id_entry.count() == 0:
        document_index_collection.insert_one({'_id': doc_id, 'document_title': document_title})

In [9]:
def update_inverted_index(doc_id, stemmed_tokens, section, inverted_index_collection):
    #update inverted index
    for stem in stemmed_tokens:
        stem_entry = inverted_index_collection.find({'_id': stem}).limit(1)
        if stem_entry.count() > 0:
            #try to find current document id in stem entry
            doc_id_object = inverted_index_collection.find(
                {'_id': stem, 
                'doc_ids': {'$elemMatch': {'doc_id': doc_id}}}).limit(1)
            if doc_id_object.count() > 0:
                # update occurrence of stem in document section
                inverted_index_collection.update(
                    {'_id': stem, 
                    'doc_ids': {'$elemMatch': {'doc_id': doc_id}}},
                    {'$inc': {f'doc_ids.$.count.{section}': 1}})
            else:
                # add document id
                inverted_index_collection.update(
                    {'_id': stem},
                    {'$push': {'doc_ids': {'doc_id': doc_id, 'count': count_object[section]}}})
        else:
            post = create_post(stem, doc_id, count_object[section])
            inverted_index_collection.insert_one(post)

In [10]:
def process_chunk(args, update_doc_idx=True, update_inv_indx=True):
    with DBClient('localhost', 27017, w=0) as client:

        #open a connection to the database
        db = client['covid_19']
        document_index_collection = db['document_index']
        inverted_index_collection = db['inverted_index']
        
        #iterate over all documents in chunk
        for fpath, doc_id in args:
            doc_id = int(doc_id)
            data_loader = DataLoader(fpath)

            doc_title = get_document_title(fpath, data_loader)

            if doc_title == '' or not is_english(doc_title):
                continue

            #database should only contain english documents with an valid document title
            if update_doc_idx:
                update_document_index(
                    doc_id, 
                    doc_title, 
                    document_index_collection)

            if not update_inverted_index:
                continue
            
            for section in get_sections():
                text = get_section(fpath, section, dl=data_loader)
                stemmed_tokens = pipeline.transform(text)
                update_inverted_index(doc_id, stemmed_tokens, section, inverted_index_collection)

In [11]:
def create_chunks(files, chunk_size=128):
    doc_ids = list(range(len(files)))
    chunks = list()
    for i in range(0, len(files), chunk_size):
        indices = np.array(doc_ids[i: min(i+chunk_size, len(files))])
        chunks.append(list(zip(
            files[i: min(i+chunk_size, len(files))], 
            doc_ids[i: min(i+chunk_size, len(files))])))
    return chunks

In [12]:
pool = Pool()
chunks = create_chunks(files)

for _ in tqdm(pool.imap_unordered(process_chunk, chunks), total=len(chunks)):
    pass

pool.close()
pool.join()

HBox(children=(FloatProgress(value=0.0, max=385.0), HTML(value='')))

tobias/anaconda3/lib/python3.7/multiprocessing/pool.py", line 121, in worker
    result = (True, func(*args, **kwds))
  File "/home/tobias/anaconda3/lib/python3.7/multiprocessing/process.py", line 99, in run
    self._target(*self._args, **self._kwargs)
  File "/home/tobias/anaconda3/lib/python3.7/site-packages/pymongo/cursor.py", line 446, in limit
    self.__check_okay_to_chain()
  File "/home/tobias/anaconda3/lib/python3.7/site-packages/pymongo/collection.py", line 856, in _update_retryable
    _update, session)
  File "/home/tobias/anaconda3/lib/python3.7/site-packages/pymongo/collection.py", line 1600, in _count
    _cmd, self._read_preference_for(session), session)
  File "/home/tobias/anaconda3/lib/python3.7/site-packages/pymongo/collection.py", line 1594, in _cmd
    session=session)
  File "/home/tobias/anaconda3/lib/python3.7/site-packages/pymongo/collection.py", line 1600, in _count
    _cmd, self._read_preference_for(session), session)
  File "<ipython-input-10-c9df4c44c7e4

KeyboardInterrupt: 