# Creates a Yelp POSTGRESSQL database
 - Uses the open source Yelp dataset and inserts json files into SQL tables

## Table of contents 
- importing json into python using pandas tools
- importing json to postgresql database

### Storing JSON in PostgreSQL
- upon further research, it looks like we can store JSON directly in PostgreSQL without the traditional field format
    - [Storing JSON in PostgreSQL: A must-know feature] (https://www.blendo.co/blog/storing-json-in-postgresql/)
    - [Replacing EAV with JSONB in PostgreSQL*](https://coussej.github.io/2016/01/14/Replacing-EAV-with-JSONB-in-PostgreSQL/)

    *EAV = Entity,Attribute,Value (ie three tables connected with joins to get fields for the entity)


- find connection information 

>michaelkranz$ `psql`

>michaelkranz=# `CREATE DATABASE restaurants`

>michaelkranz=# `\c restaurants`

>restaurants=# `\conninfo`

> You are now connected to database "restaurants" as user "michaelkranz".
restaurants=# \conninfo
You are connected to database "restaurants" as user "michaelkranz" via socket in "/tmp" at port "5432".

- [CHAR and VARCHAR : no performance differences and character limits used to check and will return error if longer](https://www.postgresqltutorial.com/postgresql-char-varchar-text/)

- https://www.postgresql.org/docs/12/datatype-json.html

## TODO
- write sample queries extracting from json to ensure it works
- import every Yelp JSON
- refactor code 

In [3]:
import json 
import pandas as pd 
from re import sub

from sqlalchemy import create_engine,text
from sqlalchemy.engine.url import URL 
import psycopg2 #postgressql driver

from tqdm.notebook import tqdm

In [47]:
def get_json_records(json_file_object,to_df=True,x_lines_to_read=-1):
    '''
    read all lines (ie records) or specified number of lines from json file
    and return these lines or convert to dataframe (default)
    '''
    with json_file_object:
        if x_lines_to_read is -1:
            json_list = json_file_object.readlines()
        else:
            json_list = [json_file_object.readline() for i in range(x_lines_to_read)]

        if to_df:
            json_data = [json.loads(json_line) for json_line in json_list] 
            json_df = pd.DataFrame(json_data)
            return json_df
        else:
            return json_list

def get_json_record(json_file_object,to_series=True):

    '''read a single json line (ie record) and return as 
    the json string (default) or convert to series if specified
    '''

    json_str = json_file_object.readline()
    if to_series:
        return pd.Series(eval(json_str))
    else:
        return json_str

#format the columns/fields that will be jsonb
def stringify_json_records(series):
    '''
    converts a series of dicts into a stringified 
    set of json records (which is necessary format for json storage)
    for example single quotes are replaced by double quotes
    and the dict is converted into a single quote string.

    Note: intended for use after json file converted to df and
    for nested json files
    '''
    if series.apply(lambda x: x is None or type(x) is dict).all():
       json_string_records = (
           series
           .apply(json.dumps)
           .apply(_restringify_nested_json_records) 
       )
    else:
        print("the series {} are not all dict records".format(series.name))
        json_string_records = series
    return json_string_records

def _restringify_nested_json_records(stringified_record):
    '''
    yelp open dataset has a nested (depth 2) json that is a string (invalid) and has single quotes
    rather than double quotes and should not have . need to change to be considered valid 
    json and inserted into postgressql table.
    '''
    #TODO: clean up with re.sub or replace booleans and nulls with proper json syntax (e.g., False is false and None is null)
    corrected_record = (
        stringified_record
        .replace('\"{','{') #nested json object should not have any quotes
        .replace('}\"','}')
        .replace('\'','\"') #change single to double quotes
        .replace('False,','\"False\",') #stringify booleans if not already
        .replace('True,','\"True\",')
        .replace('False}','\"False\"}') #end of dict boolean
        .replace('True}','\"True\"}')
        .replace('\"u\"','\"') #some records had double double quotes (ie "u"no"")
        .replace('\"\"','\"')
        .replace('None,','"None",') #stringify nones
        .replace('None}','"None"}')
    )
    return corrected_record


#functions to format for SQL-ready strings

def convert_to_sql_str(obj):
    '''
    if string : escape single quotes and/or make SQL quotes ($$), if not string : convert
    to string
    (see https://www.postgresqltutorial.com/dollar-quoted-string-constants/#:~:text=In%20PostgreSQL%2C%20you%20use%20single,doubling%20up%20the%20single%20quote.)
    '''
    if type(obj) is str:
        #if $ at end of string, will have have invalid triple $$$
        #% is a bind param variable
        obj_special_chars = obj.replace("$","$ ").replace("%","%%")
        obj_str = f"$${obj_special_chars}$$"
    else:
        obj_str = str(obj)
    return obj_str

def join_sql_str_values(sql_str_list):
    '''
    make valid values (ie record) for INSERT statement

    sql_str_list : iterable of strings 
    '''
    #convert every element to valid sql statements
    #join fields with ',' and wrap in '()' to 
    sql_str_values = "({})".format(",".join(sql_str_list))
    return sql_str_values 

## functions to create a sql table

def _make_sql_field_str(field_dict,sep=' '):
    '''turns a field name (key) and datatypes (value)
    into SQL statement. Note- the default space 
    is for CREATE TABLE fxn.
    '''
    formatted_list = [key + sep + val for key,val in field_dict.items()]
    formatted_str = '\n,'.join(formatted_list)
    return formatted_str

def make_create_sql_table_queries(table_name,field_dict):
    '''
    creates a string query that makes a table given a table name and set of variables with
    datatypes. 
    then creates a string query for the list of table, column, and datatype dataframe.

    table_name : str
    field_dict : dict of field names (key) and datatypes(value)
    '''
    #create sql statement strings
    drop_table_str = f"DROP TABLE IF EXISTS {table_name};"

    field_str = _make_sql_field_str(field_dict)
    create_table_str = f'''
    CREATE TABLE {table_name} (
            {field_str}
        );
        '''

    return (drop_table_str,create_table_str)


def execute_sql_queries(query_list,conn):
    '''
    executes list of string queries (drop,create table)
    and returns a df of field descriptions for table created

    returns result set of last query
    '''
    for query in query_list:
        result_set = conn.execute(query)
    return result_set

def get_sql_table_info(table_name,conn):
    '''
    gets info on a table's fields
    '''

    table_desc_str = f'''
    SELECT 
        table_name
        ,column_name
        ,data_type
    FROM 
        information_schema.columns
    WHERE
        table_name='{table_name}'
        '''
    table_info = pd.read_sql(sql=table_desc_str,con=conn)

    return table_info

#inserting sql into table
def make_insert_sql_query(sql_str_values,table_name):
    '''
    insert a SQL string-formatted record (ie one SQL string) 
    or a list of SQL records (list of SQL strings)
    
     into a SQL table
    '''
    insert_str = '''
    INSERT INTO {}
    VALUES {};
    '''
    if type(sql_str_values) is str:
        sql_query = insert_str.format(table_name,sql_str_values)
    elif type(sql_str_values) is list:
        sql_str = "\n,".join(sql_str_values)
        sql_query= insert_str.format(table_name,sql_str)
    else:
        sql_query=None
    return sql_query


In [12]:
review_fields = {
    "review_id":"VARCHAR PRIMARY KEY",
    "user_id":"VARCHAR",
    "business_id":"VARCHAR",
    'stars': "DECIMAL(2)",
    'useful': "SMALLINT",
    'funny': "SMALLINT",
    'cool': "SMALLINT",
    'text': "VARCHAR",
    'date': "TIMESTAMP"
}

business_info_fields = {
    'business_id':'VARCHAR PRIMARY KEY',
    'name':'VARCHAR',
    'address':'VARCHAR',
    'city':'VARCHAR',
    'state':'VARCHAR',
    'postal_code':'VARCHAR',
    'latitude':'FLOAT',
    'longitude':'FLOAT',
    'stars':'FLOAT(1)',
    'review_count':'INTEGER',
    'is_open':'SMALLINT',
    'attributes':'JSONB',
    'categories':'VARCHAR',
    'hours':'JSONB'
}

In [13]:
yelp_data_path = '/Users/michaelkranz/Documents/restaurant-app/data/yelp_dataset/'

yelp_json_filenames = {
    "tips":'yelp_academic_dataset_tip.json',
    "reviews":'yelp_academic_dataset_review.json',
    "business_info":'yelp_academic_dataset_business.json',
    "user":'yelp_academic_dataset_user.json'
    }

In [14]:
postgres_db_params = {'drivername': 'postgres',
                'database':'restaurants',
               'username': 'michaelkranz',
               'password': 'helloworld',
               'host': 'localhost',
               'port': 5432}

postgres_db_url = URL(**postgres_db_params)

In [15]:
engine = create_engine(postgres_db_url)

In [16]:
conn = engine.connect()

In [20]:
business_info_create_table_queries = make_create_sql_table_queries("business_info",   business_info_fields)
with engine.connect() as conn:
    execute_sql_queries(business_info_create_table_queries,conn=conn)
    table_info = get_sql_table_info("business_info",conn=conn)

In [21]:
table_info

Unnamed: 0,table_name,column_name,data_type
0,business_info,business_id,character varying
1,business_info,name,character varying
2,business_info,address,character varying
3,business_info,city,character varying
4,business_info,state,character varying
5,business_info,postal_code,character varying
6,business_info,latitude,double precision
7,business_info,longitude,double precision
8,business_info,stars,real
9,business_info,review_count,integer


In [17]:
reviews_create_table_queries = make_create_sql_table_queries("reviews",reviews_fields)
with engine.connect() as conn:
    execute_sql_queries(reviewscreate_table_queries,conn=conn)
    table_info = get_sql_table_info("reviews",conn=conn)

('\n    CREATE TABLE reviews (\n            review_id VARCHAR PRIMARY KEY\n,user_id VARCHAR\n,business_id VARCHAR\n,stars DECIMAL(2)\n,useful SMALLINT\n,funny SMALLINT\n,cool SMALLINT\n,text VARCHAR\n,date TIMESTAMP\n        );\n        ',
   table_name  column_name                    data_type
 0    reviews    review_id            character varying
 1    reviews      user_id            character varying
 2    reviews  business_id            character varying
 3    reviews        stars                      numeric
 4    reviews       useful                     smallint
 5    reviews        funny                     smallint
 6    reviews         cool                     smallint
 7    reviews         text            character varying
 8    reviews         date  timestamp without time zone)

In [11]:
file_line_count = count_file_lines(yelp_data_path+yelp_json_filenames['reviews'])

In [14]:
#TODO: parallelize and use get_json_records instead of one line
json_line_nbr = 0
with open(yelp_data_path+yelp_json_filenames['reviews']) as json_file:
    with engine.connect() as conn:
        for json_line in tqdm(json_file,total=file_line_count):
            json_line_nbr+=1
            json_series = pd.Series(eval(json_line))
            sql_str_series = json_series.apply(convert_to_sql_str)
            sql_str = combine_sql_str_values(sql_str_series)
            sql_return_str = insert_sql_record(sql_str,"reviews")

HBox(children=(HTML(value=''), FloatProgress(value=0.0, max=8021122.0), HTML(value='')))

at 6183519



In [22]:
with open(yelp_data_path+yelp_json_filenames['business_info']) as json_file:
    json_df = get_json_records(json_file)
#with engine.connect() as conn:

In [24]:
#upload json to postgresSQL table
json_df.head(1)

Unnamed: 0,business_id,name,address,city,state,postal_code,latitude,longitude,stars,review_count,is_open,attributes,categories,hours
0,f9NumwFMBDn751xgFiRbNA,The Range At Lake Norman,10913 Bailey Rd,Cornelius,NC,28031,35.462724,-80.852612,3.5,36,1,"{'BusinessAcceptsCreditCards': 'True', 'BikePa...","Active Life, Gun/Rifle Ranges, Guns & Ammo, Sh...","{'Monday': '10:0-18:0', 'Tuesday': '11:0-20:0'..."


In [25]:
json_df['attributes'] = stringify_json_records(json_df['attributes'])
json_df['hours'] = stringify_json_records(json_df['hours'])

In [39]:
json_str_list = (
    json_df
    .fillna('null') #null is syntax for SQL NAs
    .applymap(convert_to_sql_str)
    .apply(combine_sql_str_values,axis=1)
    .values
    .tolist()
)

In [48]:
business_info_insert_vals_query = make_insert_sql_query(json_str_list,"business_info")

In [49]:
print(business_info_insert_vals_query[0:1000])


    INSERT INTO business_info
    VALUES ($$f9NumwFMBDn751xgFiRbNA$$,$$The Range At Lake Norman$$,$$10913 Bailey Rd$$,$$Cornelius$$,$$NC$$,$$28031$$,35.4627242,-80.8526119,3.5,36,1,$${"BusinessAcceptsCreditCards": "True", "BikeParking": "True", "GoodForKids": "False", "BusinessParking": {"garage": "False", "street": "False", "validated": "False", "lot": "True", "valet": "False"}, "ByAppointmentOnly": "False", "RestaurantsPriceRange2": "3"}$$,$$Active Life, Gun/Rifle Ranges, Guns & Ammo, Shopping$$,$${"Monday": "10:0-18:0", "Tuesday": "11:0-20:0", "Wednesday": "10:0-18:0", "Thursday": "11:0-20:0", "Friday": "11:0-20:0", "Saturday": "11:0-20:0", "Sunday": "13:0-18:0"}$$)
,($$Yzvjg0SayhoZgCljUJRF9Q$$,$$Carlos Santo, NMD$$,$$8880 E Via Linda, Ste 107$$,$$Scottsdale$$,$$AZ$$,$$85258$$,33.5694041,-111.8902637,5.0,4,1,$${"GoodForKids": "True", "ByAppointmentOnly": "True"}$$,$$Health & Medical, Fitness & Instruction, Yoga, Active Life, Pilates$$,$$null$$)
,($$XNoUzKckATkOD1hP6vghZg$$,$$Felinu

In [50]:
with engine.connect() as conn:
    conn.execute(business_info_insert_vals_query)

### Sample queries
- [Querying Json fields](https://www.postgresqltutorial.com/postgresql-json/)

In [51]:
#return BikeParking attribute
(
pd.read_sql(con=engine.connect(),sql='''
SELECT attributes -> 'BikeParking' as BikeParking
FROM business_info
'''
)
.head()
.values
)

array([['True'],
       [None],
       [None],
       ['True'],
       [None]], dtype=object)

In [52]:
#return BikeParking attribute
(
pd.read_sql(con=engine.connect(),sql='''
SELECT attributes ->> 'BikeParking' as BikeParking
FROM business_info
'''
)
.head()
)

Unnamed: 0,bikeparking
0,True
1,
2,
3,True
4,


In [53]:
pd.read_sql(con=engine.connect(),sql='''
SELECT
    name
    ,address
    ,attributes ->> 'BikeParking' AS BikeParking
FROM business_info
WHERE attributes ->> 'BikeParking' = 'True' 
'''
)

Unnamed: 0,name,address,bikeparking
0,The Range At Lake Norman,10913 Bailey Rd,True
1,Nevada House of Hose,1015 Sharp Cir,True
2,Green World Cleaners,"6870 S Rainbow Blvd, Ste 117",True
3,Chocolate Shoppe Ice Cream,2831 Parmenter St,True
4,Manolo Blahnik,3131 Las Vegas Blvd,True
...,...,...,...
69804,Julep,829 E Washington Ave,True
69805,Domino's,"6420 Rea Rd, Suite B1",True
69806,Galaxy Cafe,"835 Seven Hills Dr, Ste 190",True
69807,Edible Arrangements,"6610 E Baseline Rd, Ste 108",True


In [8]:
pd.read_sql(con=engine.connect(),sql='''
SELECT COUNT(*) FROM reviews
'''
)

Unnamed: 0,count
0,4593349


In [None]:
#join business info to reviews and get restaurants in a city