*** VERSION 3 ***

**** NOTES ****

In [1]:
import os, sys, stat
from stat import *
import re
import pandas as pd
from pyspark.sql import *
from pyspark import SparkConf, SparkContext, SQLContext
import unicodedata
from pyspark.sql.functions import *
from py4j.java_gateway import java_import, JavaGateway, GatewayClient

In [2]:
# Normalize the lines of a text file so that only plain-ASCII content is returned.
# NOTE(review): the following cell defines another `normalize` that takes a single
# line; once that cell runs, this file-based variant is no longer reachable by name.
def normalize(filename):
    """Read `filename` and return its non-empty lines reduced to ASCII.

    Each line is decoded as UTF-8, stripped of surrounding spaces, tabs and
    newlines, then of leading/trailing commas. Lines that end up empty are
    skipped. The remaining text is NFKD-normalized and encoded to ASCII with
    unencodable characters dropped.

    Parameters
    ----------
    filename : path of the UTF-8 encoded text file to read.

    Returns
    -------
    list of ASCII byte strings (`str` on Python 2, `bytes` on Python 3), one
    per retained line, in file order.

    Raises
    ------
    IOError / OSError if the file cannot be opened.
    UnicodeDecodeError if a line is not valid UTF-8.
    """
    lines = []
    # Binary mode: the bytes are decoded explicitly below, which behaves the
    # same way on Python 2 and Python 3.
    with open(filename, 'rb') as f:
        for raw_line in f:
            l = raw_line.decode('utf-8').strip(' \t\n')
            # strip() returns a new string; the result must be kept, otherwise
            # the commas are never actually removed.
            l = l.strip(',')
            if len(l) == 0:
                continue
            lines.append(unicodedata.normalize("NFKD", l).encode('ascii', 'ignore'))

    return lines

In [3]:
def normalize(line):
    """Return `line` reduced to plain ASCII.

    The text is NFKD-normalized (so accented letters decompose into a base
    letter plus combining marks) and then encoded to ASCII with everything
    unencodable dropped.

    Parameters
    ----------
    line : a UTF-8 encoded byte string, or an already-decoded text string.

    Returns
    -------
    The ASCII-only content as the interpreter's native `str` type (a byte
    string on Python 2, a text string on Python 3), so callers can keep using
    ordinary string methods and `re` patterns on it.
    """
    # Only byte strings need decoding; calling .decode on text either fails
    # outright (Python 3) or triggers an implicit ASCII encode (Python 2).
    # Malformed byte sequences are dropped rather than raised, in line with the
    # lossy ASCII step below.
    if isinstance(line, bytes):
        line = line.decode('utf-8', 'ignore')

    ascii_bytes = unicodedata.normalize("NFKD", line).encode('ascii', 'ignore')

    # On Python 2 the encoded value already is a `str`; on Python 3 it is
    # `bytes` and has to be turned back into text.
    if isinstance(ascii_bytes, str):
        return ascii_bytes
    return ascii_bytes.decode('ascii')


In [4]:
def format_key(key):
    """Turn a section heading into a column-friendly key.

    `None` maps to the placeholder 'UNKNOWN_KEY'. Any other heading has every
    character that is not an ASCII letter, digit or space removed, is
    lower-cased, and has each space replaced by an underscore. Spaces are not
    collapsed or trimmed, so consecutive or trailing spaces yield consecutive
    or trailing underscores, and a heading made only of punctuation yields ''.
    """
    if key is None:
        return 'UNKNOWN_KEY'

    letters_digits_spaces = re.sub('[^0-9a-zA-Z ]+', '', key)
    return letters_digits_spaces.lower().replace(' ', '_')


