In [1]:
from __future__ import division, print_function

import sys

In [2]:
sys.path.append('/home/jovyan/pysparknlp-1.0.0/')

In [3]:
from Bio import Entrez, Medline

import matplotlib.pyplot as plt
import nltk
import numpy as np
import pandas as pd
import pyspark
from pyspark.ml import Pipeline, feature as spark_ft, classification as spark_cls
from sklearn import metrics as skmetrics
import spacy
import sparknlp.annotators as sparknlp
import wordcloud

%matplotlib inline

In [4]:
spark = pyspark.sql.SparkSession.builder \
    .master('local[4]') \
    .appName('notebook') \
    .config('spark.jars', 'pysparknlp-1.0.0/lib/sparknlp.jar') \
    .config('spark.jars.packages', 'com.databricks:spark-xml_2.11:0.4.1') \
    .getOrCreate()

In [5]:
def query(terms, num_docs=1000):
    search_term = '+'.join(terms)
    print('Searching PubMed abstracts for documents containing term: ',search_term)
    handle = Entrez.esearch(db="pubmed", term=search_term, retmax=num_docs)
    record = Entrez.read(handle)
    handle.close()
    idlist = record["IdList"]
    
    handle = Entrez.efetch(db="pubmed", id=idlist, rettype="medline",retmode="text")
    records = Medline.parse(handle)
    data = []
    for record in records:
        data.append((record.get("TI", "?"),record.get("AU", "?"),record.get("SO", "?"),record.get("AB","?")))

    df = pd.DataFrame(data=data, columns=['Title','Authors','Source','Abstract'])
    df.head(10)

    df.replace(r'^\?$', np.nan, regex=True, inplace=True)
    df['Authors'] = df['Authors'].apply(lambda x: x if isinstance(x, list) else [])
    df.fillna('', inplace=True)
    df['Topic'] = search_term
    
    return spark.createDataFrame(df)

In [6]:
topics = [
    ['type', '1', 'diabetes'], 
    ['creutzfeldt', 'jakob', 'disease'], 
    ['post', 'traumatic', 'stress', 'disorder'],
    ['heart', 'disease']
]

In [7]:
texts = None

np.random.seed(123)
for terms in topics:
    num_docs = np.random.randint(200, 1000)
    print('terms', terms, 'num_docs', num_docs)
    if texts is None:
        texts = query(terms, num_docs)
    else:
        texts = texts.union(query(terms, num_docs))

terms ['type', '1', 'diabetes'] num_docs 710
Searching PubMed abstracts for documents containing term:  type+1+diabetes


Email address is not specified.

To make use of NCBI's E-utilities, NCBI requires you to specify your
email address with each request.  As an example, if your email address
is A.N.Other@example.com, you can specify it as follows:
   from Bio import Entrez
   Entrez.email = 'A.N.Other@example.com'
In case of excessive usage of the E-utilities, NCBI will attempt to contact
a user at the email address provided before blocking access to the
E-utilities.


terms ['creutzfeldt', 'jakob', 'disease'] num_docs 565
Searching PubMed abstracts for documents containing term:  creutzfeldt+jakob+disease
terms ['post', 'traumatic', 'stress', 'disorder'] num_docs 582
Searching PubMed abstracts for documents containing term:  post+traumatic+stress+disorder
terms ['heart', 'disease'] num_docs 522
Searching PubMed abstracts for documents containing term:  heart+disease


In [8]:
texts.count()

2379

In [9]:
texts.show()

