In [1]:
# (Re)load the dotenv IPython extension and run its magic so that the variables
# declared in the local .env file become available to this notebook
%reload_ext dotenv
%dotenv

In [2]:
import os
import time
import json
import random
import warnings
from pathlib import Path
from itertools import takewhile, repeat
from math import ceil

import pandas as pd
import numpy as np
from tqdm.auto import tqdm
from sqlalchemy import create_engine, types
from sqlalchemy.sql import text
from sqlalchemy.exc import ProgrammingError
from sqlalchemy.engine.base import Engine

# Silence every warning category for the rest of the notebook session
warnings.filterwarnings('ignore')

# Operational Data Setup

<div class="alert alert-block alert-warning"><b>Warning:</b> This notebook assumes that you understand the Logical and Platform Architecture of this project. If not, please see the README and related documentation to have a full understanding of this script.</div>

The [Yelp Dataset](https://www.yelp.com/dataset) is intended for research purposes; because of that, some information is already aggregated (e.g. number of stars) or transformed in some way. To be able to simulate all the steps from operational data to an analytical data product, the objective of this notebook is to prepare this data so that it is as close as possible to what is believed to exist in a real-world scenario (given the available data). Some important takeaways:

* We are assuming that the analytical data will be available in a SQL database;
* Only one instance of PostgreSQL is used for all domains, which is probably different from a real-world scenario where each domain manages its own microservice (and related database). To maintain a certain degree of isolation, each domain will have its own database;
* Data will be preserved as close as possible to what is available in the dataset, that is, with the minimum number of transformations. The data products will handle the needed transformations;

**Running this script:** Create a file .env with the environment variables described in the secrets section. They will be loaded automatically when running this notebook

**Secrets**

In [3]:
# Connection settings are read from the process environment; a variable that is
# not set resolves to None.
_env = os.environ
POSTGRESQL_USER = _env.get("POSTGRESQL_USER")
POSTGRESQL_PASSWORD = _env.get("POSTGRESQL_PASSWORD")
POSTGRESQL_HOST = _env.get("POSTGRESQL_HOST")
POSTGRESQL_PORT = _env.get("POSTGRESQL_PORT")
# The same owner will be used for all databases (this should change on a prod environment)
DATABASE_OWNER = _env.get("DATABASE_OWNER")

**Parameters**

In [4]:
# Folder that holds the raw Yelp dataset files
DATA_FOLDER = Path('../data')
# Number of lines to be processed at a time when sending data to postgres
CHUNK_SIZE = 1_000

**Support Functions**

In [5]:
def rawincount(path: Path) -> int: # ref: https://stackoverflow.com/questions/845058/how-to-get-line-count-of-a-large-file-cheaply-in-python
    """Count the number of lines of a file without loading all into memory

    The file is read in binary mode, one fixed-size buffer at a time, and the
    newline bytes of each buffer are summed. A final line that is not
    terminated by a newline is therefore not counted.

    Args:
        path: Path of the file to count the lines.
    Return:
        int: number of newline bytes found in the file.
    """
    buffer_size = 1024 * 1024
    newline_count = 0
    # The context manager guarantees the handle is closed, even if a read fails
    with open(path, 'rb') as f:
        while True:
            buf = f.raw.read(buffer_size)
            if not buf:  # an empty read marks the end of the file
                break
            newline_count += buf.count(b'\n')
    return newline_count

def pg_create_database(engine: Engine, name: str, owner: str) -> None:
    """Creates a database into postgres, doing nothing if it already exists

    The statement is issued on a connection switched to AUTOCOMMIT, since
    PostgreSQL refuses to run CREATE DATABASE inside a transaction block.

    Args:
        engine: SQL Alchemy engine to connect with postgres.
        name: Name of the database. It is interpolated into the statement as-is,
            so it must be a trusted, valid identifier.
        owner: name of the user that will be the owner of the database.
    Raises:
        ProgrammingError: for any programming error other than "duplicate
            database" (SQLSTATE 42P04).
    """
    try:
        with engine.connect().execution_options(isolation_level="AUTOCOMMIT") as connection:
            connection.execute(text(f'CREATE DATABASE {name} WITH OWNER "{owner}"'))
    except ProgrammingError as e:
        # `pgcode` is a driver-specific attribute of the wrapped error; read it
        # defensively so that an error object without it is re-raised untouched
        # instead of being masked by an AttributeError.
        # 42P04 = duplicate_database (ref: https://www.postgresql.org/docs/current/errcodes-appendix.html)
        if getattr(e.orig, "pgcode", None) != "42P04":
            raise

def pg_run_script(engine: Engine, path: Path) -> None:
    """Execute the content of an SQL script against Postgres and commit it

    Args:
        engine: SQL Alchemy engine used to open the connection.
        path: Location of the .sql file whose whole content is executed.
    """
    # The file is read completely (and closed) before any connection is opened
    statement = text(Path(path).read_text())
    with engine.connect() as db_connection:
        db_connection.execute(statement)
        db_connection.commit()

**Global Configs**

In [6]:
# Set for reproducibility. Both generators are seeded: the check-in population
# step samples users with `np.random.choice`, whose state is independent of the
# standard-library `random` module.
random.seed(42)
np.random.seed(42)
# Engine without a database name, used to create the database of each domain
global_engine = create_engine(f'postgresql://{POSTGRESQL_USER}:{POSTGRESQL_PASSWORD}@{POSTGRESQL_HOST}:{POSTGRESQL_PORT}')

## Business Domain
The business domain contains information about each business in Yelp, such as address, location, categories and many others. See the [Yelp Dataset Documentation](https://www.yelp.com/dataset/documentation/main) for more complete information about the variables.

In [7]:
# Business domain: raw dataset file, schema script and target database
BUSINESS_DATASET_PATH = DATA_FOLDER / 'yelp_academic_dataset_business.json'
BUSINESS_SQL_PATH = Path('sql/business.sql')
BUSINESS_DATABASE_NAME = "business_domain"

### Example

In [8]:
# Show the first raw record of the business dataset
with BUSINESS_DATASET_PATH.open(mode='r') as business_file:
    first_record = business_file.readline()
    print(first_record)

{"business_id":"Pns2l4eNsfO8kk83dixA6A","name":"Abby Rappoport, LAC, CMQ","address":"1616 Chapala St, Ste 2","city":"Santa Barbara","state":"CA","postal_code":"93101","latitude":34.4266787,"longitude":-119.7111968,"stars":5.0,"review_count":7,"is_open":0,"attributes":{"ByAppointmentOnly":"True"},"categories":"Doctors, Traditional Chinese Medicine, Naturopathic\/Holistic, Acupuncture, Health & Medical, Nutritionists","hours":null}



### Create database and tables

Here we create the database for the business domain and create the tables based on the schemas found on the `/sql` folder 

In [9]:
# Create the domain database (an already existing database is left as it is)
pg_create_database(global_engine, BUSINESS_DATABASE_NAME, DATABASE_OWNER)

# Engine bound to the business database
business_engine = create_engine(
    f'postgresql://{POSTGRESQL_USER}:{POSTGRESQL_PASSWORD}@{POSTGRESQL_HOST}:{POSTGRESQL_PORT}/{BUSINESS_DATABASE_NAME}'
)
# Run the schema script of the domain
pg_run_script(business_engine, BUSINESS_SQL_PATH)

### Populate the tables with data

To populate the business dataset we only need to drop some columns that we assume do not exist in operational data, since they are aggregations from other domains. That is, `stars` and `review_count`, because they come from the **evaluation** domain:

In [10]:
# The line count is only used to size the progress bar
n_lines = rawincount(BUSINESS_DATASET_PATH)
with business_engine.connect() as connection:
    business_chunks = pd.read_json(BUSINESS_DATASET_PATH, chunksize=CHUNK_SIZE, lines=True)
    for chunk in tqdm(business_chunks, total=ceil(n_lines / CHUNK_SIZE)):
        # Rename the key and drop the aggregated columns in one chain
        chunk = (
            chunk
            .rename(columns={"business_id": "id"})
            .drop(columns=["stars", "review_count"])
        )
        chunk.to_sql(
            'business',
            con=connection,
            if_exists='append',
            index=False,
            method='multi',
            dtype={'attributes': types.JSON, 'hours': types.JSON, 'is_open': types.BOOLEAN},
        )

  0%|          | 0/16 [00:00<?, ?it/s]

## Users Domain

The users domain contains basic information about the users, such as names, their friends, when they joined Yelp and so on. See the [Yelp Dataset Documentation](https://www.yelp.com/dataset/documentation/main) for more complete information about the variables.

In [7]:
# Users domain: raw dataset file, schema script and target database
USERS_DATASET_PATH = DATA_FOLDER / 'yelp_academic_dataset_user.json'
USERS_SQL_PATH = Path('sql/user.sql')
USERS_DATABASE_NAME = "user_domain"

### Example

In [12]:
# Parse the first raw record of the users dataset
with USERS_DATASET_PATH.open(mode='r') as users_file:
    example = json.loads(users_file.readline())
example.pop("friends") # Removed only for a more readable output
print(example)

{'user_id': 'qVc8ODYU5SZjKXVBgXdI7w', 'name': 'Walker', 'review_count': 585, 'yelping_since': '2007-01-25 16:47:26', 'useful': 7217, 'funny': 1259, 'cool': 5994, 'elite': '2007', 'fans': 267, 'average_stars': 3.91, 'compliment_hot': 250, 'compliment_more': 65, 'compliment_profile': 55, 'compliment_cute': 56, 'compliment_list': 18, 'compliment_note': 232, 'compliment_plain': 844, 'compliment_cool': 467, 'compliment_funny': 467, 'compliment_writer': 239, 'compliment_photos': 180}


### Create database and tables

In [13]:
# Create the domain database (an already existing database is left as it is)
pg_create_database(global_engine, USERS_DATABASE_NAME, DATABASE_OWNER)

# Engine bound to the users database
users_engine = create_engine(
    f'postgresql://{POSTGRESQL_USER}:{POSTGRESQL_PASSWORD}@{POSTGRESQL_HOST}:{POSTGRESQL_PORT}/{USERS_DATABASE_NAME}'
)
# Run the schema script of the domain
pg_run_script(users_engine, USERS_SQL_PATH)

### Populate the tables with data

#### Users and elite members table

Populating the users domain data requires two steps. In the first one we populate two tables: `users`, which contains basic information about a user, and `elite_members`, which contains all the elite members together with the years in which they were elite. Here we follow the same approach used for the business dataset, removing data that belongs to other domains (e.g. `review_count`, `funny` and many others).

In [14]:
def generate_elite_member_table(df: pd.DataFrame) -> pd.DataFrame:
    """Transform the `elite` variable into a table

    This function gets the `elite` string variable (years as elite
    with a ',' separator) and transforms it into a DataFrame with
    'user_id' and 'year' as columns of the data. Both columns are
    always present, even when no row of `df` has an elite year.

    Args:
        df: Dataframe that contains the elite members variable. It is
            expected to hold the user identifier in the 'id' column and the
            comma separated years, as a string, in the 'elite' column.

    Returns:
        pd.Dataframe: Dataframe with 'user_id' and 'year' as columns, one
        row per distinct (user, year) pair.
    """
    records = [
        # Just a small correction where all 2020 year appears as 20,20
        {'user_id': user_id, 'year': 2020 if year == '20' else int(year)}
        for user_id, elite in zip(df['id'], df['elite'])
        for year in elite.split(',')
        if year != ''  # users that were never elite carry an empty string
    ]
    # Explicit columns keep the schema stable when `records` is empty;
    # drop_duplicates collapses the doubled 2020 entries produced above.
    return pd.DataFrame(records, columns=['user_id', 'year']).drop_duplicates()


# The line count is only used to size the progress bar
n_lines = rawincount(USERS_DATASET_PATH)
with users_engine.connect() as connection:
    user_chunks = pd.read_json(USERS_DATASET_PATH, chunksize=CHUNK_SIZE, lines=True)
    for chunk in tqdm(user_chunks, total=ceil(n_lines / CHUNK_SIZE)):
        chunk = chunk.rename(columns={"user_id": "id"})

        # Users table
        users_chunk = chunk.loc[:, ['id', 'name', 'yelping_since']]
        users_chunk.to_sql(
            'users', con=connection, if_exists='append', index=False, method='multi'
        )

        # Elite Table
        elite_chunk = generate_elite_member_table(chunk)
        elite_chunk.to_sql(
            'elite_members', con=connection, if_exists='append', index=False, method='multi'
        )

  0%|          | 0/199 [00:00<?, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

#### Friends table

The `friends` table represents the connections between users as a bi-directional graph, using the user ids as foreign keys (and nodes). Also, some users in the friends lists do not exist in the `users` table. To solve this problem, only the friends that exist as users are inserted.

In [15]:
# Collect the id of every user of the dataset into a set for membership checks
with USERS_DATASET_PATH.open(mode='r') as users_file:
    users_list = {json.loads(raw_record)["user_id"] for raw_record in users_file}

In [16]:
# The line count is only used to size the progress bar
n_lines = rawincount(USERS_DATASET_PATH)
with users_engine.connect() as connection:
    for chunk in tqdm(pd.read_json(USERS_DATASET_PATH, chunksize=CHUNK_SIZE, lines=True), total=ceil(n_lines/CHUNK_SIZE), desc="Chunks"):
        # Explicit copy, so the column assignments below never target a view of the raw chunk
        chunk = chunk[["user_id", "friends"]].copy()
        chunk["friends"] = chunk["friends"].str.split(", ")
        # Keep only the friends that are present in `users_list`
        chunk["friends"] = chunk["friends"].transform(lambda x: set(x).intersection(users_list))
        # One row per (user, friend) pair; an empty friend set explodes to NaN and is dropped below
        chunk = chunk.explode("friends")
        chunk = chunk.rename(columns={"user_id": "previous_user", "friends": "next_user"})
        chunk = chunk.dropna()
        chunk.to_sql('friends', con=connection, if_exists='append', index=False, method='multi')

Chunks:   0%|          | 0/199 [00:00<?, ?it/s]

## Checkin Domain

This domain contains information about each check-in made at a business by a user. See the [Yelp Dataset Documentation](https://www.yelp.com/dataset/documentation/main) for more complete information about the variables.

In [8]:
# Checkin domain: raw dataset file, schema script and target database
CHECKIN_DATASET_PATH = DATA_FOLDER / 'yelp_academic_dataset_checkin.json'
CHECKIN_SQL_PATH = Path('sql/checkin.sql')
CHECKIN_DATABASE_NAME = "checkin_domain"

### Example

In [9]:
# Show the first raw record of the checkin dataset
with CHECKIN_DATASET_PATH.open(mode='r') as checkin_file:
    first_record = checkin_file.readline()
    print(first_record)

{"business_id":"---kPU91CF4Lq2-WlRu9Lw","date":"2020-03-13 21:10:56, 2020-06-02 22:18:06, 2020-07-24 22:42:27, 2020-10-24 21:36:13, 2020-12-09 21:23:33, 2021-01-20 17:34:57, 2021-04-30 21:02:03, 2021-05-25 21:16:54, 2021-08-06 21:08:08, 2021-10-02 15:15:42, 2021-11-11 16:23:50"}



### Create database and Tables

In [10]:
# Create the domain database (an already existing database is left as it is)
pg_create_database(global_engine, CHECKIN_DATABASE_NAME, DATABASE_OWNER)

# Engine bound to the checkin database
checkin_engine = create_engine(
    f'postgresql://{POSTGRESQL_USER}:{POSTGRESQL_PASSWORD}@{POSTGRESQL_HOST}:{POSTGRESQL_PORT}/{CHECKIN_DATABASE_NAME}'
)
# Run the schema script of the domain
pg_run_script(checkin_engine, CHECKIN_SQL_PATH)

### Populate tables with data

A check-in is a relationship between a user and a business. This is very sensitive information to make public so, as can be seen in the example above, Yelp removed the user from the data. Since we are simulating a real-world dataset, our approach is to randomly select (with repetition) an existing `user_id` for each business check-in. This is not an optimal way to simulate it, but it should be sufficient to create example products and dashboards based on this data.

<div class="alert alert-block alert-info"> <b>Note:</b> To maintain reproducibility note that the random seed is fixed on the beginning of this notebook</div>

In [11]:
# Load the id of every user, in file order, as the population to sample from
with USERS_DATASET_PATH.open(mode='r') as users_file:
    users_list = [json.loads(raw_record)["user_id"] for raw_record in users_file]

In [12]:
# The line count is only used to size the progress bar
n_lines = rawincount(CHECKIN_DATASET_PATH)
with checkin_engine.connect() as connection:
    checkin_chunks = pd.read_json(CHECKIN_DATASET_PATH, chunksize=CHUNK_SIZE, lines=True)
    for chunk in tqdm(checkin_chunks, total=ceil(n_lines / CHUNK_SIZE), desc="Chunks"):
        # Each record holds all its check-in dates in one ", " separated string:
        # split it and produce one row per date
        chunk["date"] = chunk["date"].str.split(", ")
        chunk = chunk.explode("date")
        # Attach a randomly drawn (with repetition) user to every check-in
        chunk["user_id"] = np.random.choice(users_list, size=len(chunk), replace=True)
        chunk = chunk.rename(columns={"date": "checkin_date"})
        chunk.to_sql(
            'checkins', con=connection, if_exists='append', index=False, method='multi'
        )

Chunks:   0%|          | 0/132 [00:00<?, ?it/s]

## Evaluations Domain

The evaluations domain contains information about tips and reviews made by users on businesses. See the [Yelp Dataset Documentation](https://www.yelp.com/dataset/documentation/main) for more complete information about the variables.

In [18]:
# Evaluations domain: raw dataset files (reviews and tips)
REVIEWS_DATASET_PATH = DATA_FOLDER / 'yelp_academic_dataset_review.json'
TIP_DATASET_PATH = DATA_FOLDER / 'yelp_academic_dataset_tip.json'

# Schema script and target database shared by both datasets
EVALUATIONS_SQL_PATH = Path('sql/evaluations.sql')
EVALUATIONS_DATABASE_NAME = "evaluations_domain"

### Examples

In [19]:
# Show the first raw record of the reviews dataset
with REVIEWS_DATASET_PATH.open(mode='r') as reviews_file:
    first_record = reviews_file.readline()
    print(first_record)

{"review_id":"KU_O5udG6zpxOg-VcAEodg","user_id":"mh_-eMZ6K5RLWhZyISBhwA","business_id":"XQfwVwDr-v0ZS3_CbbE5Xw","stars":3.0,"useful":0,"funny":0,"cool":0,"text":"If you decide to eat here, just be aware it is going to take about 2 hours from beginning to end. We have tried it multiple times, because I want to like it! I have been to it's other locations in NJ and never had a bad experience. \n\nThe food is good, but it takes a very long time to come out. The waitstaff is very young, but usually pleasant. We have just had too many experiences where we spent way too long waiting. We usually opt for another diner or restaurant on the weekends, in order to be done quicker.","date":"2018-07-07 22:09:11"}



In [20]:
# Show the first raw record of the tips dataset
with TIP_DATASET_PATH.open(mode='r') as tips_file:
    first_record = tips_file.readline()
    print(first_record)

{"user_id":"AGNUgVwnZUey3gcPCJ76iw","business_id":"3uLgwr0qeCNMjKenHJwPGQ","text":"Avengers time with the ladies.","date":"2012-05-18 02:17:21","compliment_count":0}



### Create database and Tables

In [21]:
# Create the domain database (an already existing database is left as it is)
pg_create_database(global_engine, EVALUATIONS_DATABASE_NAME, DATABASE_OWNER)

# Engine bound to the evaluations database
evaluations_engine = create_engine(
    f'postgresql://{POSTGRESQL_USER}:{POSTGRESQL_PASSWORD}@{POSTGRESQL_HOST}:{POSTGRESQL_PORT}/{EVALUATIONS_DATABASE_NAME}'
)
# Run the schema script of the domain
pg_run_script(evaluations_engine, EVALUATIONS_SQL_PATH)

### Populate tables with data

These tables don't require any major change in the data besides some variable renaming, so we are only going to add this information to their respective tables.

In [22]:
# The line count is only used to size the progress bar
n_lines = rawincount(REVIEWS_DATASET_PATH)
with evaluations_engine.connect() as connection:
    review_chunks = pd.read_json(REVIEWS_DATASET_PATH, chunksize=CHUNK_SIZE, lines=True)
    for chunk in tqdm(review_chunks, total=ceil(n_lines / CHUNK_SIZE), desc="Chunks"):
        # Map the dataset names to the column names of the `reviews` table
        chunk = chunk.rename(
            columns={
                "review_id": "id",
                "useful": "useful_count",
                "funny": "funny_count",
                "cool": "cool_count",
                "text": "content",
                "date": "review_date",
            }
        )
        chunk.to_sql(
            'reviews', con=connection, if_exists='append', index=False, method='multi'
        )

Chunks:   0%|          | 0/6991 [00:00<?, ?it/s]

In [23]:
# The line count is only used to size the progress bar
n_lines = rawincount(TIP_DATASET_PATH)
with evaluations_engine.connect() as connection:
    tip_chunks = pd.read_json(TIP_DATASET_PATH, chunksize=CHUNK_SIZE, lines=True)
    for chunk in tqdm(tip_chunks, total=ceil(n_lines / CHUNK_SIZE), desc="Chunks"):
        # Map the dataset names to the column names of the `tips` table
        chunk = chunk.rename(columns={"text": "content", "date": "tips_date"})
        chunk.to_sql(
            'tips', con=connection, if_exists='append', index=False, method='multi'
        )

Chunks:   0%|          | 0/909 [00:00<?, ?it/s]