# Setup and Explore Databases

The core infrastructure required to run this lab is already provisioned for you via Terraform, including an AlloyDB cluster and a Spanner instance. This notebook executes the final setup steps required to prepare the database environment and helps you explore the database schemas and data.

## Basic Setup

### Define Notebook Variables

Update the variables below to match your environment. You will be prompted for the AlloyDB password you chose when you provisioned the environment with Terraform.

In [14]:
# Project variables
project_id = "my-project"
region = "my-region"
vpc = "demo-vpc"
gcs_bucket_name = f"project-files-{project_id}"

# AlloyDB variables
alloydb_cluster = "my-alloydb-cluster"
alloydb_instance = "my-alloydb-instance"
alloydb_database = "finance"
alloydb_password = input("Please enter the password for the AlloyDB 'postgres' database user: ")

# Spanner variables
spanner_instance = "my-spanner-instance"
spanner_database = "finance-graph"
session = None

Please enter the password for the AlloyDB 'postgres' database user:  AdkToolbox123
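
Because `input()` echoes what is typed, the password ends up in the cell output. A minimal sketch of a non-echoing prompt, using the standard-library `getpass` module, might look like the following. The helper name `prompt_for_password` and its `reader` parameter are hypothetical and exist only so the prompt can be swapped out when testing.

```python
import getpass
from typing import Callable


def prompt_for_password(
    prompt: str = "Please enter the password for the AlloyDB 'postgres' database user: ",
    reader: Callable[[str], str] = getpass.getpass,
) -> str:
    """Reads a password without echoing it and rejects empty input."""
    password = reader(prompt)
    if not password:
        raise ValueError("The AlloyDB password must not be empty.")
    return password


# Hypothetical notebook usage:
#   alloydb_password = prompt_for_password()
```

With this approach the typed value would not appear in the saved notebook output.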


In [3]:
# Set env variable to suppress annoying system warnings when running shell commands
%env GRPC_ENABLE_FORK_SUPPORT=1

env: GRPC_ENABLE_FORK_SUPPORT=1


### Connect to your Google Cloud Project

In [5]:
# Configure gcloud.
!gcloud config set project {project_id}

Updated property [core/project].


### Configure Logging

In [5]:
import logging
import sys

# Configure the root logger to output messages with INFO level or above
logging.basicConfig(level=logging.INFO, stream=sys.stdout, format='%(asctime)s[%(levelname)5s][%(name)14s] - %(message)s',  datefmt='%H:%M:%S', force=True)
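
To see what the format string above produces, the sketch below applies the same format and date format to a handler that writes to an in-memory buffer. The logger name `demo` and the message are placeholders chosen for illustration.

```python
import io
import logging

LOG_FORMAT = "%(asctime)s[%(levelname)5s][%(name)14s] - %(message)s"

# Write to an in-memory buffer so the formatted line can be inspected.
buffer = io.StringIO()
handler = logging.StreamHandler(buffer)
handler.setFormatter(logging.Formatter(LOG_FORMAT, datefmt="%H:%M:%S"))

demo_logger = logging.getLogger("demo")  # hypothetical logger name
demo_logger.setLevel(logging.INFO)
demo_logger.addHandler(handler)
demo_logger.propagate = False  # keep the demo out of the root logger's output

demo_logger.info("connected")
formatted_line = buffer.getvalue().strip()
# The level is right-aligned to 5 characters and the logger name to 14,
# so the line ends with: [ INFO][          demo] - connected
```

The fixed-width fields keep messages aligned when several loggers write to the same cell output.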

### Install Dependencies

In [6]:
! pip install --quiet google-cloud-storage==2.19.0 \
                      asyncpg==0.30.0 \
                      google.cloud.alloydb.connector==1.9.0 

### Define Helper Functions

#### REST API Helper Function

In [16]:
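import json

import google.auth
import google.auth.transport.requests  # Not loaded by `import google.auth` alone
import requests

# Get credentials for the current user and wrap them in an authorized session
creds, _ = google.auth.default()
authed_session = google.auth.transport.requests.AuthorizedSession(creds)
access_token = creds.token  # Note: stays None until the credentials are refreshed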

if project_id:
  authed_session.headers.update({"x-goog-user-project": project_id}) # Required to work around a project quota bug

def rest_api_helper(
    url: str,
    http_verb: str,
    request_body: dict = None,
    params: dict = None,
    session: requests.Session = authed_session,
  ) -> dict:
  """Calls a REST API using a pre-authenticated requests Session."""

  headers = {"Content-Type": "application/json"}

  try:

    if http_verb == "GET":
      response = session.get(url, headers=headers, params=params)
    elif http_verb == "POST":
      response = session.post(url, json=request_body, headers=headers, params=params)
    elif http_verb == "PUT":
      response = session.put(url, json=request_body, headers=headers, params=params)
    elif http_verb == "PATCH":
      response = session.patch(url, json=request_body, headers=headers, params=params)
    elif http_verb == "DELETE":
      response = session.delete(url, headers=headers, params=params)
    else:
      raise ValueError(f"Unknown HTTP verb: {http_verb}")

    # Raise an exception for bad status codes (4xx or 5xx)
    response.raise_for_status()

    # Check if response has content before trying to parse JSON
    if response.content:
        return response.json()
    else:
        return {} # Return empty dict for empty responses (like 204 No Content)

  except requests.exceptions.RequestException as e:
      # Catch potential requests library errors (network, timeout, etc.)
      # Log detailed error information
      print(f"Request failed: {e}")
      if e.response is not None:
          print(f"Request URL: {e.request.url}")
          print(f"Request Headers: {e.request.headers}")
          print(f"Request Body: {e.request.body}")
          print(f"Response Status: {e.response.status_code}")
          print(f"Response Text: {e.response.text}")
          # Re-raise a more specific error or a custom one
          raise RuntimeError(f"API call failed with status {e.response.status_code}: {e.response.text}") from e
      else:
          raise RuntimeError(f"API call failed: {e}") from e
  except json.JSONDecodeError as e:
      print(f"Failed to decode JSON response: {e}")
      print(f"Response Text: {response.text}")
      raise RuntimeError(f"Invalid JSON received from API: {response.text}") from e
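
The verb dispatch in `rest_api_helper` can also be expressed through `Session.request()`, which takes the method name as its first argument. The sketch below is one possible variant, not the notebook's implementation: `call_rest_api`, `FakeSession`, and `FakeResponse` are hypothetical names, and the fakes exist only so the example runs without network access. Unlike the helper above, this variant upper-cases the verb before checking it.

```python
import json
from typing import Any, Optional

BODY_VERBS = {"POST", "PUT", "PATCH"}
ALLOWED_VERBS = BODY_VERBS | {"GET", "DELETE"}


def call_rest_api(session: Any, url: str, http_verb: str,
                  request_body: Optional[dict] = None,
                  params: Optional[dict] = None) -> dict:
    """Dispatches on the HTTP verb through session.request()."""
    verb = http_verb.upper()
    if verb not in ALLOWED_VERBS:
        raise ValueError(f"Unknown HTTP verb: {http_verb}")

    kwargs = {"headers": {"Content-Type": "application/json"}, "params": params}
    if verb in BODY_VERBS:
        kwargs["json"] = request_body  # only these verbs carry a JSON body

    response = session.request(verb, url, **kwargs)
    response.raise_for_status()
    # Empty bodies (for example 204 No Content) become an empty dict
    return response.json() if response.content else {}


class FakeResponse:
    """Offline stand-in for requests.Response (illustration only)."""

    def __init__(self, payload: bytes):
        self.content = payload

    def raise_for_status(self) -> None:
        pass

    def json(self) -> dict:
        return json.loads(self.content)


class FakeSession:
    """Offline stand-in for requests.Session that records the last call."""

    def __init__(self, payload: bytes = b""):
        self.payload = payload
        self.last_call = None

    def request(self, method, url, **kwargs):
        self.last_call = (method, url, kwargs)
        return FakeResponse(self.payload)


fake = FakeSession(b'{"name": "sessions/abc"}')
result = call_rest_api(fake, "https://example.invalid/v1/sessions", "post", {})
```

Passing the session as a parameter, as `rest_api_helper` already does, is what makes this kind of offline check possible.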


#### AlloyDB Helper Function

In [17]:
# Create AlloyDB Query Helper Function
import sqlalchemy
from sqlalchemy import text, exc
import pandas as pd

async def run_alloydb_query(pool, sql: str, params = None, output_as_df: bool = True):
    """Executes a SQL query or statement against the database pool.

    Handles various SQL statements:
    - SELECT/WITH: Returns results as a DataFrame (if output_as_df=True)
      or ResultProxy. Supports parameters. Does not commit.
    - EXPLAIN/EXPLAIN ANALYZE: Executes the explain, returns the query plan
      as a formatted multi-line string. Ignores output_as_df.
      Supports parameters. Does not commit.
    - INSERT/UPDATE/DELETE/CREATE/ALTER etc.: Executes the statement,
      commits the transaction, logs info, and returns the ResultProxy.
      Supports single or bulk parameters (executemany).

    Args:
      pool: An asynchronous SQLAlchemy connection pool.
      sql: A string containing the SQL query or statement template.
      params: Optional.
        - None: Execute raw SQL (Use with caution for non-SELECT/EXPLAIN).
        - dict or tuple: Parameters for a single execution.
        - list of dicts/tuples: Parameters for bulk execution (executemany).
      output_as_df (bool): If True and query is SELECT/WITH, return pandas DataFrame.
                           Ignored for EXPLAIN and non-data-returning statements.

    Returns:
      pandas.DataFrame | str | sqlalchemy.engine.Result | None:
        - DataFrame: For SELECT/WITH if output_as_df=True.
        - str: For EXPLAIN/EXPLAIN ANALYZE, containing the formatted query plan.
        - ResultProxy: For non-SELECT/WITH/EXPLAIN statements, or SELECT/WITH
                       if output_as_df=False.
        - None: If a SQLAlchemy ProgrammingError or other specific error occurs.

    Raises:
        Exception: Catches and logs `sqlalchemy.exc.ProgrammingError`, returning None.
                   May re-raise other database exceptions.

    Example Execution:
      Single SELECT:
        sql_select = "SELECT ticker, company_name from investments LIMIT 5"
        df_result = await run_alloydb_query(pool, sql_select)

      Single non-SELECT - Parameterized (Safe!):
        Parameterized INSERT:
          sql_insert = "INSERT INTO investments (ticker, company_name) VALUES (:ticker, :name)"
          params_insert = {"ticker": "NEW", "name": "New Company"}
          insert_result = await run_alloydb_query(pool, sql_insert, params_insert)

        Parameterized UPDATE:
          sql_update = "UPDATE products SET price = :price WHERE id = :product_id"
          params_update = {"price": 99.99, "product_id": 123}
          update_result = await run_alloydb_query(pool, sql_update, params_update)

      Bulk Update:
        docs = pd.DataFrame([
            {'id': 101, 'sparse_embedding': '[0.1, 0.2]'},
            {'id': 102, 'sparse_embedding': '[0.3, 0.4]'},
            # ... more rows
        ])

        update_sql_template = '''
            UPDATE products
            SET sparse_embedding = :embedding,
                sparse_embedding_model = 'BM25'
            WHERE id = :product_id
        ''' # Using named parameters :param_name

        # Prepare list of dictionaries for params
        data_to_update = [
            {"embedding": row.sparse_embedding, "product_id": row.id}
            for row in docs.itertuples(index=False)
        ]

        if data_to_update:
          bulk_result = await run_alloydb_query(pool, update_sql_template, data_to_update)
          # bulk_result is the SQLAlchemy ResultProxy

    """
    sql_lower_stripped = sql.strip().lower()
    is_select_with = sql_lower_stripped.startswith(('select', 'with'))
    is_explain = sql_lower_stripped.startswith('explain')

    # Determine if the statement is expected to return data rows or a plan
    is_data_returning = is_select_with or is_explain

    # Determine actual DataFrame output eligibility (only for SELECT/WITH)
    effective_output_as_df = output_as_df and is_select_with

    # Check if params suggest a bulk operation (for logging purposes)
    is_bulk_operation = isinstance(params, (list, tuple)) and len(params) > 0 and isinstance(params[0], (dict, tuple, list))

    async with pool.connect() as conn:
        try:
          # Execute with or without params
          if params:
              result = await conn.execute(text(sql), params)
          else:
              # Add warning for raw SQL only if it's NOT data-returning
              #if not is_data_returning:
                  #logging.warning("Executing non-SELECT/EXPLAIN raw SQL without parameters. Ensure SQL is safe.")
              result = await conn.execute(text(sql))

          # --- Handle statements that return data or plan ---
          if is_data_returning:
              if is_explain:
                  # Fetch and format EXPLAIN output as a string
                  try:
                      plan_rows = result.fetchall()
                      # EXPLAIN output is usually text in the first column
                      query_plan = "\n".join([str(row[0]) for row in plan_rows])
                      #logging.info(f"EXPLAIN executed successfully for: {sql[:100]}...")
                      return query_plan
                  except Exception as e:
                      logging.error(f"Error fetching/formatting EXPLAIN result: {e}")
                      return None
              else: # Handle SELECT / WITH
                  if effective_output_as_df:
                      try:
                          rows = result.fetchall()
                          column_names = result.keys()
                          df = pd.DataFrame(rows, columns=column_names)
                          #logging.info(f"SELECT/WITH executed successfully, returning DataFrame for: {sql[:100]}...")
                          return df
                      except Exception as e:
                          logging.error(f"Error converting SELECT result to DataFrame: {e}")
                          logging.info(f"Returning raw ResultProxy for SELECT/WITH due to DataFrame conversion error for: {sql[:100]}...")
                          return result # Fallback to raw result
                  else:
                      # Return raw result proxy for SELECT/WITH if df output not requested
                      #logging.info(f"SELECT/WITH executed successfully, returning ResultProxy for: {sql[:100]}...")
                      return result

          # --- Handle Non-Data Returning Statements (INSERT, UPDATE, DELETE, CREATE, etc.) ---
          else:
              await conn.commit() # Commit changes ONLY for these statements
              operation_type = sql.strip().split()[0].upper()
              row_count = result.rowcount # Note: rowcount behavior varies

              if is_bulk_operation:
                  print(f"Bulk {operation_type} executed for {len(params)} items. Result rowcount: {row_count}")
              elif operation_type in ['INSERT', 'UPDATE', 'DELETE']:
                  print(f"{operation_type} statement executed successfully. {row_count} row(s) affected.")
              else: # CREATE, ALTER, etc.
                  print(f"{operation_type} statement executed successfully. Result rowcount: {row_count}")
              return result # Return the result proxy

        except exc.ProgrammingError as e:
            # Log the error with context
            logging.error(f"SQL Programming Error executing query:\nSQL: {sql[:500]}...\nParams (sample): {str(params)[:500]}...\nError: {e}")
            # Rollback might happen automatically on context exit with error, but explicit can be clearer
            # await conn.rollback() # Consider if needed based on pool/transaction settings
            return None # Return None on handled programming errors
        except Exception as e:
            # Log other unexpected errors
            logging.error(f"An unexpected error occurred during query execution:\nSQL: {sql[:500]}...\nError: {e}")
            # await conn.rollback() # Consider if needed
            raise # Re-raise unexpected errors
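
The helper decides how to handle a statement purely from its leading keyword. The sketch below isolates that logic so its behavior is easier to see. `classify_statement` and `is_bulk` are hypothetical names that mirror the prefix and parameter checks in `run_alloydb_query`; they are not part of the notebook.

```python
def classify_statement(sql: str) -> str:
    """Mirrors the prefix checks in run_alloydb_query.

    Returns "explain", "query" (SELECT/WITH), or "write" (everything else,
    which is the branch that commits).
    """
    normalized = sql.strip().lower()
    if normalized.startswith("explain"):
        return "explain"
    if normalized.startswith(("select", "with")):
        return "query"
    return "write"


def is_bulk(params) -> bool:
    """True when params is a non-empty list/tuple of dicts, tuples, or lists."""
    return (
        isinstance(params, (list, tuple))
        and len(params) > 0
        and isinstance(params[0], (dict, tuple, list))
    )


kinds = [
    classify_statement("  SELECT 1"),
    classify_statement("EXPLAIN ANALYZE SELECT 1"),
    classify_statement("INSERT INTO t (a) VALUES (:a)"),
    # A leading SQL comment hides the SELECT keyword from the prefix check
    classify_statement("-- note\nSELECT 1"),
]
```

One consequence of the prefix check is that a query beginning with a comment is treated as a write, so it returns the raw result rather than a DataFrame.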



#### Spanner Helper Functions

In [18]:
import time  # Used by run_spanner_ddl while polling the long-running operation

import pandas as pd


def get_spanner_sessions(project_id = project_id, spanner_instance = spanner_instance, spanner_database = spanner_database):

  # Get session
  uri = f"https://spanner.googleapis.com/v1/projects/{project_id}/instances/{spanner_instance}/databases/{spanner_database}/sessions"
  response = rest_api_helper(uri, "GET")
  return response


# https://cloud.google.com/spanner/docs/reference/rest/v1/projects.instances.databases.sessions/create
def create_spanner_session(project_id = project_id, spanner_instance = spanner_instance, spanner_database = spanner_database):
  print("No Spanner session found. Creating a new session.")

  # Create a new session
  uri = f"https://spanner.googleapis.com/v1/projects/{project_id}/instances/{spanner_instance}/databases/{spanner_database}/sessions"
  params = {
      "database": f"projects/{project_id}/instances/{spanner_instance}/databases/{spanner_database}"
  }
  response = rest_api_helper(uri, "POST", {}, params)
  return response['name']
     

# https://cloud.google.com/spanner/docs/reference/rest/v1/projects.instances.databases.sessions/delete
def close_spanner_session(session, project_id = project_id, spanner_instance = spanner_instance, spanner_database = spanner_database):
  """
  Example:
    response = get_spanner_sessions()
    for session in response['sessions']:
      close_spanner_session(session['name'])
  """

  uri = f"https://spanner.googleapis.com/v1/{session}"
  response = rest_api_helper(uri, "DELETE", {}, {"name": f"{session}"})
  return response
     

def run_spanner_query(sql, spanner_database = spanner_database, query_options=None, create_new_session=False):
  """
  Runs a Spanner query and returns the result.

  Args:
      sql: The SQL query to execute.
      query_options: (Optional) A dictionary of advanced query options.
                    See https://cloud.google.com/spanner/docs/reference/rest/v1/projects.instances.databases.sessions/executeSql#queryoptions
                    for available options.

  Returns:
      A dictionary containing the query results.

  Ref:
      https://cloud.google.com/spanner/docs/reference/rest/v1/projects.instances.databases.sessions/executeSql
  """
  # Ensure a session exists
  # Create session
  global session
  if not session or create_new_session == True:
    session = create_spanner_session()

  # Initialize response vars
  commit_response = ""
  response = ""

  # Construct the request URL
  uri = f"https://spanner.googleapis.com/v1/{session}:executeSql"

  # Set transaction type (readOnly/readWrite) and transaction object with commit type (begin/singleUse)
  transaction_type = "readWrite" if any(x in sql.lower() for x in ["insert", "update", "delete"]) else "readOnly"
  transaction = {"begin": {"readWrite": {}}} if transaction_type == "readWrite" else {"singleUse": {"readOnly": {}}}

  request_body = {
      "sql": sql,
      "transaction": transaction
  }
  params = {
      "session": session
  }

  if query_options:
      request_body["queryOptions"] = query_options

  try:
    # Make the request
    response = rest_api_helper(uri, "POST", request_body=request_body, params = params)

  except RuntimeError as e:
    if "Session not found" in str(e):
      print(f"Session not found. Creating a new session and retrying the query...")
      return run_spanner_query(sql, spanner_database, query_options, create_new_session=True)  # Retry with a new session
    else:
      raise  # Re-raise the exception if it's not a "Session not found" error

  # Commit transaction if read/write
  if transaction_type == "readWrite":
      uri = f"https://spanner.googleapis.com/v1/{session}:commit"
      params = {
          "session": session
      }
      commit_response = rest_api_helper(uri, "POST", {"transactionId": response['metadata']['transaction']['id']}, params)
      print(f"commit_response: {commit_response}")

  # Return a DataFrame if the statement is a query (SELECT, WITH, or GRAPH)
  if (sql.lower().strip().startswith(("select", "with", "graph"))):
    columns = [field.get('name', 'unnamed_column') for field in response['metadata']['rowType']['fields']]

    # Create DataFrame from rows
    if 'rows' in response:
      df = pd.DataFrame(response['rows'], columns=columns)
      return df
    else:
      return response

  else:
    # Return the query results
    return response


def run_spanner_ddl(ddl_array, project_id = project_id, spanner_instance = spanner_instance, spanner_database = spanner_database):
  # Create tables in Spanner
  # https://cloud.google.com/spanner/docs/reference/rest/v1/projects.instances.databases.tables/create#try-it

  uri = f"https://spanner.googleapis.com/v1/projects/{project_id}/instances/{spanner_instance}/databases/{spanner_database}/ddl"
  http_verb = "PATCH"
  request_body = {
      "statements": ddl_array
  }

  response = rest_api_helper(uri, http_verb, request_body)

  operation_name = response['name']
  uri = f"https://spanner.googleapis.com/v1/{operation_name}"

  while True:
    response = rest_api_helper(uri, "GET", {})
    if response.get("done", False):
      if response.get("error"):
        print(response.get("error"))
      else:
        print("Operation completed successfully.")
      break
    else:
      print("Operation not completed yet.")
      time.sleep(2)
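
`run_spanner_query` builds its DataFrame directly from the raw `rows` array. The Spanner REST API encodes `INT64` values as JSON strings, so numeric columns may arrive as text. The sketch below shows one way to coerce values using the field types in the response metadata. `coerce_spanner_rows` is a hypothetical helper, and `sample_response` is a hand-written fragment in the shape of an `executeSql` response, using two rows from the `Account` table.

```python
from typing import Any, Dict, List


def coerce_spanner_rows(response: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Turns an executeSql-style response into a list of typed row dicts."""
    fields = response["metadata"]["rowType"]["fields"]
    names = [f.get("name", "unnamed_column") for f in fields]
    codes = [f.get("type", {}).get("code") for f in fields]

    typed_rows = []
    for row in response.get("rows", []):
        typed = {}
        for name, code, value in zip(names, codes, row):
            if value is not None and code == "INT64":
                value = int(value)    # INT64 arrives as a JSON string
            elif value is not None and code == "FLOAT64":
                value = float(value)  # also accepts "NaN" and "Infinity" strings
            typed[name] = value
        typed_rows.append(typed)
    return typed_rows


sample_response = {
    "metadata": {"rowType": {"fields": [
        {"name": "id", "type": {"code": "INT64"}},
        {"name": "is_blocked", "type": {"code": "BOOL"}},
        {"name": "type", "type": {"code": "STRING"}},
    ]}},
    "rows": [["1", False, "brokerage account"], ["8", True, "trust account"]],
}
typed_rows = coerce_spanner_rows(sample_response)
```

The resulting list of dicts can be passed straight to `pd.DataFrame(typed_rows)` if typed columns are needed.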

## Finalize AlloyDB Setup

### Connect to the AlloyDB Cluster

This function will create a connection pool to your AlloyDB instance using the AlloyDB Python connector. The AlloyDB Python connector will automatically create secure connections to your AlloyDB instance using mTLS.

In [10]:
import asyncpg

import sqlalchemy
from sqlalchemy.ext.asyncio import AsyncEngine, create_async_engine

from google.cloud.alloydb.connector import AsyncConnector, IPTypes

async def init_connection_pool(connector: AsyncConnector, db_name: str = alloydb_database, pool_size: int = 5) -> AsyncEngine:
    # initialize Connector object for connections to AlloyDB
    connection_string = f"projects/{project_id}/locations/{region}/clusters/{alloydb_cluster}/instances/{alloydb_instance}"

    async def getconn() -> asyncpg.Connection:
        conn: asyncpg.Connection = await connector.connect(
            connection_string,
            "asyncpg",
            user="postgres",
            password=alloydb_password,
            db=db_name,
            ip_type=IPTypes.PRIVATE, # Optionally use IPTypes.PUBLIC
        )
        return conn

    pool = create_async_engine(
        "postgresql+asyncpg://",
        async_creator=getconn,
        pool_size=pool_size,
        max_overflow=0,
        isolation_level='AUTOCOMMIT'
    )
    return pool

connector = AsyncConnector()

finance_db_pool = await init_connection_pool(connector, f"{alloydb_database}")
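
When the pool is no longer needed, both the engine and the connector can be closed. The sketch below assumes the usual `AsyncEngine.dispose()` and `AsyncConnector.close()` coroutines. `close_alloydb_resources` and `_FakeResource` are hypothetical names, and the fake objects stand in for the real pool and connector so the example runs without a database.

```python
import asyncio


async def close_alloydb_resources(pool, connector) -> None:
    """Disposes the engine's pooled connections, then closes the connector."""
    await pool.dispose()     # AsyncEngine.dispose() is a coroutine
    await connector.close()  # AsyncConnector.close() is a coroutine


class _FakeResource:
    """Offline stand-in for the engine and connector (illustration only)."""

    def __init__(self):
        self.closed = False

    async def dispose(self):
        self.closed = True

    async def close(self):
        self.closed = True


fake_pool, fake_connector = _FakeResource(), _FakeResource()
asyncio.run(close_alloydb_resources(fake_pool, fake_connector))

# In the notebook, which already runs an event loop, await it directly:
#   await close_alloydb_resources(finance_db_pool, connector)
```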

### Create AlloyDB Database

In [22]:
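# Connect to the default 'postgres' database, then use that pool to create the new database
postgres_db_pool = await init_connection_pool(connector, "postgres")
sql = f"CREATE DATABASE {alloydb_database};"
result = await run_alloydb_query(postgres_db_pool, sql)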

CREATE statement executed successfully. Result rowcount: -1


### Import AlloyDB Data

#### Run the Import

This step generally takes about 5 minutes to complete.

In [33]:
# Reference: https://cloud.google.com/sdk/gcloud/reference/alloydb/clusters/import
!gcloud alloydb clusters import {alloydb_cluster} --region={region} --database={alloydb_database} --gcs-uri=gs://pr-public-demo-data/adk-toolbox-demo/data/finance.sql --sql --user=postgres

I0000 00:00:1751037692.409208    4914 fork_posix.cc:71] Other threads are currently calling into gRPC, skipping fork() handlers


Importing data from cluster...done.                                            
Operation ID: operation-1751037693189-6388f3bb0ddf9-f0f6b5b8-c0fb6d20


#### Validate Row Counts

In [12]:
sql = """
SELECT 'users' AS table_name, (SELECT COUNT(*) FROM users) AS imported_count, 2000 AS target_row_count
UNION ALL
SELECT 'mcc_codes', (SELECT COUNT(*) FROM mcc_codes), 109
UNION ALL
SELECT 'transactions', (SELECT COUNT(*) FROM transactions), 13305915
UNION ALL
SELECT 'fraud_labels', (SELECT COUNT(*) FROM fraud_labels), 8914963
UNION ALL
SELECT 'cards', (SELECT COUNT(*) FROM cards), 6146;
"""

await run_alloydb_query(finance_db_pool, sql)

Unnamed: 0,table_name,imported_count,target_row_count
0,users,2000,2000
1,mcc_codes,109,109
2,transactions,13305915,13305915
3,fraud_labels,8914963,8914963
4,cards,6146,6146
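
The comparison between imported and target counts can also be checked programmatically rather than by eye. The sketch below is one minimal way to do so. `find_count_mismatches` is a hypothetical helper, and the rows are copied from the validation output in this section; with a DataFrame `df`, `df.to_dict("records")` would produce the same shape.

```python
from typing import Dict, List


def find_count_mismatches(rows: List[Dict[str, object]]) -> List[str]:
    """Returns the names of tables whose imported count differs from the target."""
    return [
        str(row["table_name"])
        for row in rows
        if row["imported_count"] != row["target_row_count"]
    ]


validation_rows = [
    {"table_name": "users", "imported_count": 2000, "target_row_count": 2000},
    {"table_name": "mcc_codes", "imported_count": 109, "target_row_count": 109},
    {"table_name": "transactions", "imported_count": 13305915, "target_row_count": 13305915},
    {"table_name": "fraud_labels", "imported_count": 8914963, "target_row_count": 8914963},
    {"table_name": "cards", "imported_count": 6146, "target_row_count": 6146},
]
mismatches = find_count_mismatches(validation_rows)
# An empty list means every table matched its target row count.
```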


## Explore AlloyDB Data

### Review AlloyDB Schema

The AlloyDB database contains data on transactions, credit cards, users, merchant category codes (MCCs), and historical fraud cases. See the entity-relationship diagram (ERD) below for details on the tables, columns, and relationships.

![AlloyDB Schema](img/alloydb_finance_db_erd.png)

### Run AlloyDB Queries

In [26]:
sql = "SELECT * FROM transactions LIMIT 10;"
await run_alloydb_query(finance_db_pool, sql)

Unnamed: 0,id,date,client_id,card_id,amount,use_chip,merchant_id,merchant_city,merchant_state,zip,mcc,errors
0,7475327,2010-01-01 00:01:00,1556,2972,-77.0,Swipe Transaction,59935,Beulah,ND,58523.0,5499,
1,7475328,2010-01-01 00:02:00,561,4575,14.57,Swipe Transaction,67570,Bettendorf,IA,52722.0,5311,
2,7475329,2010-01-01 00:02:00,1129,102,80.0,Swipe Transaction,27092,Vista,CA,92084.0,4829,
3,7475331,2010-01-01 00:05:00,430,2860,200.0,Swipe Transaction,27092,Crown Point,IN,46307.0,4829,
4,7475332,2010-01-01 00:06:00,848,3915,46.41,Swipe Transaction,13051,Harwood,MD,20776.0,5813,
5,7475333,2010-01-01 00:07:00,1807,165,4.81,Swipe Transaction,20519,Bronx,NY,10464.0,5942,
6,7475334,2010-01-01 00:09:00,1556,2972,77.0,Swipe Transaction,59935,Beulah,ND,58523.0,5499,
7,7475335,2010-01-01 00:14:00,1684,2140,26.46,Online Transaction,39021,ONLINE,,,4784,
8,7475336,2010-01-01 00:21:00,335,5131,261.58,Online Transaction,50292,ONLINE,,,7801,
9,7475337,2010-01-01 00:21:00,351,1112,10.74,Swipe Transaction,3864,Flushing,NY,11355.0,5813,


In [27]:
sql = "SELECT * FROM cards LIMIT 10;"
await run_alloydb_query(finance_db_pool, sql)

Unnamed: 0,id,client_id,card_brand,card_type,card_number,expires,cvv,has_chip,num_cards_issued,credit_limit,acct_open_date,year_pin_last_changed,card_on_dark_web
0,4524,825,Visa,Debit,4344676511950444,12/2022,623,True,2,"$24,295.00",2002-09-01,2008,False
1,2731,825,Visa,Debit,4956965974959986,12/2020,393,True,2,"$21,968.00",2014-04-01,2014,False
2,3701,825,Visa,Debit,4582313478255491,02/2024,719,True,2,"$46,414.00",2003-07-01,2004,False
3,42,825,Visa,Credit,4879494103069057,08/2024,693,False,1,"$12,400.00",2003-01-01,2012,False
4,4659,825,Mastercard,Debit (Prepaid),5722874738736011,03/2009,75,True,1,$28.00,2008-09-01,2009,False
5,4537,1746,Visa,Credit,4404898874682993,09/2003,736,True,1,"$27,500.00",2003-09-01,2012,False
6,1278,1746,Visa,Debit,4001482973848631,07/2022,972,True,2,"$28,508.00",2011-02-01,2011,False
7,3687,1746,Mastercard,Debit,5627220683410948,06/2022,48,True,2,"$9,022.00",2003-07-01,2015,False
8,3465,1746,Mastercard,Debit (Prepaid),5711382187309326,11/2020,722,True,2,$54.00,2010-06-01,2015,False
9,3754,1746,Mastercard,Debit (Prepaid),5766121508358701,02/2023,908,True,1,$99.00,2006-07-01,2012,False


In [28]:
sql = "SELECT * FROM users LIMIT 10;"
await run_alloydb_query(finance_db_pool, sql)

Unnamed: 0,id,current_age,retirement_age,birth_year,birth_month,gender,address,latitude,longitude,per_capita_income,yearly_income,total_debt,credit_score,num_credit_cards
0,825,53,66,1966,11,Female,462 Rose Lane,34.15,-117.76,"$29,278.00","$59,696.00","$127,613.00",787,5
1,1746,53,68,1966,12,Female,3606 Federal Boulevard,40.76,-73.74,"$37,891.00","$77,254.00","$191,349.00",701,5
2,1718,81,67,1938,11,Female,766 Third Drive,34.02,-117.89,"$22,681.00","$33,483.00",$196.00,698,5
3,708,63,63,1957,1,Female,3 Madison Street,40.71,-73.99,"$163,145.00","$249,925.00","$202,328.00",722,4
4,1164,43,70,1976,9,Male,9620 Valley Stream Drive,37.76,-122.44,"$53,797.00","$109,687.00","$183,855.00",675,1
5,68,42,70,1977,10,Male,58 Birch Lane,41.55,-90.6,"$20,599.00","$41,997.00",$0.00,704,3
6,1075,36,67,1983,12,Female,5695 Fifth Street,38.22,-85.74,"$25,258.00","$51,500.00","$102,286.00",672,3
7,1711,26,67,1993,12,Male,1941 Ninth Street,45.51,-122.64,"$26,790.00","$54,623.00","$114,711.00",728,1
8,1116,81,66,1938,7,Female,11 Spruce Avenue,40.32,-75.32,"$26,273.00","$42,509.00","$2,895.00",755,5
9,1752,34,60,1986,1,Female,887 Grant Street,29.97,-92.12,"$18,730.00","$38,190.00","$81,262.00",810,1


In [29]:
sql = "SELECT * FROM mcc_codes LIMIT 10;"
await run_alloydb_query(finance_db_pool, sql)

Unnamed: 0,mcc,description
0,5812,Eating Places and Restaurants
1,5541,Service Stations
2,7996,"Amusement Parks, Carnivals, Circuses"
3,5411,"Grocery Stores, Supermarkets"
4,4784,Tolls and Bridge Fees
5,4900,"Utilities - Electric, Gas, Water, Sanitary"
6,5942,Book Stores
7,5814,Fast Food Restaurants
8,4829,Money Transfer
9,5311,Department Stores


In [30]:
sql = "SELECT * FROM fraud_labels LIMIT 10;"
await run_alloydb_query(finance_db_pool, sql)

Unnamed: 0,transaction_id,is_fraud
0,10649266,False
1,23410063,False
2,9316588,False
3,12478022,False
4,9558530,False
5,12532830,False
6,19526714,False
7,9906964,False
8,13224888,False
9,13749094,False
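
The single-table queries above can be combined to follow the relationships between tables. The sketch below counts fraudulent transactions per merchant category. It assumes that `fraud_labels.transaction_id` references `transactions.id`, that `transactions.mcc` references `mcc_codes.mcc`, and that `is_fraud` is a boolean column. These assumptions are inferred from the column names and sample rows, not confirmed by the schema, and the variable name is a placeholder.

```python
# Assumed join keys (inferred from column names, not confirmed by the schema):
#   fraud_labels.transaction_id -> transactions.id
#   transactions.mcc            -> mcc_codes.mcc
fraud_by_category_sql = """
SELECT m.description,
       COUNT(*) AS fraud_count
FROM transactions AS t
JOIN fraud_labels AS f ON f.transaction_id = t.id
JOIN mcc_codes AS m ON m.mcc = t.mcc
WHERE f.is_fraud
GROUP BY m.description
ORDER BY fraud_count DESC
LIMIT 10;
"""

# In the notebook, run it with the helper defined earlier:
#   await run_alloydb_query(finance_db_pool, fraud_by_category_sql)
```

Because the statement starts with `SELECT`, the helper would return the result as a DataFrame.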


## Explore Spanner Data

### Review the Spanner Schema

The Spanner dataset is already loaded for you via Terraform automation. It contains data about accounts, account transfers, account audits, loans, and loan repayments.

The schema is defined as follows:

```
CREATE TABLE Account (
  id INT64 NOT NULL,
  create_time TIMESTAMP,
  is_blocked BOOL,
  type STRING(MAX),
) PRIMARY KEY(id);

CREATE TABLE AccountAudits (
  id INT64 NOT NULL,
  audit_timestamp TIMESTAMP,
  audit_details STRING(MAX),
) PRIMARY KEY(id, audit_timestamp),
  INTERLEAVE IN PARENT Account ON DELETE CASCADE;

CREATE TABLE AccountRepayLoan (
  id INT64 NOT NULL,
  loan_id INT64 NOT NULL,
  amount FLOAT64,
  create_time TIMESTAMP NOT NULL,
) PRIMARY KEY(id, loan_id, create_time),
  INTERLEAVE IN PARENT Account ON DELETE CASCADE;

CREATE TABLE AccountTransferAccount (
  id INT64 NOT NULL,
  to_id INT64 NOT NULL,
  amount FLOAT64,
  create_time TIMESTAMP NOT NULL,
) PRIMARY KEY(id, to_id, create_time),
  INTERLEAVE IN PARENT Account ON DELETE CASCADE;

CREATE TABLE Loan (
  id INT64 NOT NULL,
  loan_amount FLOAT64,
  balance FLOAT64,
  create_time TIMESTAMP,
  interest_rate FLOAT64,
) PRIMARY KEY(id);

CREATE TABLE Person (
  id INT64 NOT NULL,
  name STRING(MAX),
) PRIMARY KEY(id);

CREATE TABLE PersonOwnAccount (
  id INT64 NOT NULL,
  account_id INT64 NOT NULL,
  create_time TIMESTAMP,
) PRIMARY KEY(id, account_id),
  INTERLEAVE IN PARENT Person ON DELETE CASCADE;

CREATE PROPERTY GRAPH FinGraph
  NODE TABLES(
    Account
      KEY(id)
      LABEL Account PROPERTIES(
        create_time,
        id,
        is_blocked,
        type),

    Loan
      KEY(id)
      LABEL Loan PROPERTIES(
        balance,
        create_time,
        id,
        interest_rate,
        loan_amount),

    Person
      KEY(id)
      LABEL Person PROPERTIES(
        id,
        name)
  )
  EDGE TABLES(
    AccountRepayLoan
      KEY(id, loan_id, create_time)
      SOURCE KEY(id) REFERENCES Account(id)
      DESTINATION KEY(loan_id) REFERENCES Loan(id)
      LABEL Repays PROPERTIES(
        amount,
        create_time,
        id,
        loan_id),

    AccountTransferAccount
      KEY(id, to_id, create_time)
      SOURCE KEY(id) REFERENCES Account(id)
      DESTINATION KEY(to_id) REFERENCES Account(id)
      LABEL Transfers PROPERTIES(
        amount,
        create_time,
        id,
        to_id),

    PersonOwnAccount
      KEY(id, account_id)
      SOURCE KEY(id) REFERENCES Person(id)
      DESTINATION KEY(account_id) REFERENCES Account(id)
      LABEL Owns PROPERTIES(
        account_id,
        create_time,
        id)
  );
```

Notice that the Spanner schema uses both the core relational database model and Spanner's built-in property graph model. The high-level graph nodes and edges are visualized below.

![Spanner Graph Schema](img/spanner_graph_schema.png)

### Run Relational Spanner Queries

In [31]:
sql = "SELECT * FROM Account LIMIT 10;"
run_spanner_query(sql)

Unnamed: 0,id,create_time,is_blocked,type
0,1,2020-01-10T06:22:20.222Z,False,brokerage account
1,2,2020-01-27T17:55:09.206Z,False,prepaid card
2,3,2020-02-18T05:44:20.655Z,False,brokerage account
3,4,2020-02-29T08:49:53.902Z,False,debit card
4,5,2020-03-02T12:47:18.726Z,False,brokerage account
5,6,2020-03-21T15:25:34.327Z,False,custodial account
6,7,2020-04-13T17:53:48.932Z,False,brokerage account
7,8,2020-04-14T20:08:15.427Z,True,trust account
8,9,2020-04-20T06:20:25.717Z,False,certificate of deposit
9,10,2020-04-25T17:12:17.773Z,False,debit card


In [32]:
sql = "SELECT * FROM AccountAudits LIMIT 10;"
run_spanner_query(sql)

Unnamed: 0,id,audit_timestamp,audit_details
0,3,2020-03-29T00:27:36Z,Account password reset successfully.
1,3,2020-07-21T12:26:54Z,Data import completed.
2,3,2021-04-06T09:18:37Z,Support ticket closed.
3,3,2021-06-08T04:45:13Z,Privacy settings updated.
4,3,2021-07-16T10:01:41Z,Subscription renewed.
5,3,2021-10-10T19:27:26Z,Two-factor authentication disabled.
6,3,2021-11-21T06:44:08Z,Account deleted by user.
7,3,2021-12-20T05:03:37Z,Account locked due to multiple failed login at...
8,4,2020-01-14T11:59:52Z,Two-factor authentication enabled.
9,4,2020-07-10T23:35:21Z,API key generated.


In [33]:
sql = "SELECT * FROM AccountRepayLoan LIMIT 10;"
run_spanner_query(sql)

Unnamed: 0,id,loan_id,amount,create_time
0,1,152,36633.6,2022-03-07T21:27:06.265Z
1,1,152,22836.6,2022-05-02T05:40:52.562Z
2,5,22,15999.3,2022-03-14T14:28:25.972Z
3,5,22,32920.2,2022-03-14T14:56:12.148Z
4,7,171,41031.0,2022-01-22T13:58:10.191Z
5,7,171,27316.8,2022-01-28T21:11:00.03Z
6,12,12,23469.3,2022-02-25T09:18:44.347Z
7,13,80,47489.4,2021-10-07T03:12:58.076Z
8,16,41,55886.4,2021-11-01T03:31:23.627Z
9,22,75,10572.3,2022-02-02T14:31:15.764Z


In [34]:
sql = "SELECT * FROM AccountTransferAccount LIMIT 10;"
run_spanner_query(sql)

Unnamed: 0,id,to_id,amount,create_time
0,1,630,3763.8,2022-02-06T19:47:40.868Z
1,3,362,7196.4,2021-09-17T05:39:10.898Z
2,8,92,3861.0,2022-02-12T23:49:27.801Z
3,8,446,7475.4,2021-06-20T16:00:10.768Z
4,10,427,1664.1,2021-11-11T02:38:05.14Z
5,11,8,5501.7,2021-09-28T20:34:03.612Z
6,11,689,3934.8,2021-10-17T11:32:39.602Z
7,16,372,1791.0,2021-11-30T05:02:50.894Z
8,17,186,1194.3,2022-02-14T09:47:39.007Z
9,24,248,5845.5,2021-09-24T20:06:47.816Z


In [35]:
sql = "SELECT * FROM Loan LIMIT 10;"
run_spanner_query(sql)

Unnamed: 0,id,loan_amount,balance,create_time,interest_rate
0,1,2022278.5,123359.0,2020-03-18T16:42:57.719Z,0.064
1,2,96346.3,4721.0,2020-03-23T19:03:05.567Z,0.097
2,3,469577.7,32400.9,2020-04-12T13:48:43.486Z,0.089
3,4,2473730.7,74211.9,2020-04-22T15:58:41.804Z,0.053
4,5,975530.0,35119.1,2020-04-29T10:25:02.334Z,0.098
5,6,2121293.2,86973.0,2020-05-02T13:46:09.681Z,0.055
6,7,1997288.1,71902.4,2020-05-09T14:11:12.092Z,0.067
7,8,340396.3,13956.2,2020-05-09T21:11:49.05Z,0.018
8,9,1087311.7,53278.3,2020-05-16T10:13:16.034Z,0.064
9,10,489205.4,11251.7,2020-05-25T10:20:29.381Z,0.032


In [36]:
sql = "SELECT * FROM Person LIMIT 10;"
run_spanner_query(sql)

Unnamed: 0,id,name
0,1,Bertrand
1,2,Menville
2,3,Rowe
3,4,Rowlands
4,5,Kerschensteiner
5,6,Abutalybov
6,7,Skala
7,8,Bukofzer
8,9,Shabazz
9,10,Zakharov


### Run Spanner Graph Queries

Read more about running graph queries on Spanner in the [Spanner docs](https://cloud.google.com/spanner/docs/graph/queries-overview).

In [38]:
sql = """
GRAPH FinGraph
MATCH
  (person:Person {name: "Jacoby"})-[own:Owns]->
  (account:Account)-[repay:Repays]->(loan:Loan)
RETURN
  account.id AS account_id,
  repay.create_time AS repay_time,
  repay.amount AS loan_repay_amount,
  loan.id AS loan_id,
  loan.loan_amount AS loan_amount
ORDER BY repay.create_time;
"""
run_spanner_query(sql)

Unnamed: 0,account_id,repay_time,loan_repay_amount,loan_id,loan_amount
0,569,2021-06-04T21:21:02.395Z,54792.0,110,741791
1,569,2021-10-16T10:26:12.783Z,16573.5,110,741791
2,569,2021-12-06T12:22:39.682Z,10248.3,110,741791
3,569,2022-02-27T11:00:17.263Z,27054.0,110,741791
4,569,2022-03-23T19:33:27.261Z,32437.8,110,741791
5,569,2022-04-11T14:36:17.742Z,32462.1,110,741791
6,569,2022-05-02T09:21:18.264Z,43720.2,110,741791


In [39]:
sql = """
GRAPH FinGraph
MATCH ANY SHORTEST
  (src_accnt:Account {id:75} )-[transfers:Transfers]->{3,6}
  (dst_accnt:Account {id:199})
RETURN
  ARRAY_LENGTH(transfers) AS num_hops,
  TO_JSON(transfers) AS transfer_edges;
"""

run_spanner_query(sql)

Unnamed: 0,num_hops,transfer_edges
0,4,"[{""destination_node_identifier"":""mUZpbkdyYXBoL..."
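
The `transfer_edges` column holds the JSON text produced by `TO_JSON(transfers)`, so it can be decoded into a list with one entry per hop. The sketch below shows the idea. `parse_transfer_edges` is a hypothetical helper, and the sample string is a made-up placeholder that reuses only the `destination_node_identifier` key visible in the truncated output; real identifiers are opaque strings generated by Spanner.

```python
import json
from typing import Any, Dict, List


def parse_transfer_edges(transfer_edges: str) -> List[Dict[str, Any]]:
    """Decodes the JSON text produced by TO_JSON(transfers) into edge dicts."""
    edges = json.loads(transfer_edges)
    if not isinstance(edges, list):
        raise ValueError("Expected a JSON array of edges.")
    return edges


# Placeholder value for illustration only, not real query output.
sample = (
    '[{"destination_node_identifier": "node-b"},'
    ' {"destination_node_identifier": "node-c"}]'
)
edges = parse_transfer_edges(sample)
hop_count = len(edges)  # one edge per hop along the path
```

For a real result row, the number of decoded edges should equal the `num_hops` value returned by the query.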
