# Testing out some logging 

In [25]:
import logging
import sys
config = {'logging_dir':'.'}
def get_logger(appname=None, level='WARNING'):
    """ Provides a system-level logger, accessible from any module.

        Correct function signature is get_logger(__name__).
        This will log from whatever module it was called in.

        If appname is None, the root logger will be returned.
    """
    logging.basicConfig(level=level)
    logger = logging.getLogger(appname)
    format = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
    # log to std out
    stream_handler = logging.StreamHandler(sys.stdout)
    stream_handler.setFormatter(format)
    logger.addHandler(stream_handler)
    # log to the configured logfile dir
    file_handler = logging.handlers.RotatingFileHandler('{}/{}.log'.format(config['logging_dir'], 'app'),
                                               maxBytes=1e9,
                                               backupCount=5)
    file_handler.setFormatter(format)
    logger.addHandler(file_handler)
    return logger

In [26]:
logger = get_logger()

In [27]:
logger.critical('fuck')

CRITICAL:root:fuck


2016-09-15 11:14:10,468 - root - CRITICAL - fuck
2016-09-15 11:14:10,468 - root - CRITICAL - fuck
2016-09-15 11:14:10,468 - root - CRITICAL - fuck
2016-09-15 11:14:10,468 - root - CRITICAL - fuck
2016-09-15 11:14:10,468 - root - CRITICAL - fuck


In [17]:
logger.info("shouldn't print")

INFO:root:shouldn't print


2016-09-15 10:58:34,871 - root - INFO - shouldn't print
2016-09-15 10:58:34,871 - root - INFO - shouldn't print


In [18]:
logger.warning("should print")





In [19]:
logger.setLevel('INFO')

In [20]:
logger.info('yo!')

INFO:root:yo!


2016-09-15 10:58:36,511 - root - INFO - yo!
2016-09-15 10:58:36,511 - root - INFO - yo!


# Yelp ETL

In [1]:
from json import loads
import pandas as pd
from datetime import datetime
from os import path
from haystack.util.conversion import xstr

In [2]:
fname = '../haystack/sources/yelpfiles/yelp_businesses.json'

In [3]:
def yelp_business_dict(business_dict):
    return OrderedDict([
            ('business_id',xstr(business_dict['id'], encoding='utf8')),
            ('name',xstr(business_dict['name'], encoding='utf8')),
            ('phone',xstr(business_dict['phone'], encoding='utf8')),
            ('rating',business_dict['rating']),
            ('url',xstr(business_dict['url'], encoding='utf8')),
            ('business_url',xstr(business_dict['business_url'], encoding='utf8')),
            ('_last_updated',int(datetime.strptime(business_dict['time_updated'], "%Y-%m-%dT%H:%M:%S")
                                        .strftime('%s'))),
            ('is_closed',bool(business_dict['is_closed']))
            ])
        

def yelp_location_dict(location_dict):
    location = OrderedDict([
                ('latitude',location_dict['coordinate']['latitude']),
                ('longitude',location_dict['coordinate']['longitude']),
                ('line1',xstr(location_dict['address'][0], encoding='utf8')),
                ('line2',xstr(location_dict['address'][1], encoding='utf8')),
                ('line3',xstr(location_dict['address'][2], encoding='utf8')),
                ('city',xstr(location_dict['city'], encoding='utf8')),
                ('country',xstr(location_dict['country'], encoding='utf8')),
                ('postal_code',xstr(location_dict['postal_code'], encoding='utf8')),
                ('state',xstr(location_dict['state'], encoding='utf8'))
            ])
    if location['latitude'] is not None:
        location['bbox_width'] = 0
        location['bbox_height'] = 0
    else:
        location['bbox_width'] = None
        location['bbox_height'] = None
    location['_identifier'] = xstr(location_identifier(location), encoding='utf8')
    return OrderedDict(location)
    
def yelp_review_dict(review_dict):
    # there is a corner case where a review['text] field
    # will end in a '\' which causes an escape of the 
    # quote around the cell in the csv table
    # when Cypher does LOAD CSV.
    # to fix this we just replace '\' with '/'
    return OrderedDict([
        ('text',xstr(review_dict['text'].replace('\\', '/'), encoding='utf8')),
        ('rating',review_dict['rating']),
        ('_authored_date',int(datetime.strptime(review_dict['created'], '%Y-%m-%d')
                                     .strftime('%s'))),
        ('review_id',xstr(review_dict['id'], encoding='utf8'))
    ])

