In [1]:
from pyspark import SparkConf
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import UserDefinedFunction
from pyspark.ml.feature import Word2Vec, Word2VecModel
from pyspark.ml.feature import Tokenizer, RegexTokenizer, StopWordsRemover
from pyspark.sql.types import *
import matplotlib.pyplot as plt
import numpy as np
import datetime
import os
import sys

In [2]:
# Start (or reuse) the Spark session for this kernel.
# getOrCreate() makes the cell safe to re-run: constructing a second bare
# SparkContext() in the same kernel raises "Cannot run multiple SparkContexts at once".
spark = SparkSession.builder.getOrCreate()
# Keep `sc` available for any code that expects the raw SparkContext.
sc = spark.sparkContext

In [3]:
# Root folder holding the parsed PubMed TSV batches.
# Set the BIO_NER_DATA_ROOT environment variable to point elsewhere instead of
# editing this absolute, machine-specific path; the original path is the default.
DATA_ROOT = os.environ.get("BIO_NER_DATA_ROOT", 'D:\\bio-ner\\tsv')
parse_results_remote_dir = os.path.join(DATA_ROOT, 'pubmed_data', 'tsv_files')

In [4]:
# Load the first TSV batch. It seeds `abstracts_full_df`, which the next cell
# extends with any further batch files (batch#51, batch#101, ...).
timestart = datetime.datetime.now()

num_xml_files = 2   # upper bound used by the next cell's batch loop
batch_size = 50     # step between batch file numbers (batch#1, batch#51, ...)

pubmed_tsv_file = os.path.join(parse_results_remote_dir, "batch#1.tsv")
print("Reading file {}".format(pubmed_tsv_file))

# Tab-separated file with a header row; column types are inferred by Spark.
abstracts_batch_df = spark.read.csv(
    path=pubmed_tsv_file,
    sep="\t",
    header=True,
    inferSchema=True,
)

print("\tAdding {} records ...".format(abstracts_batch_df.count()))
abstracts_full_df = abstracts_batch_df

Reading file D:\bio-ner\tsv\pubmed_data\tsv_files\batch#1.tsv
	Adding 26828 records ...


In [5]:
# Append every further TSV batch (batch#51, batch#101, ...) to abstracts_full_df.
# With the current settings (num_xml_files = 2, batch_size = 50) the range is
# empty, so only batch#1 from the previous cell is used.
for i in range(1 + batch_size, num_xml_files + 1, batch_size):
    pubmed_tsv_file = os.path.join(parse_results_remote_dir, 'batch#{}.tsv'.format(i))
    try:
        print("Reading file {}".format(pubmed_tsv_file))
        abstracts_batch_df = spark.read.csv(path=pubmed_tsv_file, header=True, inferSchema=True, sep="\t")

        print("\tAdding {} records ...".format(abstracts_batch_df.count()))

        # Match columns by header name rather than position, so a batch whose
        # columns are ordered differently cannot silently misalign pmid/abstract.
        abstracts_full_df = abstracts_full_df.unionByName(abstracts_batch_df)
    except Exception as exc:
        # Best-effort: a missing/unreadable batch is reported (with the reason)
        # and skipped. Catching Exception instead of a bare `except:` no longer
        # swallows KeyboardInterrupt, so the loop can still be interrupted.
        print("Skipped batch {} ({}): {}".format(i, pubmed_tsv_file, exc))

In [6]:
# Sanity-check the combined DataFrame: schema, row count and first row.
abstracts_full_df.printSchema()

total_abstracts = abstracts_full_df.count()
print("abstracts_full_df.count() = {}".format(total_abstracts))

first_abstract_row = abstracts_full_df.head()
print("abstracts_full_df.head() = {}".format(first_abstract_row))

root
 |-- pmid: integer (nullable = true)
 |-- abstract: string (nullable = true)

