In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as fn
import pandas as pd

In [2]:
# Obtain a Spark session (an already-active one is reused by getOrCreate),
# requesting 28 GB for the driver process.
spark = (
    SparkSession.builder
    .config('spark.driver.memory', '28g')
    .getOrCreate()
)

# Load visits

In [3]:
# Top-1000-by-sessions spreadsheets: data1 is the Nuccore export, data2 the Protein export.
data1 = pd.read_excel(io='data/LA_1308_Nuccore_Top_1000_by_Sessions.xlsx')
data2 = pd.read_excel(io='data/LA_1308_Protein_Top_1000_by_Sessions.xlsx')

In [4]:
# Read the DOI merge table; the first row supplies column names and column
# types are inferred by Spark.
ref_doi = spark.read.csv(
    path='data/DOI.merge.csv',
    header=True,
    inferSchema=True,
)

In [5]:
# Drop rows whose DOI is the literal string "NA", keep the (DOI, refid) pair,
# and collect the result to the driver as a pandas DataFrame.
doi_refid = (
    ref_doi
    .where('DOI <> "NA"')
    .select('DOI', 'refid')
    .toPandas()
)

In [6]:
# Expose the Nuccore GI number as `refid`, keep only the rows that also appear
# in `doi_refid`, and reduce to unique (refid, Hits) pairs.
# NOTE(review): merge() is called without `on=`, so pandas joins on every
# column name the two frames share — presumably just `refid`; confirm that
# `data1` has no `DOI` column.
visits = (
    data1
    .assign(refid=lambda frame: frame['Nuccore_GI#'])
    .merge(doi_refid)
    .loc[:, ['refid', 'Hits']]
    .drop_duplicates()
)

In [7]:
# Hand the pandas visits table over to Spark so it can be joined with the
# other Spark DataFrames below.
visits_2015 = spark.createDataFrame(data=visits)

# Systematic run of the algorithm

In [8]:
# Year against which `age` is measured (age = REFERENCE_YEAR - year).
REFERENCE_YEAR = 2018

# Graph vertices, extended with an `age` column derived from `year`.
vertices_new_id = (
    spark.read.parquet('data/genbank_vertices_new_id.parquet')
    .withColumn('age', REFERENCE_YEAR - fn.col('year'))
)

# Graph edges, loaded as stored.
edges_new_id = spark.read.parquet('data/genbank_edges_new_id.parquet')

In [9]:
# run the experiments below
#
# The disabled code sweeps pub_decay_time x data_decay_time x alpha and, for
# every combination whose output directory does not exist yet, computes the
# ranks and writes them to data/results/ranks_p{..}_d{..}_a{..}.parquet.
# The (1, 5, 0.5) combination yields ranks_p1_d5_a50.parquet.
# NOTE(review): `datarank` is not imported by the visible import cell; it must
# be imported before this block is re-enabled.

# import os

# for pub_decay_time in [1, 5, 20]:
#     for data_decay_time in [1, 5, 20]:
#         for alpha in [0.25, 0.5]:
#             print(pub_decay_time, data_decay_time, alpha)
#             filename = 'data/results/ranks_p{}_d{}_a{}.parquet'.format(pub_decay_time, data_decay_time, int(alpha*100))
#             if not os.path.isdir(filename):
#                 rho, transitions = datarank.compute_rho_transitions(vertices_new_id, edges_new_id, 
#                     pub_decay_time=pub_decay_time, data_decay_time=data_decay_time)
#                 ranks = datarank.estimate_datarank(rho, transitions, alpha=alpha, 
#                     tol=0.1, max_iter=10, verbose=True)
#                 print('saving ... ', filename)
#                 ranks.write.parquet(filename)
#                 ranks.unpersist()

In [10]:
# example of the most important submissions by data rank
# Load one stored result set: publication decay 1, data decay 5, alpha 0.5.
pub_decay_time, data_decay_time, alpha = 1, 5, 0.5
filename = 'data/results/ranks_p{}_d{}_a{}.parquet'.format(
    pub_decay_time,
    data_decay_time,
    int(alpha * 100),  # alpha is encoded as an integer percentage in the file name
)
print("Loading {}".format(filename))
ranks = spark.read.parquet(filename)

Loading data/results/ranks_p1_d5_a50.parquet


In [11]:
# Column expression that yields the whole `DOI_i` value when it looks like a
# DOI ("10.<4-9 digits>/<suffix>") and an empty string otherwise (group 0 is
# the entire match).
# Fix: the dot after "10" is now escaped; unescaped it matched any character,
# so values such as "10X1234/ABC" were accepted as DOIs.
# NOTE(review): the suffix character class only allows upper-case letters;
# confirm that `DOI_i` values are upper-cased upstream, otherwise lower-case
# DOIs are rejected.
doi_extract = fn.regexp_extract('DOI_i', r'^10\.\d{4,9}/[-._;()/:A-Z0-9]+$', 0)

In [12]:
# A row counts as a DOI when the extraction produced a non-empty match.
doi_match_length = fn.length(doi_extract)
is_doi = doi_match_length > 0

In [13]:
# A row counts as a refid when `DOI_i` survives a cast to int (the cast
# produces null for values that are not integers).
doi_as_int = fn.col('DOI_i').cast('int')
is_refid = doi_as_int.isNotNull()

In [14]:
# Number of edge rows per `DOI_j` value (presumably the citation count of the
# cited item — confirm edge direction), re-keyed as `DOI_i` so it can be
# joined with the rank table.
n_citations = (
    edges_new_id
    .groupBy('DOI_j')
    .agg(fn.count('*').alias('n_citations'))
    .withColumnRenamed('DOI_j', 'DOI_i')
    # Both predicates reference `DOI_i`, so the filter has to follow the rename.
    .where(is_doi | is_refid)
)

In [15]:
# Visits keyed the same way as the rank table (`refid` exposed as `DOI_i`).
visits_by_doi = visits_2015.selectExpr('refid as DOI_i', 'Hits')

# Combine ranks, vertex attributes, visit counts and citation counts, then
# collect to pandas. All joins use the default join type.
plotting_data = (
    ranks
    .join(vertices_new_id, on='i')
    .join(visits_by_doi, on='DOI_i')
    .join(n_citations, on='DOI_i')
    .toPandas()
)

In [16]:
# Pairwise correlations between rank value, citation count and hits,
# computed separately for each vertex `type`.
corr_columns = ['value', 'n_citations', 'type', 'Hits']
plotting_data[corr_columns].groupby('type').corr()

Unnamed: 0_level_0,Unnamed: 1_level_0,Hits,n_citations,value
type,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
data,Hits,1.0,0.188737,-0.377998
data,n_citations,0.188737,1.0,0.128379
data,value,-0.377998,0.128379,1.0
