In [355]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
import os

# Driver memory handed to PySpark through the PYSPARK_SUBMIT_ARGS environment variable.
memory = '8g'
pyspark_submit_args = ''.join([' --driver-memory ', memory, ' pyspark-shell'])
os.environ["PYSPARK_SUBMIT_ARGS"] = pyspark_submit_args

Start a new spark session. 

In [356]:
# Build (or reuse) the session; both MongoDB connector URIs point at scopus.author.
spark = (
    SparkSession.builder
    .appName("myApp")
    .config("spark.mongodb.input.uri", "mongodb://127.0.0.1/scopus.author")
    .config("spark.mongodb.output.uri", "mongodb://127.0.0.1/scopus.author")
    .getOrCreate()
)

Load the documents (articles) from MongoDB.

In [357]:
# Read the scopus.document collection; the per-read 'uri' option names the collection.
df = (
    spark.read
    .format("com.mongodb.spark.sql.DefaultSource")
    .option('uri', 'mongodb://127.0.0.1/scopus.document')
    .load()
)

Create a view to be used with Spark SQL. 

In [358]:
# Register the documents under the name "document" so spark.sql() queries can refer to it.
df.createOrReplaceTempView(name="document")

Read serial data from MongoDB. 

In [359]:
# Read the scopus.serial collection and expose it to Spark SQL as "serial".
s_df = (
    spark.read
    .format("com.mongodb.spark.sql.DefaultSource")
    .option('uri', 'mongodb://127.0.0.1/scopus.serial')
    .load()
)
s_df.createOrReplaceTempView("serial")

# Keep journal-type serials only, rename the fields, and add a lower-cased
# title column (serial_lc) that later cells use as a join key.
serial_df = spark.sql('''
    SELECT `source-id` AS serial_id, `dc:title` AS serial, `prism:issn` AS serial_issn, `prism:eIssn` AS serial_eissn, openaccess As open_access
    FROM serial
    WHERE `prism:aggregationType` = 'journal'
''').withColumn('serial_lc', lower(col('serial')))

Check a few samples. 

In [360]:
# Print the first 5 rows without truncating long values.
serial_df.show(n=5, truncate=False)

+-----------+------------------------------------------+-----------+------------+-----------+------------------------------------------+
|serial_id  |serial                                    |serial_issn|serial_eissn|open_access|serial_lc                                 |
+-----------+------------------------------------------+-----------+------------+-----------+------------------------------------------+
|10000153402|Scandinavian Journal of Management Studies|0281-7527  |null        |0          |scandinavian journal of management studies|
|1000147123 |Business: Theory and Practice             |1648-0627  |1822-4202   |1          |business: theory and practice             |
|100141     |Journal of Field Archaeology              |0093-4690  |2042-4582   |null       |journal of field archaeology              |
|100147030  |Current Sociology                         |0011-3921  |1461-7064   |null       |current sociology                         |
|100147316  |Business Strategy Review    

## Analysis of references

Extract the titles of the journals in which the references were published.

In [361]:
# For every document whose coredata aggregation type is "Journal", pull the
# `sourcetitle` and `title` fields out of `references` as ref_journal and
# ref_title. A later cell zips and explodes these two columns into one row
# per reference.
ref_journal_df = spark.sql('''
    SELECT references["sourcetitle"] AS ref_journal, references['title'] AS ref_title
    FROM document 
    WHERE 
        coredata["prism:aggregationType"]="Journal"
''')

Scopus is not very consistent when a reference is a monograph. Sometimes the sourcetitle field is 'null'; sometimes the title field is 'null'; and sometimes neither is null but the two are identical (after converting both to lower case). Therefore, we keep only the references whose sourcetitle and title are both present and differ from each other. To do that, we first define a UDF (user-defined function) helper.

