In [1]:
import json

#from google.colab import files
#
#license_keys = files.upload()
#
#with open(list(license_keys.keys())[0]) as f:
#    license_keys = json.load(f)

with open('spark_license.json', 'r') as license:
    license_keys = json.load(license)

In [2]:
from psutil import virtual_memory
ram_gb = virtual_memory().total / 1e9
print('Your runtime has {:.1f} gigabytes of available RAM\n'.format(ram_gb))

if ram_gb < 20:
    print('You are using a low-RAM runtime!')
else:
    print('You are using a high-RAM runtime!')

Your runtime has 37.8 gigabytes of available RAM

You are using a high-RAM runtime!


In [3]:
%%capture
for k,v in license_keys.items(): 
    %set_env $k=$v

!wget https://raw.githubusercontent.com/JohnSnowLabs/spark-nlp-workshop/master/jsl_colab_setup.sh
!bash jsl_colab_setup.sh

! pip install spark-nlp-display

# for Spark 2.4.x and Spark NLP 2.x, do the following
# !wget -q https://raw.githubusercontent.com/JohnSnowLabs/spark-nlp-workshop/master/colab_setup.sh
# !bash colab_setup.sh -p 2.4.x -s 2.7.x

In [4]:
# if you want to start the session with custom params as in start function above
def start(secret):
    builder = SparkSession.builder \
        .appName("Spark NLP Licensed") \
        .master("local[*]") \
        .config("spark.driver.memory", "32G") \
        .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
        .config("spark.kryoserializer.buffer.max", "2000M") \
        .config("spark.jars.packages", "com.johnsnowlabs.nlp:spark-nlp_2.12:"+version) \
        .config("spark.jars", "https://pypi.johnsnowlabs.com/"+secret+"/spark-nlp-jsl-"+jsl_version+".jar")
      
    return builder.getOrCreate()

#spark = start(secret)

In [5]:
import json
import os
from pyspark.ml import Pipeline,PipelineModel
from pyspark.sql import SparkSession

from sparknlp.annotator import *
from sparknlp_jsl.annotator import *
from sparknlp.base import *
import sparknlp_jsl
import sparknlp

params = {"spark.driver.memory":"32G",
"spark.kryoserializer.buffer.max":"2000M",
"spark.driver.maxResultSize":"2000M"}

spark = sparknlp_jsl.start(license_keys['SECRET'],params=params)

print ("Spark NLP Version :", sparknlp.version())
print ("Spark NLP_JSL Version :", sparknlp_jsl.version())

Spark NLP Version : 3.1.0
Spark NLP_JSL Version : 3.1.1


**PRETRAINED PIPELINE FOR CLINICAL NER**

In [6]:
# Annotator that transforms a text column from dataframe into an Annotation ready for NLP
documentAssembler = DocumentAssembler()\
        .setInputCol("text")\
        .setOutputCol("document")

# Sentence Detector annotator, processes various sentences per line

#sentenceDetector = SentenceDetector()\
        #.setInputCols(["document"])\
        #.setOutputCol("sentence")
sentenceDetector = SentenceDetectorDLModel.pretrained("sentence_detector_dl_healthcare","en","clinical/models")\
        .setInputCols(["document"])\
        .setOutputCol("sentence")
 
# Tokenizer splits words in a relevant format for NLP
tokenizer = Tokenizer()\
        .setInputCols(["sentence"])\
        .setOutputCol("token")

# Clinical word embeddings trained on PubMED dataset
word_embeddings = WordEmbeddingsModel.pretrained("embeddings_clinical","en","clinical/models")\
        .setInputCols(["sentence","token"])\
        .setOutputCol("embeddings")

# NER model trained on i2b2 (sampled from MIMIC) dataset
clinical_ner = MedicalNerModel.pretrained("ner_clinical_large","en","clinical/models")\
        .setInputCols(["sentence","token","embeddings"])\
        .setOutputCol("ner")

ner_converter = NerConverter()\
        .setInputCols(["sentence","token","ner"])\
        .setOutputCol("ner_chunk")

nlpPipeline = Pipeline(stages=[
        documentAssembler,
        sentenceDetector,
        tokenizer,
        word_embeddings,
        clinical_ner,
        ner_converter])


empty_data = spark.createDataFrame([[""]]).toDF("text")

model = nlpPipeline.fit(empty_data)


sentence_detector_dl_healthcare download started this may take some time.
Approximate size to download 363.9 KB
[OK!]
embeddings_clinical download started this may take some time.
Approximate size to download 1.6 GB
[OK!]
ner_clinical_large download started this may take some time.
Approximate size to download 13.9 MB
[OK!]


In [7]:
model.stages

[DocumentAssembler_df8a03e8df92,
 SentenceDetectorDLModel_d2546f0acfe2,
 REGEX_TOKENIZER_a2331202159d,
 WORD_EMBEDDINGS_MODEL_9004b1d00302,
 MedicalNerModel_1a8637089929,
 NerConverter_96e0534b6ee9]

In [8]:
clinical_ner.getClasses()

['O',
 'B-TREATMENT',
 'I-TREATMENT',
 'B-PROBLEM',
 'I-PROBLEM',
 'B-TEST',
 'I-TEST']

