In [1]:
import os
import random
from datetime import datetime

import numpy as np
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import Window
from pyspark.sql.functions import (lit, col, upper, explode, size, array, array_min, array_max, array_except, isnan, row_number, quarter, max)
from pyspark.sql.functions import count, when
from pyspark.sql.types import (StructType, StructField, StringType, IntegerType, 
                               FloatType, TimestampType, DateType, ArrayType, BooleanType)
from tqdm import tqdm

from IPython.core.display import HTML
#HTML("<style>pre { white-space: pre !important; }</style>")
# Inject CSS so notebook output areas keep their whitespace and render in a
# monospaced font (presumably so wide tables printed by Spark are not wrapped).
# CSS comments must use /* ... */; a '#' does not start a comment inside <style>.
display(HTML("""<style>
    .output-plaintext, .output-stream, .output{
        white-space: pre !important;
        font-family: Monaco, monospace; /* any monospaced font should work */
    }</style>"""))
#os.environ['PYSPARK_SUBMIT_ARGS'] = '--driver-memory 12G --executor-memory 12G pyspark-shell'

In [2]:
# Scratch directory handed to Spark through `spark.local.dir`.
temp_dir = '/Users/prashantkanth/Desktop/Masters/Semester III/CS543/Project1.nosync/temp'

# Assemble the configuration one setting at a time.
conf = pyspark.SparkConf()
conf.set('spark.driver.memory', '30G')
conf.set('spark.executor.memory', '30G')
conf.set('spark.local.dir', temp_dir)

# Start (or reuse) the session and keep a handle on its SparkContext.
spark = SparkSession.builder.config(conf=conf).getOrCreate()
sc = spark.sparkContext

22/11/01 23:50:34 WARN Utils: Your hostname, Prashants-MacBook-Air.local resolves to a loopback address: 127.0.0.1; using 192.168.1.38 instead (on interface en0)
22/11/01 23:50:34 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/11/01 23:50:35 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
22/11/01 23:50:35 WARN SparkConf: Note that spark.local.dir will be overridden by the value set by the cluster manager (via SPARK_LOCAL_DIRS in mesos/standalone/kubernetes and LOCAL_DIRS in YARN).


In [2]:
# Collect every raw file laid out as <raw_path>/<update directory>/<file>.
raw_path = 'datalake/raw'
ignored_names = {'.DS_Store'}  # folder metadata files that are not data

openalex_data = []
for updates in sorted(os.listdir(raw_path)):
    updated_path = os.path.join(raw_path, updates)
    # Only descend into directories: a stray file at this level would make
    # os.listdir() raise NotADirectoryError.
    if updates in ignored_names or not os.path.isdir(updated_path):
        continue
    for file in sorted(os.listdir(updated_path)):
        # The metadata file can also appear inside an update directory, where
        # it would otherwise be queued as if it were a data file.
        if file in ignored_names:
            continue
        openalex_data.append(os.path.join(updated_path, file))
print(f'Total files to process: {len(openalex_data)}')
# random.shuffle(openalex_data)

# Total on-disk size of everything queued for processing.
data_size = 0
for file in openalex_data:
    data_size += os.path.getsize(file)
print(f'Total data_size to process: {round(data_size/(1024*1024*1024),4)} GB')

In [None]:
# Explicit schema used when reading the raw JSON records. The nested struct
# types are named first and then assembled, so the top-level layout of a
# record can be read at a glance. Every field is nullable.

# struct stored under `host_venue`
_host_venue_type = StructType([
    StructField('id', StringType(), nullable=True),
    StructField('issn_l', StringType(), nullable=True),
    StructField('issn', ArrayType(StringType(), containsNull=True)),
    StructField('display_name', StringType(), nullable=True),
    StructField('publisher', StringType(), nullable=True),
    StructField('type', StringType(), nullable=True),
    StructField('url', StringType(), nullable=True),
    StructField('is_oa', BooleanType(), nullable=True),
    StructField('version', StringType(), nullable=True),
    StructField('license', StringType(), nullable=True),
])

