In [None]:
import csv
import os
import pdb
import re
import sys
import time
from collections import namedtuple, Counter
from operator import itemgetter
from pprint import pprint

import dill as pkl
import psycopg2
import requests
from psycopg2.extras import NamedTupleConnection
from requests_oauthlib import OAuth1

In [248]:
###
### TODO Comment Up!
###

## _This Notebook: Download Raw Data, Clean, Create "inspections" table, Pull Restaurant Urls, Build "restaurants", "categories", and "neighborhoods" Tables_

#--------------------------------------------------------------------------------------------

## Set Up Database

### Download Data

In [None]:
# Export endpoints on data.cityofnewyork.us for dataset view xx67-kt59.
# Only csv_url is fetched by the cells below; json_url is not referenced
# anywhere else in the visible notebook.
json_url = 'https://data.cityofnewyork.us/api/views/xx67-kt59/rows.json?accessType=DOWNLOAD'
csv_url = 'https://data.cityofnewyork.us/api/views/xx67-kt59/rows.csv?accessType=DOWNLOAD'

In [None]:
# Echo the URL that the next cell downloads.
print csv_url

In [None]:
# Download the raw CSV export into memory.
r = requests.get(csv_url)
# Fail here on an HTTP error instead of saving an error page as if it
# were inspection data in the next cell.
r.raise_for_status()
len(r.content)

### Clean CSV

In [None]:
# r.content is the undecoded response body, so write it through a binary
# handle; text mode can translate newlines on some platforms.
with open('raw_inspection_data.csv', 'wb') as f:
    f.write(r.content)

In [None]:
class inspection_entry():
    '''Cleans one raw inspection row for loading into the database.

    entry_dict maps raw CSV header names to that row's string values.
    The cleaned fields are stored in ``self.values``, keyed by column name
    with spaces replaced by underscores. Missing or empty inputs become
    ``null_val`` (default 'NULL', the marker the later COPY command is
    told to treat as NULL). The one exception is ADDRESS, which is an
    empty string when both of its parts are missing.
    '''

    ## TODO data structure to build violation code table

    # Fields copied through unchanged apart from empty -> null substitution.
    no_filter = ['CAMIS',
     'ZIPCODE',
     'PHONE',
     'INSPECTION DATE',
     'ACTION',
     'VIOLATION CODE',
     'SCORE',
     'GRADE DATE',
     'RECORD DATE',
     'INSPECTION TYPE']

    # Long raw action descriptions -> short labels; unknown actions pass through.
    action_map = {'No violations were recorded at the time of this inspection.':'No violations cited.',
     'Violations were cited in the following area(s).':'Violations cited',
     'Establishment Closed by DOHMH.  Violations were cited in the following area(s) and those requiring immediate action were addressed.':'Establishment Closed by DOHMH'}

    # Compiled once for the class rather than once per processed row.
    _phone_valid_re = re.compile(r'^[\d +-]+$')
    _phone_separator_re = re.compile(r'[ +-]')
    _grade_re = re.compile(r'^[A-Z]$')
    _whitespace_re = re.compile(r'\s+')

    def __init__(self, entry_dict, null_val = 'NULL'):

        self.null = null_val
        self.values = self._process(entry_dict)

    def _encode_clean(self, raw_str):
        '''Drop stray 'Â' characters; None or blank input becomes null.'''
        if raw_str is None or not raw_str.strip():
            return self.null
        return raw_str.replace('Â', '')

    def _phone_process(self, phone):
        '''Return the phone with spaces, '+' and '-' removed.

        Null is returned when the value is missing, contains any other
        character, or has nothing left once the separators are removed.
        '''
        if not phone or not self._phone_valid_re.match(phone):
            return self.null
        digits = self._phone_separator_re.sub('', phone)
        return digits if digits else self.null

    def _grade_process(self, grade):
        '''Return the stripped grade if it is one capital letter, else null.'''
        grade = (grade or '').strip()
        return grade if self._grade_re.match(grade) else self.null

    def _action_process(self, action):
        '''Map a raw action to its short label; unmapped text is kept.'''
        if not action:
            return self.null
        else:
            return inspection_entry.action_map.get(action,action)

    def _crit_process(self, crit_val):
        '''Encode the critical flag: 1 for 'Critical', 0 otherwise, null if empty.'''
        if not crit_val:
            return self.null
        elif crit_val == 'Critical':
            return 1
        else:
            return 0

    def _addr_process(self, building, street):
        '''Join building and street into one whitespace-normalised string.

        Missing parts are treated as empty, so the result is '' (not null)
        when neither part is present.
        '''
        addr = ' '.join((building or '', street or ''))
        return self._whitespace_re.sub(' ', addr).strip()

    def _i_date_process(self, i_date):
        '''Return the inspection date, or null if missing or '01/01/1900'.'''
        # An empty value must become null as well: this result overwrites
        # the null already filled in for INSPECTION_DATE by _process.
        return i_date if i_date and i_date != '01/01/1900' else self.null

    def _process(self, entry_dict):
        '''Build the cleaned {column_name: value} dict for one raw row.'''
        na_fill = lambda v: v if v else self.null

        values = {}
        for field in inspection_entry.no_filter:
            values[field.replace(' ', '_')] = na_fill(entry_dict.get(field))

        values['DBA'] = self._encode_clean(entry_dict.get('DBA'))
        values['ADDRESS'] = self._addr_process(entry_dict.get('BUILDING'), entry_dict.get('STREET'))
        # NOTE: the output key is spelled 'CRITIAL_FLAG' (sic). The CSV
        # writer cell lists the same spelling in its fieldnames, so the
        # two must be renamed together.
        values['CRITIAL FLAG'.replace(' ', '_')] = self._crit_process(entry_dict.get('CRITICAL FLAG'))
        values['INSPECTION DATE'.replace(' ', '_')] = self._i_date_process(entry_dict.get('INSPECTION DATE'))
        values['ACTION'] = self._action_process(entry_dict.get('ACTION'))
        values['GRADE'] = self._grade_process(entry_dict.get('GRADE'))
        values['PHONE'] = self._phone_process(entry_dict.get('PHONE'))

        return values

