# **arXiv Metadata Analytics with PySpark DataFrames: JSON Case Study**

### Udemy Course: Best Hands-on Big Data Practices and Use Cases using PySpark

### Author: Amin Karami (PhD, FHEA)
#### email: amin.karami@ymail.com

In [2]:
########## ONLY in Colab ##########
!pip3 install pyspark
########## ONLY in Colab ##########

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
  Preparing metadata (setup.py) ... done
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488493 sha256=fa763dcba2e9509321270d91091e6547068c1b9ad5e70157ec8908e1f013d4a9
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


In [3]:
########## ONLY in Ubuntu Machine ##########
# Load Spark engine
!pip3 install -q findspark
import findspark
findspark.init()
########## ONLY in Ubuntu Machine ##########

In [4]:
# Import SparkSession, the unified entry point that replaces the older SQLContext and HiveContext

from pyspark.sql import SparkSession

spark = SparkSession.builder.master('local[*]').getOrCreate()

spark

In [8]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [9]:
# Read the JSON file into a Spark DataFrame and inspect the inferred schema
import json

df = spark.read.json("/content/drive/MyDrive/Colab Notebooks/arxiv-metadata-oai-snapshot.json")

df.printSchema()

root
 |-- abstract: string (nullable = true)
 |-- authors: string (nullable = true)
 |-- authors_parsed: array (nullable = true)
 |    |-- element: array (containsNull = true)
 |    |    |-- element: string (containsNull = true)
 |-- categories: string (nullable = true)
 |-- comments: string (nullable = true)
 |-- doi: string (nullable = true)
 |-- id: string (nullable = true)
 |-- journal-ref: string (nullable = true)
 |-- license: string (nullable = true)
 |-- report-no: string (nullable = true)
 |-- submitter: string (nullable = true)
 |-- title: string (nullable = true)
 |-- update_date: string (nullable = true)
 |-- versions: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- created: string (nullable = true)
 |    |    |-- version: string (nullable = true)



In [10]:
# Check the number of partitions; a DataFrame exposes this through its underlying RDD

df.rdd.getNumPartitions()


25

## Question 1: Create a new Schema

In [11]:
from pyspark.sql.types import StructType, StructField, StringType, ArrayType

In [15]:
# Keep only the columns needed for the questions below
Schema = StructType([
    StructField('authors', StringType(), True),
    StructField('categories', StringType(), True),
    StructField('license', StringType(), True),
    StructField('comments', StringType(), True),
    StructField('abstract', StringType(), True),
    # Each version entry is read as a raw JSON string rather than a struct
    StructField('versions', ArrayType(StringType()), True),
    StructField('id', StringType(), True)
])

print(Schema)

StructType([StructField('authors', StringType(), True), StructField('categories', StringType(), True), StructField('license', StringType(), True), StructField('comments', StringType(), True), StructField('abstract', StringType(), True), StructField('versions', ArrayType(StringType(), True), True), StructField('id', StringType(), True)])


## Question 2: Binding Data to a Schema

In [21]:
df = spark.read.json("/content/drive/MyDrive/Colab Notebooks/arxiv-metadata-oai-snapshot.json", schema = Schema)

df.show()

+--------------------+-----------------+--------------------+--------------------+--------------------+--------------------+---------+
|             authors|       categories|             license|            comments|            abstract|            versions|       id|
+--------------------+-----------------+--------------------+--------------------+--------------------+--------------------+---------+
|C. Bal\'azs, E. L...|           hep-ph|                NULL|37 pages, 15 figu...|  A fully differe...|[{"version":"v1",...|0704.0001|
|Ileana Streinu an...|    math.CO cs.CG|http://arxiv.org/...|To appear in Grap...|  We describe a n...|[{"version":"v1",...|0704.0002|
|         Hongjun Pan|   physics.gen-ph|                NULL| 23 pages, 3 figures|  The evolution o...|[{"version":"v1",...|0704.0003|
|        David Callan|          math.CO|                NULL|            11 pages|  We show that a ...|[{"version":"v1",...|0704.0004|
|Wael Abu-Shammala...|  math.CA math.FA|               

## Question 3: Missing values for "comments" and "license" attributes

In [24]:
# Drop the 'id' column
df = df.drop('id')
# Drop rows where 'comments' is null
df = df.dropna(subset='comments')
# Replace null 'license' values with the placeholder 'sifico' (treated as "unknown license" below)
df = df.fillna(value='sifico', subset='license')


df.show()


+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|             authors|          categories|             license|            comments|            abstract|            versions|
+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|C. Bal\'azs, E. L...|              hep-ph|              sifico|37 pages, 15 figu...|  A fully differe...|[{"version":"v1",...|
|Ileana Streinu an...|       math.CO cs.CG|http://arxiv.org/...|To appear in Grap...|  We describe a n...|[{"version":"v1",...|
|         Hongjun Pan|      physics.gen-ph|              sifico| 23 pages, 3 figures|  The evolution o...|[{"version":"v1",...|
|        David Callan|             math.CO|              sifico|            11 pages|  We show that a ...|[{"version":"v1",...|
|Y. H. Pong and C....|   cond-mat.mes-hall|              sifico|6 pages, 4 figure...|  We study the tw..

## Question 4: Get the author names who published a paper in a 'math' category

In [32]:
# Register the DataFrame as a temporary view so it can be queried with Spark SQL

# Select papers whose 'categories' string starts with "math"

df.createOrReplaceTempView('Archive')
sql_query = """
              SELECT authors FROM Archive WHERE categories LIKE "math%"

"""

spark.sql(sql_query).show()

spark.sql(sql_query).count()

+--------------------+
|             authors|
+--------------------+
|Ileana Streinu an...|
|        David Callan|
|  Sergei Ovchinnikov|
|Clifton Cunningha...|
|        Koichi Fujii|
|         Norio Konno|
|Simon J.A. Malham...|
|Robert P. C. de M...|
|  P\'eter E. Frenkel|
|          Mihai Popa|
|   Debashish Goswami|
|      Mikkel {\O}bro|
|Nabil L. Youssef,...|
|         Boris Rubin|
|         A. I. Molev|
| Branko J. Malesevic|
|   John W. Robertson|
|     Yu.N. Kosovtsov|
|        Osamu Fujino|
|Stephen C. Power ...|
+--------------------+
only showing top 20 rows



304590
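
The `LIKE "math%"` filter is a prefix test on the whole `categories` string, so it depends on which category is listed first. The plain-Python sketch below mirrors that behaviour without Spark and contrasts it with a per-category check. The sample strings and both helper names (`starts_with_math`, `any_math_category`) are illustrative assumptions, not part of the notebook.

```python
# Sample 'categories' values: one space-separated string per paper.
# These values are chosen for illustration only.
samples = ["hep-ph", "math.CO cs.CG", "physics.gen-ph", "cs.CG math.CO", "math-ph"]


def starts_with_math(categories):
    """Mirror of SQL `categories LIKE 'math%'`: a prefix test on the whole string."""
    return categories.startswith("math")


def any_math_category(categories):
    """Alternative check: True if any listed category is in the 'math.' archive."""
    return any(c.startswith("math.") for c in categories.split())


prefix_hits = [s for s in samples if starts_with_math(s)]
any_hits = [s for s in samples if any_math_category(s)]

print(prefix_hits)  # ['math.CO cs.CG', 'math-ph']
print(any_hits)     # ['math.CO cs.CG', 'cs.CG math.CO']
```

The prefix test picks up `math-ph` and misses `cs.CG math.CO`, so the count above should be read as "papers whose category string begins with math", not "papers cross-listed in any math category".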

## Question 5: Get licenses of papers with a parenthesized term of 5 or more letters in the "abstract"

In [38]:
# Note: '%' is a wildcard only in LIKE; under REGEXP it is matched as a literal character
sql_query = """ SELECT DISTINCT license FROM Archive
                WHERE abstract REGEXP '%\(([A-Za-z][^_ /\\<>]{5,})\)%'
            """
spark.sql(sql_query).show()

+--------------------+
|             license|
+--------------------+
|http://arxiv.org/...|
|http://creativeco...|
|http://creativeco...|
|http://creativeco...|
|              sifico|
+--------------------+
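
The parenthesized-term part of the pattern can be tried outside Spark with Python's `re` module. This is a minimal sketch under two assumptions: the surrounding `%` signs are left out (they are `LIKE` wildcards, not regex syntax), and the sample abstracts are invented for illustration. The pattern requires an opening parenthesis, one letter, then five or more characters that are not `_`, space, `/`, `\`, `<` or `>`, then a closing parenthesis.

```python
import re

# "(" + one letter + 5 or more characters outside the excluded set + ")"
PATTERN = re.compile(r"\(([A-Za-z][^_ /\\<>]{5,})\)")


def parenthesized_terms(text):
    """Return every parenthesized term in `text` that satisfies PATTERN."""
    return PATTERN.findall(text)


# Invented sample abstracts, for illustration only
abstracts = [
    "We compute the cross section (QCDNLO) for diphoton production.",
    "A short tag (QCD) does not qualify.",
    "Spaces break the match (not a match here).",
]

matches = [parenthesized_terms(a) for a in abstracts]
print(matches)  # [['QCDNLO'], [], []]
```

Only the first abstract qualifies: `(QCD)` is too short, and the third example contains spaces inside the parentheses.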



## Question 6: Extract statistics on the number of pages for papers with unknown licenses

In [None]:
import re

def get_page(line):
  # Find page counts such as "12 pages" in the comments text
  search = re.findall(r"\d+ pages", line)
  if search:
    # Keep the number from the first match, e.g. "12 pages" -> 12
    return int(search[0].split(" ")[0])
  else:
    return 0  # rows that mention no page count


In [43]:
# No return type is given, so the UDF returns strings by default; AVG and SUM cast them to double
spark.udf.register("PageNumbers", get_page)

sql_query = """
        SELECT AVG(PageNumbers(comments)) AS AVG, SUM(PageNumbers(comments)) AS SUM FROM Archive
        WHERE license = 'sifico'

"""

spark.sql(sql_query).show()

+------------------+---------+
|               AVG|      SUM|
+------------------+---------+
|13.368011068572079|5642584.0|
+------------------+---------+
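
The page-extraction helper can be checked in plain Python before it is registered as a UDF. The sketch below restates `get_page` in compact form and runs it on a few comment strings; the third string is made up for illustration. It also shows how counting papers without a page number as 0 pulls the average down, which is worth keeping in mind when reading the AVG above.

```python
import re


def get_page(line):
    """Return the first 'N pages' count found in `line`, or 0 if there is none."""
    found = re.findall(r"\d+ pages", line)
    return int(found[0].split(" ")[0]) if found else 0


# The first two strings follow the comment style shown earlier; the third is invented
comments = ["23 pages, 3 figures", "11 pages", "Accepted for publication"]

pages = [get_page(c) for c in comments]
print(pages)  # [23, 11, 0]

# Average over all rows (zeros included), as the SQL query computes it
avg_all = sum(pages) / len(pages)

# Average over rows that actually report a page count
known = [p for p in pages if p > 0]
avg_known = sum(known) / len(known)

print(avg_known)  # 17.0
```

If the zero rows should not influence the mean, one option is to filter them out (for example with `WHERE PageNumbers(comments) > 0`) before aggregating.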