# struct stored under `authorships[].author`
_author_type = StructType([
    StructField('id', StringType(), nullable=True),
    StructField('display_name', StringType(), nullable=True),
    StructField('orcid', StringType(), nullable=True),
])

# element type of `authorships[].institutions`
_institution_type = StructType([
    StructField('id', StringType(), nullable=True),
    StructField('display_name', StringType(), nullable=True),
    StructField('ror', StringType(), nullable=True),
    StructField('country_code', StringType(), nullable=True),
    StructField('type', StringType(), nullable=True),
])

# element type of `authorships`
_authorship_type = StructType([
    StructField('author_position', StringType(), nullable=True),
    StructField('author', _author_type),
    StructField('institutions', ArrayType(_institution_type, containsNull=True)),
    StructField('raw_affiliation_string', StringType(), nullable=True),
])

# element type of `concepts`
_concept_type = StructType([
    StructField('id', StringType(), nullable=True),
    StructField('wikidata', StringType(), nullable=True),
    StructField('display_name', StringType(), nullable=True),
    StructField('level', IntegerType(), nullable=True),
    StructField('score', FloatType(), nullable=True),
])

# element type of `counts_by_year`
_yearly_count_type = StructType([
    StructField('year', IntegerType(), nullable=True),
    StructField('cited_by_count', IntegerType(), nullable=True),
])

# top-level record
jsonSchema = StructType([
    StructField('id', StringType(), nullable=True),
    StructField('display_name', StringType(), nullable=True),
    StructField('publication_year', IntegerType(), nullable=True),
    StructField('publication_date', DateType(), nullable=True),
    StructField('host_venue', _host_venue_type),
    StructField('type', StringType(), nullable=True),
    StructField('authorships', ArrayType(_authorship_type, containsNull=True)),
    StructField('cited_by_count', IntegerType(), nullable=True),
    StructField('is_retracted', BooleanType(), nullable=True),
    StructField('concepts', ArrayType(_concept_type, containsNull=True)),
    StructField('referenced_works', ArrayType(StringType(), containsNull=True)),
    StructField('related_works', ArrayType(StringType(), containsNull=True)),
    StructField('counts_by_year', ArrayType(_yearly_count_type, containsNull=True)),
    StructField('cited_by_api_url', StringType(), nullable=True),
    StructField('updated_date', TimestampType(), nullable=True),
])

In [None]:
# Load every collected file in a single read, applying the explicit schema
# (jsonSchema) and multiline parsing.
works = (
    spark.read
    .schema(jsonSchema)
    .option("multiline", "true")
    .json(openalex_data)
)

In [None]:
#works.groupBy('publication_year').count().collect()
# Drop rows which have publication_year > 2022, this is a known issue with
# openalex_data ['Questionable Dates'] https://docs.openalex.org/known-issues
max_valid_year = 2022  # latest publication year accepted as valid

invalid_rows = works.filter(col('publication_year') > max_valid_year).count()
print(f'Invalid_dates count: {invalid_rows}')

# Dropping invalid date rows.
# NOTE(review): a null publication_year does not satisfy `<=` either, so such
# rows are removed here as well even though they are not part of the count
# printed above - confirm that this is intended.
works = works.filter(col('publication_year') <= max_valid_year)


In [4]:
# Further preprocessing: for every work keep a single concept - the
# highest-scoring one among the concepts at the smallest level other than 0 -
# restrict to journal/proceedings types, and add the publication quarter.
cols = ['id', 'display_name', 'publication_year', 'publication_date', 'host_venue',
        'type', 'authorships', 'cited_by_count', 'referenced_works', 'counts_by_year']