In [None]:
st = time.clock()

with open('./raw_inspection_data.csv', 'rt') as f:
    dialect = csv.Sniffer().sniff(f.read(1024))
    f.seek(0)
    reader = csv.reader(f, dialect)
    header = reader.next()
    
    entries = []
    for row in reader:
        entry_dict =  dict(zip(header, row))
        entries.append(inspection_entry(entry_dict))
        
ft = time.clock()
print "{0:f} seconds".format(ft-st)

In [None]:
# Show the cleaned column names produced for the first row.
entries[0].values.keys()

In [None]:
st = time.clock()
fieldnames = ['CAMIS',
            'DBA',
            'ADDRESS',
            'ZIPCODE',
            'PHONE',
            'INSPECTION_TYPE',
            'INSPECTION_DATE',
            'ACTION',
            'SCORE',
            'GRADE',
            'GRADE_DATE',
            'VIOLATION_CODE',
            'CRITIAL_FLAG',
            'RECORD_DATE']

csv.register_dialect('pipes', delimiter='|', quotechar = '"', quoting = csv.QUOTE_MINIMAL)
with open('cleaned_inspection_data.csv', 'wt') as f:
    writer = csv.DictWriter(f, fieldnames, dialect='pipes')
    writer.writeheader()
    for entry in entries:
        writer.writerow(entry.values)
        
ft = time.clock()
print "{0:f} seconds".format(ft-st)

In [None]:
# Shell out to preview the first lines of the cleaned, pipe-delimited file.
!head ./cleaned_inspection_data.csv

### Add "inspections" Table to Database

In [None]:
# Open a psycopg2 connection with the DSN "dbname=yelp"; no host or user
# is given, so the driver's defaults apply.
conn = psycopg2.connect("dbname=yelp")

In [None]:
# Cursor used by the table-creation and COPY cells below.
# NOTE: the name `c` is rebound to a Counter in a later cell.
c = conn.cursor()

In [None]:
# Start from a clean slate: remove any existing inspections table.
c.execute("DROP TABLE IF EXISTS inspections")

In [None]:
# Create the inspections table. The column order follows the fieldnames
# order used when writing cleaned_inspection_data.csv. The flag column is
# named CRITICAL here, while the CSV header spells it CRITIAL_FLAG.
# CRITICAL receives the 1/0 values produced by inspection_entry._crit_process.
c.execute('''CREATE TABLE inspections (
CAMIS varchar(10),
DBA varchar(255),
ADDRESS  varchar(100),
ZIPCODE varchar(5),
PHONE varchar(12),
INSPECTION_TYPE varchar(64),
INSPECTION_DATE date,
ACTION varchar(150),
SCORE smallint,
GRADE varchar(1),
GRADE_DATE date,
VIOLATION_CODE varchar(3),
CRITICAL varchar(1),
RECORD_DATE date
)'''
)

In [None]:
# Persist the DROP/CREATE statements issued above.
conn.commit()

In [None]:
# Bulk-load the cleaned file through COPY ... FROM STDIN. The options
# mirror how the file was written: '|' delimiter, '"' quote character, a
# header row, and the literal text NULL standing for SQL NULL.
copy_command = '''
COPY inspections 
FROM STDIN
WITH (
FORMAT CSV,
DELIMITER '|',
NULL 'NULL',
HEADER TRUE,
QUOTE '"'
);
'''

# copy_expert streams the open file object to the server as COPY's STDIN.
with open('./cleaned_inspection_data.csv', 'rt') as f:
    c.copy_expert(copy_command, f)

conn.commit()

In [None]:
# Done loading inspections; release the connection.
conn.close()

#--------------------------------------------------------------------------------------------

## Yelp API

In [202]:
class yelp_api_response_parser():
    ''' Takes the json style response and extracts the desired restaurant information.

    Only the first entry of the response's 'businesses' list is used.
    '''

    # Built once for the class instead of once per response, so every
    # parsed result shares the same type.
    RestaurantInfo = namedtuple('RestaurantInfo', 'name, biz_id, address, snippet, categories, neighborhoods')

    def __init__(self, response_json):

        self.json = response_json


    def parse(self):
        '''Return a RestaurantInfo for the first business, or None.

        Returns None when the response reports ``total == 0``. A response
        without a usable 'businesses' list still raises (TypeError or
        IndexError), so the caller's error reporting keeps working.
        Missing or null 'location', 'address', 'categories' and
        'neighborhoods' values are tolerated, so a business that was found
        is not discarded over incomplete optional data.
        '''

        if self.json.get('total') == 0:
            return None

        biz_struct = self.json.get('businesses')[0]

        name = biz_struct.get('name')
        biz_id = biz_struct.get('id')
        snippet = biz_struct.get('snippet_text')

        location = biz_struct.get('location') or {}

        # 'address' is a list of address lines; keep only the first one.
        address_lines = location.get('address') or []
        address = address_lines[0] if len(address_lines) > 0 else None

        # Each category entry is a sequence; its first item is kept.
        cat_list = biz_struct.get('categories') or []
        categories = [category[0] for category in cat_list]

        neighborhoods = location.get('neighborhoods') or []

        ## TODO grab phone number, zipcode

        info_iterable = [name, biz_id, address, snippet, categories, neighborhoods]
        return self.RestaurantInfo._make(info_iterable)
    
    
