In [None]:
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.functions import udf, lit, monotonically_increasing_id
import pyspark.sql.functions as F
from pyspark.sql.types import FloatType, StringType, ArrayType, StructType, StructField
from pyspark.sql import Row, Column
from pyspark.ml.feature import Word2Vec
from pyspark.ml.clustering import KMeans
from sklearn.cluster import DBSCAN
from spark_sklearn.keyed_models import KeyedEstimator
import numpy as np
from pyspark.ml.linalg import Vectors

sc = SparkContext(appName="Medical")
sqlContext = SQLContext(sc)
data = sc.wholeTextFiles("file:///C:/DataSet_MedicalRecords_2019.txt").toDF(['file', 'text'])

In [2]:
def extract(text):
    lines = text.split('\n')
    lines = [line.replace('\n', '') for line in lines]
    lines = [line for line in lines if line]
    lines = [line.lower() for i, line in enumerate(lines) if i > 0 and lines[i-1] == 'Анамнез болезни']
    return lines

extract_udf = udf(extract, ArrayType(StringType()))

In [3]:
data = data.withColumn('text', extract_udf('text')).select('text')
print('Extracted "Анамнез болезни" to cell of DF')
data.show(20)

Extracted "Анамнез болезни" to cell of DF
+--------------------+
|                text|
+--------------------+
|[ заболела в 12:0...|
+--------------------+



In [4]:
data = data.withColumn('text', F.explode('text'))
print('Convert the cell to rows')
data.show(20)

Convert the cell to rows
+--------------------+
|                text|
+--------------------+
| заболела в 12:00...|
| заболела остро у...|
| заболела остро 2...|
| заболела остро 1...|
| заболела остро 1...|
| заболела 22.10.2...|
| заболела остро 2...|
| заболела остро 1...|
| заболел остро 28...|
| заболел остро 28...|
| заболела остро 2...|
| заболела 19.12.1...|
| заболела 13.12.1...|
| заболела остро 0...|
| заболел остро 07...|
| заболел остро 03...|
| заболела остро 0...|
| заболел 16.06.16...|
| заболела утром, ...|
| заболел 15.06.17...|
+--------------------+
only showing top 20 rows



In [5]:
data = data.withColumn('text', F.split('text', ',|\.'))
number_of_texts = data.count()
print('Splitted each text into sentences')
data.show(20)

Splitted each text into sentences
+--------------------+
|                text|
+--------------------+
|[ заболела в 12:0...|
|[ заболела остро ...|
|[ заболела остро ...|
|[ заболела остро ...|
|[ заболела остро ...|
|[ заболела 22, 10...|
|[ заболела остро ...|
|[ заболела остро ...|
|[ заболел остро 2...|
|[ заболел остро 2...|
|[ заболела остро ...|
|[ заболела 19, 12...|
|[ заболела 13, 12...|
|[ заболела остро ...|
|[ заболел остро 0...|
|[ заболел остро 0...|
|[ заболела остро ...|
|[ заболел 16, 06,...|
|[ заболела утром,...|
|[ заболел 15, 06,...|
+--------------------+
only showing top 20 rows



In [6]:
data = data.withColumn('all_sentences', F.explode('text'))
print('Converted sentences to rows')
data.show()

Converted sentences to rows
+--------------------+--------------------+
|                text|       all_sentences|
+--------------------+--------------------+
|[ заболела в 12:0...| заболела в 12:00 05|
|[ заболела в 12:0...|                  10|
|[ заболела в 12:0...|16г - появился озноб|
|[ заболела в 12:0...| повысилась темпе...|
|[ заболела в 12:0...|                   0|
|[ заболела в 12:0...| принимала колдре...|
|[ заболела в 12:0...| вечером около 24...|
|[ заболела в 12:0...| чувство распиран...|
|[ заболела в 12:0...| появилась красно...|
|[ заболела в 12:0...| вызвала бригаду ...|
|[ заболела в 12:0...| доставлена в кли...|
|[ заболела в 12:0...|                    |
|[ заболела остро ...| заболела остро у...|
|[ заболела остро ...|                  10|
|[ заболела остро ...|               2016г|
|[ заболела остро ...|      появился озноб|
|[ заболела остро ...| повышение темпер...|
|[ заболела остро ...|                  5c|
|[ заболела остро ...|       головная боль|
|[ з

In [7]:
data.select('all_sentences').distinct().toPandas().to_csv('unique_sentences.csv')
data = data.withColumn('all_sentences_splitted', F.split('all_sentences', ' ')) 

In [8]:
def filter_empty_strings(data_list):
    return [d for d in data_list if d.replace('\s+', '')]

filter_empty_strings_udf = udf(filter_empty_strings, ArrayType(StringType()))

data = data.withColumn('all_sentences_splitted', filter_empty_strings_udf('all_sentences_splitted'))
data = data.filter(F.size('all_sentences_splitted') > 0)
word2vec = Word2Vec(vectorSize=5, minCount=0, inputCol="all_sentences_splitted", outputCol="features")
model = word2vec.fit(data)
data = model.transform(data)
print('texts, sentences, all sentences splitted to words, w2v')
data.show()

texts, sentences, all sentences splitted to words, w2v
+--------------------+--------------------+----------------------+--------------------+
|                text|       all_sentences|all_sentences_splitted|            features|
+--------------------+--------------------+----------------------+--------------------+
|[ заболела в 12:0...| заболела в 12:00 05|  [заболела, в, 12:...|[-0.0078419214114...|
|[ заболела в 12:0...|                  10|                  [10]|[-0.0639954134821...|
|[ заболела в 12:0...|16г - появился озноб|  [16г, -, появился...|[0.04686746373772...|
|[ заболела в 12:0...| повысилась темпе...|  [повысилась, темп...|[0.05132200475782...|
|[ заболела в 12:0...|                   0|                   [0]|[0.07541151344776...|
|[ заболела в 12:0...| принимала колдре...|  [принимала, колдр...|[-0.0399321440607...|
|[ заболела в 12:0...| вечером около 24...|  [вечером, около, ...|[-0.0404887950151...|
|[ заболела в 12:0...| чувство распиран...|  [чувство, распира...

In [9]:
clust_model = DBSCAN(eps=0.01, min_samples=1)
clust_data = data.toPandas()
vectors = clust_data['features'].apply(lambda x: np.array(x.toArray())).values
vectors = np.stack(vectors)
vectors = clust_model.fit_predict(vectors)
clust_data['cluster'] = vectors
print(clust_data[['all_sentences', 'cluster']])

                                         all_sentences  cluster
0                                  заболела в 12:00 05        0
1                                                   10        1
2                                 16г - появился озноб        2
3                         повысилась температура до 39        3
4                                                    0        4
5          принимала колдрекс для снижения температуры        5
6     вечером около 24:00 появились боли в левой па...        6
7                    чувство распирания в левой голени        7
8     появилась краснота на коже голени и внутренне...        8
9                      вызвала бригаду "скорой помощи"        9
10          доставлена в клинику инфекционных болезней       10
11                             заболела остро утром 12       11
12                                                  10        1
13                                               2016г       12
14                                      

In [10]:
clust_data = sqlContext.createDataFrame([(x,) for x in clust_data['cluster'].values.tolist()], ['cluster'])
data = data.withColumn("row_idx", monotonically_increasing_id())
clust_data = clust_data.rdd.zipWithIndex().toDF(['cluster', 'row_idx'])
data = data.join(clust_data, data.row_idx == clust_data.row_idx).drop("row_idx")
print('after joining')
data.show()

after joining
+--------------------+--------------------+----------------------+--------------------+-------+
|                text|       all_sentences|all_sentences_splitted|            features|cluster|
+--------------------+--------------------+----------------------+--------------------+-------+
|[ заболела остро ...| обратилась в ск/...|  [обратилась, в, с...|[0.02617141131001...|   [25]|
|[ заболела остро ...|   заболела остро 22|  [заболела, остро,...|[0.06925934180617...|   [28]|
|[ заболела остро ...|   заболела остро 15|  [заболела, остро,...|[0.05576257035136...|   [63]|
|[ заболела 19, 12...|    16 с недомогания|  [16, с, недомогания]|[0.01587591382364...|  [148]|
|[ заболела остро ...|                2016|                [2016]|[-0.0230788942426...|  [172]|
|[ заболел остро 0...|       головная боль|      [головная, боль]|[-0.0020724097266...|   [16]|
|[ заболела остро ...| когда утром появ...|  [когда, утром, по...|[0.02361336117610...|  [216]|
|[ заболела остро ...| с д

In [11]:
data.select('all_sentences', 'cluster').toPandas().to_csv('groups.csv')
data = data.groupBy('cluster').agg(F.countDistinct('text'))

@udf('float')
def count_to_prcng(x):
    return x /  number_of_texts * 100.

data = data.withColumn('count(DISTINCT text)', count_to_prcng('count(DISTINCT text)'))
data.show()
data.toPandas().to_csv('occur.csv')

+-------+--------------------+
|cluster|count(DISTINCT text)|
+-------+--------------------+
|   [29]|           14.285714|
|   [26]|           4.7619047|
|  [191]|           4.7619047|
|   [65]|           4.7619047|
|  [222]|           4.7619047|
|  [243]|           4.7619047|
|   [19]|           14.285714|
|   [54]|           4.7619047|
|    [0]|           4.7619047|
|  [112]|           4.7619047|
|  [113]|            9.523809|
|  [167]|           4.7619047|
|  [155]|           4.7619047|
|  [241]|           4.7619047|
|  [237]|           4.7619047|
|   [22]|           4.7619047|
|  [198]|           4.7619047|
|  [130]|            9.523809|
|  [196]|           4.7619047|
|   [77]|           4.7619047|
+-------+--------------------+
only showing top 20 rows

