In [1]:
from neo4j import GraphDatabase, basic_auth
import neo4j
import pandas as pd
import numpy as np
import time
import os
import aiohttp
from dotenv import load_dotenv
from pathlib import Path

In [2]:
#load the environment variables
dotenv_path = Path('~/.env')
load_dotenv(dotenv_path=dotenv_path)  # This line brings all environment variables from .env into os.environ

# Get variables
SUSTAINGRAPH_URI = os.getenv('SUSTAINGRAPH_URI')
SUSTAINGRAPH_USER = os.getenv('SUSTAINGRAPH_USER')
SUSTAINGRAPH_PASSWORD = os.getenv('SUSTAINGRAPH_PASSWORD')
database_name = os.getenv('DATABASE_NAME')

# Connect to database
driver = GraphDatabase.driver(SUSTAINGRAPH_URI, auth=(SUSTAINGRAPH_USER, SUSTAINGRAPH_PASSWORD))

# Verify connectivity
with driver.session(database=database_name) as session:
    print(session.run("RETURN 'Connected to ' + $db", db=database_name).single()[0])

Connected to neo4j


### Constraints

In [3]:
def create_constraint(tx,statement):
    tx.run(statement)

constraints = [
    """CREATE CONSTRAINT ndc_unique IF NOT EXISTS FOR (n:NDC) REQUIRE (n.text,n.typeOfInformation,n.documentType) IS NODE KEY""",
    """CREATE CONSTRAINT ndc_text_type IF NOT EXISTS FOR (n:NDC) REQUIRE n.text IS :: STRING""",
    """CREATE CONSTRAINT ndc_typeOfInformation_type IF NOT EXISTS FOR (n:NDC) REQUIRE n.typeOfInformation IS :: STRING""",
    """CREATE CONSTRAINT ndc_documentType_type IF NOT EXISTS FOR (n:NDC) REQUIRE n.documentType IS :: STRING""",
    """CREATE CONSTRAINT ndc_sector_type IF NOT EXISTS FOR (n:NDC) REQUIRE n.sector IS :: LIST<STRING NOT NULL>""",
    """CREATE CONSTRAINT ndc_status_type IF NOT EXISTS FOR (n:NDC) REQUIRE n.status IS :: LIST<STRING NOT NULL>""",  
]

with driver.session(database=database_name) as session:
    for statement_constraint in constraints:
        session.execute_write(create_constraint, statement_constraint)

### Write batch function

In [4]:
def write_batch(tx,statement, params_list):
    tx.run(statement, parameters={"parameters": params_list})

### Import NDC-SDG linkages

