# 1.1-load-gfe-db / Load Database

Development notebook for executing Cypher transactions through the Neo4j HTTP API.

Activities:
- Generate S3 pre-signed URL for each CSV
- Update the load script with the pre-signed URL
- Execute the load script via Run Command
- Validate that the load script has been executed or is running

In [24]:
# `display` and `HTML` are part of the public `IPython.display` module;
# importing `display` from `IPython.core.display` is deprecated.
from IPython.display import display, HTML
# Inject a style rule that stretches the notebook's `.container` element to the full page width.
display(HTML("<style>.container { width:100% !important; }</style>"))

In [25]:
from dotenv import load_dotenv, find_dotenv
load_dotenv(find_dotenv());

In [26]:
import os
import sys
sys.path.append("../") if "../" not in sys.path else None
import logging
import re
import time
import base64
import json
import requests
import boto3

In [27]:
logger = logging.getLogger()
logger.setLevel(logging.INFO)

In [28]:
# !source ../scripts/set_env.sh

## Generate Pre-signed URLs for CSV files

In [29]:
# Environment variables
root = os.environ["ROOT"]
scripts_dir = os.environ["BIN_DIR"]
src_dir = os.environ["SRC_DIR"]
data_dir = os.environ["DATA_DIR"]
logs_dir = os.environ["LOGS_DIR"]
cypher_dir = os.environ["CYPHER_PATH"]
load_script = os.environ["SCRIPT"]

s3_bucket = os.environ["GFE_BUCKET"]
release = os.environ["RELEASES"]

In [30]:
# NOTE(review): this hard-coded value replaces the `release` read from the
# RELEASES environment variable in the previous cell.
release = '3410'

In [31]:
# Location of the S3 copy log: "." followed by LOGS_DIR and the log file name.
path = f".{logs_dir}/s3CopyLog.txt"

def parseS3CopyLog(path):
    """Collect the S3 URLs recorded in an S3 copy log.

    Only lines containing the text "upload" are inspected. For each such
    line, the text matched by the S3 URL pattern is appended to the result.
    Lines that mention "upload" but contain no S3 URL are skipped with a
    warning instead of aborting the whole parse.

    Args:
        path: Path of the log file to read.

    Returns:
        list[str]: The matched S3 URLs, in the order they appear in the file.

    Raises:
        Exception: Any error raised while opening or reading the file is
            logged and re-raised unchanged.
    """

    logger.info(f"Parsing file: {path}")

    # Identify S3 urls in text: "s3://<bucket>/<key>.<extension>"
    pattern = re.compile(r's3:\/\/([^/]+)\/([\w\W]+)\.(.*)')
    s3_urls = []

    try:
        with open(path, "r") as file:
            # Iterate the file lazily rather than loading every line up front
            for line in file:
                if "upload" not in line:
                    continue

                match = pattern.search(line)
                if match is None:
                    # Calling .group(0) on a failed search would raise AttributeError
                    logger.warning(f"No S3 URL found in line: {line.rstrip()}")
                    continue

                s3_url = match.group(0)
                s3_urls.append(s3_url)
                logger.info(f"Found: {s3_url}")
    except Exception:
        logger.error("Could not parse file")
        raise

    return s3_urls

In [32]:
s3_urls = parseS3CopyLog(path)

In [33]:
s3_urls

['s3://gfe-db-4498/data/3440/csv/all_groups.3440.csv',
 's3://gfe-db-4498/data/3440/csv/all_cds.3440.csv',
 's3://gfe-db-4498/data/3440/csv/gfe_sequences.3440.csv',
 's3://gfe-db-4498/data/3440/csv/all_features.3440.csv',
 's3://gfe-db-4498/data/3440/csv/all_alignments.3440.csv']

In [12]:
# Get the service client.
s3 = boto3.client('s3')

