# Notebook for simplifying and cleaning Web of Science data 
- In the XML-to-Parquet step, we dumped the XML into Parquet as-is, without any cleaning. The result is messy and redundant, but now that it is in a fast columnar format, we can clean it up.
- This notebook extracts a cleanly formatted table of authors. Publications and their attributes are handled in a separate notebook, because a single paper may have multiple authors.
- Most of the description in this notebook is in comments rather than markdown. 

## Setup

In [2]:
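from pyspark.sql import SQLContext
# import only what is used; `import *` would shadow Python builtins such as sum, max and round
from pyspark.sql.functions import (col, concat_ws, explode, lit,
                                   monotonically_increasing_id, split, when)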

In [3]:
from pyspark.sql.types import NullType

In [4]:
sqlC = SQLContext(sc)

In [5]:
raw = sqlC.read.parquet("wos_core.parquet")

# Step 0: select author data
- Here we use the `explode` function to pull the fields nested inside the author records out into a workable format. We end up with one row per author per paper (and per address, for authors with several addresses), and one column per variable about the author.

In [6]:
#select the name data from static data, one row per name per paper
names_summary = raw.select('UID', explode('static_data.summary.names.name').alias('n')
            ).select('UID', 'n.*'
                    )
#set aside the people with no address
ns = names_summary.filter(col('_addr_no').isNull())

#this takes the rows with multiple addresses and splits them into multiple rows
#thus they match the other data
ns2 = names_summary.filter(col('_addr_no').isNotNull()
                          ).select('UID','_dais_id', '_orcid_id_tr', '_r_id', '_reprint',
                     '_role', '_seq_no', 'display_name', 'email_addr',
                     'first_name', 'full_name', 'last_name', 'suffix',
                     'wos_standard', 
                     explode(split('_addr_no', ' ')).alias('_addr_no'))

#concat them back together
names_summary = ns.unionAll(ns2.select(ns.columns))

#drop duplicates and add an ID column
names_summary = names_summary.dropDuplicates().withColumn("row_id2", 
                                                          monotonically_increasing_id())

#save the whole thing for faster access
names_summary.write.parquet('names_summary_tmp.parquet', mode='overwrite')
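For readers less familiar with `explode` and `split`, here is a plain-Python sketch of what cell 6 does to one record: each name becomes its own row, and a name whose `_addr_no` holds several space-separated address numbers is copied once per number. Lists of dicts stand in for Spark rows, and `explode_names` is a hypothetical helper, not part of the notebook.

```python
def explode_names(records):
    """Sketch: one output row per (paper, name, address number).

    `records` is a list of dicts with a 'UID' and a 'names' list, standing in
    for static_data.summary.names.name (toy data, not real WoS records).
    """
    rows = []
    for rec in records:
        for name in rec["names"]:  # explode: one row per name
            row = {"UID": rec["UID"], **name}
            addr = row.get("_addr_no")
            if addr is None:
                rows.append(row)  # names with no address pass through unchanged
            else:
                for a in addr.split(" "):  # split + explode on '_addr_no'
                    rows.append({**row, "_addr_no": a})
    return rows


paper = {
    "UID": "WOS:1",
    "names": [
        {"wos_standard": "Smith, J", "_addr_no": "1 2"},
        {"wos_standard": "Lee, K", "_addr_no": None},
    ],
}
for r in explode_names([paper]):
    print(r)
```

"Smith, J" ends up on two rows (address numbers "1" and "2"), while "Lee, K" keeps a single row with a null address, matching the `ns` / `ns2` split in the cell above.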

In [7]:
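# address_name is an array, so names.name is an array of arrays:
# explode twice to get one row per name per address
names_addr = raw.select('UID', explode('static_data.fullrecord_metadata.addresses.address_name.names.name').alias('n')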
            ).select('UID', explode('n').alias('n2')
                     ).select('UID', 'n2.*'
                     ).dropDuplicates().withColumn("row_id", monotonically_increasing_id())

names_addr.write.parquet('names_addr_tmp.parquet', mode='overwrite')
#names2.show()

In [8]:
#read our new temporary files so that we have faster access for the mess of queries below
names_summary = sqlC.read.parquet('names_summary_tmp.parquet')
names_addr = sqlC.read.parquet('names_addr_tmp.parquet')
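Saving to Parquet and reading back also has a side benefit for the `row_id` / `row_id2` columns. The Spark documentation describes `monotonically_increasing_id()` as non-deterministic because its result depends on partition IDs, so a DataFrame that is recomputed is not guaranteed to get the same IDs; reading them from disk pins them down. Per the same documentation, the current implementation puts the partition ID in the upper 31 bits and the record number within the partition in the lower 33 bits. The hypothetical `spark_style_id` below reproduces that layout in plain Python to show why the IDs are unique and increasing but not consecutive.

```python
def spark_style_id(partition_id: int, record_number: int) -> int:
    """Mimic the documented layout of monotonically_increasing_id():
    partition ID in the upper 31 bits, record number in the lower 33 bits."""
    if not 0 <= record_number < 2**33:
        raise ValueError("record number must fit in 33 bits")
    return (partition_id << 33) | record_number


# First two records of partition 0, then the first record of partition 1:
ids = [spark_style_id(0, 0), spark_style_id(0, 1), spark_style_id(1, 0)]
print(ids)  # [0, 1, 8589934592]
```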

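# Step 1: align authors from both tables
- This is particularly tricky because authors have several different IDs, and their names are neither unique nor consistently spelled. Author matching and disambiguation are open problems with no simple solution. Here I match cautiously: every round joins on the paper (`UID`) and the standardized name (`wos_standard`) plus one identifying field, trying `email_addr`, `_dais_id`, `_orcid_id_tr` and `_r_id` first, then `display_name` and `_addr_no`, and only as a last resort `UID` and `wos_standard` alone.
- Adding to the confusion, two separate parts of a WoS record carry author information: the summary name list (`static_data.summary.names`) and the address section (`static_data.fullrecord_metadata.addresses`). They are filled out inconsistently. In what follows, I match up and combine information across them to get one definitive list of authors for each paper.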
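The matching in cells 11 to 17 repeats one pattern: join on `UID`, `wos_standard` and one more column; keep the matches; drop the matched rows from the to-do list with a left anti join on `row_id`; move on to the next column. A minimal plain-Python sketch of that cascade, assuming lists of dicts in place of DataFrames (`cascade_match` and the toy records are hypothetical):

```python
def cascade_match(to_match, summary, extra_keys):
    """Sketch of the step-1 cascade on lists of dicts.

    to_match:   address-section names, each with a unique 'row_id'
    summary:    summary-section names
    extra_keys: columns to try, in priority order; None means a final round
                on UID + wos_standard alone
    Returns (matched (address_row, summary_row) pairs, still-unmatched rows).
    """
    matched = []
    remaining = list(to_match)
    for key in extra_keys:
        keys = ["UID", "wos_standard"] + ([key] if key is not None else [])
        # inner join; rows with a null key never match, as in Spark
        round_pairs = [
            (a, s)
            for a in remaining
            if all(a.get(k) is not None for k in keys)
            for s in summary
            if all(a[k] == s.get(k) for k in keys)
        ]
        matched.extend(round_pairs)
        # left anti join on row_id: drop what this round matched
        done = {a["row_id"] for a, _ in round_pairs}
        remaining = [a for a in remaining if a["row_id"] not in done]
    return matched, remaining


addr_names = [
    {"row_id": 0, "UID": "W1", "wos_standard": "Smith, J", "email_addr": "js@x.org"},
    {"row_id": 1, "UID": "W1", "wos_standard": "Lee, K", "email_addr": None},
]
summary_names = [
    {"UID": "W1", "wos_standard": "Smith, J", "email_addr": "js@x.org"},
    {"UID": "W1", "wos_standard": "Lee, K", "email_addr": None},
]
pairs, left = cascade_match(addr_names, summary_names, ["email_addr", None])
print(len(pairs), len(left))  # 2 0
```

"Smith, J" is matched in the email round; "Lee, K" has no email, so it falls through to the final name-only round. In the notebook the list of keys would be `email_addr`, `_dais_id`, `_orcid_id_tr`, `_r_id`, `display_name`, `_addr_no`, then `None`.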

In [9]:
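#names of columns we're working with: the summary-name fields that get a '_2'
#suffix before each join (everything except UID, wos_standard and row_id2)
cols = [
    '_dais_id',
    '_orcid_id_tr',
    '_r_id',
    '_reprint',
    '_role',
    '_seq_no',
    'display_name',
    'email_addr',
    '_addr_no',
    'first_name',
    'full_name',
    'last_name',
    'suffix']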

In [10]:
# this is our to-do list of data from the address section that needs to be 
# matched with data from names
to_match = names_addr
to_match.count()

126608479

In [11]:
# column I am matching on
cname = 'email_addr'

# base data I'm merging in
names_2 = names_summary
#rename the columns I am not merging on so they are distinct in the result
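for c in cols:
    if c != cname:
        names_2 = names_2.withColumnRenamed(c, c+'_2')
    else:
        # the join key appears only once in the result, so add an all-null
        # placeholder to keep the same columns in every matching round
        names_2 = names_2.withColumn(c+'_2', lit(None).cast(NullType()))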

#merge on given columns
all_names = to_match.filter(col(cname).isNotNull()).join(names_2, 
                                                           on=['UID', 'wos_standard', cname ], 
                                                           how='inner')

# remove matches we found from our candidate set
to_match = to_match.join(all_names, on=['row_id'], how='left_anti')

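# fix the column order from this first round; later rounds select into it,
# because unionAll lines columns up by position, not by name
col_order = all_names.columns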
#all_names.count()

In [12]:
cname = '_dais_id'

names_2 = names_summary
for c in cols:
    if c != cname:
        names_2 = names_2.withColumnRenamed(c, c+'_2')
    else:
        names_2 = names_2.withColumn(c+'_2', lit(None).cast(NullType()))

tmp = to_match.filter(col(cname).isNotNull()).join(names_2, 
                                                           on=['UID', 'wos_standard', cname ], 
                                                           how='inner')

#add our new matches to the old ones
all_names = all_names.unionAll(tmp.select(col_order))

to_match = to_match.join(all_names, on=['row_id'], how='left_anti')

#all_names.count()

In [13]:
cname = '_orcid_id_tr'

names_2 = names_summary
for c in cols:
    if c != cname:
        names_2 = names_2.withColumnRenamed(c, c+'_2')
    else:
        names_2 = names_2.withColumn(c+'_2', lit(None).cast(NullType()))

tmp = to_match.filter(col(cname).isNotNull()).join(names_2, 
                                                           on=['UID', 'wos_standard', cname ], 
                                                           how='inner')

#add our new matches to the old ones
all_names = all_names.unionAll(tmp.select(col_order))

to_match = to_match.join(all_names, on=['row_id'], how='left_anti')

#all_names.count()

In [14]:
cname = '_r_id'

names_2 = names_summary
for c in cols:
    if c != cname:
        names_2 = names_2.withColumnRenamed(c, c+'_2')
    else:
        names_2 = names_2.withColumn(c+'_2', lit(None).cast(NullType()))

tmp = to_match.filter(col(cname).isNotNull()).join(names_2, 
                                                           on=['UID', 'wos_standard', cname ], 
                                                           how='inner')

#add our new matches to the old ones
all_names = all_names.unionAll(tmp.select(col_order))

to_match = to_match.join(all_names, on=['row_id'], how='left_anti')

#all_names.count()

In [15]:
cname = 'display_name'

names_2 = names_summary
for c in cols:
    if c != cname:
        names_2 = names_2.withColumnRenamed(c, c+'_2')
    else:
        names_2 = names_2.withColumn(c+'_2', lit(None).cast(NullType()))

tmp = to_match.filter(col(cname).isNotNull()).join(names_2, 
                                                           on=['UID', 'wos_standard', cname ], 
                                                           how='inner')

#add our new matches to the old ones
all_names = all_names.unionAll(tmp.select(col_order))

to_match = to_match.join(all_names, on=['row_id'], how='left_anti')

#all_names.count()

In [16]:
cname = '_addr_no'

names_2 = names_summary
for c in cols:
    if c != cname:
        names_2 = names_2.withColumnRenamed(c, c+'_2')
    else:
        names_2 = names_2.withColumn(c+'_2', lit(None).cast(NullType()))

tmp = to_match.filter(col(cname).isNotNull()).join(names_2, 
                                                           on=['UID', 'wos_standard', cname ], 
                                                           how='inner')

#add our new matches to the old ones
all_names = all_names.unionAll(tmp.select(col_order))

to_match = to_match.join(all_names, on=['row_id'], how='left_anti')

#all_names.count()

In [17]:
#last round: match on UID and wos_standard only, to catch everyone else

names_2 = names_summary
for c in cols:
    names_2 = names_2.withColumnRenamed(c, c+'_2')

tmp = to_match.filter(col('wos_standard').isNotNull()).join(names_2, 
                                                           on=['UID', 'wos_standard'], 
                                                           how='inner')
all_names = all_names.unionAll(tmp.select(col_order))

to_match = to_match.join(all_names, on=['row_id'], how='left_anti')

#all_names.count()
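One caveat about this catch-all round: it joins on `UID` and `wos_standard` alone, so if two co-authors on the same paper share a standardized name, each of their address-section rows matches both summary rows. The notebook does not report how often this happens; the toy records below are made up purely to illustrate the fan-out in plain Python.

```python
from itertools import product

# Two different authors on one paper who both standardize to "Wang, L"
addr_rows = [
    {"row_id": 0, "UID": "W1", "wos_standard": "Wang, L"},
    {"row_id": 1, "UID": "W1", "wos_standard": "Wang, L"},
]
summary_rows = [
    {"row_id2": 10, "UID": "W1", "wos_standard": "Wang, L", "_seq_no": 1},
    {"row_id2": 11, "UID": "W1", "wos_standard": "Wang, L", "_seq_no": 3},
]

# Inner join on (UID, wos_standard): every left row pairs with every right row
pairs = [
    (a["row_id"], s["row_id2"])
    for a, s in product(addr_rows, summary_rows)
    if (a["UID"], a["wos_standard"]) == (s["UID"], s["wos_standard"])
]
print(pairs)  # [(0, 10), (0, 11), (1, 10), (1, 11)]
```

Two authors become four rows. One way to spot such cases afterwards would be to count rows per `row_id` in `all_names` and look for counts above one.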

In [18]:
#to_match.count()
#this returned 0: every name from the address section found a match

In [19]:
#all_names.write.parquet('WoS_names.parquet', mode='overwrite')

In [20]:
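# add back the summary names that never matched an address-section name
names_2 = names_summary.join(all_names, on=['row_id2'], how='left_anti')
for c in cols:
    names_2 = names_2.withColumnRenamed(c, c+'_2')

# these rows lack the address-section columns (e.g. row_id), so add them as nulls
tmp = set(names_2.columns)
for c in col_order:
    if c not in tmp:
        names_2 = names_2.withColumn(c, lit(None).cast(NullType()))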

In [21]:
all_names = all_names.unionAll(names_2.select(col_order))
#all_names.count()
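Cells 20 and 21 append the summary-section names that never matched. Because their columns were renamed with a `_2` suffix, the unsuffixed columns (and `row_id`) do not exist on these rows, so they are added as nulls and everything is selected into `col_order` before `unionAll`, which lines columns up by position. The same padding in plain Python, with a hypothetical `pad_to` helper and toy values:

```python
def pad_to(row, col_order):
    """Return the row's values in col_order, using None for absent columns."""
    return [row.get(c) for c in col_order]


col_order = ["UID", "wos_standard", "email_addr", "row_id", "email_addr_2", "row_id2"]
unmatched = {"UID": "W1", "wos_standard": "Lee, K", "email_addr_2": "lk@x.org", "row_id2": 7}
print(pad_to(unmatched, col_order))
# ['W1', 'Lee, K', None, None, 'lk@x.org', 7]
```

The values of these rows sit only in the `_2` columns at this point; the fill step in cell 28 copies them into the unsuffixed columns.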

In [22]:
all_names.write.parquet('WoS_names_tmp.parquet', mode='overwrite')

In [23]:
all_names = sqlC.read.parquet('WoS_names_tmp.parquet')

In [24]:
#df = all_names.describe().toPandas()
#df = df.set_index('summary').T
#df

In [25]:
#(df['count'].astype(int)  / 294182770).round(3).sort_values(ascending=False)

## Check results

In [26]:
cols = ['email_addr',
 '_addr_no',
 '_dais_id',
 '_orcid_id_tr',
 '_r_id',
 '_reprint',
 '_role',
 '_seq_no',
 'display_name',
 'first_name',
 'full_name',
 'last_name',
 'suffix']

In [27]:
all_names.columns

['UID',
 'wos_standard',
 'email_addr',
 '_addr_no',
 '_dais_id',
 '_orcid_id_tr',
 '_r_id',
 '_reprint',
 '_role',
 '_seq_no',
 'display_name',
 'first_name',
 'full_name',
 'last_name',
 'suffix',
 'row_id',
 '_addr_no_2',
 '_dais_id_2',
 '_orcid_id_tr_2',
 '_r_id_2',
 '_reprint_2',
 '_role_2',
 '_seq_no_2',
 'display_name_2',
 'first_name_2',
 'full_name_2',
 'last_name_2',
 'suffix_2',
 'row_id2',
 'email_addr_2']

In [28]:
# for each field, print how many rows lack a value but have one in the
# summary-section copy (c + '_2'), then fill those gaps from that copy

for c in cols:
    n = all_names.filter(col(c).isNull()
                ).filter(col(c+'_2').isNotNull()
                        ).count()
    print(c, n)
    all_names = all_names.withColumn(c, 
                     when(col(c).isNull(), 
                          col(c+'_2')
                         ).otherwise(col(c)))

('email_addr', 7544014)
('_addr_no', 1317203)
('_dais_id', 83271931)
('_orcid_id_tr', 11371)
('_r_id', 5031781)
('_reprint', 23737838)
('_role', 144563008)
('_seq_no', 144563008)
('display_name', 144553653)
('first_name', 135503389)
('full_name', 144553653)
('last_name', 138893007)
('suffix', 3164123)
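Cell 28 fills each missing field from its `_2` twin and, before filling, counts how many rows that affects. For each pair of columns the `when(...).otherwise(...)` expression is equivalent to Spark's `coalesce(col(c), col(c + '_2'))`: the address-section value wins whenever it is present. A plain-Python sketch of the same logic (`fill_from_secondary` and the toy rows are hypothetical):

```python
def fill_from_secondary(rows, cols):
    """For each column c, fill None values from c + '_2' in place and return
    how many rows were filled per column (the numbers cell 28 prints)."""
    filled = {}
    for c in cols:
        n = 0
        for row in rows:
            if row.get(c) is None and row.get(c + "_2") is not None:
                row[c] = row[c + "_2"]
                n += 1
        filled[c] = n
    return filled


rows = [
    {"email_addr": None, "email_addr_2": "a@x.org", "_r_id": "A-1", "_r_id_2": "B-2"},
    {"email_addr": None, "email_addr_2": None, "_r_id": None, "_r_id_2": "C-3"},
]
counts = fill_from_secondary(rows, ["email_addr", "_r_id"])
print(counts)  # {'email_addr': 1, '_r_id': 1}
```

Note that the first row keeps `_r_id = "A-1"` even though its `_r_id_2` differs; disagreements between the two sections are not flagged by this step.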


## save intermediate result

In [29]:
#raw.select('UID', 'static_data.fullrecord_metadata.addresses').printSchema()

In [30]:
addr = raw.select('UID', explode('static_data.fullrecord_metadata.addresses.address_name.address_spec').alias('a')
                 ).select('UID', 'a.*')
addr.printSchema()

root
 |-- UID: string (nullable = true)
 |-- _addr_no: long (nullable = true)
 |-- city: string (nullable = true)
 |-- country: string (nullable = true)
 |-- full_address: string (nullable = true)
 |-- organizations: struct (nullable = true)
 |    |-- _count: long (nullable = true)
 |    |-- organization: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- _VALUE: string (nullable = true)
 |    |    |    |-- _pref: string (nullable = true)
 |-- state: string (nullable = true)
 |-- street: string (nullable = true)
 |-- suborganizations: struct (nullable = true)
 |    |-- _count: long (nullable = true)
 |    |-- suborganization: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |-- zip: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- _VALUE: string (nullable = true)
 |    |    |-- _location: string (nullable = true)



In [31]:
addr.write.parquet('addr_tmp.parquet', mode='overwrite')

In [32]:
addr = sqlC.read.parquet('addr_tmp.parquet')

In [33]:
addr.count()

106979216

In [34]:
addr = addr.dropDuplicates()
addr.count()

101131660

In [35]:
#df = addr.describe().toPandas()
#df.T

## organize address information

In [36]:
# flatten the zip array of structs into two comma-separated strings
addr = addr.withColumn('zipcode', concat_ws(", ", "zip._VALUE")
                      ).withColumn('zip_location', concat_ws(", ", "zip._location")
                                  )
addr = addr.select(['UID', '_addr_no', 'city', 'country', 'full_address',
                    'organizations', 'state', 'street', 'suborganizations',
                    'zipcode', 'zip_location'])
addr.printSchema()

root
 |-- UID: string (nullable = true)
 |-- _addr_no: long (nullable = true)
 |-- city: string (nullable = true)
 |-- country: string (nullable = true)
 |-- full_address: string (nullable = true)
 |-- organizations: struct (nullable = true)
 |    |-- _count: long (nullable = true)
 |    |-- organization: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- _VALUE: string (nullable = true)
 |    |    |    |-- _pref: string (nullable = true)
 |-- state: string (nullable = true)
 |-- street: string (nullable = true)
 |-- suborganizations: struct (nullable = true)
 |    |-- _count: long (nullable = true)
 |    |-- suborganization: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |-- zipcode: string (nullable = false)
 |-- zip_location: string (nullable = false)
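Cell 36 turns the `zip` array of structs into two plain strings: `zip._VALUE` pulls out the array of `_VALUE` fields, and `concat_ws(", ", ...)` joins it while skipping nulls, which is why the new columns appear as non-nullable above. A plain-Python sketch of the same flattening; `join_zip` is hypothetical, and treating a missing array as an empty string is an assumption meant to mirror the non-nullable output.

```python
def join_zip(zip_entries, field, sep=", "):
    """Join one field of a list of zip structs, skipping nulls, in the
    spirit of concat_ws(sep, "zip.<field>")."""
    if zip_entries is None:  # assumption: a missing zip array becomes ""
        return ""
    return sep.join(
        z[field] for z in zip_entries if z is not None and z.get(field) is not None
    )


# toy records, not real WoS data
zips = [{"_VALUE": "02139", "_location": "AP"}, {"_VALUE": "02142", "_location": None}]
print(join_zip(zips, "_VALUE"))     # 02139, 02142
print(join_zip(zips, "_location"))  # AP
```

One consequence worth keeping in mind: after flattening, `zipcode` and `zip_location` no longer line up element by element when some `_location` values are null, as in the second toy entry.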



In [37]:
#df = addr.describe().toPandas()
#df.set_index('summary').T

# Merge names with addresses and publication year

In [38]:
names = all_names.join(addr, on=['UID', '_addr_no'], how='left')
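Cell 38 is a left join, so authors with no address, or whose `_addr_no` has no entry in `addr`, keep their row with null address fields. Note that `_addr_no` is a string on the names side (it came from `split`) but a long in `addr`; Spark casts implicitly for the comparison. The plain-Python sketch below makes that cast explicit and carries over only `city` for brevity; the helper names and toy records are hypothetical.

```python
from collections import defaultdict


def _key(row):
    """Join key (UID, _addr_no), with _addr_no cast to int (None stays None)."""
    addr_no = row.get("_addr_no")
    return row["UID"], (int(addr_no) if addr_no is not None else None)


def left_join_addresses(names, addresses):
    """Left join names to addresses on (UID, _addr_no); only 'city' is carried over."""
    index = defaultdict(list)
    for a in addresses:
        index[_key(a)].append(a)

    out = []
    for n in names:
        k = _key(n)
        # as in SQL, a null key never matches anything
        matches = index.get(k, []) if k[1] is not None else []
        if not matches:
            out.append({**n, "city": None})  # keep the author, address fields null
        for a in matches:  # one output row per matching address
            out.append({**n, "city": a["city"]})
    return out


names = [{"UID": "W1", "_addr_no": "1"}, {"UID": "W1", "_addr_no": None}]
addresses = [{"UID": "W1", "_addr_no": 1, "city": "Boston"}]
joined = left_join_addresses(names, addresses)
print(joined)
```

Like the Spark join, the sketch emits one row per matching address, so any remaining duplicate `(UID, _addr_no)` entries in `addr` would multiply author rows; the `dropDuplicates()` in cell 41 only removes rows that are identical in every column.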

In [39]:
tmp = raw.withColumn('pub_year', col("static_data.summary.pub_info._pubyear")
                    ).select("UID", 'pub_year')
names = names.join(tmp, on=['UID'], how='left')

names.printSchema()

root
 |-- UID: string (nullable = true)
 |-- _addr_no: string (nullable = true)
 |-- wos_standard: string (nullable = true)
 |-- email_addr: string (nullable = true)
 |-- _dais_id: long (nullable = true)
 |-- _orcid_id_tr: string (nullable = true)
 |-- _r_id: string (nullable = true)
 |-- _reprint: string (nullable = true)
 |-- _role: string (nullable = true)
 |-- _seq_no: long (nullable = true)
 |-- display_name: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- full_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- suffix: string (nullable = true)
 |-- row_id: long (nullable = true)
 |-- _addr_no_2: string (nullable = true)
 |-- _dais_id_2: long (nullable = true)
 |-- _orcid_id_tr_2: string (nullable = true)
 |-- _r_id_2: string (nullable = true)
 |-- _reprint_2: string (nullable = true)
 |-- _role_2: string (nullable = true)
 |-- _seq_no_2: long (nullable = true)
 |-- display_name_2: string (nullable = true)
 |-- first_name_2: string 

In [40]:
names.columns

['UID',
 '_addr_no',
 'wos_standard',
 'email_addr',
 '_dais_id',
 '_orcid_id_tr',
 '_r_id',
 '_reprint',
 '_role',
 '_seq_no',
 'display_name',
 'first_name',
 'full_name',
 'last_name',
 'suffix',
 'row_id',
 '_addr_no_2',
 '_dais_id_2',
 '_orcid_id_tr_2',
 '_r_id_2',
 '_reprint_2',
 '_role_2',
 '_seq_no_2',
 'display_name_2',
 'first_name_2',
 'full_name_2',
 'last_name_2',
 'suffix_2',
 'row_id2',
 'email_addr_2',
 'city',
 'country',
 'full_address',
 'organizations',
 'state',
 'street',
 'suborganizations',
 'zipcode',
 'zip_location',
 'pub_year']

In [41]:
names = names.dropDuplicates()

## save results

In [42]:
names.write.parquet('WoS_names.parquet', mode='overwrite')

In [43]:
names = sqlC.read.parquet('WoS_names.parquet')

In [44]:
names.count()

328611134

## Peek at summary of results
- `describe()` computes summary statistics (count, mean, stddev, min, max) for each numeric and string column.
- `toPandas()` collects the result to the driver as a pandas DataFrame for ease of use. This is only possible because the summary is small; the full author table (over 328 million rows) is far too big to pull into pandas.
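In Spark, the `count` row of `describe()` is the number of non-null values in each column, so dividing it by the total number of rows (`names.count()`, 328611134 above) gives the share of rows in which each field is present. A small plain-Python sketch of that computation on toy rows (`percent_present` is a hypothetical helper):

```python
def percent_present(rows, columns):
    """Share of rows (in percent, 1 decimal) with a non-null value per column."""
    total = len(rows)
    return {
        c: round(100 * sum(r.get(c) is not None for r in rows) / total, 1)
        for c in columns
    }


rows = [
    {"UID": "W1", "email_addr": "a@x.org", "_orcid_id_tr": None},
    {"UID": "W1", "email_addr": None, "_orcid_id_tr": None},
    {"UID": "W2", "email_addr": None, "_orcid_id_tr": "0000-0002-1825-0097"},
]
pct = percent_present(rows, ["UID", "email_addr", "_orcid_id_tr"])
print(pct)  # {'UID': 100.0, 'email_addr': 33.3, '_orcid_id_tr': 33.3}
```

A column that is never null, such as `UID`, must come out at 100.0; that makes it a quick sanity check on the denominator.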

In [45]:
df = names.describe().toPandas()

In [46]:
df = df.set_index('summary').T
df

| summary | count | mean | stddev | min | max |
|---|---|---|---|---|---|
| UID | 328611134 | | | `WOS:000003907500001` | `WOS:A1998YL55600003` |
| _addr_no | 185362321 | 8.278618889326488 | 30.64584458018425 | 0 | 999 |
| wos_standard | 327137816 | 4.0 | | `&AElig;gidius, K` | `zure Loye, HC` |
| email_addr | 56537133 | | | `%NuD-@itpcas.ac.cn` | `�GE` |
| _dais_id | 121074947 | 10072784.361219352 | 5043974.538325539 | 100 | 16770519 |
| _orcid_id_tr | 1160599 | | | `0000-0000-0000-0000` | `9000-0002-7610-2893` |
| _r_id | 15103774 | | | | `Z-2155-2018` |
| _reprint | 64404955 | | | `Y` | `Y` |
| _role | 328611134 | | | `anon` | `corp` |
| _seq_no | 328611134 | 39.02554399754453 | 236.51087121515067 | 1 | 5156 |


## Percent not missing, by column

In [47]:
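# divide by the total number of rows in the table (names.count() above)
((df[['count']].astype(int) / names.count()) * 100).round(1)

| summary | count |
|---|---|
| UID | 100.0 |
| _addr_no | 56.4 |
| wos_standard | 99.6 |
| email_addr | 17.2 |
| _dais_id | 36.8 |
| _orcid_id_tr | 0.4 |
| _r_id | 4.6 |
| _reprint | 19.6 |
| _role | 100.0 |
| _seq_no | 100.0 |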
