In [37]:
# Import dependencies
import pandas as pd
import json
import os
from datetime import datetime

# Import SQL Alchemy
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.ext.automap import automap_base
Base = declarative_base()

# Import modules to declare columns and column data types
from sqlalchemy import Column, Integer, String, Float, Boolean, DateTime, Date, func, ForeignKey, create_engine
from sqlalchemy.orm import Session, relationship, session, column_property
from sqlalchemy.exc import SQLAlchemyError

import psycopg2

DATA_SRC_RESTAURANTS = 1 

### Functions

In [38]:
# Database connection
def get_dbconnection():
    return psycopg2.connect(
        user = "postgres",
        password = "postgres",
        host = "localhost",
        port = "5432",
        database = "ETLproject"
    )    

# Read restaurant data from json file
# Fill in state_id FK from loaded states
def read_restaurants(file, states_id):

    with open(file, "r") as read_file:
        restaurants = json.load(read_file)["businesses"]

    # Hash of categories
    categories = {}

    # List of restaurants
    rest_list = []

    # List of restaurant categories
    rest_cat = []
    
    for restaurant in restaurants:
        id = restaurant["id"]
        name = restaurant["name"]

        # location
        location = restaurant["location"]
        if(not location['state'] in states_id):
            continue
        state_id = states_id[location['state']]
        
        # All address entries into street
        street = location["address1"]
        street2 = location["address2"]
        street3 = location["address3"]
        if(street2 and not street2.isspace()):
            street += " " + street2 
        if(street3 and not street3.isspace()):
            street += " " + street3 

        # operational info
        for c in restaurant["categories"]:
            # Object to list of categories 
            alias =  c['alias'].strip().lower() # alias is used as key
            rest_cat.append((id, CategoryCls(alias = c['alias'], title = c['title'] )))
            categories[c['alias']] = c['title']

        if(restaurant.get("price")):
            price = restaurant["price"].count('$') 
        else:
            price = None # some entries miss price

        restaurantObj = RestaurantCls(
            name = restaurant["name"],
            business_id = restaurant["id"],
            state_id = state_id, # value read from database
            city = location["city"],
            street = street,
            zip_code = location["zip_code"],
            price_range = price,
            rating = float(restaurant["rating"]),
            is_closed = bool(restaurant["is_closed"])
        )
        rest_list.append(restaurantObj)
    
    return rest_list, rest_cat, categories

# Get state_id form database
def get_states_id():
    # Get states keys
    engine = create_engine('postgresql+psycopg2://postgres:postgres@localhost/ETLproject')

    # Reflect an existing database into a new model
    Base = automap_base()

    # reflect the tables
    Base.prepare(engine, reflect=True)
    #Base.classes.keys()

    # Assign the state class to a variable
    StatesList = Base.classes.state
    # Create a session
    statsession = Session(engine)

    states_id = {}

    for row in statsession.query(StatesList, StatesList.id, StatesList.name_a2).all():
        states_id[row[2]] = row[1]

    return states_id


### Classes

In [39]:
class RestaurantCls(Base):
    __tablename__ = 'Restaurant'
    id = Column(Integer, primary_key=True)
    name = Column(String(255), nullable=False)
    business_id = Column(String(255), unique=True, nullable=False)
    # location
    state_id = Column(Integer, ForeignKey('state.id'), nullable=False)
    city = Column(String(100), nullable=False)
    street = Column(String(250), nullable=False)
    zip_code = Column(String(10), nullable=False)
    # Operational info
    price_range = Column(Integer, nullable=False)
    rating = Column(Float, nullable=False)
    is_closed = Column(Boolean, nullable=False)
    source_id = Column(Integer, nullable=False)
    modified_date = Column(Date, nullable=False, default=func.now(), onupdate=datetime.now())

class CategoryCls(Base):
    __tablename__ = 'category'
    id = Column(Integer, primary_key=True)
    alias = Column(String(255), nullable=False)
    title = Column(String(255), nullable=False)
    source_id = Column(Integer, nullable=False)
    modified_date = Column(Date, default=func.now())    