def generate_presigned_urls(s3_urls, expire=3600):
    """Generate a pre-signed GET URL for each S3 object.

    Args:
        s3_urls: A single S3 location or an iterable of them. Each location
            is either an "s3://bucket/key" URL or a bare "bucket/key" path.
        expire: Passed as `ExpiresIn` to the S3 client (lifetime in seconds).

    Returns:
        dict: Maps every input location to its pre-signed URL.
    """

    logger.info("Generating pre-signed URLs...")

    # A lone string is treated as one location; any other iterable
    # (list, tuple, generator, ...) is consumed item by item.
    if isinstance(s3_urls, str):
        s3_urls = [s3_urls]

    scheme = "s3://"
    presigned_urls = {}

    for s3_url in s3_urls:

        # Drop the scheme only when it is a real prefix, then split once:
        # the first path segment is the bucket, everything after it is the key.
        location = s3_url[len(scheme):] if s3_url.startswith(scheme) else s3_url
        bucket, _, key = location.partition("/")

        # Generate the URL to get `key` from `bucket`
        presigned_urls[s3_url] = s3.generate_presigned_url(
            ClientMethod='get_object',
            Params={
                'Bucket': bucket,
                'Key': key
            },
            ExpiresIn=expire
        )

    return presigned_urls

In [13]:
presigned_urls = generate_presigned_urls(s3_urls)

## Update the load script with the pre-signed URL

In [14]:
# Path of the load script: "." followed by ROOT, CYPHER_PATH and SCRIPT joined with "/".
cypher_path = f".{root}/{cypher_dir}/{load_script}"

In [15]:
cypher_path

'../neo4j/cypher/load.cyp'

In [16]:
def update_cypher(cypher_path, url_map=None):
    """Return the load script with local CSV references swapped for pre-signed URLs.

    Every occurrence of "file:///{csv_prefix}.RELEASE.csv" in the script is
    replaced, where csv_prefix is the part of the S3 object's file name that
    precedes its first ".".

    Args:
        cypher_path: Path of the Cypher load script to read.
        url_map: Mapping of S3 URL -> pre-signed URL. When omitted, the
            notebook-level `presigned_urls` is used.

    Returns:
        str: The script text after substitution. The file on disk is not
        modified.
    """
    if url_map is None:
        # Fall back to the notebook-level mapping so existing calls keep working
        url_map = presigned_urls

    with open(cypher_path, "r") as file:
        cypher_script = file.read()

    for s3_url, presigned_url in url_map.items():
        # ".../all_groups.3440.csv" -> "all_groups"
        csv_prefix = s3_url.split("/")[-1].split(".")[0]
        cypher_script = cypher_script.replace(
            f'file:///{csv_prefix}.RELEASE.csv', presigned_url)

    return cypher_script

In [17]:
cypher_script = update_cypher(cypher_path)

## HTTP Request to load GFE DB

In [None]:
# Neo4j connection settings. Each value can be overridden through an
# environment variable; the previous hard-coded values remain as fallbacks so
# the notebook keeps running when the variables are not set.
protocol = os.environ.get("NEO4J_PROTOCOL", "http")
host = os.environ.get("NEO4J_HOST", "44.192.54.30")
username = os.environ.get("NEO4J_USERNAME", "neo4j")
# SECURITY: supply the real password through NEO4J_PASSWORD rather than
# committing it with the notebook; the fallback is kept only for compatibility.
password = os.environ.get("NEO4J_PASSWORD", "gfedb")
port = os.environ.get("NEO4J_PORT", "7474")
# Transaction endpoint that statements are POSTed to
endpoint = "db/neo4j/tx/commit"
url = f'{protocol}://{host}:{port}/{endpoint}'

### Test Neo4j Server

In [None]:
# Probe the server root using a dedicated variable: rebinding `url` here would
# overwrite the transaction endpoint that `run_cypher` posts to.
server_url = f"{protocol}://{host}:{port}"

# Send the request; the timeout keeps an unreachable host from hanging the cell
response = requests.get(
    server_url,
    headers={
        "Accept": "application/json"
    },
    timeout=10)

print(response.text)

In [19]:
# Split the script into individual statements on ";" and restore the
# terminator on each one. Fragments that contain only whitespace (for example
# whatever follows the final ";") are dropped so that no empty ";" statement
# is sent to the server.
cypher = [f"{fragment};" for fragment in cypher_script.split(";") if fragment.strip()]

