In [1]:
import os
os.path.abspath("")

'/home/yuxiangliao/PhD'

In [2]:
import os, sys
from loguru import logger

LOG_ROOT = os.path.abspath("./")
LOG_FILE = LOG_ROOT + "/logs/sr-3.log"

# Remove all handlers and reset stderr
logger.remove(handler_id=None)
logger.add(
    LOG_FILE,
    level="TRACE",
    mode="w",
    backtrace=False,
    diagnose=True,
    colorize=False,
    format="{time:YYYY-MM-DD HH:mm:ss} | {level} | {message}",
)
logger.info("\r\n" + ">" * 29 + "\r\n" + ">>> New execution started >>>" + "\r\n" + ">" * 29)
# To filter log level: TRACE=5, DEBUG=10, INFO=20, SUCCESS=25, WARNING=30, ERROR=40, CRITICAL=50
logger.add(sys.stdout, level="INFO", filter=lambda record: record["level"].no < 40, colorize=True)
logger.add(sys.stderr, level="ERROR", backtrace=False, diagnose=True, colorize=True)

3

# Install Metamap

Follow the following instructions:
- Install Metamap2020: https://lhncbc.nlm.nih.gov/ii/tools/MetaMap/documentation/Installation.html 
- Install additional datasets (2022 Specialist Lexicon, 2022AA UMLS NLM Datasets): https://lhncbc.nlm.nih.gov/ii/tools/MetaMap/additional-tools/DataSetDownload.html

# Processing

## Check if the servers started
- taggerServer
- DisambiguatorServer

In [3]:
import os
cmd = 'ps -ef | grep java'
out = os.popen(cmd)
print(out.read())

yuxiang+   31170    1377  0 Jul08 ?        00:07:56 java -Dtaggerserver.port=1795 -DlexFile=/home/yuxiangliao/PhD/UMLS/Metamap/public_mm/MedPost-SKR/data/lexDB.serial -DngramOne=/home/yuxiangliao/PhD/UMLS/Metamap/public_mm/MedPost-SKR/data/ngramOne.serial -cp /home/yuxiangliao/PhD/UMLS/Metamap/public_mm/MedPost-SKR/Tagger_server/lib/taggerServer.jar:/home/yuxiangliao/PhD/UMLS/Metamap/public_mm/MedPost-SKR/Tagger_server/lib/mps.jar taggerServer
yuxiang+   31218    1377  0 Jul08 ?        00:08:31 java -Xmx2g -Dserver.config.file=/home/yuxiangliao/PhD/UMLS/Metamap/public_mm/WSD_Server/config/disambServer.cfg -classpath /home/yuxiangliao/PhD/UMLS/Metamap/public_mm/WSD_Server/lib/metamapwsd.jar:/home/yuxiangliao/PhD/UMLS/Metamap/public_mm/WSD_Server/lib/utils.jar:/home/yuxiangliao/PhD/UMLS/Metamap/public_mm/WSD_Server/lib/lucene-core-3.0.1.jar:/home/yuxiangliao/PhD/UMLS/Metamap/public_mm/WSD_Server/lib/monq-1.1.1.jar:/home/yuxiangliao/PhD/UMLS/Metamap/public_mm/WSD_Server/lib/wsd.jar:/home/

## Check metamap human readable output

In [4]:
# import subprocess, shlex
# text =  "There is no focal consolidation, pleural effusion or pneumothorax.  Cardiomediastinal silhouette and hilar contours are otherwise unremarkable."
# input_command = f"echo -e {text}"
# input_process = subprocess.Popen(shlex.split(input_command), stdout=subprocess.PIPE)
# meta_command = "metamap -V NLM -Z 2022AA -A --silent -I"
# metamap_process = subprocess.Popen(shlex.split(meta_command), stdout=subprocess.PIPE, stdin=input_process.stdout)
# output, error = metamap_process.communicate()
# print(output.decode())

## Load data

In [5]:
import pandas as pd
REPORT_PATH = "/home/yuxiangliao/PhD/data/mimic_cxr_reports_core.json"
df = pd.read_json(REPORT_PATH,orient="records",lines=True)
print(df)