def yelp_category_dict(category_dict):
    return OrderedDict([
            ('alias', xstr(category_dict['alias'], encoding='utf8')),
            ('title', xstr(category_dict['title'], encoding='utf8'))
        ])

def yelp_user_dict(user_dict):
    photo_url = user_dict['photo_url'] if 'photo_url' in user_dict else ''
    return OrderedDict([
            ('name', xstr(user_dict['name'], encoding='utf8')),
            ('photo_url', xstr(photo_url, encoding='utf8'))
        ])

In [4]:
from haystack.data_models.sources import location_identifier
from haystack.config import yelp_download_config as yelp_config

In [5]:
open('yelp_single.json', 'w').write(open(fname, 'rb').next())

In [6]:
import logging
logger = logging.getLogger()
logger.setLevel('INFO')

In [7]:
from haystack.util.timing import sec_to_hms
from time import time
from collections import OrderedDict
from io import open
import csv

In [12]:
def yelp_feed_to_graph_csv(filename):
    num_businesses = sum(1 for line in open(filename, 'rb'))
    logger.info('Writing out all yelp data to csv for ETL')
    logger.info('{} total businesses... this usually takes about 10 minutes'.format(num_businesses))
    start = time()
    
    # open csv file handles
    prefix = yelp_config['neo_data_dir']
    locations_file = open(path.join(prefix, 'locations.csv'), 'wb')
    businesses_file = open(path.join(prefix, 'businesses.csv'), 'wb')
    categories_file = open(path.join(prefix, 'categories.csv'), 'wb')
    categories_businesses_file = open(path.join(prefix, 'categories_businesses.csv'), 'wb')
    reviews_file = open(path.join(prefix, 'reviews.csv'), 'wb')
    users_file = open(path.join(prefix, 'users.csv'), 'wb')
    
    # create the csv writers
    locations_writer = csv.writer(locations_file, quoting=csv.QUOTE_MINIMAL, doublequote=True)
    businesses_writer = csv.writer(businesses_file, quoting=csv.QUOTE_MINIMAL, doublequote=True)
    categories_writer = csv.writer(categories_file, quoting=csv.QUOTE_MINIMAL, doublequote=True)
    categories_businesses_writer = csv.writer(categories_businesses_file, quoting=csv.QUOTE_MINIMAL, doublequote=True)
    reviews_writer = csv.writer(reviews_file, quoting=csv.QUOTE_MINIMAL, doublequote=True)
    users_writer = csv.writer(users_file, quoting=csv.QUOTE_MINIMAL, doublequote=True)
    
    # iterate over the data and write it to file online 
    with open(filename, 'rb') as datafile:
        # these must be cached as they can have duplicates
        locations = {}
        categories ={}
        num_users = 0
        
        for i, line in enumerate(datafile):
            try:
                og_business_dict = loads(line)
            except ValueError:
                logger.warning("Broken JSON Element. Skipping...")
                continue
            
            created = int(datetime.now().strftime('%s'))
            #logger.info("Syncing business {}".format(i))
            # create the location
            location_dict = yelp_location_dict(og_business_dict['location'])
            location_dict['_created'] = created
            location_dict['_modified'] = created
            locations[location_dict['_identifier']] = location_dict
            
            # create the business node
            business_dict = yelp_business_dict(og_business_dict)
            business_dict['location_id'] = location_dict['_identifier']
            business_dict['_created'] = created
            business_dict['_modified'] = created
            if i == 0:
                businesses_writer.writerow(business_dict.keys())
            businesses_writer.writerow(business_dict.values())
            
            # create the categories
            for j, category_dict in enumerate(og_business_dict['categories']):
                category_dict = yelp_category_dict(category_dict)
                category_dict['_created'] = created
                category_dict['_modified'] = created
                categories[category_dict['alias']] = category_dict
                cat_biz_dict = OrderedDict({'business_id':business_dict['business_id'],
                                'alias':category_dict['alias']})
                if i == 0:
                    categories_businesses_writer.writerow(cat_biz_dict.keys())
                categories_businesses_writer.writerow(cat_biz_dict.values())
                
            l = len(og_business_dict['reviews'])
            for j, review_dict in enumerate(og_business_dict['reviews']):
                # print out a detailed status update
                time_sofar = time() - start
                h,m,s = sec_to_hms(time_sofar)
                print '\r Business: {0}/{1} Review: {2:>4}/{3:>4} - {4:.0f}:{5:.0f}:{6:.0f} so far'.format(
                         i+1, num_businesses, j+1, l, h,m,s),
                
                # create the user
                user_dict = yelp_user_dict(review_dict['user'])
                user_dict['id'] = num_users
                user_dict['_created'] = created
                user_dict['_modified'] = created
                num_users += 1
                if i == 0:
                    users_writer.writerow(user_dict.keys())
                users_writer.writerow(user_dict.values())
                
                # create the review
                review_dict = yelp_review_dict(review_dict)
                review_dict['business_id'] = business_dict['business_id']
                review_dict['user_id'] = user_dict['id']
                review_dict['_created'] = created
                review_dict['_modified'] = created
                if i == 0:
                    reviews_writer.writerow(review_dict.keys())
                reviews_writer.writerow(review_dict.values())
        
        # write out the locations
        for i, location_dict in enumerate(locations.values()):
            if i == 0:
                locations_writer.writerow(location_dict.keys())
            locations_writer.writerow(location_dict.values()) 
            
        # write out the categories
        for i, category_dict in enumerate(categories.values()):
            if i == 0:
                categories_writer.writerow(category_dict.keys())
            categories_writer.writerow(category_dict.values()) 
        
        # close up file handles
        locations_file.close()
        businesses_file.close()
        categories_file.close()
        categories_businesses_file.close()
        users_file.close()
        reviews_file.close()
        
        logger.info("Successfully wrote all data to csv")




