In [1]:
import numpy as np
#import findspark
#findspark.init()
import pyspark as ps
import re
import string
from pyspark import SparkContext
from pyspark.ml.feature import Tokenizer, HashingTF, IDF
from pyspark.ml.feature import CountVectorizer
from pyspark.sql import SQLContext
from pyspark.ml.feature import StopWordsRemover
from pyspark.ml.feature import Normalizer
from sklearn.metrics.pairwise import cosine_similarity

In [2]:
# Input file and the reader format used to parse it
file_type = "csv"
file_location = "/FileStore/tables/data_old.csv"
#file_location = "/FileStore/tables/data.csv"

# Reader options (they only apply to CSV input; other formats ignore them).
# NOTE(review): with header = "false" the file's own header row is loaded as
# an ordinary data row and the columns are auto-named _c0.._c7, as the cell
# output shows — confirm this is intended.
delimiter = "\t"
first_row_is_header = "false"
infer_schema = "false"

# `spark` and `display` are not defined in this notebook; they are expected
# to be supplied by the hosting notebook environment.
df = (
    spark.read
    .format(file_type)
    .options(inferSchema=infer_schema, header=first_row_is_header, sep=delimiter)
    .load(file_location)
)

display(df.head(3))

_c0,_c1,_c2,_c3,_c4,_c5,_c6,_c7
id,title,authors,year,journal,abstract,tags,citations
27999001,Glucose Metabolism After Gastric Banding and Gastric Bypass in Individuals With Type 2 Diabetes: Weight Loss Effect.,"['Holter', 'Dutia', 'Stano', 'Prigeon', 'Homel', 'McGinty', 'Belsley', 'Ren', 'Rosen', 'Laferrère']",2017,Diabetes care,The superior effect of Roux-en-Y gastric bypass (RYGB) on glucose control compared with laparoscopic adjustable gastric banding (LAGB) is confounded by the greater weight loss after RYGB. We therefore examined the effect of these two surgeries on metabolic parameters matched on small and large amounts of weight loss.,"['Adult', 'Bariatric Surgery', 'Diabetes Mellitus, Type 2', 'Female', 'Gastric Bypass', 'Glucagon-Like Peptide 1', 'Glucose', 'Humans', 'Incretins', 'Insulin Resistance', 'Longitudinal Studies', 'Male', 'Middle Aged', 'Obesity', 'Postoperative Period', 'Prospective Studies', 'Sweetening Agents', 'Weight Loss']","['26826918', '26132586', '22535748', '24679060', '18650633', '20676394', '23388352', '24114113', '24296713', '18070760', '26186884', '26807004', '20029383', '26681719', '21107106', '23649520', '23652711', '23439632', '21078684', '7677463', '23187122', '12086947', '22449317', '16600932', '24089513', '27289123', '443421', '1551497', '25628424', '24189773', '23610060', '22359255', '24057293', '21339424', '26786780', '16478824', '20716694', '18430778', '17416796', '3899825']"
27999002,Metformin Is Associated With Higher Relative Abundance of Mucin-Degrading Akkermansia muciniphila and Several Short-Chain Fatty Acid-Producing Microbiota in the Gut.,"['de la Cuesta-Zuluaga', 'Mueller', 'Corrales-Agudelo', 'Velásquez-Mejía', 'Carmona', 'Abad', 'Escobar']",2017,Diabetes care,"Recent studies suggest the beneficial effects of metformin on glucose metabolism may be microbially mediated. We examined the association of type 2 diabetes, metformin, and gut microbiota in community-dwelling Colombian adults. On the basis of previous research, we hypothesized that metformin is associated with higher levels of short-chain fatty acid (SCFA)-producing and mucin-degrading microbiota.","['Adolescent', 'Adult', 'Case-Control Studies', 'Colombia', 'Diabetes Mellitus, Type 2', 'Fatty Acids, Volatile', 'Feces', 'Female', 'Gastrointestinal Microbiome', 'Humans', 'Hypoglycemic Agents', 'Male', 'Metformin', 'Middle Aged', 'Mucins', 'RNA, Ribosomal, 16S', 'Verrucomicrobia']",['27910881']


In [3]:
#import findspark
#findspark.init("/usr/local/Cellar/apache-spark/2.3.1/bin")

In [4]:
def remove_punctuation(text):
    """ Lower-case a text and strip ASCII punctuation (commas, quotes, ...) from it.

    The text is split on whitespace, every word is lower-cased and each
    character listed in ``string.punctuation`` is deleted from it, so a
    contraction stays together as one word ("don't" -> "dont"). Words that
    consisted of punctuation only (e.g. a free-standing "-") are dropped, so
    the result never contains empty words or doubled spaces.

        Args:
            text (string): the text we want to clean; None or an empty
                string gives an empty string back
        Return:
            A single string holding the cleaned words separated by one space
    """
    if not text:
        return ''

    # Translation table that deletes every ASCII punctuation character
    table = str.maketrans('', '', string.punctuation)
    cleaned = (word.lower().translate(table) for word in text.split())

    # Skip words that became empty once their punctuation was removed
    return ' '.join(word for word in cleaned if word)

