In [1]:
import os
import findspark

# Environment for the Spark launcher: put the JDK on PATH and pin the Python
# interpreter PySpark should use.
os.environ['PATH'] = os.environ['PATH'] + ":/usr/java/jdk1.8.0_162/bin"
os.environ['PYSPARK_PYTHON'] = '/home/tozeng/anaconda3/bin/python'

# `findspark` is already imported at the top of this cell, so it is not
# re-imported here. The submit args request the spark-xml package and are set
# before the session is created.
os.environ['PYSPARK_SUBMIT_ARGS'] = \
    "--packages com.databricks:spark-xml_2.11:0.5.0 pyspark-shell"
findspark.init('/opt/cloudera/parcels/SPARK2/lib/spark2/')

# These imports deliberately come after findspark.init(), which points Python
# at the Spark installation given above.
import pyspark
from pyspark.sql import SparkSession

# NOTE(review): spark.driver.maxResultSize (30g) is larger than
# spark.driver.memory (20g) — confirm this is intended.
spark = (
    SparkSession.builder
    .appName('query')
    .config('spark.driver.memory', '20g')
    .config('spark.network.timeout', '600s')
    .config('spark.driver.maxResultSize', '30g')
    .config('spark.executor.memory', '15g')
    .config('spark.kryoserializer.buffer.max', '1g')
    .config('spark.cores.max', '50')
    .config('spark.rpc.message.maxSize', '256')
    .getOrCreate()
)

sc = spark.sparkContext

from pyspark.sql import functions as fn

In [2]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from tqdm.auto import tqdm 
import pickle



In [3]:
# Loading Dimensions tables

# Common prefix of the Dimensions parquet tables read below.
DIMENSIONS_DIR = '/user/lliang06/daniel-dimensions'

# Each table is read exactly once (`authorship` used to be loaded twice).
authorship = spark.read.parquet(DIMENSIONS_DIR + '/authorship.parquet')
citations = spark.read.parquet(DIMENSIONS_DIR + '/citations.parquet')
datasets = spark.read.parquet(DIMENSIONS_DIR + '/datasets')
clinical_trials = spark.read.parquet(DIMENSIONS_DIR + '/clinical_trials')
grants = spark.read.parquet(DIMENSIONS_DIR + '/grants')
grid = spark.read.parquet(DIMENSIONS_DIR + '/grid')
patents = spark.read.parquet(DIMENSIONS_DIR + '/patents')
policy_documents = spark.read.parquet(DIMENSIONS_DIR + '/policy_documents')
publications = spark.read.parquet(DIMENSIONS_DIR + '/publications')
reports = spark.read.parquet(DIMENSIONS_DIR + '/reports')
researchers = spark.read.parquet(DIMENSIONS_DIR + '/researchers')

# Derived tables: both are written by later cells of this notebook, so these
# two reads only succeed once those cells have been run at least once.
dimension_energy_pi = spark.read.parquet('./sloan/dimension_energy_pi.parquet')
potential_researchers = spark.read.parquet('./sloan/potential_researchers.parquet')

In [None]:
# Getting Dimensions ID for funded researchers
#
# Combines the spreadsheet of PI -> researcher_id matches with the manually
# resolved duplicates listed in dup_ids.txt. In that text file this cell
# recognises three kinds of lines:
#   * lines containing 'Award_number'        -> the award number (last space-separated token)
#   * short lines (<= 4 space-separated
#     tokens) that are none of the others    -> the PI name (last tab-separated field)
#   * lines containing 'combine'/'Combine'
#     and '&'                                -> the '&'-separated researcher ids to merge

# `re` is not imported anywhere earlier in the notebook, so import it here.
import re

dimension_df = pd.read_excel('./data/Dimensions_ID_final.xlsx')

with open('./data/dup_ids.txt') as f:
    lines = f.readlines()

dimension_dict = {}
dimension_dict['Award_number'] = []
dimension_dict['PI'] = []
dimension_dict['researcher_id'] = []
# NOTE(review): this pattern is compiled but not used in this cell.
p = re.compile(r'ur.\d+.\d+')

