In [1]:
# Imports for this cell, grouped ahead of any executable statement.
import pyspark
from pyspark import SparkFiles
from pyspark.sql import SparkSession

# Shared session used by every later cell; getOrCreate() hands back an existing
# session when one is already running instead of starting a second one.
spark = SparkSession.builder.appName("app1").getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


24/05/18 23:57:54 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/05/18 23:57:57 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [2]:
# Journal metrics table. The first CSV row supplies the column names; no schema
# inference is requested, so every column is read as a string.
df_journal = (
    spark.read
    .option("header", "true")
    .csv('journal_information.csv')
)

                                                                                

In [3]:
# Papers dataset. Spark infers the nested schema from the JSON records.
# No "header" option is set: it is a CSV-only setting that the JSON reader
# ignores, so passing it only suggested behaviour that does not exist.
df_large = spark.read.format("json").load('large.json.gz')

                                                                                

In [4]:
# Show the journal DataFrame's repr to inspect its column names and types.
df_journal

DataFrame[Journal Name: string, ISSN: string, EISSN: string, Category & Journal Quartiles: string, Citations: string, JCI: string, percentageOAGold: string, IF: string]

In [5]:
# Show the papers DataFrame's repr to inspect its (nested) column names and types.
df_large

DataFrame[authors: array<struct<authorId:string,name:string>>, citationcount: bigint, corpusid: bigint, externalids: struct<ACL:string,ArXiv:string,CorpusId:string,DBLP:string,DOI:string,MAG:string,PubMed:string,PubMedCentral:string>, influentialcitationcount: bigint, isopenaccess: boolean, journal: struct<name:string,pages:string,volume:string>, publicationdate: string, publicationtypes: array<string>, publicationvenueid: string, referencecount: bigint, s2fieldsofstudy: array<struct<category:string,source:string>>, title: string, url: string, venue: string, year: bigint]

In [6]:
import pyspark.sql.functions as F
from pyspark.sql.types import *

def filtered_df(df):
    """Keep only the rows of ``df`` whose ``journal.name`` is present and non-empty.

    Args:
        df: DataFrame with a ``journal`` struct column containing a ``name`` field.

    Returns:
        A new DataFrame without rows where ``journal.name`` is null or ``""``.
    """
    journal_name = F.col("journal.name")
    with_name = df.filter(journal_name.isNotNull())
    return with_name.where(df['journal.name'] != "")

def filtered_journal(df):
    """Return ``df`` with its ``IF`` column cast to a double.

    Args:
        df: DataFrame with an ``IF`` column.

    Returns:
        A new DataFrame in which ``IF`` has been replaced by its ``DoubleType`` cast.
    """
    impact_factor = df["IF"].cast(DoubleType())
    return df.withColumn("IF", impact_factor)

# Helper that flattens the authors array (a DataFrame transformation, not a Spark UDF)
def extract_authors(df):
    """Explode ``authors`` so that each output row holds a single author.

    Args:
        df: DataFrame with an ``authors`` array column and a ``journal`` column.

    Returns:
        A DataFrame with two columns: ``author`` (one element of ``authors``)
        and the original ``journal`` column.
    """
    single_author = F.explode('authors').alias('author')
    return df.select(single_author, "journal")

# Helper that adds an author-count column (a built-in column expression, not a Spark UDF)
def count_authors(df):
    """Return ``df`` with an extra ``num_authors`` column.

    ``num_authors`` is the size of the ``authors`` array on each row.

    NOTE(review): under default Spark settings ``size()`` reports -1 for a null
    array — confirm ``authors`` is never null, otherwise averages are skewed.

    Args:
        df: DataFrame with an ``authors`` array column.

    Returns:
        A new DataFrame with the additional ``num_authors`` column.
    """
    author_count = F.size(F.col("authors"))
    return df.withColumn("num_authors", author_count)

In [7]:
#Programmatically confirm that all papers have unique IDs and output the number of papers in the file.

def checkForDuplicates(df):
    """Report whether every row of ``df`` has a distinct ``corpusid``.

    Prints a confirmation followed by the total row count when the number of
    distinct ``corpusid`` values equals the number of rows; otherwise prints a
    warning. Nothing is returned.

    Args:
        df: DataFrame with a ``corpusid`` column.
    """
    # Count the rows once and reuse the value: every count() is a separate Spark
    # job over the whole dataset, so it should not be repeated just for printing.
    total_papers = df.count()
    distinct_ids = df.select("corpusid").distinct().count()

    if distinct_ids == total_papers:
        print("All papers have unique IDs.", total_papers)
    else:
        print("There are papers with non-unique IDs.")

# Run the uniqueness check on the full papers DataFrame.
checkForDuplicates(df_large)

[Stage 11:>                                                         (0 + 1) / 1]

All papers have unique IDs. 150000


                                                                                

In [8]:
#What is the average number of authors per paper?

# Add the per-paper author count column (a built-in column expression, not a UDF)
df = count_authors(df_large)

# Average that column over all papers; the single result row holds the mean
avg_row = df.select(F.avg("num_authors")).first()
avg_authors_per_paper = avg_row[0]

print("Average number of authors per paper:", avg_authors_per_paper)

[Stage 14:>                                                         (0 + 1) / 1]

Average number of authors per paper: 2.81628


                                                                                