####--------------------------------------------------------------------------------------------------------------####    
####--------------------------------------------------------------------------------------------------------------####
####--------------------------------------------------------------------------------------------------------------####


class yelp_api_interfacer():
    ''' Middle level class. Takes a restaurant description (a named tuple
    with dba, address, zipcode and phone attributes), queries the yelp api
    by phone and/or address, then passes the response to a response parser.
    '''

    def __init__(self, by_phone = True, by_addr_0 = False, by_addr_1 = False):
        '''Choose which lookups to run for each restaurant.

        by_phone  -- search by phone number.
        by_addr_0 -- search by name and address with sort = 0.
        by_addr_1 -- search by name and address with sort = 1.
        '''

        # NOTE(security): credentials should not live in source control.
        # Each value can now be supplied through the environment. The
        # embedded fallbacks are kept only so existing runs still work;
        # they should be rotated and removed.
        self.CONSUMER_KEY = os.environ.get('YELP_CONSUMER_KEY', 'kvO4vOdaI2jjLjA0eUEXiQ')
        self.CONSUMER_SECRET = os.environ.get('YELP_CONSUMER_SECRET', 'H7na91jgidG_k3_iiHi_2qbkJMQ')
        self.TOKEN = os.environ.get('YELP_TOKEN', '6WN41TOPmVeEgu-w0iM8gyyKhMM5rCu7')
        self.TOKEN_SECRET = os.environ.get('YELP_TOKEN_SECRET', 'erbNVdX7x6oG0ZCbsZVzfCZg7dg')
        self.SEARCH_ADDR_BASE_URL = 'http://api.yelp.com/v2/search'
        self.SEARCH_PHONE_BASE_URL = 'http://api.yelp.com/v2/phone_search'
        self.AUTH = OAuth1(self.CONSUMER_KEY, self.CONSUMER_SECRET,
                  self.TOKEN, self.TOKEN_SECRET)
        self.MAX_GET_ATTEMPTS = 3

        self.by_phone = by_phone
        self.by_addr_0 = by_addr_0
        self.by_addr_1 = by_addr_1

    def pull_restaurants(self, restaurants):
        '''Look up every restaurant and return a list of
        (rest_tuple, {lookup_name: RestaurantInfo or None}) pairs.

        Progress is printed for every 250th restaurant.
        '''
        restaurant_infos = []
        print("{0} Restaurants to pull.".format(len(restaurants)))
        for i,r in enumerate(restaurants):

            verbose = (i % 250 == 0)
            if verbose:
                print("Pulling restaurant number {0}...".format(i))
                sys.stdout.flush()

            restaurant_infos.append(self._pull_restaurant(r))

            if verbose:
                print("Restaurant {0} completed.".format(i))
                sys.stdout.flush()

        print("Completion: {0} restaurants pulled.".format(len(restaurants)))
        return restaurant_infos

    def _pull_restaurant(self, rest_tuple):
        '''Run each enabled lookup; result keys are 'by_phone',
        'by_addr_0' and 'by_addr_1' (only the enabled ones are present).'''

        result_dict = {}
        if self.by_phone:
            result_dict['by_phone'] = self._pull_restaurant_by_phone(rest_tuple)
        if self.by_addr_0:
            result_dict['by_addr_0'] = self._pull_restaurant_by_address(rest_tuple, sort = 0)
        if self.by_addr_1:
            result_dict['by_addr_1'] = self._pull_restaurant_by_address(rest_tuple, sort = 1)

        return (rest_tuple, result_dict)


    def _pull_restaurant_by_address(self, rest_tuple, sort = 0):
        '''Search by business name at "<address>, <zipcode>", top result only.'''

        name = rest_tuple.dba
        addr = "{0}, {1}".format(rest_tuple.address, rest_tuple.zipcode)
        payload = {'term':name,
            'location':addr,
            'limit':1,
            'sort':sort
            }

        return self._fetch(rest_tuple, payload, phone = False)


    def _pull_restaurant_by_phone(self, rest_tuple):
        '''Search by phone number; returns None without a request when the
        restaurant has no phone.'''

        phone = rest_tuple.phone
        if not phone:
            return None

        cc = 'US'
        payload = {'phone':phone,
            'cc':cc,
            'limit':1
            }

        return self._fetch(rest_tuple, payload, phone = True)


    def _fetch(self, rest_tuple, payload, phone):
        '''GET the search endpoint and parse the response.

        Retries up to MAX_GET_ATTEMPTS times on connection errors. Returns
        a RestaurantInfo, or None when there was no connection, no match,
        an unreadable body, an API error, or an unparsable business entry.
        Any other parse failure is re-raised.
        '''

        base_url = self.SEARCH_PHONE_BASE_URL if phone else self.SEARCH_ADDR_BASE_URL
        response = None
        for _ in range(self.MAX_GET_ATTEMPTS):

            try:
                response = requests.get(url = base_url, params = payload, auth = self.AUTH)
                break
            except requests.ConnectionError:
                self._report_connection_error(rest_tuple, payload, base_url)

        if response is None:
            return None

        # Decode the body once. A body that is not JSON is reported and
        # skipped, so one bad response cannot abort a long pull.
        try:
            response_json = response.json()
        except ValueError:
            self._report_parse_error(rest_tuple, response)
            return None

        try:
            return yelp_api_response_parser(response_json).parse()

        except Exception:
            self._report_parse_error(rest_tuple, response)
            if 'error' in response_json:
                return None
            elif response_json.get('businesses'):
                return None
            else:
                raise


    def _report_parse_error(self, rest_tuple, response):
            '''Print the restaurant and the response body that failed to parse.'''
            try:
                body = response.json()
            except ValueError:
                # Not JSON: show the raw text instead of failing again here.
                body = repr(response.text)
            print('************************')
            print('Could not parse response.')
            print(rest_tuple._asdict())
            print(body)
            print('************************')

    def _report_connection_error(self, rest_tuple, payload, url = None):
            '''Print the restaurant, url and params of a failed request.

            url is the endpoint that was actually requested; it falls back
            to the address-search url when omitted.
            '''
            print('************************')
            print("Connection Error encountered.")
            print(rest_tuple._asdict())
            print("\tUrl:")
            print(url if url is not None else self.SEARCH_ADDR_BASE_URL)
            print("\tParams:")
            print(payload)
            print('************************')
        

