## Scientific Publications Data Warehouse

Project 1: A Data Cube on top of Delta Lake (ETL)
#### *Purpose*
The goal is to extract data about scientific publications from JSON files that describe a large number of papers (title, topic, authors, etc.), and to populate a data warehouse on which analytics queries can be issued using SQL.

We will use Spark DataFrames to extract and transform the data.

We will also store the dimension and fact tables, shown below, as Spark tables (Delta tables).
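The sketch below shows the kind of roll-up query a fact table joined to its dimensions makes possible. It uses Python's built-in `sqlite3` in place of Spark SQL and Delta Lake. The table names (`dblp_fact`, `date_dim`, `venue_dim`) and the toy rows are hypothetical. The schema below has one fact row per author of a publication, because it carries `Author_ID` and `AuthorRank`. For that reason, the query counts `DISTINCT` publications.

```python
import sqlite3

# An in-memory SQLite database stands in for the Delta tables (illustration only)
conn = sqlite3.connect(":memory:")
cur = conn.cursor()
cur.executescript("""
CREATE TABLE date_dim  (id INTEGER PRIMARY KEY, year INTEGER, month INTEGER, day INTEGER);
CREATE TABLE venue_dim (id INTEGER PRIMARY KEY, name TEXT, city TEXT, country TEXT);
CREATE TABLE dblp_fact (
    date_id        INTEGER REFERENCES date_dim(id),
    venue_id       INTEGER REFERENCES venue_dim(id),
    publication_id INTEGER,
    author_id      INTEGER,
    author_rank    INTEGER
);
""")

# Toy rows: publication 100 has two authors, so it appears twice in the fact table
cur.executemany("INSERT INTO date_dim VALUES (?, ?, ?, ?)",
                [(1, 2019, 6, 1), (2, 2020, 3, 15)])
cur.executemany("INSERT INTO venue_dim VALUES (?, ?, ?, ?)",
                [(10, "Venue A", "City A", "Country A"),
                 (11, "Venue B", "City B", "Country B")])
cur.executemany("INSERT INTO dblp_fact VALUES (?, ?, ?, ?, ?)",
                [(1, 10, 100, 1000, 1), (1, 10, 100, 1001, 2),
                 (1, 10, 103, 1003, 1), (2, 10, 101, 1000, 1),
                 (2, 11, 102, 1002, 1)])

# Roll-up: distinct publications per venue and year
rows = cur.execute("""
    SELECT v.name, d.year, COUNT(DISTINCT f.publication_id) AS n_pubs
    FROM dblp_fact AS f
    JOIN venue_dim AS v ON f.venue_id = v.id
    JOIN date_dim  AS d ON f.date_id  = d.id
    GROUP BY v.name, d.year
    ORDER BY v.name, d.year
""").fetchall()
print(rows)  # [('Venue A', 2019, 2), ('Venue A', 2020, 1), ('Venue B', 2020, 1)]
conn.close()
```

A plain `COUNT(*)` would report 3 for Venue A in 2019, because it counts one row per author rather than one per paper.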

### *DWH Schema*

We will follow the proposed schema as shown:

DBLP Fact Table:
- Date_ID (FK)
- Keyword_ID (FK)
- Type_ID (FK)
- Publication_ID (FK)
- Venue_ID (FK)
- FOS_ID (FK)
- ORG_ID (FK)
- Author_ID (FK)
- Lang_ID (FK)
- AuthorRank

Keyword Table:
- ID
- Text

Type Table:
- ID
- Description

Publication Table:
- ID
- Title
- Year
- PageStart
- PageEnd
- DOI
- PDF
- URL
- Abstract
- IndexedAbstract
- N_Citation

Venue Table:
- ID
- Name
- City
- Country

Date Table:
- ID
- Year
- Month
- Day

Language Table:
- ID
- Name

FOS Table:
- ID
- Field

ORG Table:
- ID
- Name
- City
- Country

Author Table:
- ID
- FirstName
- LastName
- MiddleName
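The Author table splits names into FirstName, MiddleName and LastName. The source field `authors.name`, however, is a single string such as "Jiawei Han". The sketch below shows one naive way to split it, assuming space-separated names or a "Last, First Middle" form. `split_name` is a hypothetical helper, not part of the pipeline. Its rule is simple: the first token is the first name, the last token is the last name, and anything in between is the middle name. It will mis-handle particles such as "van der" and names written family-name first.

```python
def split_name(full_name):
    """Heuristically split an author string into (first, middle, last)."""
    if not full_name or not full_name.strip():
        return "", "", ""
    name = " ".join(full_name.split())  # collapse repeated whitespace
    if "," in name:
        # "Last, First Middle" form
        last, _, rest = name.partition(",")
        parts = rest.split()
        first = parts[0] if parts else ""
        return first, " ".join(parts[1:]), last.strip()
    parts = name.split(" ")
    if len(parts) == 1:
        return "", "", parts[0]  # a single token is treated as the last name
    return parts[0], " ".join(parts[1:-1]), parts[-1]


print(split_name("Jiawei Han"))  # ('Jiawei', '', 'Han')
```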


### *Dataset*

The data source is https://www.aminer.org/citation, version 13, chosen because it is the most detailed version available in JSON format. The schema of the dataset is described on the same page under the "Description" link; note that it may not match the schema actually found in the JSON file.


#### Data Schema of V13

_Reverted to the v11 schema, in which `id` and `references` are strings._


| Field Name | Field Type | Description | Example |
| --- | --- | --- | --- |
| id | string | paper ID | 43e17f5b20f7dfbc07e8ac6e |
| title | string | paper title | Data mining: concepts and techniques |
| authors.name | string | author name | Jiawei Han |
| authors.org | string | author affiliation | Department of Computer Science, University of Illinois at Urbana-Champaign |
| authors.id | string | author ID | 53f42f36dabfaedce54dcd0c |
| venue.id | string | paper venue ID | 53e17f5b20f7dfbc07e8ac6e |
| venue.raw | string | paper venue name | Inteligencia Artificial, Revista Iberoamericana de Inteligencia Artificial |
| year | int | published year | 2000 |
| keywords | list of strings | keywords | ["data mining", "structured data", "world wide web", "social network", "relational data"] |
| fos.name | string | paper fields of study | Web mining |
| fos.w | float | fields of study weight | 0.659690857 |
| references | list of strings | paper references | ["4909282", "16018031", "16159250", "19838944", ...] |
| n_citation | int | citation number | 40829 |
| page_start | string | page start | 11 |
| page_end | string | page end | 18 |
| doc_type | string | paper type: journal, book title... | book |
| lang | string | detected language | en |
| publisher | string | publisher | Elsevier |
| volume | string | volume | 10 |
| issue | string | issue | 29 |
| issn | string | issn | 0020-7136 |
| isbn | string | isbn | 1-55860-489-8 |
| doi | string | doi | 10.4114/ia.v10i29.873 |
| pdf | string | pdf URL | //static.aminer.org/upload/pdf/1254/370/239/53e9ab9eb7602d970354a97e.pdf |
| url | list | external links | ["http://dx.doi.org/10.4114/ia.v10i29.873", "http://polar.lsi.uned.es/revista/index.php/ia/article/view/479"] |
| abstract | string | abstract | Our ability to generate... |
| indexed_abstract | dict | indexed abstract | {"IndexLength": 164, "InvertedIndex": {"Our": [0], "ability": [1], "to": [2, 7, ...]}} |
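The `indexed_abstract` field stores the abstract as an inverted index, mapping each word to its positions, rather than as plain text. The sketch below turns it back into text, assuming the structure shown in the table. `rebuild_abstract` is a hypothetical helper. The example index is made up, because the one in the table is truncated.

```python
def rebuild_abstract(indexed_abstract):
    """Rebuild plain text from {"IndexLength": n, "InvertedIndex": {word: [positions]}}."""
    if not indexed_abstract:
        return ""  # missing abstract (None, "" or {})
    inverted = indexed_abstract.get("InvertedIndex", {})
    # pair every position with its word, then read the words back in position order
    positioned = sorted((pos, word) for word, positions in inverted.items() for pos in positions)
    return " ".join(word for _, word in positioned)


example = {
    "IndexLength": 7,
    "InvertedIndex": {"Our": [0], "ability": [1], "to": [2, 5],
                      "generate": [3], "data": [4], "learn": [6]},
}
print(rebuild_abstract(example))  # Our ability to generate data to learn
```

Sorting by position, instead of pre-allocating `IndexLength` slots, keeps the function from failing on records whose positions exceed the declared length.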


### Extract

In [6]:
# let's fetch the data
# !wget https://originalstatic.aminer.cn/misc/dblp.v13.7z
# !7z x dblp.v13.7z


--2023-04-20 12:21:31--  https://originalstatic.aminer.cn/misc/dblp.v13.7z
Resolving originalstatic.aminer.cn (originalstatic.aminer.cn)... 159.27.2.14
Connecting to originalstatic.aminer.cn (originalstatic.aminer.cn)|159.27.2.14|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 2568255035 (2.4G) [application/x-7z-compressed]
Saving to: 'dblp.v13.7z'


2023-04-20 12:25:43 (9.82 MB/s) - 'dblp.v13.7z' saved [2568255035/2568255035]


7-Zip [64] 16.02 : Copyright (c) 1999-2016 Igor Pavlov : 2016-05-21
p7zip Version 16.02 (locale=C,Utf16=off,HugeFiles=on,64 bits,16 CPUs AMD Ryzen 9 5900HS with Radeon Graphics         (A50F00),ASM,AES-NI)

Scanning the drive for archives:
1 file, 2568255035 bytes (2450 MiB)

Extracting archive: dblp.v13.7z
--
Path = dblp.v13.7z
Type = 7z
Physical Size = 2568255035
Headers Size = 130
Method = LZMA2:24
Solid = -
Blocks = 1

Everything is Ok

Size:       17352640799
Compressed: 2568255035


In [18]:
# we will read and process the data while cleaning it simultaneously

import json
import ast
import os
import re
# import tqdm.notebook

def process_json(file_name, split_size, output_prefix, offset=0, file_number=0):
    """Split the large JSON array in `file_name` into files of `split_size` objects.

    Returns the byte offset reached after each output file (pass one back as
    `offset`, with the matching `file_number`, to resume) and the file sizes in MB.
    """
    checkpoint = []
    file_sizes = []
    with open(file_name, 'r', encoding='utf-8') as ifh:
        # resume from a checkpoint, or skip the opening '[' of the array
        ifh.seek(offset if offset > 0 else 1)
        end_of_file = False
        # keep looping until all the objects are read
        while not end_of_file:
            json_objects = []
            while len(json_objects) < split_size:
                raw = build_json_object(ifh)
                if raw is None:
                    end_of_file = True
                    break  # we reached the end of the file
                try:
                    parsed = ast.literal_eval(raw)
                except (ValueError, SyntaxError) as err:
                    # skip a malformed object instead of treating it as the end of the file
                    print(f"Skipping malformed object near offset {ifh.tell()}: {err}")
                    continue
                # a buffer ending in "}," evaluates to a one-element tuple: unwrap it
                if isinstance(parsed, tuple):
                    parsed = parsed[0]
                json_objects.append(parsed)
            if not json_objects:
                break
            file_number += 1
            print(f"Checkpoint {file_number}: {ifh.tell()}, objects processed: {len(json_objects)}")
            output_file = f"{output_prefix}{file_number}.json"
            # write the objects as a single-line JSON array; this yields smaller
            # files than json.dump with indent=4 and can be read by spark.read.json
            with open(output_file, 'w', encoding='utf-8') as ofh:
                ofh.write('[' + ','.join(json.dumps(obj) for obj in json_objects) + ']')
            # get the size of the file
            file_sizes.append(os.path.getsize(output_file) / 1024 / 1024)
            checkpoint.append(ifh.tell())
            print(f"Checkpoint {file_number}: {checkpoint[-1]}, objects processed: {len(json_objects)}, size of file {file_number}: {file_sizes[-1]} MB")
            # break # for testing purposes
        print(f"Finished processing {file_name}, {file_number} files created.")
        return checkpoint, file_sizes

NUMBER_INT = re.compile(r'NumberInt\((-?\d+)\)')
# a null that is the whole value of a field, optionally followed by a comma
NULL_VALUE = re.compile(r':\s*null(?=\s*,?\s*$)')

def clean_line(line):
    # NumberInt(123) -> "123"; other parentheses on the line are left untouched
    line = NUMBER_INT.sub(r'"\1"', line)
    # "key" : null -> "key" : "" so that ast.literal_eval can parse the value
    line = NULL_VALUE.sub(': ""', line)
    return line

def build_json_object(fh):
    """Read one top-level object (ending with a "},\n" line); return None at end of file."""
    buffer = ''
    line = fh.readline()
    while line != "},\n":
        if not line:
            print("Reached end of file")
            # the last object has no trailing comma; drop the array's closing ']'
            buffer = buffer.strip()
            if buffer.endswith(']'):
                buffer = buffer[:-1]
            return buffer or None
        buffer += clean_line(line)
        line = fh.readline()
    buffer += line
    return buffer
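`process_json` parses each object with `ast.literal_eval`, which is why `clean_line` has to rewrite `NumberInt(...)` and `null` first. Python literals are not JSON, though. `true`/`false` are rejected, and escapes such as `\/` are read differently. The sketch below hands each object to `json.loads` instead. Like `build_json_object`, it assumes a top-level object ends with a `},` or `}` line at column 0. It also assumes the array brackets sit on their own lines. `iter_records` and the sample text are hypothetical. Unlike `clean_line`, it keeps `NumberInt` values as integers, matching the `int` types in the V13 schema, and `null` as `None`.

```python
import io
import json
import re

NUMBER_INT = re.compile(r"NumberInt\((-?\d+)\)")


def iter_records(fh):
    """Yield one dict per top-level object of a pretty-printed JSON array dump."""
    buffer = []
    for line in fh:
        closing = line.rstrip()  # keep leading spaces: nested braces are indented
        if closing in ("[", "]"):
            continue  # array brackets on their own line
        buffer.append(NUMBER_INT.sub(r"\1", line))  # NumberInt(123) -> 123
        if closing in ("},", "}"):
            text = "".join(buffer).rstrip().rstrip(",")
            buffer = []
            yield json.loads(text)  # null -> None, true/false -> True/False


sample = io.StringIO(
    "[\n"
    "{\n"
    '    "title" : "A (toy) paper",\n'
    '    "year" : NumberInt(2000),\n'
    '    "venue" : {\n'
    '        "raw" : "Toy Venue"\n'
    "    },\n"
    '    "doi" : null\n'
    "},\n"
    "{\n"
    '    "title" : "Another paper",\n'
    '    "n_citation" : NumberInt(5)\n'
    "}\n"
    "]\n"
)
records = list(iter_records(sample))
print(records[0]["year"], records[0]["doi"], records[1]["n_citation"])  # 2000 None 5
```

One trade-off: iterating a text file with `for line in fh` disables `fh.tell()`. Resumable checkpoints like those in `process_json` would therefore need the `readline()` loop instead.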


In [19]:
# results = process_json('dblpv13.json', 100000, 'clean_dataset/dblpv13_clean_', offset=17260419910, file_number=53)
results = process_json('dblpv13.json', 100000, 'clean_dataset/dblpv13_clean_')

Reached end of file
Reached end of file
Checkpoint 54: 17352640799, objects processed: 54309
Checkpoint 54: 17352640799, objects processed: 54309, size of file 54: 65.9983720779419 MB
Finished processing dblpv13.json, 54 files created.


### Transform

Here we begin the transformation part of the pipeline, using Delta tables and PySpark DataFrames. The tasks to complete are:
1. Drop publications with low-quality metadata, e.g. very short (one-word) titles or empty author lists.
2. Visualize the number of citations.
3. Clean the ISSN, which is sometimes filled with wrong values: either drop those records or try to resolve them, for instance using the DOI.
4. Define the type of publication (journal, book, conference, etc.).
5. Resolve ambiguous author names.
6. Resolve ambiguous or abbreviated conference and journal names using the DBLP database.
7. Refine venues.
8. Determine author gender.
9. Compute the H-index of authors.
10. Normalize the fields of study.
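Task 9 uses the standard definition: an author has index h if h of their papers have at least h citations each. The sketch below computes it in plain Python. It assumes `n_citation` is the citation count to use, stored as an integer (cast it first if it was stored as a string). It also assumes the (author, n_citation) pairs have already been collected from the fact and publication tables. `h_index` and `h_index_by_author` are hypothetical helpers, not part of the pipeline yet. In PySpark, the same per-author logic could run after a `groupBy` with `collect_list` of the citation counts.

```python
from collections import defaultdict


def h_index(citations):
    """Largest h such that at least h of the papers have >= h citations each."""
    h = 0
    for rank, n in enumerate(sorted(citations, reverse=True), start=1):
        if n < rank:
            break
        h = rank
    return h


def h_index_by_author(pairs):
    """pairs: iterable of (author_id, n_citation), one per authored paper."""
    per_author = defaultdict(list)
    for author_id, n_citation in pairs:
        per_author[author_id].append(n_citation)
    return {author: h_index(counts) for author, counts in per_author.items()}


print(h_index([10, 8, 5, 4, 3]))  # 4
print(h_index_by_author([("a", 10), ("a", 3), ("a", 2), ("b", 1)]))  # {'a': 2, 'b': 1}
```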

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

import warnings
warnings.filterwarnings("ignore")


In [2]:
path_to_data = 'clean_dataset/dblpv13_cleanv2_'


In [3]:
# adjust the memory settings depending on the RAM available on your machine
spark = SparkSession.builder \
    .appName("Project1") \
    .config("spark.jars.packages", "io.delta:delta-core_2.12:2.1.0") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .config("spark.sql.warehouse.dir", "file:///tmp/spark-warehouse")\
    .config("spark.driver.memory", "16g")\
    .config("spark.executor.memory", "16g")\
    .config("spark.driver.maxResultSize", "16g")\
    .config("spark.sql.execution.arrow.pyspark.enabled", "true")\
    .getOrCreate()

23/04/20 19:59:56 WARN Utils: Your hostname, LAPTOP-5Q7KN03U resolves to a loopback address: 127.0.1.1; using 172.24.98.66 instead (on interface eth0)
23/04/20 19:59:56 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
:: loading settings :: url = jar:file:/home/jkat/miniconda3/envs/pyspark/lib/python3.8/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/jkat/.ivy2/cache
The jars for the packages stored in: /home/jkat/.ivy2/jars
io.delta#delta-core_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-0c69fcaf-37b6-44cf-9e39-575a37f5d324;1.0
	confs: [default]
	found io.delta#delta-core_2.12;2.1.0 in central
	found io.delta#delta-storage;2.1.0 in central
	found org.antlr#antlr4-runtime;4.8 in central
	found org.codehaus.jackson#jackson-core-asl;1.9.13 in central
:: resolution report :: resolve 672ms :: artifacts dl 33ms
	:: modules in use:
	io.delta#delta-core_2.12;2.1.0 from central in [default]
	io.delta#delta-storage;2.1.0 from central in [default]
	org.antlr#antlr4-runtime;4.8 from central in [default]
	org.codehaus.jackson#jackson-core-asl;1.9.13 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicte

23/04/20 20:00:09 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [42]:
# Function to merge two schemas
def merge_schemas(schema1, schema2):
    merged_fields = {field.name: field for field in schema1}
    for field in schema2:
        if field.name not in merged_fields:
            merged_fields[field.name] = field
    return StructType(sorted(merged_fields.values(), key=lambda field: field.name))

# Title patterns that usually mark front matter or event records rather than papers
# (rlike matches anywhere in the string, so no leading/trailing .* is needed)
NON_PAPER_TITLES = "Editorial|Foreword|Forward|Preface|Conference|Proceedings|Symposium|Workshop|Tutorial|Forum"

def preprocess_dataframe(df):
    df = df.dropDuplicates()

    # drop missing or very short titles and non-paper entries
    df = df.filter(df.title.isNotNull()) \
        .filter(length(df.title) > 5) \
        .filter(~df.title.rlike(NON_PAPER_TITLES))

    # keep only records with a non-empty abstract
    df = df.filter(length(df.abstract) > 0)

    # keep records whose ISSN is present and longer than 5 characters
    df = df.filter(df.issn.isNotNull()) \
        .filter(length(df.issn) > 5)

    # keep DOIs with the standard "10.<registrant>/<suffix>" shape; rejecting every
    # DOI that contains a letter would drop valid ones such as 10.4114/ia.v10i29.873
    df = df.filter(df.doi.rlike(r"^10\.\d{4,9}/\S+$"))

    return df
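`preprocess_dataframe` only requires an ISSN longer than five characters. A stricter test uses the ISSN check digit (ISO 3297). The first seven digits are weighted 8 down to 2. The check character is (11 - sum mod 11) mod 11, written as `X` when it equals 10. The sketch below assumes ISSNs are stored as `NNNN-NNNC`, with or without the hyphen. `is_valid_issn` is a hypothetical helper. It could be wrapped with `pyspark.sql.functions.udf(is_valid_issn, BooleanType())` and used as a filter.

```python
import re

# NNNN-NNNC, hyphen optional; C is a digit or X
ISSN_PATTERN = re.compile(r"^(\d{4})-?(\d{3})([\dXx])$")


def is_valid_issn(value):
    """Check the format and the mod-11 check digit of an ISSN."""
    if not value:
        return False
    match = ISSN_PATTERN.match(value.strip())
    if not match:
        return False
    digits = match.group(1) + match.group(2)
    # weights 8, 7, ..., 2 for the first seven digits
    total = sum(int(d) * w for d, w in zip(digits, range(8, 1, -1)))
    check = (11 - total % 11) % 11
    expected = "X" if check == 10 else str(check)
    return match.group(3).upper() == expected


print(is_valid_issn("0020-7136"))  # True (example from the V13 schema table)
```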


In [43]:
# Read each cleaned JSON file and preprocess it; Spark always infers the schema of
# JSON input ("inferSchema" is a CSV option), and glob picks up every output file
import glob

json_files = sorted(glob.glob(f"{path_to_data}*.json"))
dataframes = [preprocess_dataframe(spark.read.json(file)) for file in json_files]



In [44]:
# Merge schemas from all dataframes
merged_schema = dataframes[0].schema
for df in dataframes[1:]:
    merged_schema = merge_schemas(merged_schema, df.schema)

# Order each dataframe's columns by the merged schema; columns a file lacks are
# skipped here (selecting them would fail) and filled with nulls by unionByName
dataframes_with_merged_schema = [
    df.select(*[name for name in merged_schema.fieldNames() if name in df.columns])
    for df in dataframes
]

# Union all dataframes to create a single dataframe with a consistent schema
combined_df = dataframes_with_merged_schema[0]
for df in dataframes_with_merged_schema[1:]:
    combined_df = combined_df.unionByName(df, allowMissingColumns=True)

# Save the combined dataframe to a Delta table (overwrite so re-runs do not duplicate rows)
combined_df.write.format("delta").mode("overwrite").save("clean_dataset/delta/dblpv13")

23/04/20 22:14:39 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.




23/04/20 22:21:15 WARN DAGScheduler: Broadcasting large task binary with size 1985.8 KiB



In [54]:
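# monotonically_increasing_id() numbers the rows of combined_df, so each *_id below
# identifies a publication row rather than a distinct keyword, venue, author, etc.
# The function is non-deterministic across recomputations, so persist the result
# and let the separate writes below reuse the same computed keys.
combined_df = combined_df.withColumn("publication_id", monotonically_increasing_id()) \
    .withColumn("author_id", monotonically_increasing_id()) \
    .withColumn("venue_id", monotonically_increasing_id()) \
    .withColumn("fos_id", monotonically_increasing_id()) \
    .withColumn("org_id", monotonically_increasing_id()) \
    .withColumn("date_id", monotonically_increasing_id()) \
    .withColumn("keyword_id", monotonically_increasing_id()) \
    .withColumn("type_id", monotonically_increasing_id()) \
    .withColumn("lang_id", monotonically_increasing_id()) \
    .persist()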

dblp_fact_table = combined_df.select("date_id", "keyword_id", "type_id", "publication_id", "venue_id",
                                      "fos_id", "org_id", "author_id", "lang_id")

keyword_table = combined_df.select("keyword_id",
                                   col("keywords").alias("text"))


venue_table = combined_df.select("venue_id",
                                 col("venue.name_d").alias("name"),
                                 col("venue.type").alias("type"),
                                 col("venue.raw").alias("raw"),
                                 col("venue._id").alias("vid"))

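# "year" already holds the publication year, so cast it directly;
# year() is meant for extracting the year from a date or timestamp
date_table = combined_df.select("date_id",
                                col("year").cast("int").alias("year"))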

language_table = combined_df.select("lang_id",
                                    col("lang").alias("name"))


fos_table = combined_df.select("fos_id",
                               col("fos").alias("field"))

author_table = combined_df.select("author_id",
                                  col("authors.name").alias("name"),
                                  col("authors.org").alias("org"),
                                  col("authors.gid").alias("gid"),
                                  col("authors.orgid").alias("orgid"))

publication_table = combined_df.select("publication_id",
                                       col("title").alias("name"),
                                       col("abstract").alias("description"),
                                       col("doi").alias("doi"),
                                       col("issn").alias("issn"),
                                       col("isbn").alias("isbn"),
                                       col("url").alias("url"),
                                       col("pdf").alias("pdf"),
                                       col("page_start").alias("page_start"),
                                       col("page_end").alias("page_end"),
                                       col("volume").alias("volume"),
                                       col("issue").alias("issue"),
                                       col("n_citation").alias("n_citation"))
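In the cell above, the keys number the rows of `combined_df`. As a result, each dimension has one row per publication, and array columns such as `keywords` stay unexploded. In a star schema, the keyword dimension would instead hold one row per distinct keyword, referenced from the fact (or a bridge) table. The sketch below shows that idea in plain Python. `build_dimension` and the sample records are hypothetical, and the normalization (strip and lowercase) is only one possible choice. In PySpark, the corresponding steps would be `explode` on the array column, `distinct`, a key assignment, and a join back to the publications.

```python
from itertools import count


def build_dimension(records, extract):
    """Give each distinct value returned by `extract` one surrogate key.

    records: iterable of (publication_id, record_dict)
    Returns (dimension {value: key}, links [(publication_id, key), ...]).
    """
    dimension = {}
    next_key = count(1)
    links = []
    for publication_id, record in records:
        for value in extract(record):
            if value not in dimension:
                dimension[value] = next(next_key)  # first time this value is seen
            links.append((publication_id, dimension[value]))
    return dimension, links


papers = [
    (100, {"keywords": ["data mining", "web"]}),
    (101, {"keywords": ["Web ", "graphs"]}),
    (102, {}),  # no keywords
]
keyword_dim, publication_keyword = build_dimension(
    papers, lambda r: [k.strip().lower() for k in r.get("keywords") or []]
)
print(keyword_dim)          # {'data mining': 1, 'web': 2, 'graphs': 3}
print(publication_keyword)  # [(100, 1), (100, 2), (101, 2), (101, 3)]
```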


In [55]:
# overwrite (rather than append) so that re-running this cell does not duplicate rows
dblp_fact_table.write.format("delta").mode("overwrite").save("clean_dataset/delta/dblp_fact_table")
keyword_table.write.format("delta").mode("overwrite").save("clean_dataset/delta/keyword_table")
venue_table.write.format("delta").mode("overwrite").save("clean_dataset/delta/venue_table")
date_table.write.format("delta").mode("overwrite").save("clean_dataset/delta/date_table")
language_table.write.format("delta").mode("overwrite").save("clean_dataset/delta/language_table")
fos_table.write.format("delta").mode("overwrite").save("clean_dataset/delta/fos_table")
author_table.write.format("delta").mode("overwrite").save("clean_dataset/delta/author_table")
publication_table.write.format("delta").mode("overwrite").save("clean_dataset/delta/publication_table")




23/04/20 22:55:39 WARN DAGScheduler: Broadcasting large task binary with size 2.9 MiB




23/04/20 23:02:41 WARN DAGScheduler: Broadcasting large task binary with size 3.0 MiB




23/04/20 23:09:54 WARN DAGScheduler: Broadcasting large task binary with size 3.3 MiB




23/04/20 23:16:57 WARN DAGScheduler: Broadcasting large task binary with size 2.9 MiB




23/04/20 23:24:07 WARN DAGScheduler: Broadcasting large task binary with size 2.9 MiB




23/04/20 23:31:25 WARN DAGScheduler: Broadcasting large task binary with size 3.0 MiB




23/04/20 23:39:13 WARN DAGScheduler: Broadcasting large task binary with size 1893.1 KiB




23/04/20 23:48:28 WARN DAGScheduler: Broadcasting large task binary with size 3.1 MiB



In [None]:
# load table from delta lake
dblp_fact_table = spark.read.format("delta").load("clean_dataset/delta/dblp_fact_table")