for line in lines:
    # Strip only the line terminator, so a final line without a trailing
    # newline does not lose its last real character.
    text = line.rstrip('\n')
    has_combine = ('combine' in text) or ('Combine' in text)

    # The three checks are independent on purpose (not an if/elif chain).
    if 'Award_number' in text:
        dimension_dict['Award_number'].append(text.split(' ')[-1])
    if (len(text.split(' ')) <= 4) and ('Award_number' not in text) \
            and (text != '') and not has_combine:
        dimension_dict['PI'].append(text.split('\t')[-1])
    if has_combine and ('&' in text):
        # Lower-case 'combine' takes precedence when both spellings occur.
        keyword = 'combine' if 'combine' in text else 'Combine'
        dimension_dict['researcher_id'].append(text.split(keyword)[-1].split('&'))

# pd.DataFrame(dict) needs the three lists to have equal length, i.e. the text
# file must hold one award line, one PI line and one combine line per entry.
ids_match = pd.DataFrame(dimension_dict).explode('researcher_id')[['PI', 'researcher_id']].drop_duplicates()

# Drop spreadsheet rows whose researcher_id is the string '-999', then append
# the resolved duplicates.
dimension_ids = pd.concat([dimension_df[dimension_df['researcher_id'] != '-999'], ids_match])

# dimension_ids.to_csv('PI_dimension_ids.csv', index = False)

In [81]:
# Read the funded-PI id table from CSV with pandas and convert it to a Spark
# DataFrame so it can be joined against the Dimensions tables.
# NOTE(review): presumably this CSV is the output of the commented-out
# `dimension_ids.to_csv(...)` call — confirm, since that call uses a different path.
dimension_ids_df = spark.createDataFrame(pd.read_csv('./data/PI_dimension_ids.csv'))

In [None]:
# Getting publications funded researchers authored
#
# authorship rows of funded researchers -> their publications -> preferred
# abstract. Rows with any null are dropped, duplicates removed, and the
# (publication_id, abstract) pairs are written out, replacing earlier output.
(
    authorship
    .join(dimension_ids_df, 'researcher_id', 'inner')
    .join(
        publications
        .select(publications.id, publications.abstract.preferred.alias('abstract'))
        .withColumnRenamed('id', 'publication_id'),
        'publication_id',
        'inner',
    )
    .select('publication_id', 'abstract')
    .dropna()
    .drop_duplicates()
    .write.parquet('./sloan/dimension_energy_pi.parquet', mode = 'overwrite')
)

In [9]:
# Reload the (publication_id, abstract) table of funded researchers' publications
# from the parquet path that the preceding cell writes.
dimension_energy_pi = spark.read.parquet('./sloan/dimension_energy_pi.parquet')

In [None]:
# Authors citing publications authored by funded researchers
#
# funded publication -> publications citing it -> authors of those citing
# publications. The result is not de-duplicated: one row per
# (citation, author) pair.
citing_authors = (
    dimension_energy_pi
    .select('publication_id')
    .join(citations, 'publication_id', 'inner')
    # Re-key on the citing side so the authorship join picks up citing authors.
    .select(fn.col('citing_publication_id').alias('publication_id'))
    .join(authorship, 'publication_id', 'inner')
    .select('researcher_id')
)

# citing_authors.write.parquet('./sloan/citing_authors.parquet')

In [None]:
# Authors referenced by publications authored by funded researchers
#
# funded publication (as the citing side) -> publications it cites -> authors
# of those cited publications. The result is not de-duplicated.
referenced_authors = (
    dimension_energy_pi
    .select(fn.col('publication_id').alias('citing_publication_id'))
    .join(citations, 'citing_publication_id', 'inner')
    # After this join `publication_id` is the cited publication.
    .join(authorship, 'publication_id', 'inner')
    .select('researcher_id')
)

# referenced_authors.write.parquet('./sloan/referenced_authors.parquet')

In [None]:
# Researchers affiliated to US at some point of their career
#
# Explodes each researcher's `research_orgs` into one row per organisation,
# looks up the organisation's country in `grid`, and keeps the researchers
# with at least one organisation in the United States.
us_researchers = (
    researchers
    .where(fn.size(fn.col('research_orgs')) != 0)
    .select(fn.col('id').alias('researcher_id'), fn.explode('research_orgs').alias('grid_id'))
    .join(grid.select(grid.address.country.alias('country'), grid.id.alias('grid_id')), 'grid_id', 'inner')
    .where(fn.col('country') == 'United States')
    .select('researcher_id')
    # One row per researcher, like the sibling affiliation filters: without
    # this, a researcher with k US organisations appears k times and
    # multiplies the rows of every later inner join on researcher_id.
    .drop_duplicates()
)

