# Project Title
### Data Engineering Capstone Project

#### Project Summary
This exploratory project investigates how metadata generated during the publication of academic articles can be modelled and stored in a star schema to enable efficient querying for analytics. The primary data source is the Crossref Public Data File (https://www.crossref.org/blog/2022-public-data-file-of-more-than-134-million-metadata-records-now-available/), a collection of more than 134 million metadata records. These records are built from metadata that academic publishers submit to Crossref as part of their publication process.

Additionally, I have integrated GDP (gross domestic product) data to allow end users to try to answer questions such as how macroeconomic factors may influence the publishing environment. For example, is there a correlation between high GDP output and the number of academic documents produced?

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

### Step 1: Scope the Project and Gather Data

#### Scope 
I have written an ETL program that extracts information from journal article metadata and loads it into a star-schema-style database. I have also loaded GDP data. The two sources are integrated through conformed dimensions for year and country, which allows end users to join facts about academic articles to GDP amounts.
For this project I used a locally hosted PostgreSQL database, a Jupyter notebook, and Python as the programming language.


#### Describe and Gather Data 
The Crossref Public Data file which is the source of this data was downloaded from here: https://academictorrents.com/details/4dcfdf804775f2d92b7a030305fa0350ebef6f3e
The full dataset includes 26,810 compressed JSON files. An example of one of these JSON files can be viewed here: https://raw.githubusercontent.com/jimswainston/udacity-data-engineering-capstone/main/data/Crossref/0.json. The type of information that is included is when a document was published, who it was published by, who the authors are, which institutions the authors are affiliated with at time of publication, the subject areas that the content covers and information about the journal that the article is contained in.

The GDP data was sourced from the World Bank https://data.worldbank.org/indicator/NY.GDP.MKTP.CD. It is a CSV file that contains tabular data on the US$ GDP amounts for all countries that it keeps records for. 
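
The World Bank file is laid out with one row per country and one column per year, whereas a fact table wants one row per country and year. The following is a minimal sketch of that reshaping using only the standard library. It is not the project's actual loader: the sample header, the `gdp_wide_to_long` helper and the assumption that year columns are four-digit headers are all illustrative, and the two amounts are rounded values taken from the query output shown later in this notebook. The real download may also carry a few metadata lines before the header that would need skipping.

```python
import csv
import io

# Tiny in-memory stand-in for the World Bank CSV (assumed wide layout: one column per year).
SAMPLE_CSV = """Country Name,Country Code,Indicator Name,Indicator Code,2019,2020
Australia,AUS,GDP (current US$),NY.GDP.MKTP.CD,,1327836000000
Austria,AUT,GDP (current US$),NY.GDP.MKTP.CD,,433258500000
"""


def gdp_wide_to_long(csv_text):
    """Return a list of (country_code, year, gdp_amount) tuples, skipping empty cells."""
    rows = []
    for record in csv.DictReader(io.StringIO(csv_text)):
        for column, value in record.items():
            # Treat any four-digit header as a year column (an assumption about the layout).
            is_year = bool(column) and column.isdigit() and len(column) == 4
            if is_year and value and value.strip():
                rows.append((record["Country Code"], int(column), float(value)))
    return rows


gdp_rows = gdp_wide_to_long(SAMPLE_CSV)
print(gdp_rows)
```

Skipping empty cells, rather than loading them as zero, keeps missing observations out of the fact table so they cannot distort later aggregates.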


### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.

One of the key pieces of information of interest is who authored a paper and the institution they were affiliated with. The institutional affiliation usually includes the country which allows us to understand the geography of where academic research is being produced. Unfortunately the affiliation isn't always present. See below JSON snippet for example. Missing affiliations need to be handled.



In [None]:
{
  "author": [
    {
      "given": "Richard",
      "family": "Arkaifie",
      "sequence": "first",
      "affiliation": []
    }
  ]
}
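
One way to handle missing affiliations is to emit the author anyway with an empty affiliation, so the author still reaches the author table and the gap stays visible. The sketch below assumes that approach; `iter_author_affiliations` is a hypothetical helper written for illustration and is not taken from the project's ETL code.

```python
def iter_author_affiliations(item):
    """Yield (family_name, affiliation_name) pairs; affiliation_name is None when missing."""
    for author in item.get("author", []):
        # The key may be absent or an empty list; treat both the same way.
        affiliations = author.get("affiliation") or []
        if not affiliations:
            yield author.get("family"), None
            continue
        for affiliation in affiliations:
            yield author.get("family"), affiliation.get("name")


# The record from the snippet above, reduced to the fields used here.
item = {
    "author": [
        {"given": "Richard", "family": "Arkaifie", "sequence": "first", "affiliation": []}
    ]
}
pairs = list(iter_author_affiliations(item))
print(pairs)
```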



Another core issue is that no standard taxonomy is used for entering country information in an affiliation. The affiliation is typically provided by the document author when they submit their article to a publisher, so different values can be used to refer to the same geographic region (for example, USA and North America). The country information is also unstructured: it is simply part of the free-text affiliation string. See the snippet below for an example:



In [None]:
{
  "author": [
    {
      "given": "Dale D.",
      "family": "Tang",
      "sequence": "first",
      "affiliation": [
        {
          "name": "Department of Cellular and Integrative Physiology, Indiana University School of Medicine, 635 Barnhill Drive, Indianapolis, IN 46202, USA"
        }
      ]
    }
  ]
}


In [1]:
%load_ext autoreload
%autoreload 2

In [4]:
# Performing cleaning tasks here.
# Country information is extracted and cleansed by match_countries_in_affiliation(),
# which is found in the dataExtractUtils.py module.
# The loop below shows it working on the first item of the first Crossref file.

import gzip
import json

import pandas as pd
import pandas.io.sql as sqlio
import psycopg2

import constants
import dataExtractUtils as de

# Path to a local copy of the Crossref Public Data File; adjust it for your machine.
with gzip.open("/home/jswainston/Downloads/April2022CrossrefPublicDataFile/0.json.gz", "rt", encoding="utf-8") as fin:
    data = json.load(fin)

for item in data["items"]:
    print("DOI " + item["DOI"])
    # Both "author" and "affiliation" can be absent, so fall back to empty lists.
    for author in item.get("author", []):
        for affiliation in author.get("affiliation", []):
            print(affiliation)
            affiliationCountries = de.match_countries_in_affiliation(affiliation["name"])
            print(affiliationCountries)
    break  # only the first item is needed for this demonstration




DOI 10.1149/1.1392467
{'name': 'Dipartimento di Chimica Fisica Applicata‐Politecnico di Milano, 20131 Milano, Italy'}
['Italy']
{'name': 'Dipartimento di Chimica Fisica Applicata‐Politecnico di Milano, 20131 Milano, Italy'}
['Italy']
{'name': 'Dipartimento di Chimica Fisica Applicata‐Politecnico di Milano, 20131 Milano, Italy'}
['Italy']
{'name': 'Dipartimento di Chimica Fisica Applicata‐Politecnico di Milano, 20131 Milano, Italy'}
['Italy']



#### Cleaning Steps
Document steps necessary to clean the data

To extract and standardise the countries used in affiliations I first needed a standard list of countries. I manually created the COUNTRIES dictionary in the constants module using the standard ISO 3166 short country names and alpha-3 codes. This was then used as a lookup so that affiliation strings could be searched for the country names. I also extended the list of ISO countries so that regions could be grouped. For example, England, Scotland, Wales and Northern Ireland all map to the same country code, 'GBR'. Mapping them to the same country code means they can all be transformed to have the same name; in this example they are transformed to United Kingdom. This is important because the World Bank stores GDP at the level of the United Kingdom and not at the level of its constituent countries. This cleansing and transformation means that statistics from the Crossref dataset can be aggregated and grouped in the same way as the World Bank data.
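
The lookup described above can be sketched as follows. This is an illustrative reconstruction, not the code in dataExtractUtils.py: the shape of the `COUNTRIES` dictionary, the `USA` alias and the `match_countries` function are assumptions, and the real dictionary covers far more entries.

```python
import re

# Hypothetical, heavily abbreviated stand-in for the COUNTRIES lookup in constants.py.
# Each alias that may appear in free text maps to (alpha-3 code, standard name).
COUNTRIES = {
    "Italy": ("ITA", "Italy"),
    "USA": ("USA", "United States of America"),
    "United Kingdom": ("GBR", "United Kingdom"),
    "England": ("GBR", "United Kingdom"),
    "Scotland": ("GBR", "United Kingdom"),
    "Wales": ("GBR", "United Kingdom"),
    "Northern Ireland": ("GBR", "United Kingdom"),
}


def match_countries(affiliation_text):
    """Return the sorted standard names of all countries mentioned in the text."""
    found = set()
    for alias, (_code, standard_name) in COUNTRIES.items():
        # Whole-word, case-sensitive match so short aliases do not hit inside other words.
        if re.search(r"\b" + re.escape(alias) + r"\b", affiliation_text):
            found.add(standard_name)
    return sorted(found)


print(match_countries("Politecnico di Milano, 20131 Milano, Italy"))
print(match_countries("Indiana University School of Medicine, Indianapolis, IN 46202, USA"))
```

Collecting standard names in a set means an affiliation that mentions both "England" and "United Kingdom" still yields a single country. A plain alias search like this has limits: names that are substrings of other names, or that double as place names elsewhere, would need extra rules.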

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model

The data model I have created is a multi-dimensional snowflake structure. I chose a dimensional model because it is simple to use and gives fast data retrieval by reducing the number of joins. The primary focus of the model is being able to count the number of author affiliations per country alongside the GDP per country. I have introduced an element of normalisation by having separate entities for author and article, so that redundant article data is not stored against each author and statistics about the articles are easier to calculate.

![title](udacityCapstoneDataModel.png)

#### 3.2 Mapping Out Data Pipelines

1) Firstly data is downloaded from the sources described in the data gathering section
2) The country and year dimensions are created first as they are static data sources. They also need to be in place for the fact tables to reference them. 
3) The fact_gdp table is then loaded from the World Bank csv file. 
4) The Crossref JSON files are then processed to create the author and article dimensions and the author_affiliation fact table. 
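
The load order above matters because fact rows carry foreign keys into the dimensions. The sketch below illustrates steps 2 and 3 under stated assumptions: it uses an in-memory SQLite database as a stand-in for PostgreSQL, the DDL is simplified and is not the project's createTables.py, and `load_gdp` is a hypothetical helper. The column names follow the queries used later in this notebook, and the 1800 to 2200 year range comes from the data dictionary.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("PRAGMA foreign_keys = ON")  # SQLite only enforces foreign keys when asked
conn.executescript("""
CREATE TABLE dim_year (year_id INTEGER PRIMARY KEY, year INTEGER UNIQUE NOT NULL);
CREATE TABLE dim_country (
    country_id INTEGER PRIMARY KEY,
    country_code TEXT UNIQUE NOT NULL,
    country_name TEXT NOT NULL
);
CREATE TABLE fact_gdp (
    id INTEGER PRIMARY KEY,
    country_id INTEGER NOT NULL REFERENCES dim_country(country_id),
    year_id INTEGER NOT NULL REFERENCES dim_year(year_id),
    gdp_amount REAL NOT NULL
);
""")

# Step 2: the static dimensions are loaded first.
conn.executemany("INSERT INTO dim_year (year) VALUES (?)", [(y,) for y in range(1800, 2201)])
conn.execute(
    "INSERT INTO dim_country (country_code, country_name) VALUES (?, ?)", ("AUT", "Austria")
)


def load_gdp(conn, country_code, year, amount):
    """Insert one GDP fact, resolving its surrogate keys from the dimensions."""
    conn.execute(
        """INSERT INTO fact_gdp (country_id, year_id, gdp_amount)
           SELECT c.country_id, y.year_id, ?
           FROM dim_country c, dim_year y
           WHERE c.country_code = ? AND y.year = ?""",
        (amount, country_code, year),
    )


# Step 3: facts can only be loaded once the dimensions exist.
load_gdp(conn, "AUT", 2020, 4.332585e11)
year_count = conn.execute("SELECT COUNT(*) FROM dim_year").fetchone()[0]
gdp_count = conn.execute("SELECT COUNT(*) FROM fact_gdp").fetchone()[0]
print(year_count, gdp_count)
```

Resolving keys with an `INSERT ... SELECT` means a GDP row for an unknown country or year simply inserts nothing, rather than creating an orphaned fact.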

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

This project relies on a local PostgreSQL instance being installed on the user's machine. Before running any of the executables in the project, PostgreSQL must be installed and the CONNECTION_STRING constant in constants.py must be updated to enable connection to the installed database. Only the user and password values need amending.

All of the code used to create the data model can be found in the files etl.py, sqlQueries.py, createTables.py, constants.py and dataExtractUtils.py. To run the pipeline, issue the following commands in this order:
1) python3 createTables.py 
2) python3 etl.py

After running these Python files your database should be populated with the tables listed below, with the number of rows stated.

In [42]:
import warnings
import pandas.io.sql as sqlio
import psycopg2

warnings.filterwarnings('ignore')

conn = psycopg2.connect("host=127.0.0.1 dbname=udacityprojectdb user=student password=6GNjBQvF")
cur = conn.cursor()

articles_count= "select count(article_id) from articles"
affiliations_count = "select count(affiliation_id) from author_affiliation"
authors_count = "select count(author_id) from authors"
countries_count = "select count(country_id) from dim_country"
year_count = "select count(year_id) from dim_year"
gdp_count = "select count(id) from fact_gdp"

articles_result = sqlio.read_sql_query(articles_count, conn)
affiliations_result = sqlio.read_sql_query(affiliations_count, conn)
authors_result = sqlio.read_sql_query(authors_count, conn)
country_result = sqlio.read_sql_query(countries_count, conn)
year_result = sqlio.read_sql_query(year_count, conn)
gdp_result = sqlio.read_sql_query(gdp_count, conn)

print("Total number of records in articles is " + str(articles_result.iloc[0]['count']))
print("Total number of records in author_affiliation is " + str(affiliations_result.iloc[0]['count']))
print("Total number of records in authors is " + str(authors_result.iloc[0]['count']))
print("Total number of records in dim_country is " + str(country_result.iloc[0]['count']))
print("Total number of records in dim_year is " + str(year_result.iloc[0]['count']))
print("Total number of records in fact_gdp is " + str(gdp_result.iloc[0]['count']))


Total number of records in articles is 788947
Total number of records in author_affiliation is 282352
Total number of records in authors is 2604264
Total number of records in dim_country is 247
Total number of records in dim_year is 401
Total number of records in fact_gdp is 1488


In [45]:
import warnings
import pandas.io.sql as sqlio
import psycopg2

warnings.filterwarnings('ignore')

conn = psycopg2.connect("host=127.0.0.1 dbname=udacityprojectdb user=student password=6GNjBQvF")
cur = conn.cursor()

# This example query against the resulting database shows how the two datasets have been integrated around geographic and temporal data.
# GDP and journal article publishing data can be combined for different years and countries.

integrated_query= """
select count(author_affiliation.affiliation_id) as affiliation_count, dim_country.country_name,gdp_amount
from authors
inner join articles
on articles.article_id = authors.article_id
inner join author_affiliation
on author_affiliation.author_id = authors.author_id
inner join dim_country 
ON dim_country.country_id = author_affiliation.country_id
inner join dim_year 
on dim_year.year_id = articles.year_id
inner join fact_gdp ON fact_gdp.year_id = dim_year.year_id AND fact_gdp.country_id = dim_country.country_id
where dim_year.year = 2020
group by dim_country.country_id,gdp_amount
"""

results = sqlio.read_sql_query(integrated_query, conn)

print("The below table shows the number of author contributions per country in 2020")
print("and the total GDP for that year. The affiliation_counts will not be accurate")
print("as we have only processed a small subset of the full crossref database \n")
print(results)

The below table shows the number of author contributions per country in 2020
and the total GDP for that year. The affiliation_counts will not be accurate
as we have only processed a small subset of the full crossref database 

    affiliation_count   country_name    gdp_amount
0                   1      Australia  1.327836e+12
1                   3        Austria  4.332585e+11
2                   1     Azerbaijan  4.269300e+10
3                   1     Bangladesh  3.739021e+11
4                   1         Canada  1.645423e+12
5                   2          China  1.468767e+13
6                   1        Germany  3.846414e+12
7                   6        Denmark  3.560849e+11
8                   1        Algeria  1.450092e+11
9                  20          Spain  1.281485e+12
10                  3        Finland  2.718370e+11
11                  1         France  2.630318e+12
12                  4        Georgia  1.584292e+10
13                 78      Indonesia  1.058689e+12
14      
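
As a rough illustration of the correlation question posed in the project summary, the rows printed above can be fed into a Pearson correlation. This is only a sketch: the `pearson` helper is written here for illustration, and because the counts come from a small subset of the Crossref data the resulting coefficient says nothing reliable about the real relationship.

```python
import math

# (affiliation_count, gdp_amount) pairs copied from the rows printed above.
rows = [
    (1, 1.327836e12), (3, 4.332585e11), (1, 4.269300e10), (1, 3.739021e11),
    (1, 1.645423e12), (2, 1.468767e13), (1, 3.846414e12), (6, 3.560849e11),
    (1, 1.450092e11), (20, 1.281485e12), (3, 2.718370e11), (1, 2.630318e12),
    (4, 1.584292e10), (78, 1.058689e12),
]


def pearson(xs, ys):
    """Return the Pearson correlation coefficient of two equal-length sequences."""
    n = len(xs)
    mean_x, mean_y = sum(xs) / n, sum(ys) / n
    cov = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    var_x = sum((x - mean_x) ** 2 for x in xs)
    var_y = sum((y - mean_y) ** 2 for y in ys)
    return cov / math.sqrt(var_x * var_y)


r = pearson([count for count, _ in rows], [gdp for _, gdp in rows])
print(f"Pearson r = {r:.3f}")
```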

#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
 
Run Quality Checks

In [15]:
# Perform quality checks here
"""
Check for unique primary keys in each table.
Compare the total row count with the count of distinct identifiers to check that they match.
"""

import warnings
import psycopg2
import qualityCheckUtils as qc

warnings.filterwarnings('ignore')

conn = psycopg2.connect("host=127.0.0.1 dbname=udacityprojectdb user=student password=6GNjBQvF")
cur = conn.cursor()

qc.primary_key_count_test(conn,"articles","article_id")

Primary key check passed for table articles

total_rows is 788947

total_primary_keys is 788947


In [21]:
qc.primary_key_count_test(conn,"author_affiliation","affiliation_id")

Primary key check passed for table author_affiliation

total_rows is 282352

total_primary_keys is 282352


In [17]:
qc.primary_key_count_test(conn,"authors","author_id")

Primary key check passed for table authors

total_rows is 2604264

total_primary_keys is 2604264


In [18]:
qc.primary_key_count_test(conn,"dim_country","country_id")

Primary key check passed for table dim_country

total_rows is 247

total_primary_keys is 247


In [19]:
qc.primary_key_count_test(conn,"dim_year","year_id")

Primary key check passed for table dim_year

total_rows is 401

total_primary_keys is 401


In [20]:
qc.primary_key_count_test(conn,"fact_gdp","id")

Primary key check passed for table fact_gdp

total_rows is 1488

total_primary_keys is 1488
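
The source of qualityCheckUtils.py is not shown in this notebook, so the following is only a sketch of how a check like `primary_key_count_test` could work. The `primary_key_count_check` function is hypothetical, and an in-memory SQLite table with a deliberately duplicated key stands in for the PostgreSQL database so that a failing case is visible.

```python
import sqlite3


def primary_key_count_check(conn, table, key_column):
    """Return (passed, total_rows, distinct_keys) for the given table."""
    # Identifiers cannot be bound as query parameters, so only pass trusted names here.
    total_rows = conn.execute(f"SELECT COUNT(*) FROM {table}").fetchone()[0]
    distinct_keys = conn.execute(
        f"SELECT COUNT(DISTINCT {key_column}) FROM {table}"
    ).fetchone()[0]
    return total_rows == distinct_keys, total_rows, distinct_keys


conn = sqlite3.connect(":memory:")
# No PRIMARY KEY constraint on purpose, so a duplicate identifier can be inserted.
conn.execute("CREATE TABLE dim_year (year_id INTEGER, year INTEGER)")
conn.executemany("INSERT INTO dim_year VALUES (?, ?)", [(1, 1800), (2, 1801), (2, 1802)])
result = primary_key_count_check(conn, "dim_year", "year_id")
print(result)
```

`COUNT(DISTINCT ...)` ignores NULLs, so a check of this form also fails when an identifier is missing, not just when it is duplicated.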


In [None]:
# Check for foreign key constraint.
"""
To check that foreign key constraints have been implemented correctly,
we try to delete a row from dim_country that is referenced from
author_affiliation. We expect PostgreSQL to throw an error about the
foreign key constraint being violated.
"""


In [70]:
qc.foreign_key_constraint_test(conn,"dim_country","country_id",77)

Foreign key constraint test passed. Database threw the correct error as shown below 

update or delete on table "dim_country" violates foreign key constraint "fk_country" on table "author_affiliation"
DETAIL:  Key (country_id)=(77) is still referenced from table "author_affiliation".
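
A test of this kind can be sketched as an attempted delete that is expected to fail. The version below is an assumption about the approach rather than the code in qualityCheckUtils.py: `foreign_key_delete_is_blocked` is a hypothetical helper, SQLite stands in for PostgreSQL, and the two tables are cut down to the columns involved. Whatever the outcome, the transaction is rolled back so the check never changes the data.

```python
import sqlite3


def foreign_key_delete_is_blocked(conn, table, key_column, key_value):
    """Return True if deleting the referenced row raises an integrity error."""
    try:
        conn.execute(f"DELETE FROM {table} WHERE {key_column} = ?", (key_value,))
    except sqlite3.IntegrityError:
        conn.rollback()
        return True
    conn.rollback()  # the delete went through, so undo it and report the failure
    return False


conn = sqlite3.connect(":memory:")
conn.execute("PRAGMA foreign_keys = ON")  # needed in SQLite; PostgreSQL enforces by default
conn.execute("CREATE TABLE dim_country (country_id INTEGER PRIMARY KEY)")
conn.execute(
    "CREATE TABLE author_affiliation ("
    "affiliation_id INTEGER PRIMARY KEY, "
    "country_id INTEGER REFERENCES dim_country(country_id))"
)
conn.execute("INSERT INTO dim_country VALUES (77)")
conn.execute("INSERT INTO author_affiliation VALUES (1, 77)")
conn.commit()

blocked = foreign_key_delete_is_blocked(conn, "dim_country", "country_id", 77)
remaining = conn.execute("SELECT COUNT(*) FROM dim_country").fetchone()[0]
print(blocked, remaining)
```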



In [24]:
conn.close()

#### 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.



Table | Field_Name | Field_Meaning | Field_Source
-----|-----|-----|---- 
articles|article_id|Unique identifier for an article|Generated by ETL for each article found in Crossref public data file
articles|doi|Digital Object Identifier: a persistent unique identifier for digital objects, in this case journal articles|Crossref public data file
articles|title|The title of the journal article|Crossref public data file
articles|published_date|The date that the article was published|Crossref public data file
articles|year_id|Foreign key to dim_year giving the year that the article was published|Derived from Crossref public data file and dim_year
authors|author_id|Unique identifier for an author|Generated by ETL for each author found in Crossref public data file
authors|article_id|Foreign key to the article that the author published|Generated by ETL
authors|first_name|First name of an author of the article|Crossref public data file
authors|last_name|Last name of an author of the article|Crossref public data file
dim_country|country_id|Unique identifier for a country|Generated by ETL for each country in ISO 3166 plus additions made for data quality
dim_country|country_code|ISO 3166 alpha-3 code|ISO 3166
dim_country|country_name|English short country name|ISO 3166
dim_year|year_id|Unique identifier for a year. Years in the dimension range from 1800 to 2200|Generated by ETL
dim_year|year|Year in integer format|Generated by ETL
author_affiliation|affiliation_id|Unique identifier for an author affiliation|Generated by ETL
author_affiliation|author_id|Foreign key to the author who is affiliated with an institution|Derived from Crossref public data file and authors
author_affiliation|country_id|Foreign key to the country that the author's institution is located in|Derived from Crossref public data file and dim_country
fact_gdp|id|Unique identifier for each GDP fact|Generated by ETL
fact_gdp|country_id|Foreign key to the country that the GDP fact is about|Derived from World Bank GDP file and dim_country
fact_gdp|year_id|Foreign key to the year that the GDP fact is from|Derived from World Bank GDP file and dim_year
fact_gdp|gdp_amount|The US dollar amount of gross domestic product generated by a country in a year|World Bank GDP file


### Step 5: Complete Project Write Up
**Clearly state the rationale for the choice of tools and technologies for the project**
  
I chose the Python programming language as the primary technology for writing the ETL code. This is due to the large number of packages that support data development, such as pandas, numpy and psycopg2. Being a high-level language, it also makes development more rapid than languages such as Java.

I chose to use a local PostgreSQL database for the project. PostgreSQL is a free and open-source relational database management system. With it being free it didn't add any cost to the project. A relational database suited the project due to the shape of the input data and the target model being a star schema for ease and speed of querying. 

A Jupyter notebook was used to show test results because it blends text, code and code output in one place, which makes it a good tool for communicating how code works.

**Propose how often the data should be updated and why.**

*Crossref* - After an initial load of the Crossref publishing metadata, I would propose retrieving new records from the API on a daily basis. New content is being published all the time, so a daily view would be useful. That said, the primary purpose of the analysis is to look at macro trends, so even though content may be published at any time there is no real need for a real-time view.

*World Bank GDP data* - The World Bank GDP data is reported on an annual basis, so new data would need to be loaded each year when it becomes available.
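
A daily Crossref refresh could start from a request URL like the one built below. This is a sketch under assumptions, not part of the project's code: it assumes the Crossref REST API's `/works` endpoint with a `from-index-date` filter and cursor-based paging, and `build_incremental_url` is a hypothetical helper. It only builds the URL and makes no network call.

```python
from datetime import date
from urllib.parse import urlencode

CROSSREF_WORKS_URL = "https://api.crossref.org/works"


def build_incremental_url(since, cursor="*", rows=1000):
    """Build a /works URL asking for records indexed on or after `since`."""
    params = {
        "filter": f"from-index-date:{since.isoformat()}",
        "rows": rows,
        "cursor": cursor,  # "*" requests the first page; later pages use the returned cursor
    }
    return f"{CROSSREF_WORKS_URL}?{urlencode(params)}"


url = build_incremental_url(date(2022, 4, 1))
print(url)
```

A scheduled job would persist the date of its last successful run and pass it as `since`, so each run only fetches records that changed after the previous one.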

<br>

**Write a description of how you would approach the problem differently under the following scenarios:**

 * The data was increased by 100x.

If the data was increased by 100x then I would use a scalable cloud database that supported both distributed storage and processing. A possible solution could be Amazon Redshift.  
 
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.

If the data needed to be available at a certain time each day, I would use an orchestration tool with scheduling features. One possible solution would be Apache Airflow.
 
 * The database needed to be accessed by 100+ people.
  
If the database needed to be accessed by 100+ people, I would choose a technology that is easily accessible and supports many concurrent connections. One possible option would be Amazon Redshift, a cloud data warehouse whose concurrency scaling feature is designed to serve large numbers of concurrent queries.