This notebook loads only the Yelp data related to Toronto restaurants into Neo4j. It is intended to eventually replace the data model in the original data-load module.

In [3]:
from neo4j.v1 import GraphDatabase, basic_auth
import csv
import json
import os

# Directory holding the Yelp JSON dumps and the CSV files generated from them.
# It must end with a path separator because file names are appended with `+`
# below. It is presumably Neo4j's import directory, since the LOAD CSV queries
# read `file:///<name>.csv` — TODO confirm for the target server.
# Override with the YELP_DATA_DIR environment variable; the fallback is the
# original local path.
data_dir = os.environ.get('YELP_DATA_DIR', '/Users/gtenorio/neo4j_yelp_toronto/import/')

In [4]:
# Helper functions

# Register a CSV dialect named 'custom': every field is wrapped in double
# quotes, and a backslash is used as the escape character.
csv.register_dialect(
    'custom',
    quoting=csv.QUOTE_ALL,
    quotechar='"',
    escapechar='\\',
)


def identity(x):
    """Return ``x`` unchanged; the default no-op ``transform`` for conversions."""
    result = x
    return result


def select(keys, d):
    """Return a new dict holding only the entries of ``d`` whose key is in ``keys``.

    Entries keep ``d``'s iteration order. Keys listed in ``keys`` but absent
    from ``d`` are skipped.

    Args:
        keys: iterable of keys to keep (may be a one-shot iterator).
        d: source mapping.
    """
    # Build the lookup set once, not once per entry of d. Rebuilding it per entry
    # is wasteful and exhausts a one-shot iterator of keys after the first entry.
    wanted = set(keys)
    return {key: value for key, value in d.items() if key in wanted}


# Converts the given value to a CSV formatted string
def toCSV(value):
    """Prepare one field value for writing with the 'custom' csv dialect.

    - A list becomes a single semicolon-delimited string. Each item is
      sanitized the same way as a scalar, and non-string items are converted
      with ``str``.
    - A string has every double quote and backslash removed. Quoting itself
      is done by ``csv.writer``. The stripping is presumably there so these
      characters cannot interfere with quote/escape parsing when the CSV is
      read back.
    - Any other value (numbers, None, ...) is returned unchanged.
    """
    if isinstance(value, list):
        # Sanitize each item like a scalar, so list items get the same
        # quote/backslash stripping as plain strings.
        return ';'.join(str(toCSV(item)) for item in value)

    if isinstance(value, str):
        return value.replace('"', '').replace('\\', '')

    return value


# Converts a dict to a list of CSV formatted strings, optionally restricting
# converted fields by passing in an ordered list of fields
def dictToCSV(obj, fields=None):
    """Return the CSV-ready values of ``obj`` in ``fields`` order.

    Args:
        obj: dict to convert.
        fields: ordered field names to emit; defaults to all of ``obj``'s
            keys in their existing order.

    Returns:
        A list with one ``toCSV``-converted value per field. A field missing
        from ``obj`` yields ``None``, which ``csv.writer`` writes as an empty
        field. This way one incomplete record does not abort a whole
        conversion.
    """
    if fields is None:
        fields = obj.keys()
    return [toCSV(obj.get(k)) for k in fields]


# Takes a JSON file and writes it as a CSV file
def jsonToCSVFile(json_file, csv_file=None, transform=identity):
    """Convert a line-delimited JSON file (one object per line) into a CSV file.

    Args:
        json_file: path of the input file; blank lines are ignored.
        csv_file: output path; defaults to ``json_file`` with its extension
            replaced by ``.csv``.
        transform: callable that receives the iterable of parsed dicts and
            returns an iterable of dicts to write (e.g. filtered or projected
            records). Any iterable is accepted, not only iterators.

    The header row is taken from the keys of the first transformed record.
    Every record is written in that column order with the 'custom' csv
    dialect. If ``transform`` yields nothing, an empty CSV file is left behind.
    """
    csv_file = csv_file or os.path.splitext(json_file)[0] + '.csv'

    # newline='' is required by the csv module so that it controls line endings
    # (including newlines embedded in quoted fields) itself. An explicit utf-8
    # encoding avoids depending on the platform's locale.
    with open(json_file, 'r', encoding='utf-8') as jsonFile, \
         open(csv_file, 'w', encoding='utf-8', newline='') as csvFile:

        parsed = (json.loads(line) for line in jsonFile if line.strip())
        # iter() lets transform return a list or other iterable, not only an iterator.
        objs = iter(transform(parsed))

        missing = object()
        first = next(objs, missing)
        if first is missing:
            # Nothing to write: return cleanly instead of leaking StopIteration.
            return

        fields = list(first.keys())

        writer = csv.writer(csvFile, dialect='custom')
        writer.writerow(fields)         # write header

        writer.writerow(dictToCSV(first, fields))
        for obj in objs:
            writer.writerow(dictToCSV(obj, fields))