In [5]:
# get the similarities for each pair of papers
def get_similarities(paper_rdd, paper_tfidf):
    """ Function that returns the similarity between every ordered pair of papers

    The "features" column of paper_tfidf is passed through a Normalizer and
    the dot product of the normalised vectors is computed for every pair in
    the cartesian product of the papers (each paper paired with itself
    included).

        Args:
            paper_rdd (rdd): RDD of all papers whose element [0] is the paper
                label; only used when paper_tfidf has no "label" column
            paper_tfidf (pyspark.sql.dataframe.DataFrame): DataFrame with a
                "features" column holding the tf-idf vector of each paper
        Return:
            similarity_array: RDD of ((label_a, label_b), similarity)
                entries, sorted by key
    """

    print("... Computing L2 norm ...")
    normalizer = Normalizer(inputCol="features", outputCol="normFeatures")
    normalized = normalizer.transform(paper_tfidf)

    if "label" in normalized.columns:
        # Take label and vector from the same row. This avoids RDD.zip, which
        # only works when both RDDs have identical partitioning and the same
        # number of elements in every partition.
        data = normalized.rdd.map(lambda r: (r["label"], r["normFeatures"]))
    else:
        # No label column available: pair labels and vectors positionally.
        labels = paper_rdd.map(lambda x: x[0])
        data = labels.zip(normalized.rdd.map(lambda r: r.normFeatures))

    # Cartesian product of the papers, then the dot product of each pair
    similarity_array = (
        data.cartesian(data)
        .map(lambda pair: ((pair[0][0], pair[1][0]), pair[0][1].dot(pair[1][1])))
        .sortByKey()
    )

    return similarity_array

In [6]:
# get the n top similar papers for given paper info
def get_neighbors(paper_PMID, similarity_array, n):
    """ Function that returns the n most similar papers for given paper

    The given paper itself is never part of the result, even though the
    similarity array contains the pair (paper, paper).

        Args:
            paper_PMID: PMID of the paper we want to find similar papers to;
                it is compared with == against the labels stored in
                similarity_array, so it must have the same type as them
            similarity_array (rdd): ((PMID_a, PMID_b), similarity) entries
                for all pairs of papers
            n (int): number of similar papers we are looking for, for a specified paper
        Return:
            list: up to n PMIDs of the papers most similar to the given
                paper, most similar first
    """
    if n <= 0:
        return []

    # Keep the pairs that start at the requested paper, except its pairing
    # with itself, and fetch only the n best instead of sorting everything.
    ranked = (
        similarity_array
        .filter(lambda x: x[0][0] == paper_PMID and x[0][1] != paper_PMID)
        .takeOrdered(n, key=lambda x: -x[1])
    )

    return [pair[1] for pair, _ in ranked]

In [7]:
def paper_recommender_title_abstract(paper_in_PMID, text_rdd, n):
    """The main function that gets a PMID for a paper, and recommends n similar papers, using the metadata (title and abstract) of all papers in text_rdd

    Every line of text_rdd is split on tabs; field 0 is used as the PMID,
    field 1 as the title and field 5 as the abstract. The first line is
    treated as a header and skipped. The neighbours and their titles are
    printed, nothing useful is returned.

        Args:
            paper_in_PMID: PMID of the paper to find similar papers to; it is
                compared against field 0 of the split lines, i.e. against text
            text_rdd (rdd): RDD of the raw tab-separated lines, header first
            n (int): number of similar papers to list
        Return:
            An empty tuple
    """
    # zipWithIndex gives the first line (the header) index 0, so dropping
    # index 0 removes the header without a separate first() action.
    text_rdd_n = (
        text_rdd.zipWithIndex()
        .filter(lambda x: x[1] != 0)
        .map(lambda y: (y[1], y[0].split('\t')))
    )

    # Extract title and abstract for each paper and remove punctuations from them
    print("... Cleaning the Title & Abstract ...")
    paper_abs_title_rdd = text_rdd_n.map(
        lambda x: (x[1][0], remove_punctuation(x[1][1] + " " + x[1][5])))
    # `sqlContext` is not created in this notebook; it is expected to be
    # supplied by the hosting notebook environment.
    sentenceData_df = sqlContext.createDataFrame(paper_abs_title_rdd, ["label", "sentence"])

    print("... Tokenizing and StopWords removing ...")
    tokenizer = Tokenizer(inputCol="sentence", outputCol="words")
    remover = StopWordsRemover(inputCol="words", outputCol="filtered")
    wordsData = remover.transform(tokenizer.transform(sentenceData_df))

    print("... Computing TF-IDF ...")
    hashingTF = HashingTF(inputCol="filtered", outputCol="rawFeatures")
    # The term frequencies are read both when fitting the IDF and when
    # transforming, hence the cache; it is released once the neighbours
    # have been computed.
    tf = hashingTF.transform(wordsData).cache()
    try:
        idf_model = IDF(inputCol="rawFeatures", outputCol="features").fit(tf)
        tfidf = idf_model.transform(tf)

        print("... Calculating similarity array ...")
        data_similarity = get_similarities(paper_abs_title_rdd, tfidf)

        print("... Finding n similar papers ...")
        neighbors = get_neighbors(paper_in_PMID, data_similarity, n)
    finally:
        tf.unpersist()

    print("Neighbors based on title and abstract =", neighbors)

    # Fetch the titles of the main paper and of all neighbours in one pass
    # over the data instead of running one Spark job per paper.
    wanted = set(neighbors)
    wanted.add(paper_in_PMID)
    rows = (
        text_rdd_n
        .filter(lambda x: x[1][0] in wanted)
        .map(lambda y: (y[1][0], y[1][1]))
        .collect()
    )
    titles_by_pmid = {}
    for pmid, title in rows:
        titles_by_pmid.setdefault(pmid, []).append(title)

    print("Main paper title: ", titles_by_pmid.get(paper_in_PMID, []))

    for i in neighbors:
        print("Similar paper title for PMID", i, ":", titles_by_pmid.get(i, []))

    return ()