pid_list = df.loc[:,'pid'].to_list()
sid_list = df.loc[:,'sid'].to_list()
findings_list = df.loc[:,'findings'].to_list()
impression_list = df.loc[:,'impression'].to_list()
pfi_list = df.loc[:,'provisional_findings_impression'].to_list()
fai_list = df.loc[:,'findings_and_impression'].to_list()

DATA_SIZE = len(sid_list)

              pid        sid  \
0       p10000032  s50414267   
1       p10000032  s53189527   
2       p10000032  s53911762   
3       p10000032  s56699142   
4       p10000764  s57375967   
...           ...        ...   
227830  p19999442  s58708861   
227831  p19999733  s57132437   
227832  p19999987  s55368167   
227833  p19999987  s58621812   
227834  p19999987  s58971208   

                                                 findings  \
0       There is no focal consolidation, pleural effus...   
1       The cardiac, mediastinal and hilar contours ar...   
2       Single frontal view of the chest provided. \n ...   
3       The lungs are clear of focal consolidation, pl...   
4       PA and lateral views of the chest provided.   ...   
...                                                   ...   
227830  ET tube ends 4.7 cm above the carina.  NG tube...   
227831  The lungs are clear, and the cardiomediastinal...   
227832  There has been interval extubation and improve...   
22783

## Run multiprocessing in jupyter

Construct metama command

In [6]:
import subprocess, shlex
# Documentation: https://lhncbc.nlm.nih.gov/ii/tools/MetaMap/Docs/MM_2016_Usage.pdf
def get_metamap_command():
    command = format_command_arg("metamap")
    command += format_command_arg("-V NLM")                # Data Version: -V (--mm data version) [Base, USAbase, NLM]
    command += format_command_arg("-Z 2022AA")             # Knowledge Source: -Z (--mm data year)
    command += format_command_arg("-A")                    # Data Model: [-A (--strict model), -C (--relaxed model)]
    command += format_command_arg("--silent")              # Hide Header Output: --silent
    command += format_command_arg("--JSONn")               # Output format: [-q (--machine output), --XMLf, --XMLn, --XMLf1, --XMLn1, --JSONf, --JSONn, -N (--fielded mmi output), -F (--formal tagger output)]
    # command += " --conj"                                   # Turn on Conjunction Processing
    # command += " -y"                                       # Word-Sense Disambiguation: -y (--word sense disambiguation)
    # UDA_path = "/home/yuxiangliao/PhD/UMLS/custom-resources/custom-word-replacement"
    # command += format_command_arg(f"--UDA {UDA_path}")     # User-Defined Acronyms/Abbreviations (word replacement): --UDA <file>
    # semantic_types = "virs,cgab,acab,ffas,bpoc,medd,tmco,qlco,qnco,bsoj,blor,fndg,sosy,topp,ortf,patf,dsyn,inpo"
    # commend += f"-J {semantic_types}"                      # Retain only Concepts with Specified Semantic Types: -J (--restrict to sts) <list>
    # command += format_command_arg("-I")                    # For human readable output
    return command

def format_command_arg(arg):
    return " " + arg

def run_metamap(startIndex,batch_size):
    endIndex = startIndex + batch_size if startIndex + batch_size < DATA_SIZE else DATA_SIZE
    input_list = [(record if record else "None") for record in findings_list[startIndex:endIndex]]
    input = repr("\n\n".join(input_list))
    input_command = f"echo -e {input}"
    input_process = subprocess.Popen(shlex.split(input_command), stdout=subprocess.PIPE)
    
    meta_command = get_metamap_command()
    metamap_process = subprocess.Popen(shlex.split(meta_command), stdout=subprocess.PIPE, stdin=input_process.stdout)
   
    output_bytes, error_bytes = metamap_process.communicate()
    if error_bytes:
        logger.error(error_bytes.decode())
    return output_bytes.decode(), [startIndex,endIndex]

Data Object for Metamap

In [7]:
class Concept(object):
    def __init__(self, sourceTokens:list, startPosList:list, lengthList:list, umlsCUI:str, preferedName:str, hitTerm:str, categories:list, isHead:int, isNegated:int):
        self.sourceTokens = sourceTokens
        self.startPosList = startPosList
        self.lengthList = lengthList
        self.umlsCUI = umlsCUI
        self.preferedName = preferedName
        self.hitTerm = hitTerm
        self.categories = categories
        self.isHead =  isHead
        self.isNegated =  isNegated
        
