In [1]:
!pip install pyspark



In [2]:
import pyspark
from pyspark.sql import SparkSession
from pyspark import SparkConf, SparkContext

In [3]:
import pandas as pd
import numpy as np

In [4]:
# Load the topics table from CSV into pandas.
final_topics = pd.read_csv('/content/data_topics.csv')

In [5]:
# Preview the topics frame (pandas elides the middle rows of long frames when rendering).
final_topics

Unnamed: 0,Description
0,a 58 year old african american woman presents ...
1,an 8 year old male presents in march to the er...
2,a 58 year old nonsmoker white female with mild...
3,a 2 year old boy is brought to the emergency d...
4,a 56 year old female on 20th day post left mas...
...,...
160,the patient is a 34 year old obese woman who c...
161,the patient is a 16 year old girl recently dia...
162,the patient is a 3 day old female infant with ...
163,the patient is a 53 year old man complaining o...


In [6]:
!pip install py7zr



In [7]:
import py7zr

In [8]:
# Unpack the 7z archive. The destination is passed explicitly instead of relying on the
# kernel's current working directory, because the CSV is read back from '/content' afterwards.
# NOTE(review): presumably the archive holds data.csv at its root — confirm.
with py7zr.SevenZipFile('/content/data.7z', mode='r') as z:
    z.extractall(path='/content')

In [9]:
# Load the extracted records from CSV into pandas.
data_combined = pd.read_csv('/content/data.csv')

In [10]:
# Preview the loaded records (pandas elides the middle rows of long frames when rendering).
data_combined

Unnamed: 0,nct_id,combined
0,NCT00000102,congenital adrenal hyperplasia calcium channel...
1,NCT00000104,does lead burden alter neuropsychological deve...
2,NCT00000107,body water content in cyanotic congenital hear...
3,NCT00000108,effects of training intensity on the chd risk ...
4,NCT00000110,influence of diet and endurance running on int...
...,...,...
60217,NCT00989703,glpg0259 multiple ascending dose and methotrex...
60218,NCT00989729,preoperative methylprednisolone in endovascula...
60219,NCT00989742,doxycycline in lymphangioleiomyomatosis lam t...
60220,NCT00989755,center for disease control cdc fax to quit aca...


In [11]:
# Start (or reuse) a Spark session running locally on all available cores.
# NOTE(review): the 70g / 50g / 16g memory settings presumably target a large-memory host —
# confirm they fit the machine this notebook actually runs on.
config = (
    SparkSession.builder
    .master("local[*]")
    .config("spark.executor.memory", "70g")
    .config("spark.driver.memory", "50g")
    .config("spark.memory.offHeap.enabled", True)
    .config("spark.memory.offHeap.size", "16g")
    .appName("sampleCodeForReference")
    .getOrCreate()
)

In [12]:
# Alias for the session created above; despite its name, this is the SparkSession object
# itself, used below as the entry point for creating DataFrames.
sqlContext = config

In [13]:
# Convert both pandas frames into Spark DataFrames (topics first, then the trial records).
df_topic, df_nct = (
    sqlContext.createDataFrame(pandas_frame)
    for pandas_frame in (final_topics, data_combined)
)

In [14]:
# Print the first rows of the Spark trials frame as a sanity check (long cells are truncated).
df_nct.show()

+-----------+--------------------+
|     nct_id|            combined|
+-----------+--------------------+
|NCT00000102|congenital adrena...|
|NCT00000104|does lead burden ...|
|NCT00000107|body water conten...|
|NCT00000108|effects of traini...|
|NCT00000110|influence of diet...|
|NCT00000113|correction of myo...|
|NCT00000114|randomized trial ...|
|NCT00000115|randomized trial ...|
|NCT00000116|randomized trial ...|
|NCT00000117|intravenous immun...|
|NCT00000118|ganciclovir impla...|
|NCT00000120|clinical trial of...|
|NCT00000121|the prism adaptat...|
|NCT00000122|fluorouracil filt...|
|NCT00000123|the berkeley orth...|
|NCT00000125|ocular hypertensi...|
|NCT00000127|ischemic optic ne...|
|NCT00000129|prospective evalu...|
|NCT00000130|endophthalmitis v...|
|NCT00000131|central vein occl...|
+-----------+--------------------+
only showing top 20 rows



In [15]:
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, CountVectorizer, IDF
from pyspark.ml import Pipeline

In [16]:
# Text columns to run through the tokenize / stop-word-removal pipeline.
columns = ['combined']

In [17]:
# Build one tokenizer + stop-word remover pair per text column and chain them in a Pipeline.
pre_process = []

for col in columns:
    # Derive the intermediate/output column names from the input column so that several
    # entries in `columns` do not collide; for 'combined' this yields 'combined_tok' and
    # 'combined_sw_clean'.
    tok_col = f'{col}_tok'
    clean_col = f'{col}_sw_clean'
    # gaps=False: the pattern describes the tokens themselves (runs of word characters),
    # not the separators. Raw string avoids the invalid '\w' escape-sequence warning.
    re_token = RegexTokenizer(gaps=False, pattern=r'\w+', inputCol=col, outputCol=tok_col)
    stopword_remover = StopWordsRemover(inputCol=tok_col, outputCol=clean_col)
    pre_process += [re_token, stopword_remover]

pipeline = Pipeline(stages=pre_process)

In [18]:
# Fit the preprocessing pipeline on the trials frame, then apply it to that same frame.
data = df_nct
model = pipeline.fit(data)
data_transformed = model.transform(data)

In [19]:
# Print a sample of transformed rows to compare the raw text, tokens, and stop-word-filtered tokens.
data_transformed.show()

+-----------+--------------------+--------------------+--------------------+
|     nct_id|            combined|        combined_tok|   combined_sw_clean|
+-----------+--------------------+--------------------+--------------------+
|NCT00000102|congenital adrena...|[congenital, adre...|[congenital, adre...|
|NCT00000104|does lead burden ...|[does, lead, burd...|[lead, burden, al...|
|NCT00000107|body water conten...|[body, water, con...|[body, water, con...|
|NCT00000108|effects of traini...|[effects, of, tra...|[effects, trainin...|
|NCT00000110|influence of diet...|[influence, of, d...|[influence, diet,...|
|NCT00000113|correction of myo...|[correction, of, ...|[correction, myop...|
|NCT00000114|randomized trial ...|[randomized, tria...|[randomized, tria...|
|NCT00000115|randomized trial ...|[randomized, tria...|[randomized, tria...|
|NCT00000116|randomized trial ...|[randomized, tria...|[randomized, tria...|
|NCT00000117|intravenous immun...|[intravenous, imm...|[intravenous, imm...|

In [20]:
# Show the column types of the transformed frame, including the columns added by the pipeline.
data_transformed.printSchema()

root
 |-- nct_id: string (nullable = true)
 |-- combined: string (nullable = true)
 |-- combined_tok: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- combined_sw_clean: array (nullable = true)
 |    |-- element: string (containsNull = true)



In [21]:
# Keep only the trial id and the cleaned token lists, then collect them into pandas.
cleaned_tokens = data_transformed.select('nct_id', 'combined_sw_clean')
df = cleaned_tokens.toPandas()

In [23]:
# Persist the cleaned tokens as CSV in the current working directory.
# NOTE(review): to_csv also writes the pandas index as an unnamed first column by default;
# pass index=False if downstream readers should not see it — confirm with consumers.
df.to_csv('data_spark.csv')