# <center style="color: #003b7f"> From Publication to Citation: Understanding the Factors <br/> Driving Research Recognition  </center> <a class='tocSkip'>
    
**MSDS 2023 Term 4 SLT1** | Loraine Menorca, BJ Enrik Yepes, Gregory del Carmen, Rozz Banquerigo

This notebook documents the data preprocessing steps in detail and serves as a supplement to the main report of this project.
    
**! Note:** This was run using PySpark via AWS EMR Studio. Rerunning the entire notebook outside a similar environment is not advisable.
    
***

# Introduction

This notebook focuses on data preprocessing, the step that prepares raw data for analysis and modeling. It walks through how the data is transformed and cleaned, step by step, so that it is of sufficient quality for downstream tasks. Note that running the full notebook takes a significant amount of time.

# Initializing Spark

Since this notebook runs in a PySpark kernel within an AWS EMR workspace, evaluating the `spark` object in the first cell starts the Spark application and returns the ready-made `SparkSession`.

```python
spark
```

```
Starting Spark application
```

| ID | YARN Application ID | Kind | State | Spark UI | Driver log | User | Current session? |
|----|---------------------|------|-------|----------|------------|------|------------------|
| 0 | application_1685269347426_0001 | pyspark | idle | Link | Link | | ✔ |

```
SparkSession available as 'spark'.
<pyspark.sql.session.SparkSession object at 0x7ff87b5ca690>
```

```python
# Install notebook-scoped Python libraries for this session (an EMR notebook feature)
sc.install_pypi_package('numpy')
sc.install_pypi_package('pandas')
```

```
Collecting pandas
  Downloading pandas-1.3.5-cp37-cp37m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (11.3 MB)
Collecting python-dateutil>=2.7.3
  Downloading python_dateutil-2.8.2-py2.py3-none-any.whl (247 kB)
Installing collected packages: python-dateutil, pandas
Successfully installed pandas-1.3.5 python-dateutil-2.8.2
```



# Data Preprocessing

## Loading the Data

The notebook starts by loading the dataset into a PySpark DataFrame. This uses the SparkSession initialized above and reads the data from Parquet files, a columnar storage format that Spark reads efficiently.

The dataset is loaded from `s3://s3bucketemr/crossref.parquet/`. These Parquet files were converted from the original JSON files; the columnar format lets Spark read only the columns a query needs, which speeds up loading.

```python
fpath = 's3://s3bucketemr/crossref.parquet/'

# Each record sits in a struct column named `col`; 'col.*' expands its fields
# into top-level columns. (`header` and `inferSchema` are CSV reader options
# with no effect on Parquet, so they are omitted here.)
df_all = spark.read.parquet(fpath).select('col.*')
```

Below is the schema of the raw files. The DataFrame is highly nested, so before any machine learning can be done we must first select the relevant columns and then clean them.

```python
df_all.printSchema()
```

```
root
 |-- DOI: string (nullable = true)
 |-- ISBN: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- ISSN: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- URL: string (nullable = true)
 |-- abstract: string (nullable = true)
 |-- accepted: struct (nullable = true)
 |    |-- date-parts: array (nullable = true)
 |    |    |-- element: array (containsNull = true)
 |    |    |    |-- element: long (containsNull = true)
 |-- alternative-id: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- approved: struct (nullable = true)
 |    |-- date-parts: array (nullable = true)
 |    |    |-- element: array (containsNull = true)
 |    |    |    |-- element: long (containsNull = true)
 |-- archive: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- article-number: string (nullable = true)
 |-- assertion: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- URL
```

## Feature Selection

Next, we reload the data, this time keeping only the features and the target (`is-referenced-by-count`, the citation count) that are relevant to our machine learning objectives.

```python
fpath = 's3://s3bucketemr/crossref.parquet/'

# Nested paths such as 'author.given' and 'published.date-parts' are resolved
# by Spark; the resulting columns are named 'given' and 'date-parts'
selected_columns = [
    'abstract', 'title', 'author.given', 'container-title', 'published.date-parts', 'subject',
    'publisher', 'is-referenced-by-count'
]

df_project = (spark.read.parquet(fpath)
                   .select('col.*')
                   .select(selected_columns))
```
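The nested paths in `selected_columns` resolve differently depending on the field type: `published.date-parts` simply reaches into a struct, while `author.given` reaches into an *array* of structs and returns an array of given names. The plain-Python sketch below imitates that resolution on a hypothetical, simplified Crossref-style record; the record and the `select_path` helper are illustrative only, not project code or part of the Spark API.

```python
# Plain-Python illustration (not Spark code) of how the nested column paths
# used above resolve. The record below is hypothetical and simplified.

def select_path(record, path):
    """Follow a dotted path through dicts; on a list of dicts, take the field from each."""
    value = record
    for key in path.split("."):
        if value is None:
            return None
        if isinstance(value, list):
            # Array of structs -> array of that field (e.g. 'author.given')
            value = [item.get(key) if item is not None else None for item in value]
        else:
            value = value.get(key)
    return value


raw_row = {
    "col": {
        "title": ["Example title"],
        "author": [{"given": "Ana", "family": "Cruz"}, {"given": "Ben", "family": "Reyes"}],
        "published": {"date-parts": [[2021, 7, 14]]},
        "is-referenced-by-count": 0,
    }
}

# select('col.*'): the struct's fields become top-level columns
flat = dict(raw_row["col"])

print(select_path(flat, "author.given"))          # ['Ana', 'Ben']
print(select_path(flat, "published.date-parts"))  # [[2021, 7, 14]]
```

This is why `given` reaches the next step as a list that still has to be joined into a single string.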

## Data Cleaning

The following steps cover the core data cleaning work: data transformation, feature extraction/engineering, and null value imputation.

### Data Transformation

Several features (`title`, `given`, `container-title`, and `subject`) are stored as lists of strings. To simplify our machine learning tasks, we convert each of them into a single string, which can later be tokenized and vectorized as part of the machine learning pipeline.
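The conversion relies on `F.concat_ws`, which joins the elements of an array with a separator and skips null elements. The sketch below mimics that behavior in plain Python; the helper name `concat_ws_like` and the sample values are hypothetical. It also illustrates why the joined columns end up non-null: a null list becomes an empty string, which is consistent with the `nullable = false` flags in the processed schema.

```python
# Plain-Python imitation of Spark's concat_ws on an array column (illustrative only)

def concat_ws_like(sep, values):
    """Join non-null elements with sep; a null (None) list yields an empty string."""
    if values is None:
        return ""
    return sep.join(v for v in values if v is not None)


print(concat_ws_like(" ", ["Ana", None, "Ben"]))         # Ana Ben
print(concat_ws_like(", ", ["Subject A", "Subject B"]))  # Subject A, Subject B
print(repr(concat_ws_like(", ", None)))                  # ''
```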

```python
from pyspark.sql import functions as F

# abstract is already a plain string
abstract_feature = df_project['abstract']

# Join the title list into a single space-separated string
title_feature = F.concat_ws(" ", df_project['title']).alias('title')

# Join the authors' given names into a single space-separated string
given_feature = F.concat_ws(" ", df_project['given']).alias('given')

# Join the container (journal/book) titles and rename to container_title
container_title_feature = F.concat_ws(" ", df_project['container-title']).alias('container_title')

# Keep the nested date-parts array for now; it is split into year, month, and day later
date_parts_feature = df_project['date-parts']

# subject is still a list here; it is joined into one string below
subject_feature = df_project['subject']

# publisher is already a plain string
publisher_feature = df_project['publisher']

# Target: the citation count
reference_target = df_project['is-referenced-by-count']

# Select the desired columns from the DataFrame
preprocessed_df = df_project.select(abstract_feature, title_feature, given_feature, container_title_feature,
                                    date_parts_feature, subject_feature, publisher_feature, reference_target)

# Concatenate the subject values into a single comma-separated string
preprocessed_df = preprocessed_df.withColumn("subject", F.concat_ws(", ", "subject"))

# Remove the opening "<jats:p>" tags from the abstract; closing "</jats:p>" tags
# and other JATS tags (e.g. "<jats:title>") are left in place by this pattern
preprocessed_df = preprocessed_df.withColumn("abstract", F.regexp_replace("abstract", "<jats:p>", ""))
```
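The `regexp_replace` call above removes only the literal opening tag `<jats:p>`. Closing `</jats:p>` tags and other JATS elements such as `<jats:title>` remain; one reloaded abstract at the end of this notebook still begins with `<jats:title>`. The sketch below uses Python's `re` module on a hypothetical abstract to compare that pattern with a broader one, `</?jats:[^>]*>`, which matches any opening or closing JATS tag. The same pattern should also work in `F.regexp_replace`, since Java regular expressions accept this syntax, but treat that swap as a suggestion, not what the project ran.

```python
import re

# Hypothetical abstract with JATS markup, as found in some Crossref records
abstract = "<jats:title>Summary</jats:title><jats:p>We study citation counts.</jats:p>"

# Pattern used in the notebook: removes only the opening <jats:p> tag
only_p = re.sub("<jats:p>", "", abstract)

# Broader pattern: removes every opening or closing JATS tag,
# replacing it with a space so adjacent words do not run together
all_tags = re.sub(r"</?jats:[^>]*>", " ", abstract)
all_tags = re.sub(r"\s+", " ", all_tags).strip()

print(only_p)    # <jats:title>Summary</jats:title>We study citation counts.</jats:p>
print(all_tags)  # Summary We study citation counts.
```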

### Feature Extraction

The publication date is stored as a nested list of the form `[[year, month, day]]`. To make this temporal information easier for our model to use, we extract the year, month, and day as separate integer columns.

```python
# Extract the year, month, and day from the date-parts column and cast them to integers
preprocessed_df = (preprocessed_df.withColumn("year", F.col("date-parts")[0][0].cast("int"))
                                 .withColumn("month", F.col("date-parts")[0][1].cast("int"))
                                 .withColumn("day", F.col("date-parts")[0][2].cast("int"))
                                 .drop("date-parts"))
```
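Crossref dates can be partial, for example `[[2013]]` (year only) or `[[2013, 12]]` (year and month). Assuming Spark's default non-ANSI mode, indexing past the end of an array with `F.col("date-parts")[0][i]` yields null instead of raising an error, which is one source of the missing `month` and `day` values handled in the next step. The hypothetical helper below mirrors that indexing rule in plain Python.

```python
def split_date_parts(date_parts):
    """Return (year, month, day) from a Crossref-style [[y, m, d]] list.

    Mirrors non-ANSI Spark indexing: a missing or out-of-range element gives None.
    """
    first = date_parts[0] if date_parts else None

    def part(i):
        if first is None or i >= len(first):
            return None
        return first[i]

    return part(0), part(1), part(2)


print(split_date_parts([[2021, 5, 31]]))  # (2021, 5, 31)
print(split_date_parts([[2013, 12]]))     # (2013, 12, None)
print(split_date_parts(None))             # (None, None, None)
```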

### Handling Missing Values

Missing values are common in real-world datasets and can bias or break downstream analysis and modeling. This section describes how the null values in our dataset are handled:

1. Removing and filling null values:
   - Rows with a null `abstract` are dropped, so that every remaining record has complete text for analysis.
   - Nulls in the other text columns (`given` for author names, `container_title`, `subject`, and `publisher`) are filled with empty strings to keep the dataset consistent. Columns produced by `concat_ws` are already non-null, because it returns an empty string for a null list, so in practice this fill mainly affects `publisher`.

2. Imputing missing date values:
   - The `year`, `month`, and `day` columns may contain missing values.
   - For simplicity, each is imputed with its column mean, truncated to an integer.
   - The mode (most frequent value) would be a better fit for these discrete values, but computing it on a dataset of this size proved too time-consuming.

This phase deals only with missing values; once it is complete, the dataset is ready for the subsequent analysis and modeling tasks.
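To make the mean-versus-mode trade-off concrete, the plain-Python sketch below imputes a hypothetical `month` column both ways using the standard `statistics` module. The mean is truncated with `int(...)`, as in the notebook cell, and here it produces a month that no observed row has; the mode always picks an observed value. In Spark, a mode would typically need an extra `groupBy(...).count()` and a sort over the full dataset.

```python
from statistics import mean, mode

# Hypothetical month values; None marks a missing entry
months = [1, 12, 9, 12, None, 7, 12, None]
observed = [m for m in months if m is not None]

mean_fill = int(mean(observed))  # truncated mean, as in the cell: int(8.83...) -> 8
mode_fill = mode(observed)       # most frequent observed value -> 12

imputed_with_mean = [m if m is not None else mean_fill for m in months]
imputed_with_mode = [m if m is not None else mode_fill for m in months]

print(imputed_with_mean)  # [1, 12, 9, 12, 8, 7, 12, 8]
print(imputed_with_mode)  # [1, 12, 9, 12, 12, 7, 12, 12]
```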

```python
# Drop rows with null values in the "abstract" column
preprocessed_df = preprocessed_df.dropna(subset=["abstract"])

# Fill null values in the text columns with empty strings
preprocessed_df = preprocessed_df.fillna('', subset=["given", "container_title", "subject", "publisher"])

# Reorder the columns
preprocessed_df = preprocessed_df.select('abstract', 'title', 'given', 'container_title', 'subject',
                                         'publisher', 'year', 'month', 'day', 'is-referenced-by-count')

# Compute the three means in a single aggregation (one Spark job instead of three),
# then fill the nulls in year, month, and day with the means truncated to integers
means = preprocessed_df.agg(F.mean('year').alias('year'),
                            F.mean('month').alias('month'),
                            F.mean('day').alias('day')).first()
preprocessed_df = preprocessed_df.fillna({c: int(means[c]) for c in ['year', 'month', 'day']})
```

# Exploring the Dataset

To get an initial feel for the dataset, we display its first 20 rows. This gives a glimpse of the data and helps us spot potential issues or inconsistencies. By default, `show()` truncates long values to 20 characters.

```python
preprocessed_df.show()
```

```
+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----+-----+---+----------------------+
|            abstract|               title|               given|     container_title|             subject|           publisher|year|month|day|is-referenced-by-count|
+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----+-----+---+----------------------+
|В статті ми намаг...|СВОБОДА ТА НЕСВОБ...|    Іван Зоряна Анна|      Молодий вчений|                    |Publishing House ...|2021|    5| 31|                     0|
|Seat belt breast ...|Seat belt breast ...|George Stephen Gu...|    BMJ Case Reports|    General Medicine|                 BMJ|2021|    7| 14|                     0|
|There is a necess...|Optimization of A...|               Mbika|            Energies|Energy (miscellan...|             MDPI AG|2021|    7| 21|                     1|
|Rec
```
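The `...` at the end of many cells above comes from the default truncation in `DataFrame.show(n=20, truncate=True)`: values longer than 20 characters are cut to their first 17 characters followed by `...`. The plain-Python helper below approximates that rule; it is illustrative, not Spark's implementation. To see full values, one could call `preprocessed_df.show(5, truncate=False)`.

```python
def truncate_cell(value, width=20):
    """Approximate how DataFrame.show() shortens a cell when truncate=True."""
    text = str(value)
    if len(text) <= width:
        return text
    if width < 4:
        return text[:width]
    return text[:width - 3] + "..."


print(truncate_cell("Seat belt breast example title"))  # Seat belt breast ...
print(truncate_cell("BMJ"))                             # BMJ
```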

After these preprocessing steps, we examine the schema of the processed dataset, which lists the name, data type, and nullability of each column:

```python
preprocessed_df.printSchema()
```

```
root
 |-- abstract: string (nullable = true)
 |-- title: string (nullable = false)
 |-- given: string (nullable = false)
 |-- container_title: string (nullable = false)
 |-- subject: string (nullable = false)
 |-- publisher: string (nullable = false)
 |-- year: integer (nullable = false)
 |-- month: integer (nullable = false)
 |-- day: integer (nullable = false)
 |-- is-referenced-by-count: long (nullable = true)
```

# Checking Data Quality

To check the overall data quality, we count the null values in every column after preprocessing. As the output below shows, none of the ten columns contains any nulls, so the dataset is complete in that sense and ready for analysis. Note that this check counts nulls only: values that were filled with empty strings (for example, a missing `subject`) are not counted.

```python
from pyspark.sql.functions import col, sum as spark_sum

# Count the null values in each column
null_counts = preprocessed_df.select([spark_sum(col(column).isNull().cast('int')).alias(column)
                                      for column in preprocessed_df.columns])

# Show the null value counts
null_counts.show()
```

```
+--------+-----+-----+---------------+-------+---------+----+-----+---+----------------------+
|abstract|title|given|container_title|subject|publisher|year|month|day|is-referenced-by-count|
+--------+-----+-----+---------------+-------+---------+----+-----+---+----------------------+
|       0|    0|    0|              0|      0|        0|   0|    0|  0|                     0|
+--------+-----+-----+---------------+-------+---------+----+-----+---+----------------------+
```
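Because the text columns were filled with empty strings, a zero null count does not mean every value is informative; the first row shown earlier, for example, has an empty `subject`. The plain-Python sketch below counts both nulls and empty strings per column on hypothetical rows. In PySpark, the same idea could be expressed by adding an aggregation such as `spark_sum((col(c) == '').cast('int'))` next to the null count, although that check was not part of the original analysis.

```python
# Hypothetical rows after preprocessing: no None values, but some empty strings
rows = [
    {"abstract": "Abstract one", "subject": "", "publisher": "Publisher A"},
    {"abstract": "Abstract two", "subject": "Subject A", "publisher": ""},
    {"abstract": "Abstract three", "subject": "", "publisher": "Publisher B"},
]
columns = ["abstract", "subject", "publisher"]

# Same idea as the null-count cell: one count per column
null_counts = {c: sum(row[c] is None for row in rows) for c in columns}
empty_counts = {c: sum(row[c] == "" for row in rows) for c in columns}

print(null_counts)   # {'abstract': 0, 'subject': 0, 'publisher': 0}
print(empty_counts)  # {'abstract': 0, 'subject': 2, 'publisher': 1}
```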

To understand the size of the dataset, we also compute its shape, that is, its number of rows and columns. The processed dataset has 19,351,793 rows and 10 columns. For convenience, we store the shape in a small DataFrame so that it can be saved and reused later.

```python
num_rows = preprocessed_df.count()       # Count the number of rows
num_cols = len(preprocessed_df.columns)  # Get the number of columns

print("Number of rows: ", num_rows)
print("Number of columns: ", num_cols)
```

```
Number of rows:  19351793
Number of columns:  10
```

```python
# Create a DataFrame with the number of rows and columns
shape_data = [(num_rows, num_cols)]
shape_schema = "num_rows LONG, num_cols INT"
shape_df = spark.createDataFrame(shape_data, shape_schema)

# Show the shape DataFrame
shape_df.show()
```

```
+--------+--------+
|num_rows|num_cols|
+--------+--------+
|19351793|      10|
+--------+--------+
```

# Saving the Data

To avoid repeating the entire preprocessing in our main notebook, we save the cleaned dataset to S3 and load it there directly, saving computation time and effort.

We also save two small auxiliary DataFrames:

- the null counts, which record the number of null values in each column and help assess data completeness;
- the DataFrame shape, which records the number of rows and columns and gives a quick view of the dataset's dimensions without recounting its rows.

Saving these three outputs streamlines our work with the data in the main notebook.

```python
# pandas writes to s3:// paths through fsspec/s3fs; uncomment these lines
# if the libraries are not already installed in the session
# sc.install_pypi_package('fsspec')
# sc.install_pypi_package('s3fs')
```

```python
# Define the output paths for saving the dataframes
output_path_preprocessed = "s3://s3bucketemr/cleaned/preprocessed.parquet"
output_path_null_counts = "s3://s3bucketemr/cleaned/null_counts.csv"
output_path_shape = "s3://s3bucketemr/cleaned/df_shape.csv"

# Save preprocessed_df as Parquet (Spark writes a directory of part files)
preprocessed_df.write.parquet(output_path_preprocessed)

# The two auxiliary DataFrames have a single row each, so collecting them to the
# driver with toPandas() and writing a single CSV file is cheap
null_counts.toPandas().to_csv(output_path_null_counts, header=True, index=False)
shape_df.toPandas().to_csv(output_path_shape, header=True, index=False)
```

# Validating Data Persistence: Verifying Successful Saving of Data in S3

Finally, we check that the data was saved correctly by loading each file back from the S3 bucket and inspecting it. All three files load successfully, and the reloaded null counts and shape match the values computed before saving.

```python
fpath_data = 's3://s3bucketemr/cleaned/preprocessed.parquet'

# header/inferSchema are CSV-only options, so they are omitted for Parquet
df_cleaned = spark.read.parquet(fpath_data)

df_cleaned.show()
```

```
+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----+-----+---+----------------------+
|            abstract|               title|               given|     container_title|             subject|           publisher|year|month|day|is-referenced-by-count|
+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----+-----+---+----------------------+
|This paper explor...|Microbial contami...|   M. J. S. M. H. O.|  Hydrology Research|Water Science and...|      IWA Publishing|2013|    1| 23|                    15|
|Our favorite less...|The Back Page: My...|      Alison L. Mike|The Mathematics T...|                    |National Council ...|2013|   12| 14|                     0|
|<jats:title>SUMMA...|Human monoclonal ...|  K M DA SILVA K A B|Clinical and Expe...|Immunology, Immun...|Oxford University...|1995|    8| 14|                    10|
|A m
```

```python
fpath_nullcount = 's3://s3bucketemr/cleaned/null_counts.csv'

df_null = spark.read.csv(fpath_nullcount, header=True, inferSchema=True)

df_null.show()
```

```
+--------+-----+-----+---------------+-------+---------+----+-----+---+----------------------+
|abstract|title|given|container_title|subject|publisher|year|month|day|is-referenced-by-count|
+--------+-----+-----+---------------+-------+---------+----+-----+---+----------------------+
|       0|    0|    0|              0|      0|        0|   0|    0|  0|                     0|
+--------+-----+-----+---------------+-------+---------+----+-----+---+----------------------+
```

```python
fpath_shape = 's3://s3bucketemr/cleaned/df_shape.csv'

df_shapedf = spark.read.csv(fpath_shape, header=True, inferSchema=True)

df_shapedf.show()
```

```
+--------+--------+
|num_rows|num_cols|
+--------+--------+
|19351793|      10|
+--------+--------+
```
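Loading the files back shows that they exist and are readable; comparing the reloaded values with those computed before saving is a slightly stronger check. The sketch below does this for the shape using Python's `csv` module, with an in-memory buffer standing in for the S3 file (the buffer is an assumption for illustration). It also shows why the reload cells pass `inferSchema=True`: CSV stores only text, so numbers come back as strings unless they are converted.

```python
import csv
import io

# Shape computed before saving (values from the shape cell above)
shape = {"num_rows": 19351793, "num_cols": 10}

# Write the one-row table to an in-memory CSV "file"
buffer = io.StringIO()
writer = csv.DictWriter(buffer, fieldnames=list(shape))
writer.writeheader()
writer.writerow(shape)

# Read it back: every value is a string until it is converted
buffer.seek(0)
reloaded = next(csv.DictReader(buffer))
typed = {key: int(value) for key, value in reloaded.items()}

print(reloaded["num_rows"], type(reloaded["num_rows"]).__name__)  # 19351793 str
assert typed == shape, "saved and reloaded shape differ"
```

On the cluster, an analogous check could compare `df_cleaned.count()` with `num_rows`.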