In [1]:
# Instalando requiments
!pip install -r requirements.txt



In [2]:
#import sys
#sys.path.append("/work")

import os
import decouple
import sqlalchemy
import pandas as pd
from uuid import uuid4
from pathlib import Path
import pandas_datareader.data as web
import yfinance as yf
from datetime import datetime, timedelta
from random import randint, random
from numpy.random import choice

In [3]:
# Set it to False if running in Deepnote, and True if running locally using Jupyter.
# The flag decides where the generated market spreadsheets are written further down:
# under "./" when True, under '/datasets/drive/' when False.
is_local = True

In [4]:
# The constants live in the notebook itself (rather than in an imported module) so that
# the notebook stays self-contained and can be downloaded and run locally.

# Sales tax applied on top of every subtotal
TAX_RATE = 0.05

# Accepted ways of paying
PAYMENT_METHODS = ["cash", "credit", "debit"]

# Places where sales happen
LOCATIONS = ["Bangor, ME", "Concord, NH", "Portland, ME", "Portsmouth, NH"]

# Staff members who record sales
EMPLOYEES = ["james", "sarah", "carmen", "peter"]

# SKU -> product name
PRODUCTS = {
    24625356: "strawberries",
    98320088: "blueberries",
    83846512: "blackberries",
    98623454: "blackcurrants",
    87245676: "salmonberries",
    12635273: "raspberries",
}

# SKU -> unit price, listed in the same SKU order as PRODUCTS
UNIT_PRICES = {
    24625356: 6.99,
    98320088: 8.99,
    83846512: 4.99,
    98623454: 3.49,
    87245676: 10.99,
    12635273: 10.49,
}



In [5]:
# Create the python-decouple AutoConfig reader that is used below to look up PG_CONN.
# NOTE(review): the ' ' (single space) argument is presumably AutoConfig's search path;
# a one-space path looks unintentional — confirm it resolves the intended settings file.
config = decouple.AutoConfig(' ')

In [6]:
# Build one SQLAlchemy engine from the PG_CONN connection string and share it between
# all `to_sql` calls, instead of handing pandas the raw string each time.
engine = sqlalchemy.create_engine(config('PG_CONN'))

## Create `products` table
This table will contain a (short) list of products, with names, SKUs (Stock Keeping Units, basically product IDs), and unit prices.

In [7]:
# Build the product catalogue, one row per SKU in PRODUCTS.
# Each unit price is looked up by SKU, so the result does not depend on PRODUCTS and
# UNIT_PRICES listing their keys in the same order.
products_df = pd.DataFrame({
    "sku": list(PRODUCTS),
    "name": list(PRODUCTS.values()),
    "unit_price": [UNIT_PRICES[sku] for sku in PRODUCTS],
})
products_df.to_sql(
    name="products",
    con=engine,
    if_exists="replace"  # If the table already exists, drop and replace it
)

6

## Create `pos_transactions` table
These transactions represent records generated by an online Point-Of-Sale terminal. Since there is no manual process involved, the data should not contain malformed cells.

In [8]:
def next_timestamp(ts):
    """
    Return a timestamp that lies a random 1-59 minutes (or more) after `ts`.

    After the first random step, further random 1-59 minute steps are added for as long
    as the hour of the result is earlier than 8 or later than 18, so the returned value
    always has an hour between 8 and 18 inclusive. This keeps the generated data
    within (roughly) working hours.
    """
    # The first step is always taken, even when `ts` is already inside working hours
    candidate = ts + timedelta(minutes=randint(1, 59))

    # Walk forward in random steps until the hour is inside the 8..18 window
    while not (8 <= candidate.hour <= 18):
        candidate = candidate + timedelta(minutes=randint(1, 59))

    return candidate

In [9]:
records_to_generate = 10000

# Every simulated sale happens after the previous one, counting from this instant
ts = datetime(2020, 1, 1, 0, 0, 0)

# SKUs and their selection weights (same order as the keys of PRODUCTS)
sku_pool = list(PRODUCTS.keys())
sku_weights = [0.1, 0.15, 0.1, 0.4, 0.05, 0.2]