In [13]:
yelp_feed_to_graph_csv('../haystack/sources/yelpfiles/yelp_businesses.json')

INFO:root:Writing out all yelp data to csv for ETL
INFO:root:39006 total businesses... this usually takes about 10 minutes


 Business: 39006/39006 Review:  367/ 424 - 0:6:6 so far 

INFO:root:Successfully wrote all data to csv


 Business: 39006/39006 Review:  368/ 424 - 0:6:6 so far  Business: 39006/39006 Review:  369/ 424 - 0:6:6 so far  Business: 39006/39006 Review:  370/ 424 - 0:6:6 so far  Business: 39006/39006 Review:  371/ 424 - 0:6:6 so far  Business: 39006/39006 Review:  372/ 424 - 0:6:6 so far  Business: 39006/39006 Review:  373/ 424 - 0:6:6 so far  Business: 39006/39006 Review:  374/ 424 - 0:6:6 so far  Business: 39006/39006 Review:  375/ 424 - 0:6:6 so far  Business: 39006/39006 Review:  376/ 424 - 0:6:6 so far  Business: 39006/39006 Review:  377/ 424 - 0:6:6 so far  Business: 39006/39006 Review:  378/ 424 - 0:6:6 so far  Business: 39006/39006 Review:  379/ 424 - 0:6:6 so far  Business: 39006/39006 Review:  380/ 424 - 0:6:6 so far  Business: 39006/39006 Review:  381/ 424 - 0:6:6 so far  Business: 39006/39006 Review:  382/ 424 - 0:6:6 so far  Business: 39006/39006 Review:  383/ 424 - 0:6:6 so far  Business: 39006/39006 Review:  384/ 424 - 0:6:6 so far  Business: 39006/39006 Review:

In [None]:
!cat /Users/thomaseffland/neo4j_data/import/businesses.csv

In [16]:
num_lines = sum(1 for line in open('../haystack/sources/yelpfiles/yelp_businesses.json', 'rb'))

In [17]:
print num_lines

39006


In [14]:
from haystack.data_models.base import db

In [15]:
def create_node_string(filename, labels, fields):
    labels_string = ':'.join(labels)
    fields_string = ''
    for field in fields:
        fields_string += '{}: row.{},'.format(field, field)
    fields_string = fields_string[:-1] # drop trailing ','
    return """
    USING PERIODIC COMMIT
    LOAD CSV WITH HEADERS FROM "file:///{0}" AS row
    CREATE (:{1} {{{2}}});""".format(filename, labels_string, fields_string)

