In [1]:
import sys
sys.path.append("/work")

import os
import sqlalchemy
import pandas as pd
from uuid import uuid4
import pandas_datareader as web
from datetime import datetime, timedelta
from random import randint, choice, random

from data_gen.constants import PAYMENT_METHODS, LOCATIONS, PRODUCTS, UNIT_PRICES, TAX_RATE, EMPLOYEES

## Setup

### PostgreSQL
Before you set up this integration, you need to create a PostgreSQL instance in GCP's Cloud SQL service. After you have created it, you will get access to the connection details required below.

First you need to set up the database connection to PostgreSQL. You need to do that by setting an environment variable in the appropriate Deepnote integration. 
1. Click 'Integration' on the top of the left sidebar
2. Select 'Environment variables'
3. Name your integration (can be anything, e.g. use 'env')
4. Add a key 'PG_CONN'
5. Add the value, which is a string that looks like this:
    ```
    postgresql://postgres:[PG_PASSWORD]@[DB_HOST]/postgres
    ```
    Where you need to replace `PG_PASSWORD` with the master password of your database, and `DB_HOST` with the IP address of the server where your database runs.
6. Click 'Save'
7. Go to the 'Integrations' tab on the right side, find the integration that you just created, then click 'Connect'
8. Run the next cell to verify that the postgres connection string was saved correctly.


In [2]:
# Show the parsed connection URL instead of the raw string: the URL object's
# repr masks the password, so the secret is not written into the saved cell
# output. os.environ[...] raises KeyError if the variable was never set.
sqlalchemy.engine.url.make_url(os.environ["PG_CONN"])

'postgresql://postgres:***@[DB_HOST]/postgres'

### Google Drive
Deepnote has a really handy Drive integration. You need to set it up, and it will mount your Drive to the instance's local file system, and you will be able to read/write files just as you would on your local disk.
1. Go to 'Integrations' on the top left
2. Select 'Google Drive'
3. Click 'Grant permission to Google Drive'
4. Sign in to your Google account, then click 'Allow'
5. Name your integration (can be anything, e.g. 'drive')
6. Click 'Create integration'
7. Go to the 'Integrations' tab on the right side, find the integration that you just created, then click 'Connect'
8. Run the next cell to verify that the Drive connection works correctly.

In [3]:
# List the mounted Drive folder; a listing without an error means the mount is readable.
!ls /datasets/drive

markets


## Create `products` table
This table will contain a (short) list of products, with names, SKUs (Stock Keeping Unit, basically a product ID), and unit price.

In [4]:
# Build every column from the same SKU list and look each unit price up by SKU,
# so the rows stay aligned even if PRODUCTS and UNIT_PRICES were declared in a
# different key order.
skus = list(PRODUCTS)
products_df = pd.DataFrame({
    "sku": skus,
    "name": [PRODUCTS[sku] for sku in skus],
    "unit_price": [UNIT_PRICES[sku] for sku in skus],
})

# The DataFrame index is written too (pandas' default), as in the original cell.
products_df.to_sql(
    name="products",
    con=os.getenv("PG_CONN"),
    if_exists="replace"  # If the table already exists, drop and replace it
)

## Create `pos_transactions` table
These transactions represent records generated by an online Point-Of-Sale terminal. Since there is no manual process involved, the data should not contain malformed cells.

In [5]:
def next_timestamp(ts):
    """
    Return a timestamp that lies a random number of minutes after `ts`.

    One random step of 1-59 minutes is always applied. Further random steps are
    applied for as long as the result falls outside the accepted hours, which are
    the clock hours 8 through 18 inclusive (i.e. 08:00 up to 18:59).
    The passed datetime is not modified; a new datetime is returned.
    """
    def random_step():
        # A fresh random delay between 1 and 59 minutes
        return timedelta(minutes=randint(1, 59))

    candidate = ts + random_step()

    # Skip forward through the night until the accepted hours are reached
    while not (8 <= candidate.hour <= 18):
        candidate = candidate + random_step()

    return candidate

In [6]:
records_to_generate = 10000

# Define starting timestamp
ts = datetime(2022, 1, 1, 0, 0, 0)

# Materialise the SKU list once instead of rebuilding it on every iteration
skus = list(PRODUCTS)

