[Google Colab](https://colab.research.google.com/drive/1FfHOLLED_uv7T8RDfz602XWOSI1S1NCX?usp=sharing)

In [None]:
!pip install pyspark
!pip install sparknlp

Collecting pyspark
  Downloading pyspark-3.2.1.tar.gz (281.4 MB)
[K     |████████████████████████████████| 281.4 MB 37 kB/s 
[?25hCollecting py4j==0.10.9.3
  Downloading py4j-0.10.9.3-py2.py3-none-any.whl (198 kB)
[K     |████████████████████████████████| 198 kB 49.8 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.2.1-py2.py3-none-any.whl size=281853642 sha256=fd425dbc462d6c7fe9189c93b6f8f71d7bf05f134049657cee79f4d92d767175
  Stored in directory: /root/.cache/pip/wheels/9f/f5/07/7cd8017084dce4e93e84e92efd1e1d5334db05f2e83bcef74f
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.3 pyspark-3.2.1
Collecting sparknlp
  Downloading sparknlp-1.0.0-py3-none-any.whl (1.4 kB)
Collecting spark-nlp
  Downloading spark_nlp-3.4.2-py2.py3-none-any.whl (142 kB)
[K     |████████████████████████████████| 142 kB 23.9 MB/s 
I

In [None]:
# Install java
import os
! apt-get update -qq
! apt-get install -y openjdk-8-jdk-headless -qq > /dev/null

os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["PATH"] = os.environ["JAVA_HOME"] + "/bin:" + os.environ["PATH"]
! java -version

openjdk version "1.8.0_312"
OpenJDK Runtime Environment (build 1.8.0_312-8u312-b07-0ubuntu1~18.04-b07)
OpenJDK 64-Bit Server VM (build 25.312-b07, mixed mode)


In [None]:
# Session bootstrap and shared imports for the rest of the notebook.
# `spark` (the session handle created here) is used by the data-loading cell.
import sparknlp
spark = sparknlp.start() 
# Alternative: sparknlp.start(gpu=True) starts the session for training on GPU.

# NOTE(review): the wildcard imports below are order-sensitive -- when two
# modules export the same name, the later import wins. Do not reorder them
# without checking which names later cells rely on (e.g. DocumentAssembler,
# Tokenizer, Finisher, lower, col, regexp_replace).
from sparknlp.base import *
from sparknlp.annotator import *
from pyspark.ml import Pipeline
from sparknlp.common import *
# pandas is not referenced by any cell visible in this notebook.
import pandas as pd
# NOTE(review): this wildcard import presumably also shadows Python built-ins
# that share a name with a Spark SQL function (sum, min, max, ...) -- confirm
# before using those built-ins further down.
from pyspark.sql.functions import *
# Record the library versions next to the results for reproducibility.
print("Spark NLP version", sparknlp.version())
print("Apache Spark version:", spark.version)

Spark NLP version 3.4.2
Apache Spark version: 3.2.1


In [None]:
# Load the medical-report corpus. The first CSV row is the header; the
# `_c0` column is discarded so only the 'medical report' text column remains.
# NOTE(review): some rows display a leading '"' in the output, which suggests
# quoted fields may not be parsed as intended -- confirm whether the reader
# needs explicit quote/escape/multiLine options for this file.
df = (
    spark.read
    .option("header", True)
    .csv("mtsample_report.csv")
    .drop("_c0")
)
df.show(5)

+--------------------+
|      medical report|
+--------------------+
|"CHIEF COMPLAINT:...|
|"HISTORY OF PRESE...|
|HISTORY OF PRESEN...|
|CHIEF COMPLAINT: ...|
|REASON FOR CONSUL...|
+--------------------+
only showing top 5 rows



In [None]:
# Normalise the report text before tokenisation:
#   1. lower-case everything;
#   2. replace every character that is not a space, a-z, 0-9, ',' or '.'
#      with a space;
#   3. put a space after every period so sentences are separated.
#
# The character class is written as `[^ a-z0-9,.]`: only the FIRST `^` inside
# a class negates it. The previous pattern repeated `^` before every range,
# which made the caret itself an allowed character, so '^' was never removed.
# Raw strings are used so `\.` reaches the regex engine verbatim instead of
# being an invalid Python string escape.
df = (
    df
    .withColumn('medical report', lower(col('medical report')))
    .withColumn('medical report', regexp_replace('medical report', r'[^ a-z0-9,.]', ' '))
    .withColumn('medical report', regexp_replace('medical report', r'\.', '. '))
)
df.take(1)

[Row(medical report=' chief complaint  abdominal pain. history of present illness  the patient is a 71 year old female patient of dr.  x.  the patient presented to the emergency room last evening with approximately 7  to 8 day history of abdominal pain which has been persistent.  she was seen 3 to 4 days ago at abc er and underwent evaluation and discharged and had a ct scan at that time and she was told it was   normal.    she was given oral antibiotics of cipro and flagyl.  she has had no nausea and vomiting but has had persistent associated anorexia.  she is passing flatus but had some obstipation symptoms with the last bowel movement two days ago.  she denies any bright red blood per rectum and no history of recent melena.  her last colonoscopy was approximately 5 years ago with dr.  y.  she has had no definite fevers or chills and no history of jaundice.  the patient denies any significant recent weight loss. past medical history  significant for history of atrial fibrillation und

In [None]:
# Spark NLP stages, chained by column name:
#   'medical report' -> 'document' -> 'sentence' -> 'token' -> 'token_features'

# Wraps the raw text column into the 'document' annotation column.
document = (
    DocumentAssembler()
    .setInputCol("medical report")
    .setOutputCol("document")
)

# Produces the 'sentence' column from 'document'.
sentence = (
    SentenceDetector()
    .setInputCols(["document"])
    .setOutputCol("sentence")
)

# Produces the 'token' column from 'sentence'.
token = (
    Tokenizer()
    .setInputCols(["sentence"])
    .setOutputCol("token")
)

# Optional stages, currently disabled. To use the lemmatizer, also switch the
# Finisher input below from ["token"] to ["lemma"].
#
# stop_words = (
#     StopWordsCleaner.pretrained("stopwords_en", "en")
#     .setInputCols(["token"])
#     .setOutputCol("cleanTokens")
#     .setCaseSensitive(False)
# )
#
# lemmatizer = (
#     LemmatizerModel.pretrained("lemma", "en")
#     .setInputCols(["token"])
#     .setOutputCol("lemma")
#     .setCaseSensitive(False)
# )

# Emits the tokens as an array column ('token_features') for the Spark ML
# feature stages; the annotation columns are kept (cleanAnnotations=False).
finisher = (
    Finisher()
    .setInputCols(["token"])
    .setOutputCols(["token_features"])
    .setOutputAsArray(True)
    .setCleanAnnotations(False)
)

In [None]:
from pyspark.ml.feature import HashingTF, IDF, StringIndexer, IndexToString

# Feature stages: 'token_features' -> 'raw_features' -> 'features'.
hashTF = HashingTF(
    inputCol="token_features",
    outputCol="raw_features",
)
idf = IDF(
    inputCol="raw_features",
    outputCol="features",
)

In [None]:
# Assemble the full text -> TF-IDF pipeline.
nlpPipeline = Pipeline(stages=[
    document,
    sentence,
    token,
    finisher,
    hashTF,
    idf,
])

# Retained only so that any other cell referencing `empty_df` keeps working;
# it is deliberately NOT used for fitting any more (see below).
empty_df = spark.createDataFrame([['']]).toDF("medical report")

# Fit on the real corpus. The "fit on an empty DataFrame" shortcut is only
# valid when every stage is a stateless transformer or a pretrained model.
# This pipeline ends with IDF, which is an estimator: it learns per-term
# document frequencies from the data passed to fit(). Fitted on a single empty
# row, every term has document frequency 0, so all terms receive the same IDF
# weight and 'features' is just a uniformly scaled copy of 'raw_features'.
pipelineModel = nlpPipeline.fit(df)

In [None]:
# Run the fitted pipeline over the cleaned reports and preview the output
# columns (each cell truncated to 20 characters).
result = pipelineModel.transform(dataset=df)
result.show(n=20, truncate=20)

+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|      medical report|            document|            sentence|               token|      token_features|        raw_features|            features|
+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
| chief complaint ...|[{document, 0, 34...|[{document, 1, 32...|[{token, 1, 5, ch...|[chief, complaint...|(262144,[2611,293...|(262144,[2611,293...|
| history of prese...|[{document, 0, 24...|[{document, 1, 18...|[{token, 1, 7, hi...|[history, of, pre...|(262144,[161,1365...|(262144,[161,1365...|
|history of presen...|[{document, 0, 20...|[{document, 0, 91...|[{token, 0, 6, hi...|[history, of, pre...|(262144,[9413,114...|(262144,[9413,114...|
|chief complaint  ...|[{document, 0, 24...|[{document, 0, 23...|[{token, 0, 4, ch...|[chief, complaint...|

In [None]:
# Topic modelling over the TF-IDF vectors.
#
# Two problems in the previous version of this cell:
#   * `lda_model` was never defined anywhere in the notebook, so the cell only
#     worked (if at all) through leftover kernel state;
#   * it passed 'token_features' (an array of strings), while LDA reads a
#     numeric vector column named by `featuresCol` -- hence the
#     IllegalArgumentException.
# The model is now fitted here, on the 'features' vector column produced by
# the pipeline, and applied to that same column.
from pyspark.ml.clustering import LDA

lda_input = result.select('features')

# NOTE(review): k=10 (Spark's default) and the fixed seed are placeholders
# chosen for reproducibility -- tune the number of topics for this corpus.
lda_model = LDA(k=10, seed=42, featuresCol='features').fit(lda_input)

vectorized_tokens = lda_model.transform(lda_input)

IllegalArgumentException: ignored