In [362]:
def my_zip(x, y):
    """Pair up two parallel values as a list of ``(first, second)`` tuples.

    Args:
        x: A list of values, a single value, or None.
        y: A list of values, a single value, or None.

    Returns:
        list: One ``(first, second)`` tuple per pair.

        * Both arguments are lists: element-wise pairs (truncated to the
          shorter list, as with ``zip``).
        * One argument is a list and the other is None: every list element
          is paired with None.
        * Otherwise: a single ``(x, y)`` pair.

    The previous fallback returned ``[x, y]``, i.e. two elements instead of
    one pair, which does not match an array-of-(first, second) result.
    """
    x_is_list = isinstance(x, list)
    y_is_list = isinstance(y, list)

    if x_is_list and y_is_list:
        return list(zip(x, y))
    # A missing side must not collapse the other side's elements into one value.
    if x_is_list and y is None:
        return [(item, None) for item in x]
    if y_is_list and x is None:
        return [(None, item) for item in y]
    # Scalars (or two missing values) form exactly one pair.
    return [(x, y)]
# Spark UDF wrapper around my_zip: returns an array of structs whose two
# string fields are named "first" and "second".
# Adjust the field types to reflect the actual data types.
zip_ = udf(
    lambda left, right: list(my_zip(left, right)),
    returnType=ArrayType(
        StructType()
        .add("first", StringType())
        .add("second", StringType())
    ),
)

Now we use the zip_ udf to list source title and title in two separate columns and filter the data frame. 

In [363]:
# Zip source titles with reference titles, then explode so that every
# reference becomes its own (ref_journal, ref_title) row.
ref_journal_df = ref_journal_df.withColumn("tmp", zip_("ref_journal", "ref_title"))\
    .withColumn("tmp", explode("tmp"))\
    .select(col("tmp.first").alias("ref_journal"), col("tmp.second").alias("ref_title"))

# Cast both columns to strings and add lower-cased copies for case-insensitive comparison.
ref_journal_df = ref_journal_df.withColumn('ref_journal', ref_journal_df.ref_journal.cast(StringType()))
ref_journal_df = ref_journal_df.withColumn('ref_title', ref_journal_df.ref_title.cast(StringType()))
ref_journal_df = ref_journal_df.withColumn('ref_journal_lc', lower(col('ref_journal')))
ref_journal_df = ref_journal_df.withColumn('ref_title_lc', lower(col('ref_title')))

# Keep references whose source title and title are both present and differ.
# The three conditions must live in ONE SQL expression: chaining Python
# strings with `and` evaluates to the last string only, so the earlier form
# silently dropped the first two conditions before Spark ever saw them.
ref_journal_df = ref_journal_df\
    .filter("ref_journal_lc != 'null' AND ref_title_lc != 'null' AND ref_journal_lc != ref_title_lc")\
    .select('ref_journal', 'ref_journal_lc')

Check a few samples. 

In [364]:
# Print the first 5 rows without truncating long values.
ref_journal_df.show(n=5, truncate=False)

+--------------------------+--------------------------+
|ref_journal               |ref_journal_lc            |
+--------------------------+--------------------------+
|Health                    |health                    |
|Health                    |health                    |
|Social Policy and Society |social policy and society |
|Theory, Culture & Society |theory, culture & society |
|Journal of Political Power|journal of political power|
+--------------------------+--------------------------+
only showing top 5 rows



Count how many times each journal was cited and sort the journals by that count in descending order. Then count the number of distinct reference journals.

In [365]:
# One row per (ref_journal, ref_journal_lc) with its citation count, most cited first.
ref_journal_df = (
    ref_journal_df
    .groupBy('ref_journal', 'ref_journal_lc')
    .count()
    .orderBy('count', ascending=False)
)
ref_journal_df.show(n=5, truncate=False)
print("Unique Reference Journals: ", ref_journal_df.count())

+-------------------------------------------------------------------------------+-------------------------------------------------------------------------------+-----+
|ref_journal                                                                    |ref_journal_lc                                                                 |count|
+-------------------------------------------------------------------------------+-------------------------------------------------------------------------------+-----+
|Astrophysical Journal                                                          |astrophysical journal                                                          |1249 |
|New England Journal of Medicine                                                |new england journal of medicine                                                |774  |
|Nature                                                                         |nature                                                                         

Use inner join to join the data frames to create a dataframe of ref journals with issn. 

In [366]:
# Inner join on the lower-cased title: keeps only cited titles found in serial_df.
ref_journal_df = ref_journal_df.join(
    serial_df,
    on=ref_journal_df['ref_journal_lc'] == serial_df['serial_lc'],
    how='inner',
)

Show samples. There are fewer rows in the dataframe now, because many cited titles are not serials; they might be monographs or conference proceedings.

