# Batched CSV File loader 
## Example of loading Northwind into Neo4j using batched transactions

### mark.quinsland@neo4j.com
### October 6, 2020


## Overview

This is an example of a simple alternative to Neo4j's LOAD CSV command. LOAD CSV is essentially an "EtL" tool, because Cypher has only limited ability to Transform the data between the Extract (from CSV) and Load (into Neo) stages.

This alternative uses Python libraries to Extract the CSV data and to Load the data to Neo, while the Transformation step can be whatever the use case calls for. 

In this example, a CSV reader obtains data from a simple CSV file, but in practice the source could be anything: a JSON file, MySQL via JDBC, a Kafka queue, etc.

After any user-defined processing, the values are placed into dictionaries of key/value pairs, one dictionary per node and/or relationship. The dictionaries are collected into lists, and each list is submitted to Neo along with a custom Cypher statement using the Neo4j Python driver. The Cypher statement begins with UNWIND, which expands the list parameter into a stream of rows so that Neo processes each dictionary as one 'row' of data.
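As a minimal sketch of the batching step just described, the generator below groups already-transformed dictionaries into parameter maps of the form `{'rows': [...]}`, which is the shape a statement beginning with `UNWIND $rows AS row` expects. The name `param_batches` is hypothetical; the cells in this notebook inline the same logic instead.

```python
from itertools import islice
from typing import Dict, Iterable, Iterator, List


def param_batches(records: Iterable[Dict], batch_size: int) -> Iterator[Dict[str, List[Dict]]]:
    """Group records into {'rows': [...]} parameter maps of at most batch_size items.

    Hypothetical helper: each yielded map can be passed as the parameters of a
    Cypher statement that starts with `UNWIND $rows AS row`.
    """
    if batch_size < 1:
        raise ValueError("batch_size must be >= 1")
    it = iter(records)
    while True:
        chunk = list(islice(it, batch_size))
        if not chunk:  # source exhausted; any final partial batch was already yielded
            return
        yield {'rows': chunk}


# usage: 23 records in batches of 10
records = [{'id': i} for i in range(23)]
print([len(p['rows']) for p in param_batches(records, 10)])
```

Each yielded map would be handed to `session.write_transaction(...)` as the parameters for one batch; the final, partial batch plays the role of the "stragglers" handled at the end of each loading cell.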

## Avoiding the 'Eager' delays:
The Cypher statements in this example are set up to avoid the delays that 'Eager' processing can add to a Cypher statement. Cypher normally evaluates a query lazily: each row is streamed through the operators of the query plan as soon as it is produced. When a statement both writes and reads the same data, the planner may insert an Eager operator, which forces every row to be fully processed and held in memory before the next step can start. For a fuller discussion, please read the following post.
https://markhneedham.com/blog/2014/10/23/neo4j-cypher-avoiding-the-eager/

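Loading statements are prone to the Eager delay because they tend to mix writes and reads of the same data. A single statement that MERGEs the nodes and then MERGEs the relationships between them has to read back nodes it may have just written, so the planner inserts an Eager operator to keep the results consistent. Such a load is very likely to be affected by the Eager delay.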

A way to avoid Eager delays is to create all nodes (using MERGE if desired) in one pass, and then create the relationships in a second pass. Because the nodes already exist, the relationship statement only has to MATCH them before it MERGEs/CREATEs the relationships, which avoids the read/write conflict that triggers the Eager penalty.
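One way to check whether a statement will hit the Eager penalty, sketched here as an assumption rather than part of the original loader, is to prefix it with `EXPLAIN` (which plans the query without running it) and look for an operator named `Eager` in the plan. This assumes the Neo4j Python driver exposes the plan as a nested dict on `ResultSummary.plan` with `operatorType` and `children` keys; newer servers may append a suffix such as `@neo4j` to operator names, which the code strips. `EagerAggregation`, produced by the `RETURN count(*)` in every statement here, is an ordinary aggregation and is deliberately not reported. The function names are hypothetical.

```python
from typing import Dict, List, Optional


def eager_operators(plan: Dict) -> List[str]:
    """Return the operatorType of every plan node that is an Eager operator.

    `plan` is assumed to be the nested dict from ResultSummary.plan.
    EagerAggregation (from count(*) etc.) is not counted.
    """
    found = []
    stack = [plan]
    while stack:
        node = stack.pop()
        op = node.get('operatorType', '')
        # strip a version suffix such as '@neo4j' before comparing
        if op.split('@')[0] == 'Eager':
            found.append(op)
        stack.extend(node.get('children', []))
    return found


def check_for_eager(session, cypher: str, params: Optional[Dict] = None) -> List[str]:
    """EXPLAIN a statement (nothing is executed) and list any Eager operators."""
    summary = session.run('EXPLAIN ' + cypher, params or {}).consume()
    return eager_operators(summary.plan or {})
```

Called as `check_for_eager(session, productSupplierCypher, {'rows': []})`, this can be used to confirm that the two-pass relationship statements below plan without an Eager operator on your Neo4j version.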


In [None]:
from neo4j import GraphDatabase
import neo4j
import csv
import urllib.request   # used by every cell that downloads a CSV file

from time import process_time 

In [None]:
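# connection settings for a local Neo4j instance; change these for your environment
uri = 'neo4j://localhost:7687'
username = 'neo4j'
password = 'nimda'

try:
    driver = GraphDatabase.driver(uri, auth=(username, password), max_connection_lifetime=1000)
    driver.verify_connectivity()  # fail fast if the server is unreachable or the credentials are wrong
except Exception as e:
    print(type(e))
    print(e)
    raise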


## Create Schema Constraints and Indices

This calls the apoc.schema.assert procedure from the APOC plugin (which must be installed on the server) to create index-backed uniqueness constraints and non-constraining indexes.
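APOC is a plugin and may not be installed everywhere. As a hedged alternative, assuming Neo4j 4.4 or later (where `CREATE CONSTRAINT ... IF NOT EXISTS FOR ... REQUIRE ... IS UNIQUE` and `CREATE INDEX ... IF NOT EXISTS FOR ... ON ...` are available), the same schema can be expressed as native Cypher statements. The helper names and the generated constraint/index names are hypothetical. Unlike `apoc.schema.assert(..., TRUE)`, this only creates schema and never drops anything.

```python
from typing import Dict, List


def constraint_statements(unique: Dict[str, List[str]]) -> List[str]:
    """Build one uniqueness-constraint statement per (label, property) pair.

    Assumes Neo4j 4.4+ syntax; the generated constraint names are hypothetical.
    """
    stmts = []
    for label, props in unique.items():
        for prop in props:
            name = f"{label.lower()}_{prop}_unique"
            stmts.append(
                f"CREATE CONSTRAINT {name} IF NOT EXISTS "
                f"FOR (n:{label}) REQUIRE n.{prop} IS UNIQUE"
            )
    return stmts


def index_statements(indexes: Dict[str, List[str]]) -> List[str]:
    """Build one single-property index statement per (label, property) pair."""
    return [
        f"CREATE INDEX {label.lower()}_{prop} IF NOT EXISTS FOR (n:{label}) ON (n.{prop})"
        for label, props in indexes.items()
        for prop in props
    ]


for stmt in constraint_statements({'Customer': ['id'], 'Employee': ['id']}):
    print(stmt)
```

Each generated statement would then be run in turn, e.g. `session.run(stmt)` inside a write session.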


In [None]:

# indexes are in the first map, constraints in the second; TRUE drops any existing indexes and constraints that are not listed here
# this procedure does not currently support compound indexes or constraints such as Node Keys.  They must be created separately.

schemaIndexCypherStmt = ''' 

CALL apoc.schema.assert(
    {
     Employee:['lastName','region'],
     Supplier:['name'],
     Product:['name','category'] 
    }, 
    {
      Customer:['id'],
      Employee:['id'],
      Product:['id'],
      Supplier:['id']
      }, TRUE)
'''
with driver.session(default_access_mode=neo4j.WRITE_ACCESS) as session:
    result = session.run(schemaIndexCypherStmt)



# Cypher to Load Nodes and Relationships


In [None]:
suppliersCypher = ''' 
    UNWIND $rows as row
    MERGE (s:Supplier { id: row.id }) 
        ON CREATE SET s += row,
                      s.created = date()
    RETURN count(*) as count  
'''

productsCypher = ''' 
    UNWIND $rows as row
    MERGE (p:Product { id: row.id }) 
        ON CREATE SET p += row,
                      p.discontinued = apoc.convert.toBoolean(row.discontinued),
                      p.created = date()
    RETURN count(*) as count  
'''


customersCypher = ''' 
    UNWIND $rows as row
    MERGE (c:Customer { id: row.id }) 
        ON CREATE SET c += row,
                      c.created = date()
    RETURN count(*) as count  
'''


employeesCypher = ''' 
    UNWIND $rows as row
    MERGE (e:Employee { id: row.id }) 
        ON CREATE SET e += row,
                      e.birthDate = date(row.birthDate),
                      e.hireDate = date(row.hireDate),
                      e.created = date()
    RETURN count(*) as count  
'''

employeeReportsCypher = ''' 
    UNWIND $rows as row
    
    MATCH (from:Employee { id: row.fromId }) 
    MATCH (to:Employee { id: row.toId })
    
    MERGE (from)-[:REPORTS_TO]->(to)
    
    RETURN count(*) as count  
'''

productSupplierCypher = ''' 
    UNWIND $rows as row
    
    MATCH (p:Product { id: row.productID }) 
    MATCH (s:Supplier { id: row.supplierID })
    
    MERGE (s)-[:SUPPLIES]->(p)
    
    RETURN count(*) as count  
'''



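## Semi-Generic node loader
This function runs a Cypher statement within a write transaction. Despite its name, it is used for both node and relationship statements; it returns the single value produced by the statement's `RETURN count(*)` together with the result summary.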
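Every loading cell below repeats the same two branches: flush a full batch inside the read loop, then flush the stragglers afterwards. A minimal sketch of folding both into one helper, assuming a transaction function with the same argument order as `create_node_tx` (transaction, Cypher statement, parameter map). `load_in_batches` is a hypothetical name, not part of the Neo4j driver.

```python
from typing import Any, Callable, Dict, List, Sequence


def load_in_batches(driver, tx_function: Callable, cypher: str,
                    rows: Sequence[Dict[str, Any]], batch_size: int = 10) -> List[Any]:
    """Run `cypher` once per batch of rows, each batch in its own write transaction.

    Hypothetical helper. `driver` is a neo4j Driver; `tx_function` is called as
    tx_function(tx, cypher, {'rows': batch}) and its return values are collected.
    """
    results = []
    for start in range(0, len(rows), batch_size):
        batch = list(rows[start:start + batch_size])
        with driver.session() as session:
            # write_transaction retries the function on transient errors;
            # in driver 5.x it is deprecated in favour of execute_write
            results.append(session.write_transaction(tx_function, cypher, {'rows': batch}))
    return results
```

For example, `load_in_batches(driver, create_node_tx, suppliersCypher, suppliers, maxBatchSize)` would load a fully built `suppliers` list. Building the whole list first trades memory for simplicity; flushing inside the read loop, as the notebook does, keeps memory bounded for large files.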


In [None]:
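## generic loader: used for both node and relationship statements

def create_node_tx(tx, cypherStmt, params):
    # params is the parameter map, e.g. {'rows': [...]}, bound to $rows in cypherStmt
    # print ('running ', cypherStmt, 'rowCount', len(params['rows']))
    result = tx.run(cypherStmt, params)
    record = result.single()   # every statement here ends with RETURN count(*)
    value = record.value()     # the number of rows processed
    info = result.consume()    # ResultSummary with counters and timings
    return value, info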



# Suppliers
### URI: http://data.neo4j.com/northwind/suppliers.csv
### Sample Row
``` json
{
  "country": "UK",
  "supplierID": "1",
  "address": "49 Gilbert St.",
  "contactTitle": "Purchasing Manager",
  "city": "London",
  "phone": "(171) 555-2222",
  "contactName": "Charlotte Cooper",
  "postalCode": "EC1 4SD",
  "companyName": "Exotic Liquids",
  "fax": "NULL",
  "region": "NULL",
  "homePage": "NULL"
}
```
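To make the Transform step concrete, here is the supplier mapping from the cell below rewritten as a pure function and applied to the sample row above. The name `supplier_from_row` and the `OPTIONAL_SUPPLIER_FIELDS` tuple are hypothetical; they loop over the optional columns instead of repeating one `if` per column.

```python
from typing import Dict

# optional columns: copied only when the CSV holds something other than the literal "NULL"
OPTIONAL_SUPPLIER_FIELDS = ('address', 'region', 'fax', 'city', 'postalCode', 'phone', 'homePage')


def supplier_from_row(row: Dict[str, str]) -> Dict:
    """Map one suppliers.csv row onto the property map sent to Neo4j."""
    supplier = {'id': int(row['supplierID']),
                'name': row['companyName'],
                'contactName': row['contactName'],
                'contactTitle': row['contactTitle'],
                'country': row['country']}
    for key in OPTIONAL_SUPPLIER_FIELDS:
        if row[key] != "NULL":
            supplier[key] = row[key]
    return supplier


sample_supplier = {"country": "UK", "supplierID": "1", "address": "49 Gilbert St.",
                   "contactTitle": "Purchasing Manager", "city": "London",
                   "phone": "(171) 555-2222", "contactName": "Charlotte Cooper",
                   "postalCode": "EC1 4SD", "companyName": "Exotic Liquids",
                   "fax": "NULL", "region": "NULL", "homePage": "NULL"}
print(supplier_from_row(sample_supplier))
```

With the sample row, `fax`, `region` and `homePage` are left out of the property map because their CSV value is the string `"NULL"`, so those properties are never written to the node.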

In [None]:
url= 'http://data.neo4j.com/northwind/suppliers.csv'

maxBatchSize = 10

response = urllib.request.urlopen(url)
lines = [l.decode('utf-8') for l in response.readlines()]
cr = csv.DictReader(lines)
 
suppliers = []


for row in cr:
  
    # process the current record
    supplier = {'id':int(row['supplierID']),
                      'name': row['companyName'],
                      'contactName': row['contactName'],
                      'contactTitle': row['contactTitle'],
                      'country':row['country']             
                } 
    # skip columns that hold the literal string "NULL"
    if row['address'] != "NULL":
        supplier['address'] = row['address']
    if row['region'] != "NULL":
        supplier['region'] = row['region']
    if row['fax'] != "NULL":
        supplier['fax'] = row['fax']
    if row['city'] != "NULL":
        supplier['city'] = row['city']
    if row['postalCode'] != "NULL":
        supplier['postalCode'] = row['postalCode']
    if row['phone'] != "NULL":
        supplier['phone'] = row['phone']
    if row['homePage'] != "NULL":
        supplier['homePage'] = row['homePage']
    
    suppliers.append(supplier)
    
    # check to see if we need to finalize the current batch
    if len(suppliers) >= maxBatchSize  :
        #print ('suppliers batch ', suppliers)
        with driver.session() as session:
            count = session.write_transaction(create_node_tx, suppliersCypher,{'rows':suppliers})
        suppliers = []
    
## end of row loop

## finish loading any stragglers
if len(suppliers) >= 1 :
    with driver.session() as session:
        count = session.write_transaction(create_node_tx, suppliersCypher,{'rows':suppliers})

   
    
   

# Products
### URI: http://data.neo4j.com/northwind/products.csv
### Sample Row
``` json
{
  "reorderLevel": "10",
  "unitsInStock": "39",
  "unitPrice": "18.00",
  "supplierID": "1",
  "productID": "1",
  "discontinued": "0",
  "quantityPerUnit": "10 boxes x 20 bags",
  "categoryID": "1",
  "unitsOnOrder": "0",
  "productName": "Chai"
}
```
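A similar sketch for products, applied to the sample row above. It shows the two outputs of each row: the product property map (with the category name denormalized from the `categories` lookup) and the row for the `SUPPLIES` relationship, which is held back until every node exists. `product_rows` is a hypothetical name, and the one-entry lookup `{'1': 'Beverages'}` comes from the category sample row in the Product Categories section.

```python
from typing import Dict, Tuple


def product_rows(row: Dict[str, str], categories: Dict[str, str]) -> Tuple[Dict, Dict]:
    """Return (product node map, SUPPLIES relationship map) for one products.csv row.

    `categories` maps categoryID strings to category names, as getCategories() does.
    """
    product = {'id': int(row['productID']),
               'name': row['productName'],
               'discontinued': int(row['discontinued']),
               'unitPrice': float(row['unitPrice']),
               'reorderLevel': int(row['reorderLevel']),
               'unitsOnOrder': int(row['unitsOnOrder']),
               'quantityPerUnit': row['quantityPerUnit'],
               'unitsInStock': int(row['unitsInStock']),
               # denormalized: store the category name, not its ID
               'category': categories[row['categoryID']]}
    supplies = {'productID': int(row['productID']),
                'supplierID': int(row['supplierID'])}
    return product, supplies


sample_product = {"reorderLevel": "10", "unitsInStock": "39", "unitPrice": "18.00",
                  "supplierID": "1", "productID": "1", "discontinued": "0",
                  "quantityPerUnit": "10 boxes x 20 bags", "categoryID": "1",
                  "unitsOnOrder": "0", "productName": "Chai"}
product, supplies = product_rows(sample_product, {'1': 'Beverages'})
print(product)
print(supplies)
```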

In [None]:
url= 'http://data.neo4j.com/northwind/products.csv'
response = urllib.request.urlopen(url)
lines = [l.decode('utf-8') for l in response.readlines()]
cr = csv.DictReader(lines)


products = []
productSuppliers = []

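# get the product categories lookup dictionary; getCategories() is defined in the
# 'Product Categories' section at the end of this notebook, so run that cell first
categories = getCategories()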
#print (categories)

for row in cr:
    if len(products) >= maxBatchSize  : # load a batch full of products, then reset
        with driver.session() as session:
            count = session.write_transaction(create_node_tx, productsCypher,{'rows':products})
        products = []

    products.append({'id':int(row['productID']),
                     'name': row['productName'],
                     'discontinued':int(row['discontinued']),
                     'unitPrice':float(row['unitPrice']),
                     'reorderLevel':int(row['reorderLevel']),
                     'unitsOnOrder':int(row['unitsOnOrder']),
                     'quantityPerUnit':row['quantityPerUnit'],
                     'unitsInStock':int(row['unitsInStock']),
                     'category':categories[row['categoryID']]        
                })
    # create a row for the supplier relationship
    productSuppliers.append({'productID':int(row['productID']),
                             'supplierID':int(row['supplierID'])})
    
    

## finish loading any stragglers left over
if len(products) >= 1 :
    with driver.session() as session:
        count = session.write_transaction(create_node_tx, productsCypher,{'rows':products})
        
# Now load the (:Supplier)-[:SUPPLIES]->(:Product) relationships
# Don't load these until ALL relevant products and suppliers have been loaded
if len(productSuppliers) >= 1 :
    with driver.session() as session:
        count = session.write_transaction(create_node_tx, productSupplierCypher,{'rows':productSuppliers})



# Customers 
### URI: http://data.neo4j.com/northwind/customers.csv
### sample row

``` JSON
{
  "country": "Germany",
  "address": "Obere Str. 57",
  "contactTitle": "Sales Representative",
  "city": "Berlin",
  "phone": "030-0074321",
  "contactName": "Maria Anders",
  "postalCode": "12209",
  "companyName": "Alfreds Futterkiste",
  "customerID": "ALFKI",
  "fax": "030-0076545",
  "region": "NULL"
}

```



In [None]:
url= 'http://data.neo4j.com/northwind/customers.csv'

maxBatchSize = 10

response = urllib.request.urlopen(url)
lines = [l.decode('utf-8') for l in response.readlines()]
cr = csv.DictReader(lines)
 
customers = []


for row in cr:
  
    # process the current record
    customer = {'id': row['customerID'],
                  'name': row['companyName'],
                  'contactName': row['contactName'],
                  'contactTitle': row['contactTitle'],
                  'country':row['country'] } 
    # skip columns that hold the literal string "NULL"
    if row['address'] != "NULL":
        customer['address'] = row['address']
    if row['region'] != "NULL":
        customer['region'] = row['region']
    if row['fax'] != "NULL":
        customer['fax'] = row['fax']
    if row['city'] != "NULL":
        customer['city'] = row['city']
    if row['postalCode'] != "NULL":
        customer['postalCode'] = row['postalCode']
    if row['phone'] != "NULL":
        customer['phone'] = row['phone']
    
    customers.append(customer)
    
    # check to see if we need to finalize the current batch
    if len(customers) >= maxBatchSize  :
        with driver.session() as session:
            count = session.write_transaction(create_node_tx, customersCypher,{'rows':customers})
        customers = []
    
## end of row loop

## finish loading any stragglers
if len(customers) >= 1 :
    with driver.session() as session:
        count = session.write_transaction(create_node_tx, customersCypher,{'rows':customers})

   
  

# Employees
### URI: http://data.neo4j.com/northwind/employees.csv
### sample row

``` json
{
  "country": "USA",
  "lastName": "Davolio",
  "hireDate": "1992-05-01 00:00:00.000",
  "extension": "5467",
  "address": "507 - 20th Ave. E. Apt. 2A",
  "notes": "Education includes a BA in psychology from Colorado State University in 1970.  She also completed \"The Art of the Cold Call.\"  Nancy is a member of Toastmasters International.",
  "city": "Seattle",
  "photoPath": "http://accweb/emmployees/davolio.bmp",
  "postalCode": "98122",
  "homePhone": "(206) 555-9857",
  "photo": "0x151C2F000200000...0000000000",
  "reportsTo": "2",
  "employeeID": "1",
  "title": "Sales Representative",
  "titleOfCourtesy": "Ms.",
  "birthDate": "1948-12-08 00:00:00.000",
  "firstName": "Nancy",
  "region": "WA"
}
```
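The employee mapping can be sketched the same way. It highlights two transformations done in Python before the load: trimming `"1948-12-08 00:00:00.000"` to `"1948-12-08"` so that `date()` in `employeesCypher` can parse it, and emitting a separate `REPORTS_TO` row only when `reportsTo` is not `"NULL"`. `employee_rows` is a hypothetical name, and the sample dict below keeps only the columns the mapping reads.

```python
from typing import Dict, Optional, Tuple

OPTIONAL_EMPLOYEE_FIELDS = ('address', 'region', 'city', 'postalCode', 'homePhone', 'extension')


def employee_rows(row: Dict[str, str]) -> Tuple[Dict, Optional[Dict]]:
    """Return (employee node map, REPORTS_TO relationship map or None) for one row."""
    employee = {'id': int(row['employeeID']),
                'firstName': row['firstName'],
                'lastName': row['lastName'],
                'title': row['title'],
                'country': row['country']}
    for key in OPTIONAL_EMPLOYEE_FIELDS:
        if row[key] != "NULL":
            employee[key] = row[key]
    if row['titleOfCourtesy'] != "NULL":
        employee['prefix'] = row['titleOfCourtesy']
    # keep only 'YYYY-MM-DD'; employeesCypher converts it with date()
    for key in ('birthDate', 'hireDate'):
        if row[key] != "NULL":
            employee[key] = row[key][0:10]
    reports = None
    if row['reportsTo'] != "NULL":
        reports = {'fromId': int(row['employeeID']), 'toId': int(row['reportsTo'])}
    return employee, reports


sample_employee = {"country": "USA", "lastName": "Davolio", "hireDate": "1992-05-01 00:00:00.000",
                   "extension": "5467", "address": "507 - 20th Ave. E. Apt. 2A",
                   "city": "Seattle", "postalCode": "98122", "homePhone": "(206) 555-9857",
                   "reportsTo": "2", "employeeID": "1", "title": "Sales Representative",
                   "titleOfCourtesy": "Ms.", "birthDate": "1948-12-08 00:00:00.000",
                   "firstName": "Nancy", "region": "WA"}
print(employee_rows(sample_employee))
```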

In [None]:
url= 'http://data.neo4j.com/northwind/employees.csv'

maxBatchSize = 10

response = urllib.request.urlopen(url)
lines = [l.decode('utf-8') for l in response.readlines()]
cr = csv.DictReader(lines)
 
employees = []
employeeReports = []

for row in cr:
  
    # process the current record
    employee = {'id': int(row['employeeID']),
                  'firstName': row['firstName'],
                  'lastName': row['lastName'],
                  'title': row['title'],
                  'country':row['country'] } 
    # skip columns that hold the literal string "NULL"
    if row['address'] != "NULL":
        employee['address'] = row['address']
    if row['region'] != "NULL":
        employee['region'] = row['region']
    if row['city'] != "NULL":
        employee['city'] = row['city']
    if row['postalCode'] != "NULL":
        employee['postalCode'] = row['postalCode']
    if row['homePhone'] != "NULL":
        employee['homePhone'] = row['homePhone']
    if row['extension'] != "NULL":
        employee['extension'] = row['extension']
    if row['titleOfCourtesy'] != "NULL":
        employee['prefix'] = row['titleOfCourtesy']
    if row['birthDate'] != "NULL":
        employee['birthDate'] = row['birthDate'][0:10]
    if row['hireDate'] != "NULL":
        employee['hireDate'] = row['hireDate'][0:10]
    
    employees.append(employee)
    
    # create an entry for the employee-[:REPORTS_TO]-employee relationship
    if row['reportsTo'] != "NULL":
        employeeReports.append({'fromId': int(row['employeeID']),'toId': int(row['reportsTo']) })
    
    # check to see if we need to finalize the current batch
    if len(employees) >= maxBatchSize  :
        with driver.session() as session:
            count = session.write_transaction(create_node_tx, employeesCypher,{'rows':employees})
        employees = []
    
## end of row loop

## finish loading any stragglers
if len(employees) >= 1 :
    with driver.session() as session:
        count = session.write_transaction(create_node_tx, employeesCypher,{'rows':employees})

## Now write out the [:REPORTS_TO] relationships
if len(employeeReports) > 0:
    with driver.session() as session:
        count = session.write_transaction(create_node_tx, employeeReportsCypher,{'rows':employeeReports})


    

# Orders
### URI: http://data.neo4j.com/northwind/orders.csv
### sample row

``` JSON
{
  "shipCity": "Reims",
  "orderID": "10248",
  "freight": "32.38",
  "requiredDate": "1996-08-01 00:00:00.000",
  "employeeID": "5",
  "shipName": "Vins et alcools Chevalier",
  "shipPostalCode": "51100",
  "shipCountry": "France",
  "shipAddress": "59 rue de l'Abbaye",
  "shipVia": "3",
  "customerID": "VINET",
  "orderDate": "1996-07-04 00:00:00.000",
  "shipRegion": "NULL",
  "shippedDate": "1996-07-16 00:00:00.000"
}
```

In [None]:
url= 'http://data.neo4j.com/northwind/orders.csv'

maxBatchSize = 10

# ASSUMPTION: the Order model below (Order nodes keyed on id, plus
# (:Customer)-[:PURCHASED]->(:Order) and (:Employee)-[:SOLD]->(:Order)) is a sketch
# modelled on the Neo4j Northwind guide; adapt it to your own data model.
# Consider adding Order:['id'] to the constraints map passed to apoc.schema.assert.
ordersCypher = ''' 
    UNWIND $rows as row
    MERGE (o:Order { id: row.id }) 
        ON CREATE SET o += row,
                      o.orderDate = date(row.orderDate),
                      o.requiredDate = date(row.requiredDate),
                      o.shippedDate = date(row.shippedDate),
                      o.created = date()
    RETURN count(*) as count  
'''

customerOrderCypher = ''' 
    UNWIND $rows as row
    MATCH (c:Customer { id: row.customerId }) 
    MATCH (o:Order { id: row.orderId })
    MERGE (c)-[:PURCHASED]->(o)
    RETURN count(*) as count  
'''

employeeOrderCypher = ''' 
    UNWIND $rows as row
    MATCH (e:Employee { id: row.employeeId }) 
    MATCH (o:Order { id: row.orderId })
    MERGE (e)-[:SOLD]->(o)
    RETURN count(*) as count  
'''

response = urllib.request.urlopen(url)
lines = [l.decode('utf-8') for l in response.readlines()]
cr = csv.DictReader(lines)
 
orders = []
customerOrders = []
employeeOrders = []

for row in cr:
  
    # process the current record
    orderId = int(row['orderID'])
    order = {'id': orderId,
             'shipName': row['shipName'],
             'shipCountry': row['shipCountry'],
             'shipVia': int(row['shipVia']),
             'freight': float(row['freight'])}
    # skip columns that hold the literal string "NULL"
    for key in ('shipAddress', 'shipCity', 'shipRegion', 'shipPostalCode'):
        if row[key] != "NULL":
            order[key] = row[key]
    # keep only the 'YYYY-MM-DD' part; ordersCypher converts it with date()
    for key in ('orderDate', 'requiredDate', 'shippedDate'):
        if row[key] != "NULL":
            order[key] = row[key][0:10]
    
    orders.append(order)
    
    # relationship rows; written only after all Order nodes exist (avoids Eager)
    if row['customerID'] != "NULL":
        customerOrders.append({'customerId': row['customerID'], 'orderId': orderId})
    if row['employeeID'] != "NULL":
        employeeOrders.append({'employeeId': int(row['employeeID']), 'orderId': orderId})
    
    # check to see if we need to finalize the current batch
    if len(orders) >= maxBatchSize  :
        with driver.session() as session:
            count = session.write_transaction(create_node_tx, ordersCypher,{'rows':orders})
        orders = []
    
## end of row loop

## finish loading any stragglers
if len(orders) >= 1 :
    with driver.session() as session:
        count = session.write_transaction(create_node_tx, ordersCypher,{'rows':orders})

## Now write out the [:PURCHASED] and [:SOLD] relationships
if len(customerOrders) > 0:
    with driver.session() as session:
        count = session.write_transaction(create_node_tx, customerOrderCypher,{'rows':customerOrders})
if len(employeeOrders) > 0:
    with driver.session() as session:
        count = session.write_transaction(create_node_tx, employeeOrderCypher,{'rows':employeeOrders})


  

In [None]:
# sanity check: count the loaded nodes by label
with driver.session() as session:
    result = session.run("MATCH (n) RETURN labels(n)[0] AS label, count(*) AS count ORDER BY label")
    for record in result:
        print (record['label'], record['count'])


In [None]:
# minimal stand-alone example of the UNWIND pattern; the transaction function has its
# own name so that it does not overwrite the create_node_tx loader defined above
cypherStmt = ''' 
UNWIND $rows as row
CREATE (n:NodeExample { name: row.name }) 
RETURN count(*) as count
'''
def create_example_nodes_tx(tx, params):
    result = tx.run(cypherStmt, params)
    record = result.single()
    print (record)
    return record["count"]

rows = []
rows.append({'id':123,'name':'mark'})
rows.append({'id':2334,'name':'abby'})
with driver.session() as session:
    created = session.write_transaction(create_example_nodes_tx, {'rows':rows})

In [None]:
# don't forget to close the connection when done
driver.close()

## Product Categories
Instead of following the typical RDBMS third-normal-form approach of creating a separate table for categories,
we store the denormalized category name directly on each product node.
This cell obtains the category values from a URL and loads them into a simple dictionary, which is used when loading the products.

### sample row
``` json
{
  "description": "Soft drinks, coffees, teas, beers, and ales",
  "categoryName": "Beverages",
  "categoryID": "1",
  "picture": "0x151C2F0...80000"
}
```
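The lookup-building part of `getCategories()` can be tried without network access by feeding it CSV text directly. `parse_categories` is a hypothetical helper, and the inline CSV is an illustrative subset of the columns in the sample row above.

```python
import csv
import io
from typing import Dict


def parse_categories(csv_text: str) -> Dict[str, str]:
    """Build the categoryID -> categoryName lookup from CSV text (same logic as getCategories)."""
    reader = csv.DictReader(io.StringIO(csv_text))
    return {row['categoryID']: row['categoryName'] for row in reader}


sample_categories_csv = ('categoryID,categoryName,description\n'
                         '1,Beverages,"Soft drinks, coffees, teas, beers, and ales"\n')
print(parse_categories(sample_categories_csv))
```

Note that the keys stay strings (`'1'`), which matches how the Products cell indexes the lookup with `row['categoryID']`.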



In [None]:
# load in a lookup table 
def getCategories ():
    categories = {}
    
    url= 'http://data.neo4j.com/northwind/categories.csv'
    response = urllib.request.urlopen(url)
    lines = [l.decode('utf-8') for l in response.readlines()]
    cr = csv.DictReader(lines)
 

    for row in cr:
        categories [row['categoryID']] = row['categoryName']
    
    return categories

In [None]:
categories = getCategories()
print (categories)