## Spark Pipeline Intro

In this notebook, we will discuss how to search documents based on the Disease described in the CVD tree.

In [1]:
import pandas as pd
import json
from neo4j import GraphDatabase
import csv

#### Authentication to access covidgraph.org graph

In [2]:
covid_browser = "https://db.covidgraph.org/browser/"
covid_url = "bolt://db.covidgraph.org:7687"
user = "public"
password = "corona"

#driver = GraphDatabase.driver(uri, auth=(user, password))
driver = GraphDatabase.driver(uri = covid_url,\
                              auth = (user,password))

##### Example of a paper node in the covid graph

In [42]:
paper_query = "MATCH (n:Paper) RETURN n LIMIT 1"
Data = []
with driver.session() as session:
    info = session.run(paper_query)
    for item in info:
        print(item)

<Record n=<Node id=2385529 labels={'Paper'} properties={'cord_uid': 'ocp6yodg', 'cord19-fulltext_hash': 'b8957d48b6bcf17b7b51e004d19314ce77f653a1', 'journal': 'BMC Infect Dis', 'publish_time': '2011-12-28', 'source': 'PMC', 'title': 'Timeliness of contact tracing among flight passengers for influenza A/H1N1 2009', '_hash_id': '84b069ab23fb0ecebe6925af9c2b18ae', 'url': 'https://www.ncbi.nlm.nih.gov/pmc/articles/PMC3265549/'}>>


#### Fragments collection

In [94]:
TEXT = []
query = "MATCH (p:Paper)-[:PAPER_HAS_BODYTEXTCOLLECTION]-(:BodyTextCollection)\
        -[:BODYTEXTCOLLECTION_HAS_BODYTEXT]-(:BodyText)-[:HAS_FRAGMENT]\
        -(f:Fragment)-[:MENTIONS]->(g:GeneSymbol) RETURN f.text limit 100"

with driver.session() as session:
    info = session.run(query)
    for item in info:
        TEXT.append(item.values())

#### Spark-Pipeline

In [95]:
import pyspark
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql import functions as fun
from pyspark.sql.types import *

In [96]:
from sparknlp.base import DocumentAssembler, Finisher

In [97]:
packages = ','.join([
    "com.johnsnowlabs.nlp:spark-nlp_2.11:2.5.1",
])

spark_conf = SparkConf()
spark_conf = spark_conf.setAppName('spark1')
spark_conf = spark_conf.setAppName('master[*]')
spark_conf = spark_conf.set("spark.jars.packages", packages)

spark = SparkSession.builder.config(conf=spark_conf).getOrCreate()

In [98]:
schema = StructType([
    StructField('text', StringType()),
])
texts_df = spark.createDataFrame(TEXT, schema)

In [99]:
texts_df.show()

+--------------------+
|                text|
+--------------------+
|SARS and viral he...|
|Other nation's he...|
|MHS Kennemerland ...|
|Requests for cont...|
|Requests for cont...|
|In case of Schiph...|
|The CIb verifies ...|
|The MHS of the ai...|
|For tracing forei...|
|The other interva...|
|For the 17 comple...|
|After acceptance ...|
|Interval III of t...|
|Interval III of t...|
|Overall delay in ...|
|Although one migh...|
|Of the 21 request...|
|This random-effec...|
|This random-effec...|
|For influenza B, ...|
+--------------------+
only showing top 20 rows



In [100]:
texts_df.show(n=5, truncate=100, vertical=True)

-RECORD 0----------------------------------------------------------------------------------------------------
 text | SARS and viral hemorrhagic fevers)                                                                   
-RECORD 1----------------------------------------------------------------------------------------------------
 text | Other nation's health authorities will make a request to the CIb in case they diagnosed a patient... 
-RECORD 2----------------------------------------------------------------------------------------------------
 text | MHS Kennemerland then completes contact details through booking offices or using other search met... 
-RECORD 3----------------------------------------------------------------------------------------------------
 text | Requests for contact tracing to the CIb for Dutch index patients originate from any Dutch MHS whi... 
-RECORD 4----------------------------------------------------------------------------------------------------
 text | Re

In [101]:
texts_df.limit(5).toPandas()

Unnamed: 0,text
0,SARS and viral hemorrhagic fevers)
1,Other nation's health authorities will make a ...
2,MHS Kennemerland then completes contact detail...
3,Requests for contact tracing to the CIb for Du...
4,Requests for contact tracing to the CIb for Du...


In [102]:
from sparknlp.pretrained import PretrainedPipeline

In [103]:
pipeline = PretrainedPipeline('explain_document_ml', lang='en')

explain_document_ml download started this may take some time.
Approx size to download 9.4 MB
[OK!]


In [104]:
pipeline.annotate('Hellu wrold!')

{'document': ['Hellu wrold!'],
 'lemmas': ['Hilo', 'world', '!'],
 'pos': ['NNP', 'NN', '.'],
 'sentence': ['Hellu wrold!'],
 'spell': ['Hilo', 'world', '!'],
 'stems': ['hilo', 'world', '!'],
 'token': ['Hellu', 'wrold', '!']}