class ConceptGroup(object):
    def __init__(self):
        self.concepts = []
    def addConcept(self, concept:Concept):
        self.concepts.append(concept)
        
class SyntaxChunk(object):
    def __init__(self, text:str, lexicalMatch:str, syntaxType:str, partOfSpeech:str, tokens:list):
        self.text = text # The original form of the text (case sensitive)
        self.lexicalMatch = lexicalMatch
        self.syntaxType = syntaxType
        self.partOfSpeech = partOfSpeech
        self.tokens = tokens
                
class Phrase(object):
    def __init__(self, text:str, startPos:int, length:int):
        self.text = text
        self.startPos = startPos
        self.length = length
        self.syntaxChunks = []
        self.mappings = []
    def addSyntaxChunk(self, syntaxChunk:SyntaxChunk):
        self.syntaxChunks.append(syntaxChunk)
    def addConceptGroup(self, conceptGroup:ConceptGroup):
        self.mappings.append(conceptGroup)
        
        
class Sentence(object):
    def __init__(self, text:str, startPos:int, length:int):
        self.text = text
        self.startPos = startPos
        self.length = length
        self.phrases = []
    def addPhrase(self, phrase:Phrase):
        self.phrases.append(phrase)

class Negation(object):
    def __init__(self, text:str, triStartPosList:list, triLengthList:list, conceptsCUIs:list, tarStartPosList:list, tarLengthList:list):
        self.trgger = {
            'text': text,
            'startPosList': triStartPosList,
            'lengthList': triLengthList,
        }
        self.tarrget = {
            'conceptsCUIs': conceptsCUIs,
            'startPosList': tarStartPosList,
            'lengthList': tarLengthList,
        }

class Section(object):
    def __init__(self, name:str):
        self.name = name
        self.text = "" # context
        self.sentences = []
        self.negations = []
    def addSentence(self, sentence:Sentence):
        self.sentences.append(sentence)
        self.text += sentence.text
    def addNegation(self, negation:Negation):
        self.negations.append(negation)
        
class Record(object):
    def __init__(self, sid:str):
        self.sid = sid
        self.sections = []
    def addSection(self, section:Section):
        self.sections.append(section)
    def getFindingSection(self) -> Section:
        assert self.sections[0].name == "findings"
        return self.sections[0]
        
class Records(object):
    def __init__(self):
        self.records = []
    def addRecord(self, record:Record):
        self.records.append(record)

Methods to resolve specific JSON subtags

In [8]:
def resolveSyntaxUnit(syntaxUnit):
    text = syntaxUnit['InputMatch']
    syntaxType = syntaxUnit['SyntaxType']
    tokens = syntaxUnit['Tokens']
    # Add punc to token list
    if not tokens:
        logger.trace(f"Empty token detected: SyntaxType:{syntaxType}, InputMatch:{text}")
        tokens = [text]
    try:
        lexicalMatch = syntaxUnit['LexMatch']
        partOfSpeech = syntaxUnit['LexCat']
    except KeyError:
        lexicalMatch = ""
        partOfSpeech = ""
    if text.lower() != lexicalMatch and text.isalnum():
        logger.trace(f"text:[{text}], lexicalMatch:[{lexicalMatch}]")
    return SyntaxChunk(text, lexicalMatch, syntaxType, partOfSpeech, tokens)

def resolveConcept(mappingCandidate):
    sourceTokens = mappingCandidate['MatchedWords']
    startPosList = [int(i['StartPos']) for i in mappingCandidate['ConceptPIs']]
    lengthList = [int(i['Length']) for i in mappingCandidate['ConceptPIs']]
    umlsCUI = mappingCandidate['CandidateCUI']
    preferedName = mappingCandidate['CandidatePreferred']
    hitTerm = mappingCandidate['CandidateMatched']
    categories = mappingCandidate['SemTypes']
    isHead = 1 if mappingCandidate['IsHead'] == "yes" else 0
    isNegated = 1 if mappingCandidate['Negated'] == "1" else 0
    return Concept(sourceTokens, startPosList, lengthList, umlsCUI, preferedName, hitTerm, categories, isHead, isNegated)