# Generate list of transactions as list of dictionaries
transactions = []
for i in range(records_to_generate):
    sku = choice(skus)
    unit_price = UNIT_PRICES[sku]
    qty = randint(1, 30)
    tax = round(unit_price * qty * TAX_RATE, 2)
    ts = next_timestamp(ts)
    transactions.append({
        # Stored as a plain string so the insert does not depend on the database
        # driver knowing how to adapt uuid.UUID objects
        "transaction_id": str(uuid4()),
        "created_at": ts,
        "location": choice(LOCATIONS),
        "sku": sku,
        "payment_method": choice(PAYMENT_METHODS),
        "unit_price": unit_price,
        "quantity": qty,
        "tax": tax,
        "total": round(unit_price * qty + tax, 2)
    })

# Create DataFrame from records
df = pd.DataFrame(transactions)

# Save DataFrame to the database (replacing any previous version of the table)
df.to_sql(
    name="pos_transactions",
    con=os.getenv("PG_CONN"),
    if_exists="replace",
    index=False
)
df.head()

Unnamed: 0,transaction_id,created_at,location,sku,payment_method,unit_price,quantity,tax,total
0,10539324-0e0e-4cbd-ba31-b14d5eb75c70,2022-01-01 08:15:00,supermarket,87245676,debit,10.99,5,2.75,57.7
1,37aed14e-d6d0-40a5-a9fa-722a3040d0c3,2022-01-01 08:34:00,supermarket,98623454,cash,3.49,10,1.75,36.65
2,a1a7481b-3f6c-4f92-bce3-0102ec81cfb9,2022-01-01 09:03:00,main_farm,83846512,credit,4.99,20,4.99,104.79
3,57d0bd78-23b3-4dc0-b954-316143ff84cf,2022-01-01 10:00:00,full_moon_market,98623454,debit,3.49,3,0.52,10.99
4,8234854a-cde3-44f8-b6b5-65cd3e62e78d,2022-01-01 10:35:00,full_moon_market,83846512,credit,4.99,30,7.49,157.19


## Create `online_transactions` table
These transactions contain data about products sold online through the farm's website. It uses a Stripe integration for payments. A webhook has been set up that receives the transaction data in JSON format and dumps it to a postgres table.

The example was taken from Stripe's [official documentation](https://stripe.com/docs/api/treasury/received_debits)

Since this data is coming from Stripe's API, we can reasonably expect that it will conform to the schema, so we don't need to introduce errors.


In [7]:
def get_stripe_trx(amount, timestamp, product):
    """
    Build a dictionary shaped like a Stripe received-debit payload.

    Only a few fields vary between calls; everything else is fixed sample data.

    Parameters:
        amount: stored unchanged under "amount".
        timestamp: datetime of the transaction; stored under "created" as whole
            seconds since the Unix epoch.
        product: stored unchanged under "description".

    Returns:
        dict with a fresh "rd_<uuid4>" id, a random entry of PAYMENT_METHODS under
        "object", and the values derived from the arguments described above.
    """
    return {
        "id": f"rd_{uuid4()}",
        "object": choice(PAYMENT_METHODS),
        "amount": amount,
        # datetime.timestamp() is portable, unlike the platform-specific
        # strftime('%s'), and it honours tzinfo on aware datetimes. Naive
        # datetimes are interpreted as local time, as before.
        "created": int(timestamp.timestamp()),
        "currency": "usd",
        "description": product,
        "failure_code": None,
        "financial_account": "fa_1M0ssL2eZvKYlo2CxpxElcdc",
        "hosted_regulatory_receipt_url": "https://payments.stripe.com/regulatory-receipt/CBQaFwoVYWNjdF8xMDMyRDgyZVp2S1lsbzJDKOWLm5sGMgZpYhYRf1U6NpMOqjC0B048hqZyfKpl_OLHvSyHk6xOGyDC0EWRu2b_C5GFr-arpq9mh6OD622AFR459Sq9SA",
        "initiating_payment_method_details": {
            "billing_details": {
                "address": {
                    "city": None,
                    "country": None,
                    "line1": None,
                    "line2": None,
                    "postal_code": None,
                    "state": None
                },
                "email": None,
                "name": "Jane Austen"
            },
            "type": "us_bank_account",
            "us_bank_account": {
                "bank_name": "STRIPE TEST BANK",
                "last4": "6789",
                "routing_number": "110000000"
            }
        },
        "linked_flows": {
            "debit_reversal": None,
            "inbound_transfer": None,
            "issuing_authorization": None,
            "issuing_transaction": None
        },
        "livemode": False,
        "network": "ach",
        "reversal_details": {
            "deadline": 1667865600,
            "restricted_reason": None
        },
        "status": "succeeded",
        "transaction": "trxn_1M0ssL2eZvKYlo2CiSWY33W5"
    }