In [5]:
def parse(pathname, save_range=False):
    """Split the text file at `pathname` into sections keyed by heading.

    Every line is passed through `normalize` and stripped of surrounding
    spaces and newlines; blank lines are skipped. A line with fewer than four
    space-separated tokens is treated as a (possible) heading. Any longer line
    is body text and is appended, without a separator, to the current section.

    Parameters
    ----------
    pathname : path of the text file to read.
    save_range : when False (default) the section text is stored; when True
        only the position of each section is stored.

    Returns
    -------
    dict
        * save_range=False: ``format_key(heading) -> concatenated body text``.
          Body text that precedes the first heading is stored under
          'UNKNOWN_KEY'.
        * save_range=True: ``raw heading -> (start, end)``, a half-open range
          counted in body lines only (headings and blank lines are not
          counted). Body text before the first heading is keyed by ``None``.

        Headings that are not followed by body text are not stored. If the
        same key occurs more than once, the later section replaces the earlier.
    """
    d = dict()

    def _store(heading, section, section_start, end_line):
        # Record one finished section; headings without body text are skipped.
        if len(section) == 0:
            return
        if save_range:
            d[heading] = (section_start, end_line)
        else:
            d[format_key(heading)] = section

    with open(pathname, 'r') as f:
        heading = None
        section = ''
        section_start = None
        line = 0  # number of body lines consumed so far
        for l in f:
            l = normalize(l)
            l = l.strip(" \n")
            if len(l) == 0:
                continue

            if len(l.split(' ')) < 4:
                # Possible next heading: close the section collected so far
                # and start a new one under this heading.
                _store(heading, section, section_start, line)
                heading = l
                section = ''
                section_start = None
                continue

            if section_start is None:
                section_start = line

            section += l
            line += 1

        # Save the last section as well. This goes through the same path as the
        # in-loop flush, so text in a file without any heading is kept (under
        # 'UNKNOWN_KEY' / None) instead of being silently dropped.
        _store(heading, section, section_start, line)

    return d

In [6]:
# Build the filename -> pmid key lookup from the fulltext_id file
# (the list of all files that contain VEGF, shared by Sean).
def mkf2id(full_text_file):
    """Return a dict mapping text-file names to their pmid key.

    Each non-blank line of `full_text_file` is split on whitespace. The first
    token is taken as the id and the last token as the archive name, with
    '.tar.gz' replaced by '.txt' to match the extracted text files.

    Parameters
    ----------
    full_text_file : path of the mapping file.

    Returns
    -------
    dict of ``filename -> id``. If a filename appears more than once, the last
    line wins.
    """
    f2id = dict()
    with open(full_text_file, 'r') as f:
        for l in f:
            # split() with no argument handles tabs, repeated spaces and a
            # trailing '\r' from CRLF files alike.
            tokens = l.split()
            if not tokens:
                # Blank line: nothing to map (avoids a bogus '' -> '' entry).
                continue
            filename = tokens[-1].replace('.tar.gz', '.txt')
            f2id[filename] = tokens[0]
    return f2id

In [7]:
# Build a DataFrame with a PMID_KEY column plus one column per section heading.
# Each row represents one .txt file.

def mkdf(path, f2id):
    """Parse every regular '.txt' file directly inside `path` into one row.

    Parameters
    ----------
    path : directory holding the text files (not searched recursively).
    f2id : dict mapping a file name (as returned by os.listdir) to its pmid
        key, e.g. the result of `mkf2id`.

    Returns
    -------
    pandas.DataFrame with one row per file, in os.listdir order. The columns
    are the union of all keys returned by `parse` plus 'PMID_KEY'. Sections a
    document does not have are filled with ''.

    Raises
    ------
    KeyError if a text file has no entry in `f2id`.

    Prints each parsed path and the running document count as progress output.
    """
    uniqueKeys = set()
    docList = []

    for item in os.listdir(path):
        if not item.endswith('.txt'):
            continue

        pathname = os.path.join(path, item)
        if not os.path.isfile(pathname):
            continue

        # Fail before the (comparatively expensive) parse, and say which file
        # is missing from the mapping.
        if item not in f2id:
            raise KeyError('no PMID_KEY mapping for file %r' % item)

        print(pathname)
        d = parse(pathname)
        d['PMID_KEY'] = f2id[item]
        uniqueKeys.update(d.keys())
        docList.append(d)
        print(len(docList))

    # Build the frame in one go: appending row by row copies the whole frame
    # for every document. The set is turned into a list because the
    # constructor needs an ordered column collection.
    df = pd.DataFrame(docList, columns=list(uniqueKeys))
    return df.fillna('')