In [367]:
# Keep the serial identifiers next to the cited title and its citation count.
ref_journal_df = ref_journal_df.select(
    'serial_id',
    'ref_journal',
    'ref_journal_lc',
    'serial_issn',
    'serial_eissn',
    'open_access',
    'count',
)
ref_journal_df.show(n=5, truncate=False)
ref_journal_df.count()

+-----------+----------------------------------+----------------------------------+-----------+------------+-----------+-----+
|serial_id  |ref_journal                       |ref_journal_lc                    |serial_issn|serial_eissn|open_access|count|
+-----------+----------------------------------+----------------------------------+-----------+------------+-----------+-----+
|27065      |Accident and Emergency Nursing    |accident and emergency nursing    |0965-2302  |null        |0          |1    |
|24710      |Annals of Applied Probability     |annals of applied probability     |1050-5164  |null        |null       |3    |
|13600154724|Biomicrofluidics                  |biomicrofluidics                  |1932-1058  |null        |null       |1    |
|13643      |BMC Ear, Nose and Throat Disorders|bmc ear, nose and throat disorders|1472-6815  |null        |1          |1    |
|12599      |Breast Journal                    |breast journal                    |1075-122X  |1524-4741   |nul

6082

Load the journals that JHU subscribes to. This data is exported from 360 SS RM.

In [368]:
# Load the subscription export (CSV with a header row).
ss_df = (
    spark.read
    .format("csv")
    .option("header", "true")
    .load('../data/Tracked_eJournals_JHE-2018-04-30-66826.csv')
)

# Alias every column so that spaces and hyphens in its name become underscores.
exprs = [
    col(name).alias(name.replace(' ', '_').replace('-', '_'))
    for name in ss_df.columns
]

# Keep the title and ISSN columns; lower-case the title and spell '&' as 'and'.
ss_df = (
    ss_df.select(*exprs)
    .select('Title', 'ISSN', 'eISSN')
    .withColumn('Title', lower(col('Title')))
    .withColumn('Title', regexp_replace('Title', '&', 'and'))
)
ss_df.show(n=5, truncate=False)
print('Total Subscription: ', ss_df.count())

ss_df = ss_df.dropDuplicates()
print('Total Subscription After Duplicates Dropped: ', ss_df.count())

+------------------------------------------------------------+---------+---------+
|Title                                                       |ISSN     |eISSN    |
+------------------------------------------------------------+---------+---------+
|government health it                                        |null     |null     |
|environmental geosciences                                   |1075-9565|1526-0984|
|aapg bulletin                                               |0149-1423|1558-9153|
|american association of petroleum geologists bulletin       |0002-7464|null     |
|bulletin of the american association of petroleum geologists|0883-9247|null     |
+------------------------------------------------------------+---------+---------+
only showing top 5 rows

Total Subscription:  265408
Total Subscription After Duplicates Dropped:  163756


In [369]:
# Left-outer join against the subscription list. A cited journal matches a
# subscription row when the titles are equal, or when its ISSN or eISSN
# equals either the ISSN or the eISSN of the subscription row.
# Unmatched journals keep nulls in Title/ISSN/eISSN.
ref_journal_df = ref_journal_df.join(
    ss_df,
    on=(
        (ref_journal_df['ref_journal_lc'] == ss_df['Title'])
        | (
            (ref_journal_df['serial_issn'] != 'null')
            & (
                (ref_journal_df['serial_issn'] == ss_df['ISSN'])
                | (ref_journal_df['serial_issn'] == ss_df['eISSN'])
            )
        )
        | (
            (ref_journal_df['serial_eissn'] != 'null')
            & (
                (ref_journal_df['serial_eissn'] == ss_df['eISSN'])
                | (ref_journal_df['serial_eissn'] == ss_df['ISSN'])
            )
        )
    ),
    how="left_outer",
)
ref_journal_df.show(n=5, truncate=False)

+-----------+----------------------------------+----------------------------------+-----------+------------+-----------+-----+-----------------------------------+---------+---------+
|serial_id  |ref_journal                       |ref_journal_lc                    |serial_issn|serial_eissn|open_access|count|Title                              |ISSN     |eISSN    |
+-----------+----------------------------------+----------------------------------+-----------+------------+-----------+-----+-----------------------------------+---------+---------+
|27065      |Accident and Emergency Nursing    |accident and emergency nursing    |0965-2302  |null        |0          |1    |accident and emergency nursing     |0965-2302|1532-9267|
|24710      |Annals of Applied Probability     |annals of applied probability     |1050-5164  |null        |null       |3    |annals of applied probability      |1050-5164|2168-8737|
|13600154724|Biomicrofluidics                  |biomicrofluidics                  |19