def create_edge_string(filename, e1, e2, edge_label):
    return """
    USING PERIODIC COMMIT
    LOAD CSV WITH HEADERS FROM "file:///{0}" AS row
    MATCH (e1:{e1[label]} {{{e1[index_name]}: row.{e1[col_name]}}})
    MATCH (e2:{e2[label]} {{{e2[index_name]}: row.{e2[col_name]}}})
    MERGE (e1)-[:{edge_label}]->(e2);""".format(filename, e1=e1, e2=e2, edge_label=edge_label)

def cypher_bulk_create():
    # create the location nodes
    locations_statement = create_node_string('locations.csv',
                                          ['Location'],
                                          ['_created',
                                           '_modified',
                                           'latitude',
                                           'longitude',
                                           'line1',
                                           'line2',
                                           'line3',
                                           'city',
                                           'state',
                                           'postal_code',
                                           'country',
                                           'bbox_height',
                                           'bbox_width',
                                           '_identifier'
                                          ])
    db.run(locations_statement)
    
    # create the business nodes
    business_statement = create_node_string('businesses.csv',
                                         ['YelpBusiness'],
                                         ['_created',
                                          '_modified',
                                          'business_id',
                                          'name',
                                          'phone',
                                          'rating',
                                          'url',
                                          'business_url',
                                          '_last_updated',
                                          'is_closed'
                                         ])
    db.run(business_statement)
    
    # create the reviews
    reviews_statement = create_node_string('reviews.csv',
                                         ['Document', 'YelpReview'],
                                         ['_created',
                                          '_modified',
                                          'text',
                                          'rating',
                                          'review_id',
                                          '_authored_date'
                                         ])
    db.run(reviews_statement)
    
    # create the categories
    categories_statement = create_node_string('categories.csv',
                                            ['YelpCategory'],
                                            ['_created',
                                             '_modified',
                                             'alias',
                                             'title'
                                            ])
    db.run(categories_statement)
    
    # create the users
    users_statement = create_node_string('users.csv',
                                        ['YelpUser'],
                                        ['_created',
                                         '_modified',
                                         'id',
                                         'name',
                                         'photo_url'])
    db.run(users_statement)
    
    # create indicies for each node type
    db.run("CREATE INDEX ON :Location(_identifier);")
    db.run("CREATE INDEX ON :YelpBusiness(business_id);")
    db.run("CREATE INDEX ON :YelpCategory(alias);")
    db.run("CREATE INDEX ON :YelpReview(review_id);")
    db.run("CREATE INDEX ON :YelpUser(id);")

    
    # create business->location relations
    businesses_locations_statement = create_edge_string('businesses.csv',
                                                      {'label':'YelpBusiness',
                                                       'index_name':'business_id',
                                                       'col_name':'business_id'},
                                                      {'label':'Location',
                                                       'index_name':'_identifier',
                                                       'col_name':'location_id'},
                                                      'LOCATED_AT')
    db.run(businesses_locations_statement)
    
    # create business->category relations
    categories_businesses_statement = create_edge_string('categories_businesses.csv',
                                                         {'label':'YelpBusiness',
                                                          'index_name':'business_id',
                                                          'col_name':'business_id'},
                                                         {'label':'YelpCategroy',
                                                          'index_name':'alias',
                                                          'col_name':'alias'},
                                                         'HAS_CATEGORY')
    db.run(categories_businesses_statement)
    
    # create the review->business relations
    reviews_businesses_statement = create_edge_string('reviews.csv',
                                                     {'label':'YelpReview',
                                                      'index_name':'review_id',
                                                      'col_name':'review_id'},
                                                     {'label':'YelpBusiness',
                                                      'index_name':'business_id',
                                                      'col_name':'business_id'},
                                                     'IS_REVIEW_OF')
    db.run(reviews_businesses_statement)
    
    # create the user->review relations
    users_reviews_statement = create_edge_string('reviews.csv',
                                                 {'label':'YelpUser',
                                                  'index_name':'id',
                                                  'col_name':'user_id'},
                                                 {'label':'YelpReview',
                                                  'index_name':'review_id',
                                                  'col_name':'review_id'},
                                                 'AUTHORED')
    db.run(users_reviews_statement)
    logger.info('Yelp Data Bulk Insert Completed!')
    

In [16]:
start = time()
cypher_bulk_create()
total = time() - start
print total