abstracts_full_df.count() = 26828
abstracts_full_df.head() = Row(pmid=30978, abstract='The paper describes a modified method of isolating the branching enzyme of amylose isomerase from muscles and a study of the enzyme activity at different stages of purification. By enzyme fractionation on biogel R-150 and Sepharose 6B the fractions containing different RNA amounts have been isolated. The activity of fractions has been shown to depend on their content of RNA. The paper presents a procedure used to isolate a highly purified fraction of amylose isomerase and its properties (pH and temperature optima, enzyme optimal concentration and Michaelis constant).')


In [7]:
# Report how long loading took. `timestart` was set when batch#1 started
# loading, so this covers reading batch#1, appending any further batches and
# the schema/count checks, not only the immediately preceding cell.
timeend = datetime.datetime.now()
timedelta = round((timeend - timestart).total_seconds() / 60, 2)
print("Time taken to load the abstracts: " + str(timedelta) + " mins")

# Alias consumed by the cleaning/tokenizing cell.
abstracts_full_df2 = abstracts_full_df

Time taken to execute above cell: 0.31 mins


In [8]:
from pyspark.sql.functions import regexp_replace, trim, col, lower, udf
from pyspark.ml.feature import Tokenizer, RegexTokenizer
from pyspark.ml.feature import StopWordsRemover

In [9]:
# Normalise the abstracts and split them into word tokens for Word2Vec.
# Time this cell on its own: reusing `timestart` (set when loading began)
# made the reported duration cumulative rather than per-cell.
cell_timestart = datetime.datetime.now()

print("abstracts_full_df2.head() = {}".format(abstracts_full_df2.head()))

# Lower-case the abstract, then strip every character that is not a word
# character (\w, which already covers "_"), a hyphen or a space.
# Spark evaluates the pattern with Java regex. The raw string avoids the invalid
# "\w" escape in a plain Python literal (SyntaxWarning on newer Pythons), and
# "\-" makes the literal hyphen explicit instead of relying on "\w-_" not
# being parsed as a range.
print("Converting the abstract to Lower Case ... ")
abstracts_full_df3 = (
    abstracts_full_df2
    .withColumn("abstractNew", lower(col("abstract")))
    .withColumn("abstractNew", regexp_replace("abstractNew", r"[^\w\- ]", ""))
)

abstracts_full_df3.printSchema()
print("abstracts_full_df3.head() = {}".format(abstracts_full_df3.head()))

# Tokenize the cleaned abstracts into the "words" column.
print("tokenizing the abstracts... ")
tokenizer = Tokenizer(inputCol="abstractNew", outputCol="words")
# NOTE(review): `remover` is built but not applied here, and the Word2Vec cell
# trains on the unfiltered "words" column. Apply remover.transform(...) and
# train on "filtWords" if stop-word removal is intended.
remover = StopWordsRemover(inputCol="words", outputCol="filtWords")

abstracts_full_df4 = tokenizer.transform(abstracts_full_df3)

print("After tokenization: ")
abstracts_full_df4.printSchema()
print("abstracts_full_df4.count() = {}".format(abstracts_full_df4.count()))
print("abstracts_full_df4.head() = {}".format(abstracts_full_df4.head()))

# Report how long this cell took.
timeend = datetime.datetime.now()
timedelta = round((timeend - cell_timestart).total_seconds() / 60, 2)
print("Time taken to execute above cell: " + str(timedelta) + " mins")

abstracts_full_df2.head() = Row(pmid=30978, abstract='The paper describes a modified method of isolating the branching enzyme of amylose isomerase from muscles and a study of the enzyme activity at different stages of purification. By enzyme fractionation on biogel R-150 and Sepharose 6B the fractions containing different RNA amounts have been isolated. The activity of fractions has been shown to depend on their content of RNA. The paper presents a procedure used to isolate a highly purified fraction of amylose isomerase and its properties (pH and temperature optima, enzyme optimal concentration and Michaelis constant).')
Converting the abstarct to Lower Case ... 
root
 |-- pmid: integer (nullable = true)
 |-- abstract: string (nullable = true)
 |-- abstractNew: string (nullable = true)

