# Berlin Beam Summit - NLP with beam and spacy

## Setup

This tutorial assumes you are running Python 3.7 please follow these steps to setup your environment: 
1. install requirements from `requirements.txt`
2. download the spacy large english model


In [1]:
# !pip install -r requirements.txt
# !python -m spacy download en_core_web_lg

## Import and initialise everything

In [1]:
import apache_beam as beam
from apache_beam.io import fileio
from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions
import spacy
from spacy import displacy
from spacy.pipeline import EntityRuler
from spacy.symbols import nsubj, dobj, VERB
from pathlib import Path
import ujson as json
import logging
from six import string_types


  'Running the Apache Beam SDK on Python 3 is not yet fully supported. '


Initialising the spacy language model:

In [3]:
nlp = spacy.load('en_core_web_lg')

Let's setup mock disease and drug entity recognition:

In [4]:
ruler = EntityRuler(nlp)
patterns = [{"label": "DRUG", "pattern": [{"lower": "aspirin"}]},
            {"label": "DISEASE", "pattern": "heart attacks"},
            {"label": "DISEASE", "pattern": "headaches"}]
ruler.add_patterns(patterns)
nlp.add_pipe(ruler)

Configuration for the spacy visualisations:

In [5]:
colors = {"DRUG": "hsla(100, 30%, 50%, 0.9)", "DISEASE": "salmon"}
options = {"ents": ["DRUG", "DISEASE"], "colors": colors, "font": "Source Sans Pro", "collapse_phrases": True}

In [6]:
text = "Aspirin treats headaches. Aspirin can help prevent heart attacks."
doc = nlp(text)
displacy.render(doc, style="ent", options=options)

In [7]:
doc = nlp("Aspirin treats headaches.")
svg = displacy.render(doc, style="dep")

In [8]:
doc = nlp("Aspirin can help prevent heart attacks.")
svg = displacy.render(doc, style="dep")

In [9]:
def extract_entities(nlp, text):
    doc = nlp(text)
    entities = [{"entity_text": entity.text, "label": entity.label_} for entity in doc.ents]
    entity_dict = {u"entities": entities, u"text": text}
    return entity_dict


class EntityExtraction(beam.DoFn):

    def __init__(self):
        self.nlp = None

    def process(self, element, *args, **kwargs):
        if element and isinstance(element, string_types):
            element = json.loads(element)
        try:
            entity_dict = extract_entities(self.nlp, element["text"])
            yield json.dumps(entity_dict)
        except Exception as e:
            logging.error("Can not extract entities from {}".format(element["text"]) + str(e))
            
    def start_bundle(self):
        """
        Lazy initialisation of spacy model
        """
        if self.nlp is None:
            self.nlp = spacy.load('en_core_web_lg')
            ruler = EntityRuler(nlp)
            patterns = [{"label": "DRUG", "pattern": [{"lower": "aspirin"}]},
                        {"label": "DISEASE", "pattern": [{"lower": "heart"}, {"lower": "attacks"}]},
                        {"label": "DISEASE", "pattern": [{"lower": "headaches"}]},
                       ]
            ruler.add_patterns(patterns)
            self.nlp.add_pipe(ruler)
    

In [10]:
input_json = ['{"text":"Aspirin treats headaches."}', '{"text":"Aspirin can help prevent heart attacks."}']

pipeline_options = PipelineOptions(
    project = "extract-drug-entities",
    job_name = "extractents",
    temp_location = "extractents/tmp/",
    runner = 'DirectRunner',)

output = "output_folder/"

pipeline_options.view_as(SetupOptions).save_main_session = True
with beam.Pipeline(options=pipeline_options) as p:
    json_collection = p | 'GetJSON' >> beam.Create(input_json)
    entities = json_collection | 'ExtractEntity' >> beam.ParDo(EntityExtraction())
    entities | 'WriteEntityJSON' >> beam.io.WriteToText(output,
                                                     file_name_suffix='ent.json')





In [11]:
p = Path("output_folder").glob('**/*ent.json')
files = [x for x in p if x.is_file()]

for f in files:
    with f.open() as data_file:
        print("Data in {}".format(f))
        for line in data_file:
            data = json.loads(line)
            print(data)