In [9]:
clinical_ner.extractParamMap()

{Param(parent='MedicalNerModel_1a8637089929', name='batchSize', doc='Size of every batch'): 64,
 Param(parent='MedicalNerModel_1a8637089929', name='classes', doc='get the tags used to trained this MedicalNerModel'): ['O',
  'B-TREATMENT',
  'I-TREATMENT',
  'B-PROBLEM',
  'I-PROBLEM',
  'B-TEST',
  'I-TEST'],
 Param(parent='MedicalNerModel_1a8637089929', name='includeAllConfidenceScores', doc='whether to include all confidence scores in annotation metadata or just the score of the predicted tag'): False,
 Param(parent='MedicalNerModel_1a8637089929', name='includeConfidence', doc='whether to include confidence scores in annotation metadata'): True,
 Param(parent='MedicalNerModel_1a8637089929', name='inferenceBatchSize', doc='number of sentences to process in a single batch during inference'): 1,
 Param(parent='MedicalNerModel_1a8637089929', name='inputCols', doc='previous annotations columns, if renamed'): ['sentence',
  'token',
  'embeddings'],
 Param(parent='MedicalNerModel_1a8637089

In [10]:
from google.colab import files

uploaded = files.upload()

Saving all_text.csv to all_text.csv


In [11]:
import pyspark.sql.functions as F

pubMedDF = spark.read\
                .option("header", "true")\
                .csv("all_text.csv")\
                
pubMedDF.show(truncate=80)

+-----+--------------------------------------------------------------------------------+
|index|                                                                            text|
+-----+--------------------------------------------------------------------------------+
|    0|Diagnostic accuracy of a novel artificial intelligence system for adenoma det...|
|    2|Synthetic pulmonary perfusion images from 4DCT for functional avoidance using...|
|    3|Prediction of chemotherapy response in breast cancer patients at pre-treatmen...|
|    9|Deep learning for sex classification in resting-state and task functional bra...|
|   15|Individualized Glaucoma Change Detection Using Deep Learning Auto Encoder-Bas...|
|   20|A machine learning analysis of risk and protective factors of suicidal though...|
|   24|Using Deep Learning Segmentation for Endotracheal Tube Position Assessment. T...|
|   37|Automated detection and segmentation of sclerotic spinal lesions on body CTs ...|
|   39|Dynamic Routin

In [12]:
pubMedDF.tail(5)

[Row(index='161510', text='On the use of neural network techniques to analyze sleep EEG data. Third communication: robustification of the classificator by applying an algorithm obtained from 9 different networks. This is the third communication on the use of neural network techniques to classify sleep stages. In our first communication we presented the algorithms and the selection of the feature space and its reduction by using evolutionary and genetic procedures. In our second communication we trained the evolutionary optimized networks on the basis of multiple subject data in context with some smoothing algorithms in analogy of Rechtschaffen and Kales (RK). In this third communication we could demonstrate that the robustness concerning individual specific features of automatically generated sleep profiles could be reasonably improved by an additional modification of the procedure used by SASCIA (Sleep Analysis System to Challenge Innovative Artificial Networks). The outputs of nine d

In [13]:
result = model.transform(pubMedDF)
result.show(truncate=80)

+-----+--------------------------------------------------------------------------------+--------------------------------------------------------------------------------+--------------------------------------------------------------------------------+--------------------------------------------------------------------------------+--------------------------------------------------------------------------------+--------------------------------------------------------------------------------+--------------------------------------------------------------------------------+
|index|                                                                            text|                                                                        document|                                                                        sentence|                                                                           token|                                                                      embeddings|             

In [14]:
result.tail(5)

[Row(index='161510', text='On the use of neural network techniques to analyze sleep EEG data. Third communication: robustification of the classificator by applying an algorithm obtained from 9 different networks. This is the third communication on the use of neural network techniques to classify sleep stages. In our first communication we presented the algorithms and the selection of the feature space and its reduction by using evolutionary and genetic procedures. In our second communication we trained the evolutionary optimized networks on the basis of multiple subject data in context with some smoothing algorithms in analogy of Rechtschaffen and Kales (RK). In this third communication we could demonstrate that the robustness concerning individual specific features of automatically generated sleep profiles could be reasonably improved by an additional modification of the procedure used by SASCIA (Sleep Analysis System to Challenge Innovative Artificial Networks). The outputs of nine d

In [13]:
# result.show(n = 1000, truncate = 50)

In [14]:
#uploaded = files.upload()

In [15]:
#index_df = spark.read.option("header", True) \
#  .csv("index.csv")

In [16]:
#from pyspark.sql.functions import col, split
#
#index_df = index_df.withColumn("index", split(col("index"), ",").cast("array<long>"))
#index_df.show()

In [17]:
#from pyspark.sql.window import Window
#from pyspark.sql.functions import monotonically_increasing_id, row_number
#
# since there is no common column between these two dataframes add row_index so that it can be joined
#temp1=index_df.withColumn('row_index', row_number().over(Window.orderBy(monotonically_increasing_id())))
#temp2=transformed.withColumn('row_index', row_number().over(Window.orderBy(monotonically_increasing_id())))
#
#result = temp1.join(temp2, on=["row_index"]).drop("row_index")
#result.show()

In [15]:
labelled = result.select('index','ner_chunk.result','ner_chunk.metadata')
labelled.show(n = 5, truncate=128)

+-----+--------------------------------------------------------------------------------------------------------------------------------+--------------------------------------------------------------------------------------------------------------------------------+
|index|                                                                                                                          result|                                                                                                                        metadata|
+-----+--------------------------------------------------------------------------------------------------------------------------------+--------------------------------------------------------------------------------------------------------------------------------+
|    0|[a novel artificial intelligence system, adenoma, a prospective non-randomized comparative study, Adenoma detection rate (ADR...|[{entity -> TREATMENT, sentence -> 0, chunk -> 0}, {entity -> PROB

In [16]:
labelled.tail(5)

[Row(index='161510', result=['neural network techniques', 'an algorithm', 'neural network techniques', 'the algorithms', 'its reduction', 'evolutionary', 'genetic procedures', 'some smoothing algorithms', 'an additional modification of the procedure', 'Innovative Artificial Networks', 'The outputs of nine different networks', 'the values', 'each output measure', 'individuum-specific variability', 'three channels', 'EEG', 'EMG', 'EOG', 'one EEG channel', 'The feature selections', 'genetic algorithms', 'evolutionary algorithms', 'This algorithm'], metadata=[{'sentence': '0', 'chunk': '0', 'entity': 'TREATMENT'}, {'sentence': '1', 'chunk': '1', 'entity': 'TEST'}, {'sentence': '2', 'chunk': '2', 'entity': 'TREATMENT'}, {'sentence': '3', 'chunk': '3', 'entity': 'TREATMENT'}, {'sentence': '3', 'chunk': '4', 'entity': 'TREATMENT'}, {'sentence': '3', 'chunk': '5', 'entity': 'TREATMENT'}, {'sentence': '3', 'chunk': '6', 'entity': 'TREATMENT'}, {'sentence': '4', 'chunk': '7', 'entity': 'TREATMEN

In [17]:
labelled.toPandas().to_csv('output/labelled.csv')

In [22]:
from pyspark.sql.functions import split, col

array = result.select(split(col("index"),"::").alias("index_array"),
          col("ner_chunk")) \
    .drop("index")

In [23]:
array.show(5)

+-----------+--------------------+
|index_array|           ner_chunk|
+-----------+--------------------+
|        [0]|[{chunk, 23, 60, ...|
|        [2]|[{chunk, 0, 35, S...|
|        [3]|[{chunk, 14, 34, ...|
|        [9]|[{chunk, 128, 171...|
|       [15]|[{chunk, 15, 39, ...|
+-----------+--------------------+
only showing top 5 rows



In [29]:
only_labels = array.select(F.explode(F.arrays_zip('index_array', 'ner_chunk.result', 'ner_chunk.metadata')).alias("cols")) \

only_labels.show(5, truncate = 0)

+---------------------------------------------------------------------------------------------------+
|cols                                                                                               |
+---------------------------------------------------------------------------------------------------+
|{0, a novel artificial intelligence system, {entity -> TREATMENT, sentence -> 0, chunk -> 0}}      |
|{null, adenoma, {entity -> PROBLEM, sentence -> 0, chunk -> 1}}                                    |
|{null, a prospective non-randomized comparative study, {entity -> TEST, sentence -> 0, chunk -> 2}}|
|{null, Adenoma detection rate (ADR, {entity -> PROBLEM, sentence -> 1, chunk -> 3}}                |
|{null, adenoma miss rate (AMR, {entity -> PROBLEM, sentence -> 1, chunk -> 4}}                     |
+---------------------------------------------------------------------------------------------------+
only showing top 5 rows



In [42]:
labels_tabulated = only_labels.select(F.expr("cols['index_array']").alias("index"),
        F.expr("cols['1']").alias("chunk"),
        F.expr("cols['2']['entity']").alias("ner_label"),
        F.expr("cols['2']['chunk']").alias("rank"))

labels_tabulated.show(20, truncate=0)

+-----+----------------------------------------------+---------+----+
|index|chunk                                         |ner_label|rank|
+-----+----------------------------------------------+---------+----+
|0    |a novel artificial intelligence system        |TREATMENT|0   |
|null |adenoma                                       |PROBLEM  |1   |
|null |a prospective non-randomized comparative study|TEST     |2   |
|null |Adenoma detection rate (ADR                   |PROBLEM  |3   |
|null |adenoma miss rate (AMR                        |PROBLEM  |4   |
|null |Artificial intelligence (AI) systems          |TREATMENT|5   |
|null |endoscopic quality                            |TREATMENT|6   |
|null |interval cancer                               |PROBLEM  |7   |
|null |an AI system                                  |PROBLEM  |8   |
|null |real time colonoscopy                         |TEST     |9   |
|null |the ADR                                       |PROBLEM  |10  |
|2    |Synthetic pul

In [43]:
labels_tabulated.toPandas().to_csv('output/labels_tabulated.csv')