In [1]:
from pyspark.sql import SparkSession, SQLContext
from pyspark import SparkConf, SparkContext
from pyspark.sql.types import *
from pyspark.sql.functions import array, udf, col, explode, concat_ws, collect_list, sort_array, count, size, coalesce, expr, regexp_replace, array_contains
spark= SparkSession.builder.config('spark.ui.port','4040').getOrCreate()

In [2]:
spark.conf.set('spark.sql.caseSensitive', 'True') 

In [3]:
WoS = spark.read.format("parquet").load("/WoSraw_2020_all/parquet/part-00826-5476e87f-146e-4916-913b-522638c0d728-c000.snappy.parquet")

In [20]:
WoS = WoS3.filter(WoS3.static_data.summary.pub_info._pubyear.contains('2020'))
#WoS.count()
#WoS = WoS2.filter(WoS2.static_data.fullrecord_metadata.addresses.address_name.address_spec.organizations.organization[1].contains('Indiana'))

In [13]:
WoS2.printSchema()

root
 |-- UID: string (nullable = true)
 |-- _r_id_disclaimer: string (nullable = true)
 |-- dynamic_data: struct (nullable = true)
 |    |-- cluster_related: struct (nullable = true)
 |    |    |-- identifiers: struct (nullable = true)
 |    |    |    |-- identifier: array (nullable = true)
 |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |-- _VALUE: string (nullable = true)
 |    |    |    |    |    |-- _type: string (nullable = true)
 |    |    |    |    |    |-- _value: string (nullable = true)
 |    |-- ic_related: struct (nullable = true)
 |    |    |-- oases: struct (nullable = true)
 |    |    |    |-- _count: long (nullable = true)
 |    |    |    |-- _is_OA: string (nullable = true)
 |    |    |    |-- oas: array (nullable = true)
 |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |-- _VALUE: string (nullable = true)
 |    |    |    |    |    |-- _type: string (nullable = true)
 |-- static_data: str

In [4]:
wos = WoS.select(WoS.UID.alias("wosID"),
                 WoS.static_data.summary.pub_info._pubyear.alias("Year"),
                 WoS.static_data.summary.pub_info._pubtype.alias("publicationType"),
                 WoS.static_data.summary.doctypes.doctype[0].alias("documentType"),
                 WoS.static_data.summary.publishers.publisher.names["name"]["full_name"].alias("publisher"),
                 WoS.static_data.summary.publishers.publisher.names["name"].full_name.alias("publicationName"),
                 WoS.static_data.summary.pub_info.page._page_count.alias("pageCount"),
                 concat_ws(', ', WoS.static_data.fullrecord_metadata.fund_ack.fund_text.p).alias("fundingText"), 
                 concat_ws(', ', WoS.static_data.fullrecord_metadata.addresses.address_name.names["name"][0].full_name).alias("reprintAuthor"),
                 WoS.static_data.fullrecord_metadata.reprint_addresses.address_name.address_spec.full_address[0].alias("reprintAuthorAddress"),
                 WoS.static_data.fullrecord_metadata.addresses.address_name.address_spec['full_address'].alias('full_address')
                            )              
wos = wos.withColumn("full_address", concat_ws(", ",  col("full_address")))                                                          
                                                                  
wos.show(5)     

+-------------------+----+---------------+------------+--------------------+--------------------+---------+--------------------+--------------------+--------------------+--------------------+
|              wosID|Year|publicationType|documentType|           publisher|     publicationName|pageCount|         fundingText|       reprintAuthor|reprintAuthorAddress|        full_address|
+-------------------+----+---------------+------------+--------------------+--------------------+---------+--------------------+--------------------+--------------------+--------------------+
|WOS:000591598700044|2021|        Journal|     Article|CROATIAN METALLUR...|CROATIAN METALLUR...|        4|                    |        Baisanov, A.|Chem & Met Inst, ...|Chem & Met Inst, ...|
|WOS:000575759200001|2021|        Journal|     Article|FEDERACION ESPANO...|FEDERACION ESPANO...|        5|                    |Barresi Leloutre,...|Global Experience...|Global Experience...|
|WOS:000596077600005|2021|        Journa

In [72]:
wos_test =  WoS.select(WoS.UID.alias("wosID"),
                       WoS.static_data.fullrecord_metadata.addresses.address_name.address_spec.organizations.organization[1].alias("org"))

wos_test2 = wos_test.withColumn("org1", wos_test["org"].getItem(0))
                            
wos_test2 = wos_test2.withColumn("org2", wos_test["org"].getItem(1))

wos_test2 = wos_test2.withColumn("org3", wos_test["org"].getItem(2))

wos_test2 = wos_test2.withColumn("org4", wos_test["org"].getItem(3))

