<div id="data-sources-description" style="background-color:#80CDC6 ; padding: 10px 0;">
    <center><h1 style="color:#2F3254; font-weight:bold">PRELIMINARIES</h1></center>
</div>

In [1]:
sc

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
2,application_1718175985716_0003,pyspark,idle,Link,Link,,✔


SparkSession available as 'spark'.


<SparkContext master=yarn appName=livy-session-2>

In [2]:
sc.install_pypi_package('pyarrow')
sc.install_pypi_package('matplotlib')
sc.install_pypi_package('pandas')
sc.install_pypi_package('numpy')
sc.install_pypi_package('langdetect')

Collecting pyarrow
  Downloading pyarrow-16.1.0-cp39-cp39-manylinux_2_28_x86_64.whl (40.8 MB)
Collecting numpy>=1.16.6
  Downloading numpy-1.26.4-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (18.2 MB)
Installing collected packages: numpy, pyarrow
Successfully installed numpy-1.26.4 pyarrow-16.1.0

Collecting matplotlib
  Downloading matplotlib-3.9.0-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (8.3 MB)
Collecting fonttools>=4.22.0
  Downloading fonttools-4.53.0-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (4.6 MB)
Collecting pillow>=8
  Downloading pillow-10.3.0-cp39-cp39-manylinux_2_28_x86_64.whl (4.5 MB)
Collecting kiwisolver>=1.3.1
  Downloading kiwisolver-1.4.5-cp39-cp39-manylinux_2_12_x86_64.manylinux2010_x86_64.whl (1.6 MB)
Collecting cycler>=0.10
  Downloading cycler-0.12.1-py3-none-any.whl (8.3 kB)
Collecting packaging>=20.0
  Downloading packaging-24.1-py3-none-any.whl (53 kB)
Collecting pyparsing>=2.3.1
  Downloading pyparsing-3.1.2-py3-non

In [3]:
from pyspark.sql import SparkSession

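# In a Livy/EMR notebook the `spark` session already exists, so getOrCreate()
# returns it. Runtime SQL settings (e.g. spark.sql.repl.eagerEval.enabled) are
# applied, but static settings such as spark.jars.packages and the
# *.extraJavaOptions below may not take effect.
spark = (
    SparkSession.builder.master("yarn")
    .config(
        "spark.jars.packages",
        "org.apache.hadoop:hadoop-aws:3.3.4",  # should match the cluster's Hadoop version
    )
    .config(
        "spark.executor.extraJavaOptions",
        "-Dcom.amazonaws.services.s3.enableV4=true",
    )
    .config(
        "spark.driver.extraJavaOptions",
        "-Dcom.amazonaws.services.s3.enableV4=true",
    )
    # Only affects s3a:// paths; on EMR, s3:// paths are handled by EMRFS
    .config(
        "spark.hadoop.fs.s3a.aws.credentials.provider",
        "com.amazonaws.auth.profile.ProfileCredentialsProvider,"
        "org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider",
    )
    # Render DataFrames eagerly as tables in notebooks
    .config("spark.sql.repl.eagerEval.enabled", True)
    .getOrCreate()
)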

In [4]:
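# Use Apache Arrow for faster Spark <-> pandas conversions
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
# Pre-3.0 name of the same setting; deprecated in Spark 3.x and redundant here
spark.conf.set("spark.sql.execution.arrow.enabled", "true")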

In [5]:
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
from pyspark.sql import functions as F
from langdetect import detect
from langdetect import DetectorFactory
from langdetect import detect_langs
from pyspark.sql.types import StringType

<div id="data-sources-description" style="background-color:#80CDC6 ; padding: 10px 0;">
    <center><h1 style="color:#2F3254; font-weight:bold">DATA CLEANING AND FILTERING</h1></center>
</div>

For this study, only the CORD-19 metadata files were used for topic modelling. Each row of a metadata file corresponds to one paper, and **only the abstracts were analyzed**. The abstracts were cleaned and filtered as follows:

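1. **Reading Data:**
The `metadata.csv` files were read from the AI2 Semantic Scholar CORD-19 S3 bucket (`ai2-semanticscholar-cord-19`), where they are stored in CSV format. The glob `????-??-??/metadata.csv` matches the copy in each date-named folder of the bucket. The schema was inferred automatically (`inferSchema=True`), and the header row supplied the column names.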

2. **Extracting Date Components:**
`publish_time` was first converted to a date, and two new columns, `year` and `month`, were extracted from it. These columns were later used to partition the Parquet output, which speeds up downstream reads.

3. **Filtering by Year:**
The dataset was restricted to documents published from 2018 to 2024 inclusive. The intent was to divide the research narrative into three phases: pre-COVID publications from before the pandemic emerged, early COVID publications from the initial outbreak period, and late COVID publications covering research conducted once the pandemic was underway. Dividing the literature this way supports analysis across the evolving timeline of the COVID-19 crisis. It traces how scientific understanding progressed from baseline coronavirus knowledge, to the first implications of the outbreak, to later pandemic research.

4. **Lowercasing and Aliasing:**
The `title`, `abstract`, and `journal` columns were converted to lowercase for consistency. This also avoids duplicates that differ only in letter case. `F.lower` was also applied to `year` and `month`. For these numeric columns it changes nothing except casting them to strings. Each transformed column was aliased back to its original name.

5. **Null and Quality Checks on Abstracts:**
Entries were excluded if the `abstract` was null, 100 characters or shorter, or exactly equal to the placeholder strings `'null'` or `'unknown'`. This keeps only substantive abstracts for analysis.

6. **Title and Date Validity Checks:**
Records with a missing `title`, `year`, or `month` were also removed, so every remaining record has complete title and date information.

7. **Removing Duplicates:**
Rows with the same `title` and `abstract` were deduplicated. Because the input glob reads several metadata snapshots, the same paper can appear more than once. Dropping duplicates keeps such papers from being over-represented in later analyses.

8. **Caching:**
To optimize performance for downstream operations, the resulting DataFrame was cached. This avoids re-computation of the DataFrame in subsequent actions and speeds up the data processing workflow.

9. **Language Detection and Filtering:**
A custom function, `get_most_probable_language`, wraps `langdetect`'s `detect_langs` to find the most probable language of each abstract. It was applied as a Spark UDF, and only abstracts detected as English (`'en'`) were retained.

10. **Saving as Parquet Files:**
After cleaning and filtering, the dataset was saved as Parquet files partitioned by year and month. This columnar format reduces file size and also speeds up reads and writes on large datasets. Partitioning by month reflects the initial plan to run topic modelling semi-annually (every six months). In the end, the researchers opted for an annual analysis to capture a broader set of themes.
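
The per-record rules in steps 2 to 6 can be sketched in plain Python. This is handy for unit-testing them on a few rows without a cluster. This is a minimal sketch, not the pipeline itself. The helper name `clean_record` is hypothetical. It also assumes `publish_time` is an ISO `YYYY-MM-DD` string, whereas Spark's `to_date` may accept other forms (such as a bare year), so edge cases can differ.

```python
from datetime import date
from typing import Optional

YEARS = range(2018, 2025)  # 2018 to 2024 inclusive, as in step 3
PLACEHOLDERS = {"null", "unknown"}


def clean_record(row: dict) -> Optional[dict]:
    """Apply steps 2-6 to one metadata row; return None if it is dropped.

    Hypothetical helper mirroring the Spark filters. Assumes publish_time
    is an ISO 'YYYY-MM-DD' string (Spark's to_date may be more lenient).
    """
    try:
        published = date.fromisoformat(row.get("publish_time") or "")
    except ValueError:
        return None  # unparseable date -> missing year/month (step 6)
    if published.year not in YEARS:
        return None  # step 3
    title = row.get("title")
    abstract = row.get("abstract")
    if title is None or abstract is None:
        return None  # steps 5 and 6
    abstract = abstract.lower()
    if len(abstract) <= 100 or abstract in PLACEHOLDERS:
        return None  # step 5: the Spark code keeps length > 100
    return {
        "year": str(published.year),  # F.lower turns the ints into strings
        "month": str(published.month),
        "title": title.lower(),
        "abstract": abstract,
        "journal": row["journal"].lower() if row.get("journal") else None,
    }
```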

In [6]:
df_meta = (spark
           .read
           .csv('s3://ai2-semanticscholar-cord-19/????-??-??/metadata.csv',
                header=True, inferSchema=True)
           .withColumn('year', F.year(F.to_date(F.col('publish_time'))))
           .withColumn('month', F.month(F.to_date(F.col('publish_time'))))
           .filter(F.col('year').isin(2018, 2019, 2020, 2021, 2022, 2023,
                                      2024))
           .select(F.lower(F.col('year')).alias('year'),
                   F.lower(F.col('month')).alias('month'),
                   F.lower(F.col('title')).alias('title'),
                   F.lower(F.col('abstract')).alias('abstract'),
                   F.lower(F.col('journal')).alias('journal')
                  )
           .filter((F.col('abstract').isNotNull()) &
                   (F.length(F.col('abstract')) > 100) &
                   (F.col('abstract') != 'null') &
                   (F.col('abstract') != 'unknown') &
                   (F.col('title').isNotNull()) &
                   (F.col('year').isNotNull()) &
                   (F.col('month').isNotNull())
                  )
           .dropDuplicates(['title', 'abstract'])
           .cache()
          )
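
Deduplication on `title` and `abstract` (step 7) amounts to keying rows on that pair. The plain-Python sketch below uses a hypothetical `drop_duplicates` helper that keeps the first row seen for each key. Spark's `dropDuplicates` also keeps one row per key, but it does not specify which one, so the surviving `journal` value may differ between runs.

```python
from typing import Iterable, List, Sequence


def drop_duplicates(rows: Iterable[dict], keys: Sequence[str]) -> List[dict]:
    """Keep the first row for each distinct combination of `keys` values."""
    seen = set()
    kept = []
    for row in rows:
        key = tuple(row.get(k) for k in keys)
        if key not in seen:  # first time this (title, abstract) pair appears
            seen.add(key)
            kept.append(row)
    return kept


rows = [
    {"title": "a", "abstract": "x", "journal": "j1"},
    {"title": "a", "abstract": "x", "journal": "j2"},  # same pair, e.g. from another snapshot
    {"title": "a", "abstract": "y", "journal": "j1"},
]
unique = drop_duplicates(rows, ["title", "abstract"])
```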

<div id="data-sources-description" style="background-color:#2F3254 ; padding: 10px 0;">
    <center><h3 style="color:white; font-weight:bold">Select English Abstracts</h3></center>
</div> 

In [7]:
# Reproducibility: fixes langdetect's seed for calls made on the driver
DetectorFactory.seed = 42

def get_most_probable_language(abstract):
    """
    Detect and return the most probable language of the abstract.

    Parameters
    ----------
    abstract : str
        The abstract for which to detect the language.

    Returns
    -------
    str
        The most probable language represented by its ISO code.
        Returns 'unknown' if language detection fails.
    """
    # The UDF runs in executor Python processes, which do not inherit the
    # driver's seed, so set it here too to keep results reproducible
    DetectorFactory.seed = 42
    try:
        # List of candidate languages with their probabilities
        languages = detect_langs(abstract)
    except Exception:
        # langdetect raises when the text has no usable features
        return "unknown"
    if not languages:
        return "unknown"
    # Select the candidate with the highest probability
    most_probable_language = max(languages, key=lambda lang: lang.prob)
    return most_probable_language.lang

# Wrap the Python function as a Spark UDF that returns a string
get_language_udf = F.udf(get_most_probable_language, StringType())

# Apply the UDF to the 'abstract' column to detect languages
df_meta = df_meta.withColumn("language", get_language_udf(F.col("abstract")))

# Only get the abstracts that are in English
df_meta = df_meta.filter(F.col('language') == 'en')
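
The selection and fallback logic of `get_most_probable_language` can be tested without `langdetect` by passing in a stand-in detector. In the sketch below, `pick_language` and `Candidate` are hypothetical. `Candidate` only mimics the `lang` and `prob` attributes of `detect_langs` results, and the fake detectors replace the real library.

```python
from collections import namedtuple
from typing import Callable, List

# Hypothetical stand-in for langdetect's result objects (lang, prob)
Candidate = namedtuple("Candidate", ["lang", "prob"])


def pick_language(text: str, detector: Callable[[str], List[Candidate]]) -> str:
    """Return the most probable language code, or 'unknown' on failure."""
    try:
        candidates = detector(text)
    except Exception:
        return "unknown"  # detection raised, e.g. no usable features
    if not candidates:
        return "unknown"  # avoid returning None for an empty result
    return max(candidates, key=lambda c: c.prob).lang


def fake_detector(text: str) -> List[Candidate]:
    # Unsorted on purpose: max() does not rely on the input order
    return [Candidate("fr", 0.2), Candidate("en", 0.7), Candidate("de", 0.1)]


def failing_detector(text: str) -> List[Candidate]:
    raise ValueError("no features in text")
```

Returning `'unknown'` instead of `None` keeps the later `language == 'en'` filter simple, because every row then has a concrete label.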

In [8]:
df_meta.printSchema()

root
 |-- year: string (nullable = true)
 |-- month: string (nullable = true)
 |-- title: string (nullable = true)
 |-- abstract: string (nullable = true)
 |-- journal: string (nullable = true)
 |-- language: string (nullable = true)

<div id="data-sources-description" style="background-color:#2F3254 ; padding: 10px 0;">
    <center><h3 style="color:white; font-weight:bold">Save Filtered Dataset as Parquet Files</h3></center>
</div> 

In [None]:
# Write DataFrame to S3, partitioned by year and month
(df_meta
 .write
 .partitionBy("year", "month")
 .parquet("s3://bdcc-project/parquet/", mode='overwrite'))
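
`partitionBy("year", "month")` writes Hive-style directories such as `year=2020/month=3/` under the output path. A filter on a partition column then lets Spark skip non-matching directories (partition pruning). The sketch below models that layout with plain strings. `partition_dir` and `prune` are hypothetical helpers, not Spark APIs. Because `month` was cast to a string via `F.lower`, March is written as `month=3`, not `month=03`.

```python
from typing import List


def partition_dir(base: str, year: str, month: str) -> str:
    """Directory that partitionBy('year', 'month') uses for one row."""
    return f"{base.rstrip('/')}/year={year}/month={month}/"


def prune(dirs: List[str], year: str) -> List[str]:
    """Keep only the partition directories a `year == ...` filter must read."""
    return [d for d in dirs if f"/year={year}/" in d]


base = "s3://bdcc-project/parquet/"
dirs = [partition_dir(base, y, m) for y, m in [("2020", "3"), ("2020", "4"), ("2021", "1")]]
```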

<div id="data-sources-description" style="background-color:#2F3254 ; padding: 10px 0;">
    <center><h3 style="color:white; font-weight:bold">Load Final Dataset</h3></center>
</div> 

In [10]:
df_abstract = (spark
               .read
               .parquet('s3://bdcc-project/parquet/')  # Parquet stores its own schema; inferSchema is a CSV option
               .cache()
              )
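
When the Parquet directory is read back, Spark recovers `year` and `month` from the directory names rather than from the files. By default it also infers their types, so `2020` comes back as an integer even though it was a string when written. The recovered partition columns are placed after the data columns in the schema. The sketch below models this parsing step with a hypothetical `parse_partitions` helper. It uses a simplified "all digits means integer" rule, not Spark's full inference.

```python
from typing import Dict, Union

Value = Union[int, str]


def parse_partitions(path: str) -> Dict[str, Value]:
    """Extract key=value segments from a partitioned path, inferring ints."""
    values: Dict[str, Value] = {}
    for segment in path.strip("/").split("/"):
        if "=" not in segment:
            continue  # bucket, prefix, or file name, not a partition directory
        key, raw = segment.split("=", 1)
        # Simplified version of Spark's type inference: digits become ints
        values[key] = int(raw) if raw.isdigit() else raw
    return values
```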

In [13]:
print('Count of documents per year:')
(df_abstract
 .filter(F.col('year').isNotNull())
 .groupBy('year')
 .count()
 .orderBy('year', ascending=True)
 .show()
)

Count of documents per year:
+----+------+
|year| count|
+----+------+
|2018|  8214|
|2019|  9386|
|2020|289964|
|2021|382923|
|2022|115118|
|2023|    17|
+----+------+
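
To see how the corpus is distributed, the yearly counts above can be turned into shares of the total. The counts are copied from the output, and the code only does the arithmetic. There is no 2024 row even though the year filter admitted 2024, and 2023 has only 17 documents.

```python
# Yearly document counts copied from the output above
counts = {
    "2018": 8214,
    "2019": 9386,
    "2020": 289964,
    "2021": 382923,
    "2022": 115118,
    "2023": 17,
}

total = sum(counts.values())  # 805,622 documents in total
shares = {year: n / total for year, n in counts.items()}

for year, share in shares.items():
    print(f"{year}: {counts[year]:>7,} documents ({share:6.2%})")
```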