NDC stands for a Nationally Determined Contribution. For the Paris Agreement goals to be achieved, every country needs to play its part. Because countries have different circumstances, resources and abilities, the agreement was designed so each country defines their own pledges, in terms of targets and contributions to the universal agreement. These country pledges are the NDCs. Each country produced an [NDC document](https://www4.unfccc.int/sites/NDCStaging/Pages/All.aspx) outlining these contributions and how they will be achieved .

### Collect data from API

The [Climate Watch](https://www.climatewatchdata.org/data-explorer/ndc-sdg-linkages?ndc-sdg-linkages-countries=GRC&ndc-sdg-linkages-goals=clean-water-and-sanitation&ndc-sdg-linkages-sectors=All%20Selected&ndc-sdg-linkages-targets=All%20Selected&page=1#meta) offers an API, in order to use NDC data in our own applications. The data retrieved from then Climate-Watch API contain information about the linkages between them and the SDG targets.

In [5]:
# get the start time
st = time.time()

async with aiohttp.ClientSession() as session:
    runs_imported = 1
    page = 1
    ndcs = [] #final list with all the ndcs 
    while runs_imported > 0:
        url = 'https://www.climatewatchdata.org/api/v1/data/ndc_sdg?page='+str(page)
        async with session.get(url) as resp:
            response = await resp.json()
            runs_imported = len(response['data'])
            ndcs += response['data'] #list of ndcs 
            page += 1

# get the end time
et = time.time()

# get the execution time
elapsed_time = et - st
print('Execution time:', elapsed_time/60, 'minutes')

Execution time: 4.589588038126627 minutes


### Import data to neo4j

We collect with a cypher query the available GeoArea iso alpha-3 codes in the knowledge graph to import only the NDCs 
that refer to them. 

In [6]:
records, summary, keys = driver.execute_query("""\
        MATCH (ga:Area)
        UNWIND ga.ISOalpha3code as codes
        RETURN COLLECT(codes) as geocodes
        """,routing_="r",database_=database_name)
available_iso3codes = list(map(str,records[0]['geocodes']))

We import the NDCs to the graph and create the linkages with the SDG targets, in batches of 10000 for better memory performance

In [7]:
# Create Observation nodes in the neo4j and commit result in batches.
statement_ndc = """ 
    UNWIND $parameters as row
    WITH row 
    MATCH (ga:GeoArea),(t:Target{code:row.target_code})
    WHERE row.geo = ga.ISOalpha3code
    MERGE (n:NDC{text:row.ndc_text,typeOfInformation:row.info,documentType:row.doc_type})
    FOREACH(_ in CASE WHEN NOT n.sector IS NOT NULL THEN [1] END | SET n.sector = [row.sector])
    FOREACH(x in CASE WHEN NOT (row.sector IN n.sector) THEN [1] END |
    SET n.sector = COALESCE(n.sector, []) + row.sector)
    FOREACH(_ in CASE WHEN NOT n.status IS NOT NULL THEN [1] END | SET n.status = [row.status])
    FOREACH(x in CASE WHEN NOT (row.status IN n.status) THEN [1] END |
    SET n.status = COALESCE(n.status, []) + row.status)
    WITH n,ga,t,row
    MERGE (n)- [:REFERS_TO_AREA] ->(ga)
    MERGE (n)-[:ASSOCIATED_WITH{climateResponse:row.clim_resp}] -> (t)
"""

# Begin a new auto-commit GraphTransaction.
params,all_params = [],[]
index = 0 
batch_i=1
batch_size=10000

# get the start time
st1 = time.time()

with driver.session(database=database_name) as session:
    for ndc in ndcs:
        if(ndc['iso_code3'] in available_iso3codes):
            params_dict = {
                'geo': ndc['iso_code3'],
                'ndc_text': ndc['indc_text'].rstrip('.,;'),
                'target_code': ndc['sdg_target'].split('. ')[0],
                'info': "Not available" if ndc['type_of_information'] is None else str(ndc['type_of_information'].title()),
                'clim_resp': "Not available" if ndc['climate_response'] is None else str(ndc['climate_response'].title()),
                'status':"Not available" if ndc['status'] is None else str(ndc['status'].title()),
                'sector': "Not available" if ndc['sector'] is None else str(ndc['sector'].title()),
                'doc_type': "Not available" if ndc['document_type'] is None else str(ndc['document_type'].title())
            }
            params.append(params_dict)
            index +=1
        if index % batch_size == 0 and index > 0:
            st = time.time()
            session.execute_write(write_batch, params_list = params,statement = statement_ndc)
            et = time.time()
            # get the execution time
            elapsed_time = et - st            
            print('Batch {} with {} observations : Done! ({} minutes)'.format(batch_i,len(params),elapsed_time/60))
            batch_i +=1
            all_params = all_params + params
            params = []
    all_params = all_params + params
    if params:
            st = time.time()  # Record start time for the last batch
            session.execute_write(write_batch, params_list = params,statement = statement_ndc)
            et = time.time()
            elapsed_time = et - st
            print('{} observations: Done! ({} minutes)'.format(len(params), elapsed_time/60))
# get the end time
et1 = time.time()

# get the execution time
elapsed_time_total = et1 - st1
print('Execution time:', elapsed_time/60, 'minutes')

Batch 1 with 10000 observations : Done! (0.050255354245503744 minutes)
9660 observations: Done! (0.04829176266988119 minutes)
Execution time: 0.04829176266988119 minutes


> Check cypher query

In [9]:
df = pd.DataFrame(all_params)

# Count number of distinct ASSOCIATED rels
df2 = df.groupby(['ndc_text','doc_type','clim_resp','target_code']).size().reset_index()

records, summary, keys = driver.execute_query("""\
       match (n:NDC)-[r:ASSOCIATED_WITH]->(T:Target) RETURN COUNT(DISTINCT r) as rel
        """,routing_="r",database_=database_name)
print("{rels} ASSOCIATED_WITH (expected: {expected}) in {time} ms.".format(
    rels=records[0]['rel'],
    time=summary.result_available_after,
    expected = len(df2)
))

# Count number of distinct NDC nodes
df1 = df.groupby(['ndc_text','doc_type','info']).size().reset_index()

records, summary, keys = driver.execute_query("""\
       match (n:NDC) RETURN COUNT(DISTINCT n) AS NDC
        """,routing_="r",database_=database_name)
print("{ndc} NDCs (expected: {expected}) in {time} ms.".format(
    ndc=records[0]['NDC'],
    time=summary.result_available_after,
    expected = len(df1)
))

14716 ASSOCIATED_WITH (expected: 14716) in 1 ms.
10137 NDCs (expected: 10137) in 1 ms.