In [20]:
def run_cypher(cypher, debug=False, timeout=(10, None)):
    """Execute one Cypher statement through the Neo4j HTTP endpoint.

    The statement is POSTed to the notebook-level `url` with an HTTP Basic
    `Authorization` header built from the notebook-level `username` and
    `password`.

    Args:
        cypher: The Cypher statement to execute.
        debug: Accepted for backward compatibility; currently has no effect.
        timeout: Passed to `requests.post` as a (connect, read) pair. The
            default gives up after 10 seconds if no connection can be
            established, but puts no limit on how long the server may take to
            respond.

    Returns:
        dict: The decoded JSON response body. It is logged at ERROR level when
        its "errors" list is non-empty and at INFO level otherwise.
    """

    payload = {
        "statements": [
            {
                "statement": cypher,
                "params": {}
            }
        ]
    }

    # HTTP Basic credentials: base64("username:password")
    credentials = base64.b64encode(f"{username}:{password}".encode()).decode()

    headers = {
        "Accept": "application/json;charset=UTF-8",
        "Content-Type": "application/json",
        "Authorization": f"Basic {credentials}"
    }

    # Send the request
    http_response = requests.post(
        url,
        data=json.dumps(payload),
        headers=headers,
        timeout=timeout)

    response = http_response.json()

    # A body without an "errors" entry is treated like one with an empty list
    if response.get('errors'):
        logger.error(response)
    else:
        logger.info(response)

    return response

In [21]:
cypher[:2]

["// RETURN '(:GFE)' AS `Creating GFE nodes...`;",
 "\nUSING PERIODIC COMMIT 50000\nLOAD CSV WITH HEADERS FROM 'https://gfe-db-4498.s3.amazonaws.com/data/3440/csv/gfe_sequences.3440.csv?AWSAccessKeyId=ASIAXXVOWRIZJYYHDE6E&Signature=UgVI0Lc9XYKWc8RM4U%2BX4wabbTw%3D&x-amz-security-token=IQoJb3JpZ2luX2VjEE0aCXVzLWVhc3QtMSJHMEUCIAvFpFXUCRSACkgUop5hIDeJypM8KwZt46acqnFIYXMUAiEAgsTtWcDRJe84hBOnrr%2B%2Bq%2F6DVRV92%2FXjWUk92PUVbZ4q9wIIZhACGgw1MzE4Njg1ODQ0OTgiDDZ9QC%2Blzc%2F2eeEY4irUAooeWPnPfyiXFFpaKFlpKDAyDt3b2KSgXOAL2NIQenwNSoOqedfCsCReho67eo24%2F31qOYzZ%2Bg6vCfkBaiBwQmg%2B9Unwk3u4gl8f6%2FZ1YTDquy0y%2Br6X4B7oLg%2BDiVk%2BJG1zkOGBIl2QxkLoZYiYpH0zEy7BXkpq6o1%2Bb568mkq00C6sm65Phc8AB7R3EHyfMxQEN%2F0hJoVTBEEPAMbsIU4%2BS0Xo1VhDreEHn76umlyF5BqEgfFeIGXEZQ%2BWilz4qUhU9vN0UVb4a8d0u92sryizg50nlJRadl3bxWNj50LgFCC0pJwOi4xoewn5jF%2FtS%2Bu7jFURdfX4KGVa5pFUXxE9YxXR18%2B9AtV8mLcKB3lH%2BLWh99zKiiC6addXoGvwTqHxKAIOhStfr9c9YLHFNur9AFHZFgKl7wr7yFD7Gk%2FZu2HuZ4YCPlJlcoSZ1KQCdr%2Bzejkwz7S2iAY6pgFNdCiioC3ah81wr3cdOQ4y

In [22]:
# Run the statements one at a time, timing each request and the whole batch.
# `limit` caps how many statements are executed; a falsy value disables the cap.
limit = 10
start = time.time()
for idx, statement in enumerate(cypher):
    print(f'Executing statement: {statement}')
    statement_start = time.time()
    response = run_cypher(statement, debug=True)
    statement_end = time.time()
    # Wall-clock seconds spent on this single request
    statement_elapsed_time = statement_end - statement_start
    logger.info(f'Statement: {statement}\nTime elapsed: {statement_elapsed_time}\nResponse: {response}')
    print(f'Time elapsed: {statement_elapsed_time}\nResponse: {response}\n\n')
    # Stop once `limit` statements have been executed
    if limit and idx + 1 == limit:
        break
        
# Total wall-clock time for every statement executed above
end = time.time()
time_elapsed = end - start
print(f"Time elapsed: {time_elapsed} seconds")

Executing statement: // RETURN '(:GFE)' AS `Creating GFE nodes...`;


ConnectionError: HTTPConnectionPool(host='18.206.14.94', port=7474): Max retries exceeded with url: /db/neo4j/tx/commit (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7fb080faca60>: Failed to establish a new connection: [Errno 60] Operation timed out'))

In [26]:
response

{'results': [{'columns': [], 'data': []}], 'errors': []}