In [11]:
# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Master - Environment Setup

<table align="left">
  <td>
    <a href="https://console.cloud.google.com/ai-platform/notebooks/deploy-notebook?download_url=https://github.com/GoogleCloudPlatform/fraudfinder/raw/main/00_environment_setup.ipynb">
       <img src="https://www.gstatic.com/cloud/images/navigation/vertex-ai.svg" alt="Google Cloud Notebooks">Open in Cloud Notebook
    </a>
  </td> 
  <td>
    <a href="https://colab.research.google.com/github/GoogleCloudPlatform/fraudfinder/blob/main/00_environment_setup.ipynb">
      <img src="https://cloud.google.com/ml-engine/images/colab-logo-32px.png" alt="Colab logo"> Open in Colab
    </a>
  </td>
  <td>
    <a href="https://github.com/GoogleCloudPlatform/fraudfinder/blob/main/00_environment_setup.ipynb">
        <img src="https://cloud.google.com/ml-engine/images/github-logo-32px.png" alt="GitHub logo">
      View on GitHub
    </a>
  </td>
</table>

## Overview

[FraudFinder](https://github.com/googlecloudplatform/fraudfinder) is a series of labs on how to build a real-time fraud detection system on Google Cloud. Throughout the FraudFinder labs, you will learn how to read historical bank transaction data stored in data warehouse, read from a live stream of new transactions, perform exploratory data analysis (EDA), do feature engineering, ingest features into a feature store, train a model using feature store, register your model in a model registry, evaluate your model, deploy your model to an endpoint, do real-time inference on your model with feature store, and monitor your model.

### Objective

Before you run this notebook, make sure that you have completed the steps in [README](README.md).

In this notebook, you will setup your environment for Fraudfinder to be used in subsequent labs.

This lab uses the following Google Cloud services and resources:

- [Vertex AI](https://cloud.google.com/vertex-ai/)
- [BigQuery](https://cloud.google.com/bigquery/)
- [Google Cloud Storage](https://cloud.google.com/storage)
- [Pub/Sub](https://cloud.google.com/pubsub/)

Steps performed in this notebook:

- Setup your environment.
- Load historical bank transactions into BigQuery.
- Read data from BigQuery tables.
- Read data from Pub/Sub topics, which contain a live stream of new transactions.

### Costs

This tutorial uses billable components of Google Cloud:

* Vertex AI
* Cloud Storage
* Pub/Sub
* BigQuery

Learn about [Vertex AI
pricing](https://cloud.google.com/vertex-ai/pricing), [Cloud Storage
pricing](https://cloud.google.com/storage/pricing), [Pub/Sub pricing](https://cloud.google.com/pubsub/pricing), [BigQuery pricing](https://cloud.google.com/bigquery/pricing) and use the [Pricing
Calculator](https://cloud.google.com/products/calculator/)
to generate a cost estimate based on your projected usage.

### Install additional packages

Install the following packages required to execute this notebook.

In [33]:
!pip3 install --upgrade --quiet pip
!pip install --upgrade --user --quiet google-cloud-aiplatform
!pip install --upgrade -q -r 'requirements.txt'

After you install the additional packages, you need to restart the notebook kernel so it can find the packages.

In [34]:
# Automatically restart kernel after installs
import os

if not os.getenv("IS_TESTING"):
    import IPython

    app = IPython.Application.instance()
    app.kernel.do_shutdown(True)

<div class="alert alert-block alert-warning">
<b>⚠️ The kernel is going to restart. Please wait until it is finished before continuing to the next step. ⚠️</b>
</div>


### Setup your environment

Run the next cells to import libraries used in this notebook and configure some options.

Run the next cell to set your project ID and some of the other constants used in the lab.  

### Replace REGION Below ***

<div class="alert alert-block alert-info">
<b>NOTE: REGION will be detected based on the Persistence Resrouces<br>
    You will need to run from this cell and below after Kernel restart
   </b>
</div>


In [53]:
# Persistent Resource ID
GCP_PROJECTS = !gcloud config get-value project
PROJECT_ID = GCP_PROJECTS[0]
PERSISTENT_RESOURCE_ID = "ai-takeoff"

# Dynamically retrieve Persistent Resource location
PERSISTENT_RESOURCE_REGION = ""
check_regions = ["us-central1", "asia-southeast1", "europe-west4"]

for region in check_regions:
    shell_output = !gcloud ai persistent-resources list --project=$PROJECT_ID --region=$region
    if "Listed 0 items." not in shell_output:
        print(f"Persistent Resource found in {region}")
        PERSISTENT_RESOURCE_REGION = region

Persistent Resource found in asia-southeast1


In [54]:
import random
import string
from typing import Union

import pandas as pd
from google.cloud import bigquery

# Generate unique ID to help w/ unique naming of certain pieces
ID = "".join(random.choices(string.ascii_lowercase + string.digits, k=5))

# Replace Region here
REGION = PERSISTENT_RESOURCE_REGION

# static parameters

BUCKET_NAME = f"{PROJECT_ID}-fraudfinder-{ID}"
STAGING_BUCKET = f"{PROJECT_ID}-model-upload-{ID}"
AGENT_BUCKET = f"{PROJECT_ID}-ai-workshops"
TRAINING_DS_SIZE = 1000

with open("id_file.txt", "w") as f:
    f.write(ID)

print(f"ID '{ID}' has been saved to id_file.txt")

ID '3iopv' has been saved to id_file.txt


Read the ID file and set value. This step is just to ensure every participants will have unique buckets and configs across the labs

In [55]:
try:
    with open("id_file.txt", "r") as f:
        ID = f.read().strip()
    print(f"Using ID '{ID}' from id_file.txt")
except FileNotFoundError:
    print("id_file.txt not found. Please make sure the file exists.")
    ID = None 

Using ID '3iopv' from id_file.txt


### Create a Google Cloud Storage bucket and save the config data.

Next, we will create a Google Cloud Storage bucket and will save the config data in this bucket. After the cell operation finishes, you can navigate to [Google Cloud Storage](https://console.cloud.google.com/storage/) to see the GCS bucket. 

In [56]:
config = f"""
BUCKET_NAME          = \"{BUCKET_NAME}\"
STAGING_BUCKET       = \"{STAGING_BUCKET}\"
PROJECT              = \"{PROJECT_ID}\"
PROJECT_ID           = \"{PROJECT_ID}\"
REGION               = \"{REGION}\"
ID                   = \"{ID}\"
FEATURESTORE_ID      = \"fraudfinder_{ID}\"
MODEL_NAME           = \"ff_model\"
ENDPOINT_NAME        = \"ff_model_endpoint\"
TRAINING_DS_SIZE     = \"{TRAINING_DS_SIZE}\"
DATA_DIR             = "data"
TRAIN_DATA_DIR       = "train"
CUSTOMER_ENTITY      = "customer"
TERMINAL_ENTITY      = "terminal"
TARGET               = "tx_fraud"
"""

!gsutil mb -l {REGION} gs://{BUCKET_NAME}
!gsutil mb -l {REGION} gs://{STAGING_BUCKET}


!echo '{config}' | gsutil cp - gs://{BUCKET_NAME}/config/notebook_env_{ID}.py
#!echo '{config}' | gsutil cp - gs://{BUCKET_NAME}/config/notebook_env_v02.py

Creating gs://fraud123-438914-fraudfinder-3iopv/...
Creating gs://fraud123-438914-model-upload-3iopv/...
Copying from <STDIN>...
/ [1 files][    0.0 B/    0.0 B]                                                
Operation completed over 1 objects.                                              


In [57]:
# Gets the default BUCKET_URI and SERVICE_ACCOUNT if they were not specified by the user.
shell_output = ! gcloud projects describe $PROJECT_ID
project_number = shell_output[-1].split(":")[1].strip().replace("'", "")
SERVICE_ACCOUNT = f"{project_number}-compute@developer.gserviceaccount.com"
print("Using this default Service Account:", SERVICE_ACCOUNT)

Using this default Service Account: 520607199607-compute@developer.gserviceaccount.com


In [58]:
!gsutil iam ch serviceAccount:{SERVICE_ACCOUNT}:roles/storage.admin gs://{STAGING_BUCKET}

In [68]:
config = f"""
PROJECT_ID: {PROJECT_ID}
STAGING_BUCKET: {STAGING_BUCKET}
BUCKET_NAME: {BUCKET_NAME}
REGION: {REGION}
ID: {ID}
CUSTOMER_ENTITY_ID: customer
CUSTOMER_ENTITY_ID_FIELD: customer_id
TERMINAL_ENTITY_ID: terminal
TERMINALS_ENTITY_ID_FIELD: terminal_id
FEATURESTORE_ID: "fraudfinder_{ID}"
FEATUREVIEW_ID: "fraudfinder_view_{ID}"
NETWORK: fraud-finder-network
SUBNET: https://www.googleapis.com/compute/v1/projects/fraud-finder-lab/regions/us-central1/subnetworks/us-central1
MODEL_REGISTRY: ff_model
RAW_BQ_TRANSACTION_TABLE_URI: "{PROJECT_ID}.tx.tx"
RAW_BQ_LABELS_TABLE_URI: "{PROJECT_ID}.tx.txlabels"
FEATURES_BQ_TABLE_URI: "{PROJECT_ID}.tx.wide_features_table"
FEATURE_TIME: feature_ts
ONLINE_STORAGE_NODES: 1
SUBSCRIPTION_NAME: ff-tx-for-feat-eng-sub
SUBSCRIPTION_PATH: "projects/{PROJECT_ID}/subscriptions/ff-tx-for-feat-eng-sub"
DROP_COLUMNS:
- timestamp
- entity_type_customer
- entity_type_terminal
FEAT_COLUMNS:
- customer_id_avg_amount_14day_window
- customer_id_avg_amount_15min_window
- customer_id_avg_amount_1day_window
- customer_id_avg_amount_30min_window
- customer_id_avg_amount_60min_window
- customer_id_avg_amount_7day_window
- customer_id_nb_tx_14day_window
- customer_id_nb_tx_15min_window
- customer_id_nb_tx_1day_window
- customer_id_nb_tx_30min_window
- customer_id_nb_tx_60min_window
- customer_id_nb_tx_7day_window
- terminal_id_avg_amount_15min_window
- terminal_id_avg_amount_30min_window
- terminal_id_avg_amount_60min_window
- terminal_id_nb_tx_14day_window
- terminal_id_nb_tx_15min_window
- terminal_id_nb_tx_1day_window
- terminal_id_nb_tx_30min_window
- terminal_id_nb_tx_60min_window
- terminal_id_nb_tx_7day_window
- terminal_id_risk_14day_window
- terminal_id_risk_1day_window
- terminal_id_risk_7day_window
- tx_amount
TARGET_COLUMN: tx_fraud
DATA_SCHEMA:
  timestamp: object
  tx_amount: float64
  tx_fraud: Int64
  entity_type_customer: Int64
  customer_id_nb_tx_1day_window: Int64
  customer_id_nb_tx_7day_window: Int64
  customer_id_nb_tx_14day_window: Int64
  customer_id_avg_amount_1day_window: float64
  customer_id_avg_amount_7day_window: float64
  customer_id_avg_amount_14day_window: float64
  customer_id_nb_tx_15min_window: Int64
  customer_id_avg_amount_15min_window: float64
  customer_id_nb_tx_30min_window: Int64
  customer_id_avg_amount_30min_window: float64
  customer_id_nb_tx_60min_window: Int64
  customer_id_avg_amount_60min_window: float64
  entity_type_terminal: Int64
  terminal_id_nb_tx_1day_window: Int64
  terminal_id_nb_tx_7day_window: Int64
  terminal_id_nb_tx_14day_window: Int64
  terminal_id_risk_1day_window: float64
  terminal_id_risk_7day_window: float64
  terminal_id_risk_14day_window: float64
  terminal_id_nb_tx_15min_window: Int64
  terminal_id_avg_amount_15min_window: float64
  terminal_id_nb_tx_30min_window: Int64
  terminal_id_avg_amount_30min_window: float64
  terminal_id_nb_tx_60min_window: Int64
  terminal_id_avg_amount_60min_window: float64
MODEL_NAME: ff_model
EXPERIMENT_NAME: ff-experiment-{ID}
DATA_URI: gs://{BUCKET_NAME}/data
TRAIN_DATA_URI: gs://{BUCKET_NAME}/data/train
READ_INSTANCES_TABLE: ground_truth_{ID}
READ_INSTANCES_URI: bq://{PROJECT_ID}.tx.ground_truth_{ID}
DATASET_NAME: fraud_finder_dataset_{ID}
JOB_NAME: fraudfinder-train-xgb-{ID}
ENDPOINT_NAME: ff_model_endpoint
MODEL_SERVING_IMAGE_URI: "us-docker.pkg.dev/vertex-ai/prediction/xgboost-cpu.1-7:latest"
IMAGE_REPOSITORY: fraudfinder-{ID}
IMAGE_NAME: dask-xgb-classificator
IMAGE_TAG: latest
IMAGE_URI: {REGION}-docker.pkg.dev/{PROJECT_ID}/fraudfinder-{ID}/dask-xgb-classificator:latest
TRAIN_COMPUTE: e2-standard-4
DEPLOY_COMPUTE: n1-standard-4
BASE_IMAGE: "python:3.10"
PIPELINE_NAME: "fraud-finder-xgb-pipeline-{ID}"
PIPELINE_ROOT: "gs://{BUCKET_NAME}/pipelines"
BQ_DATASET: tx
METRICS_URI: "gs://{BUCKET_NAME}/deliverables/metrics.json"
AVG_PR_THRESHOLD: 0.2
MODEL_THRESHOLD: 0.5
AVG_PR_CONDITION: avg_pr_condition
PERSISTENT_RESOURCE_ID: ai-takeoff
REPLICA_COUNT: 1
SERVICE_ACCOUNT: "{SERVICE_ACCOUNT}"
"""

!echo '{config}' | gsutil cp - gs://{BUCKET_NAME}/config/vertex_conf_{ID}.yaml

Copying from <STDIN>...
/ [1 files][    0.0 B/    0.0 B]                                                
Operation completed over 1 objects.                                              


### Copy the historical transaction data into BigQuery tables

Now we will copy the historical transaction data and ingest it into BigQuery tables. For this, we will need to run `copy_bigquery_data.py`.

In [60]:
BUCKET_NAME = f"{PROJECT_ID}-fraudfinder-{ID}"

In [61]:
!python3 scripts/user_copy_bigquery_data.py $BUCKET_NAME

File copied from gs://cymbal-fraudfinder/datagen/hacked_customers_history.txt 
		 to gs://fraud123-438914-fraudfinder-3iopv/datagen/hacked_customers_history.txt
File copied from gs://cymbal-fraudfinder/datagen/hacked_terminals_history.txt 
		 to gs://fraud123-438914-fraudfinder-3iopv/datagen/hacked_terminals_history.txt
File copied from gs://cymbal-fraudfinder/datagen/demographics/customer_profiles.csv 
		 to gs://fraud123-438914-fraudfinder-3iopv/datagen/demographics/customer_profiles.csv
File copied from gs://cymbal-fraudfinder/datagen/demographics/terminal_profiles.csv 
		 to gs://fraud123-438914-fraudfinder-3iopv/datagen/demographics/terminal_profiles.csv
File copied from gs://cymbal-fraudfinder/datagen/demographics/customer_with_terminal_profiles.csv 
		 to gs://fraud123-438914-fraudfinder-3iopv/datagen/demographics/customer_with_terminal_profiles.csv


### Check data in BigQuery

After ingesting our data into BigQuery, it's time to run some queries against the tables to inspect the data. You can also go to the [BigQuery console](https://console.cloud.google.com/bigquery) to see the data.

#### Initialize BigQuery SDK for Python 

Use a helper function for sending queries to BigQuery.

In [62]:
# Wrapper to use BigQuery client to run query/job, return job ID or result as DF
def run_bq_query(sql: str) -> Union[str, pd.DataFrame]:
    """
    Run a BigQuery query and return the job ID or result as a DataFrame
    Args:
        sql: SQL query, as a string, to execute in BigQuery
    Returns:
        df: DataFrame of results from query,  or error, if any
    """

    bq_client = bigquery.Client()
    

    # Try dry run before executing query to catch any errors
    job_config = bigquery.QueryJobConfig(dry_run=True, use_query_cache=False)
    bq_client.query(sql, job_config=job_config)

    # If dry run succeeds without errors, proceed to run query
    job_config = bigquery.QueryJobConfig()
    client_result = bq_client.query(sql, job_config=job_config)

    job_id = client_result.job_id

    # Wait for query/job to finish running. then get & return data frame
    df = client_result.result().to_arrow().to_pandas()
    print(f"Finished job_id: {job_id}")
    return df

#### tx.tx
The `tx.tx` table contains the basic information about each transaction:
- `TX_ID` is a unique ID per transaction
- `TX_TS` is the timestamp of the transaction, in UTC
- `CUSTOMER_ID` is a unique 16-digit string ID per customer
- `TERMINAL_ID` is a unique 16-digit string ID per point-of-sale terminal
- `TX_AMOUNT` is the amount of money spent by the customer at a terminal, in dollars

In [63]:
run_bq_query(
    """
SELECT
  *
FROM
  tx.tx
LIMIT 5
"""
)

Finished job_id: e1cbda80-f936-41ae-ae64-8563e11d7966


Unnamed: 0,TX_ID,TX_TS,CUSTOMER_ID,TERMINAL_ID,TX_AMOUNT
0,d6fa8007650bf4218837a1e04bacb13d2d386f9c,2025-01-29 21:17:19+00:00,27607282987696,64542,63.91
1,1881ead1958805d97b0f69287faf97ad9fbbd608,2025-01-29 13:58:34+00:00,7217273988983228,64542,73.24
2,9063651ebe0bb45a7ed4aa83a28737850c8a3713,2025-01-29 21:57:11+00:00,5615104591987434,64542,15.27
3,6d2976addd8bd8e3552d3aa59184ad2951182366,2025-01-29 08:11:33+00:00,3513261249150140,64542,28.29
4,e9498e0335bb55fa1050b34cc487e05621a5502c,2025-01-29 17:08:28+00:00,3540440915461489,64542,21.99


#### tx.txlabels
The `tx.txlabels` table contains information on whether each transation was fraud or not:
- `TX_ID` is a unique ID per transaction
- `TX_FRAUD` is 1 if the transaction was fraud, and 0 if the transaction was not fraudulent

In [64]:
run_bq_query(
    """
SELECT
  *
FROM
  tx.txlabels
LIMIT 5
"""
)

Finished job_id: 37aec421-0ae1-48c9-aa5e-7f790b43e575


Unnamed: 0,TX_ID,TX_FRAUD
0,ae4ffdb4e76dd9d03bbb56afaf00e80612b30208,0
1,63deb028bfb852e5bea6e55db4d262160d917c4a,0
2,f91a15520108660cbde23c2592c9baaad13f56a3,0
3,27c92da58403b046dee8aa8b3a494dab7d5be06d,0
4,9995e829db91c581efdba7f6bbb8c088e66f8cd0,0


### Check live streaming transactions via public Pub/Sub topics

As part of the [README](README.md), you've created [subscriptions](https://console.cloud.google.com/cloudpubsub/subscription/) to public Pub/Sub topics, where there is a constant flow of new transactions. This means you have, in your own Google Cloud project, subscriptions to the public Pub/Sub topics. You will receive a Pub/Sub message in your subscription every time a new transaction is streamed into the Pub/Sub topic.

There are two public Pub/Sub topics where there is a constant stream of live transactions occurring.

The following Pub/Sub topics are used for transactions:
```
projects/cymbal-fraudfinder/topics/ff-tx
projects/cymbal-fraudfinder/topics/ff-txlabels
```

Note: If you haven't completed the steps in the README, please make sure that you complete them first before continuing this notebook, otherwise you may not have Pub/Sub subscriptions.