### Part 1: Convert Yelp JSON files to CSV format

#### A. Convert the Business JSON file (one JSON object per line) to CSV

In [5]:
%%time
# Fields kept for each business; they become the columns of business.csv.
business_fields = ['business_id', 'name', 'categories', 'neighborhood', 'stars']


def toronto_restaurants(businesses):
    """Yield the selected fields of every Toronto business tagged 'Restaurants'."""
    for b in businesses:
        # Guard against a missing 'city' or a missing/null 'categories' value,
        # so one odd record cannot abort the whole conversion.
        # NOTE(review): assumes categories is a list (or string) when present — confirm.
        if b.get('city') == 'Toronto' and 'Restaurants' in (b.get('categories') or []):
            yield select(business_fields, b)


jsonToCSVFile(data_dir + "business.json", transform=toronto_restaurants)


CPU times: user 4.4 s, sys: 157 ms, total: 4.56 s
Wall time: 4.63 s


#### B. Convert the User JSON file (one JSON object per line) to CSV

In [6]:
%%time
# Only the user id, display name and average star rating are used by the user load below.
user_fields = ['user_id', 'name', 'average_stars']
jsonToCSVFile(data_dir + "user.json",
              transform=lambda users: map(lambda u: select(user_fields, u), users))

CPU times: user 56.5 s, sys: 2.11 s, total: 58.6 s
Wall time: 59.6 s


#### C. Convert the Review JSON file (one JSON object per line) to CSV

In [7]:
%%time
# Each review row keeps its author, the reviewed business, and its star rating.
review_fields = ['user_id', 'business_id', 'stars']
jsonToCSVFile(data_dir + "review.json",
              transform=lambda reviews: map(lambda r: select(review_fields, r), reviews))

CPU times: user 2min 10s, sys: 5.07 s, total: 2min 15s
Wall time: 2min 21s


### Part 2: Load CSV files into Neo4j

In [8]:
# Connection settings are read from the environment, so credentials do not need
# to live in the notebook. The fallbacks reproduce the original local setup.
# TODO: remove the default password once NEO4J_PASSWORD is set wherever this runs.
driver = GraphDatabase.driver(
    os.environ.get("NEO4J_URI", "bolt://localhost:7687"),
    auth=basic_auth(os.environ.get("NEO4J_USER", "neo4j"),
                    os.environ.get("NEO4J_PASSWORD", "neo4jneo4j")),
)

In [None]:
%%time

# Start every run from an empty graph: remove all nodes together with their relationships.
wipe_graph = "MATCH (n) DETACH DELETE n"
with driver.session() as neo4j_session:
    neo4j_session.run(wipe_graph)

In [27]:
%%time

# Drop existing constraints so they can be re-created cleanly by the next cell.
# Dropping a constraint that does not exist is an error (e.g. on a fresh
# database), so drop only the constraints the server actually reports.
# NOTE(review): this drops *every* constraint in the database, not just the four
# created below. That is consistent with wiping the whole graph, but confirm.
# It also relies on the Neo4j 3.x `db.constraints()` `description` column being
# valid DROP syntax — confirm for your server version.
with driver.session() as session:
    existing = [record["description"] for record in session.run("CALL db.constraints()")]
    for description in existing:
        session.run("DROP " + description)

CPU times: user 4.27 ms, sys: 2.24 ms, total: 6.51 ms
Wall time: 108 ms


In [9]:
%%time

# Uniqueness constraints on the node keys that the load queries below MATCH/MERGE on.
# NOTE(review): none of the cells shown create :Review nodes (reviews are stored
# as REVIEWED relationships), so the Review constraint looks unused.
constraint_statements = [
    "CREATE CONSTRAINT ON (business:Business)   ASSERT business.id   IS UNIQUE",
    "CREATE CONSTRAINT ON (category:Category)   ASSERT category.name IS UNIQUE",
    "CREATE CONSTRAINT ON (user:User)           ASSERT user.id       IS UNIQUE",
    "CREATE CONSTRAINT ON (review:Review)       ASSERT review.id     IS UNIQUE",
]

