In [1]:
import pandas as pd
import numpy as np
import networkx as nx

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.types import ArrayType, StructField, StructType, StringType, IntegerType
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.functions import *

appName = "papers page rank"
master = "local"

# Create Spark session
spark = SparkSession.builder \
    .appName(appName) \
    .master(master) \
    .getOrCreate()

df = spark.read.json('hep_records.json')
df.show()

+--------------------+--------------------+--------------------+--------------------+-------------+-------------+-----+----------+---------------------+--------------------+
|            abstract|             authors|           citations|          co-authors|creation_date|free_keywords|recid|references|standardized_keywords|               title|
+--------------------+--------------------+--------------------+--------------------+-------------+-------------+-----+----------+---------------------+--------------------+
|Introduction Part...|   [Wong, Yung-Chow]|[452267, 1520868,...|                  []|         1961|           []|    1|        []|                   []|Isoclinic N plane...|
|                    |   [Breymayer, K.E.]|                  []|[Mallory, Kenneth...|      1963-03|           []|   17|        []|                   []|SYSTEM INTERCONNE...|
|                    |[Johnston, Lawren...|           [1374812]|[Murray, Joseph J...|      1977-08|           []|   18|   [48647]|

In [3]:
df.count()

1356014

In [4]:
from pyspark.sql.functions import size
df = df.filter(size('references') > 0)
df = df.filter(size('citations') > 0)
df = df.filter(size('authors') > 0)
df

DataFrame[abstract: string, authors: array<string>, citations: array<bigint>, co-authors: array<string>, creation_date: string, free_keywords: array<string>, recid: bigint, references: array<bigint>, standardized_keywords: array<string>, title: string]

In [5]:
df.count()

745143

In [6]:
df.show()

+--------------------+--------------------+--------------------+--------------------+-------------+-------------+-----+--------------------+---------------------+--------------------+
|            abstract|             authors|           citations|          co-authors|creation_date|free_keywords|recid|          references|standardized_keywords|               title|
+--------------------+--------------------+--------------------+--------------------+-------------+-------------+-----+--------------------+---------------------+--------------------+
|                    |[Johnston, Lawren...|           [1374812]|[Murray, Joseph J...|      1977-08|           []|   18|             [48647]|                   []|A target beam mon...|
|Critical examinat...|   [Noyes, H.Pierre]|[107392, 48129, 7...|                  []|         1963|           []|   51|[47202, 43619, 47...|                   []|Neutron-Proton Sc...|
|                    |      [Berman, S.M.]|[779968, 89700, 5...|       [Drell, S

In [7]:
columns_to_drop = ['free_keywords', 'standardized_keywords', 'abstract', 'co-authors']
df = df.drop(*columns_to_drop)

In [8]:
df.show()

+--------------------+--------------------+-------------+-----+--------------------+--------------------+
|             authors|           citations|creation_date|recid|          references|               title|
+--------------------+--------------------+-------------+-----+--------------------+--------------------+
|[Johnston, Lawren...|           [1374812]|      1977-08|   18|             [48647]|A target beam mon...|
|   [Noyes, H.Pierre]|[107392, 48129, 7...|         1963|   51|[47202, 43619, 47...|Neutron-Proton Sc...|
|      [Berman, S.M.]|[779968, 89700, 5...|         1963|   55|           [1475824]|Coherent Producti...|
|      [Berman, S.M.]|[1431040, 60929, ...|      1963-08|   61|[46912, 46531, 16...|SPECULATIONS ON T...|
|      [Berman, S.M.]|[89093, 80998, 37...|      1963-10|   63|[47712, 21794, 47...|INTERMEDIATE BOSO...|
|    [Brown, Karl L.]|[64130, 48747, 18...|      1964-01|   64|           [1406076]|FIRST AND SECOND ...|
|       [Drell, S.D.]|[62977, 54786, 54...|   

In [9]:
from pyspark.sql.functions import split
split_col = split(df['creation_date'], '-')
df = df.withColumn('creation_date', split_col.getItem(0))

In [10]:
df.show()

+--------------------+--------------------+-------------+-----+--------------------+--------------------+
|             authors|           citations|creation_date|recid|          references|               title|
+--------------------+--------------------+-------------+-----+--------------------+--------------------+
|[Johnston, Lawren...|           [1374812]|         1977|   18|             [48647]|A target beam mon...|
|   [Noyes, H.Pierre]|[107392, 48129, 7...|         1963|   51|[47202, 43619, 47...|Neutron-Proton Sc...|
|      [Berman, S.M.]|[779968, 89700, 5...|         1963|   55|           [1475824]|Coherent Producti...|
|      [Berman, S.M.]|[1431040, 60929, ...|         1963|   61|[46912, 46531, 16...|SPECULATIONS ON T...|
|      [Berman, S.M.]|[89093, 80998, 37...|         1963|   63|[47712, 21794, 47...|INTERMEDIATE BOSO...|
|    [Brown, Karl L.]|[64130, 48747, 18...|         1964|   64|           [1406076]|FIRST AND SECOND ...|
|       [Drell, S.D.]|[62977, 54786, 54...|   

In [11]:
from_year = input("from the year: ")
to_year = input("to the year: ") 

from the year:  2000
to the year:  2001


In [12]:
df_conditional = df.filter(df['creation_date'] >= from_year)
df_conditional = df_conditional.filter(df_conditional['creation_date'] <= to_year)
df_conditional.show()

+--------------------+--------------------+-------------+------+--------------------+--------------------+
|             authors|           citations|creation_date| recid|          references|               title|
+--------------------+--------------------+-------------+------+--------------------+--------------------+
|[Joglekar, Satish...|[552878, 623944, ...|         2000| 42490|[439939, 110023, ...|Understanding of ...|
|[Steinmetz, Matth...|[825312, 660177, ...|         2000| 42492|[413185, 518914, ...|Numerical simulat...|
|     [Schmelzer, I.]|[1664417, 750341,...|         2000| 42497|[467969, 844651, ...|General Ether Theory|
|       [Fiorini, E.]|[551736, 636649, ...|         2000| 42498|[429473, 429192, ...|Weak interaction ...|
|          [Hall, G.]|[1635195, 688045,...|         2000| 42499|[536323, 876390, ...|LHC front end ele...|
|        [Duflot, V.]|            [533188]|         2000| 42501|[1457829, 464028,...|Role of the surfa...|
|     [Garnett, D.R.]|[603137, 575554

In [13]:
df_conditional.count()

38538

In [14]:
df_tmp = df_conditional.select(df_conditional['citations'], df_conditional['recid'])
df_tmp.show()

+--------------------+------+
|           citations| recid|
+--------------------+------+
|[552878, 623944, ...| 42490|
|[825312, 660177, ...| 42492|
|[1664417, 750341,...| 42497|
|[551736, 636649, ...| 42498|
|[1635195, 688045,...| 42499|
|            [533188]| 42501|
|[603137, 575554, ...| 42505|
|[844328, 763385, ...| 42517|
|    [817396, 640518]| 42551|
|[552962, 611844, ...|163205|
|[668137, 541809, ...|444504|
|[627200, 526851, ...|496291|
|[560624, 534825, ...|496425|
|[1645217, 553891,...|496747|
|[523601, 722695, ...|496874|
|[657004, 520429, ...|496972|
|[511331, 531813, ...|497092|
|[501158, 616231, ...|497227|
|[1409935, 884116,...|497879|
|[620641, 507170, ...|498007|
+--------------------+------+
only showing top 20 rows



## 10 papers with highest page rank

In [15]:
def getPairs(row):
    result = []
    for p in row['citations']:
        result.append((row['recid'],p))
    return result
df_tmp2 = df_tmp.rdd.flatMap(getPairs)
df_tmp2 = df_tmp2.map(lambda x : (x[1], x[0])).toDF(("citations", "recid"))
df_tmp2.show()

+---------+-----+
|citations|recid|
+---------+-----+
|   552878|42490|
|   623944|42490|
|   524814|42490|
|   707823|42490|
|   793672|42490|
|   759923|42490|
|   553524|42490|
|   526325|42490|
|  1209143|42490|
|   748220|42490|
|   825312|42492|
|   660177|42492|
|  1333340|42492|
|   856813|42492|
|  1664417|42497|
|   750341|42497|
|   709510|42497|
|   682567|42497|
|   838344|42497|
|   595860|42497|
+---------+-----+
only showing top 20 rows



In [16]:
pd_df = df_tmp2.toPandas()
pd_df

Unnamed: 0,citations,recid
0,552878,42490
1,623944,42490
2,524814,42490
3,707823,42490
4,793672,42490
...,...,...
1509449,1328315,1760627
1509450,1481946,1760636
1509451,1620564,1760643
1509452,1306731,1760645


In [17]:
graph = nx.from_pandas_edgelist(pd_df, 'citations', 'recid')
print(nx.is_directed(graph))

False


In [18]:
pr = nx.pagerank(graph, alpha=0.9)
# pr

In [19]:
sorted_keys = sorted(pr, key=pr.get, reverse=True)
top_10 = []
count = 0
for r in sorted_keys:
    top_10.append((r, pr[r]))
    if count == 10:
        break
    else:
        count += 1
top_10

[(530024, 0.0010789076556183786),
 (541477, 0.0010351781600662637),
 (537586, 0.0010227554633589243),
 (534486, 0.0008838264465870694),
 (560129, 0.0008116378464611918),
 (556767, 0.0006907372821515942),
 (558620, 0.0006459894596109214),
 (526784, 0.0006398876726576157),
 (541364, 0.0005786864311297645),
 (555937, 0.0005601344064497948),
 (549209, 0.0005555322468012821)]

In [20]:
output = []
for item in top_10:
    result = df.where(df.recid == item[0]).select("title")
    tmp = (result.select("title").rdd.flatMap(list).collect()[0], item[0], item[1])
    output.append(tmp)
output

[('Review of particle physics. Particle Data Group',
  530024,
  0.0010789076556183786),
 ('The Sloan Digital Sky Survey: Technical Summary',
  541477,
  0.0010351781600662637),
 ('HERWIG 6: An Event generator for hadron emission reactions with interfering gluons (including supersymmetric processes)',
  537586,
  0.0010227554633589243),
 ('High-energy physics event generation with PYTHIA 6.1',
  534486,
  0.0008838264465870694),
 ('The EvtGen particle decay simulation package',
  560129,
  0.0008116378464611918),
 ('The BaBar detector', 556767, 0.0006907372821515942),
 ('Measurement of the rate of $\\nu_e+d \\to p+p+e^-$ interactions produced by $^8B$ solar neutrinos at the Sudbury Neutrino Observatory',
  558620,
  0.0006459894596109214),
 ('4-D gravity on a brane in 5-D Minkowski space',
  526784,
  0.0006398876726576157),
 ('The Belle Detector', 541364, 0.0005786864311297645),
 ('Evidence for neutrino oscillations from the observation of anti-neutrino(electron) appearance in a anti-

## 10 most cited papers

In [21]:
def getNumOfCite(row):
    return (len(row['citations']),row['recid'])
df_tmp3 = df_tmp.rdd.map(getNumOfCite)
df_tmp3 = df_tmp3.map(lambda x : (x[0], x[1])).toDF(("citations_Num", "recid"))
df_tmp3.show()

+-------------+------+
|citations_Num| recid|
+-------------+------+
|           10| 42490|
|            4| 42492|
|            9| 42497|
|            3| 42498|
|            4| 42499|
|            1| 42501|
|           18| 42505|
|            5| 42517|
|            2| 42551|
|           95|163205|
|            7|444504|
|           54|496291|
|            4|496425|
|            8|496747|
|            5|496874|
|            4|496972|
|            8|497092|
|           10|497227|
|           29|497879|
|           14|498007|
+-------------+------+
only showing top 20 rows



In [22]:
sorted_based_on_cite = df_tmp3.sort(df_tmp3.citations_Num.desc()).collect()
top_10_cited = sorted_based_on_cite[0:10]
top_10_cited

[Row(citations_Num=4602, recid=541477),
 Row(citations_Num=4023, recid=530024),
 Row(citations_Num=3386, recid=537586),
 Row(citations_Num=3167, recid=534486),
 Row(citations_Num=2804, recid=558620),
 Row(citations_Num=2801, recid=526784),
 Row(citations_Num=2515, recid=560129),
 Row(citations_Num=2401, recid=549209),
 Row(citations_Num=2275, recid=546156),
 Row(citations_Num=2231, recid=556767)]

In [23]:
top_10_cited[0][1]
most_cite_output = []
for i in range(10):
    result = df.where(df.recid == top_10_cited[i][1]).select("title")
    tmp = (result.select("title").rdd.flatMap(list).collect()[0], top_10_cited[i][1], top_10_cited[i][0])
    most_cite_output.append(tmp)
most_cite_output

[('The Sloan Digital Sky Survey: Technical Summary', 541477, 4602),
 ('Review of particle physics. Particle Data Group', 530024, 4023),
 ('HERWIG 6: An Event generator for hadron emission reactions with interfering gluons (including supersymmetric processes)',
  537586,
  3386),
 ('High-energy physics event generation with PYTHIA 6.1', 534486, 3167),
 ('Measurement of the rate of $\\nu_e+d \\to p+p+e^-$ interactions produced by $^8B$ solar neutrinos at the Sudbury Neutrino Observatory',
  558620,
  2804),
 ('4-D gravity on a brane in 5-D Minkowski space', 526784, 2801),
 ('The EvtGen particle decay simulation package', 560129, 2515),
 ('Final results from the Hubble Space Telescope key project to measure the Hubble constant',
  549209,
  2401),
 ('A Fundamental relation between supermassive black holes and their host galaxies',
  546156,
  2275),
 ('The BaBar detector', 556767, 2231)]