def resolveNegation(negation):
    trigger = negation['NegTrigger']
    triggerStartPosList = [int(i['StartPos']) for i in negation['NegTriggerPIs']]
    triggerLengthList = [int(i['Length']) for i in negation['NegTriggerPIs']]
    conceptCUIs = [i['NegConcCUI'] for i in negation['NegConcepts']]
    targetStartPosList = [int(i['StartPos']) for i in negation['NegConcPIs']]
    targetLengthList = [int(i['Length']) for i in negation['NegConcPIs']]
    return Negation(trigger, triggerStartPosList, triggerLengthList, conceptCUIs, targetStartPosList, targetLengthList)
    

Method to resolve JSON format output

In [9]:
def parseMetamapJSON(json_obj,id_subList) -> Records:
    records = Records()
    for _idx, _document in enumerate(json_obj['AllDocuments']):
        # print(_document.keys())
        # print(record['Document']['Negations'])
        record = Record(id_subList[_idx])
        section = Section("findings")
        for _utterance in _document['Document']['Utterances']:
            # print(_utterance.keys())
            sentence = Sentence(text=_utterance['UttText'], startPos=int(_utterance['UttStartPos']), length=int(_utterance['UttLength']))
            for _phrase in _utterance['Phrases']:
                # print(_phrase.keys())
                phrase = Phrase(text=_phrase['PhraseText'], startPos=int(_phrase['PhraseStartPos']), length=int(_phrase['PhraseLength']))
                for _syntaxUnit in _phrase['SyntaxUnits']:
                    # print(_syntaxUnit.keys())
                    syntaxChunk = resolveSyntaxUnit(_syntaxUnit)
                    phrase.addSyntaxChunk(syntaxChunk)
                for _mapping in _phrase['Mappings']:
                    # print(_mapping.keys())
                    conceptGroup = ConceptGroup()
                    for _mappingCandidate in _mapping['MappingCandidates']:
                        # print(_mappingCandidate.keys())
                        concept = resolveConcept(_mappingCandidate)
                        conceptGroup.addConcept(concept)
                    phrase.addConceptGroup(conceptGroup)
                sentence.addPhrase(phrase)
            section.addSentence(sentence)
        for _negation in _document['Document']['Negations']:
            negation = resolveNegation(_negation)
            section.addNegation(negation)
        record.addSection(section)
        records.addRecord(record)
    return records

Methods to align the metamap output to the spacy output

In [10]:
from spacy.tokens import Doc
from operator import itemgetter

def align(tokNum, inputTokenGroups):
    alignment= [-1] * tokNum
    for id, tokenGroup in enumerate(inputTokenGroups):
        alignment[tokenGroup.start:tokenGroup.end] = [id] * (tokenGroup.end-tokenGroup.start)
    return alignment

def align_byIndex(tokNum, inputIndexGroups):
    alignment= [-1] * tokNum
    for id, indexGroup in enumerate(inputIndexGroups):
        alignment[indexGroup[0]:indexGroup[-1]+1] = [id] * len(indexGroup)
    return alignment

def align_byIndex_individually_withData_noOverlap(tokNum, inputIndexGroups_withData):
    alignment= [-1] * tokNum
    for id, indexGroup_withData in enumerate(inputIndexGroups_withData):
        indexGroup = indexGroup_withData['indices']
        extra_str = indexGroup_withData['extra_str']
        for index in indexGroup:
            alignment[index] = f"{id}|{extra_str}"
    return alignment

def align_byIndex_individually_withData(tokNum, inputIndexGroups_withData):
    alignment= [-1] * tokNum
    for indexGroup_withData in inputIndexGroups_withData:
        indexGroup = indexGroup_withData['indices']
        extra_str = indexGroup_withData['extra_str']
        for index in indexGroup:
            if alignment[index] == -1:
                alignment[index] = [extra_str]
            else:
                alignment[index].append(extra_str) 
    return alignment

def getTokenOffset(baseText:str, inputTokens):
    startPos = 0
    offset= []
    for token in inputTokens:
        offsetPos = baseText.find(token.text, startPos, len(baseText))
        offset.append(offsetPos)
        startPos = offsetPos + len(token.text)
    return offset