# Collect the transactions as a list of dictionaries
transactions = []
for record_number in range(records_to_generate):
    sku = choice(sku_pool, p=sku_weights)
    unit_price = UNIT_PRICES[sku]
    qty = randint(1, 30)
    subtotal = unit_price * qty
    tax = round(subtotal * TAX_RATE, 2)
    ts = next_timestamp(ts)

    # Never emit a sale that is dated in the future
    if ts >= datetime.now():
        break

    record = {
        "transaction_id": uuid4(),
        "created_at": ts,
        "location": choice(LOCATIONS, p=[0.25, 0.1, 0.15, 0.5]),
        "sku": sku,
        "payment_method": choice(PAYMENT_METHODS, p=[0.5, 0.4, 0.1]),
        "unit_price": unit_price,
        "quantity": qty,
        "tax": tax,
        "total": round(subtotal + tax, 2),
    }
    transactions.append(record)

# Turn the records into a DataFrame
df = pd.DataFrame(transactions)

# Persist the DataFrame, replacing the table if it is already there
df.to_sql(name="pos_transactions", con=engine, if_exists="replace", index=False)
df.head()

Unnamed: 0,transaction_id,created_at,location,sku,payment_method,unit_price,quantity,tax,total
0,0c53ce04-6185-4976-9f51-12222681a25b,2020-01-01 08:06:00,"Portland, ME",83846512,debit,4.99,21,5.24,110.03
1,9888bba8-deea-445c-a48b-ddba6b83cb27,2020-01-01 08:47:00,"Portsmouth, NH",98623454,cash,3.49,20,3.49,73.29
2,c05ac905-23b0-45db-b592-d9a37c3f7185,2020-01-01 08:57:00,"Concord, NH",87245676,credit,10.99,22,12.09,253.87
3,2d1c740d-b09a-48ab-a369-cd7cef701188,2020-01-01 09:47:00,"Portsmouth, NH",83846512,debit,4.99,10,2.5,52.4
4,5bc13f61-78af-4958-95a9-f6c20e84d9b8,2020-01-01 10:12:00,"Portsmouth, NH",98623454,debit,3.49,9,1.57,32.98


## Create `online_transactions` table
These transactions contain data about products sold online through the farm's website. It uses a Stripe integration for payments. A webhook has been set up that receives the transaction data in JSON format and dumps it to a postgres table.

The example was taken from Stripe's [official documentation](https://stripe.com/docs/api/treasury/received_debits)

Since this data is coming from Stripe's API, we can reasonably expect that it will conform to the schema, so we don't need to introduce errors.


In [13]:
def get_stripe_trx(amount, timestamp, product):
    """
    Build one fake payload modelled on the Stripe "received debit" example object.

    Parameters:
        amount: value stored under "amount".
        timestamp: datetime of the sale; stored under "created" as an integer number of
            epoch seconds, obtained with time.mktime (i.e. interpreted as local time).
        product: value stored under "description".

    Returns:
        A dictionary whose "id" is a fresh "rd_<uuid4>" string and whose "object" is
        drawn from PAYMENT_METHODS; every other field is a fixed sample value.
    """
    import time

    trx_id = f"rd_{uuid4()}"
    # The first payment method has weight 0, so it is never selected here
    payment_method = choice(PAYMENT_METHODS, p=[0, 0.8, 0.2])
    created_epoch = int(time.mktime(timestamp.timetuple()))

    billing_address = {
        "city": None,
        "country": None,
        "line1": None,
        "line2": None,
        "postal_code": None,
        "state": None
    }
    billing_details = {
        "address": billing_address,
        "email": None,
        "name": "Jane Austen"
    }
    bank_account = {
        "bank_name": "STRIPE TEST BANK",
        "last4": "6789",
        "routing_number": "110000000"
    }
    payment_method_details = {
        "billing_details": billing_details,
        "type": "us_bank_account",
        "us_bank_account": bank_account
    }
    linked_flows = {
        "debit_reversal": None,
        "inbound_transfer": None,
        "issuing_authorization": None,
        "issuing_transaction": None
    }
    reversal_details = {
        "deadline": 1667865600,
        "restricted_reason": None
    }

    # Key order is kept as in the sample object so the stored JSON reads the same
    return {
        "id": trx_id,
        "object": payment_method,
        "amount": amount,
        "created": created_epoch,
        "currency": "usd",
        "description": product,
        "failure_code": None,
        "financial_account": "fa_1M0ssL2eZvKYlo2CxpxElcdc",
        "hosted_regulatory_receipt_url": "https://payments.stripe.com/regulatory-receipt/CBQaFwoVYWNjdF8xMDMyRDgyZVp2S1lsbzJDKOWLm5sGMgZpYhYRf1U6NpMOqjC0B048hqZyfKpl_OLHvSyHk6xOGyDC0EWRu2b_C5GFr-arpq9mh6OD622AFR459Sq9SA",
        "initiating_payment_method_details": payment_method_details,
        "linked_flows": linked_flows,
        "livemode": False,
        "network": "ach",
        "reversal_details": reversal_details,
        "status": "succeeded",
        "transaction": "trxn_1M0ssL2eZvKYlo2CiSWY33W5"
    }