In [105]:
texts_df.printSchema()

root
 |-- text: string (nullable = true)



In [106]:
procd_texts_df = pipeline.annotate(texts_df, 'text')

In [107]:
procd_texts_df.printSchema()

root
 |-- text: string (nullable = true)
 |-- document: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- annotatorType: string (nullable = true)
 |    |    |-- begin: integer (nullable = false)
 |    |    |-- end: integer (nullable = false)
 |    |    |-- result: string (nullable = true)
 |    |    |-- metadata: map (nullable = true)
 |    |    |    |-- key: string
 |    |    |    |-- value: string (valueContainsNull = true)
 |    |    |-- embeddings: array (nullable = true)
 |    |    |    |-- element: float (containsNull = false)
 |-- sentence: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- annotatorType: string (nullable = true)
 |    |    |-- begin: integer (nullable = false)
 |    |    |-- end: integer (nullable = false)
 |    |    |-- result: string (nullable = true)
 |    |    |-- metadata: map (nullable = true)
 |    |    |    |-- key: string
 |    |    |    |-- value: string (valueContainsNull = true

In [108]:
procd_texts_df.show(n=2)

+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|                text|            document|            sentence|               token|               spell|              lemmas|               stems|                 pos|
+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|SARS and viral he...|[[document, 0, 33...|[[document, 0, 33...|[[token, 0, 3, SA...|[[token, 0, 3, SA...|[[token, 0, 3, SA...|[[token, 0, 3, sa...|[[pos, 0, 3, NNP,...|
|Other nation's he...|[[document, 0, 16...|[[document, 0, 16...|[[token, 0, 4, Ot...|[[token, 0, 4, Ot...|[[token, 0, 4, Ot...|[[token, 0, 4, ot...|[[pos, 0, 4, JJ, ...|
+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--

In [109]:
procd_texts_df.show(n=2, truncate=100, vertical=True)

-RECORD 0--------------------------------------------------------------------------------------------------------
 text     | SARS and viral hemorrhagic fevers)                                                                   
 document | [[document, 0, 33, SARS and viral hemorrhagic fevers), [sentence -> 0], []]]                         
 sentence | [[document, 0, 33, SARS and viral hemorrhagic fevers), [sentence -> 0], []]]                         
 token    | [[token, 0, 3, SARS, [sentence -> 0], []], [token, 5, 7, and, [sentence -> 0], []], [token, 9, 13... 
 spell    | [[token, 0, 3, SARS, [confidence -> 1.0, sentence -> 0], []], [token, 5, 7, and, [confidence -> 1... 
 lemmas   | [[token, 0, 3, SARS, [confidence -> 1.0, sentence -> 0], []], [token, 5, 7, and, [confidence -> 1... 
 stems    | [[token, 0, 3, sar, [confidence -> 1.0, sentence -> 0], []], [token, 5, 7, and, [confidence -> 1.... 
 pos      | [[pos, 0, 3, NNP, [word -> SARS], []], [pos, 5, 7, CC, [word -> and], []], [

In [110]:
from sparknlp import Finisher
finisher = Finisher()
finisher = finisher
# taking the lemma column
finisher = finisher.setInputCols(['lemmas'])
# seperating lemmas by a single space
finisher = finisher.setAnnotationSplitSymbol(' ')
finished_texts_df = finisher.transform(procd_texts_df)
finished_texts_df.show(n=1, truncate=100, vertical=True)

-RECORD 0----------------------------------------------------
 text            | SARS and viral hemorrhagic fevers)        
 finished_lemmas | [SARS, and, viral, hemorrhagic, fever, )] 
only showing top 1 row



In [112]:
finished_texts_df.select('finished_lemmas').take(10)

[Row(finished_lemmas=['SARS', 'and', 'viral', 'hemorrhagic', 'fever', ')']),
 Row(finished_lemmas=['Other', "nation's", 'health', 'authority', 'will', 'make', 'a', 'request', 'to', 'the', 'CIb', 'in', 'case', 'they', 'diagnose', 'a', 'patient', 'which', 'arrive', 'at', 'Schiphol', 'airport', 'for', 'transit', 'while', 'be', 'infectious']),
 Row(finished_lemmas=['MHS', 'Kennemerland', 'then', 'complete', 'contact', 'detail', 'through', 'book', 'office', 'or', 'use', 'other', 'search', 'method']),
 Row(finished_lemmas=['request', 'for', 'contact', 'trace', 'to', 'the', 'CIb', 'for', 'Dutch', 'index', 'patient', 'originate', 'from', 'any', 'Dutch', 'MHS', 'which', 'identify', 'a', 'patient', 'who', 'travel', 'by', 'plane', 'while', 'be', 'contagious', 'for', 'an', 'infectious', 'disease', 'which', 'require', 'contact', 'trace']),
 Row(finished_lemmas=['request', 'for', 'contact', 'trace', 'to', 'the', 'CIb', 'for', 'Dutch', 'index', 'patient', 'originate', 'from', 'any', 'Dutch', 'MHS', '