In [8]:
# Sample run: point the pipeline at the `sample_run` folder and build the frame.
# NOTE(review): both paths are absolute and machine-specific; they must be edited
# before this cell can run anywhere else.
tmp_path = '/Users/haynes/Desktop/independent study/sample_run/fulltext/'
tmp_fulltextid = '/Users/haynes/Desktop/independent study/sample_run/fulltext_id.txt'

# invoking the function to fetch pmid key (filename -> id lookup)
f2id=mkf2id(tmp_fulltextid)
# invoking the function to create pandas dataframe (one row per .txt file)
df = mkdf(tmp_path, f2id)

/Users/haynes/Desktop/independent study/sample_run/fulltext/Am_J_Physiol_Heart_Circ_Physiol_2012_Mar_15_302(6)_H1261-H1273.txt
1
/Users/haynes/Desktop/independent study/sample_run/fulltext/Arch_Med_Sci_2014_Aug_29_10(4)_837-845.txt
2
/Users/haynes/Desktop/independent study/sample_run/fulltext/Arthritis_Res_Ther_2010_Feb_11_12(1)_R22.txt
3
/Users/haynes/Desktop/independent study/sample_run/fulltext/Biochem_Biophys_Res_Commun_2007_Sep_21_361(2-2)_468-473.txt
4
/Users/haynes/Desktop/independent study/sample_run/fulltext/Biomed_Res_Int_2013_Sep_16_2013_580135.txt
5
/Users/haynes/Desktop/independent study/sample_run/fulltext/Biomed_Res_Int_2014_Jul_17_2014_107526.txt
6
/Users/haynes/Desktop/independent study/sample_run/fulltext/BMC_Biochem_2011_Apr_15_12_15.txt
7
/Users/haynes/Desktop/independent study/sample_run/fulltext/Br_J_Cancer_2002_Mar_18_86(6)_858-863.txt
8
/Users/haynes/Desktop/independent study/sample_run/fulltext/Br_J_Cancer_2008_Nov_4_99(9)_1415-1425.txt
9
/Users/haynes/Desktop/

In [None]:
# Same two steps as the sample run, pointed at a different folder
# (presumably the complete set of VEGF files - confirm).
# NOTE(review): this cell rebinds `f2id` and `df`. It carries no execution count in
# this export, so the outputs shown further down presumably come from the sample run.
# path to the folder containing all the txt files
path = "/Users/haynes/Desktop/independent study/files containg VEGF/fulltext"
# path to the txt file fulltext_id.txt file that contains the mapping of pmid key and filenames
path_fulltextid = "/Users/haynes/Desktop/independent study/files containg VEGF/fulltext_id.txt"


# invoking the function to fetch pmid key
f2id=mkf2id(path_fulltextid)
# invoking the function to create pandas dataframe
df = mkdf(path, f2id)

In [9]:
# (rows, columns) of the parsed frame, followed by a preview of the first rows.
print(df.shape)
df.head()

(15, 85)