INFO:neo4j.bolt:~~ [CONNECT] ('localhost', 7687)
INFO:neo4j.bolt:C: [HANDSHAKE] 0x6060B017 [1, 0, 0, 0]
INFO:neo4j.bolt:S: [HANDSHAKE] 1
INFO:neo4j.bolt:C: INIT u'py2neo/3.1.2' {'credentials': 'fbnyc', 'scheme': 'basic', 'principal': 'neo4j'}
INFO:neo4j.bolt:S: SUCCESS {}
INFO:neo4j.bolt:C: RUN u'\n    USING PERIODIC COMMIT\n    LOAD CSV WITH HEADERS FROM "file:///locations.csv" AS row\n    CREATE (:Location {_created: row._created,_modified: row._modified,latitude: row.latitude,longitude: row.longitude,line1: row.line1,line2: row.line2,line3: row.line3,city: row.city,state: row.state,postal_code: row.postal_code,country: row.country,bbox_height: row.bbox_height,bbox_width: row.bbox_width,_identifier: row._identifier});' {}
INFO:neo4j.bolt:C: PULL_ALL 
INFO:neo4j.bolt:S: SUCCESS {u'fields': []}
INFO:neo4j.bolt:S: SUCCESS {u'type': u'w', u'stats': {u'labels-added': 35820, u'nodes-created': 35820, u'properties-set': 318203}}
INFO:neo4j.bolt:C: RUN u'\n    USING PERIODIC COMMIT\n    LOAD 

KeyboardInterrupt: 

In [124]:
print db.

http://localhost:7474/db/data/transaction


In [37]:
cat /Users/thomaseffland/neo4j_data/import/locations.csv

latitude,longitude,line1,line2,line3,city,country,postal_code,state,bbox_width,bbox_height,_identifier,_created,_modified
40.72061,-73.84451,107-16 71st Avenue,,,Forest Hills,US,11375,NY,0.0,0.0,107-16 71st Avenue |  |  | Forest Hills | NY | 11375 | 0 | 0,1474352437.0,1474352437.0


In [11]:
!grep -A 10 "978ede58-da87-43f3-ac84-12b503537811" /Users/thomaseffland/neo4j_data/logs/debug.log

1423026000,1474381511,1474381511,gvWzAiMrFm9AgfSN2I8Miw,3,-IykNeuM4IP_c3QxDuB_GQ,"T'. See debug.log for more details, reference 978ede58-da87-43f3-ac84-12b503537811.
2016-09-20 16:27:03.242+0000 ERROR [o.n.b.v.r.i.ErrorReporter] Client triggered an unexpected error [UnknownError]: At /Users/thomaseffland/neo4j_data/import/reviews.csv:53127 -  there's a field starting with a quote and whereas it ends that quote there seems to be characters in that field after that ending quote. That isn't supported. This is what I read: 'It has definitely gone downhill. I am not sure if the old chef is still there, but clearly someone inside the kitchen does not know how to cook food.

The first dish I ordered was the fried prawns with fried sweet condense milk. The shrimp wasn't too bad, but the fried sweet condense milk was cold inside. I informed them that it was cold and they quickly reheated it. I thought it was nice of them to quickly try and rectify the situation. But it came out burnt. :-\


In [66]:
!grep -A 20 "It has definitely gone downhill. I am not sure if the old chef is still there, but clearly someone inside the kitchen does not know how to cook food." /Users/thomaseffland/neo4j_data/import/reviews.csv

1470196800,1474381824,1474381824,lgzOZ1Z8fHOx6WS9e2QV8A,1,-IyW3XwNWEbvpHzh5FXvOw,"It has definitely gone downhill. I am not sure if the old chef is still there, but clearly someone inside the kitchen does not know how to cook food.

The first dish I ordered was the fried prawns with fried sweet condense milk. The shrimp wasn't too bad, but the fried sweet condense milk was cold inside. I informed them that it was cold and they quickly reheated it. I thought it was nice of them to quickly try and rectify the situation. But it came out burnt. :-\

The second dish I had problems with was the Fish Maw Soup. What I disliked wasn't about the quality of food, but more of the service. I had a party of 14 people and requested portions large enough that can feed the entire group with possible refills. The waiter was confident about their large size and was very insistent that it was sufficent enough. When the soup came out and was distributed evenly into 14 bowls, it was half full. :-\

Th

In [26]:
'a'.encode('utf8')

'a'