####--------------------------------------------------------------------------------------------------------------####
####--------------------------------------------------------------------------------------------------------------####
####--------------------------------------------------------------------------------------------------------------####


class yelp_api_trawler(object):
    '''Interacts with the database and sends
    (camis, dba, address, zipcode, phone) tuples to the yelp api handlers.

    The connection is opened lazily by the first read. The trawler can be
    used in a ``with`` statement, which closes the connection on exit.
    '''

    def __init__(self, start_read = 0, **kwargs):
        '''start_read is the initial row offset; kwargs are forwarded to
        yelp_api_interfacer.'''

        self.current = start_read
        self.interface = yelp_api_interfacer(**kwargs)
        self.conn = None
        self.c = None
        self.temp_table = 'trwlr_table'

    def __enter__(self):
        # Nothing to open here: reads open the connection on demand.
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close_conn()
        # Returning False lets an exception from the with-body propagate.
        return False

    def seek(self, n):
        '''Set the row offset used by the next read_next_n call.'''
        self.current = n


    def read_next_n(self, n):
        '''Pull up to n restaurants starting at the current offset.

        Returns the interfacer's (rest_tuple, results) pairs and advances
        the offset by the number of rows actually read.
        '''

        if not self.conn:
            self.open_conn()

        # LIMIT/OFFSET paging is only repeatable with an explicit ORDER BY.
        # The limit and offset are passed as query parameters; only the
        # internal table name is formatted into the text.
        q = '''SELECT camis, dba, address, zipcode, phone
            FROM {temp_table}
            ORDER BY camis, dba, address, zipcode, phone
            LIMIT %s OFFSET %s;
        '''.format(temp_table = self.temp_table)

        self.c.execute(q, (n, self.current))
        query_result_tuples = self.c.fetchall()
        extract_tuples = self.interface.pull_restaurants(query_result_tuples)
        self.current += len(query_result_tuples)
        return extract_tuples

    def read_all(self):
        '''Pull every restaurant in the temp table and reset the offset to 0.'''

        if not self.conn:
            self.open_conn()

        q = '''SELECT camis, dba, address, zipcode, phone
            FROM {temp_table}
            ORDER BY camis, dba, address, zipcode, phone;
            '''.format(temp_table = self.temp_table)
        self.c.execute(q)
        query_result_tuples = self.c.fetchall()

        extract_tuples = self.interface.pull_restaurants(query_result_tuples)
        self.current = 0
        return extract_tuples

    def open_conn(self):
        '''Connect and build the session's temp table holding one row per
        distinct (camis, dba, address, zipcode, phone) from inspections.'''

        self.conn = psycopg2.connect("dbname=yelp", cursor_factory=psycopg2.extras.NamedTupleCursor)
        self.c = self.conn.cursor()
        q = '''CREATE TEMP TABLE {temp_table} AS (
            SELECT camis, dba, address, zipcode, phone
            FROM inspections
            GROUP BY camis, dba, address, zipcode, phone
            ORDER BY camis ASC
            );
            '''.format(temp_table = self.temp_table)
        self.c.execute(q)

    def close_conn(self):
        '''Close the connection if one is open; safe to call repeatedly.'''
        if self.conn is not None:
            self.conn.close()
        self.conn = None
        # The cursor belongs to the closed connection; drop it as well.
        self.c = None

    

    

In [207]:
# First pass: phone lookups only.
trwlr = yelp_api_trawler(start_read = 0, by_phone = True, by_addr_0 = False, by_addr_1= False)

In [208]:
# Pull one batch of restaurants by phone and time it. start_record and n
# are reused by later cells to name the pickle files for this batch.
st = time.clock()
start_record = 20000
n = 10000
trwlr.seek(start_record)
output = trwlr.read_next_n(n)

ft = time.clock()
'{0} seconds.'.format(ft-st)