In [10]:
# Keep only papers that carry a non-empty journal name. The filter is written
# out here instead of calling the `filtered_df` helper: this cell binds its
# result to that same name, so calling the helper would raise a TypeError
# whenever the cell is run again (the name would already point at a DataFrame).
journal_name = F.col("journal.name")
filtered_df = df_large.filter(journal_name.isNotNull() & (journal_name != ""))
num_journals = filtered_df.select(journal_name).distinct().count()

# Print the result
print("Number of distinct journals:", num_journals)

[Stage 17:>                                                         (0 + 1) / 1]

Number of distinct journals: 33916


                                                                                

In [11]:
# Same question answered through the RDD API: pull each paper's journal name
# out of its row and count the distinct values.
def _journal_name(paper):
    return paper['journal']['name']

papers_rdd = filtered_df.rdd
journal_names_rdd = papers_rdd.map(_journal_name)
unique_journal_names_count = journal_names_rdd.distinct().count()

# Print the result
print(f"Number of different journals: {unique_journal_names_count}")

[Stage 23:>                                                         (0 + 1) / 1]

Number of different journals: 33916


                                                                                

In [16]:
# One row per (paper, author) pair, then count the rows for each author.
authors_df = extract_authors(df_large)
publication_counts = (
    authors_df
    .groupBy('author.authorid', 'author.name')
    .agg(F.count('*').alias('publications'))
)

# Rank by publication count, pack id and name back into one struct column,
# and keep the five most published authors.
authors = (
    publication_counts
    .orderBy(F.desc('publications'))
    .select(F.struct('authorid', 'name').alias('author'), 'publications')
    .limit(5)
)

# Display the result
authors.show(5)



+--------------------+------------+
|              author|publications|
+--------------------+------------+
|{2149377746, B. N...|          23|
|{90537224, S. Suk...|          16|
|{88842366, Z. Sor...|          16|
|{49898687, M. Kumar}|          15|
|   {null, Anonymous}|          10|
+--------------------+------------+



[Stage 29:>                                                         (0 + 1) / 1]

In [17]:
# Papers on the left, journal metrics (IF cast to double) on the right.
paper_df = df_large
journal_df = filtered_journal(df_journal)

# Left join so that papers without a matching journal row are kept.
# NOTE(review): journal names are compared verbatim here, whereas the later
# per-year cell trims both sides before joining — confirm which is intended.
same_journal = paper_df["journal.name"] == journal_df["Journal Name"]
joined_df = paper_df.join(journal_df, same_journal, "left")

# A null IF (e.g. no matching journal) counts as a contribution of 0.
with_contribution = joined_df.withColumn(
    "impact_factor_contribution",
    F.coalesce(joined_df["IF"], F.lit(0)),
)

# One row per (paper, author), then sum the contributions for each author.
per_author = (
    with_contribution
    .select(F.explode('authors').alias('author'), 'impact_factor_contribution')
    .groupBy("author.authorId", "author.name")
    .agg(F.sum("impact_factor_contribution").alias("cumulative_impact_factor"))
)

# Five authors with the highest cumulative impact factor, id and name packed
# back into a single struct column.
author_impact_factor = (
    per_author
    .orderBy(F.desc("cumulative_impact_factor"))
    .limit(5)
    .select(F.struct("authorId", "name").alias("author"), "cumulative_impact_factor")
)

# Display the result
author_impact_factor.show()



+--------------------+------------------------+
|              author|cumulative_impact_factor|
+--------------------+------------------------+
|{2155504929, Ying...|                  93.832|
|{144797099, M. Vi...|                  92.238|
|{5152451, L. Andr...|                  92.238|
| {49900836, H. Wood}|                  90.422|
|{7695437, A. M. R...|                  87.899|
+--------------------+------------------------+



[Stage 29:>                                                         (0 + 1) / 1]

In [20]:
# Paper information DataFrame: id, trimmed journal name and year; blank names dropped
paper_df = (
    df_large
    .select("corpusid", "journal.name", "year")
    .withColumn("journal_name", F.trim(F.col("name")))
    .filter(F.col("journal_name") != "")
)

# Journal information DataFrame: trimmed name and IF; blank names dropped
journal_df = (
    df_journal
    .select("Journal Name", "IF")
    .withColumn("Journal Name", F.trim(F.col("Journal Name")))
    .filter(F.col("Journal Name") != "")
)

# Match papers to journals by name and keep those with impact factor > 1
names_match = paper_df['journal_name'] == journal_df['Journal Name']
filtered_df = (
    paper_df
    .join(journal_df, names_match, "inner")
    .filter((F.col("IF").cast(FloatType())) > 1)
)

# Count distinct papers for each year from 2010 to 2020 (both ends included)
in_range = F.col("year").between(2010, 2020)
publications_per_year = (
    filtered_df
    .filter(in_range)
    .groupBy("year")
    .agg(F.countDistinct("corpusid"))
    .orderBy("year")
)

# Display the result
publications_per_year.show()

[Stage 29:>                 (0 + 1) / 1][Stage 47:>                 (0 + 1) / 1]

+----+---------------+
|year|count(corpusid)|
+----+---------------+
|2010|            112|
|2011|            139|
|2012|            165|
|2013|            178|
|2014|            241|
|2015|            243|
|2016|            283|
|2017|            329|
|2018|            365|
|2019|            396|
|2020|            444|
+----+---------------+



[Stage 29:>                                                         (0 + 1) / 1]