1. Raw data will be stored in BigQuery
2. Features will be calculated using BigQuery SQL
3. Offline feature store will be BigQuery
4. Online store will be Redis (the feature_store.yaml below configures Redis rather than Datastore)
5. Model frameworks: TensorFlow, scikit-learn
6. Model hosted on GCP AI Platform (Vertex AI)

In [None]:
!pip install -q feast['gcp']

In [None]:
!pip install -q feast[redis]


In [None]:
!feast version

Project Configuration

In [None]:
# GCP project used for gcloud, the generated BigQuery feature table and Vertex AI.
PROJECT_ID = "tokyo-country-452614-f7"
# GCS bucket name (no gs:// prefix); holds the Feast registry and the model artifact.
BUCKET_NAME = "movielensestorage"
# BigQuery dataset that receives the generated user_count_transactions_7d table.
BIGQUERY_DATASET_NAME = "credit_card_dump"
# NOTE(review): not referenced anywhere else in the visible notebook.
AI_PLATFORM_MODEL_NAME = "Fraud-_detection_model"

!gcloud config set project "$PROJECT_ID"

In [None]:
!echo project_id = $PROJECT_ID 

Create Bucket if not already created

In [None]:
!gsutil mb gs://$BUCKET_NAME

Create the BigQuery dataset if it does not already exist

In [None]:
!bq mk $BIGQUERY_DATASET_NAME

Initialize Feature Repository

In [None]:
!feast init fraud_detection -t gcp

Configure the feature store to use BigQuery for batch (offline) processing and Redis for online predictions

In [None]:
%cd fraud_detection

In [78]:
# Render feature_store.yaml from the notebook-level settings so the registry
# bucket, the BigQuery dataset and the GCP project cannot drift away from
# BUCKET_NAME, BIGQUERY_DATASET_NAME and PROJECT_ID.
# The Redis connection string is environment-specific; update it to point at
# the Redis instance that should serve online features.
feature_store = \
f"""project: fraud_detection
registry: gs://{BUCKET_NAME}/registry.db
provider: gcp
offline_store:
  type: bigquery
  dataset: {BIGQUERY_DATASET_NAME}
  project_id: {PROJECT_ID}
online_store:
  type: redis
  redis_type: redis
  connection_string: 10.56.0.6:6379"""

# Overwrite the feature_store.yaml in the current directory with the rendered
# configuration; the encoding is explicit so the result does not depend on
# the platform default.
with open('feature_store.yaml', "w", encoding="utf-8") as writer:
    writer.write(feature_store)

In [None]:
%cd feature_repo

In [None]:
%%bigquery

select * from feast-oss.fraud_tutorial.transactions limit 1000

Create the feature table using BigQuery SQL

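This function groups transactions by user_id and counts each user's transactions over a trailing 7-day window, stamping every row with the given aggregation end date as its feature timestamp. The backfill below runs it once per day, from 7 days ago through today.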

In [None]:
from datetime import datetime,timedelta
from google.cloud import bigquery
import time

def generate_user_count_features(aggregation_end_date):
    """Append per-user 7-day transaction counts as of ``aggregation_end_date``.

    Aggregates ``feast-oss.fraud_tutorial.transactions`` in BigQuery and
    appends the result to
    ``{PROJECT_ID}.{BIGQUERY_DATASET_NAME}.user_count_transactions_7d``.
    The destination is written with ``WRITE_APPEND``, so calling this twice
    for the same end date appends the same rows twice.

    Args:
        aggregation_end_date: ``datetime`` marking the end of the 7-day
            aggregation window; it is also stored as ``feature_timestamp``
            on every generated row.
    """
    table_id = f"{PROJECT_ID}.{BIGQUERY_DATASET_NAME}.user_count_transactions_7d"

    client = bigquery.Client()
    job_config = bigquery.QueryJobConfig(destination=table_id,write_disposition='WRITE_APPEND')

    # Anchor the window on the snapshot being generated, not on the wall
    # clock: with datetime.now() every backfilled snapshot shared the same
    # start, so older snapshots covered fewer than 7 days and the earliest
    # one covered an empty window.
    aggregation_start_date = aggregation_end_date - timedelta(days=7)

    sql = f"""
    SELECT
        src_account AS user_id,
        COUNT(*) AS transaction_count_7d,
        timestamp'{aggregation_end_date.isoformat()}' AS feature_timestamp
    FROM
        feast-oss.fraud_tutorial.transactions
    WHERE
        timestamp BETWEEN TIMESTAMP('{aggregation_start_date.isoformat()}')
        AND TIMESTAMP('{aggregation_end_date.isoformat()}')
    GROUP BY
        user_id
    """

    query_job = client.query(sql,job_config=job_config)
    query_job.result()  # wait for the query job to finish before reporting
    print(f"Generated features as of {aggregation_end_date.isoformat()}")
    
    
def backfill_features(earliest_aggregation_end_date, interval, num_iterations):
    """Generate a series of feature snapshots at evenly spaced end dates.

    Calls ``generate_user_count_features`` ``num_iterations`` times. The first
    call uses ``earliest_aggregation_end_date``; each later call uses the
    previous end date plus ``interval``. The loop sleeps for one second after
    every call.
    """
    snapshot_end = earliest_aggregation_end_date
    for _ in range(num_iterations):
        generate_user_count_features(aggregation_end_date=snapshot_end)
        time.sleep(1)
        snapshot_end = snapshot_end + interval
        
if __name__ == '__main__':
    # Eight daily snapshots: the first ends 7 days ago, the last ends today.
    first_snapshot_end = datetime.now() - timedelta(days=7)
    backfill_features(
        earliest_aggregation_end_date=first_snapshot_end,
        interval=timedelta(days=1),
        num_iterations=8,
    )
    
    

In [None]:
%%bigquery

select * from tokyo-country-452614-f7.credit_card_dump.user_count_transactions_7d limit 1000

In [None]:
pwd

In [None]:
%%writefile fraud_features.py

"""Feast entity and feature view definitions for the fraud_detection repo."""

from datetime import timedelta
from feast import BigQuerySource, FeatureView, Entity, ValueType

# Fully qualified BigQuery tables backing the feature views. They are spelled
# out literally because this file is written to disk and loaded by Feast on
# its own, without access to the notebook's variables.
_USER_COUNT_TABLE = "tokyo-country-452614-f7.credit_card_dump.user_count_transactions_7d"
_USER_ACCOUNT_TABLE = "feast-oss.fraud_tutorial.user_account_features"
_USER_FRAUD_TABLE = "feast-oss.fraud_tutorial.user_has_fraudulent_transactions"

# Entity shared by every feature view below: the user, keyed by a string id.
user_entity = Entity(
    name="user_id",
    value_type=ValueType.STRING,
    description="A user that has made or received a transaction",
)

# Feature view over the generated user_count_transactions_7d table.
user_7d_trans_stats_fv = FeatureView(
    name="user_count_transactions_7d",
    entities=[user_entity],
    ttl=timedelta(weeks=1),
    source=BigQuerySource(
        table=_USER_COUNT_TABLE,
        timestamp_field="feature_timestamp",
    ),
)

# Two feature views over tables that already exist in BigQuery.
user_account_fv = FeatureView(
    name="user_account_features",
    entities=[user_entity],
    ttl=timedelta(weeks=52),
    source=BigQuerySource(
        table=_USER_ACCOUNT_TABLE,
        timestamp_field="feature_timestamp",
    ),
)

user_has_fraudulent_transactions_fv = FeatureView(
    name="user_has_fraudulent_transactions",
    entities=[user_entity],
    ttl=timedelta(weeks=52),
    source=BigQuerySource(
        table=_USER_FRAUD_TABLE,
        timestamp_field="feature_timestamp",
    ),
)

In [None]:
!rm example_repo.py

In [46]:
rm -rf .ipynb_checkpoints

In [None]:
!feast apply

In [None]:
from feast import FeatureStore

# Open the feature repository in the current working directory and fetch
# every registered feature view.
store = FeatureStore(repo_path=".")
feature_views = store.list_feature_views()

# Print a short summary per feature view, followed by its schema fields.
for view in feature_views:
    print(
        f"- Name: {view.name}",
        f"  Entities: {view.entities}",
        f"  TTL: {view.ttl}",
        f"  Source: {view.batch_source.table}",
        "  Schema:",
        sep="\n",
    )
    for column in view.schema:
        print(f"    - {column.name}: {column.dtype}")
    print()


In [None]:
from feast import FeatureStore

# Load entities and feature views from the repository in the current directory.
store = FeatureStore(repo_path=".")
entities = store.list_entities()
feature_views = store.list_feature_views()

# For each entity, list the feature views that reference it and their features.
for entity in entities:
    print(f"Entity: {entity.name}")
    print(f"  Join Key: {entity.join_key}")
    print("  Used in FeatureViews:")

    for fv in feature_views:
        # Skip feature views that do not reference this entity.
        if entity.name not in fv.entities:
            continue
        print(f"    - FeatureView: {fv.name}")
        for feature in fv.features:
            print(f"        • {feature.name} ({feature.dtype})")


In [None]:
from datetime import datetime,timedelta
from feast import FeatureStore

store = FeatureStore(repo_path=".")
now = datetime.now()
two_days_ago = datetime.now() - timedelta(days=2)

# Entity dataframe expressed as BigQuery SQL: one row per transaction from
# the last two days, carrying the user id, the event timestamp and the label.
entity_sql = f"""
    select 
        src_account as user_id,
        timestamp AS event_timestamp,
        is_fraud
    from
        feast-oss.fraud_tutorial.transactions
    where
        timestamp between timestamp('{two_days_ago.isoformat()}') 
        and timestamp('{now.isoformat()}')"""

# Features to join onto every entity row, written as "<feature view>:<feature>".
feature_refs = [
    "user_count_transactions_7d:transaction_count_7d",
    "user_account_features:credit_score",
    "user_account_features:account_age_days",
    "user_account_features:user_has_2fa_installed",
    "user_has_fraudulent_transactions:user_has_fraudulent_transactions_7d",
]

# full_feature_names=True is what produces the "<view>__<feature>" column
# names that the training cell selects.
retrieval_job = store.get_historical_features(
    entity_df=entity_sql,
    features=feature_refs,
    full_feature_names=True,
)
training_data = retrieval_job.to_df()

training_data.head()

Model Training

In [None]:
!pip install scikit-learn xgboost

In [None]:
# Import necessary libraries
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier
#from xgboost import XGBClassifier
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, roc_auc_score

# Drop rows with any missing value, in place, before building the matrices.
training_data.dropna(inplace=True)

# Model inputs (X) and label (y).
feature_columns = [
    "user_count_transactions_7d__transaction_count_7d",
    "user_account_features__credit_score",
    "user_account_features__account_age_days",
    "user_account_features__user_has_2fa_installed",
    "user_has_fraudulent_transactions__user_has_fraudulent_transactions_7d",
]
X = training_data[feature_columns]
y = training_data["is_fraud"]

# 80/20 train/test split with a fixed seed for reproducibility.
X_train, X_test, y_train, y_test = train_test_split(
    X,
    y,
    test_size=0.2,
    random_state=42,
)

# Candidate models.
log_reg_model = LogisticRegression()
rf_model = RandomForestClassifier(
    n_estimators=100,
    class_weight='balanced',
    random_state=42,
)
#xgb_model = XGBClassifier(use_label_encoder=False, eval_metric='logloss', random_state=42)

# Fit both models on the training split.
log_reg_model.fit(X_train, y_train)
rf_model.fit(X_train, y_train)

#xgb_model.fit(X_train, y_train)


In [14]:
# Make predictions
log_reg_pred = log_reg_model.predict(X_test)
rf_pred = rf_model.predict(X_test)
#xgb_pred = xgb_model.predict(X_test)

In [None]:
# Evaluate models using accuracy, precision, recall, F1 score, and ROC AUC
def evaluate_model(predictions, model_name):
    """Print classification metrics for ``predictions`` against ``y_test``.

    Args:
        predictions: predicted labels, compared against the notebook-level
            ``y_test``.
        model_name: label used in the report header.
    """
    scorers = (
        ("Accuracy", accuracy_score),
        ("Precision", precision_score),
        ("Recall", recall_score),
        ("F1 Score", f1_score),
        ("ROC AUC", roc_auc_score),
    )
    # Compute every metric before printing anything, so a failing metric
    # does not leave a half-printed report.
    results = [(label, scorer(y_test, predictions)) for label, scorer in scorers]

    print(f"{model_name} Performance:")
    for label, value in results:
        print(f"  {label}: {value:.4f}")
    print("-" * 50)

# Evaluate each model
evaluate_model(log_reg_pred, "Logistic Regression")
evaluate_model(rf_pred, "Random Forest")
#evaluate_model(xgb_pred, "XGBoost")


In [16]:
from sklearn import metrics
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
import matplotlib.pyplot as plt

In [None]:
# Print matrix
# Confusion matrix of the random forest predictions on the test split.
cm_RF = confusion_matrix(y_test, rf_pred)
print("Confusion Matrix:", cm_RF, sep="\n")

# Optional: render the same matrix as a plot.
disp = ConfusionMatrixDisplay(confusion_matrix=cm_RF)
disp.plot(cmap=plt.cm.Blues)
plt.title("Random Forest Confusion Matrix")
plt.show()

We have high recall here, which is good for fraud detection because we do not want to miss any fraudulent transaction. If a non-fraudulent transaction is flagged as fraudulent, the system can review it and then proceed.

In [None]:
import joblib

# Save the model to a local file
joblib.dump(rf_model, "model.joblib")


In [21]:
loaded_model = joblib.load('model.joblib')

In [None]:
y_pred_loaded = loaded_model.predict(X_test)
accuracy_loaded = accuracy_score(y_test, y_pred_loaded)
print(f"Loaded Model Accuracy: {accuracy_loaded:.4f}")

In [None]:
import sklearn
import joblib

# Print version of scikit-learn and joblib
print(f"scikit-learn version: {sklearn.__version__}")
print(f"joblib version: {joblib.__version__}")


In [None]:
# Upload to GCS
!gsutil cp model.joblib gs://$BUCKET_NAME/model_dir/model.joblib


In [None]:
from google.cloud import storage
import joblib

# Initialize the GCS client
client = storage.Client()

# Read from the same bucket the model was uploaded to. Reusing BUCKET_NAME
# instead of repeating the literal keeps the upload and download in sync.
bucket_name = BUCKET_NAME
model_blob_name = 'model_dir/model.joblib'

# Reference the GCS bucket and blob
bucket = client.get_bucket(bucket_name)
blob = bucket.blob(model_blob_name)

# Download the model file to local disk
local_model_path = '/tmp/model.joblib'
blob.download_to_filename(local_model_path)

print(f'Model downloaded to {local_model_path}')


In [None]:
# Load the model using joblib
model2 = joblib.load('/tmp/model.joblib')
print('Model loaded successfully.')

# You can now use the model to make predictions


In [None]:
import pandas as pd

# Prepare your input data (make sure the features match what the model expects)
input_data = pd.DataFrame({
    'user_count_transactions_7d__transaction_count_7d': [10],
    'user_account_features__credit_score': [700],
    'user_account_features__account_age_days': [365],
    'user_account_features__user_has_2fa_installed': [1],
    'user_has_fraudulent_transactions__user_has_fraudulent_transactions_7d': [0]
})

# Make predictions
predictions = model2.predict(input_data)
print(f'Predictions: {predictions}')


In [28]:
from google.cloud import aiplatform

# Point the Vertex AI SDK at the project and at the notebook's bucket.
# The staging bucket is derived from BUCKET_NAME rather than repeated as a
# literal, so it follows any change to that setting.
aiplatform.init(
    project=PROJECT_ID,
    location="us-central1",
    staging_bucket=f"gs://{BUCKET_NAME}",
)


In [None]:
# Register the model stored under model_dir/ in the notebook's bucket.
# The artifact URI is derived from BUCKET_NAME so it always matches the
# location the model file was copied to.
# NOTE(review): the serving image is the scikit-learn 1.5 container; confirm
# it matches the scikit-learn version used for training.
model = aiplatform.Model.upload(
    display_name="random-forest-fraud-detection-model7",
    artifact_uri=f"gs://{BUCKET_NAME}/model_dir/",
    serving_container_image_uri="us-docker.pkg.dev/vertex-ai/prediction/sklearn-cpu.1-5:latest",
    serving_container_environment_variables={
        "MODEL_FILENAME": "model.joblib"
    }
)

In [None]:
endpoint = model.deploy(
    deployed_model_display_name="rf-fraud-detector7",
    machine_type="n1-standard-2",
)


In [None]:
!feast materialize-incremental $(date -u +"%Y-%m-%dT%H:%M:%S")

In [None]:
from google.cloud import aiplatform
from google.cloud.aiplatform.gapic.schema import predict

def predict(entity_rows):
    """Score users with the deployed model, using online features from Feast.

    NOTE: this definition rebinds the name ``predict`` that the same cell
    imports from ``google.cloud.aiplatform.gapic.schema``.

    Args:
        entity_rows: list of dicts, each with a ``user_id`` key, identifying
            the users to score.

    Returns:
        The ``predictions`` attribute of the endpoint's response.
    """
    # Fetch the online feature values. The features are requested in the same
    # order as the columns the model was trained on.
    feature_vector = store.get_online_features(
        features=[
            "user_count_transactions_7d:transaction_count_7d",
            "user_account_features:credit_score",
            "user_account_features:account_age_days",
            "user_account_features:user_has_2fa_installed",
            "user_has_fraudulent_transactions:user_has_fraudulent_transactions_7d"
        ],
        entity_rows=entity_rows
    ).to_dict()

    # Drop the entity key so that only model inputs remain.
    del feature_vector["user_id"]

    # Transpose {feature: [row0, row1, ...]} into one list of feature values
    # per entity row. The column order is the dict's iteration order.
    # NOTE(review): confirm that this order matches the training columns.
    instances = [list(row) for row in zip(*feature_vector.values())]

    # Full resource name of the endpoint the model is deployed to.
    endpoint_resource_name = "projects/738666983022/locations/us-central1/endpoints/8312823576939986944"

    # Initialize the Vertex AI SDK for the project and region of the endpoint.
    aiplatform.init(project=PROJECT_ID, location="us-central1")
    endpoint = aiplatform.Endpoint(endpoint_name=endpoint_resource_name)

    # Request online predictions and hand back only the prediction payload.
    prediction = endpoint.predict(instances=instances)
    return prediction.predictions


# Test the prediction function with an entity row
response = predict([{"user_id": "v5zlw0"}])
print(response)