def resolveTokenIndices_byPosition(tokenOffset, startPos, length) -> list:
    indicesList = []
    doInsert = False
    posPointer = startPos
    for i, currPos in enumerate(tokenOffset):
        nextPos = tokenOffset[i+1] if i + 1 < len(tokenOffset) else tokenOffset[i] + 99
        if not doInsert and posPointer >= currPos and posPointer < nextPos:
            doInsert = True
            posPointer = startPos + length - 1
        elif doInsert and posPointer < currPos: 
            break # break the loop in advance, othewise will stop when finish the loop.
        if doInsert:
            indicesList.append(i)
    return indicesList

def resolveTokenIndices_byPosition_multiToken(tokenOffset, startPosList, lengthList) -> list:
    idxList_3d = [resolveTokenIndices_byPosition(tokenOffset, startPos, length) for startPos, length in zip(startPosList, lengthList)]
    idxList_flatten = [idx for idxList in idxList_3d for idx in idxList]
    return idxList_flatten

def trimIndices(_indices, keepNum):
    interval = []
    for id, current in enumerate(_indices):
        if id == len(_indices)-1:
            break
        nextid = id+1
        next = _indices[nextid]
        interval.append(next-current)
    interval_withIdx = list(enumerate(interval))
    trimed_list = sorted(interval_withIdx,key=itemgetter(1))[0:keepNum-1]
    idx_remained = set()
    for i in trimed_list:
        idx_remained.add(i[0])
        idx_remained.add(i[0]+1)
    return [_indices[i] for i in idx_remained]

def replPunc(matchObj):
    if matchObj.string == matchObj.group(0):
        return matchObj.string
    else:
        return ""
    
def findSubString(sourceText,subStr,subStr_tokens,begin):
    sourceText = sourceText.lower()
    startPos = sourceText.find(subStr.lower(),begin)
    if startPos != -1:
        return startPos, len(subStr)
    else: 
        # Sometimes metamap will rewrite the text, making the subStr differ to the source text.
        # In this case, we use token.
        if subStr_tokens:
            subStr_tokens = [i.lower() for i in subStr_tokens]
            startPos = sourceText.find(subStr_tokens[0],begin)
            assert startPos!=-1
            nextStartPos = startPos + len(subStr_tokens[0])
            for token in subStr_tokens[1:]:
                nextStartPos = sourceText.find(token,nextStartPos)
                nextStartPos += len(token)
            assert nextStartPos-startPos>0
            return startPos, nextStartPos-startPos
        else:
            return begin, 0

In [11]:
import jsonpickle

def classToJSON(obj) -> str:

    return jsonpickle.encode(obj,unpicklable=False)

Format metamap outputs so that it can be aligned to spacy tokens

In [12]:
def formatMetamapRecord(df_base, metamapRecord):
    reportText = metamapRecord.getFindingSection().text
    phrases = [phrase for sentence in metamapRecord.getFindingSection().sentences for phrase in sentence.phrases]
    tokenOffset = df_base.loc[:,SPACY_COLUMN_NAME['token_offset']].tolist()
    phraseIdxGroups = []
    syntaxChunkIdxGroups_withData = []
    conceptIdxGroup_withData = []
    negTriggerGroups_withData = []
    negTargetGroups_withData = []
    conceptGroupId = 0
    negationGroupId = 0
    offsetBegin = 0
    for phrase in phrases:
        phraseIdxList = resolveTokenIndices_byPosition(tokenOffset, phrase.startPos, phrase.length)
        phraseIdxGroups.append(phraseIdxList)
        for syntaxChunk in phrase.syntaxChunks:
            startPos, length = findSubString(reportText,syntaxChunk.text,syntaxChunk.tokens,offsetBegin)
            offsetBegin = startPos + length
            syntaxChunkIdxGroups_withData.append({
                "indices": resolveTokenIndices_byPosition(tokenOffset, startPos, length),
                "extra_str": f"{syntaxChunk.syntaxType}|{syntaxChunk.partOfSpeech}|{syntaxChunk.tokens}"
            })
        for conceptGroup in phrase.mappings:
            for concept in conceptGroup.concepts:
                conceptIdxList_flatten = resolveTokenIndices_byPosition_multiToken(tokenOffset, concept.startPosList, concept.lengthList)
                conceptIdxGroup_withData.append({
                    "indices":conceptIdxList_flatten,
                    "extra_str":f"{conceptGroupId}|{concept.umlsCUI}|{concept.preferedName}({concept.hitTerm})|{','.join(concept.categories)}|{concept.isHead}|{concept.isNegated}",
                })
            conceptGroupId += 1
    for negation in metamapRecord.getFindingSection().negations:
        negTriggerIdxList_flatten = resolveTokenIndices_byPosition_multiToken(tokenOffset, negation.trgger['startPosList'], negation.trgger['lengthList'])
        negTargetIdxList_flatten = resolveTokenIndices_byPosition_multiToken(tokenOffset, negation.tarrget['startPosList'], negation.tarrget['lengthList'])
        negTriggerGroups_withData.append({
            "indices": negTriggerIdxList_flatten,
            "extra_str":f"{negationGroupId}|{','.join([str(i) for i in negTargetIdxList_flatten])}|{','.join(negation.tarrget['conceptsCUIs'])}"
        })
        negTargetGroups_withData.append({
            "indices": negTargetIdxList_flatten,
            "extra_str":f"{negationGroupId}|{','.join([str(i) for i in negTriggerIdxList_flatten])}|{','.join(negation.tarrget['conceptsCUIs'])}"
        })
        negationGroupId += 1
    return phraseIdxGroups, syntaxChunkIdxGroups_withData, conceptIdxGroup_withData, negTriggerGroups_withData, negTargetGroups_withData