In [14]:
records_to_generate = 1000

# Every simulated sale happens after the previous one, counting from this instant
ts = datetime(2020, 1, 1, 0, 0, 0)

# SKUs and their selection weights (same order as the keys of PRODUCTS)
sku_pool = list(PRODUCTS.keys())
sku_weights = [0.15, 0.2, 0.3, 0.1, 0.1, 0.15]

# Collect the transactions as a list of dictionaries
transactions = []
for record_number in range(records_to_generate):
    sku = choice(sku_pool, p=sku_weights)
    product = PRODUCTS[sku]
    unit_price = UNIT_PRICES[sku]
    qty = randint(1, 30)
    subtotal = unit_price * qty
    tax = round(subtotal * TAX_RATE, 2)
    ts = next_timestamp(ts)

    # Never emit a sale that is dated in the future
    if ts >= datetime.now():
        break

    # Total charged to the customer, tax included
    amount = round(subtotal + tax, 2)

    # Each row holds a single column with the whole Stripe-like payload
    transactions.append({"stripe_data": get_stripe_trx(amount, ts, product)})

# Turn the records into a DataFrame
df = pd.DataFrame(transactions)

# Persist the DataFrame; the payload column is stored with a JSON column type
json_columns = {"stripe_data": sqlalchemy.types.JSON}
df.to_sql(
    name="online_transactions",
    con=engine,
    if_exists="replace",
    index=False,
    dtype=json_columns,
)

df.head()