In [8]:
records_to_generate = 1000

# Every online order is stamped some time after this moment
ts = datetime(2022, 1, 1, 0, 0, 0)

# Candidate SKUs for the random draws below
sku_pool = list(PRODUCTS.keys())

# One single-column record per order; the column holds the whole Stripe payload
transactions = []
for i in range(records_to_generate):
    sku = choice(sku_pool)
    qty = randint(1, 30)
    subtotal = UNIT_PRICES[sku] * qty
    tax = round(subtotal * TAX_RATE, 2)
    ts = next_timestamp(ts)
    amount = round(subtotal + tax, 2)

    transactions.append({"stripe_data": get_stripe_trx(amount, ts, PRODUCTS[sku])})

# Turn the records into a DataFrame
df = pd.DataFrame(transactions)

# Write the payloads to Postgres as a JSON column, replacing any existing table
df.to_sql(
    name="online_transactions",
    con=os.getenv("PG_CONN"),
    if_exists="replace",
    index=False,
    dtype={"stripe_data": sqlalchemy.types.JSON},
)

df.head()

Unnamed: 0,stripe_data
0,{'id': 'rd_f0b3c0bd-44a5-452b-b347-6f76b58199e...
1,{'id': 'rd_940ed7d7-e004-4749-9ab6-429873a3aed...
2,{'id': 'rd_99577654-9636-42d0-a532-4eae6b5e11c...
3,{'id': 'rd_8b532d49-8a6a-4879-a095-051b61cca4d...
4,{'id': 'rd_b22d50f1-6807-4107-8a49-00c5aadc91e...


## Create `market_transactions` table
These are records from sales made by various employees at different farmers markets. The data was manually entered to an Excel table then uploaded to Google Drive. The file name normally follows a schema that identifies the market location, employee and market date.

Since all of this data was entered manually, we can expect all kinds of errors: 
1. Misspelling
2. Missing values
3. Invalid date formats
4. File names not conforming to the schema

We will add all of these to the generated data.

In [9]:
def replace_letters(string):
    """
    Randomly corrupt a string to imitate typos and missing values in manually
    entered data.

    - With a 5% chance some characters are overwritten by random lowercase
      letters. The number of overwrites is chosen randomly between 1 and the
      length of the string; positions are drawn independently, so the same
      position may be hit more than once.
    - With another 5% chance an empty string is returned.
    - Otherwise the string is returned unchanged.

    An empty input is always returned as an empty string.
    """

    alphabet = "abcdefghijklmnopqrstuvwxyz"
    roll = random()

    if roll < 0.05:
        # Nothing to replace; also avoids randint(1, 0) raising ValueError
        if not string:
            return string

        # Work on a list once instead of converting back and forth per letter
        chars = list(string)
        for _ in range(randint(1, len(chars))):
            chars[randint(0, len(chars) - 1)] = choice(alphabet)
        return "".join(chars)

    if roll < 0.1:
        return ""

    return string

In [10]:
def malform_date(ts):
    """
    Format `ts` as a date string, occasionally in a deliberately wrong way.

    Depending on a single random roll the result is:
    - below 0.05: two-digit year, e.g. 22-03-05
    - 0.05 up to 0.1: month shifted by 12, giving an invalid month, e.g. 2022-15-05
    - 0.1 up to 0.15: spaces as separators (this variant also uses a two-digit year)
    - otherwise: the valid YYYY-MM-DD form
    """
    roll = random()

    if roll < 0.05:
        return ts.strftime('%y-%m-%d')

    if roll < 0.1:
        yyyy, mm, dd = ts.strftime('%Y-%m-%d').split("-")
        return f"{yyyy}-{int(mm) + 12}-{dd}"

    if roll < 0.15:
        return ts.strftime('%y %m %d')

    return ts.strftime('%Y-%m-%d')

In [11]:
def malform_filename(location, date, employee):
    """
    Join location, date and employee into a file name (without extension).

    The parts are normally separated by a double underscore; with a 5% chance
    a single hyphen is used instead to imitate a wrongly named file.
    """
    separator = "-" if random() < 0.05 else "__"
    return f"{location}{separator}{date}{separator}{employee}"

In [12]:
def malform_time(ts):
    """
    Format the time of `ts` as HH:MM, occasionally with an invalid minute.

    With a 5% chance 60 is added to the minute value (e.g. 09:05 becomes 09:65);
    otherwise the regular HH:MM string is returned.
    """
    corrupt_minutes = random() < 0.05

    hh, mm = ts.strftime("%H:%M").split(":")
    if corrupt_minutes:
        mm = str(int(mm) + 60)

    return f"{hh}:{mm}"

In [13]:
# Remove existing files
# NOTE: /datasets/drive is the mounted Google Drive, so this wipes any market
# files generated by an earlier run before recreating an empty folder.
!rm -rf /datasets/drive/markets
!mkdir /datasets/drive/markets

In [14]:
files_to_generate = 25

start_date = datetime(2020, 1, 1, 6, 0, 0)

# Candidate SKUs for the random draws below
sku_pool = list(PRODUCTS.keys())

for event_index in range(files_to_generate):
    # Markets are one week apart; the first one is a day after start_date
    ts = start_date + timedelta(days=event_index * 7 + 1)

    # File-name parts, each of which may come back corrupted
    location = replace_letters(choice(LOCATIONS))
    employee = replace_letters(choice(EMPLOYEES))
    date = malform_date(ts)
    filename = malform_filename(location, date, employee)

    # At most 100 sales per market day
    transactions = []
    for sale_number in range(1, 101):
        sku = choice(sku_pool)
        transactions.append({
            "sale_number": sale_number,
            "sold_at": malform_time(ts),
            "product": replace_letters(PRODUCTS[sku]),
            "unit_price": UNIT_PRICES[sku],
            "quantity": randint(1, 10),
        })

        # Move the clock forward; once it reaches 4 PM the day is over
        ts = next_timestamp(ts)
        if ts.hour >= 16:
            break

    # One spreadsheet per market day
    df = pd.DataFrame(transactions)

    # Written to the mounted Google Drive folder
    df.to_excel(f"/datasets/drive/markets/{filename}.xlsx")

## Create `crypto_transactions` table
These records come from berry sales that were paid for with Ethereum. They were generated by an automated system, so we don't expect typos here. The only difference is that the totals are saved in ETH, not in USD. To calculate those accurately, we need daily price data for ETH.

In [15]:
# Daily ETH-USD quotes from the start of 2022 up to the moment this cell runs
ts = datetime(2022, 1, 1, 0, 0, 0)

eth_price = web.DataReader(
    'ETH-USD',
    data_source='yahoo',
    start=ts,
    end=datetime.now(),
)
eth_price.head(2)

Unnamed: 0_level_0,High,Low,Open,Close,Volume,Adj Close
Date,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1
2022-01-01,3769.917969,3682.286621,3683.047119,3769.697021,9776191466,3769.697021
2022-01-02,3836.161865,3727.357422,3769.29834,3829.564941,9881471548,3829.564941


In [16]:
records_to_generate = 100

# Define the starting timestamp in this cell, so that re-running the cell always
# starts from the same point instead of continuing from wherever `ts` was left.
ts = datetime(2022, 1, 1, 0, 0, 0)

# Materialise the SKU list once instead of rebuilding it on every iteration
skus = list(PRODUCTS)

transactions = []
for i in range(records_to_generate):
    sku = choice(skus)
    unit_price = UNIT_PRICES[sku]
    qty = randint(1, 30)
    tax = round(unit_price * qty * TAX_RATE, 2)
    ts = next_timestamp(ts)

    # Get opening price of ETH on the day of the transaction
    dt_index = ts.strftime("%Y-%m-%d")
    eth_open = eth_price.loc[dt_index, "Open"]

    transactions.append({
        # Stored as a plain string so the insert does not depend on the database
        # driver knowing how to adapt uuid.UUID objects
        "transaction_id": str(uuid4()),
        "created_at": ts,
        "location": "main_farm",
        "sku": sku,
        "payment_method": "crypto",
        "currency": "ETH",
        "quantity": qty,
        # USD total (rounded to cents) converted to ETH at the day's opening price
        "total": round(unit_price * qty + tax, 2) / eth_open
    })

# Create DataFrame from records
df = pd.DataFrame(transactions)

# Save DataFrame to the database (replacing any previous version of the table)
df.to_sql(
    name="crypto_transactions",
    con=os.getenv("PG_CONN"),
    if_exists="replace",
    index=False
)

<a style='text-decoration:none;line-height:16px;display:flex;color:#5B5B62;padding:10px;justify-content:end;' href='https://deepnote.com?utm_source=created-in-deepnote-cell&projectId=a40938d6-6870-4a04-a86a-fca6d164ec7f' target="_blank">
 </img>
Created in <span style='font-weight:600;margin-left:4px;'>Deepnote</span></a>