List the top 25 most cited but not subscribed journal titles. However, this list needs to be curated since the subscription data from SerialSolutions is not reliable. 

In [370]:
# Rows without a subscription match have a null Title after the left-outer join.
cited_not_subscribed = ref_journal_df.where(ref_journal_df['Title'].isNull())
(
    cited_not_subscribed
    .select('serial_id', 'ref_journal', 'serial_issn', 'serial_eissn', 'open_access', 'count')
    .orderBy('count', ascending=False)
    .show(n=25, truncate=False)
)
cited_not_subscribed.count()

+-----------+---------------------------------------------+-----------+------------+-----------+-----+
|serial_id  |ref_journal                                  |serial_issn|serial_eissn|open_access|count|
+-----------+---------------------------------------------+-----------+------------+-----------+-----+
|32522      |American Review of Respiratory Disease       |0003-0805  |null        |null       |14   |
|19700188217|Bioanalysis                                  |1757-6180  |1757-6199   |null       |11   |
|23853      |Journal of Clinical Hypertension             |1524-6175  |1751-7176   |null       |10   |
|17600155130|NCHS data brief                              |1941-4935  |1941-4927   |null       |10   |
|21100773752|Nature Microbiology                          |null       |2058-5276   |null       |10   |
|130029     |Journal of Sexual Medicine                   |1743-6095  |1743-6109   |0          |9    |
|3600148103 |Sexual Health                                |1448-5028  |14

258

## Analysis of Journals of JHU publications

Extract journal titles that the articles were published in. 

In [371]:
# Source id and publication name of every journal-type document, plus a
# lower-cased copy of the name (journal_lc).
journal_df = spark.sql('''
    SELECT coredata["source-id"] AS journal_id, coredata["prism:publicationName"] AS journal
    FROM document 
    WHERE coredata["prism:aggregationType"]="Journal"
''').withColumn('journal_lc', lower(col('journal')))

Check with some samples. 

In [372]:
# Print the first 5 rows without truncating long values.
journal_df.show(n=5, truncate=False)

+----------+---------------------------------+---------------------------------+
|journal_id|journal                          |journal_lc                       |
+----------+---------------------------------+---------------------------------+
|20830     |Health (United Kingdom)          |health (united kingdom)          |
|18011     |Neuropsychological Rehabilitation|neuropsychological rehabilitation|
|12260     |Journal of Pediatric Orthopaedics|journal of pediatric orthopaedics|
|15962     |Journal of Medical Ethics        |journal of medical ethics        |
|145172    |Behavioral Sleep Medicine        |behavioral sleep medicine        |
+----------+---------------------------------+---------------------------------+
only showing top 5 rows



Count how many articles were published in each journal and sort the journals by that count in descending order.

In [373]:
# One row per (journal, journal_lc) with its article count, largest first.
journal_df = (
    journal_df
    .groupBy('journal', 'journal_lc')
    .count()
    .orderBy('count', ascending=False)
)
journal_df.select('journal', 'count').show(n=5, truncate=False)
print("Unique Journals: ", journal_df.count())

+-------------------------------------------------------------------------------+-----+
|journal                                                                        |count|
+-------------------------------------------------------------------------------+-----+
|Astrophysical Journal                                                          |45   |
|PLoS ONE                                                                       |41   |
|Monthly Notices of the Royal Astronomical Society                              |31   |
|Proceedings of the National Academy of Sciences of the United States of America|22   |
|Scientific Reports                                                             |19   |
+-------------------------------------------------------------------------------+-----+
only showing top 5 rows

Unique Journals:  1332


Join the serial dataframe to add ISSN to the dataframe. 

In [374]:
# Inner join on the lower-cased title: keeps only journals found in serial_df.
journal_df = journal_df.join(
    serial_df,
    on=journal_df['journal_lc'] == serial_df['serial_lc'],
    how='inner',
)

Show samples. A few titles were dropped because they are not journals (they do not appear in the serial data).