Unnamed: 0,stripe_data
0,{'id': 'rd_d367c016-d905-4ab9-92de-72dffa6bea9...
1,{'id': 'rd_9f02b22e-1117-4d4c-9f67-115557de3ae...
2,{'id': 'rd_b93e5b70-146d-4920-a90b-8a68ad31add...
3,{'id': 'rd_ea996ee3-6135-49e5-a7bb-11f2ebf567f...
4,{'id': 'rd_06a272da-c9d9-4a4e-a329-18d8ffad45f...


## Create `market_transactions` table
These are records from sales made by various employees at different farmers markets. The data was manually entered into an Excel table and then uploaded to Google Drive. The file name normally follows a schema that identifies the market location, employee and market date.

Since all of this data was entered manually, we can expect all kinds of errors: 
1. Misspelling
2. Missing values
3. Invalid date formats
4. File names not conforming to the schema

We will add all of these to the generated data.

In [15]:
def replace_letters(string):
    """
    Randomly corrupt a string to imitate typos and missing values in manually entered data.

    A single random roll decides the outcome:
      * with a 5% chance, between 1 and len(string) randomly picked positions are
        overwritten with a random lowercase letter (a position can be picked more than
        once, and a letter can be replaced by itself, so fewer characters may change);
      * with another 5% chance, an empty string is returned;
      * otherwise the string is returned unchanged.

    An empty input has nothing to corrupt and is returned as is.
    """
    alphabet = "abcdefghijklmnopqrstuvwxyz"

    # Guard: randint(1, 0) below would raise for an empty string
    if not string:
        return string

    roll = random()

    if roll < 0.05:
        # Convert to a list once, mutate in place, and join once at the end
        chars = list(string)
        for _ in range(randint(1, len(chars))):
            position = randint(0, len(chars) - 1)
            # Draw the replacement from the alphabet, not from the string itself,
            # so that the typo can introduce letters the original did not contain
            chars[position] = alphabet[randint(0, len(alphabet) - 1)]
        return "".join(chars)

    if roll < 0.1:
        return ""

    return string

In [16]:
def malform_date(ts):
    """
    Format the date of `ts` as a string, occasionally in a deliberately broken way.

    One random roll selects the format:
      * 5%: two-digit year ('%y-%m-%d');
      * 5%: four-digit year with 12 added to the month, giving an invalid month;
      * 5%: two-digit year with spaces as separators ('%y %m %d');
      * 85%: the regular '%Y-%m-%d' format.
    """
    roll = random()
    iso_date = ts.strftime('%Y-%m-%d')

    if roll < 0.05:
        # Different year formatting
        return ts.strftime('%y-%m-%d')

    if roll < 0.1:
        # Invalid month: shift it past December
        year, month, day = iso_date.split("-")
        return f"{year}-{int(month) + 12}-{day}"

    if roll < 0.15:
        # Different separator
        return ts.strftime('%y %m %d')

    # Valid date
    return iso_date

In [17]:
def malform_filename(location, date, employee):
    """
    Join location, date and employee into a file name (without extension).

    The parts are normally joined with a double underscore; with a 5% chance a single
    hyphen is used instead, which imitates a file name that breaks the naming schema.
    """
    # One roll: below 0.05 -> wrong separator, otherwise the correct one
    separator = "-" if random() < 0.05 else "__"
    return f"{location}{separator}{date}{separator}{employee}"

In [18]:
def malform_time(ts):
    """
    Format the time of `ts` as 'HH:MM', occasionally with an invalid minute.

    With a 5% chance 60 is added to the minute part, producing an out-of-range
    minute value; otherwise the regular '%H:%M' string is returned.
    """
    corrupt = random() < 0.05
    sold_at = ts.strftime("%H:%M")

    if corrupt:
        # Invalid minute: push it past 59
        hours, minutes = sold_at.split(":")
        sold_at = f"{hours}:{int(minutes) + 60}"

    return sold_at

In [19]:
# Root of the (emulated) drive: the working directory locally, the mounted drive otherwise
root_dir = "./" if is_local else '/datasets/drive/'

markets_path = Path(root_dir) / "markets"

# Create the directory together with any missing parents; an existing directory is fine
markets_path.mkdir(parents=True, exist_ok=True)

# Remove files left over from a previous run. Real sub-directories are skipped because
# os.remove() cannot delete them, and the `with` block closes the scandir iterator.
with os.scandir(markets_path) as entries:
    for entry in entries:
        if not entry.is_dir(follow_symlinks=False):
            os.remove(entry.path)

In [20]:
files_to_generate = 75

start_date = datetime(2020, 1, 1, 6, 0, 0)
for event_index in range(files_to_generate):
    # Market days are two days apart: 1, 3, 5, ... days after start_date.
    # NOTE(review): this comment used to say "one week" — confirm which spacing is intended.
    ts = start_date + timedelta(days=event_index * 2 + 1)

    # Avoid generating data in the future
    if ts >= datetime.now():
        break

    # Generate location and employee with possible typos, plus a possibly malformed
    # date and file name
    location = replace_letters(choice(LOCATIONS))
    employee = replace_letters(choice(EMPLOYEES))
    date = malform_date(ts)
    filename = malform_filename(location, date, employee)

    # At most 100 sales per market day
    transactions = []
    for i in range(100):
        sku = choice(list(PRODUCTS.keys()), p=[0.2, 0.05, 0.2, 0.3, 0.05, 0.2])
        sold_at = malform_time(ts)
        product = replace_letters(PRODUCTS[sku])
        unit_price = UNIT_PRICES[sku]

        # Sell more in the summer
        factor = 1
        if ts.month in [5, 6, 7, 8]:
            factor = 1.2

        qty = round(randint(1, 10) * factor)

        # Move the clock forward for the next sale
        ts = next_timestamp(ts)

        transactions.append({
            "sale_number": i + 1,
            "sold_at": sold_at,
            "product": product,
            "unit_price": unit_price,
            "quantity": qty,
        })

        # Stop adding new sales after 4 PM
        if int(ts.hour) > 15:
            break

    # Create DataFrame from records
    df = pd.DataFrame(transactions)

    # Dump DataFrame to disk (which is an emulated Google Drive).
    # The file name is interpolated into the path so every market day gets its own
    # file inside markets_path, both locally and on the mounted drive.
    path = markets_path / f"{filename}.xlsx"
    df.to_excel(path)

## Create `crypto_transactions` table
These records come from berry sales that were paid for with Ethereum (ETH). They were generated by an automated system, so we don't expect typos here. The only difference is that the totals are saved in ETH, not in USD. To convert the USD totals accurately, we need daily ETH price data.

In [21]:
# Display the current time for reference: the generator loops in this notebook stop
# as soon as a generated timestamp reaches datetime.now().
datetime.now()

datetime.datetime(2023, 8, 12, 21, 41, 30, 656237)

In [22]:
# Download daily ETH-USD prices from 2020-01-01 up to now.
# yfinance is called directly: `yf.pdr_override()` merely pointed pandas_datareader's
# `get_data_yahoo` at `yf.download`, and that override is deprecated in yfinance
# (NOTE(review): verify against the version pinned in requirements.txt).
ts = datetime(2020, 1, 1, 0, 0, 0)

eth_price = yf.download('ETH-USD', start=ts, end=datetime.now())
eth_price.head(2)

[*********************100%***********************]  1 of 1 completed


Unnamed: 0_level_0,Open,High,Low,Close,Adj Close,Volume
Date,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1
2020-01-01,129.630661,132.835358,129.198288,130.802002,130.802002,7935230330
2020-01-02,130.820038,130.820038,126.95491,127.410179,127.410179,8032709256


In [23]:
records_to_generate = 100

# Start from a fixed timestamp instead of relying on `ts` left behind by another cell,
# so that re-running this cell on its own generates the same date range again
ts = datetime(2020, 1, 1, 0, 0, 0)

# Generate list of transactions as list of dictionaries
transactions = []
for i in range(records_to_generate):
    sku = choice(list(PRODUCTS.keys()), p=[0.1, 0.15, 0.1, 0.4, 0.05, 0.2])
    unit_price = UNIT_PRICES[sku]
    qty = randint(1, 30)
    tax = round(unit_price * qty * TAX_RATE, 2)
    ts = next_timestamp(ts)

    # Avoid generating data in the future
    if ts >= datetime.now():
        break

    # Get opening price of ETH on the day of the transaction
    # (eth_price is looked up by its 'YYYY-MM-DD' date label)
    dt_index = ts.strftime("%Y-%m-%d")
    eth_open = eth_price.loc[dt_index]["Open"]

    transactions.append({
        "transaction_id": uuid4(),
        "created_at": ts,
        "location": "main_farm",
        "sku": sku,
        "payment_method": "crypto",
        "currency": "ETH",
        "quantity": qty,
        # USD total (tax included) converted to ETH at that day's opening price
        "total": round(unit_price * qty + tax, 2) / eth_open
    })

# Create DataFrame from records
df = pd.DataFrame(transactions)

# Save DataFrame to the database
df.to_sql(
    name="crypto_transactions",
    con=engine,
    if_exists="replace",
    index=False
)

100

<a style='text-decoration:none;line-height:16px;display:flex;color:#5B5B62;padding:10px;justify-content:end;' href='https://deepnote.com?utm_source=created-in-deepnote-cell&projectId=c04fa194-d26c-4dab-9fd8-89b201f057ca' target="_blank">
 </img>
Created in <span style='font-weight:600;margin-left:4px;'>Deepnote</span></a>