wosEorg = wos_test2.withColumn("org1", explode(array("org1"))).select("*", col("org1")["_VALUE"].alias("Eorg1"))                       
wosEorg = wosEorg.withColumn("org2", explode(array("org2"))).select("*", col("org2")["_VALUE"].alias("Eorg2"))
wosEorg = wosEorg.withColumn("org3", explode(array("org3"))).select("*", col("org3")["_VALUE"].alias("Eorg3"))                                                                           
wosEorg = wosEorg.withColumn("org4", explode(array("org4"))).select("*", col("org4")["_VALUE"].alias("Eorg4"))                           



wosEorg = wosEorg.select('wosID', 'Eorg1', 'Eorg2', 'Eorg3', 'Eorg4')


#wosEorg = wosEorg.filter(col("Eorg").like('Indiana%'))

wosEorg.show(50, truncate = True)

+-------------------+--------------------+--------------------+--------------------+-----+
|              wosID|               Eorg1|               Eorg2|               Eorg3|Eorg4|
+-------------------+--------------------+--------------------+--------------------+-----+
|WOS:000591598700044|Karaganda State T...|Karaganda Technic...|                null| null|
|WOS:000575759200001|  Inst Catalan Salud|                null|                null| null|
|WOS:000596077600005|                null|                null|                null| null|
|WOS:000593375100004| Fed Univ Piaui UFPI|Universidade Fede...|                null| null|
|WOS:000596884600012|                null|                null|                null| null|
|WOS:000555999800001|                null|                null|                null| null|
|WOS:000570974900042|         URI Erechim|Universidade Regi...|                null| null|
|WOS:000575750400001|        Univ Granada|University of Gra...|                null| null|

In [8]:
#BUILD ISSN/EISSN/DOI TABLE

wosIdPlus = WoS.select(WoS.UID.alias("wosID"),
                       WoS.dynamic_data.cluster_related.identifiers.identifier.alias('ids'))

wosIdPlus2 = wosIdPlus.withColumn("ids", explode("ids")).select("*", col("ids")["_type"].alias("altID"))

wosIdDf = wosIdPlus.select(wosIdPlus.wosID.alias("wosID"))

wosIdPlus3 = wosIdPlus2.select('wosID', 'ids.*')



issn   = wosIdPlus3.filter(wosIdPlus3._type == "issn")
issn2  = issn.groupBy("wosID").agg(sort_array(collect_list("_value")).alias("_value"))
issn3  = issn2.withColumn("_value", concat_ws("| ", issn2._value))
issn4  = issn3.select(col('wosID').alias("wosID"), col('_value').alias('issn'))

doi   = wosIdPlus3.filter(wosIdPlus3._type == "doi")
doi2  = doi.groupBy("wosID").agg(sort_array(collect_list("_value")).alias("_value"))
doi3  = doi2.withColumn("_value", concat_ws("| ", doi2._value))
doi4   = doi3.select(col('wosID').alias('wosID'), col('_value').alias('doi'))

eissn   = wosIdPlus3.filter(wosIdPlus3._type == "eissn")
eissn2  = eissn.groupBy("wosID").agg(sort_array(collect_list("_value")).alias("_value"))
eissn3  = eissn2.withColumn("_value", concat_ws("| ", eissn2._value))
eissn4  = eissn3.select(col('wosID').alias('wosID'), col('_value').alias('eissn'))

isbn   = wosIdPlus3.filter(wosIdPlus3._type == "isbn")
isbn2  = isbn.groupBy("wosID").agg(sort_array(collect_list("_value")).alias("_value"))
isbn3  = isbn2.withColumn("_value", concat_ws("| ", isbn2._value))
isbn4  = isbn3.select(col('wosID').alias('wosID'), col('_value').alias('isbn'))    

pmid   = wosIdPlus3.filter(wosIdPlus3._type == "pmid")
pmid2  = pmid.groupBy("wosID").agg(sort_array(collect_list("_value")).alias("_value"))
pmid4  = pmid2.select(col('wosID').alias('wosID'), concat_ws("| ", pmid2._value).alias('pmid'))

wosIdPlus4 = wosIdDf.join(doi4,   wosIdDf['wosID'] == doi4['wosID'],   how='left') \
                    .join(eissn4, wosIdDf['wosID'] == eissn4['wosID'], how='left') \
                    .join(isbn4,  wosIdDf['wosID'] == isbn4['wosID'],  how='left') \
                    .join(pmid4,  wosIdDf['wosID'] == pmid4['wosID'],  how='left') \
                    .join(issn4,  wosIdDf['wosID'] == issn4['wosID'],  how='left') \
                 .select(coalesce(wosIdDf.wosID, issn4.wosID, doi4.wosID, eissn4.wosID, isbn4.wosID, pmid4.wosID).alias('wosID'), 
                        issn4.issn, doi4.doi, eissn4.eissn, isbn4.isbn, pmid4.pmid)