+--------------------+--------------------+--------------------+--------------------+---------------+
|               Title|             Authors|              Source|            Abstract|          Topic|
+--------------------+--------------------+--------------------+--------------------+---------------+
|Presence of Human...|[Ericsson M, Skog O]|Pancreas. 2017 Se...|OBJECTIVES: The a...|type+1+diabetes|
|A Multicenter Rea...|[Anjana RM, Kesav...|Diabetes Technol ...|AIM: To assess th...|type+1+diabetes|
|A coordinated con...|[Herrero P, Bondi...|Comput Methods Bi...|Type 1 diabetes i...|type+1+diabetes|
|Association betwe...|[Jermendy A, Szat...|Pediatr Diabetes....|BACKGROUND: Infec...|type+1+diabetes|
|Prefronto-tempora...|[Yoon S, Kim J, M...|Pediatr Diabetes....|OBJECTIVE: Microv...|type+1+diabetes|
|Possible Long-Ter...|[Awata T, Shimada...|Diabetes Ther. 20...|INTRODUCTION: We ...|type+1+diabetes|
|The Elevated Rate...|[Magne F, Puchi S...|Front Pediatr. 20...|The current recom.

In [10]:
non_empty_texts = texts.where('Abstract != ""') \
    .withColumn('id', pyspark.sql.functions.monotonically_increasing_id())

In [11]:
non_empty_texts.count()

2148

In [12]:
non_empty_texts.show()

+--------------------+--------------------+--------------------+--------------------+---------------+---+
|               Title|             Authors|              Source|            Abstract|          Topic| id|
+--------------------+--------------------+--------------------+--------------------+---------------+---+
|Presence of Human...|[Ericsson M, Skog O]|Pancreas. 2017 Se...|OBJECTIVES: The a...|type+1+diabetes|  0|
|A Multicenter Rea...|[Anjana RM, Kesav...|Diabetes Technol ...|AIM: To assess th...|type+1+diabetes|  1|
|A coordinated con...|[Herrero P, Bondi...|Comput Methods Bi...|Type 1 diabetes i...|type+1+diabetes|  2|
|Association betwe...|[Jermendy A, Szat...|Pediatr Diabetes....|BACKGROUND: Infec...|type+1+diabetes|  3|
|Prefronto-tempora...|[Yoon S, Kim J, M...|Pediatr Diabetes....|OBJECTIVE: Microv...|type+1+diabetes|  4|
|Possible Long-Ter...|[Awata T, Shimada...|Diabetes Ther. 20...|INTRODUCTION: We ...|type+1+diabetes|  5|
|The Elevated Rate...|[Magne F, Puchi S...|Fro

In [13]:
label_indexer = spark_ft.StringIndexer(inputCol='Topic', outputCol='label')

In [14]:
label_indexer_model = label_indexer.fit(non_empty_texts)

In [15]:
label_deindexer = spark_ft.IndexToString(inputCol='prediction', outputCol='pred_label', 
                                         labels=label_indexer_model.labels)

In [16]:
train, test = label_indexer_model.transform(non_empty_texts).randomSplit(weights=[0.8, 0.2], seed=123)

In [17]:
def extract_tokens(annotations):
    return [a.metadata['token'] for a in annotations]
retType = pyspark.sql.types.ArrayType(pyspark.sql.types.StringType())

spark.udf.register('extract', f=extract_tokens, returnType=retType)

In [18]:
abstract_assembler = sparknlp.DocumentAssembler(inputCol='Abstract', outputCol='document')
title_assembler = sparknlp.DocumentAssembler(inputCol='Title', outputCol='document')

sentence_segmenter = sparknlp.SentenceDetectorModel(inputCols=['document'], outputCol='sentence')
    
tokenizer = sparknlp.RegexTokenizer(inputCols=['sentence'], outputCol='token')

stemmer = sparknlp.Stemmer(inputCols=['token'], outputCol='stem')

normalizer = sparknlp.Normalizer(inputCols=['stem'], outputCol='ntoken')

text_extractor = spark_ft.SQLTransformer(statement='''
SELECT Title, Authors, Source, Topic, label, extract(ntoken) AS text
FROM __THIS__
''')

title_extractor = spark_ft.SQLTransformer(statement='''
SELECT Authors, Source, Topic, label, text, extract(ntoken) AS title
FROM __THIS__
''')

stopWords = spark_ft.StopWordsRemover.loadDefaultStopWords('english')
sw_remover = spark_ft.StopWordsRemover(inputCol='text', outputCol='clean_text', stopWords=stopWords)

nlp_pipeline = Pipeline(stages=[sentence_segmenter, tokenizer, stemmer, normalizer])

abstract_pipeline = Pipeline(stages=[abstract_assembler, nlp_pipeline, text_extractor, sw_remover])
title_pipeline = Pipeline(stages=[title_assembler, nlp_pipeline, title_extractor])

preproc_pipeline = Pipeline(stages=[abstract_pipeline, title_pipeline])

In [19]:
processed = preproc_pipeline.fit(train).transform(train)

In [20]:
processed.show()

+--------------------+--------------------+---------------+-----+--------------------+--------------------+
|             Authors|              Source|          Topic|label|                text|               title|
+--------------------+--------------------+---------------+-----+--------------------+--------------------+
|                  []|                    |type+1+diabetes|  0.0|[educ, patient, i...|                  []|
|[Lindstrom C, Ama...|J Pediatr Nurs. 2...|type+1+diabetes|  0.0|[purpose, to, exp...|[mission, impossi...|
|[Anjana RM, Kesav...|Diabetes Technol ...|type+1+diabetes|  0.0|[aim, to, assess,...|[a, multicent, re...|
|[Ramkissoon CM, A...|IEEE Rev Biomed E...|type+1+diabetes|  0.0|[the, artifici, p...|[a, review, of, s...|
|[Hinojosa SL, Hei...|J Am Coll Nutr. 2...|type+1+diabetes|  0.0|[objective, the, ...|[a, studi, examin...|
|[Sun H, Han X, Ya...|Cell Immunol. 201...|type+1+diabetes|  0.0|[relat, studi, de...|[a, novel, mimovi...|
|[Jing YH, Qi CC, ...|Neural

In [21]:
text2vec = spark_ft.Word2Vec(
    vectorSize=100, minCount=5, seed=123, 
    inputCol='text', outputCol='text_vec', 
    windowSize=5, maxSentenceLength=30
)

title2vec = spark_ft.Word2Vec(
    vectorSize=50, minCount=3, seed=123, 
    inputCol='title', outputCol='title_vec', 
    windowSize=5, maxSentenceLength=10
)

assembler = spark_ft.VectorAssembler(inputCols=['text_vec', 'title_vec'], outputCol='features')

feature_pipeline = Pipeline(stages=[text2vec, title2vec, assembler])

In [22]:
features = feature_pipeline.fit(processed).transform(processed)

In [23]:
features.show()

+--------------------+--------------------+---------------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+
|             Authors|              Source|          Topic|label|                text|               title|            text_vec|           title_vec|            features|
+--------------------+--------------------+---------------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+
|                  []|                    |type+1+diabetes|  0.0|[educ, patient, i...|                  []|[-0.0057487625206...|          (50,[],[])|[-0.0057487625206...|
|[Lindstrom C, Ama...|J Pediatr Nurs. 2...|type+1+diabetes|  0.0|[purpose, to, exp...|[mission, impossi...|[-0.0097414343990...|[-0.0058479524107...|[-0.0097414343990...|
|[Anjana RM, Kesav...|Diabetes Technol ...|type+1+diabetes|  0.0|[aim, to, assess,...|[a, multicent, re...|[-0.0026577643452...|[-0.0105854638386

In [24]:
text2vec_model = text2vec.fit(processed)

In [25]:
text2vec_model.findSynonyms('obes', 10).show()

+--------------+------------------+
|          word|        similarity|
+--------------+------------------+
|    overweight|0.8971319198608398|
|       obesity|0.8856418132781982|
|     carbohydr|0.8844031095504761|
|           bil|0.8821799159049988|
|      sterilis|0.8780583143234253|
|        dysind|0.8773793578147888|
|     pancreata|0.8729743361473083|
|    bradykinin|0.8712244033813477|
|diabetesspecif| 0.871192991733551|
|           stz|0.8676058053970337|
+--------------+------------------+



In [26]:
text2vec_model.findSynonyms('trauma', 10).show()

+---------+------------------+
|     word|        similarity|
+---------+------------------+
|psycholog|0.8305636048316956|
|  traumat|0.8215893507003784|
| problems|0.8119535446166992|
|  violenc| 0.809847891330719|
|   predat|0.7966498732566833|
|    intim| 0.788361132144928|
|  exposur|0.7770265936851501|
|   deploy|0.7699192762374878|
|   improv|0.7689777612686157|
| violence| 0.766805112361908|
+---------+------------------+



In [27]:
mlpc = spark_cls.MultilayerPerceptronClassifier(
    maxIter=100, seed=123, layers=[150, 75, 4]
)

full_pipeline = Pipeline(stages=[preproc_pipeline, feature_pipeline, mlpc, label_deindexer])

In [28]:
model = full_pipeline.fit(train)

In [29]:
preds = model.transform(test)

In [30]:
preds.show()

+--------------------+--------------------+---------------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+----------+---------------+
|             Authors|              Source|          Topic|label|                text|               title|            text_vec|           title_vec|            features|prediction|     pred_label|
+--------------------+--------------------+---------------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+----------+---------------+
|                  []|                    |type+1+diabetes|  0.0|[few, medic, inte...|                  []|[0.03263338096829...|          (50,[],[])|[0.03263338096829...|       2.0|  heart+disease|
|                  []|                    |type+1+diabetes|  0.0|[thi, report, rev...|                  []|[-0.0115158061866...|          (50,[],[])|[-0.0115158061866...|       0.0|type+1+diabetes|
|[Hall B, 

In [31]:
pred_df = preds.select('title', 'text', 'label', 'prediction').toPandas()

In [32]:
pred_df.head()

Unnamed: 0,title,text,label,prediction
0,[],"[few, medic, intervent, have, had, a, great, a...",0.0,2.0
1,[],"[thi, report, review, the, scientif, evid, for...",0.0,0.0
2,[],"[object, the, aim, of, the, studi, wa, to, ass...",0.0,0.0
3,"[a, coordin, control, strategi, for, insulin, ...","[typ, diabet, i, an, autoimmun, condit, charac...",0.0,0.0
4,"[acoust, radiat, forc, impuls, elastographi, i...","[objective, the, aim, of, thi, studi, i, to, d...",0.0,0.0


In [33]:
list(enumerate(label_indexer_model.labels))

[(0, 'type+1+diabetes'),
 (1, 'post+traumatic+stress+disorder'),
 (2, 'heart+disease'),
 (3, 'creutzfeldt+jakob+disease')]

In [34]:
pd.DataFrame(
    data=skmetrics.confusion_matrix(pred_df['label'], pred_df['prediction']),
    columns=['pred ' + l for l in label_indexer_model.labels],
    index=['true ' + l for l in label_indexer_model.labels]
)

Unnamed: 0,pred type+1+diabetes,pred post+traumatic+stress+disorder,pred heart+disease,pred creutzfeldt+jakob+disease
true type+1+diabetes,112,0,12,1
true post+traumatic+stress+disorder,0,99,4,1
true heart+disease,5,1,69,1
true creutzfeldt+jakob+disease,0,0,3,91


In [35]:
print(skmetrics.classification_report(pred_df['label'], pred_df['prediction'], 
                                      target_names=label_indexer_model.labels))

                                precision    recall  f1-score   support

               type+1+diabetes       0.96      0.90      0.93       125
post+traumatic+stress+disorder       0.99      0.95      0.97       104
                 heart+disease       0.78      0.91      0.84        76
     creutzfeldt+jakob+disease       0.97      0.97      0.97        94

                   avg / total       0.94      0.93      0.93       399