In [8]:
def paper_recommender_tags(paper_in_PMID, text_rdd, n):
    """The main function that gets a PMID for a paper, and recommends n similar papers, using the tags of all papers in text_rdd

    Every line of text_rdd is split on tabs; field 0 is used as the PMID and
    field 6 as the tags. The first line is treated as a header and skipped.
    The neighbours and their tags are printed, nothing useful is returned.

        Args:
            paper_in_PMID: PMID of the paper to find similar papers to; it is
                compared against field 0 of the split lines, i.e. against text
            text_rdd (rdd): RDD of the raw tab-separated lines, header first
            n (int): number of similar papers to list
        Return:
            An empty tuple
    """
    # zipWithIndex gives the first line (the header) index 0, so dropping
    # index 0 removes the header without a separate first() action.
    text_rdd_n = (
        text_rdd.zipWithIndex()
        .filter(lambda x: x[1] != 0)
        .map(lambda y: (y[1], y[0].split('\t')))
    )

    # Extract the tags for each paper and remove punctuations from them
    print("... Cleaning the tags ...")

    # remove_punctuation lower-cases the text and strips the brackets,
    # quotes and commas of the serialised tag list, so that the same word
    # yields the same token wherever it sits in the list.
    paper_tags_rdd = text_rdd_n.map(lambda x: (x[1][0], remove_punctuation(x[1][6])))

    # `sqlContext` is not created in this notebook; it is expected to be
    # supplied by the hosting notebook environment.
    sentenceData_df = sqlContext.createDataFrame(paper_tags_rdd, ["label", "sentence"])

    print("... Tokenizing and StopWords removing ...")

    tokenizer = Tokenizer(inputCol="sentence", outputCol="words")
    remover = StopWordsRemover(inputCol="words", outputCol="filtered")
    wordsData = remover.transform(tokenizer.transform(sentenceData_df))

    print("... Computing TF-IDF ...")

    hashingTF = HashingTF(inputCol="filtered", outputCol="rawFeatures")
    # The term frequencies are read both when fitting the IDF and when
    # transforming, hence the cache; it is released once the neighbours
    # have been computed.
    tf = hashingTF.transform(wordsData).cache()
    try:
        idf_model = IDF(inputCol="rawFeatures", outputCol="features").fit(tf)
        tfidf = idf_model.transform(tf)

        print("... Calculating similarity array ...")
        data_similarity = get_similarities(paper_tags_rdd, tfidf)

        print("... Finding n similar papers ...")
        neighbors = get_neighbors(paper_in_PMID, data_similarity, n)
    finally:
        tf.unpersist()

    print("Neighbors based on tags =", neighbors)

    # Fetch the tags of the main paper and of all neighbours in one pass
    # over the data instead of running one Spark job per paper.
    wanted = set(neighbors)
    wanted.add(paper_in_PMID)
    rows = (
        text_rdd_n
        .filter(lambda x: x[1][0] in wanted)
        .map(lambda y: (y[1][0], y[1][6]))
        .collect()
    )
    tags_by_pmid = {}
    for pmid, tags in rows:
        tags_by_pmid.setdefault(pmid, []).append(tags)

    print("Main paper tags: ", tags_by_pmid.get(paper_in_PMID, []))

    for i in neighbors:
        print("Similar paper tags for PMID", i, ":", tags_by_pmid.get(i, []))

    return ()

In [9]:
if __name__ == '__main__':
    # `sc` is not created here; it is expected to be supplied by the hosting
    # notebook environment. Elsewhere create one first, e.g.
    # sc = ps.SparkContext('local[1]').
    text_rdd = sc.textFile(file_location)

    # Paper to find recommendations for (another PMID tried: '26990000')
    # and how many similar papers to list.
    query_pmid = '27999001'
    top_n = 10

    print("... Finding similar papers based on title and abstract ...")
    paper_recommender_title_abstract(query_pmid, text_rdd, top_n)

    print("--------------------------------------------------")
    print("... Finding similar papers based on tags ...")
    paper_recommender_tags(query_pmid, text_rdd, top_n)