with driver.session() as session:
    for statement in constraint_statements:
        session.run(statement)

CPU times: user 4.06 ms, sys: 2.27 ms, total: 6.33 ms
Wall time: 266 ms


#### A. Load Business data into Neo4j

In [10]:
%%time

# Create one Business node per CSV row. Then link it to a Category node for each
# entry of its semicolon-delimited `categories` column. Categories are MERGEd, so
# businesses that share a category share the same node.
load_business = """
    LOAD CSV WITH HEADERS FROM {input_dir} AS line                      
    WITH line 

    // Create Business nodes
    CREATE (business:Business {id: line.business_id})
    SET business.name         = line.name,
        business.neighborhood = line.neighborhood,
        business.avg_rating   = toFloat(line.stars)

    // Create Category nodes
    WITH line, business, split(line.categories, ";") as cat_list
    UNWIND cat_list as cat
    MERGE (category:Category {name: cat})
    CREATE (business)-[:IN_CATEGORY]->(category)
    """

business_csv_url = 'file:///business.csv'
with driver.session() as neo4j_session:
    neo4j_session.run(load_business, {'input_dir': business_csv_url})

CPU times: user 2.74 ms, sys: 2.56 ms, total: 5.29 ms
Wall time: 4.73 s


In [11]:
%%time

# Remove the generic "Restaurants" category. Every business loaded here is a
# restaurant, so only the sub-categories carry information (this matters for
# Label Propagation later). A node cannot be plainly DELETEd while it still has
# relationships, so the IN_CATEGORY relationships pointing at it go first.

delete_rels = """
              MATCH (b:Business)-[r:IN_CATEGORY]->(c:Category {name: 'Restaurants' })
              DELETE r
              """

with driver.session() as neo4j_session:
    neo4j_session.run(delete_rels)

CPU times: user 2.05 ms, sys: 2.9 ms, total: 4.95 ms
Wall time: 681 ms


In [12]:
%%time

# With its relationships removed, the "Restaurants" category node itself can be deleted.

delete_node = """
              MATCH (c:Category {name: 'Restaurants' })
              DELETE c
              """

with driver.session() as neo4j_session:
    neo4j_session.run(delete_node)

CPU times: user 1.61 ms, sys: 1.66 ms, total: 3.27 ms
Wall time: 50.4 ms


#### B. Load Review data into Neo4j

In [13]:
%%time

# Keep only reviews of businesses already in the graph (i.e. the Toronto
# restaurants): the MATCH drops every other row. Reviewers are MERGEd, so each
# user gets a single User node. Each review becomes a REVIEWED relationship
# that carries its star rating.

load_review = """
    // Load and commit every 50000 records
    USING PERIODIC COMMIT 50000 
    LOAD CSV WITH HEADERS FROM {input_dir} AS line                      
    WITH line 

    // Only care about businesses that are already in the graph (ie: Toronto restaurants)
    MATCH (business:Business {id:line.business_id})
    MERGE (user:User         {id:line.user_id})
    
    CREATE (user)-[:REVIEWED  {stars:toInteger(line.stars)}]->(business)
    """

review_csv_url = 'file:///review.csv'
with driver.session() as neo4j_session:
    neo4j_session.run(load_review, {'input_dir': review_csv_url})

CPU times: user 8.01 ms, sys: 5.79 ms, total: 13.8 ms
Wall time: 3min 3s


#### C. Load User data into Neo4j

In [14]:
%%time

# User nodes were created by the review load. This step only fills in properties
# on those existing nodes. Rows for users with no matching User node find no
# MATCH and are skipped.

load_user = """
    USING PERIODIC COMMIT 50000 
    LOAD CSV WITH HEADERS FROM {input_dir} AS line                      
    WITH line 

    // Set properties on User nodes already in the graph
    MATCH (user:User {id: line.user_id})
    SET user.name              = line.name,
        user.avg_review_rating = toFloat(line.average_stars)
    """

user_csv_url = 'file:///user.csv'
with driver.session() as neo4j_session:
    neo4j_session.run(load_user, {'input_dir': user_csv_url})

CPU times: user 4.9 ms, sys: 4.87 ms, total: 9.78 ms
Wall time: 1min 28s