Run

In [13]:
SPACY_PREFIXX = "[sp]"
SPACY_COLUMN_NAME = {
    'token': SPACY_PREFIXX+'token',
    'token_offset':SPACY_PREFIXX+'token_offset',
    'sentence_group':SPACY_PREFIXX+'sentence_group',
    'noun_chunk':SPACY_PREFIXX+'noun_chunk',
    'lemma':SPACY_PREFIXX+'lemma',
    'pos_core':SPACY_PREFIXX+'pos_core',
    'pos_feature':SPACY_PREFIXX+'pos_feature',
    'dependency':SPACY_PREFIXX+'dependency',
    'dependency_head':SPACY_PREFIXX+'dependency_head',
    'dependency_children':SPACY_PREFIXX+'dependency_children',
    'morphology':SPACY_PREFIXX+'morphology',
    'is_alpha':SPACY_PREFIXX+'is_alpha',
    'is_stop':SPACY_PREFIXX+'is_stop',
    'is_pronoun':SPACY_PREFIXX+'is_pronoun',
    'trailing_space':SPACY_PREFIXX+'trailing_space',
}
METAMAP_PREFIXX = "[mm]"
METAMAP_COLUMN_NAME = {
    'phrase': METAMAP_PREFIXX+'metamap_phrase',
    'syntax_chunk': METAMAP_PREFIXX+'syntax_chunk|syntax_type|pos',
    'concept': METAMAP_PREFIXX+'concept_group|CUI|prefered_name(hit_synonym)|categories|isHead|isNegated',
    'neg_trigger': METAMAP_PREFIXX+'negation_group|target_token_indices|target_CUI',
    'negated_target': METAMAP_PREFIXX+'negation_group|trigger_token_indices|target_CUI'
}

In [14]:
CORENLP_CUSTOM_PROPS = {
    'annotators':'tokenize, ssplit, pos, lemma, ner, depparse, coref',
    "coref.algorithm": "statistical"
}

STANZA_PROCESSOR_DICT = {
    'tokenize': 'mimic', 
    'pos': 'mimic', 
    'lemma': 'mimic',
    'depparse': 'mimic',
    # 'sentiment':'sstplus', # Sentiment scores of 0, 1, or 2 (negative, neutral, positive).
    'constituency': 'wsj', # wsj, wsj_bert, wsj_roberta
    'ner': 'radiology',
}

WORKERS_IN_POOL = 3

BATCH_SIZE = 2
DATA_START_POS = 0
DATA_END_POS = 2

TEXT_LIST = findings_list

Multi processing

In [15]:
import time, random
import spacy
from multiprocessing import Process

SPACY_NLP = spacy.load("en_core_web_md", disable=['ner'])