# us_researchers.write.parquet('./sloan/us_researchers.parquet', mode = 'overwrite')

In [None]:
# Researchers currently affiliated with US
#
# Takes only the last entry of `research_orgs` per researcher and keeps the
# researcher when that organisation's country is the United States.
# NOTE(review): this treats the last array element as the current
# affiliation — confirm the ordering of `research_orgs`.
current_us_researchers = (
    researchers
    .where(fn.size(fn.col('research_orgs')) != 0)
    .select(
        fn.col('id').alias('researcher_id'),
        fn.element_at('research_orgs', -1).alias('grid_id'),
    )
    .join(
        grid.select(grid.address.country.alias('country'), grid.id.alias('grid_id')),
        'grid_id',
        'inner',
    )
    .where(fn.col('country') == 'United States')
    .select('researcher_id')
    .drop_duplicates()
)

# current_us_researchers.write.parquet('./sloan/current_us_researchers.parquet')

In [None]:
# Researchers affiliated with not just US
#
# Starts from `us_researchers`, re-attaches each researcher's full list of
# organisations, and keeps those with at least one organisation whose country
# is not the United States.
not_only_us_researchers = (
    us_researchers
    .join(
        researchers.select(fn.col('id').alias('researcher_id'), fn.col('research_orgs')),
        'researcher_id',
        'inner',
    )
    .select('researcher_id', fn.explode('research_orgs').alias('grid_id'))
    .join(
        grid.select(grid.address.country.alias('country'), grid.id.alias('grid_id')),
        'grid_id',
        'inner',
    )
    .where(fn.col('country') != 'United States')
    .select('researcher_id')
    .drop_duplicates()
)

# not_only_us_researchers.write.parquet('./sloan/not_only_us_researchers.parquet', mode= 'overwrite')

In [None]:
# Researchers who have at least 10 works
#
# A researcher must pass both checks: at least 10 rows in `authorship`, and a
# `total_publications` value of at least 10 in `researchers`.
ten_work = (
    researchers
    .select(fn.col('id').alias('researcher_id'))
    .join(authorship, 'researcher_id', 'inner')
    .groupby('researcher_id')
    .count()
    .where(fn.col('count') >= 10)
    .join(
        researchers.select(fn.col('id').alias('researcher_id'), 'total_publications'),
        'researcher_id',
        'inner',
    )
    .where(fn.col('total_publications') >= 10)
    .select('researcher_id')
    .drop_duplicates()
)

# ten_work.write.parquet('./sloan/ten_work.parquet')

In [None]:
# Researchers who have at least one work between 2009 and 2016
#
# Keeps publications with 2009 <= year <= 2016, explodes their
# `researcher_ids`, and returns one row per researcher that appears.
active_researchers = (
    publications
    .where((fn.col('year') >= 2009) & (fn.col('year') <= 2016))
    .select(fn.explode('researcher_ids').alias('researcher_id'), 'year')
    .groupby('researcher_id')
    .count()
    # Every group produced by the groupby already has count >= 1.
    .where(fn.col('count') >= 1)
    .select('researcher_id')
)

# active_researchers.write.parquet('./sloan/active_researchers.parquet')

In [None]:
# Reload the intermediate researcher-id tables from parquet instead of
# recomputing them. The matching `.write.parquet(...)` calls in the cells above
# are commented out, so these files must already exist on disk.
citing_authors = spark.read.parquet('./sloan/citing_authors.parquet')
referenced_authors = spark.read.parquet('./sloan/referenced_authors.parquet')
us_researchers = spark.read.parquet('./sloan/us_researchers.parquet')
not_only_us_researchers = spark.read.parquet('./sloan/not_only_us_researchers.parquet')
ten_work = spark.read.parquet('./sloan/ten_work.parquet')
active_researchers = spark.read.parquet('./sloan/active_researchers.parquet')

In [None]:
# Build the pool of potential (non-funded) comparison researchers: anyone who
# cites or is cited by a funded researcher's publication, narrowed by the
# filters below, written as a single-column table of researcher ids.
#
# The chain is wrapped in parentheses because a comment after a backslash line
# continuation is a SyntaxError; parentheses allow per-step comments.
(
    citing_authors
    .union(referenced_authors)
    .join(dimension_ids_df, 'researcher_id', 'left_anti')  # remove funded researchers
    .join(us_researchers, 'researcher_id', 'inner')  # keep researchers with a US affiliation
    .join(not_only_us_researchers, 'researcher_id', 'left_anti')  # remove researchers who also have a non-US affiliation
    .join(ten_work, 'researcher_id', 'inner')  # keep researchers with at least 10 works
    .join(active_researchers, 'researcher_id', 'inner')  # keep researchers active in 2009-2016
    .dropna()
    .drop_duplicates()
    .write.parquet('./sloan/potential_researchers.parquet')
)