In [375]:
# Keep the serial identifiers next to the journal title and its article count.
journal_df = journal_df.select(
    'serial_id',
    'journal',
    'journal_lc',
    'serial_issn',
    'serial_eissn',
    'open_access',
    'count',
)
journal_df.show(n=5, truncate=False)
journal_df.count()

+-----------+--------------------------------+--------------------------------+-----------+------------+-----------+-----+
|serial_id  |journal                         |journal_lc                      |serial_issn|serial_eissn|open_access|count|
+-----------+--------------------------------+--------------------------------+-----------+------------+-----------+-----+
|24267      |British Journal of Dermatology  |british journal of dermatology  |0007-0963  |1365-2133   |null       |2    |
|13736      |British Journal of Ophthalmology|british journal of ophthalmology|0007-1161  |1468-2079   |null       |1    |
|25465      |Current Diabetes Reports        |current diabetes reports        |1534-4827  |1539-0829   |null       |1    |
|18458      |European Respiratory Journal    |european respiratory journal    |0903-1936  |1399-3003   |null       |2    |
|21100407195|Optica                          |optica                          |2334-2536  |null        |1          |1    |
+-----------+---

1281

Join the data with the subscription data (`ss_df`) using a left outer join, matching on title, ISSN or eISSN.

In [376]:
# Left-outer join against the subscription list. A journal matches a
# subscription row when the titles are equal, or when its ISSN or eISSN
# equals either the ISSN or the eISSN of the subscription row.
# Unmatched journals keep nulls in Title/ISSN/eISSN.
journal_df = journal_df.join(
    ss_df,
    on=(
        (journal_df['journal_lc'] == ss_df['Title'])
        | (
            (journal_df['serial_issn'] != 'null')
            & (
                (journal_df['serial_issn'] == ss_df['ISSN'])
                | (journal_df['serial_issn'] == ss_df['eISSN'])
            )
        )
        | (
            (journal_df['serial_eissn'] != 'null')
            & (
                (journal_df['serial_eissn'] == ss_df['eISSN'])
                | (journal_df['serial_eissn'] == ss_df['ISSN'])
            )
        )
    ),
    how="left_outer",
)

In [377]:
# Print the first 5 joined rows without truncating long values.
journal_df.show(n=5, truncate=False)

+---------+--------------------------------+--------------------------------+-----------+------------+-----------+-----+--------------------------------+---------+---------+
|serial_id|journal                         |journal_lc                      |serial_issn|serial_eissn|open_access|count|Title                           |ISSN     |eISSN    |
+---------+--------------------------------+--------------------------------+-----------+------------+-----------+-----+--------------------------------+---------+---------+
|24267    |British Journal of Dermatology  |british journal of dermatology  |0007-0963  |1365-2133   |null       |2    |british journal of dermatology  |null     |null     |
|24267    |British Journal of Dermatology  |british journal of dermatology  |0007-0963  |1365-2133   |null       |2    |british journal of dermatology  |0007-0963|1365-2133|
|13736    |British Journal of Ophthalmology|british journal of ophthalmology|0007-1161  |1468-2079   |null       |1    |british jo

Show the journals that JHU authors published in but that JHU does not subscribe to.

In [378]:
# Rows without a subscription match have a null Title after the left-outer join.
published_not_subscribed = journal_df.where(journal_df['Title'].isNull())
(
    published_not_subscribed
    .select('serial_id', 'journal', 'serial_issn', 'serial_eissn', 'open_access', 'count')
    .orderBy('count', ascending=False)
    .show(n=25, truncate=False)
)
published_not_subscribed.count()

+-----------+--------------------------------------------------------------------------------------+-----------+------------+-----------+-----+
|serial_id  |journal                                                                               |serial_issn|serial_eissn|open_access|count|
+-----------+--------------------------------------------------------------------------------------+-----------+------------+-----------+-----+
|21100454916|Abdominal Radiology                                                                   |2366-004X  |2366-0058   |null       |2    |
|21100773739|Materials Research Letters                                                            |null       |2166-3831   |1          |2    |
|23853      |Journal of Clinical Hypertension                                                      |1524-6175  |1751-7176   |null       |2    |
|12358      |International Journal of Fuzzy Systems                                                |1562-2479  |2199-3211   |null       

17