# Social Money Network

## Introduction

In this notebook, we generate synthetic data for a social money network, load it into Snowflake, and compute PageRank scores over it with RelationalAI.

The network consists of user accounts, merchant accounts, and transactions between them:

```mermaid
graph LR;
    UA[User Account] -->|transaction| MA[Merchant Account];
    MA -->|refund| UA;
    UA -->|transaction| UA[User Account];
```

In [12]:
# import packages and perform basic setup
import datetime
import pandas as pd
import numpy as np
import networkx as nx
import gravis as gv

def viz(G):
    return gv.three(G, show_node_label=False)

from faker import Faker
fake = Faker()
Faker.seed(42)

import snowflake.connector
from snowflake.connector.pandas_tools import write_pandas

from itables import init_notebook_mode
init_notebook_mode(all_interactive=True)

from config import user_db, role, native_app_db, warehouse, schema

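# write_pandas quotes column names by default, so lowercase DataFrame columns would not
# match the uppercase identifiers Snowflake stores for unquoted CREATE TABLE columns
def allcaps_columns(df):
    """Return a copy of df with every column name upper-cased."""
    df_copy = df.copy()
    df_copy.columns = [col.upper() for col in df.columns]
    return df_copy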
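The `config` module imported above is not shown in this notebook. A minimal hypothetical `config.py` defining the names the setup cell expects might look like this. Every value is a placeholder assumption: `PL_DEMO`/`PUBLIC` are taken from the data stream cells later on, and the app name is assumed to match the one used in the PageRank procedure.

```python
# config.py (hypothetical): placeholder values, replace them with your own settings.
# These are exactly the names imported by the setup cell.
user_db = "PL_DEMO"        # database holding the ACCOUNTS and TRANSACTIONS tables
schema = "PUBLIC"          # schema used for every connection
role = "ACCOUNTADMIN"      # hypothetical role with access to both databases
warehouse = "COMPUTE_WH"   # hypothetical warehouse name
# Assumed to be the RelationalAI Native App database named in the PageRank procedure
native_app_db = "NA_APP_WITH_STAGING_VERSIONS_APP"
```

Keeping these values in a separate module lets the notebook itself be shared without account-specific settings.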

## Generating Data

In a typical use case, data would already be present in Snowflake. However, to make this notebook self-contained, we're going to generate the data and send it to Snowflake.

In this section, we generate data to populate two SQL tables, `accounts` and `transactions`:

In [2]:
# generate data:

# Step 1: Generate IDs for user and merchant accounts
num_users = 100
num_merchants = 10
user_ids = [f'user_{i+1}' for i in range(num_users)]
merchant_ids = [f'merchant_{i+1}' for i in range(num_merchants)]

merchant_name_bank = [
    "Blossom Florist & Gifts",
    "The Rustic Loaf Bakery",
    "Bella's Café & Espresso",
    "Sundown Brewery",
    "Green Thumb Plant Nursery",
    "Crystal Clear Jewelers",
    "Sweet Serenity Chocolates",
    "Old Town Bookstore",
    "The Spinning Wheel Yarn Shop",
    "Blue Ribbon Butchers",
    "Harmony Music Instruments",
    "The Artisan's Workshop",
    "Fireside Antiques & Collectibles",
    "Moonlit Cinema",
    "The Gadget Gizmo",
    "Whispering Pines Tea Room",
    "Vintage Vogue Clothing",
    "The Pepper Pot Spice Emporium",
    "Cedar & Sage Home Décor",
    "Lavender Lane Perfumery"
]

user_names = [fake.name() for _ in range(num_users)]
merchant_names = merchant_name_bank[:num_merchants]

# Step 2: Populate a table with accounts
accounts = pd.DataFrame({
    'account_id': user_ids + merchant_ids,
    'account_type': ['User'] * num_users + ['Merchant'] * num_merchants,
    'name': user_names + merchant_names
})

# Step 3: Establish social structures and preferences
np.random.seed(42)  # For reproducibility
user_friends = {
    user_id: np.random.choice(
        [u for u in user_ids if u != user_id],  # exclude self so nobody pays themselves
        size=np.random.randint(1, 4),
        replace=False,
    ).tolist()
    for user_id in user_ids
}
user_preferred_merchants = {user_id: np.random.choice(merchant_ids, size=np.random.randint(1, 3), replace=False).tolist() for user_id in user_ids}
dates = [
    datetime.date(2023, 11, 1) +
    datetime.timedelta(days=i)
    for i in range(100)
]

# Step 4: Build the transactions table: purchases, refunds, and transfers between friends (amounts rounded to cents)
transactions = []
for user_id in user_ids:
    # Transactions with preferred merchants
    for merchant_id in user_preferred_merchants[user_id]:
        transaction_amount = round(np.random.uniform(10, 200), 2)  # Rounded transaction amount
        transactions.append({
            'from_account': user_id,
            'to_account': merchant_id,
            'amount': transaction_amount,
            'type': 'transaction',
            'date': np.random.choice(dates)
        })

# Generate refunds based on previous transactions
for transaction in transactions.copy():  # Use copy() to iterate over the original list while modifying it
    if transaction['type'] == 'transaction' and transaction['to_account'] in merchant_ids:
        # Assuming a random chance that a transaction will have a refund
        if np.random.rand() > 0.7:  # 30% chance of refund for simplicity
            refund_amount = round(transaction['amount'] * np.random.uniform(0.5, 1.0), 2)  # Partial or full refund
            transactions.append({
                'from_account': transaction['to_account'],
                'to_account': transaction['from_account'],
                'amount': refund_amount,
                'type': 'refund',
                'date': (
                    transaction['date']
                    + datetime.timedelta(days=np.random.poisson(5))
                )
            })

# Transactions between users
for user_id in user_ids:
    for friend_id in user_friends[user_id]:
        if np.random.rand() > 0.5:  # Randomly decide whether a transaction happens
            transactions.append({
                'from_account': user_id,
                'to_account': friend_id,
                'amount': round(np.random.uniform(1, 50), 2),  # Random transaction amount among friends
                'type': 'transaction',
                'date': np.random.choice(dates)
            })

transactions = pd.DataFrame(transactions).sort_values('date').reset_index(drop=True)
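Because the generator mixes three kinds of money movement, a few cheap invariants can catch mistakes before the data leaves the notebook. A minimal sketch, assuming the `user_`/`merchant_` ID prefixes and the `type` values used above; `validate_transactions` is a hypothetical helper, not part of any library:

```python
import pandas as pd

def validate_transactions(df: pd.DataFrame) -> list[str]:
    """Return descriptions of any basic invariants the transactions table breaks."""
    problems = []
    from_user = df["from_account"].str.startswith("user_")
    to_user = df["to_account"].str.startswith("user_")

    # Amounts must be strictly positive
    if (df["amount"] <= 0).any():
        problems.append("non-positive amount")
    # Nobody should pay themselves
    if (df["from_account"] == df["to_account"]).any():
        problems.append("self-transfer")
    # Refunds flow from a merchant back to a user
    refunds = df["type"] == "refund"
    if (refunds & (from_user | ~to_user)).any():
        problems.append("refund not from merchant to user")
    # Ordinary transactions always originate from a user
    if ((df["type"] == "transaction") & ~from_user).any():
        problems.append("transaction not sent by a user")
    return problems
```

Calling `validate_transactions(transactions)` returns an empty list when every check passes.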

### The Tables

Let's take a look at these tables:

In [3]:
accounts

account_id,account_type,name


In [4]:
transactions

from_account,to_account,amount,type,date

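The itables widget renders these tables only in a live notebook. A compact textual summary of the transactions, counting rows and totaling amounts per `type`, takes a plain pandas `groupby`. The sketch below uses a small toy frame in place of `transactions`, and `summarize_by_type` is a hypothetical helper:

```python
import pandas as pd

def summarize_by_type(df: pd.DataFrame) -> pd.DataFrame:
    """Count rows and compute total and mean amount for each transaction type."""
    return (
        df.groupby("type")["amount"]
        .agg(count="count", total="sum", mean="mean")
        .round(2)
    )

# Toy stand-in for the generated transactions table
toy = pd.DataFrame({
    "type": ["transaction", "transaction", "refund"],
    "amount": [20.0, 30.0, 15.0],
})
print(summarize_by_type(toy))
```

On the real data, `summarize_by_type(transactions)` shows at a glance how refunds compare with purchases and peer transfers.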

## Initial Graph Visualization

In [8]:
# define and visualize the transaction graph:

# Create a graph from the transactions DataFrame
G = nx.DiGraph()  # Directed graph to represent the direction of transactions

# Add nodes with more details
for _, row in accounts.iterrows():
    G.add_node(
        row["account_id"],
        account_type=row["account_type"],
        label=row["name"],
        color="#474B77" if row["account_type"] == "User" else "#E1856C",
        hover='\n'.join(f'{key}: {value}' for key, value in row.to_dict().items())
    )

for _, row in transactions.iterrows():
    # Add edges with more details
    G.add_edge(
        row["from_account"],
        row["to_account"],
        transaction_amount=row["amount"],
        transaction_type=row["type"],
        # Add any other edge-specific details here
    )

# Visualize the graph with Gravis
viz(G)
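`nx.DiGraph.add_edge` overwrites the attributes of an existing edge, so if the same ordered pair of accounts transacted twice, only the last row would survive in `G`. With the generator as written each ordered pair occurs at most once, but aggregating first keeps the graph correct if the generator changes. A minimal sketch, where `build_weighted_graph` is a hypothetical helper that sums amounts per `(from_account, to_account)` pair into a single `weight` attribute:

```python
import networkx as nx
import pandas as pd

def build_weighted_graph(transactions: pd.DataFrame) -> nx.DiGraph:
    """Collapse repeated (from, to) pairs into one edge weighted by the summed amount."""
    totals = transactions.groupby(
        ["from_account", "to_account"], as_index=False
    )["amount"].sum()
    G = nx.DiGraph()
    for row in totals.itertuples(index=False):
        G.add_edge(row.from_account, row.to_account, weight=row.amount)
    return G

# Two payments from user_1 to merchant_1 end up on a single edge
toy = pd.DataFrame({
    "from_account": ["user_1", "user_1", "user_2"],
    "to_account": ["merchant_1", "merchant_1", "user_1"],
    "amount": [10.0, 5.5, 3.0],
})
G_toy = build_weighted_graph(toy)
print(G_toy["user_1"]["merchant_1"]["weight"])  # prints 15.5
```

An `nx.MultiDiGraph` is the alternative when each transaction should stay a separate edge.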

## Sending the Data to Snowflake

Next, we create tables in Snowflake and populate them with the data above.

In [6]:
# Snowflake Setup

def connection(database=native_app_db):
    return snowflake.connector.connect(
        connection_name="default",
        role=role,
        database=database,
        warehouse=warehouse,
        schema=schema,
    )

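def sql(code: str, database: str = native_app_db):
    """Run one SQL statement and return its result."""
    with connection(database) as conn:
        with conn.cursor() as cur:
            result = cur.execute(code)
            try:
                # Query results come back as a pandas DataFrame
                return result.fetch_pandas_all()
            except Exception:
                # Results that can't be fetched as Arrow (e.g. DDL or CALL) fall back to tuples
                return result.fetchall()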

### Create Tables

In [9]:
create_table_statement = """
CREATE OR REPLACE TABLE accounts (
    account_id VARCHAR PRIMARY KEY,
    account_type VARCHAR NOT NULL,
    name VARCHAR NOT NULL
);
"""

sql(create_table_statement, user_db)

[('Table ACCOUNTS successfully created.',)]

In [19]:
create_table_statement = """
CREATE OR REPLACE TABLE transactions (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    from_account VARCHAR NOT NULL,
    to_account VARCHAR NOT NULL,
    amount DECIMAL(10, 2) NOT NULL,  -- plain DECIMAL means NUMBER(38, 0), which would drop the cents
    type VARCHAR NOT NULL,
    date DATE NOT NULL,
    FOREIGN KEY (from_account) REFERENCES accounts(account_id),
    FOREIGN KEY (to_account) REFERENCES accounts(account_id)
);
"""

sql(create_table_statement, user_db)

[('Table TRANSACTIONS successfully created.',)]

In [20]:
with connection(user_db) as conn:
    write_pandas(
        conn=conn,
        df=allcaps_columns(accounts), 
        table_name="ACCOUNTS",
    )

In [21]:
with connection(user_db) as conn:
    write_pandas(
        conn=conn,
        df=allcaps_columns(transactions),
        table_name="TRANSACTIONS",
    )
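`write_pandas` returns a tuple `(success, num_chunks, num_rows, output)`, which the two cells above discard. A small hypothetical helper can turn a silent partial load into an error. It only inspects the returned tuple, so it is demonstrated here with stand-in values rather than a live connection:

```python
import pandas as pd

def check_write_result(result: tuple, df: pd.DataFrame) -> int:
    """Raise if write_pandas reported failure or loaded a different number of rows."""
    success, num_chunks, num_rows, _output = result
    if not success:
        raise RuntimeError(f"write_pandas reported failure after {num_chunks} chunk(s)")
    if num_rows != len(df):
        raise RuntimeError(f"expected {len(df)} rows, Snowflake loaded {num_rows}")
    return num_rows

# Stand-in for a real write_pandas(...) return value
df = pd.DataFrame({"ACCOUNT_ID": ["user_1", "merchant_1"]})
print(check_write_result((True, 1, 2, []), df))
```

In the notebook this would wrap the upload, e.g. `check_write_result(write_pandas(conn=conn, df=allcaps_columns(accounts), table_name="ACCOUNTS"), accounts)`.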

Let's check that this worked:

In [22]:
sql("select * from accounts", user_db)

ACCOUNT_ID,ACCOUNT_TYPE,NAME


In [23]:
sql("select * from transactions", user_db)

ID,FROM_ACCOUNT,TO_ACCOUNT,AMOUNT,TYPE,DATE


## Snowflake-RAI Data Stream

We begin by creating a RelationalAI database in the Native-App-equipped Snowflake database:

In [24]:
sql("CALL API.DELETE_DATABASE('social_money_network');")
sql("CALL API.CREATE_DATABASE('social_money_network');")

[('5abfa1cf-8573-5bd9-6720-623a3f45fdcd',
  'social_money_network',
  datetime.datetime(2024, 2, 16, 7, 48, 59, 648000, tzinfo=<DstTzInfo 'America/Los_Angeles' PST-1 day, 16:00:00 STD>),
  'CREATED')]

In [26]:
#sql("CALL API.DELETE_ENGINE('social_money_network_engine');")
sql("CALL API.CREATE_ENGINE('social_money_network_engine');")

ProgrammingError: 001044 (42P13): SQL compilation error: error line 0 at position -1
Invalid argument types for function 'CREATE_ENGINE': (VARCHAR(27))

Next, enable change tracking on both tables and create a data stream for each:

In [None]:
sql("ALTER TABLE accounts SET CHANGE_TRACKING = TRUE;", user_db)

In [None]:
sql("ALTER TABLE transactions SET CHANGE_TRACKING = TRUE;", user_db)

In [None]:
sql("""
CALL API.CREATE_DATA_STREAM(
    API.OBJECT_REFERENCE('TABLE','PL_DEMO.PUBLIC.ACCOUNTS'),
    'social_money_network',
    'ACCOUNTS'
);
""")

In [None]:
sql("""
CALL API.CREATE_DATA_STREAM(
    API.OBJECT_REFERENCE('TABLE','PL_DEMO.PUBLIC.TRANSACTIONS'),
    'social_money_network',
    'TRANSACTIONS'
);
""")

We define a procedure called `COMPUTE_PAGERANK_INTO` that uses Rel to compute PageRank and writes the PageRank scores into a new table in the user database:

In [None]:
sql("""
CREATE OR REPLACE PROCEDURE PUBLIC.COMPUTE_PAGERANK_INTO(db varchar, engine varchar, nodes_relation varchar, edges_relation varchar, target varchar)
RETURNS STRING
AS
BEGIN
    CALL NA_APP_WITH_STAGING_VERSIONS_APP.API.EXEC_INTO(
    :db,
    :engine,
    '
    
    // Format relation into src, dst, wgt format
    def weighted_edge_format(src, dst, wgt) {
      src = '||:edges_relation||':FROM_ACCOUNT[pkey],
      dst = '||:edges_relation||':TO_ACCOUNT[pkey],
      wgt = '||:edges_relation||':AMOUNT[pkey]
      from pkey
    }

    def W = weighted_edge_format
    def N = '||:nodes_relation||'
    def A = {(:is_weighted); (:is_directed)}
    def D = {(:weight, W); (:node, N)}
    def G = create_graph[D, A]
    @inline def my_graphlib = rel:graphlib[G]

    def results = {score, node : my_graphlib:pagerank(node, score)}
    ',
    true,
    'results',
    :target,
    
    parse_json('[{"name":"score","type": "FLOAT"},{"name":"node","type": "VARCHAR"}]')
    );
END;
""", user_db)

Lastly, we can compute the PageRank values and retrieve them:

In [None]:
sql(f"""
    CALL PUBLIC.COMPUTE_PAGERANK_INTO(
        'social_money_network_engine',
        'accounts',
        'transactions',
        'pagerank_full_results'
    );
""", user_db)    
sql("SELECT * FROM results.pagerank_full_results ORDER BY score DESC;", user_db)