In [4]:
# Reload the potential-researcher pool from the parquet path written by the
# preceding cell.
potential_researchers = spark.read.parquet('./sloan/potential_researchers.parquet')
# The commented-out lines below show how sampled_id.parquet was produced: the
# unique researcher ids in all_PI_idx.csv whose category is not 'potential'.
# all_PI_idx = pd.read_csv('./data/all_PI_idx.csv')
# sampled_id = all_PI_idx[all_PI_idx['category'] != 'potential']['researcher_id'].unique()
# spark.createDataFrame(pd.DataFrame(sampled_id).rename({0: 'researcher_id'}, axis = 1)).write.parquet('./sloan/sampled_id.parquet')
sampled_id_df = spark.read.parquet('./sloan/sampled_id.parquet')

In [None]:
# granted_researchers = dimension_ids_df.select('researcher_id')

# target_researchers = potential_researchers.select('researcher_id').\
#     union(granted_researchers)

In [37]:
# Preview: attach the publication year of the cited publication
# (`publication_id`) to each citation row and print the first rows.
(
    citations
    .join(
        publications
        .withColumnRenamed('id', 'publication_id')
        .select('publication_id', 'year'),
        on = 'publication_id',
        how ='inner',
    )
    .show()
)

+--------------+---------------------+-------------+----+
|publication_id|citing_publication_id|citation_year|year|
+--------------+---------------------+-------------+----+
|pub.1000000255|       pub.1001799494|         2014|2010|
|pub.1000000255|       pub.1068979015|         2015|2010|
|pub.1000000255|       pub.1141546631|         2021|2010|
|pub.1000000255|       pub.1072255085|         2015|2010|
|pub.1000000255|       pub.1147162789|         2022|2010|
|pub.1000000255|       pub.1117736772|         2012|2010|
|pub.1000000255|       pub.1044147342|         2015|2010|
|pub.1000000255|       pub.1117737630|         2012|2010|
|pub.1000000255|       pub.1022733763|         2016|2010|
|pub.1000000255|       pub.1069052145|         2012|2010|
|pub.1000000255|       pub.1100654402|         2018|2010|
|pub.1000000255|       pub.1072255293|         2014|2010|
|pub.1000000255|       pub.1106476923|         2018|2010|
|pub.1000000255|       pub.1121181968|         2019|2010|
|pub.100000025

In [None]:
# All abstracts for researcher matching
#
# Publications authored by potential researchers, excluding any publication
# already present in `dimension_energy_pi`, together with their preferred
# abstract. Rows with nulls are dropped and the result is written as a single
# partition.
(
    potential_researchers
    .join(authorship, 'researcher_id', 'inner')
    .join(dimension_energy_pi, 'publication_id', 'left_anti')
    .select('publication_id')
    .drop_duplicates()
    .join(
        publications
        .select(publications.id, publications.abstract.preferred.alias('abstract'))
        .withColumnRenamed('id', 'publication_id'),
        'publication_id',
        'inner',
    )
    .dropna()
    .repartition(1)
    .write.parquet('./sloan/potential_abstracts.parquet')
)

In [None]:
# Write a working sample of researcher ids: roughly 1% of the potential
# researchers plus every funded researcher.

# Fixed seed so the draw is repeatable for an unchanged input.
SAMPLE_SEED = 42

(
    potential_researchers
    .sample(fraction = 0.01, seed = SAMPLE_SEED)
    # `dimension_ids_df` is already a Spark DataFrame, so select the column
    # directly; passing it to spark.createDataFrame() raises a TypeError.
    .union(dimension_ids_df.select('researcher_id'))
    .write.parquet('./sloan/sampled_energy_PI.parquet')
)

In [None]:
# Abstracts of every publication authored by the sampled researchers.
#
# `sampled_energy_PI` was never assigned in the notebook (the sample is only
# written to disk), so load it from that parquet path first.
sampled_energy_PI = spark.read.parquet('./sloan/sampled_energy_PI.parquet')