Unnamed: 0,Unnamed: 1,5_future_directions,expression_omnibus,laboratory_findings,set,cytotoxic_agents,construction_of_bsscfv,cell_lines,results,survival_,...,treatment,survival_analysis,rna_isolation,competing_interests,leiomyosarcoma,statistical_analysis,abbreviations,PMID_KEY,1_introduction,combination_chemotherapy_regimens
0,,,,,,,,,CD1 mice are an outbred strain and are prolifi...,,...,,,,,,Three-factor ANOVA was used to assess variance...,,22268107,,
1,,,,,,,,,,,...,,,,,,,,25276172,,
2,,,,,,,,,,,...,,,,The authors declare that they have no competin...,,,A2M: -2-macroglobulin; AC: articular cartilage...,20149220,,
3,,,,,,,,,Expression of facilitative glucose transporter...,,...,,,,,,,,17658463,,
4,,,,,,,,,,,...,,,,,,,,24151609,Despite intensive investigations during the pa...,


In [10]:
# Drop every column whose value is the empty string in all rows,
# printing the name of each column that is removed.
for col in list(df.columns):
    column_is_blank = (df[col] == '').all()
    if column_is_blank:
        print(col)
        df.drop([col], axis=1, inplace=True)

In [11]:
# List the column names left after the blank-column filter.
df.columns.tolist()