w = Window.partitionBy('id')
works = (
    works
    # smallest concept level present on the work, ignoring level 0
    .withColumn('min_level', array_min(array_except(col('concepts.level'), array(lit(0)))))
    # one row per (work, concept); min_level is selected explicitly because the
    # level filter below reads it and must not depend on the analyzer
    # recovering a column that the projection dropped
    .select(cols + ['min_level', explode('concepts').alias('concepts')])
    .withColumn('concept', col('concepts.display_name'))
    .withColumn('level', col('concepts.level'))
    .withColumn('score', col('concepts.score'))
    .drop('concepts')
    # keep only the concepts sitting at that smallest level
    .filter(col('level') == col('min_level'))
    # `max` is pyspark.sql.functions.max here (it shadows the builtin)
    .withColumn('maxScore', max('score').over(w))
    .filter((col('maxScore') > 0.0) & (col('score') == col('maxScore')))
    .drop('maxScore', 'min_level')
    # several concepts can tie on the best score; keep one row per work
    .dropDuplicates(['id', 'score'])
    .drop('level', 'score')
    .filter((col('type').contains('journal')) | (col('type').contains('proceedings')))
    .withColumn('quarter', quarter(col('publication_date')))
)

# write to parquet in curated layer
# NOTE(review): no save mode is set, so with Spark's default mode this write is
# expected to fail when the target directory already exists (e.g. on a re-run)
# - confirm before re-executing.
curated = 'datalake/curated_new'
works.write.partitionBy("publication_year", "quarter").parquet(curated)
print("Data written in Parquet format at: {}".format(curated))

In [4]:
# Load the curated Parquet dataset back from disk and print its schema.
read_works = spark.read.format('parquet').load('datalake/curated_new')
read_works.printSchema()

root
 |-- id: string (nullable = true)
 |-- display_name: string (nullable = true)
 |-- publication_date: date (nullable = true)
 |-- host_venue: struct (nullable = true)
 |    |-- id: string (nullable = true)
 |    |-- issn_l: string (nullable = true)
 |    |-- issn: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- display_name: string (nullable = true)
 |    |-- publisher: string (nullable = true)
 |    |-- type: string (nullable = true)
 |    |-- url: string (nullable = true)
 |    |-- is_oa: boolean (nullable = true)
 |    |-- version: string (nullable = true)
 |    |-- license: string (nullable = true)
 |-- type: string (nullable = true)
 |-- authorships: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- author_position: string (nullable = true)
 |    |    |-- author: struct (nullable = true)
 |    |    |    |-- id: string (nullable = true)
 |    |    |    |-- display_name: string (nullable = true)
 |    | 

100

### Rough Work

In [None]:
# How many rows carry a publication year later than 2022.
print(works.where(col('publication_year') > 2022).count())
# Row count per publication year for the remaining rows (shown as cell output).
(
    works
    .where(col('publication_year') <= 2022)
    .groupBy('publication_year')
    .count()
    .collect()
)
#invalid_dates = works.withColumn('year', year(works.publication_date)).filter(col('year')>2022).count()
#valid_dates = works.withColumn('year', year(works.publication_date)).filter(col('year')<=2022).groupBy('year').count().collect()
#print(f'inavlid count: {invalid_dates}')
#valid_dates

In [None]:
# Preview the authorships data: one row per (work id, authorship entry), with
# the entry's fields pulled up into their own columns.
authorships = works.select('id', 'authorships')
(
    authorships
    .select('id', explode('authorships').alias('authorships'))
    .select(
        'id',
        'authorships.author_position',
        'authorships.author',
        'authorships.institutions',
        'authorships.raw_affiliation_string',
    )
    .show(n=5, truncate=False)
)

In [None]:
def reducer(x, y):
    """Return whichever of two records has the later ``updated_date``.

    Intended as the combining function of a pairwise reduce that keeps the
    most recently updated record for a work id.

    Args:
        x: Record exposing an ``updated_date`` attribute (may be None).
        y: Record exposing an ``updated_date`` attribute (may be None).

    Returns:
        ``x`` when its ``updated_date`` is strictly later than ``y``'s,
        otherwise ``y`` (ties go to ``y``). A record whose ``updated_date`` is
        None loses to one that has a date; when both are None, ``y`` is
        returned.
    """
    # A missing date cannot be ordered against a real one (comparing None with
    # `>` raises TypeError), so treat it as older than any date.
    if x.updated_date is None:
        return y
    if y.updated_date is None:
        return x
    return x if x.updated_date > y.updated_date else y