(
    sampled_energy_PI
    .join(authorship, 'researcher_id', 'inner')
    .select('publication_id')
    .drop_duplicates()
    .join(
        publications
        .select(publications.id, publications.abstract.preferred.alias('abstract'))
        .withColumnRenamed('id', 'publication_id'),
        'publication_id',
        'inner',
    )
    .dropna()
    .write.parquet('./sloan/sampled_energy_PI_abstracts.parquet', mode = 'overwrite')
)

In [5]:
# Load the (publication_id, abstract) table of potential researchers' publications.
potential_abstracts = spark.read.parquet('./sloan/potential_abstracts.parquet')

In [6]:
# Bare expression: displays the DataFrame's repr (its schema) as the cell output.
potential_abstracts

DataFrame[publication_id: string, abstract: string]

In [8]:
# Count the distinct publications by potential researchers that have a
# preferred abstract. Unlike the `potential_abstracts` build, publications in
# `dimension_energy_pi` are not excluded here.
(
    potential_researchers
    .join(authorship, 'researcher_id', 'inner')
    .select('publication_id')
    .drop_duplicates()
    .join(
        publications
        .select(publications.id, publications.abstract.preferred.alias('abstract'))
        .withColumnRenamed('id', 'publication_id'),
        'publication_id',
        'inner',
    )
    .dropna()
    .count()
)

6036472

In [9]:
# Count the distinct funded-researcher publications that have a preferred
# abstract, by re-joining the publication ids against `publications`.
(
    dimension_energy_pi
    .select('publication_id')
    .drop_duplicates()
    .join(
        publications
        .select(publications.id, publications.abstract.preferred.alias('abstract'))
        .withColumnRenamed('id', 'publication_id'),
        'publication_id',
        'inner',
    )
    .dropna()
    .count()
)

320454

In [107]:
# Get which potential authors cites which funded researcher
#
# Both tables end up with the columns (researcher_id, related_researcher_id),
# where researcher_id is the funded researcher, and both are collected to the
# driver as pandas DataFrames.

# Funded researcher -> authors of publications that cite the funded
# researcher's publications.
potential_author_citing = (
    authorship
    .join(dimension_ids_df.select('researcher_id'), on = 'researcher_id', how = 'inner')
    .join(citations.drop('citation_year'), on = 'publication_id', how = 'inner')
    .join(
        authorship
        .withColumnRenamed('publication_id', 'citing_publication_id')
        .withColumnRenamed('researcher_id', 'citing_researcher_id'),
        on = 'citing_publication_id',
        how = 'inner',
    )
    .select('researcher_id', 'citing_researcher_id')
    .withColumnRenamed('citing_researcher_id', 'related_researcher_id')
    .dropDuplicates()
    .toPandas()
)

# Funded researcher -> authors of publications that the funded researcher's
# publications cite. The funded side is renamed to the "citing" role for the
# joins and swapped back to `researcher_id` at the end.
potential_author_referenced = (
    authorship
    .join(dimension_ids_df.select('researcher_id'), on = 'researcher_id', how = 'inner')
    .withColumnRenamed('publication_id', 'citing_publication_id')
    .withColumnRenamed('researcher_id', 'citing_researcher_id')
    .join(citations.drop('citation_year'), on = 'citing_publication_id', how = 'inner')
    .join(authorship, on = 'publication_id', how = 'inner')
    .select('researcher_id', 'citing_researcher_id')
    .withColumnRenamed('researcher_id', 'related_researcher_id')
    .withColumnRenamed('citing_researcher_id', 'researcher_id')
    .dropDuplicates()
    .toPandas()
)


In [108]:
# Persist both researcher-pair tables as CSV (without the pandas index).
potential_author_citing.to_csv('./data/potential_author_citing.csv', index = False)
potential_author_referenced.to_csv('./data/potential_author_referenced.csv', index = False)

In [121]:
# Stack the citing and referenced pair tables, drop exact duplicate rows, and
# write the combined table to CSV (without the pandas index).
pd.concat([potential_author_citing, potential_author_referenced]).drop_duplicates().to_csv('./data/potential_author_bib.csv', index = False)

In [122]:
# Read the combined pair table back from the CSV written in the preceding cell.
potential_author_bib = pd.read_csv('./data/potential_author_bib.csv')

In [125]:
# Number of rows in the combined pair table.
len(potential_author_bib)

43750730