class SpacyProcess(Process):
    def __init__(self,input_pipe, input_text_list, input_sid_list):
        super().__init__()
        self.id = random.randint(100,999)
        self.input_pipe = input_pipe
        self.input_text_list = input_text_list
        self.input_sid_list = input_sid_list
    def run(self):
        print(f"Spacy Process {self.id} started.")
        time0 = time.time()
        text_tuples = [(text,{"sid":sid}) for text,sid in zip(self.input_text_list,self.input_sid_list)]
        spacy_output_list = []
        for doc, context in SPACY_NLP.pipe(text_tuples, as_tuples=True):
            sid = context['sid']
            spacy_output_list.append({'sid':sid,'doc':doc})
        time1 = time.time()
        print(f"Spacy Process {self.id} finished in {time1-time0}s")
        self.input_pipe.send(spacy_output_list)

In [16]:
import json

class MetamapPrcess(Process):
    def __init__(self,input_pipe, input_text_list, input_sid_list):
        super().__init__()
        self.id = random.randint(100,999)
        self.input_pipe = input_pipe
        self.input_text_list = input_text_list
        self.input_sid_list = input_sid_list
    def run(self):
        print(f"Metamap Process {self.id} started.")
        time0 = time.time()
        
        input_list = self.input_text_list
        input = repr("\n\n".join(input_list))
        input_command = f"echo -e {input}"
        input_process = subprocess.Popen(shlex.split(input_command), stdout=subprocess.PIPE)
        
        meta_command = get_metamap_command()
        metamap_process = subprocess.Popen(shlex.split(meta_command), stdout=subprocess.PIPE, stdin=input_process.stdout)
    
        output_bytes, error_bytes = metamap_process.communicate()
        if error_bytes:
            logger.error(error_bytes.decode())
            
        metamap_output = output_bytes.decode()
        metamap_json_output = list(metamap_output.split("\n"))[1] # Only the second line is the required JSON string.
        metamap_json_obj = json.loads(metamap_json_output)
        metamap_records_obj = parseMetamapJSON(metamap_json_obj, input_sid_list)
        
        time1 = time.time()
        print(f"Metamap Process {self.id} finished in {time1-time0}s")
        self.input_pipe.send(metamap_records_obj)