In [7]:
#BUILD AUTHOR ARRAY
wos5 = WoS.select(WoS.UID.alias("wosID"), WoS.static_data.summary.names['name'].alias("author"))

wos6 = wos5.withColumn("author", explode("author")).select("*", col("author")["_seq_no"].alias("seq_no"),
                                                                col("author")["display_name"].alias("standardNames")
                                                          )

wos_author = wos6.select("wosID", concat_ws(" ", wos6.standardNames, wos6.seq_no).alias("standardNames")
                        )


wos_auth2 = wos_author.groupBy("wosID").agg(sort_array(collect_list("standardNames")).alias("standardNames")
                                           )  
wos_auth2 = wos_auth2.select(
   col('wosID'),
   concat_ws("| ", col('standardNames')).alias('standardNames'),  
   size('standardNames').alias('count')
)

wos_auth2 = wos_auth2.drop('count')

wos_auth2.show(5)

+-------------------+--------------------+
|              wosID|       standardNames|
+-------------------+--------------------+
|WOS:000577436500001|Barrera, Joel 1| ...|
|WOS:000582231600001|da Silva Nunes, M...|
|WOS:000582346400001|Batista, Marco 1|...|
|WOS:000583881400007|Russo, Davide 2| ...|
|WOS:000591838600007|Casal Otero, Lore...|
+-------------------+--------------------+
only showing top 5 rows



In [9]:
wtOutput = wos.join(wosIdPlus4, wos['wosID'] == wosIdPlus4['wosID'], how='full') \
              .join(wosEorg,    wos['wosID'] == wosEorg['wosID'],    how='full') \
              .join(wos_auth2,  wos['wosID'] == wos_auth2['wosID'],  how='full')
            .select(coalesce(wos.wosID,
                             wosIdPlus4.wosID,
                             wosEorg.wosID,
                             wos_auth2.wosID).alias('wosId'),
                   wos.Year,
                   wos.publicationType, 
                   wos.documentType, 
                   wos.publisher, 
                   wos.publicationName, 
                   wos.pageCount, 
                   wos.fundingText, 
                   wos.reprintAuthor, 
                   wos.reprintAuthorAddress,
                   wos.full_address,
                    wos_auth2.standardNames, 
                       wosIdPlus4.issn, 
                       wosIdPlus4.doi, 
                       wosIdPlus4.eissn, 
                       wosIdPlus4.isbn,
                       wosIdPlus4.pmid,
                        wosEorg.Eorg
                           )

wtOutput.printSchema()

root
 |-- wosId: string (nullable = true)
 |-- Year: long (nullable = true)
 |-- publicationType: string (nullable = true)
 |-- documentType: string (nullable = true)
 |-- publisher: string (nullable = true)
 |-- publicationName: string (nullable = true)
 |-- pageCount: long (nullable = true)
 |-- fundingText: string (nullable = true)
 |-- reprintAuthor: string (nullable = true)
 |-- reprintAuthorAddress: string (nullable = true)
 |-- issn: string (nullable = true)
 |-- doi: string (nullable = true)
 |-- eissn: string (nullable = true)
 |-- isbn: string (nullable = true)
 |-- pmid: string (nullable = true)
 |-- Eorg: string (nullable = true)



In [10]:
#wtOutput2 = wtOutput.filter(col("Eorg").contains('Indiana Univ'))

wtOutput.show(5)

Py4JJavaError: An error occurred while calling o423.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1647 in stage 2.0 failed 4 times, most recent failure: Lost task 1647.3 in stage 2.0 (TID 2855, iuni9, executor 156): ExecutorLostFailure (executor 156 exited caused by one of the running tasks) Reason: Container killed by YARN for exceeding memory limits.  2.3 GB of 1.5 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead or disabling yarn.nodemanager.vmem-check-enabled because of YARN-4714.
Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1890)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1878)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1877)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1877)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:929)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:929)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:929)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2111)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2060)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2049)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:740)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2081)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2102)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2121)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:365)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
	at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3383)
	at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2544)
	at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2544)
	at org.apache.spark.sql.Dataset$$anonfun$53.apply(Dataset.scala:3364)
	at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3363)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:2544)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:2758)
	at org.apache.spark.sql.Dataset.getRows(Dataset.scala:254)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:291)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)


In [None]:
wtOutput2.coalesce(100).write.option("header","True") \
                               .option("sep","\t") \
                               .option("quoteAll", True) \
                               .mode("overwrite") \
                               .csv('/wtQuery')