In [None]:
# Process files in chunks, removing duplicates (if any) within each chunk by
# keeping the record with the latest updated_date per work id.
# NOTE: `works` is reassigned on every iteration, so only the last chunk's
# result is still available once the loop finishes.
n_files = len(openalex_data)
# Chunk boundaries are evenly spaced offsets into openalex_data. At least two
# boundaries are needed to form a chunk; `n_files // 6` alone yields none for
# fewer than 12 files. (A conditional is used instead of max() because `max`
# is shadowed by pyspark.sql.functions.max in this notebook.)
n_boundaries = n_files // 6 if n_files >= 12 else 2
chunks = np.linspace(0, n_files, n_boundaries, dtype=int)
from_chunk = chunks[:-1]
to_chunk = chunks[1:]
assert len(from_chunk) == len(to_chunk)
for start, stop in tqdm(zip(from_chunk, to_chunk), total=len(from_chunk)):
    if start == stop:
        # nothing to read (only possible when there are no files at all)
        continue
    # read json file in chunks
    works = spark.read.option("multiline","true")\
            .json(openalex_data[start:stop], schema = jsonSchema)
    print(f'row_count[{start}:{stop}]: {works.count()}')
    # retrieve the latest updated_date record for each work id
    works = works.rdd.map(lambda x: (x.id, x))\
            .reduceByKey(reducer)\
            .map(lambda x: x[1])\
            .toDF(jsonSchema)
    print(f'After merging dups => row_count[{start}:{stop}]: {works.count()}')

In [None]:
# Parse an ISO-like timestamp (with microseconds) into a datetime and show it.
dt_obj = datetime.strptime(
    "2022-06-11T18:52:04.838486",
    '%Y-%m-%dT%H:%M:%S.%f',
)
print(dt_obj)

In [None]:
# Parse a second timestamp and check that it orders after dt_obj.
dt_obj2 = datetime.strptime(
    "2022-06-12T18:52:04.838486",
    '%Y-%m-%dT%H:%M:%S.%f',
)
print(dt_obj2)
if dt_obj < dt_obj2:
    print('success')

In [None]:
# Null/NaN values: show, for every column of `works`, how many rows hold a
# missing value.
def _missing_condition(name, dtype):
    """Build the boolean Column that marks column `name` as missing.

    Args:
        name: Column name.
        dtype: The column's type string as reported by `DataFrame.dtypes`.

    Returns:
        A Column that is true where the value counts as missing: null or
        empty for arrays, null for timestamp/date/boolean/struct columns, and
        NaN or null for everything else.
    """
    column = col(name)
    # Test the type prefix, arrays first: 'array<struct<...>>' also contains
    # the word 'struct' and must still be treated as an array.
    if dtype.startswith('array'):
        return column.isNull() | (size(column) == 0)
    if dtype in ('timestamp', 'date', 'boolean') or dtype.startswith('struct'):
        # only the null check is applied to these types
        return column.isNull()
    return isnan(column) | column.isNull()


works.select(*[
    count(when(_missing_condition(c, t), c)).alias(c)
    for c, t in works.dtypes
]).show()

In [None]:
# Change blank (empty) array columns to null.
for name, dtype in works.dtypes:
    # Match on the type prefix: a substring test would also select struct
    # columns that merely contain an array field (host_venue has `issn`), and
    # size() only accepts array or map columns.
    if dtype.startswith('array'):
        works = works.withColumn(
            name,
            when(size(col(name)) == 0, lit(None)).otherwise(col(name)),
        )