Data in output_folder/-00000-of-00001ent.json
{'entities': [{'entity_text': 'Aspirin', 'label': 'DRUG'}, {'entity_text': 'headaches', 'label': 'DISEASE'}], 'text': 'Aspirin treats headaches.'}
{'entities': [{'entity_text': 'Aspirin', 'label': 'DRUG'}, {'entity_text': 'heart attacks', 'label': 'DISEASE'}], 'text': 'Aspirin can help prevent heart attacks.'}


In [12]:
doc = nlp("Aspirin can help prevent heart attacks.")
svg = displacy.render(doc, style="dep")

head = None
relation = None
tail = None
doc = nlp(text)
for noun_chunk in doc.noun_chunks:
    if noun_chunk.root.dep == nsubj and noun_chunk.root.head.pos == VERB:
        relation = noun_chunk.root.head
        head = noun_chunk
    if noun_chunk.root.dep == dobj:
        tail = noun_chunk
    print(head, relation, tail)


Aspirin treats None
Aspirin treats headaches
Aspirin help headaches
Aspirin help heart attacks


In [2]:
def extract_triple(nlp, text):
    head = None
    relation = None
    tail = None
    doc = nlp(text)
    for noun_chunk in doc.noun_chunks:
        if noun_chunk.root.dep == nsubj and noun_chunk.root.head.pos == VERB:
            relation = noun_chunk.root.head
            head = noun_chunk
        if noun_chunk.root.dep == dobj:
            tail = noun_chunk
            
    print(head, relation, tail)
    if head and relation and tail:
        if head.ents or tail.ents:
            yield {"head": head.text, "relation": relation.text, "tail": tail.text}


class TripleExtraction(beam.DoFn):

    def __init__(self):
        self.nlp = None

    def process(self, element, *args, **kwargs):
        if element and isinstance(element, string_types):
            element = json.loads(element)
            print(element)
            try:
                for triple_dict in extract_triple(self.nlp, element["text"]):
                    print(triple_dict)
                    yield json.dumps(triple_dict)
            except Exception as e:
                logging.error("Can not extract entities from {}".format(element["text"]) + str(e))
            
    def start_bundle(self):
        """
        Lazy initialisation of spacy model
        """
        if self.nlp is None:
            self.nlp = spacy.load('en_core_web_lg')
            ruler = EntityRuler(self.nlp)
            patterns = [{"label": "DRUG", "pattern": [{"lower": "aspirin"}]},
                        {"label": "DISEASE", "pattern": [{"lower": "heart"}, {"lower": "attacks"}]},
                        {"label": "DISEASE", "pattern": [{"lower": "headaches"}]},
                       ]
            ruler.add_patterns(patterns)
            self.nlp.add_pipe(ruler)

In [3]:
input_json = ['{"text":"Aspirin treats headaches."}', '{"text":"Aspirin can help prevent heart attacks."}']

rel_pipeline_options = PipelineOptions(
    project = "extract-drug-triples",
    job_name = "extracttriples",
    temp_location = "extracttriples/tmp/",
    runner = 'DirectRunner',)

output = "output_folder/"

rel_pipeline_options.view_as(SetupOptions).save_main_session = True
with beam.Pipeline(options=rel_pipeline_options) as rp:
    rel_json_collection = rp | 'GetRelJSON' >> beam.Create(input_json)
    triples = rel_json_collection | 'ExtractTriple' >> beam.ParDo(TripleExtraction())
    triples | 'WriteRelationJSON' >> beam.io.WriteToText(output,
                                                     file_name_suffix='relation.json')



{'text': 'Aspirin treats headaches.'}
Aspirin treats headaches
{'head': 'Aspirin', 'relation': 'treats', 'tail': 'headaches'}
{'text': 'Aspirin can help prevent heart attacks.'}
Aspirin help heart attacks
{'head': 'Aspirin', 'relation': 'help', 'tail': 'heart attacks'}


In [None]:
p = Path("output_folder").glob('**/*relation.json')
files = [x for x in p if x.is_file()]
print(files)

for f in files:
    with f.open() as data_file:
        print("Data in {}".format(f))
        for line in data_file:
            data = json.loads(line)
            print(data)