5230 Restaurants to pull.
Pulling restaurant number 0...
Restaurant 0 completed.
************************
Could not parse response.
OrderedDict([('camis', '50007672'), ('dba', 'ITALIA 90'), ('address', '1011 ALLERTON AVENUE'), ('zipcode', '10469'), ('phone', '0000000000')])
{u'error': {u'text': u'One or more parameters are invalid in request', u'id': u'INVALID_PARAMETER', u'field': u'phone'}}
************************
Pulling restaurant number 250...
Restaurant 250 completed.
Pulling restaurant number 500...
Restaurant 500 completed.
Pulling restaurant number 750...
Restaurant 750 completed.
Pulling restaurant number 1000...
Restaurant 1000 completed.
Pulling restaurant number 1250...
Restaurant 1250 completed.
Pulling restaurant number 1500...
Restaurant 1500 completed.
Pulling restaurant number 1750...
Restaurant 1750 completed.
Pulling restaurant number 2000...
Restaurant 2000 completed.
************************
Could not parse response.
OrderedDict([('camis', '50015341'), ('dba', 'F

'35.196032 seconds.'

In [209]:
# Release the trawler's database connection.
trwlr.close_conn()

In [210]:
def make_trwlr_output_pklable(output, key = 'by_phone'):
    '''Convert trawler output into plain dicts that can be pickled.

    output -- iterable of (rest_tuple, results) pairs, where rest_tuple is
              a named tuple and results maps lookup names to a named tuple
              or None.
    key    -- which lookup's result to keep (default 'by_phone').

    Returns a list of (rest_dict, result_dict_or_None) pairs. A lookup
    that was not run (key absent) is treated like one that found nothing.
    '''

    def to_dicts_helper(result_tuple):
        rest_tuple, pulled_dict = result_tuple[0], result_tuple[1]
        pulled = pulled_dict.get(key)
        pulled_as_dict = pulled._asdict() if pulled is not None else None
        return (rest_tuple._asdict(), pulled_as_dict)

    return [to_dicts_helper(result_tuple) for result_tuple in output]
    

In [211]:
# Persist this batch. The file name is built from the requested range
# (start_record+1 .. start_record+n), not from the number of rows read.
pklable_output = make_trwlr_output_pklable(output)
with open('./{0}-{1}output.pkl'.format(start_record+1, start_record+n), 'wb') as f:
    pkl.dump(pklable_output, f)

In [212]:
print "Total restuarants processed: {0}".format(len(output))
print "Successfully paired: {0}".format(len(filter(lambda x: x[1]['by_phone'] is not None, output)))

Total restuarants processed: 5230
Successfully paired: 2805


### Retry Failed Phone Pull by Address Search

In [213]:
# Split the restaurant tuples by whether the phone lookup found a match.
successes = [pair[0] for pair in output if pair[1]['by_phone'] is not None]
failures = [pair[0] for pair in output if pair[1]['by_phone'] is None]

In [214]:
# Matched vs. unmatched counts for the phone pass.
print len(successes)
print len(failures)

2805
2425


In [215]:
# Eyeball the names of the first 50 restaurants the phone search missed.
pprint(map(lambda x: (x.dba), failures)[:50])

['CAMDEN FOOD COMPANY',
 'SUBWAY',
 'LA PARADA II RESTAURANT',
 'ALFREDO 100',
 'NURNBERGER',
 'BAR SAN MIGUEL',
 'OAXACA TAQUERIA',
 'ABDULLAH SWEETS AND RESTAURANT',
 "DECATUR'S STEAKHOUSE",
 'EL SABROZON',
 "Big E's Deli",
 'DIVINE BICKLES RESTAURANT.',
 'PHILIPPE NYC I',
 '241ST CAFE RESTAURANT',
 'MAISON KAYSER',
 'THIRSTY BAR',
 'BAR PRIMI',
 'TINA  RESTAURANT',
 '624 KAM HAI CHINESE FOOD',
 'Co Ba 53',
 'KENNEDY FRIED CHICKEN',
 "OLD STANLEY'S",
 'CASABLANCA BAR & GRILL',
 'STAR MOUNTAIN COFFEE SHOP',
 'MIN GOLDEN WOK',
 'MANDELA RESTAURANT',
 'NEW DRAGON',
 'NEW DRAGON SEA CHINESE RESTAURANT',
 'Dunkin Donuts',
 'CROWN FRIED CHICKEN AND FISH',
 'ELYNE RESTAURANT',
 'MEMORIES BAR SHEVROJA',
 'CAFE DE BROADWAY',
 'EAT @ SHERMAN CREEK',
 'MCDONALDS',
 'KHAO KANG',
 'NICEY NICE JAMAICAN RESTAURANT & LOUNGE',
 "KARY'S LOUNGE",
 'PEKING OISHI',
 'SUBWAY',
 'TRIPLE 8 RESTAURANT',
 'ASIAN KITCHEN',
 'LE BAOBAB-GOUYGUI RESTAURANT',
 'SUBWAY',
 'SUBWAY',
 'UNITED AIRLINES RED CARPET CLUB

In [216]:
# Second pass: address searches only, with both sort = 0 and sort = 1.
yelp_interfacer = yelp_api_interfacer(by_phone = False, by_addr_0 = True, by_addr_1= True)

In [217]:
# Retry every phone-search failure by name and address.
output_v2 = yelp_interfacer.pull_restaurants(failures)

2425 Restaurants to pull.
Pulling restaurant number 0...
Restaurant 0 completed.
Pulling restaurant number 250...
Restaurant 250 completed.
Pulling restaurant number 500...
Restaurant 500 completed.
Pulling restaurant number 750...
Restaurant 750 completed.
Pulling restaurant number 1000...
Restaurant 1000 completed.
Pulling restaurant number 1250...
Restaurant 1250 completed.
Pulling restaurant number 1500...
Restaurant 1500 completed.
Pulling restaurant number 1750...
Restaurant 1750 completed.
Pulling restaurant number 2000...
Restaurant 2000 completed.
Pulling restaurant number 2250...
Restaurant 2250 completed.
Completion: 2425 restaurants pulled.


In [218]:
def find_match(result_tuple):
    '''Pick the address-search result that plausibly is the same place.

    result_tuple is (record, yelp_records): record is the inspection
    restaurant (with an .address attribute) and yelp_records maps lookup
    names to a yelp result (with an .address attribute) or None.

    A result matches when its first address token (the house number)
    equals the record's and the second tokens start with the same letter,
    ignoring case. Returns (record, yelp_record) for the first match, or
    None when nothing matches.
    '''

    record = result_tuple[0]
    yelp_records = result_tuple[1]

    # split() with no argument collapses whitespace runs and never yields
    # empty tokens, so indexing the first character below is safe.
    record_tokens = (record.address or '').split()
    if len(record_tokens) < 2:
        return None

    # Visit lookups in sorted key order so the chosen match does not
    # depend on dict iteration order.
    for key in sorted(yelp_records):
        yelp_record = yelp_records[key]
        if yelp_record is None or yelp_record.address is None:
            continue

        yelp_tokens = yelp_record.address.split()
        if len(yelp_tokens) < 2:
            continue

        same_number = record_tokens[0] == yelp_tokens[0]
        same_initial = record_tokens[1].lower()[0] == yelp_tokens[1].lower()[0]
        if same_number and same_initial:
            return (record, yelp_record)

    return None

In [219]:
# Keep only the retries for which find_match accepted a result.
found = filter(None, map(find_match, output_v2))

In [220]:
# Persist the second-pass matches as (record_dict, yelp_dict) pairs,
# using the same range-based file naming as the first pass.
pklable_found_p1 = map(lambda x: (x[0]._asdict(), x[1]._asdict()), found)
with open('./{0}-{1}found_second_pass.pkl'.format(start_record+1, start_record + n), 'wb') as f:
    pkl.dump(pklable_found_p1, f)

In [221]:
# Count phone-search failures per restaurant name.
# NOTE: this rebinds `c`, which earlier cells used for the psycopg2 cursor.
c = Counter(map(lambda x: (x.dba), failures))
c.most_common(30)

[(None, 143),
 ('SUBWAY', 60),
 ('DUNKIN DONUTS', 28),
 ('KENNEDY FRIED CHICKEN', 19),
 ('CROWN FRIED CHICKEN', 14),
 ('FIKA', 9),
 ('JUST SALAD', 9),
 ("MCDONALD'S", 7),
 ('Dunkin Donuts', 6),
 ('CHECKERS', 6),
 ('STARBUCKS COFFEE', 6),
 ('SUBWAY RESTAURANT', 4),
 ('CAFFE BENE', 4),
 ('Subway', 4),
 ('BURGER KING', 3),
 ('MCDONALDS', 3),
 ('HEALTHY BITE', 3),
 ('STARBUCKS', 3),
 ('KENNEDY FRIED CHICKEN & PIZZA', 3),
 ("WENDY'S", 3),
 ('JUICE GENERATION', 3),
 ('Kennedy Fried Chicken', 3),
 ('LITTLE CAESARS', 3),
 ('CRUMBS BAKE SHOP', 3),
 ('AMPLE HILLS CREAMERY', 2),
 ('EMPANADAS MONUMENTAL', 2),
 ("MIGHTY QUINN'S BBQ", 2),
 ('WALDORF ASTORIA NYC', 2),
 ('15 FLAVORS', 2),
 ('MANGO MANGO', 2)]

In [222]:
# Count, per restaurant name, how many failures the address retry recovered.
c_retry = Counter(map(lambda x: x[0].dba, found))
c_retry.most_common(30)

[('SUBWAY', 13),
 ('JUST SALAD', 8),
 ('FIKA', 8),
 ('DUNKIN DONUTS', 8),
 (None, 6),
 ("MCDONALD'S", 4),
 ('CROWN FRIED CHICKEN', 3),
 ("WENDY'S", 3),
 ('CAFFE BENE', 3),
 ('MCDONALDS', 3),
 ('SUBWAY RESTAURANT', 3),
 ('CRUMBS BAKE SHOP', 3),
 ('WALDORF ASTORIA NYC', 2),
 ('FILICORI ZECCHINI', 2),
 ('RED MANGO', 2),
 ('GONG CHA', 2),
 ("MIGHTY QUINN'S BBQ", 2),
 ('VIVE LA CREPE', 2),
 ('MADMAN ESPRESSO', 2),
 ('SHAKE SHACK', 2),
 ('CHECKERS', 2),
 ('FORT GRACE', 2),
 ('THE JUICE SHOP', 2),
 ('EQUINOX', 2),
 ('STARBUCKS COFFEE', 2),
 ('BRAVO AFRICAN RESTAURANT', 1),
 ('C. LO CAFE', 1),
 ("OLD STANLEY'S", 1),
 ('GRAND ST PIZZA', 1),
 ('vapor lounge', 1)]

In [223]:
# Remove the recovered restaurants from the failure counts.
# NOTE: subtract() mutates `c` in place, so re-running this cell without
# rebuilding `c` subtracts the retries a second time.
c.subtract(c_retry)

In [224]:
# Names that are still unmatched after both passes.
c.most_common(30)

[(None, 137),
 ('SUBWAY', 47),
 ('DUNKIN DONUTS', 20),
 ('KENNEDY FRIED CHICKEN', 18),
 ('CROWN FRIED CHICKEN', 11),
 ('Dunkin Donuts', 6),
 ('CHECKERS', 4),
 ('STARBUCKS COFFEE', 4),
 ('BURGER KING', 3),
 ('HEALTHY BITE', 3),
 ('KENNEDY FRIED CHICKEN & PIZZA', 3),
 ("MCDONALD'S", 3),
 ('Kennedy Fried Chicken', 3),
 ('Subway', 3),
 ('EMPANADAS MONUMENTAL', 2),
 ('PEKING OISHI', 2),
 ('MAPLE', 2),
 ('LUCKY STAR KITCHEN', 2),
 ('STARBUCKS', 2),
 ('THINK COFFEE', 2),
 ('CROWN FRIED CHICKEN & PIZZA', 2),
 ('VIDA SALUDABLE', 2),
 ('POPEYES', 2),
 ('VILLA ITALIAN KITCHEN', 2),
 ('TACOS NEZA', 2),
 ('JUICE GENERATION', 2),
 ('SIPS & BITES', 2),
 ('BROADWAY BAKERY', 2),
 ('GOOD TASTE', 2),
 ('RESTAURANT ASSOCIATES LLC', 2)]

#--------------------------------------------------------------------------------------------

## Build Restaurants Tables

In [225]:
# TODO make more systematic
output_p1_paths = ['./1-10000output.pkl', './10001-20000output.pkl', './20001-30000output.pkl']
output_p2_paths = ['./1-10000found_second_pass.pkl', './10001-20000found_second_pass.pkl', './20001-30000found_second_pass.pkl']

outputs_p1 = []
outputs_p2 = []
for path in output_p1_paths:
    with open(path, 'rb') as f:
        outputs_p1.extend(pkl.load(f))
        
for path in output_p2_paths:
    with open(path, 'rb') as f:
        outputs_p2.extend(pkl.load(f))
        

In [226]:
all_successes = []
all_successes.extend(filter(lambda x: x[1] is not None, outputs_p1))
all_successes.extend(outputs_p2)
print len(all_successes)

20217


In [228]:
# Sanity check: count matches per camis. If the highest counts are all 1,
# no restaurant was matched twice across the two passes.
c = Counter(map(lambda x: x[0].get('camis'), all_successes))
print c.most_common(5)
# No duplicates? :-)

[('50018608', 1), ('50001460', 1), ('41543725', 1), ('41543722', 1), ('50006252', 1)]


In [None]:
# restaurants
## (yelp_id, camis, yelp_name, address, zipcode)
# neighborhoods
## (yelp_id, neighborhood)
# categories
## (yelp_id, category)

## TableBuilder Superclass

In [235]:
class TableBuilder:
    '''Base class for building one database table from matched records.

    Subclasses implement _create_table and _add_records. The public
    build_table and add_records methods each open a connection, do the
    work and always close the connection afterwards.
    '''

    def __init__(self):

        self.conn = None
        self.c = None


    def _open_conn(self):
        '''Connect to the yelp database and create a named-tuple cursor.'''

        self.conn = psycopg2.connect("dbname=yelp", cursor_factory=psycopg2.extras.NamedTupleCursor)
        self.c = self.conn.cursor()


    def _close_conn(self):
        '''Close the connection if open and forget it and its cursor.'''

        if self.conn is not None:
            self.conn.close()
        self.conn = None
        # The cursor belongs to the closed connection and must not be reused.
        self.c = None


    def build_table(self, matched_records = None):
        '''(Re)create the table, optionally loading matched_records too.'''

        try:
            self._open_conn()
            self._create_table()
            if matched_records is not None:
                self._add_records(matched_records)

        finally:
            self._close_conn()


    def add_records(self, matched_records):
        '''Insert matched_records into the already existing table.'''

        try:
            self._open_conn()
            self._add_records(matched_records)

        finally:
            self._close_conn()

    def _psql_safe_string_format(self, text):
        '''Return text as unicode with single quotes doubled.

        None becomes the text 'NULL'. Callers that wrap the result in
        quotes therefore store the literal string NULL, not a SQL NULL;
        passing values as query parameters avoids that.
        '''

        text = text.replace("'", "''") if text is not None else 'NULL'
        text = unicode(text)
        return text

    def _create_table(self):
        '''Subclass hook: drop and create the table.'''
        raise NotImplementedError

    def _add_records(self, matched_records):
        '''Subclass hook: insert matched_records using self.c / self.conn.'''
        raise NotImplementedError
        

## Restaurants Table

In [242]:
class RestaurantsTableBuilder(TableBuilder):
    '''Builds the restaurants table linking a camis to its yelp business.'''

    def __init__(self):
        TableBuilder.__init__(self)

    def _create_table(self):
        '''Drop and recreate the restaurants table.'''

        # restaurants
        ## (yelp_id, camis, yelp_name, yelp_address, zipcode)
        self.c.execute("DROP TABLE IF EXISTS restaurants")
        q = '''
        CREATE TABLE restaurants (
        camis varchar(8),
        yelp_id varchar(80),
        yelp_name varchar(70),
        yelp_address varchar(60),
        zipcode varchar(5)
        )
        '''
        self.c.execute(q)
        self.conn.commit()

    def _add_records(self, matched_records):
        '''Insert one row per (inspection_dict, yelp_dict) record.

        Values are passed as query parameters, so the driver handles
        quoting, None is stored as SQL NULL, and camis/zipcode are stored
        as text exactly as given. All rows are committed together at the end.
        '''

        q = u'''INSERT INTO restaurants
        (camis, yelp_id, yelp_name, yelp_address, zipcode)
        VALUES (
        %(camis)s,
        %(yelp_id)s,
        %(yelp_name)s,
        %(yelp_address)s,
        %(zipcode)s
        );
        '''

        for record in matched_records:
            self.c.execute(q, self._extract_record_to_input_dict(record))

        self.conn.commit()

    def _extract_record_to_input_dict(self, record):
        '''Return the query parameters for one record.

        record[0] is the inspection dict (camis, zipcode) and record[1]
        is the yelp dict (biz_id, name, address). A zipcode that is not
        exactly five digits is replaced by None.
        '''

        inspection, yelp = record[0], record[1]

        zipcode = inspection.get('zipcode')
        if zipcode is None or not re.match(r'\d{5}\Z', zipcode):
            zipcode = None

        # TODO add *yelp*_phone, yelp_zip

        return {
            'camis': inspection.get('camis'),
            'yelp_id': yelp.get('biz_id'),
            'yelp_name': yelp.get('name'),
            'yelp_address': yelp.get('address'),
            'zipcode': zipcode,
        }


In [243]:
# build_table() drops and recreates the restaurants table; add_records()
# then loads every matched restaurant on a fresh connection.
rtb = RestaurantsTableBuilder()
rtb.build_table()
rtb.add_records(all_successes)

## Categories Table

In [244]:
class CategoriesTableBuilder(TableBuilder):
    '''Builds the categories table: one row per (yelp_id, category).'''

    def __init__(self):
        TableBuilder.__init__(self)

    def _create_table(self):
        '''Drop and recreate the categories table.'''

        # categories
        ## (yelp_id, yelp_category)
        self.c.execute("DROP TABLE IF EXISTS categories")
        q = '''
        CREATE TABLE categories (
        yelp_id varchar(80),
        yelp_category varchar(35)
        )
        '''
        self.c.execute(q)
        self.conn.commit()

    def _add_records(self, matched_records):
        '''Insert every category of every record, then commit once.

        Values are passed as query parameters so the driver does the
        quoting and None is stored as SQL NULL.
        '''

        q = u'''INSERT INTO categories
        (yelp_id, yelp_category)
        VALUES (
        %(yelp_id)s,
        %(yelp_category)s
        );
        '''

        for record in matched_records:
            for params in self._extract_record_to_input_dicts(record):
                self.c.execute(q, params)

        self.conn.commit()

    def _extract_record_to_input_dicts(self, record):
        '''Return one parameter dict per category in record[1].

        A missing or None 'categories' value yields no rows.
        '''

        yelp = record[1]
        yelp_id = yelp.get('biz_id')
        categories = yelp.get('categories') or []

        return [{'yelp_id': yelp_id, 'yelp_category': category}
                for category in categories]


In [245]:
# Recreate the categories table, then load the categories of every match.
ctb = CategoriesTableBuilder()
ctb.build_table()
ctb.add_records(all_successes)

## Neighborhoods Table

In [246]:
class NeighborhoodsTableBuilder(TableBuilder):
    '''Builds the neighborhoods table: one row per (yelp_id, neighborhood).'''

    def __init__(self):
        TableBuilder.__init__(self)

    def _create_table(self):
        '''Drop and recreate the neighborhoods table.'''

        # neighborhoods
        ## (yelp_id, neighborhood)
        self.c.execute("DROP TABLE IF EXISTS neighborhoods")
        q = '''
        CREATE TABLE neighborhoods (
        yelp_id varchar(80),
        yelp_neighborhood varchar(40)
        )
        '''
        self.c.execute(q)
        self.conn.commit()

    def _add_records(self, matched_records):
        '''Insert every neighborhood of every record, then commit once.

        Values are passed as query parameters so the driver does the
        quoting and None is stored as SQL NULL.
        '''

        q = u'''INSERT INTO neighborhoods
        (yelp_id, yelp_neighborhood)
        VALUES (
        %(yelp_id)s,
        %(yelp_neighborhood)s
        );
        '''

        for record in matched_records:
            for params in self._extract_record_to_input_dicts(record):
                self.c.execute(q, params)

        self.conn.commit()

    def _extract_record_to_input_dicts(self, record):
        '''Return one parameter dict per neighborhood in record[1].'''

        yelp = record[1]
        yelp_id = yelp.get('biz_id')

        # SHOULD BE ABLE TO REMOVE THE NONE FALLBACK AFTER REPULL FROM YELP
        neighborhoods = yelp.get('neighborhoods') or []

        return [{'yelp_id': yelp_id, 'yelp_neighborhood': neighborhood}
                for neighborhood in neighborhoods]


In [247]:
# Recreate the neighborhoods table, then load the neighborhoods of every match.
ntb = NeighborhoodsTableBuilder()
ntb.build_table()
ntb.add_records(all_successes)