In [21]:
def batch_processing(input_text_list, input_sid_list):
    batch_id = random.randint(100,999)
    print(f"Batch Process {batch_id} started")
    startTime = time.time()
    
    # PROCESSOR_NAME = {'spacy':'spacy','metamap':'metamap','corenlp':'corenlp','stanza':'stanza'}
    batch_data = dict.fromkeys(input_sid_list,{}) # Dict: Key = sid, Value = {df_[name]:DataFrame, ...}
    
    # We create three sub processors as below, as each of them take 3s to process a batch of 10 records
    # 1. For Spacy
    spacy_outPipe, spacy_inPipe = Pipe(False)
    spacy_process = SpacyProcess(spacy_inPipe, input_text_list, input_sid_list)
    spacy_process.start()
    # 2. For Metamap
    metamap_outPipe, metamapinPipe = Pipe(False)
    metamap_process = MetamapPrcess(metamapinPipe, input_text_list, input_sid_list)
    metamap_process.start()
    # 3. For Stanza
    
    # 4. For CoreNLP
    
    # The integration process
    # Spacy
    spacy_output_list = spacy_outPipe.recv() # Wait until receive a result, thus no need for p.join()
    time1 = time.time()
    for output in spacy_output_list:
        sid = output['sid']
        doc = output['doc']
        df_spacy = pd.DataFrame({
                SPACY_COLUMN_NAME['token']: [tok.text for tok in doc],
                SPACY_COLUMN_NAME['token_offset']: getTokenOffset(doc.text, doc),
                SPACY_COLUMN_NAME['sentence_group']: align(len(doc), doc.sents),
                SPACY_COLUMN_NAME['noun_chunk']: align(len(doc), doc.noun_chunks),
                SPACY_COLUMN_NAME['lemma']: [tok.lemma_ for tok in doc],
                SPACY_COLUMN_NAME['pos_core']: [f"[{tok.pos_}]{spacy.explain(tok.pos_)}" for tok in doc],
                SPACY_COLUMN_NAME['pos_feature']: [f"[{tok.tag_}]{spacy.explain(tok.tag_)}" for tok in doc],
                SPACY_COLUMN_NAME['dependency']: [f"[{tok.dep_}]{spacy.explain(tok.dep_)}" for tok in doc],
                SPACY_COLUMN_NAME['dependency_head']: [tok.head.text for tok in doc],
                SPACY_COLUMN_NAME['dependency_children']: [[child for child in tok.children] for tok in doc],
                SPACY_COLUMN_NAME['morphology']: [tok.morph for tok in doc],
                SPACY_COLUMN_NAME['is_alpha']: [tok.is_alpha for tok in doc],
                SPACY_COLUMN_NAME['is_stop']: [tok.is_stop for tok in doc],
                SPACY_COLUMN_NAME['is_pronoun']: [True if tok.pos_ == 'PRON' else False for tok in doc],
                SPACY_COLUMN_NAME['trailing_space']: [True if tok.whitespace_ else False for tok in doc],
            })
        batch_data[sid] = {'df_spacy':df_spacy}
        print(f"sid:{sid}, df_spacy.shape:{df_spacy.shape}")
    time2 = time.time()
    print(f"Finished processing Spacy, cost: {time2-time1}s")
    
    # Metamap
    metamapRecords = metamap_outPipe.recv()
    time3 = time.time()
    
    # Metamap
    for metamapRecord in metamapRecords.records:
        sid = metamapRecord.sid
        df_base = batch_data[sid]['df_spacy']
        df_rowsNum = df_base.shape[0]
        phraseInfo, syntaxChunkInfo, conceptInfo, negTriggerInfo, negTargetInfo = formatMetamapRecord(df_base, metamapRecord)
        df_metamap = pd.DataFrame({
            METAMAP_COLUMN_NAME['phrase']: align_byIndex(df_rowsNum, phraseInfo),
            METAMAP_COLUMN_NAME['syntax_chunk']: align_byIndex_individually_withData_noOverlap(df_rowsNum, syntaxChunkInfo),
            METAMAP_COLUMN_NAME['concept']: align_byIndex_individually_withData(df_rowsNum,conceptInfo),
            METAMAP_COLUMN_NAME['neg_trigger']: align_byIndex_individually_withData(df_rowsNum,negTriggerInfo),
            METAMAP_COLUMN_NAME['negated_target']: align_byIndex_individually_withData(df_rowsNum,negTargetInfo),
        })
        batch_data[sid] = {'df_metamap':df_metamap}
        print(f"sid:{sid}, df_metamap.shape:{df_metamap.shape}")
    time4 = time.time()
    print(f"Finished processing Spacy, cost: {time4-time3}s")
    
    endTime = time.time()
    print(f"Batch Process {batch_id} finished in {endTime-startTime}s, processed: {input_sid_list}")
    return "Done"

In [22]:
from concurrent.futures import ProcessPoolExecutor, as_completed
from multiprocessing import Process, Pool, Pipe, Lock
from IPython.display import display, HTML
import json, time, random, os
import stanza
from stanza.server import CoreNLPClient
from stanza.pipeline.core import DownloadMethod

# We create a batch of records at each time and submit a task
all_task = []
executor = ProcessPoolExecutor(max_workers=WORKERS_IN_POOL)
for startIndex in range(DATA_START_POS, DATA_END_POS, BATCH_SIZE):
    # Construct batch data
    endIndex = startIndex + BATCH_SIZE if startIndex + BATCH_SIZE < DATA_SIZE else DATA_SIZE
    input_text_list = [(text if text else "None") for text in TEXT_LIST[startIndex:endIndex]]
    input_sid_list = [i for i in sid_list[startIndex:endIndex]]
    
    # Submit the task for one batch
    all_task.append(executor.submit(batch_processing,input_text_list,input_sid_list))

# When a submitted task finished, the output is received here.
for future in as_completed(all_task):
    output = future.result()
    print(f"Result from batch_processing: {output}")

print(f"End")
        

Batch Process 207 started
Spacy Process 971 started.
Metamap Process 840 started.
Spacy Process 971 finished in 0.018230915069580078s




Metamap Process 840 finished in 1.4533309936523438s
sid:s50414267, df_spacy.shape:(61, 15)
sid:s53189527, df_spacy.shape:(51, 15)
Finished processing Spacy, cost: 0.004683017730712891s
sid:s50414267, df_metamap.shape:(61, 5)
sid:s53189527, df_metamap.shape:(51, 5)
Finished processing Spacy, cost: 0.003422975540161133s
Batch Process 207 finished in 1.98738431930542s, processed: ['s50414267', 's53189527']
Result from batch_processing: Done
End