abstracts_full_df3.head() = Row(pmid=30978, abstract='The paper describes a modified method of isolating the branching enzyme of amylose isomerase from muscles and a study of the enzyme activity at dif

In [10]:
# Train a Word2Vec model on the tokenized abstracts ("words" column).
timestart = datetime.datetime.now()
model = None  # cleared first so a failed fit cannot leave a stale model from an earlier run

window_size = 5
vector_size = 50
min_count = 1000        # Spark `minCount`: minimum token frequency to enter the vocabulary
num_partitions = 10
# Explicit seed for reproducibility: PySpark's default seed is Python's hash()
# of the estimator class name, which changes between processes unless
# PYTHONHASHSEED is fixed.
w2v_seed = 42

print("Start training the model ...")
word2Vec = Word2Vec(
    windowSize=window_size,
    vectorSize=vector_size,
    minCount=min_count,
    numPartitions=num_partitions,
    seed=w2v_seed,
    inputCol="words",
    outputCol="result",
)
model = word2Vec.fit(abstracts_full_df4)

Start training the model ...


In [11]:
# Summarise the trained model: vocabulary size, one sample vector and the
# time elapsed since training started.
timeend = datetime.datetime.now()
elapsed_seconds = (timeend - timestart).total_seconds()
timedelta = round(elapsed_seconds / 60, 2)

word_vectors = model.getVectors()  # one row per vocabulary word: (word, vector)
print("model.getVectors().count() = {}".format(word_vectors.count()))
print("model.getVectors().head() = {}".format(word_vectors.head()))
print("Time taken to train the word2Vec model: " + str(timedelta) + " mins")

model.getVectors().count() = 442
model.getVectors().head() = Row(word='rate', vector=DenseVector([3.5834, 2.8361, 2.8417, -1.8788, -1.4995, 1.1798, -4.0886, 0.9998, -1.3232, 1.5332, -1.7244, 5.4131, 3.4348, 1.5389, -3.0207, 1.1223, 1.3175, -2.66, -4.5728, -1.7008, -1.0857, -4.5549, 1.8341, -2.1264, 1.2162, -0.8343, 3.2214, -2.81, -0.8226, 1.0041, 4.9361, 2.5978, -0.9266, -0.717, -1.2915, 1.1835, 2.6452, 0.2605, 0.4492, 1.3232, -0.4564, 4.4845, -0.3336, -2.4449, 3.8965, 1.3426, 0.0811, 3.5372, -0.7994, 0.0486]))
Time taken to train the word2Vec model: 0.22 mins


In [12]:
# The 20 vocabulary words most similar to "heart" in the learned embedding.
heart_neighbours = model.findSynonyms("heart", 20).select("word")
print("findSynonyms('heart') = {}".format(heart_neighbours.head(20)))

findSynonyms('heart') = [Row(word='fetal'), Row(word='pressure'), Row(word='flow'), Row(word='blood'), Row(word='arterial'), Row(word='rate'), Row(word='oxygen'), Row(word='decreased'), Row(word='renal'), Row(word='mean'), Row(word='subjects'), Row(word='increased'), Row(word='significantly'), Row(word='changes'), Row(word='muscle'), Row(word='urine'), Row(word='decrease'), Row(word='reduced'), Row(word='during'), Row(word='levels')]


In [13]:
# Compare corpus size (abstracts) with vocabulary size, and show the vector schema.
print(abstracts_full_df4.count())
vocab_vectors = model.getVectors()
print(vocab_vectors.count())
vocab_vectors.printSchema()

26828
442
root
 |-- word: string (nullable = true)
 |-- vector: vector (nullable = true)



In [87]:
from pyspark.sql.functions import (
    col,
    max as max_,
    size,
    struct,
    monotonically_increasing_id,
)

# Show the first abstract whose token list has exactly TARGET_TOKEN_COUNT tokens
# (displays nothing if no abstract matches).
# NOTE(review): 442 equals the Word2Vec vocabulary size printed earlier
# (model.getVectors().count()). Confirm whether comparing an abstract's token
# count to it is really what this probe should check.
TARGET_TOKEN_COUNT = 442
has_target_length = size(col("words")) == TARGET_TOKEN_COUNT
abstracts_full_df4.where(has_target_length).head()