In [None]:
# Smoke test: load a single split of the dataset.
# "multiline" lets Spark parse JSON records that span several lines.
df = (
    spark.read
    .option("multiline", True)
    .json('dbfs:/user/dblpv13/dblpv13.1.json.gz')
)

In [None]:
# Show the schema Spark inferred for the loaded JSON (no explicit schema was supplied on read).
df.printSchema()

root
 |-- _id: string (nullable = true)
 |-- abstract: string (nullable = true)
 |-- authors: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- _id: string (nullable = true)
 |    |    |-- bio: string (nullable = true)
 |    |    |-- email: string (nullable = true)
 |    |    |-- gid: string (nullable = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- name_zh: string (nullable = true)
 |    |    |-- oid: string (nullable = true)
 |    |    |-- oid_zh: string (nullable = true)
 |    |    |-- orcid: string (nullable = true)
 |    |    |-- org: string (nullable = true)
 |    |    |-- org_zh: string (nullable = true)
 |    |    |-- orgid: string (nullable = true)
 |    |    |-- orgs: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |    |    |-- orgs_zh: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |    |    |-- sid: string (nullable = true)
 |-- doi: string (nulla

In [None]:
import pyspark.sql.functions as F
from delta.tables import *

In [None]:
# Keep only the publication id and the author fields of interest.
# The author ids are exploded to one row each, while 'Name' still carries the
# publication's complete array of author names on every row.
df1 = df.select(
    F.col('_id').alias('Publication ID'),
    F.explode('authors._id').alias('Author ID'),
    F.col('authors.name').alias('Name'),
)
display(df1.limit(10))

Publication ID,Author ID,Name
53e99b16b7602d97023a8dda,53f43cefdabfaeb22f4a6e7a,"List(Wilf R. Lalonde, John R. Pugh)"
53e99b16b7602d97023a8dda,53f63434dabfae44f73fa724,"List(Wilf R. Lalonde, John R. Pugh)"
53e99b16b7602d97023a92dd,5440b88ddabfae7f9b34b97d,"List(Yu Zhang, Dit-Yan Yeung)"
53e99b16b7602d97023a92dd,562c837e45cedb3398c48342,"List(Yu Zhang, Dit-Yan Yeung)"
53e99b16b7602d97023a92e5,53f4386bdabfaee43ec42f51,"List(Kenneth Hoganson, Jon Preston, Mario Guimaraes)"
53e99b16b7602d97023a92e5,53f47082dabfaee0d9c55e3f,"List(Kenneth Hoganson, Jon Preston, Mario Guimaraes)"
53e99b16b7602d97023a92e5,53f45c84dabfaedf43621a85,"List(Kenneth Hoganson, Jon Preston, Mario Guimaraes)"
53e99b16b7602d97023a8f68,5405f608dabfae91d3020bdc,List(John Elliott)
53e99b16b7602d97023a8f69,53f45930dabfaee4dc81cf5c,List(Thomas E. Murray)
53e99b16b7602d97023a8f72,,"List(Bapat, S., J. P. Cohoon)"


In [None]:
# Build one row per (publication, author) with the author's id and name.
#
# The authors array is exploded ONCE, as whole structs, directly from `df`.
# Exploding df1's 'Name' array here instead (after 'authors._id' was already
# exploded when df1 was built) produces a cross product per publication: every
# author id ends up paired with every author name of that publication.
# Exploding the struct keeps each id next to the name of the same author.
df2 = (
    df.select(
        F.col('_id').alias('PublicationID'),
        F.explode(F.col('authors')).alias('author'),
    )
    .select(
        'PublicationID',
        F.col('author._id').alias('AuthorID'),
        F.col('author.name').alias('Names'),
    )
)

In [None]:
# Tokenise the full name on single spaces:
#   FirstName      -> first token
#   MiddleName_Arr -> all tokens (trimmed down to the middle ones in later cells)
#   LastName       -> last token (first element of the reversed token array)
# Duplicate rows are removed at the end.
name_tokens = F.split(F.col('Names'), ' ')
df2 = (
    df2
    .withColumn('FirstName', name_tokens.getItem(0))
    .withColumn('MiddleName_Arr', name_tokens)
    .withColumn('LastName', F.reverse(name_tokens).getItem(0))
    .distinct()
)

In [None]:
# Number of middle-name tokens = total tokens minus the first and last one.
# Clamp at 0: a single-token name would otherwise give -1, and F.slice raises
# at runtime when it is handed a negative length.
df2 = df2.withColumn(
    'MNArr_Len',
    F.greatest(F.size('MiddleName_Arr') - 2, F.lit(0)),
)

In [None]:
# Middle names are the tokens starting at position 2 (slice is 1-based),
# taking MNArr_Len of them.
df3 = df2.withColumn(
    'MiddleName',
    F.slice(F.col('MiddleName_Arr'), F.lit(2), F.col('MNArr_Len')),
)

In [None]:
# Preview a few rows to check the derived name columns.
display(df3.limit(5))

PublicationID,AuthorID,Names,FirstName,MiddleName_Arr,LastName,MNArr_Len,MiddleName
53e99b16b7602d97023a952c,53f55883dabfae3856f8045b,Cyril Fonlupt,Cyril,"List(Cyril, Fonlupt)",Fonlupt,0,List()
53e99b16b7602d97023a9680,5405856adabfae92b41df473,Klaus Samelson,Klaus,"List(Klaus, Samelson)",Samelson,0,List()
53e99b16b7602d97023a981d,54849e4bdabfae8a11fb2140,Sariel Har-Peled,Sariel,"List(Sariel, Har-Peled)",Har-Peled,0,List()
53e99b16b7602d97023a983d,53f479dbdabfae8a6845c5af,Shao-Hua Tan,Shao-Hua,"List(Shao-Hua, Tan)",Tan,0,List()
53e99b16b7602d97023a924f,53f4c8bcdabfaee57977e37b,Huiya Yan,Huiya,"List(Huiya, Yan)",Yan,0,List()


In [None]:
# Collapse the MiddleName token array into one comma-separated string.
df3 = df3.withColumn(
    "MiddleName",
    F.concat_ws(",", "MiddleName"),
)

In [None]:
# Drop the intermediate columns that were only needed to derive the name parts.
helper_columns = ['Names', 'MiddleName_Arr', 'MNArr_Len']
df4 = df3.drop(*helper_columns)

In [None]:
# Attach a surrogate row id. monotonically_increasing_id() yields unique,
# increasing 64-bit values, but they are not guaranteed to be consecutive.
df5 = df4.withColumn(
    'ID',
    F.monotonically_increasing_id(),
)

In [None]:
# Copy the publication's _id into a PublicationID column so the original
# frame shares a join key with the per-author frame.
first_filter = df.withColumn(
    "PublicationID",
    df["_id"],
)

In [None]:
# Attach the per-author columns to the original publication records,
# matching on PublicationID (no `how` is given, so the default join type applies).
author_columns = df5.select(
    'ID', 'PublicationID', 'AuthorID', 'FirstName', 'LastName', 'MiddleName'
)
last_df = first_filter.join(author_columns, on=['PublicationID'])

In [None]:
# Persist the joined result as a Delta table, replacing any existing contents.
(
    last_df.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("authors_table")
)