# Data Engineering Project 
## ETL

**Authors**: 
- Dmitri Rozgonjuk
- Eerik Sven Puudist
- Lisanne Siniväli
- Cheng-Han Chung


The aim of this notebook is to clean the main raw data frame and write new, clean data frames for further use. It also demonstrates comparisons of different read and write methods.

First, we install and import all necessary libraries in a single cell, rather than scattering imports across the cells below. The packages to be installed, and their versions, will later be added to the `requirements.txt` file.

We also use this section to set global environment parameters.

In [None]:
!conda install psycopg2 -y
!pip install -r requirements.txt

In [30]:
## NB!! run the installs from terminal
########### Library Installations ##############

################### Imports ####################
### Data wrangling
import pandas as pd # working with dataframes
import numpy as np # vector operations


### Specific-purpose libraries
# NB! Most of these require configuration with an API key
#from pybliometrics.scopus import AbstractRetrieval
from habanero import Crossref # CrossRef API
from genderize import Genderize # Gender API

### Misc
from math import floor
import time
import requests
import warnings # suppress warnings
import os # accessing directories
from tqdm import tqdm # track loop runtime
from unidecode import unidecode # transliterate international (non-ASCII) names to ASCII

### Custom Scripts (ETL, augmentations, SQL)
from scripts.raw_to_tables import *
from scripts.augmentations import *
from scripts.final_tables import *
from scripts.sql_queries import *

### Database drivers
import psycopg2

########## SETTING ENV PARAMETERS ################
warnings.filterwarnings('ignore') # suppress warnings

## Pipeline start

In [31]:
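# Record the pipeline start time (used for the total runtime at the end of the notebook)
start_pipe = time.time()

# First check if the tables are already in the system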
## If tables exist, import from .csv

if os.path.exists('./tables') and len(os.listdir('./tables')) == 8: # directory + 7 tables
    print('Tables exist...')
    author = pd.read_csv('./tables/author.csv')
    authorship = pd.read_csv('./tables/authorship.csv')
    article = pd.read_csv('./tables/article.csv')
    article_category = pd.read_csv('./tables/article_category.csv')
    category = pd.read_csv('./tables/category.csv')
    journal = pd.read_csv('./tables/journal.csv')
    print('Tables are in the working directory!')
    
## If tables do not exist, pull from Kaggle (or local machine) and preprocess into tables
else: 
    print('Preparing tables...')
    print()
    ingest_and_prepare()
    print('Tables are in the working directory!')

Tables exist...
Tables are in the working directory!


# 2. Loading Clean Data or Data Augmentation

In [32]:
article = article_ready()
journal = journal_ready()

# Keep only articles whose journal was found in the journal table
article = article[article['journal_issn'].isin(journal['journal_issn'])].reset_index(drop = True)
# Update 'article.csv' in 'data_ready' directory
article.to_csv('./data_ready/article.csv', index = False)

authorship = authorship_ready(article)
author = author_ready(article, authorship)
article_category = article_category_ready(article)
category = category_ready(article_category)

Clean table 'article' exists and loaded to pwd
Clean table 'journal' exists and loaded to pwd.
Clean table 'authorship' exists and loaded to pwd.
Clean table 'author' exists and loaded to pwd.
Clean table 'article_category' exists and loaded to pwd.
Clean table 'category' exists and loaded to pwd.


### Author update and augments
In order to query the gender of a given author, we first extract all valid (length > 3) first names. We acknowledge that some first names are shorter than four characters, but given that the number of API queries is limited, we use this stricter filter as a more robust way of obtaining as many genuine first names (rather than, e.g., initials) as possible.
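A minimal sketch of the first-name filter described above, assuming the `author` table has a `first_name` column (as used in the Neo4j import further below). The helper name `extract_valid_first_names` is hypothetical; the project's own implementation presumably lives in `scripts/augmentations.py`, which is not shown. The toy frame is named `toy_author` so that running the cell does not overwrite the real `author` table.

```python
import pandas as pd

def extract_valid_first_names(author: pd.DataFrame) -> list:
    """Return the sorted unique first names longer than 3 characters (hypothetical helper)."""
    names = author['first_name'].dropna().astype(str).str.strip()
    # Drop initials and very short names ("J.", "Eva") so that the limited API queries
    # are spent on names that are more likely to be genuine
    valid = names[names.str.len() > 3]
    return sorted(valid.unique().tolist())

# Toy example (not project data)
toy_author = pd.DataFrame({'first_name': ['Dmitri', 'Eva', 'J.', None, 'Lisanne', 'Dmitri']})
print(extract_valid_first_names(toy_author))  # ['Dmitri', 'Lisanne']
```

The resulting list could then be sent to the Genderize API, e.g. with the `genderize` package imported above (`Genderize().get(names)`), keeping the query budget in mind.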

### Journal
In order to get the journal information, we need the list of journal ISSNs from the `article` table. Although the journal Impact Factor is the more common metric, it is trademarked and, hence, cannot be retrieved from open sources. The alternative is to use SNIP (Source Normalized Impact per Paper): the average number of citations per publication, corrected for differences in citation practice between research domains. Fortunately, the list of journals and their SNIP values is available from the CWTS website (https://www.journalindicators.com/).
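The ISSN lookup described above can be sketched as an inner join between the unique ISSNs in `article` and a SNIP table. This assumes the CWTS download has already been read into pandas and its columns renamed to `journal_issn`, `journal_title` and `snip_latest` (the names used by the `journal` table later in the notebook); the function name and the toy values are hypothetical.

```python
import pandas as pd

def build_journal_table(article: pd.DataFrame, snip: pd.DataFrame) -> pd.DataFrame:
    """Keep one SNIP row per ISSN that occurs in `article` (hypothetical helper)."""
    issns = pd.DataFrame({'journal_issn': article['journal_issn'].dropna().unique()})
    journal = (snip.drop_duplicates(subset='journal_issn')
                   .merge(issns, on='journal_issn', how='inner'))  # drop journals not in article
    return journal.reset_index(drop=True)

# Toy inputs; 'toy_snip' stands in for the (renamed) CWTS journal list
toy_article = pd.DataFrame({'article_id': ['a1', 'a2', 'a3'],
                            'journal_issn': ['1111-1111', '1111-1111', '2222-2222']})
toy_snip = pd.DataFrame({'journal_issn': ['1111-1111', '3333-3333'],
                         'journal_title': ['Journal A', 'Journal C'],
                         'snip_latest': [1.2, 0.8]})
print(build_journal_table(toy_article, toy_snip))
```

Articles whose ISSN has no SNIP entry (here `2222-2222`) are then removed from `article`, as done in the cell that filters on `journal['journal_issn']`.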

# 3. From Pandas to PostgreSQL

In [33]:
# Load the clean tables from .csv into pandas
article = pd.read_csv('data_ready/article.csv')
author = pd.read_csv('data_ready/author.csv')
authorship = pd.read_csv('data_ready/authorship.csv')
category = pd.read_csv('data_ready/category.csv')
article_category = pd.read_csv('data_ready/article_category.csv')
journal = pd.read_csv('data_ready/journal.csv')

tables = [article, author, authorship, category, article_category, journal]

# Name of tables (for later print)
article.name = 'article'
author.name = 'author'
authorship.name = 'authorship'
category.name = 'category'
article_category.name = 'article_category'
journal.name = 'journal'

In [34]:
# Insert into tables (helper function)
def insert_to_tables(table, query):
    ''' Helper function for inserting values into PostgreSQL tables
    Args:
        table (pd.DataFrame): pandas table (with a `.name` attribute set for printing)
        query (str): corresponding SQL INSERT query for 'table', with one %s placeholder per column
    Note: relies on the global cursor `cur` created when connecting to the database.
    '''
    
    print(f'Inserting table -- {table.name} -- ...')
    
    try:
        for _, row in table.iterrows():
            cur.execute(query, list(row))
        print(f'Table -- {table.name} -- successfully inserted!')
    except Exception as e:
        print(f'Error with table -- {table.name} --: {e}')
    print()
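`insert_to_tables` sends one `execute` call per row. According to the psycopg2 documentation, `cursor.executemany` is not faster than calling `execute` in a loop, whereas the helpers in `psycopg2.extras` (such as `execute_batch`) group many rows per round trip. The sketch below is one possible write method to compare against; the helper names are hypothetical. It first converts rows to plain Python values and maps NaN to `None`, so that missing values become SQL `NULL` rather than being passed through as a float NaN.

```python
import numpy as np
import pandas as pd

def to_native_records(df: pd.DataFrame) -> list:
    """Convert DataFrame rows to tuples of plain Python values; NaN/None -> None (SQL NULL)."""
    records = []
    for row in df.itertuples(index=False, name=None):
        records.append(tuple(
            None if pd.isna(v) else (v.item() if isinstance(v, np.generic) else v)
            for v in row))
    return records

def insert_batch(cur, query, df, page_size=1000):
    """Insert all rows of `df` in pages; `query` uses one %s placeholder per column (hypothetical helper)."""
    # Imported lazily so that to_native_records can be used without psycopg2 installed
    from psycopg2.extras import execute_batch
    execute_batch(cur, query, to_native_records(df), page_size=page_size)

# Toy frame (not project data)
toy_df = pd.DataFrame({'article_id': ['1001.0001', '1001.0041'],
                       'n_cites': [3, np.nan], 'year': [2010, 2010]})
print(to_native_records(toy_df))  # [('1001.0001', 3.0, 2010), ('1001.0041', None, 2010)]
```

In the notebook, `insert_batch(cur, insert_tables[i], tables[i])` would be a drop-in candidate for `insert_to_tables(tables[i], insert_tables[i])`, assuming the existing queries use positional `%s` placeholders.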

In [35]:
# Connect to the database
conn = psycopg2.connect(host="postgres", user="postgres", password="password", database="postgres")
conn.set_session(autocommit=True)
cur = conn.cursor()

# Create the research_db database with UTF-8 encoding
cur.execute("DROP DATABASE IF EXISTS research_db")
cur.execute("CREATE DATABASE research_db WITH ENCODING 'utf8' TEMPLATE template0")

In [36]:
# Drop Tables 
for query in drop_tables:
    cur.execute(query)
    conn.commit()
    
# Create Tables
for query in create_tables:
    cur.execute(query)
    conn.commit()

In [37]:
# Insert into tables
for table, query in tqdm(zip(tables, insert_tables), total=len(tables)):
    insert_to_tables(table, query)

  0%|          | 0/6 [00:00<?, ?it/s]

Inserting table -- article -- ...


 17%|█▋        | 1/6 [00:11<00:57, 11.55s/it]

Table -- article -- successfully inserted!

Inserting table -- author -- ...


 33%|███▎      | 2/6 [00:27<00:56, 14.03s/it]

Table -- author -- successfully inserted!

Inserting table -- authorship -- ...


 50%|█████     | 3/6 [00:52<00:57, 19.31s/it]

Table -- authorship -- successfully inserted!

Inserting table -- category -- ...
Table -- category -- successfully inserted!

Inserting table -- article_category -- ...


 83%|████████▎ | 5/6 [01:09<00:13, 13.03s/it]

Table -- article_category -- successfully inserted!

Inserting table -- journal -- ...


100%|██████████| 6/6 [01:09<00:00, 11.64s/it]

Table -- journal -- successfully inserted!
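The insert loop pairs `tables` and `insert_tables` purely by position, and `cur.execute(query, list(row))` only works if each query has exactly one `%s` placeholder per DataFrame column. A small sanity check along these lines (hypothetical helper, assuming the queries in `scripts/sql_queries.py`, which is not shown, use `%s` placeholders) can catch ordering mistakes before inserting:

```python
import pandas as pd

def check_insert_alignment(tables, queries):
    """Compare each DataFrame's column count with the %s placeholders of its query.

    Returns a list of (position, n_columns, n_placeholders) for every mismatch.
    """
    if len(tables) != len(queries):
        raise ValueError(f'{len(tables)} tables but {len(queries)} insert queries')
    mismatches = []
    for position, (table, query) in enumerate(zip(tables, queries)):
        n_placeholders = query.count('%s')
        if n_placeholders != table.shape[1]:
            mismatches.append((position, table.shape[1], n_placeholders))
    return mismatches

# In the notebook: check_insert_alignment(tables, insert_tables) should return []
```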






# Database Connection

In [None]:
# !conda install psycopg2
# !pip install ipython-sql
# !pip install sqlalchemy

In [6]:
%load_ext sql
%sql postgresql://postgres:password@postgres/postgres

## Enable the `%sql` magic function (`ipython-sql` extension)

# Test Queries

In [7]:
%sql SELECT * FROM authorship LIMIT 10;

 * postgresql://postgres:***@postgres/postgres
10 rows affected.


article_id,author_id
1001.0001,KrotovD
1001.0001,HedenO
1001.0041,IndykP
1001.0041,SzarekS
1001.0361,GargouriY
1001.0361,HajjemC
1001.0361,LariviereV
1001.0361,GingrasY
1001.0361,CarrL
1001.0361,BrodyT
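Because `authorship` is a bridge table between articles and authors, co-author pairs (the raw material for the co-authorship analyses referenced in Section 4) can be derived with a self-join. The sketch below uses only standard SQL, so the same query text should also run via `%sql` or `cur.execute` against the PostgreSQL tables; here `sqlite3` from the Python standard library stands in for PostgreSQL so that the example runs without a server, using the first four rows of the output above. The query and helper names are hypothetical.

```python
import sqlite3

# Each unordered co-author pair is counted once by requiring a1.author_id < a2.author_id
COAUTHOR_PAIRS_SQL = """
SELECT a1.author_id AS author,
       a2.author_id AS coauthor,
       COUNT(*)     AS n_joint_articles
FROM authorship AS a1
JOIN authorship AS a2
  ON a1.article_id = a2.article_id
 AND a1.author_id < a2.author_id
GROUP BY a1.author_id, a2.author_id
ORDER BY n_joint_articles DESC, author, coauthor;
"""

def coauthor_pairs(connection):
    """Run the co-author query on any DB-API connection (sqlite3, psycopg2, ...)."""
    cursor = connection.cursor()
    cursor.execute(COAUTHOR_PAIRS_SQL)
    return cursor.fetchall()

# In-memory SQLite demo with the first four authorship rows shown above
demo = sqlite3.connect(':memory:')
demo.execute('CREATE TABLE authorship (article_id TEXT, author_id TEXT)')
demo.executemany('INSERT INTO authorship VALUES (?, ?)',
                 [('1001.0001', 'KrotovD'), ('1001.0001', 'HedenO'),
                  ('1001.0041', 'IndykP'), ('1001.0041', 'SzarekS')])
print(coauthor_pairs(demo))  # [('HedenO', 'KrotovD', 1), ('IndykP', 'SzarekS', 1)]
```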


In [42]:
%sql SELECT * FROM article_category LIMIT 10;

 * postgresql://postgres:***@postgres/postgres
10 rows affected.


article_id,category_id
1001.0001,cs.IT
1001.0001,math.IT
1001.0041,math.MG
1001.0041,cs.CC
1001.0041,math.FA
1001.0361,cs.CY
1001.0361,cs.DL
1001.0639,cs.DS
1001.0641,cs.LO
1001.0641,cs.GT


In [None]:
%sql SELECT * FROM article LIMIT 10;

In [None]:
%sql SELECT * FROM category LIMIT 10;

In [None]:
%sql SELECT * FROM journal LIMIT 10;

# 4. Preparing Graph DB Data
In essence, we need to (a) rename the attributes to comply with Neo4j notation, and (b) save the tables created above as CSV files: https://medium.com/@st3llasia/analyzing-arxiv-data-using-neo4j-part-1-ccce072a2027

- Network analysis of these data in Neo4j: https://medium.com/swlh/network-analysis-of-arxiv-dataset-to-create-a-search-and-recommendation-engine-of-articles-cd18b36a185e

- Link prediction: https://towardsdatascience.com/link-prediction-with-neo4j-part-2-predicting-co-authors-using-scikit-learn-78b42356b44c

The Graph Database Schema is pictured below:
<img src="images/graph_db_schema.png"/>

Tutorial: https://www.youtube.com/watch?v=PfySvVqHAWo&t=33s
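Steps (a) and (b) can be sketched for authors, articles and authorship, assuming the CSVs are intended for Neo4j's bulk importer (`neo4j-admin import`), whose header conventions are `:ID(<group>)`, `:START_ID(<group>)`, `:END_ID(<group>)`, `:LABEL` and `:TYPE`. The function name, the output directory and the `AUTHORED` relationship type are assumptions and should be aligned with the schema image above; the exact import command also depends on the Neo4j version. The cells below take a different route and load the data through the Bolt driver instead.

```python
import os
import pandas as pd

def to_neo4j_csv(author, article, authorship, out_dir='neo4j_import'):
    """Write Author/Article node files and an AUTHORED relationship file (hypothetical helper)."""
    os.makedirs(out_dir, exist_ok=True)

    # (a) Rename key columns to the importer's ID notation and add node labels
    authors = author.rename(columns={'author_id': 'author_id:ID(Author)'})
    authors[':LABEL'] = 'Author'
    articles = article.rename(columns={'article_id': 'article_id:ID(Article)'})
    articles[':LABEL'] = 'Article'

    # Relationship file: START/END columns reference the ID groups defined above
    authored = authorship[['author_id', 'article_id']].rename(
        columns={'author_id': ':START_ID(Author)', 'article_id': ':END_ID(Article)'})
    authored[':TYPE'] = 'AUTHORED'  # assumed relationship type name

    # (b) Save everything as CSV files
    paths = {}
    for name, df in [('authors', authors), ('articles', articles), ('authored', authored)]:
        paths[name] = os.path.join(out_dir, f'{name}.csv')
        df.to_csv(paths[name], index=False)
    return paths
```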

In [43]:
from neo4j import GraphDatabase

In [44]:
# https://github.com/cj2001/bite_sized_data_science/blob/main/notebooks/part3.ipynb

class Neo4jConnection:
    
    def __init__(self, uri, user, pwd):
        
        self.__uri = uri
        self.__user = user
        self.__pwd = pwd
        self.__driver = None
        
        try:
            self.__driver = GraphDatabase.driver(self.__uri, auth=(self.__user, self.__pwd))
        except Exception as e:
            print("Failed to create the driver:", e)
        
    def close(self):
        
        if self.__driver is not None:
            self.__driver.close()
        
    def query(self, query, parameters=None, db=None):
        
        assert self.__driver is not None, "Driver not initialized!"
        session = None
        response = None
        
        try: 
            session = self.__driver.session(database=db) if db is not None else self.__driver.session() 
            response = list(session.run(query, parameters))
        except Exception as e:
            print("Query failed:", e)
        finally: 
            if session is not None:
                session.close()
        return response

In [45]:
conn = Neo4jConnection(uri='bolt://neo:7687', user='', pwd='')

In [48]:
result = conn.query('MATCH (n) RETURN COUNT(n) AS ct')
print(result[0]['ct'])

0


In [47]:
# Delete all nodes (DETACH also removes their relationships; plain DELETE fails on connected nodes)
conn.query('MATCH (a) DETACH DELETE a')

[]

In [49]:
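def insert_data(query, rows, batch_size = 10000):
    ''' Insert a pandas DataFrame into Neo4j in batches of `batch_size` rows.
    Args:
        query (str): Cypher query that UNWINDs $rows and returns a `total` count
        rows (pd.DataFrame): data to insert
        batch_size (int): number of rows sent per query
    Uses the global `conn` (Neo4jConnection) created above.
    '''
    
    total = 0
    batch = 0
    start = time.time()
    result = None
    
    while batch * batch_size < len(rows):
        batch_rows = rows.iloc[batch*batch_size:(batch+1)*batch_size].to_dict('records')
        res = conn.query(query, parameters = {'rows': batch_rows})
        if res:  # conn.query returns None when the query fails
            total += res[0]['total']
        batch += 1
        result = {"total": total, 
                  "batches": batch, 
                  "time": time.time() - start}
        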
    return result
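The `while` loop in `insert_data` walks over the rows in consecutive slices of `batch_size`, so the number of round trips is ceil(n / batch_size); for example, the 2,219 journals fit into a single batch, matching the `'batches': 1` output of `add_journal`. The same slicing can be written as a small generator (hypothetical helper, shown only to make the arithmetic explicit); each `(start, stop)` pair corresponds to `rows.iloc[start:stop]`.

```python
from math import ceil

def batch_bounds(n_rows: int, batch_size: int = 10000):
    """Yield (start, stop) row positions, mirroring the slicing in insert_data."""
    for start in range(0, n_rows, batch_size):
        yield start, min(start + batch_size, n_rows)

print(list(batch_bounds(25000)))  # [(0, 10000), (10000, 20000), (20000, 25000)]
print(len(list(batch_bounds(2219))) == ceil(2219 / 10000))  # True
```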

In [50]:
def add_category(rows):
    query = """UNWIND $rows AS row
               MERGE (:Category {id: row.category_id, superdom: row.superdom, subdom: row.subdom})
               RETURN COUNT(*) AS total
            """
    return insert_data(query, rows)

add_category(category)

{'total': 136, 'batches': 1, 'time': 0.28398895263671875}

In [51]:
def add_journal(rows):
    
    query = """UNWIND $rows AS row
               MERGE (:Journal {id: row.journal_issn, title: row.journal_title, snip: row.snip_latest})
               RETURN COUNT(*) AS total
            """
    return insert_data(query, rows)

add_journal(journal)

{'total': 2219, 'batches': 1, 'time': 8.947782278060913}

In [None]:
def add_article(rows):
    
    query = """UNWIND $rows AS row
               MERGE (:Article {id: row.article_id, title: row.title, n_authors: row.n_authors,
               n_cites: row.n_cites, year: row.year})
               RETURN COUNT(*) AS total
            """
    return insert_data(query, rows)

add_article(article)

In [None]:
def add_author(rows):
    
    query = """UNWIND $rows AS row
               MERGE (:Author {id: row.author_id, last_name: row.last_name, first_name: row.first_name,
            middle_name: row.middle_name, gender: row.gender, total_pubs: row.total_pubs,
            total_cites: row.total_cites, avg_cites: row.avg_cites, med_coauthors: row.med_coauthors,
            n_unique_coauthors: row.n_unique_coauthors, hindex:row.hindex,
            rank_total_pubs: row.rank_total_pubs, rank_total_cites: row.rank_total_cites,
            rank_avg_cites: row.rank_avg_cites,rank_hindex: row.rank_hindex})
               RETURN COUNT(*) AS total
            """
    return insert_data(query, rows)

add_author(author)

In [None]:
def add_authorship(rows):
    # Connect existing Author and Article nodes (run add_author and add_article first).
    # NB: the relationship type AUTHORED is an assumed name; align it with images/graph_db_schema.png
    query = """UNWIND $rows AS row
               MATCH (au:Author {id: row.author_id})
               MATCH (ar:Article {id: row.article_id})
               MERGE (au)-[:AUTHORED]->(ar)
               RETURN COUNT(*) AS total
            """
    return insert_data(query, rows)

add_authorship(authorship)

In [None]:
result = conn.query('MATCH (n) RETURN COUNT(n) AS ct')
print(result[0]['ct'])

In [None]:
from py2neo import Graph
#!pip install ipython-cypher
try:
    graph = Graph("bolt://neo:7687")
    print('Neo4j connection established!')
except Exception as e:
    print("Error connecting to Neo4j DB:", e)
    
print('DB schema:')
graph.run('call db.schema()')

# 5. Example Queries

## 5.1. Data Warehouse

## 5.2. Graph Database

## Total Pipeline Runtime

In [None]:
end_pipe = time.time()

print(f'Time of pipeline end: {time.ctime(end_pipe)}')
print(f'Total pipeline runtime: {(end_pipe - start_pipe)/60:.2f} min.')