### Load data from file into objects for further processing
* Use direct data reader, not Pandas df to get values
* Insert data into categories, restaurants, after that to restaurant_categories

In [40]:
states_id = get_states_id()
restaurant_file = os.path.join("..", "Data", "YelpData.txt")
restaurants, restaurant_categories, categories = read_restaurants(restaurant_file, states_id)
len(restaurants)

50

#### Cagories
* Insert new

In [41]:
connection = get_dbconnection()
cursor = connection.cursor()

try:
    for alias, title in categories.items():
        cursor.execute("INSERT INTO category (alias, title, source_id) VALUES (%s, %s, %s) ON CONFLICT DO NOTHING", 
                       (alias, title, DATA_SRC_RESTAURANTS))
     # commit the changes to the database
    connection.commit()
    cursor.close()
except psycopg2.DatabaseError as e:
    print(e)
finally:
    connection.rollback()
    if connection is not None:
        connection.close()    
    print("Categories import finished.")

Categories import finished.


#### Restaurants
* Insert new
* Update exising - TODO

In [42]:
connection = get_dbconnection()
cursor = connection.cursor()

try:
    for r in restaurants:
        cursor.execute("INSERT INTO restaurant (name, business_id, state_id, city, street, zip_code, price_range, rating, is_closed, source_id) \
                        VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s) ON CONFLICT DO NOTHING", 
                       (r.name, r.business_id, r.state_id, r.city, r.street, r.zip_code, r.price_range, r.rating, r.is_closed, DATA_SRC_RESTAURANTS))
     # commit the changes to the database
    connection.commit()
    cursor.close()
except psycopg2.DatabaseError as e:
    print(e)
finally:
    connection.rollback()
    if connection is not None:
        connection.close()    
    print("Restaurants import finished.")


Restaurants import finished.


#### Restaurant Categories
* Run only after categories and restaurants are loaded
* Add new categories
* Delete unused categories - TODO

In [46]:
# Prepare data
restaturant_category_df = pd.DataFrame(map(lambda r: [r[0], r[1].alias], restaurant_categories))
restaturant_category_df.columns = ["business_id", "alias"]

connection = get_dbconnection()
cursor = connection.cursor()

try:
    restaurant_df = pd.read_sql('SELECT id AS cid, business_id FROM restaurant', con=connection)
    category_df = pd.read_sql('SELECT id AS rid, alias FROM category', con=connection)

    map_df = restaturant_category_df.merge(restaurant_df, on="business_id")
    map_df = map_df.merge(category_df, on="alias")
    
except psycopg2.DatabaseError as e:
    print(e)
finally:
    connection.rollback()
    if connection is not None:
        connection.close()    
    print("Restaurant category import finished.")
    
map_df.head()


Restaurant category import finished.


Unnamed: 0,business_id,alias,cid,rid
0,wGl_DyNxSv8KUtYgiuLhmA,icecream,1,48
1,76smcUUGRvq3k1MVPUXbnA,icecream,6,48
2,47OC_X6KkiDDQ4jwoCUjFg,icecream,29,48
3,wGl_DyNxSv8KUtYgiuLhmA,bakeries,1,49
4,ri7UUYmx21AgSpRsf4-9QA,bakeries,4,49


In [49]:
# Load data
connection = get_dbconnection()
cursor = connection.cursor()

try:
    for index, row in map_df.iterrows():
        # modified_data column value is automatically filled by DB engine with default value now() 
        cursor.execute("INSERT INTO restaurant_category (restaurant_id, category_id, source_id) \
                        VALUES (%s, %s, %s) ON CONFLICT DO NOTHING", 
                        (row['rid'], row['cid'], DATA_SRC_RESTAURANTS))
    connection.commit()
    cursor.close()
except psycopg2.DatabaseError as e:
    print(e)
    connection.rollback()
finally:
    if connection is not None:
        connection.close()    
    print("Restaurant-Category import finished.")

insert or update on table "restaurant_category" violates foreign key constraint "restaurant_category_fk2"
DETAIL:  Key (category_id)=(1) is not present in table "category".

Restaurant-Category import finished.
