A Capstone Project for the Data Engineering (LTAT.02.007) course.
The aim of the present project is to develop an end-to-end data pipeline for analytic queries.
The project repository is hosted on GitHub. If you are reading this as a PDF, we advise navigating to the repository: https://github.com/qetdr/research-pipeline.
Dmitri Rozgonjuk
Lisanne Siniväli
Eerik-Sven Puudist
Cheng-Han Chung
The general pipeline structure is presented in Figure 1. Below is the high-level overview of the steps:
- Ingestion and Preliminary Preprocessing: Data are ingested from Kaggle and the first cleaned data tables are saved in .csv format.
- Augmentation: The cleaned data are augmented via APIs or static data files and saved to another directory as clean, augmented .csv files.
- Loading to databases: The cleaned and augmented .csv files are read into `pandas` and loaded to Postgres (data warehousing) as well as to the Neo4J database.
- Analytic queries: The data are ready for analytic queries, which are run from the browser via Jupyter Notebook and Neo4J.
Figure 1. The macro-view of the data pipeline.
The process is largely automated via pipeline orchestration with Apache Airflow. The data files are kept until the scheduled update; then, the cleaned and augmented data files are deleted and replaced by updated ones. However, to obtain the raw data files that might be needed for the initial run, one needs to run a Python script from the command line (see below).
We use Postgres as the relational database. The database schema is presented in Figure 2. The aim for this database was to allow us to run queries on author-related statistics (please see some example queries at the end of this document).
Figure 2. Entity-relationship diagram.
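For illustration, below is a minimal sketch of how one of the cleaned tables could be loaded into this database from `pandas`. It is not the project's actual loading code (the pipeline uses explicit DDL and INSERT statements in `dags/scripts/sql_queries.py`); the connection string and the use of `DataFrame.to_sql` are assumptions for a local setup.

```python
# Illustrative sketch only: the pipeline itself loads data via explicit
# INSERTs in dags/scripts/sql_queries.py. Connection details are assumed
# defaults for a local Postgres container, not the project's real credentials.
import pandas as pd
from sqlalchemy import create_engine

engine = create_engine("postgresql+psycopg2://airflow:airflow@localhost:5432/airflow")

# Load one cleaned table and append it to the corresponding warehouse table.
author = pd.read_csv("dags/data_ready/author.csv")
author.to_sql("author", engine, if_exists="append", index=False)
```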
We use Neo4J as the graph database. We mostly use the data that are also present in the Data Warehouse.
The database schema is similar to the schema shown in Figure 2; however, the main difference is that the yellow tables in Figure 2 (`journal`, `article`, `author`, and `category`) are treated as nodes, whereas the other two tables, `authorship` and `article_category`, are the relationships `AUTHORED` and `BELONGS_TO`, linking authors-articles and articles-categories, respectively, in Neo4J. We also created the `COAUTHORS` relationship, linking individual authors via joint publication(s), as well as `PUBLISHED_IN`, marking the relationship between a specific journal and article.
The general aim of this database was primarily to allow us to extract and visualize (in Neo4J) the ego-network of a given researcher. For instance, given a researcher's name, we wanted to be able to see with whom and on what the person has collaborated. However, the database also provides additional options for exploratory analysis (not within the scope of this project).
Figure 3. Graph database schema.
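As an aside, the `COAUTHORS` relationship can be derived from the `AUTHORED` relationships themselves. Below is a minimal sketch of how this could be done with the Neo4J Python driver; the connection credentials are illustrative, and the actual insertion logic lives in `dags/scripts/neo4j_queries.py`.

```python
# Sketch only: derive COAUTHORS links from existing AUTHORED relationships.
# URI and credentials are illustrative defaults, not the project's settings.
from neo4j import GraphDatabase

driver = GraphDatabase.driver("bolt://localhost:7687", auth=("neo4j", "password"))

DERIVE_COAUTHORS = """
MATCH (a1:Author)-[:AUTHORED]->(:Article)<-[:AUTHORED]-(a2:Author)
WHERE a1.id < a2.id            // each pair once, no self-links
MERGE (a1)-[:COAUTHORS]-(a2)
"""

with driver.session() as session:
    session.run(DERIVE_COAUTHORS)
driver.close()
```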
Below is a step-by-step description of the data cleaning and transformations done within the pipeline. The following steps are implemented in the `dags/scripts/raw_to_tables.py` module.

- Data are downloaded from Kaggle, unzipped, and only the necessary columns are extracted and converted to a `pandas DataFrame`.
- Initial cleaning is performed: records with a missing DOI are removed, duplicates (based on `article_id`) are dropped, and only the articles that include the category `cs` (which stands for 'computer science') are kept.
- Initial table extraction:
  - `authorship` and `author`: names are parsed from the dataset extracted in the previous step. For each author, the first, last, and middle names are extracted. Names are cleaned (removing non-alphabetical characters). An author identifier is created (in the form `LastnameF`, where `'F'` stands for the first-name initial; see the sketch after this list). The `authorship` table is a long-format table with each author corresponding to each article. The `author` table is extracted so that the unique name identifiers form their own table.
  - `article_category` and `category`: similarly, the lists of article categories are parsed, and the `article_category` table (a long-format table with each category label corresponding to an article) as well as the `category` table (unique categories forming a table with super- and subdomain identifiers) are created.
  - Initial `article` and `journal` tables are created.
- Once these tables are prepared, they are cleaned for missing values, NaN values, etc. Authors with too-short last names are removed. The tables are written to .csv format in the `dags/tables` directory.
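A minimal sketch of the author-identifier construction described above (our own illustration; the actual parsing in `raw_to_tables.py` may differ in detail):

```python
# Sketch (not the exact raw_to_tables.py logic) of building an author
# identifier in the LastnameF format from a parsed name.
import re


def make_author_id(first_name: str, last_name: str) -> str:
    """Return an identifier of the form LastnameF (last name + first-name initial)."""
    # Keep alphabetical characters only, mirroring the cleaning step described above.
    last = re.sub(r"[^A-Za-z]", "", last_name).capitalize()
    initial = re.sub(r"[^A-Za-z]", "", first_name)[:1].upper()
    return f"{last}{initial}"


print(make_author_id("Lars", "Birkedal"))  # -> "BirkedalL"
```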
Next, the clean data for use in the databases are created and augmented. Here, the module `/dags/scripts/final_tables.py` is relevant. The process starts with preparing and augmenting the `article` table, since it defines which parts of the other tables are included.

- We query the DOIs of articles against the Crossref API to retrieve the work type, number of citations, and journal ISSN. For that, we use the helper function `fetch_article_augments()` from `/dags/scripts/augmentations.py`. The querying is done in batches of 2000, and after each batch the data are updated in the .csv file. WARNING! This process is very slow, since too many queries per second may result in the IP being blocked. Hence, we chose the stable but slow option over the fast but highly risky one. After the `article` table is augmented, we keep only the works whose type is `journal-article`. The other tables are updated accordingly.
- The `journal` table is then augmented. We add the source-normalized impact factors from the CWTS website. However, for convenience, we have downloaded the Excel workbook and use it as the source (from the local repository). The helper functions `check_or_prepare_journal_metrics()` and `find_journal_stats()` from `/dags/scripts/augmentations.py` are used.
- Next, we augment the `author` table. We start by adding genders for authors based on their first names. The names are retrieved from a static dataset included in the `dags/augmentation/` directory. First names from the `author` table are matched with the names in that dataset. If a match is found, the gender is updated; otherwise, the gender remains `NaN`. We then compute various statistics for each author, including the h-index based on the metrics from the database. For the h-index computation, we use the `hindex()` function from `/dags/scripts/augmentations.py` (see the sketch after this list).
- Finally, we update all tables to be coherent with each other (meaning that each entity/node has relations, etc.).
- The clean data tables are saved in `.csv` format to the `dags/data_ready/` directory, from where they can be used for loading to the databases.
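For reference, here is a minimal sketch of an h-index computation. The repository's `hindex()` helper is described as using binary search; the version below is our own illustrative implementation, not the project's exact code.

```python
# Sketch of an h-index computation via binary search over the candidate h values.
from typing import List


def hindex(citations: List[int]) -> int:
    """Return the largest h such that at least h papers have >= h citations each."""
    cites = sorted(citations, reverse=True)  # citation counts in descending order
    lo, hi = 0, len(cites)
    # Invariant: every h <= lo is achievable; every h > hi is not.
    while lo < hi:
        mid = (lo + hi + 1) // 2          # candidate h
        if cites[mid - 1] >= mid:         # at least `mid` papers with >= mid citations
            lo = mid
        else:
            hi = mid - 1
    return lo


print(hindex([3, 0, 6, 1, 5]))  # -> 3
```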
We use Airflow for pipeline orchestration. Airflow makes it convenient to schedule the pipeline tasks. For the needs of the present project, we want to update the data yearly. To meet this goal, here is how Airflow works (for a visual overview of the tasks, please see Figure 4). Of note, the entire script for Airflow pipeline orchestration can be found in `dags/research_pipeline_dag.py`.
- Once Airflow is initiated, it will try to run the pipeline. Warning! The first pipeline run may not be successful while the different services are still setting up and initializing. In our experience, loading data to Neo4J may run into issues, and this is likely the biggest single point of failure of the pipeline, meaning that one might need to restart Neo4J manually. It is also mentioned on Docker Hub that Neo4J may come with poor performance and volatile stability.
- The scheduling is set up so that the start date of the pipeline in Airflow is 01.08.2022, meaning that the pipeline will definitely be initialized when it is first run (because the start date is in the past) and will then run again yearly (i.e., next August). Yearly updates can be turned off (i.e., switched to manual triggering) by setting `'schedule_interval': None` in the `default_args`.
- There are 7 tasks:
  - `Begin_Execution`: starts the pipeline; a dummy/empty operator.
  - `delete_for_update`: checks for the existence of, and deletes, the augmented/clean data files in the `dags/data_ready` directory. This is necessary for the yearly updates.
  - `find_tables_or_ingest_raw`: checks whether the non-cleansed tables have been prepared. If yes, the pipeline proceeds to the next task. If not, the user is prompted to ingest the data with the indicated script (to be run from the Terminal).
  - `check_or_augment`: checks whether the cleaned and augmented tables are present. If not, the tables from the previous step are used for augmentation and cleaning.
  - `pandas_to_dwh`: imports the cleaned .csv-s and loads them to the Postgres Data Warehouse.
  - `pandas_to_neo`: imports the cleaned .csv-s and loads them to the Neo4J Graph Database.
  - `Stop_Execution`: a dummy operator to indicate that pipeline execution has ended.
Figure 4. Airflow-orchestrated data pipeline (a successful pipeline run).
We would also like to note that we considered running the `pandas_to_dwh` and `pandas_to_neo` tasks in parallel. However, because we want to remain flexible with regard to how much data is used, we refactored the solution to run these tasks sequentially, since this reduces the risk of running out of memory (see the sketch below).
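Below is a minimal sketch of how such a sequential task chain could be wired up in Airflow 2. It is illustrative only: the real DAG lives in `dags/research_pipeline_dag.py`, and the callables here are placeholders rather than the project's actual functions.

```python
# Illustrative sketch of the sequential task chain; not the project's real DAG.
from datetime import datetime

from airflow import DAG
from airflow.operators.empty import EmptyOperator  # DummyOperator on older Airflow versions
from airflow.operators.python import PythonOperator


def _placeholder(**_):
    """Stand-in for the real pipeline step."""


with DAG(
    dag_id="research_pipeline_dag_sketch",
    start_date=datetime(2022, 8, 1),
    schedule_interval="@yearly",  # set to None to allow manual triggering only
    catchup=False,
) as dag:
    begin = EmptyOperator(task_id="Begin_Execution")
    delete_for_update = PythonOperator(task_id="delete_for_update", python_callable=_placeholder)
    find_tables_or_ingest_raw = PythonOperator(task_id="find_tables_or_ingest_raw", python_callable=_placeholder)
    check_or_augment = PythonOperator(task_id="check_or_augment", python_callable=_placeholder)
    pandas_to_dwh = PythonOperator(task_id="pandas_to_dwh", python_callable=_placeholder)
    pandas_to_neo = PythonOperator(task_id="pandas_to_neo", python_callable=_placeholder)
    stop = EmptyOperator(task_id="Stop_Execution")

    # The Postgres load runs before the Neo4J load to limit peak memory use.
    (begin >> delete_for_update >> find_tables_or_ingest_raw >> check_or_augment
     >> pandas_to_dwh >> pandas_to_neo >> stop)
```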
- It is assumed that the entire project is on a local machine (i.e., your computer). If not, clone it from GitHub: `git clone https://github.com/eeriksp/research-pipeline`.
- If you want to ingest the raw data from Kaggle, you will need to prepare a `kaggle.json` file and include it in the project root directory. A default file is included, but it needs to be updated with the appropriate credentials. More information can be found here: https://pypi.org/project/opendatasets/.
- Start your `Docker Desktop` and make sure to allocate sufficient memory (`Settings -> Resources -> Memory`, say 6.75 GB).
- Navigate to the root directory of this project.
- From the command line (or Terminal on a Mac), run `echo -e "AIRFLOW_UID=$(id -u)\nAIRFLOW_GID=0" > .env`. This creates an environment file that allows Airflow to run as a superuser.
- Run `docker-compose up airflow-init`. This initializes Airflow with the username and password 'airflow'. Wait until the run is completed.
- Now, run `docker-compose up`. This starts all the services described in the `docker-compose.yaml` file: Airflow, Jupyter Notebook, Postgres, and Neo4J. This may take some time, and it is suggested to keep an eye on the progress from the command line, especially on whether all services have started properly. This command also installs all the necessary Python modules and packages for Airflow.
- Make sure that all services are up and running. For that, try accessing the services from your browser:
  - Airflow: http://localhost:8080/
  - Jupyter Notebook: http://localhost:8888/
  - Neo4J: http://localhost:7474/

  If it is not possible to access all the services, wait a bit. Typically, the problem occurs with Neo4J; if the service(s) still cannot be started, you can also try stopping the process (press `Ctrl (or Cmd) + C` twice) and running `docker-compose down`. Then repeat the process, starting from Step 4.
The project folder includes the data tables by default. However, you can also test data ingestion, transformation, and augmentation yourself by deleting the .csv files. WARNING! Doing so will mean a significantly long runtime (can be more than half a day - not counting in potential issues with Neo4J). If you do choose to go without the default data tables (in repositories
dags/tables
anddags/data_ready
, see below for description), navigate to the root directory of this project and run the following commands from Terminal:- Install the necessary packages (on your local machine):
python -m pip install -r requirements.txt
- Run the script:
python3 dags/scripts/raw_to_tables.py
This script (1) will download the Kaggle data on your machine, (2) unzip it (appx 3.6+ GB), (3) extracts the necessary data, (4) makes the preliminary data cleaning, (5) creates the tables depicted in Figure 2, and (6) saves the tables to thedags/tables
directory in .csv format. Note: you might be asked for your Kaggle credentials from the command line but usually it works also when the file is in the root directory of the project.
- Install the necessary packages (on your local machine):
- Navigate to Airflow (http://localhost:8080/). If everything is correct, you should see a DAG called `research_pipeline_dag`. Click on it, then click on `Graph`. You should now be able to see the DAG.
- If it has not already been triggered automatically, trigger the DAG by clicking the button on the right that resembles 'Play'. Select `Trigger DAG` when prompted. Wait for the execution to finish.
- Navigate to Jupyter Notebook (http://localhost:8888/) and run the analytic queries for the relational database. Of note, it is also possible to run Neo4J queries from the notebook, but for our purposes (mainly better visuals), we run the Neo4J queries from the Neo4J browser interface (http://localhost:7474/).
- And this should be it. Airflow should trigger the data update next summer (around the 1st of August), so nothing should change before that.
Below is a high-level overview of the general directory structure. Some files (e.g., for caching) that are produced but will not be directly interacted with by the user are not presented below. Additionally, we present here the pipeline state where the data have been ingested and cleaned, i.e., brought to their 'final' form. When the project is run for the very first time, the directories `dags/tables/` and `dags/data_ready/` are empty.
- `research_pipe_container/`: the root directory
  - `docker-compose.yaml`: Docker container configuration file
  - `README.md`: the file you're reading now
  - `requirements.txt`: Python libraries/modules to be installed
  - `analytical_queries.ipynb`: example queries for Postgres, and the possibility to query Neo4J from Jupyter Notebook
  - `dags/`: main directory for scripts, etc.
    - `research_pipeline_dag.py`: the entire DAG configuration and scripts for Airflow
    - `augmentation/`
      - `article_journal.csv`: clean table with only journal articles
      - `cwts2021.csv`: the data for journals' impact (SNIPs)
      - `names_genders.csv`: most of the names matched with genders
    - `data_ready/`: directory with clean and augmented data
      - `article_augmented_raw.csv`: the augmented article table where data are not filtered based on the `type` column value
      - `article_category.csv`: each article linked to each category label
      - `article.csv`: clean augmented table with only `type` `journal-article`
      - `author.csv`: clean table with each unique author and their attributes (see Figure 2)
      - `authorship.csv`: each individual author-article relationship
      - `category.csv`: each unique article category with super- and subdomains
      - `journal.csv`: each unique journal with impact metrics (SNIPs)
    - `scripts/`: ETL scripts
      - `raw_to_tables.py`: a module written primarily to be run from the command line; includes data ingestion from the source (Kaggle), preliminary preprocessing, and preparation of the initial data tables (shown in Figure 2)
      - `augmentations.py`: augmentation scripts for articles (Crossref queries), journal metrics (from a static .csv prepared from an .xlsx file online), and authors (h-index computation using the binary search algorithm)
      - `final_tables.py`: checks whether the clean tables exist; if not, augmentations are applied, statistics are computed, and the tables are cleaned
      - `sql_queries.py`: dropping, creating, and inserting into Postgres tables
      - `neo4j_queries.py`: Neo4J connection class, data insertion (in batches) from pandas to Neo4J
    - `tables/`: directory with preliminary data tables after ingestion and preliminary preprocessing
      - `article_category.csv`: each article linked to each category label
      - `article.csv`: table with extracted journal articles (missing `type`, etc.)
      - `author.csv`: each unique author with their attributes (see Figure 2)
      - `authorship.csv`: each individual author-article relationship
      - `category.csv`: each unique article category with super- and subdomains
      - `journal.csv`: an empty journal table placeholder to be filled in during the augmentation process
  - `images/`: directory for images used in this notebook
  - `logs/`: Airflow logs
  - `neo4j/`: Neo4J-related files and folders
  - `plugins/`: Airflow plugins (not used in this project)
Although it is possible to work with the entire arXiv dataset available on Kaggle, we decided to limit our data to indexed journal articles with a valid DOI for which at least one of the category tags included `cs` (Computer Science). Hence, only eligible entities (i.e., authors, journals) were included in the databases and further analyses.
We queried the Crossref API with a given work's DOI. Works of type `journal-article` were updated with citation counts and journal ISSNs. We did not query more information about the scientific works, since there was no specific purpose for that, and avoiding additional queries helps to save on runtime.
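A minimal sketch of the kind of per-DOI lookup performed by `fetch_article_augments()`. The field names reflect the public Crossref REST API; the actual helper (which also batches requests and throttles them) may differ.

```python
# Sketch of a single Crossref lookup; the project's helper batches and throttles
# these requests to avoid being blocked.
import requests


def fetch_crossref_metadata(doi: str, timeout: int = 10) -> dict:
    """Return the work type, citation count, and first journal ISSN for one DOI."""
    resp = requests.get(f"https://api.crossref.org/works/{doi}", timeout=timeout)
    resp.raise_for_status()
    msg = resp.json()["message"]
    return {
        "type": msg.get("type"),                          # e.g. "journal-article"
        "n_cites": msg.get("is-referenced-by-count", 0),  # citation count
        "issn": (msg.get("ISSN") or [None])[0],           # first ISSN, if any
    }


# Example call (hypothetical DOI):
# fetch_crossref_metadata("10.1000/xyz123")
```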
In order to get the journal information, we need the list of journal ISSNs from the article table. Although the journal Impact Factor is a better-known metric, it is trademarked and, hence, not openly retrievable. The alternative is to use SNIP, the source-normalized impact per publication: the average number of citations per publication, corrected for differences in citation practice between research domains. Fortunately, the list of journals and their SNIPs is available from the CWTS website (https://www.journalindicators.com/).
For each author, we used a names-genders dataset to assign a supposed gender based on the first name. Although affiliation could also have been of interest, there are several problems due to which we decided not to seek and extract affiliation data. First, the article augmentation source (the Crossref database) was largely missing affiliation data for authors. Second, in some cases, making queries based on author names was not possible (e.g., only the author's first-name initial was present, which does not allow identifying the author properly, or would make doing so unjustifiably costly). Third, an author's affiliation may change over time (e.g., when changing institutions), and authors can also have multiple affiliations. Fourth, author affiliation in itself was not within the scope of the present project.
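A minimal sketch of this kind of first-name match (the toy data and column names below are assumptions; the real lookup file is `dags/augmentation/names_genders.csv`):

```python
# Sketch of the first-name-to-gender match; the real static file is
# dags/augmentation/names_genders.csv, and its column names may differ.
import pandas as pd

authors = pd.DataFrame({"author_id": ["BirkedalL"], "first_name": ["Lars"]})
names_genders = pd.DataFrame({"name": ["lars", "anna"], "gender": ["M", "F"]})

# Case-insensitive left merge: authors without a match keep gender == NaN.
authors["name"] = authors["first_name"].str.lower()
authors = authors.merge(names_genders, on="name", how="left").drop(columns=["name"])
print(authors)
```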
We had several analytic goals for which the pipeline was created. The aim of the Data Warehouse was to provide insights into the productivity (operationalized as the total number of publications) and influence (operationalized as the citation count) of the top researchers in the database. Additionally, we used a graph database that allows us both to present relations between records and to produce appealing visuals.
Below are (1) the analytic questions, (2) the SQL queries (run via Python), and (3) screenshots of the query results.
```sql
SELECT author_id, rank_total_pubs AS rank, total_pubs AS publications
FROM author
ORDER BY rank_total_pubs
LIMIT 0.01 * (SELECT COUNT(*) FROM author) / 100;
```
Figure 5. Top researchers with most publications.
5.1.2. Proportionally, in which journals have the top 0.01% of scientists (in terms of publication count) published their work the most?
```sql
SELECT final.author_id, final.rank, final.publications, final.journal_title AS top_journal,
       TO_CHAR((final.number * 100 / final.publications), 'fm99%') AS percentage_of_all_publications
FROM (
    SELECT a.author_id, rank, publications,
           mode() WITHIN GROUP (ORDER BY j.journal_title) AS journal_title,
           COUNT(j.journal_title) AS number
    FROM (
        SELECT author_id, rank_total_pubs AS rank, total_pubs AS publications
        FROM author
        ORDER BY rank_total_pubs
        LIMIT 0.01 * (SELECT COUNT(*) FROM author) / 100
    ) AS a
    INNER JOIN authorship au ON a.author_id = au.author_id
    INNER JOIN article ar ON au.article_id = ar.article_id
    INNER JOIN journal j ON ar.journal_issn = j.journal_issn
    GROUP BY a.author_id, rank, publications, j.journal_title
    HAVING j.journal_title = mode() WITHIN GROUP (ORDER BY j.journal_title)
) AS final
LEFT JOIN (
    SELECT a.author_id, rank, publications,
           mode() WITHIN GROUP (ORDER BY j.journal_title) AS journal_title,
           COUNT(j.journal_title) AS number
    FROM (
        SELECT author_id, rank_total_pubs AS rank, total_pubs AS publications
        FROM author
        ORDER BY rank_total_pubs
        LIMIT 0.01 * (SELECT COUNT(*) FROM author) / 100
    ) AS a
    INNER JOIN authorship au ON a.author_id = au.author_id
    INNER JOIN article ar ON au.article_id = ar.article_id
    INNER JOIN journal j ON ar.journal_issn = j.journal_issn
    GROUP BY a.author_id, rank, publications, j.journal_title
    HAVING j.journal_title = mode() WITHIN GROUP (ORDER BY j.journal_title)
) AS final1
    ON final.author_id = final1.author_id AND final.number < final1.number
WHERE final1.author_id IS NULL
ORDER BY final.rank
LIMIT 0.01 * (SELECT COUNT(*) FROM author) / 100;
```
Figure 6. Proportion of specific journals among all publications within the top authors.
```sql
SELECT final.author_id, final.rank, final.year AS most_influential_year,
       final.pub AS count_of_pub, final.avg_cites
FROM (
    SELECT a.author_id, rank, count(ar.year) AS pub, ar.year,
           (sum(ar.n_cites::DECIMAL)::int) / count(ar.year) AS avg_cites
    FROM (
        SELECT author_id, rank_total_pubs AS rank
        FROM author
        ORDER BY rank_total_pubs
        LIMIT 0.01 * (SELECT COUNT(*) FROM author) / 100
    ) AS a
    INNER JOIN authorship au ON a.author_id = au.author_id
    INNER JOIN article ar ON au.article_id = ar.article_id
    GROUP BY a.author_id, rank, ar.year
) AS final
LEFT JOIN (
    SELECT a.author_id, rank, count(ar.year) AS pub, ar.year,
           (sum(ar.n_cites::DECIMAL)::int) / count(ar.year) AS avg_cites
    FROM (
        SELECT author_id, rank_total_pubs AS rank
        FROM author
        ORDER BY rank_total_pubs
        LIMIT 0.01 * (SELECT COUNT(*) FROM author) / 100
    ) AS a
    INNER JOIN authorship au ON a.author_id = au.author_id
    INNER JOIN article ar ON au.article_id = ar.article_id
    GROUP BY a.author_id, rank, ar.year
) AS final1
    ON final.author_id = final1.author_id AND final.avg_cites < final1.avg_cites
WHERE final1.author_id IS NULL
ORDER BY final.rank
LIMIT 0.01 * (SELECT COUNT(*) FROM author) / 100;
```
Figure 7. The most productive year for the top scientists.
5.1.4. What was the most influential (in terms of N citations / N publications) year for the top 0.01% of scientists?
```sql
SELECT final.author_id, final.rank, final.hindex, final.pub, final.avg_cites, final.year
FROM (
    SELECT a.author_id, rank, sum(hindex::DECIMAL) AS hindex, sum(publications::DECIMAL) AS pub,
           sum(avg_cites::DECIMAL) AS avg_cites, ar.year
    FROM (
        SELECT author_id, rank_total_pubs AS rank, total_pubs AS publications, hindex, avg_cites
        FROM author
        ORDER BY rank_total_pubs
        LIMIT 0.01 * (SELECT COUNT(*) FROM author) / 100
    ) AS a
    INNER JOIN authorship au ON a.author_id = au.author_id
    INNER JOIN article ar ON au.article_id = ar.article_id
    GROUP BY a.author_id, rank, ar.year
) AS final
LEFT JOIN (
    SELECT a.author_id, rank, sum(hindex::DECIMAL) AS hindex, sum(publications::DECIMAL) AS pub,
           sum(avg_cites::DECIMAL) AS avg_cites, ar.year
    FROM (
        SELECT author_id, rank_total_pubs AS rank, total_pubs AS publications, hindex, avg_cites
        FROM author
        ORDER BY rank_total_pubs
        LIMIT 0.01 * (SELECT COUNT(*) FROM author) / 100
    ) AS a
    INNER JOIN authorship au ON a.author_id = au.author_id
    INNER JOIN article ar ON au.article_id = ar.article_id
    GROUP BY a.author_id, rank, ar.year
) AS final1
    ON final.author_id = final1.author_id AND final.hindex < final1.hindex
WHERE final1.author_id IS NULL
ORDER BY final.rank
LIMIT 0.01 * (SELECT COUNT(*) FROM author) / 100;
```
Figure 8. The most influential year for the top scientists.
Using the graph database, the aim of the queries was to provide information about a particular author's research activity, with compelling visuals. Specifically, we wanted to see with whom and on what a given author (identified, e.g., by name) has collaborated. Querying the graph database allows us to gain insights into the ego-network of a particular scientist. To that end, we can see the total network (with the scientist included) as well as gain a first impression of how modularized the network is (when exploring the network without the ego-node). In addition, we can query other entities and their relations, and explore further beyond the scope of the present project.
5.2.1. Display the collaboration network of Lars Birkedal, including the author himself.
```cypher
MATCH (author1:Author)-[r:COAUTHORS]-(author2:Author)
WHERE author1.id = "BirkedalL"
RETURN author1, author2, r
```
Figure 9. Ego-network of Lars Birkedal (with the author).
```cypher
MATCH (author1:Author)-[r:COAUTHORS]-(author2:Author)
WHERE author1.id = "BirkedalL"
RETURN author2, r
```
Figure 10. Ego-network of Lars Birkedal (without the author).
```cypher
MATCH p = (ar:Article)-[r:PUBLISHED_IN]->(j:Journal)
WHERE j.title = 'Artificial Intelligence'
RETURN p
```
Figure 11. All papers published in 'Artificial Intelligence'.
5.2.4. Display all articles that are from the data science domain ('DS') and are cited more than 100 times.
```cypher
MATCH q = (ar:Article)-[r:BELONGS_TO]->(c:Category)
WHERE c.subdom = 'DS' AND ar.n_cites > 100
RETURN q
```
Figure 12. Papers from the 'data science' domain with more than 100 citations.