['',
 '5_future_directions',
 'expression_omnibus',
 'laboratory_findings',
 'set',
 'cytotoxic_agents',
 'construction_of_bsscfv',
 'cell_lines',
 'results',
 'survival_',
 'authors_contributions',
 'biodistribution',
 'grants',
 'silhouette034',
 'human_samples',
 'adverse_events',
 'disclosures',
 'from_all_patients',
 'phytochemical_investigation',
 'thyroid_volume',
 'anatomic_outcomes',
 'goitre_and_acromegaly',
 'status_available',
 '2_il12',
 'available_by_',
 'conclusions',
 'introduction',
 'cdna_microarrays',
 'discussion',
 '4_discussion',
 'patients',
 '26_statistical_analysis',
 'principal_component_analysis',
 'side_effects',
 'forwardagggcactctgggaacctat_and',
 'in_vitro_selectivity',
 'samples',
 '4_il23',
 'tyrosine_kinase_inhibitors',
 'author_contributions',
 'material_and_methods',
 'conclusion',
 'identical',
 'materials_and_methods',
 'cell_cultures',
 'angiosarcoma',
 'clinical_outcomes',
 'bovine_samples',
 'in_vitro_efficacy',
 'stereology',
 '3__il27',
 'quan

In [12]:
# Headings that format_key reduced to the empty string end up in a column named '';
# give that column a usable name.
df.rename(columns={'': 'unknown'}, inplace=True)

In [13]:
# (rows, columns) after the cleanup cells.
df.shape

(15, 85)

In [14]:
# Creating a spark dataframe from the pandas frame
sdf = sqlContext.createDataFrame(df)

# Register the DataFrame as a SQL temporary view named "VEGF".
# createOrReplaceTempView is the non-deprecated replacement for registerTempTable.
sdf.createOrReplaceTempView("VEGF")

In [15]:
# Count the number of times the word VEGF occurs in each text column.

# Occurrences = (characters removed when every 'VEGF' is deleted) / length of 'VEGF'.
# The 4 is the length of the literal 'VEGF'; the division yields a floating-point column.
count_vegf = lambda c : ((length(c)-length(regexp_replace(c,'VEGF','')))/4)
# Getting a list of column names
columns = df.columns.tolist()

# New columns are created to store the count of VEGF.
# The column names follow the naming convention count_VEGF_<columnname>.
# Every column goes through the same PMID_KEY check: the first column is not
# special-cased, so the id column is never counted even if it happens to come first.
sdf_new = sdf
for c in columns:
    if c != 'PMID_KEY':
        sdf_new = sdf_new.withColumn('count_VEGF_'+c, count_vegf(sdf_new[c]))

In [16]:
# Spot check: the introduction text next to its VEGF count, up to 15 rows.
sdf_new[['PMID_KEY','introduction','count_VEGF_introduction']].show(15)

+--------+--------------------+-----------------------+
|PMID_KEY|        introduction|count_VEGF_introduction|
+--------+--------------------+-----------------------+
|22268107|                    |                    0.0|
|25276172|Acromegaly, a chr...|                    0.0|
|20149220|Low back pain (LB...|                    1.0|
|17658463|                    |                    0.0|
|24151609|                    |                    0.0|
|25136551|                    |                    0.0|
|21496221|                    |                    0.0|
|11953815|                    |                    0.0|
|18841159|                    |                    0.0|
|20571495|                    |                    0.0|
|22568967|                    |                    0.0|
|24518078|Neovascular age-r...|                    5.0|
|21837668|Soft tissue sarco...|                    0.0|
|20885915|                    |                    0.0|
|25258583|Angiogenesis play...|                 

In [17]:
# Total VEGF count per document = sum of all the per-section 'count_VEGF_*' columns.

sdf_new.createOrReplaceTempView("VEGF_NEW")

# Only the generated counter columns take part in the sum. Matching the full
# 'count_VEGF_' prefix keeps a text section whose heading merely starts with
# "count" out of the arithmetic.
columns = sdf_new.columns
count_columns = [c for c in columns if c.startswith('count_VEGF_')]
assert count_columns, 'sdf_new has no count_VEGF_* columns to add up'

# Joining the names cannot leave a dangling ' + ', whatever the column order;
# back-ticks keep unusual column names parseable.
total_expr = ' + '.join(['`' + c + '`' for c in count_columns])
query = 'select ' + total_expr + ' as total_VEGF_count from VEGF_NEW'
#print query

# Single-column frame holding only the total
results = spark.sql(query)

# Attach the total to every row inside Spark. Gluing two separately collected
# pandas frames together by position would rely on both jobs returning the
# rows in the same order.
final_sdf = spark.sql('select *, (' + total_expr + ') as total_VEGF_count from VEGF_NEW')

# pandas copies, all derived from one collected result so their rows line up
final_pdf = final_sdf.toPandas()
sdf_new_df = final_pdf.drop('total_VEGF_count', axis=1)
print(sdf_new_df.shape)
r_pdf = final_pdf[['total_VEGF_count']]
print(r_pdf.shape)
print(final_pdf.shape)

(15, 169)
(15, 1)
(15, 170)


In [18]:
# Expose the final frame to SQL and add up the per-document totals.
# createOrReplaceTempView is the non-deprecated replacement for registerTempTable.
final_sdf.createOrReplaceTempView("VEGF_NEW_1")
spark.sql("select sum(total_VEGF_count) from VEGF_NEW_1").collect()

[Row(sum(total_VEGF_count)=82.0)]

In [19]:
# Per-document VEGF totals, one Row per PMID_KEY.
spark.sql("select PMID_KEY, total_VEGF_count from VEGF_NEW_1").collect()

[Row(PMID_KEY=u'22268107', total_VEGF_count=14.0),
 Row(PMID_KEY=u'25276172', total_VEGF_count=4.0),
 Row(PMID_KEY=u'20149220', total_VEGF_count=3.0),
 Row(PMID_KEY=u'17658463', total_VEGF_count=1.0),
 Row(PMID_KEY=u'24151609', total_VEGF_count=1.0),
 Row(PMID_KEY=u'25136551', total_VEGF_count=3.0),
 Row(PMID_KEY=u'21496221', total_VEGF_count=2.0),
 Row(PMID_KEY=u'11953815', total_VEGF_count=26.0),
 Row(PMID_KEY=u'18841159', total_VEGF_count=2.0),
 Row(PMID_KEY=u'20571495', total_VEGF_count=3.0),
 Row(PMID_KEY=u'22568967', total_VEGF_count=6.0),
 Row(PMID_KEY=u'24518078', total_VEGF_count=12.0),
 Row(PMID_KEY=u'21837668', total_VEGF_count=3.0),
 Row(PMID_KEY=u'20885915', total_VEGF_count=1.0),
 Row(PMID_KEY=u'25258583', total_VEGF_count=1.0)]