diff --git a/advanced_tutorials/aml/1_aml_feature_pipeline.ipynb b/advanced_tutorials/aml/1_aml_feature_pipeline.ipynb
new file mode 100644
index 00000000..3f08c093
--- /dev/null
+++ b/advanced_tutorials/aml/1_aml_feature_pipeline.ipynb
@@ -0,0 +1,701 @@
+{
+ "cells": [
+  {
+   "cell_type": "markdown",
+   "id": "2768c6ed",
+   "metadata": {},
+   "source": [
+    "# **Hopsworks Feature Store** - Part 01: Load, Engineer & Connect\n",
+    "\n",
+    "This is the first part of the AML tutorial. In this first module, you will work with data related to credit card transactions.\n",
+    "The objective of this tutorial is to demonstrate how to work with the **Hopsworks Feature Store**, with the goal of training and deploying a model that can predict fraudulent transactions.\n",
+    "\n",
+    "## **🗒️ This notebook is divided into the following sections:** \n",
+    "1. **Data Loading**: Load the data. \n",
+    "2. **Feature Engineering**.\n",
+    "3. **Hopsworks Feature Store Connection**.\n",
+    "4. **Feature Groups Creation**: Create feature groups and upload them to the feature store.\n",
+    "5. **Explore feature groups from the UI**.\n",
+    "\n",
+    "![tutorial-flow](../../images/01_featuregroups.png)\n",
+    "\n",
+    "First of all, we will load the data and do some feature engineering on it."
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "6a080c46",
+   "metadata": {},
+   "source": [
+    "## 📝 Imports "
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "2d4d463b",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "import hashlib\n",
+    "import datetime\n",
+    "import pandas as pd\n",
+    "import numpy as np\n",
+    "\n",
+    "from pprint import pprint\n",
+    "\n",
+    "from features.transactions import get_in_out_transactions\n",
+    "from features.party import get_transaction_labels, get_party_labels\n",
+    "from features.graph_embeddings import construct_graph"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "e0f1d96b",
+   "metadata": {},
+   "source": [
+    "## 💽 Loading the Data \n",
+    "\n",
+    "The data you will use comes from three different CSV files:\n",
+    "\n",
+    "- `transactions.csv`: Transaction information such as timestamp, location, and the amount. \n",
+    "- `alert_transactions.csv`: Suspicious Activity Report (SAR) transactions.\n",
+    "- `party.csv`: User profile information.\n",
+    "\n",
+    "In a production system, these CSV files would originate from separate data sources or tables, and probably separate data pipelines. **All three files have a customer id column `id` in common, which we can use for joins.**\n",
+    "\n",
+    "Let's go ahead and load the data."
+ ] + }, + { + "cell_type": "markdown", + "id": "99116f96", + "metadata": {}, + "source": [ + "### โ›ณ๏ธ Transactions dataset " + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "d4a81073", + "metadata": {}, + "outputs": [], + "source": [ + "transactions_df = pd.read_csv(\n", + " \"https://repo.hops.works/master/hopsworks-tutorials/data/aml/transactions.csv\", \n", + " parse_dates = ['tran_timestamp'],\n", + ")\n", + "transactions_df.head(3)" + ] + }, + { + "cell_type": "markdown", + "id": "30215a9f", + "metadata": {}, + "source": [ + "### โ›ณ๏ธ Alert Transactions dataset " + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "d21c7628", + "metadata": {}, + "outputs": [], + "source": [ + "alert_transactions = pd.read_csv(\n", + " \"https://repo.hops.works/master/hopsworks-tutorials/data/aml/alert_transactions.csv\",\n", + ")\n", + "alert_transactions.head(3)" + ] + }, + { + "cell_type": "markdown", + "id": "c73471c8", + "metadata": {}, + "source": [ + "### โ›ณ๏ธ Party dataset " + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "47153ce0", + "metadata": {}, + "outputs": [], + "source": [ + "party = pd.read_csv(\n", + " \"https://repo.hops.works/master/hopsworks-tutorials/data/aml/party.csv\",\n", + ")\n", + "party.head(3)" + ] + }, + { + "cell_type": "markdown", + "id": "f17f91ea", + "metadata": {}, + "source": [ + "## ๐Ÿ› ๏ธ Feature Engineering \n", + "\n", + "To investigate patterns of suspicious activities you will make time window aggregates such monthly frequency, total, mean and standard deviation of amount of incoming and outgoing transasactions. \n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "b036749b", + "metadata": {}, + "outputs": [], + "source": [ + "# Renaming columns for clarity\n", + "transactions_df.columns = ['tran_id', 'tx_type', 'base_amt', 'tran_timestamp', 'source', 'target']\n", + "\n", + "# Reordering columns for better readability\n", + "transactions_df = transactions_df[[\"source\", \"target\", \"tran_timestamp\", \"tran_id\", \"base_amt\"]]\n", + "\n", + "# Displaying the first few rows of the DataFrame\n", + "transactions_df.head(3)" + ] + }, + { + "cell_type": "markdown", + "id": "3877ecea", + "metadata": {}, + "source": [ + "### โ›ณ๏ธ Incoming and Outgoing transactions " + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "98608cf2", + "metadata": {}, + "outputs": [], + "source": [ + "# Generating a DataFrame with monthly incoming and outgoing transaction statistics\n", + "in_out_df = get_in_out_transactions(transactions_df)\n", + "\n", + "# Displaying the first few rows of the resulting DataFrame\n", + "in_out_df.head(3)" + ] + }, + { + "cell_type": "markdown", + "id": "3502fab3", + "metadata": {}, + "source": [ + "### โ›ณ๏ธ Transactions identified as suspicious activity \n", + "\n", + "Assign labels to transactions that were identified as suspicius activity." 
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "5d5dccaa",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# Displaying the first few rows of the 'alert_transactions' DataFrame\n",
+    "alert_transactions.head(3)"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "9b862b99",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# Generating transaction labels based on transaction and alert transaction data\n",
+    "transaction_labels = get_transaction_labels(\n",
+    "    transactions_df, \n",
+    "    alert_transactions,\n",
+    ")\n",
+    "\n",
+    "# Displaying the first three rows of the resulting DataFrame\n",
+    "transaction_labels.head(3)"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "1d5cf91f",
+   "metadata": {},
+   "source": [
+    "### ⛳️ Party dataset \n",
+    "\n",
+    "Now let's prepare the profile (party) dataset and assign labels indicating whether each party has been reported for suspicious activity or not."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "f812069f",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# Renaming columns for clarity\n",
+    "party.columns = [\"id\", \"type\"]\n",
+    "\n",
+    "# Mapping 'type' values to numerical values for better representation\n",
+    "party.type = party.type.map({\"Individual\": 0, \"Organization\": 1})\n",
+    "\n",
+    "# Displaying the first three rows of the DataFrame\n",
+    "party.head(3)"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "3be307af",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# Filtering transactions with SAR (Suspicious Activity Report) labels from the generated transaction labels DataFrame\n",
+    "alert_transactions = transaction_labels[transaction_labels.is_sar == 1]\n",
+    "\n",
+    "# Displaying the first few rows of transactions flagged as SAR\n",
+    "alert_transactions.head(3)"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "d5eee83d",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# Generating party labels based on transaction labels and party information\n",
+    "party_labels = get_party_labels(\n",
+    "    transaction_labels, \n",
+    "    party,\n",
+    ")\n",
+    "\n",
+    "# Displaying the first three rows of the resulting DataFrame\n",
+    "party_labels.head(3)"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "1534d0db",
+   "metadata": {},
+   "source": [
+    "## 🧬 Graph representation learning using a Graph Neural Network\n",
+    "\n",
+    "Financial transactions can be represented as a dynamic network graph. Using graph representation techniques \n",
+    "gives us the opportunity to represent a transaction within a broader context. In this example you will perform node \n",
+    "representation learning. \n",
+    "\n",
+    "The network architecture of the graph convolution layer used for node representation learning was taken from \n",
+    "[this Keras example](https://keras.io/examples/graph/gnn_citations/). It performs the following steps:\n",
+    "\n",
+    "1. **Prepare**: The input node representations are processed using an FFN to produce a *message*. You can simplify\n",
+    "the processing by only applying a linear transformation to the representations.\n",
+    "2. **Aggregate**: The messages of the neighbours of each node are aggregated with\n",
+    "respect to the `edge_weights` using a *permutation invariant* pooling operation, such as *sum*, *mean*, and *max*,\n",
+    "to prepare a single aggregated message for each node.
See, for example, [tf.math.unsorted_segment_sum](https://www.tensorflow.org/api_docs/python/tf/math/unsorted_segment_sum)\n", + "APIs used to aggregate neighbour messages.\n", + "3. **Update**: The `node_repesentations` and `aggregated_messages`โ€”both of shape `[num_nodes, representation_dim]`โ€”\n", + "are combined and processed to produce the new state of the node representations (node embeddings).\n", + "If `combination_type` is `gru`, the `node_repesentations` and `aggregated_messages` are stacked to create a sequence,\n", + "then processed by a GRU layer. Otherwise, the `node_repesentations` and `aggregated_messages` are added\n", + "or concatenated, then processed using a FFN.\n" + ] + }, + { + "cell_type": "markdown", + "id": "d92c001c", + "metadata": {}, + "source": [ + "### ๐Ÿ”ฎ Compute time evolving graph embeddings" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "c9e66a28", + "metadata": {}, + "outputs": [], + "source": [ + "# Grouping transaction labels by month using pandas Grouper\n", + "transaction_graphs_by_month = transaction_labels.groupby(\n", + " pd.Grouper(key='tran_timestamp', freq='M')\n", + ").apply(lambda x: construct_graph(x, party_labels))\n", + "\n", + "# The resulting variable 'transaction_graphs_by_month' is a pandas DataFrame\n", + "# where each row corresponds to a month, and the 'graph_embeddings' column contains\n", + "# the node embeddings generated for each month using the 'construct_graph' function.\n", + "# The embeddings capture the graph structure of transactions during that month." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "56cb5e76", + "metadata": {}, + "outputs": [], + "source": [ + "# Extracting timestamps and graph embeddings\n", + "timestamps = transaction_graphs_by_month.index.values\n", + "graph_embeddings = transaction_graphs_by_month.tolist()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "76882c27", + "metadata": {}, + "outputs": [], + "source": [ + "# Creating an empty DataFrame to store graph embeddings\n", + "graph_embeddings_df = pd.DataFrame()\n", + "\n", + "# Iterating through timestamps and corresponding graph embeddings\n", + "for timestamp, graph_embedding in zip(timestamps, graph_embeddings):\n", + " # Creating a temporary DataFrame for each month's graph embeddings\n", + " df_tmp = pd.DataFrame(graph_embedding)\n", + " \n", + " # Adding a 'tran_timestamp' column to store the timestamp for each row\n", + " df_tmp[\"tran_timestamp\"] = timestamp\n", + " \n", + " # Concatenating the temporary DataFrame to the main DataFrame\n", + " graph_embeddings_df = pd.concat([graph_embeddings_df, df_tmp])\n", + "\n", + "# Displaying the first three rows of the resulting DataFrame\n", + "graph_embeddings_df.head(3)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "a247a981", + "metadata": {}, + "outputs": [], + "source": [ + "# Converting 'tran_timestamp' values to milliseconds for consistency\n", + "transaction_labels.tran_timestamp = transaction_labels.tran_timestamp.values.astype(np.int64) // 10 ** 6\n", + "graph_embeddings_df.tran_timestamp = graph_embeddings_df.tran_timestamp.values.astype(np.int64) // 10 ** 6\n", + "\n", + "# Converting 'tran_timestamp' values in 'party_labels' to milliseconds\n", + "party_labels.tran_timestamp = party_labels.tran_timestamp.map(lambda x: datetime.datetime.timestamp(x) * 1000)\n", + "party_labels.tran_timestamp = party_labels.tran_timestamp.values.astype(np.int64)" + ] + }, + { + "cell_type": "markdown", + 
"id": "67d4654e", + "metadata": {}, + "source": [ + "## ๐Ÿ‘ฎ๐Ÿปโ€โ™‚๏ธ Data Validation\n", + "\n", + "Before you define [feature groups](https://docs.hopsworks.ai/latest/generated/feature_group/) lets define [validation rules](https://docs.hopsworks.ai/latest/generated/feature_validation/) for features. You do expect some of the features to comply with certain *rules* or *expectations*. For example: a transacted amount must be a positive value. In the case of a transacted amount arriving as a negative value you can decide whether to stop it to `write` into a feature group and throw an error or allow it to be written but provide a warning. In the next section you will create feature store `expectations`, attach them to feature groups, and apply them to dataframes being appended to said feature group.\n", + "\n", + "#### Data validation with Greate Expectations in Hopsworks\n", + "You can use GE library for validation in Hopsworks features store. \n", + "\n", + "## Hopsworks feature store\n", + "\n", + "The Hopsworks feature feature store library is Apache V2 licensed and available [here](https://github.com/logicalclocks/feature-store-api). The library is currently available for Python and JVM languages such as Scala and Java.\n", + "In this notebook, we are going to cover Python part.\n", + "\n", + "You can find the complete documentation of the library here: \n", + "\n", + "The first step is to establish a connection with your Hopsworks feature store instance and retrieve the object that represents the feature store you'll be working with. \n", + "\n", + "> By default `project.get_feature_store()` returns the feature store of the project we are working with. However, it accepts also a project name as parameter to select a different feature store." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "76afb55e", + "metadata": {}, + "outputs": [], + "source": [ + "# !pip install --quiet hopsworks" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "367782dd", + "metadata": {}, + "outputs": [], + "source": [ + "import hopsworks\n", + "\n", + "project = hopsworks.login()\n", + "\n", + "fs = project.get_feature_store()" + ] + }, + { + "cell_type": "markdown", + "id": "e9ed49fa", + "metadata": {}, + "source": [ + "## ๐Ÿ”ฌ Expectations suite\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "d210009e", + "metadata": {}, + "outputs": [], + "source": [ + "import great_expectations as ge" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "ca968941", + "metadata": {}, + "outputs": [], + "source": [ + "# Creating an Expectation Suite named \"aml_project_validations\"\n", + "expectation_suite = ge.core.ExpectationSuite(\n", + " expectation_suite_name=\"aml_project_validations\",\n", + ")\n", + "\n", + "# Displaying the JSON representation of the Expectation Suite\n", + "pprint(expectation_suite.to_json_dict(), indent=2)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "fe358b73", + "metadata": {}, + "outputs": [], + "source": [ + "# Adding an expectation to the Expectation Suite\n", + "expectation_suite.add_expectation(\n", + " ge.core.ExpectationConfiguration(\n", + " expectation_type=\"expect_column_max_to_be_between\",\n", + " kwargs={\n", + " \"column\": \"monthly_in_count\", \n", + " \"min_value\": 0, \n", + " \"max_value\": 10000000,\n", + " }\n", + " )\n", + ")\n", + "\n", + "# Displaying the updated Expectation Suite\n", + "pprint(expectation_suite.to_json_dict(), indent=2)" + ] + }, + { + "cell_type": "markdown", + "id": "23f3ea2c", + "metadata": {}, + "source": [ + "---\n", + "\n", + "## ๐Ÿช„ Feature Groups Creation\n", + "\n", + "### Feature Groups\n", + "\n", + "A `Feature Groups` is a logical grouping of features, and experience has shown, that this grouping generally originates from the features being derived from the same data source. The `Feature Group` lets you save metadata along features, which defines how the Feature Store interprets them, combines them and reproduces training datasets created from them.\n", + "\n", + "Generally, the features in a feature group are engineered together in an ingestion job. However, it is possible to have additional jobs to append features to an existing feature group. Furthermore, `feature groups` provide a way of defining a namespace for features, such that you can define features with the same name multiple times, but uniquely identified by the group they are contained in.\n", + "\n", + "> It is important to note that `feature groups` are not groupings of features for immediate training of Machine Learning models. Instead, to ensure reusability of features, it is possible to combine features from any number of groups into training datasets." 
+ ] + }, + { + "cell_type": "markdown", + "id": "872c2456", + "metadata": {}, + "source": [ + "### โ›ณ๏ธ Transactions monthly aggregates Feature Group\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "81b797fa", + "metadata": {}, + "outputs": [], + "source": [ + "# Get or create the 'transactions_monthly' feature group\n", + "transactions_fg = fs.get_or_create_feature_group(\n", + " name=\"transactions_monthly\",\n", + " version=1,\n", + " primary_key=[\"id\"],\n", + " partition_key=[\"tran_timestamp\"], \n", + " description=\"transactions monthly aggregates features\",\n", + " event_time=['tran_timestamp'],\n", + " online_enabled=True,\n", + " stream=True,\n", + " statistics_config={\n", + " \"enabled\": True, \n", + " \"histograms\": True, \n", + " \"correlations\": True, \n", + " \"exact_uniqueness\": False,\n", + " },\n", + " expectation_suite=expectation_suite,\n", + ") \n", + "# Insert data into the feature group\n", + "transactions_fg.insert(in_out_df)" + ] + }, + { + "cell_type": "markdown", + "id": "eef54d53", + "metadata": {}, + "source": [ + "### โ›ณ๏ธ Party Feature Group" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "2685d22f", + "metadata": {}, + "outputs": [], + "source": [ + "# Get or create the 'party_labels' feature group\n", + "party_fg = fs.get_or_create_feature_group(\n", + " name = \"party_labels\",\n", + " version = 1,\n", + " primary_key = [\"id\"],\n", + " description = \"party fg with labels\",\n", + " event_time = ['tran_timestamp'], \n", + " online_enabled = True,\n", + " stream=True,\n", + " statistics_config = {\n", + " \"enabled\": True, \n", + " \"histograms\": True, \n", + " \"correlations\": True, \n", + " \"exact_uniqueness\": False,\n", + " },\n", + ")\n", + "# Insert data into the feature group\n", + "party_fg.insert(party_labels)" + ] + }, + { + "cell_type": "markdown", + "id": "68abb141", + "metadata": {}, + "source": [ + "### โ›ณ๏ธ Graph embeddings Feature Group" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "6ad641ae", + "metadata": {}, + "outputs": [], + "source": [ + "from hsfs import engine\n", + "features = engine.get_instance().parse_schema_feature_group(graph_embeddings_df)\n", + "for f in features:\n", + " if f.type == \"array\":\n", + " f.online_type = \"VARBINARY(200)\" " + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "608667aa", + "metadata": {}, + "outputs": [], + "source": [ + "# Get or create the 'graph_embeddings' feature group\n", + "graph_embeddings_fg = fs.get_or_create_feature_group(\n", + " name=\"graph_embeddings\",\n", + " version=1,\n", + " primary_key=[\"id\"],\n", + " description=\"node embeddings from transactions graph\",\n", + " event_time = ['tran_timestamp'], \n", + " online_enabled=True, \n", + " stream=True,\n", + " statistics_config={\n", + " \"enabled\": False, \n", + " \"histograms\": False, \n", + " \"correlations\": False, \n", + " \"exact_uniqueness\": False,\n", + " },\n", + " features=features,\n", + ")\n", + "# Insert data into the feature group\n", + "graph_embeddings_fg.insert(graph_embeddings_df)" + ] + }, + { + "cell_type": "markdown", + "id": "d35b4de9", + "metadata": {}, + "source": [ + "---\n", + "## ๐Ÿ‘“ Exploration \n", + "\n", + "### Feature groups are now accessible and searchable in the UI\n", + "![fg-overview](images/fg_explore.gif)\n", + "\n", + "## ๐Ÿ“Š Statistics\n", + "We can explore feature statistics in the feature groups. 
If statistics were not enabled when the feature group was created, they can be enabled afterwards as follows:\n",
+    "\n",
+    "```python\n",
+    "transactions_fg = fs.get_or_create_feature_group(\n",
+    "    name = \"transactions_monthly\", \n",
+    "    version = 1)\n",
+    "\n",
+    "transactions_fg.statistics_config = {\n",
+    "    \"enabled\": True,\n",
+    "    \"histograms\": True,\n",
+    "    \"correlations\": True\n",
+    "}\n",
+    "\n",
+    "transactions_fg.update_statistics_config()\n",
+    "transactions_fg.compute_statistics()\n",
+    "```"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "77d1f19c",
+   "metadata": {},
+   "source": [
+    "![fg-stats](images/freature_group_stats.gif)"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "df0f9e3e",
+   "metadata": {},
+   "source": [
+    "---\n",
+    "## ⏭️ **Next:** Part 02 \n",
+    " \n",
+    "In the next notebook you will create a training dataset, then train and deploy a model."
+   ]
+  }
+ ],
+ "metadata": {
+  "kernelspec": {
+   "display_name": "Python 3 (ipykernel)",
+   "language": "python",
+   "name": "python3"
+  },
+  "language_info": {
+   "codemirror_mode": {
+    "name": "ipython",
+    "version": 3
+   },
+   "file_extension": ".py",
+   "mimetype": "text/x-python",
+   "name": "python",
+   "nbconvert_exporter": "python",
+   "pygments_lexer": "ipython3",
+   "version": "3.10.13"
+  }
+ },
+ "nbformat": 4,
+ "nbformat_minor": 5
+}
diff --git a/advanced_tutorials/aml/2_aml_training_pipeline.ipynb b/advanced_tutorials/aml/2_aml_training_pipeline.ipynb
new file mode 100644
index 00000000..6737cafe
--- /dev/null
+++ b/advanced_tutorials/aml/2_aml_training_pipeline.ipynb
@@ -0,0 +1,738 @@
+{
+ "cells": [
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "# **Hopsworks Feature Store** - Part 02: Model training\n",
+    "\n",
+    "This notebook explains how to read from a feature group and create a training dataset within the feature store. You will train a model on the created training dataset using TensorFlow, although it could just as well be trained with other machine learning frameworks such as Scikit-learn, Keras, and PyTorch. You will also see some of the exploration that can be done in Hopsworks, notably the search functions and the lineage.\n",
+    "\n",
+    "## **🗒️ This notebook is divided into the following steps:** \n",
+    "\n",
+    "1. **Feature Selection**: Select the features you want to train your model on.\n",
+    "2. **Feature Transformation**: How the features should be preprocessed.\n",
+    "3. **Training Dataset Creation**: Create a dataset for training the anomaly detection model.\n",
+    "4. **Model Training**: Train your anomaly detection model.\n",
+    "5. **Model Registry**: Register the model to the Hopsworks model registry.\n",
+    "6. 
**Model Deployment**: Deploy the model for real-time inference.\n", + "\n", + "![tutorial-flow](../../images/02_training-dataset.png) " + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## ๐Ÿ“ Imports" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import ast\n", + "import numpy as np\n", + "import pandas as pd\n", + "import tensorflow as tf\n", + "import os\n", + "\n", + "from anomaly_detection import GanEncAnomalyDetector" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## ๐Ÿ“ก Connecting to Hopsworks Feature Store " + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import hopsworks\n", + "\n", + "project = hopsworks.login()\n", + "\n", + "# Get the feature store handle for the project's feature store\n", + "fs = project.get_feature_store()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## ๐Ÿ”ช Feature Selection \n", + "\n", + "You start by selecting all the features you want to include for model training/inference." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Retrieve Feature Groups\n", + "transactions_monthly_fg = fs.get_feature_group(\n", + " name=\"transactions_monthly\", \n", + " version=1,\n", + ")\n", + "\n", + "graph_embeddings_fg = fs.get_feature_group(\n", + " name=\"graph_embeddings\", \n", + " version=1,\n", + ") \n", + "\n", + "party_fg = fs.get_feature_group(\n", + " name=\"party_labels\", \n", + " version=1,\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Select features for training data\n", + "selected_features = transactions_monthly_fg.select(\n", + " [\n", + " \"monthly_in_count\", \n", + " \"monthly_in_total_amount\", \n", + " \"monthly_in_mean_amount\", \n", + " \"monthly_in_std_amount\", \n", + " \"monthly_out_count\", \n", + " \"monthly_out_total_amount\", \n", + " \"monthly_out_mean_amount\", \n", + " \"monthly_out_std_amount\",\n", + " ]\n", + ").join(\n", + " graph_embeddings_fg.select([\"graph_embeddings\"]),\n", + ").join(\n", + " party_fg.select([\"type\", \"is_sar\"]), \n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Uncomment this if you would like to view your selected features\n", + "# selected_features.show(5)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### ๐Ÿค– Transformation Functions \n", + "\n", + "Transformation functions are a mathematical mapping of input data that may be stateful - requiring statistics from the partent feature view (such as number of instances of a category, or mean value of a numerical feature)\n", + "\n", + "We will preprocess our data using *min-max scaling* on numerical features and *label encoding* on categorical features. To do this we simply define a mapping between our features and transformation functions. This ensures that transformation functions such as *min-max scaling* are fitted only on the training data (and not the validation/test data), which ensures that there is no data leakage." 
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# Load built-in transformation functions.\n",
+    "min_max_scaler = fs.get_transformation_function(name=\"min_max_scaler\")\n",
+    "\n",
+    "# Map features to transformations.\n",
+    "transformation_functions = {\n",
+    "    \"monthly_in_count\": min_max_scaler,\n",
+    "    \"monthly_in_total_amount\": min_max_scaler,\n",
+    "    \"monthly_in_mean_amount\": min_max_scaler,\n",
+    "    \"monthly_in_std_amount\": min_max_scaler,\n",
+    "    \"monthly_out_count\": min_max_scaler,\n",
+    "    \"monthly_out_total_amount\": min_max_scaler,\n",
+    "    \"monthly_out_mean_amount\": min_max_scaler,\n",
+    "    \"monthly_out_std_amount\": min_max_scaler,\n",
+    "}"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "## ⚙️ Feature View Creation \n",
+    "\n",
+    "In Hopsworks, you write features to feature groups (where the features are stored) and you read features from feature views. A feature view is a logical view over features stored in feature groups, and a feature view typically contains the features used by a specific model. This way, feature views enable features stored in different feature groups to be reused across many different models. A feature view defines its schema as a query (optionally with filters), a model target feature/label, and additional transformation functions.\n",
+    "To create a Feature View you can use `fs.create_feature_view()`."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# Create the 'aml_feature_view' feature view\n",
+    "feature_view = fs.create_feature_view(\n",
+    "    name='aml_feature_view',\n",
+    "    query=selected_features,\n",
+    "    labels=[\"is_sar\"],\n",
+    "    transformation_functions=transformation_functions,\n",
+    ")"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "## 🏋️ Training Dataset Creation\n",
+    "\n",
+    "In Hopsworks, training data is a query where the projection (set of features) is determined by the parent Feature View, with an optional snapshot on disk of the data returned by the query.\n",
+    "\n",
+    "**A training dataset may contain splits such as:** \n",
+    "* Training set - the subset of training data used to train a model.\n",
+    "* Validation set - the subset of training data used to evaluate hyperparameters when training a model.\n",
+    "* Test set - the holdout subset of training data used to evaluate a model.\n",
+    "\n",
+    "The training dataset is created using the `feature_view.training_data()` method.\n",
+    "\n",
+    "**From the feature view APIs you can also create training datasets based on event time filters by specifying `start_time` and `end_time`**."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# Get training data\n",
+    "X_train, y_train = feature_view.training_data(\n",
+    "    description='AML training dataset',\n",
+    ")"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# Displaying the first three rows of the training data\n",
+    "X_train.head(3)"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# Displaying the first three rows of the target data\n",
+    "y_train.head(3)"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "## 👓 Exploration \n",
+    "\n",
+    "### Similar to Feature Groups, Feature Views and Training Datasets are now accessible and searchable in the UI\n",
+    "![fv-overview](images/feature_views_explore.gif)\n",
+    "\n",
+    "## 📊 Statistics\n",
+    "We can explore feature statistics in the feature views. "
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "![fv-stats](images/feature_view_stats.gif)\n"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "# 🤖 Model Building\n"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# Converting string representations of Python literals in 'graph_embeddings' column to actual objects\n",
+    "X_train['graph_embeddings'] = X_train['graph_embeddings'].apply(ast.literal_eval)"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# Convert each element in the 'graph_embeddings' column to a NumPy array\n",
+    "X_train['graph_embeddings'] = X_train['graph_embeddings'].apply(np.array)\n",
+    "\n",
+    "# Merge the original DataFrame with a DataFrame of exploded embeddings\n",
+    "X_train = X_train.merge(\n",
+    "    pd.DataFrame(X_train['graph_embeddings'].to_list()).add_prefix('emb_'), \n",
+    "    left_index=True,\n",
+    "    right_index=True,\n",
+    ").drop('graph_embeddings', axis=1)\n",
+    "\n",
+    "# Display the first three rows of the modified DataFrame\n",
+    "X_train.head(3)"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "You are going to train a [GAN for anomaly detection](https://arxiv.org/pdf/1905.11034.pdf). During the training step you will provide only features of accounts that have never been reported for suspicious activity. You will disclose previously reported accounts to the model only in the evaluation step."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# Filter non-suspicious transactions from X_train based on y_train values equal to 0\n",
+    "non_sar_transactions = X_train[y_train.values == 0]\n",
+    "\n",
+    "# Drop any rows with missing values from the non-suspicious transactions DataFrame\n",
+    "non_sar_transactions = non_sar_transactions.dropna()"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "Now let's define a TensorFlow Dataset, since we are going to train a Keras TensorFlow model."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "def windowed_dataset(dataset, window_size, batch_size):\n",
+    "    # Create a windowed dataset using the specified window_size and shift of 1\n",
+    "    # Drop any remaining elements that do not fit in complete windows\n",
+    "    ds = dataset.window(window_size, shift=1, drop_remainder=True)\n",
+    "\n",
+    "    # Flatten the nested datasets into a single dataset of windows\n",
+    "    ds = ds.flat_map(lambda x: x.batch(window_size))\n",
+    "\n",
+    "    # Batch the windows into batches of the specified batch_size\n",
+    "    # Use drop_remainder=True to ensure that all batches have the same size\n",
+    "    # Prefetch one batch to improve performance\n",
+    "    return ds.batch(batch_size, drop_remainder=True).prefetch(1)"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# Convert non_sar_transactions to a TensorFlow dataset, casting the values to float32\n",
+    "training_dataset = tf.data.Dataset.from_tensor_slices(\n",
+    "    tf.cast(non_sar_transactions.astype('float32'), tf.float32)\n",
+    ")\n",
+    "\n",
+    "# Use the windowed_dataset function to create a windowed dataset\n",
+    "# Parameters: window_size=2 (sequence length), batch_size=16 (number of sequences in each batch)\n",
+    "training_dataset = windowed_dataset(\n",
+    "    training_dataset, \n",
+    "    window_size=2, \n",
+    "    batch_size=16,\n",
+    ")\n",
+    "\n",
+    "training_dataset"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "## 🏃 Train Model\n",
+    "\n",
+    "Next we'll train the model on the windowed dataset of non-SAR transactions prepared above."
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "## 🧬 Model architecture\n",
+    "\n",
+    "Key components:\n",
+    "\n",
+    "- `Encoder` (encoder_model) takes input data and compresses it into a latent representation. The encoder consists of two Convolutional 1D layers with Batch Normalization and Leaky ReLU activation functions.\n",
+    "\n",
+    "- `Generator` (generator_model) takes a latent vector and generates synthetic data. The generator consists of Convolutional 1D layers with Batch Normalization and Leaky ReLU activation functions. The last layer produces data with the same shape as the input data.\n",
+    "\n",
+    "- `Discriminator` (discriminator_model) distinguishes between real and generated (fake) data. It comprises two Convolutional 1D layers with Batch Normalization and Leaky ReLU activation functions, followed by a fully connected layer.
The output is a single value representing the probability that the input is real.\n", + "\n", + "![tutorial-flow](images/model_architecture.png)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Create an instance of the GanEncAnomalyDetector model with input dimensions [2, n_features]\n", + "model = GanEncAnomalyDetector([2, training_dataset.element_spec.shape[-1]])\n", + "\n", + "# Compile the model\n", + "model.compile()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Iterate through each layer in the model\n", + "for layer in model.layers:\n", + " # Print the name and output shape of each layer\n", + " print(layer.name, layer.output_shape)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Train the model using the training_dataset\n", + "# Set the number of epochs to 2 and suppress verbose output during training\n", + "history = model.fit(\n", + " training_dataset, # Training dataset used for model training\n", + " epochs=2, # Number of training epochs\n", + " verbose=0, # Verbosity mode (0: silent, 1: progress bar, 2: one line per epoch)\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Create a dictionary to store metrics\n", + "# The key is 'loss', and the value is the initial value of the generator loss from the training history\n", + "metrics = {\n", + " 'loss': history.history[\"g_loss\"][0],\n", + "}" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### โš™๏ธ Model Schema\n", + "\n", + "The model needs to be set up with a [Model Schema](https://docs.hopsworks.ai/3.0/user_guides/mlops/registry/model_schema/), which describes the inputs and outputs for a model.\n", + "\n", + "A Model Schema can be automatically generated from training examples, as shown below." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from hsml.schema import Schema\n", + "from hsml.model_schema import ModelSchema\n", + "\n", + "# Define the input schema using the values of X_train\n", + "input_schema = Schema(X_train)\n", + "\n", + "# Define the output schema using y_train\n", + "output_schema = Schema(y_train)\n", + "\n", + "# Create a ModelSchema object specifying the input and output schemas\n", + "model_schema = ModelSchema(\n", + " input_schema=input_schema, \n", + " output_schema=output_schema,\n", + ")\n", + "\n", + "# Convert the model schema to a dictionary for further inspection or serialization\n", + "model_schema.to_dict()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## ๐Ÿ“ Register model\n", + "\n", + "One of the features in Hopsworks is the model registry. This is where we can store different versions of models and compare their performance. Models from the registry can then be served as API endpoints." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Set the path for exporting the trained model\n", + "export_path = \"aml_model\"\n", + "print('Exporting trained model to: {}'.format(export_path))\n", + "\n", + "# Get the concrete function for serving the model\n", + "call = model.serve_function.get_concrete_function(tf.TensorSpec([None, None, None], tf.float32))\n", + "\n", + "# Save the model to the specified export path with the serving signature\n", + "tf.saved_model.save(\n", + " model, \n", + " export_path, \n", + " signatures=call,\n", + ")\n", + "\n", + "# Access the model registry in your project\n", + "mr = project.get_model_registry()\n", + "\n", + "# Create a TensorFlow model in the model registry with specified metadata\n", + "mr_model = mr.tensorflow.create_model(\n", + " name=\"aml_model\", # Specify the model name\n", + " metrics=metrics, # Include model metrics\n", + " model_schema=model_schema, # Include model schema\n", + " description=\"Adversarial anomaly detection model.\", # Model description\n", + " input_example=[\"70408aef\"], # Input example\n", + ")\n", + "\n", + "# Save the registered model to the model registry\n", + "mr_model.save(export_path)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## ๐Ÿš€ Model Deployment\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "%%writefile aml_model_transformer.py\n", + "\n", + "import os\n", + "import hsfs\n", + "import numpy as np\n", + "\n", + "class Transformer(object):\n", + " \n", + " def __init__(self): \n", + " # Get feature store handle\n", + " fs_conn = hsfs.connection()\n", + " self.fs = fs_conn.get_feature_store()\n", + " \n", + " # Get feature views\n", + " self.fv = self.fs.get_feature_view(\n", + " name=\"aml_feature_view\", \n", + " version=1,\n", + " )\n", + " \n", + " # Initialise serving\n", + " self.fv.init_serving(1)\n", + " \n", + " def preprocess(self, inputs):\n", + " # Retrieve feature vector using the feature vector provider\n", + " feature_vector = self.fv.get_feature_vector({\"id\": inputs[\"inputs\"][0]})\n", + "\n", + " # Explode embeddings (flatten the list of embeddings)\n", + " feature_vector_exploded_emb = [*self.flat2gen(feature_vector)]\n", + "\n", + " # Reshape feature vector to match the model's input shape\n", + " feature_vector_reshaped = np.array(feature_vector_exploded_emb).reshape(1, 41)\n", + "\n", + " # Convert the feature vector to a TensorFlow constant\n", + " input_vector = tf.constant(feature_vector_reshaped, dtype=tf.float32)\n", + "\n", + " # Add a time dimension (axis=1) to the input vector\n", + " input_vector = tf.expand_dims(input_vector, axis=1)\n", + "\n", + " # Duplicate the input vector to create a pair\n", + " input_vector = tf.tile(input_vector, [1, 2, 1])\n", + "\n", + " # Return the preprocessed input dictionary\n", + " return {\n", + " 'inputs': input_vector\n", + " }\n", + "\n", + " def postprocess(self, outputs):\n", + " return outputs\n", + "\n", + " def flat2gen(self, alist):\n", + " for item in alist:\n", + " if isinstance(item, list):\n", + " for subitem in item: yield subitem\n", + " else:\n", + " yield item\n", + " " + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from hsml.transformer import Transformer\n", + "\n", + "# Get the dataset API from the project\n", + "dataset_api = project.get_dataset_api()\n", + "\n", + "# Upload 
the transformer script file to the \"Models\" dataset\n", + "uploaded_file_path = dataset_api.upload(\n", + " \"aml_model_transformer.py\", # Name of the script file\n", + " \"Models\", # Destination folder in the dataset\n", + " overwrite=True, # Overwrite the file if it already exists\n", + ")\n", + "\n", + "# Construct the full path to the uploaded transformer script file\n", + "transformer_script_path = os.path.join(\n", + " \"/Projects\", \n", + " project.name, \n", + " uploaded_file_path,\n", + ")\n", + "\n", + "# Create a Transformer object using the uploaded script\n", + "transformer_script = Transformer(\n", + " script_file=transformer_script_path,\n", + ")\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Retrieve the \"aml_model\" from the model registry\n", + "model = mr.get_model(\n", + " name=\"aml_model\", \n", + " version=1,\n", + ")\n", + "\n", + "# Deploy the model with the specified name (\"amlmodeldeployment\") and associated transformer\n", + "deployment = model.deploy(\n", + " name=\"amlmodeldeployment\", # Specify the deployment name\n", + " transformer=transformer_script, # Associate the transformer script with the deployment\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "print(\"Deployment: \" + deployment.name)\n", + "deployment.describe()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "> The deployment has now been registered. However, to start it you need to run:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "deployment.start(await_running=300)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "> For trouble shooting one can use `get_logs` method" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "deployment.get_logs()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "> To stop the deployment you simply run:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "deployment.stop()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "---\n", + "## โญ๏ธ Next: Part 03: Online Inference \n", + " \n", + "In the next notebook you will use your deployment for online inference." + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.9.18" + } + }, + "nbformat": 4, + "nbformat_minor": 4 +} diff --git a/advanced_tutorials/aml/3_aml_online_inference.ipynb b/advanced_tutorials/aml/3_aml_online_inference.ipynb new file mode 100644 index 00000000..a6c58cc8 --- /dev/null +++ b/advanced_tutorials/aml/3_aml_online_inference.ipynb @@ -0,0 +1,289 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# **Hopsworks Feature Store** - Part 03: Online Inference\n", + "\n", + "In this last notebook you will use your deployment for online inference. \n", + "\n", + "## **๐Ÿ—’๏ธ This notebook is divided into the following sections:** \n", + "1. 
**Deployment Retrieval**: Retrieve your deployment from the model registry.\n", + "2. **Prediction using deployment**.\n", + "3. **REST endpoint usage for model serving**.\n", + "\n", + "![tutorial-flow](../../images/03_model.png)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## ๐Ÿ“ก Connecting to Hopsworks Feature Store " + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Connected. Call `.close()` to terminate connection gracefully.\n", + "\n", + "Logged in to project, explore it here https://staging.cloud.hopsworks.ai/p/120\n", + "Connected. Call `.close()` to terminate connection gracefully.\n" + ] + } + ], + "source": [ + "import hopsworks\n", + "\n", + "project = hopsworks.login()\n", + "\n", + "# Get the feature store handle for the project's feature store\n", + "fs = project.get_feature_store()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## ๐Ÿ—„ Model Registry\n" + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Connected. Call `.close()` to terminate connection gracefully.\n" + ] + } + ], + "source": [ + "# Get the Model Registry\n", + "mr = project.get_model_registry()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## โš™๏ธ Fetch Deployment" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Access the Model Serving\n", + "ms = project.get_model_serving()\n", + "\n", + "# Specify the deployment name\n", + "deployment_name = \"amlmodeldeployment\"\n", + "\n", + "# Get the deployment with the specified name\n", + "deployment = ms.get_deployment(deployment_name)\n", + "\n", + "# Start the deployment and wait for it to be in a running state for up to 300 seconds\n", + "deployment.start(await_running=300)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## ๐Ÿ”ฎ Predicting using deployment\n", + "\n", + "\n", + "Finally you can start making predictions with your model!\n", + "\n", + "Send inference requests to the deployed model as follows:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Prepare input data using the input example from the model\n", + "data = {\n", + " \"inputs\": model.input_example,\n", + "}\n", + "\n", + "# Make predictions using the deployed model\n", + "predictions = deployment.predict(data)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Prepare input data with a specific input value\n", + "data = {\n", + " \"inputs\": [\"f1c119ed\"],\n", + "}\n", + "\n", + "# Make predictions using the deployed model\n", + "predictions = deployment.predict(data)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "> For trouble shooting one can use `get_logs` method." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "deployment.get_logs()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## ๐Ÿš€ Use REST endpoint\n", + "\n", + "You can also use a REST endpoint for your model. 
To do this you need to create an API key with 'serving' enabled, and retrieve the endpoint URL from the Model Serving UI.\n",
+    "\n",
+    "Go to the Model Serving UI and click on the eye icon next to a model to retrieve the endpoint URL. The shorter URL is an internal endpoint that you can only reach from within Hopsworks. If you want to call it from outside, you need one of the longer URLs. \n",
+    "\n",
+    "![serving-endpoints](images/serving_endpoints.gif)"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "import os\n",
+    "import requests\n",
+    "\n",
+    "mr = project.get_model_registry()\n",
+    "\n",
+    "# Use the model name from the previous notebook.\n",
+    "model = mr.get_model(\n",
+    "    name=\"aml_model\", \n",
+    "    version=1,\n",
+    ")\n",
+    "\n",
+    "test_inputs = model.input_example\n",
+    "\n",
+    "API_KEY = \"...\"  # Put your API key here.\n",
+    "MODEL_SERVING_URL = \"...\"  # Put your model serving endpoint here.\n",
+    "HOST_NAME = \"...\"  # Put your Hopsworks model serving hostname here.\n",
+    "\n",
+    "data = {\"inputs\": test_inputs}\n",
+    "headers = {\n",
+    "    \"Content-Type\": \"application/json\", \"Accept\": \"application/json\",\n",
+    "    \"Authorization\": f\"ApiKey {API_KEY}\",\n",
+    "    \"Host\": HOST_NAME}\n",
+    "\n",
+    "response = requests.post(MODEL_SERVING_URL, verify=False, headers=headers, json=data)\n",
+    "response.json()"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# Now let's test feature vectors from the online store\n",
+    "ids_to_score = [\n",
+    "    \"0016359b\", \n",
+    "    \"001dcc27\", \n",
+    "    \"0054a022\", \n",
+    "    \"00d6b609\", \n",
+    "    \"00e14860\", \n",
+    "    \"00e39a1b\", \n",
+    "    \"014ed5cb\", \n",
+    "    \"01ce3306\", \n",
+    "    \"01fa19ae\", \n",
+    "    \"01fa1d01\", \n",
+    "    \"036dce03\", \n",
+    "    \"03e09be4\", \n",
+    "    \"04b23f4b\",\n",
+    "]\n",
+    "\n",
+    "for node_id in ids_to_score:\n",
+    "    data = {\"inputs\": [node_id]}\n",
+    "    print(\"Anomaly score for node_id\", node_id, \":\", deployment.predict(data)[\"outputs\"])"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "## Stop Deployment\n",
+    "To stop the deployment you simply run:"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "deployment.stop()"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "## 🎁 Wrapping things up "
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "In this module you performed feature engineering, created a feature view and a training dataset, trained an adversarial anomaly detection model and deployed it in production.
To setup this pipeline in your enterprise settings contuct us.\n", + "\n", + "" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.10.13" + } + }, + "nbformat": 4, + "nbformat_minor": 4 +} diff --git a/advanced_tutorials/aml/anomaly_detection.py b/advanced_tutorials/aml/anomaly_detection.py new file mode 100644 index 00000000..d785b13c --- /dev/null +++ b/advanced_tutorials/aml/anomaly_detection.py @@ -0,0 +1,288 @@ +import tensorflow as tf + +class GanEncAnomalyDetector(tf.keras.Model): + + def __init__(self, input_dim): + super(GanEncAnomalyDetector, self).__init__() + + self.input_dim = input_dim + self.latent_dim = [1, input_dim[1]] + self.d_steps = 3 + self.gp_weight = 10 + + self.encoder = self.make_encoder_model(self.input_dim) + self.generator = self.make_generator(self.input_dim, self.latent_dim) + self.discriminator = self.make_discriminator_model(self.input_dim) + + self.mse = tf.keras.losses.MeanSquaredError() + + self.epoch_e_loss_avg = tf.keras.metrics.Mean(name="epoch_e_loss_avg") + self.epoch_d_loss_avg = tf.keras.metrics.Mean(name="epoch_d_loss_avg") + self.epoch_g_loss_avg = tf.keras.metrics.Mean(name="epoch_g_loss_avg") + self.epoch_a_score_avg = tf.keras.metrics.Mean(name="epoch_a_score_avg") + + @property + def metrics(self): + return [ + self.epoch_e_loss_avg, + self.epoch_d_loss_avg, + self.epoch_g_loss_avg, + self.epoch_a_score_avg, + ] + + # define model architectures + def make_encoder_model(self, input_dim): + inputs = tf.keras.layers.Input(shape=(input_dim[0],input_dim[1])) + x = tf.keras.layers.Conv1D(filters = 64, kernel_size= 1,padding='same', kernel_initializer="uniform")(inputs) + x = tf.keras.layers.BatchNormalization()(x) + x = tf.keras.layers.LeakyReLU(alpha=0.2)(x) + x = tf.keras.layers.MaxPooling1D(pool_size=2, padding='same')(x) + x = tf.keras.layers.Conv1D(filters = input_dim[1], kernel_size= 1,padding='same', kernel_initializer="uniform")(x) + x = tf.keras.layers.BatchNormalization()(x) + x = tf.keras.layers.LeakyReLU(alpha=0.2)(x) + x = tf.keras.layers.MaxPooling1D(pool_size=2, padding='same')(x) + encoder = tf.keras.Model(inputs=inputs, outputs=x, name="encoder_model") + return encoder + + def make_generator(self, input_dim, latent_dim): + latent_inputs = tf.keras.layers.Input(shape=(latent_dim[0],latent_dim[1])) + x = tf.keras.layers.Conv1D(filters = 8, kernel_size= 1,padding='same', kernel_initializer="uniform")(latent_inputs) + x = tf.keras.layers.BatchNormalization()(x) + x = tf.keras.layers.LeakyReLU(alpha=0.2)(x) + x = tf.keras.layers.UpSampling1D(2)(x) + x = tf.keras.layers.Conv1D(filters = 16, kernel_size= 1,padding='same', kernel_initializer="uniform")(x) + x = tf.keras.layers.BatchNormalization()(x) + x = tf.keras.layers.LeakyReLU(alpha=0.2)(x) + #x = tf.keras.layers.UpSampling1D(2)(x) + x = tf.keras.layers.Conv1D(filters = input_dim[1], kernel_size= 1,padding='same', kernel_initializer="uniform")(x) + x = tf.keras.layers.BatchNormalization()(x) + x = tf.keras.layers.LeakyReLU(alpha=0.2)(x) + generator = tf.keras.Model(inputs=latent_inputs, outputs=x, name="generator_model") + return generator + + def make_discriminator_model(self, input_dim): + inputs = tf.keras.layers.Input(shape=(input_dim[0],input_dim[1])) + 
x = tf.keras.layers.Conv1D(filters = 128, kernel_size= 1,padding='same', kernel_initializer="uniform")(inputs) + x = tf.keras.layers.BatchNormalization()(x) + x = tf.keras.layers.LeakyReLU(alpha=0.2)(x) + x = tf.keras.layers.MaxPooling1D(pool_size=2, padding='same')(x) + x = tf.keras.layers.Conv1D(filters = 64, kernel_size= 1,padding='same', kernel_initializer="uniform")(x) + x = tf.keras.layers.BatchNormalization()(x) + x = tf.keras.layers.LeakyReLU(alpha=0.2)(x) + + # dense output layer + x = tf.keras.layers.Flatten()(x) + x = tf.keras.layers.LeakyReLU(0.2)(x) + x = tf.keras.layers.Dense(128)(x) + x = tf.keras.layers.LeakyReLU(0.2)(x) + prediction = tf.keras.layers.Dense(1)(x) + discriminator = tf.keras.Model(inputs=inputs, outputs=prediction, name="discriminator_model" ) + return discriminator + + # Training function + @tf.function + def train_step(self, real_data): + if isinstance(real_data, tuple): + real_data = real_data[0] + + # Get the batch size + batch_size = tf.shape(real_data)[0] + + # For each batch, we are going to perform the + # following steps as laid out in the original paper: + # 1. Train the generator and get the generator loss + # 1a. Train the encoder and get the encoder loss + # 2. Train the discriminator and get the discriminator loss + # 3. Calculate the gradient penalty + # 4. Multiply this gradient penalty with a constant weight factor + # 5. Add the gradient penalty to the discriminator loss + # 6. Return the generator and discriminator losses as a loss dictionary + + # Train the discriminator first. The original paper recommends training + # the discriminator for `x` more steps (typically 5) as compared to + # one step of the generator. Here we will train it for 3 extra steps + # as compared to 5 to reduce the training time. + for i in range(self.d_steps): + # Get the latent vector + random_latent_vectors = tf.random.normal(shape=(batch_size, self.latent_dim[0], self.latent_dim[1])), + with tf.GradientTape() as tape: + # Generate fake data from the latent vector + fake_data = self.generator(random_latent_vectors, training=True) + + #(somewhere here step forward?) 
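+                # Note: the discriminator acts as a WGAN critic here - it scores both the
+                # generated (fake) batch and the real batch, and the Wasserstein loss below
+                # is the difference of the two mean scores, with the gradient penalty added on top.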
+ # Get the logits for the fake data + fake_logits = self.discriminator(fake_data, training=True) + # Get the logits for the real data + real_logits = self.discriminator(real_data, training=True) + + # Calculate the discriminator loss using the fake and real sample logits + d_cost = self.discriminator_loss(real_sample=real_logits, fake_sample=fake_logits) + # Calculate the gradient penalty + gp = self.gradient_penalty(real_data, fake_data) + # Add the gradient penalty to the original discriminator loss + d_loss = d_cost + gp * self.gp_weight + + # Get the gradients w.r.t the discriminator loss + d_gradient = tape.gradient(d_loss, self.discriminator.trainable_variables) + # Update the weights of the discriminator using the discriminator optimizer + self.d_optimizer.apply_gradients( + zip(d_gradient, self.discriminator.trainable_variables) + ) + + # Train the generator + # Get the latent vector + random_latent_vectors = tf.random.normal(shape=(batch_size, self.latent_dim[0], self.latent_dim[1])) + with tf.GradientTape() as tape: + # Generate fake data using the generator + generated_data = self.generator(random_latent_vectors, training=True) + # Get the discriminator logits for fake data + gen_sample_logits = self.discriminator(generated_data, training=True) + # Calculate the generator loss + g_loss = self.generator_loss(gen_sample_logits) + + # Get the gradients w.r.t the generator loss + gen_gradient = tape.gradient(g_loss, self.generator.trainable_variables) + # Update the weights of the generator using the generator optimizer + self.g_optimizer.apply_gradients( + zip(gen_gradient, self.generator.trainable_variables) + ) + + # Train the encoder + with tf.GradientTape() as tape: + generated_data = self.generator(random_latent_vectors, training=True) + # Compress generate fake data from the latent vector + encoded_fake_data = self.encoder(generated_data, training=True) + # Reconstruct encoded generate fake data + generator_reconstructed_encoded_fake_data = self.generator(encoded_fake_data, training=True) + # Encode the latent vector + encoded_random_latent_vectors = self.encoder(tf.random.normal(shape=(batch_size, self.input_dim[0], self.input_dim[1])), + training=True) + # Calculate encoder loss + e_loss = self.encoder_loss(generated_data, generator_reconstructed_encoded_fake_data) + + # Get the gradients w.r.t the generator loss + enc_gradient = tape.gradient(e_loss, self.encoder.trainable_variables) + # Update the weights of the generator using the generator optimizer + self.e_optimizer.apply_gradients( + zip(enc_gradient, self.encoder.trainable_variables) + ) + + anomaly_score = self.compute_anomaly_score(real_data) + + self.epoch_d_loss_avg.update_state(d_loss) + self.epoch_g_loss_avg.update_state(g_loss) + self.epoch_e_loss_avg.update_state(e_loss) + self.epoch_a_score_avg.update_state(anomaly_score["anomaly_score"]) + + return {"d_loss": d_loss, "g_loss": g_loss, "e_loss": e_loss, "anomaly_score": anomaly_score["anomaly_score"]} + + @tf.function + def test_step(self, input): + if isinstance(input, tuple): + input = input[0] + + batch_size = tf.shape(input)[0] + random_latent_vectors = tf.random.normal(shape=(batch_size, self.latent_dim[0], self.latent_dim[1])) + # Generate fake data using the generator + generated_data = self.generator(random_latent_vectors, training=False) + # Get the discriminator logits for fake data + gen_sample_logits = self.discriminator(generated_data, training=False) + # Calculate the generator loss + g_loss = self.generator_loss(gen_sample_logits) + + + # 
Compress generate fake data from the latent vector + encoded_fake_data = self.encoder(generated_data, training=False) + # Reconstruct encoded generate fake data + generator_reconstructed_encoded_fake_data = self.generator(encoded_fake_data, training=False) + + # Calculate encoder loss + e_loss = self.encoder_loss(generated_data, generator_reconstructed_encoded_fake_data) + + anomaly_score = self.compute_anomaly_score(input) + return { + "g_loss": g_loss, + "e_loss": e_loss, + "anomaly_score": anomaly_score["anomaly_score"] + } + + # define custom server function + @tf.function + def serve_function(self, input): + return self.compute_anomaly_score(input) + + def call(self, input): + if isinstance(input, tuple): + input = input[0] + + encoded = self.encoder(input) + decoded = self.generator(encoded) + anomaly_score = self.compute_anomaly_score(input) + return anomaly_score["anomaly_score"], decoded + + def compile(self): + super(GanEncAnomalyDetector, self).compile() + # Define optimizers + self.e_optimizer = tf.keras.optimizers.SGD(learning_rate=0.00001, clipnorm=0.01) + self.d_optimizer = tf.keras.optimizers.SGD(learning_rate=0.00001, clipnorm=0.01) + self.g_optimizer = tf.keras.optimizers.SGD(learning_rate=0.00001, clipnorm=0.01) + + def gradient_penalty(self, real_data, fake_data): + """ Calculates the gradient penalty. + This loss is calculated on an interpolated sample + and added to the discriminator loss. + """ + # Get the interpolated sample + real_data_shape = tf.shape(real_data) + alpha = tf.random.normal(shape=[real_data_shape[0], real_data_shape[1], real_data_shape[2]], mean=0.0, stddev=2.0, dtype=tf.dtypes.float32) + #alpha = tf.random_uniform([self.batch_size, 1], minval=-2, maxval=2, dtype=tf.dtypes.float32) + interpolated = (alpha * real_data) + ((1 - alpha) * fake_data) + + with tf.GradientTape() as gp_tape: + gp_tape.watch(interpolated) + # 1. Get the discriminator output for this interpolated sample. + pred = self.discriminator(interpolated, training=True) + + # 2. Calculate the gradients w.r.t to this interpolated sample. + grads = gp_tape.gradient(pred, [interpolated])[0] + # 3. Calculate the norm of the gradients. + norm = tf.sqrt(tf.reduce_sum(tf.square(grads), axis=[-2, -1])) + gp = tf.reduce_mean((norm - 1.0) ** 2) + return gp + + def encoder_loss(self,generated_fake_data, generator_reconstructed_encoded_fake_data): + generator_reconstracted_data = tf.cast(generator_reconstructed_encoded_fake_data, tf.float32) + loss = self.mse(generated_fake_data, generator_reconstracted_data) + beta_cycle_gen = 10.0 + loss = loss * beta_cycle_gen + return loss + + # Define the loss functions for the discriminator, + # which should be (fake_loss - real_loss). + # We will add the gradient penalty later to this loss function. + def discriminator_loss(self, real_sample, fake_sample): + real_loss = tf.reduce_mean(real_sample) + fake_loss = tf.reduce_mean(fake_sample) + return fake_loss - real_loss + + # Define the loss functions for the generator. + def generator_loss(self, fake_sample): + return -tf.reduce_mean(fake_sample) + + def compute_anomaly_score(self, input): + """anomaly score. + See https://arxiv.org/pdf/1905.11034.pdf for more details + """ + # Encode the real data + encoded_real_data = self.encoder(input, training=False) + # Reconstruct encoded real data + generator_reconstructed_encoded_real_data = self.generator(encoded_real_data, training=False) + # Calculate distance between real and reconstructed data (Here may be step forward?) 
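+        # The anomaly score is simply this reconstruction error: inputs that the
+        # trained encoder/generator pair cannot reconstruct well score higher.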
+ gen_rec_loss_predict = self.mse(input,generator_reconstructed_encoded_real_data) + + # # Compute anomaly score + # real_to_orig_dist_predict = tf.math.reduce_sum(tf.math.pow(encoded_random_latent - encoded_real_data, 2), axis=[-1]) + # anomaly_score = (gen_rec_loss_predict * self.anomaly_alpha) + ((1 - self.anomaly_alpha) * real_to_orig_dist_predict) + anomaly_score = gen_rec_loss_predict + return {'anomaly_score': anomaly_score} + \ No newline at end of file diff --git a/advanced_tutorials/aml/features/graph_embeddings.py b/advanced_tutorials/aml/features/graph_embeddings.py new file mode 100644 index 00000000..5348a6cf --- /dev/null +++ b/advanced_tutorials/aml/features/graph_embeddings.py @@ -0,0 +1,299 @@ +import tensorflow as tf +from tensorflow import keras +from tensorflow.keras import layers +import pandas as pd +import numpy as np + +def create_ffn(hidden_units: list, dropout_rate: float, name: str = None) -> keras.Sequential: + """ + Create a feedforward neural network layer. + + Parameters: + - hidden_units (list): List of integers specifying the number of units in each hidden layer. + - dropout_rate (float): Dropout rate for regularization. + - name (str): Name of the layer. + + Returns: + keras.Sequential: Feedforward neural network layer. + """ + fnn_layers = [] + + for units in hidden_units: + fnn_layers.append(layers.BatchNormalization()) + fnn_layers.append(layers.Dropout(dropout_rate)) + fnn_layers.append(layers.Dense(units, activation=tf.nn.gelu)) + + return keras.Sequential(fnn_layers, name=name) + +class GraphConvLayer(layers.Layer): + """ + Graph Convolutional Layer. + + Parameters: + - hidden_units (list): List of integers specifying the number of units in each hidden layer. + - dropout_rate (float): Dropout rate for regularization. + - aggregation_type (str): Type of aggregation for neighbor messages ('sum', 'mean', 'max'). + - combination_type (str): Type of combination for node embeddings ('gated', 'gru', 'concat', 'add'). + - normalize (bool): Flag to normalize node embeddings. + """ + + def __init__( + self, + hidden_units: list, + dropout_rate: float = 0.2, + aggregation_type: str = "mean", + combination_type: str = "concat", + normalize: bool = False, + *args, + **kwargs, + ): + super(GraphConvLayer, self).__init__(*args, **kwargs) + + self.aggregation_type = aggregation_type + self.combination_type = combination_type + self.normalize = normalize + + self.ffn_prepare = create_ffn(hidden_units, dropout_rate) + if self.combination_type == "gated": + self.update_fn = layers.GRU( + units=hidden_units, + activation="tanh", + recurrent_activation="sigmoid", + dropout=dropout_rate, + return_state=True, + recurrent_dropout=dropout_rate, + ) + else: + self.update_fn = create_ffn(hidden_units, dropout_rate) + + def prepare(self, node_repesentations, weights=None) -> tf.Tensor: + """ + Prepare neighbor messages. + + Parameters: + - node_repesentations (tf.Tensor): Node representations. + - weights (tf.Tensor): Weights for neighbor messages. + + Returns: + tf.Tensor: Prepared neighbor messages. + """ + messages = self.ffn_prepare(node_repesentations) + if weights is not None: + messages = messages * tf.expand_dims(weights, -1) + return messages + + def aggregate(self, node_indices, neighbour_messages) -> tf.Tensor: + """ + Aggregate neighbor messages. + + Parameters: + - node_indices (tf.Tensor): Node indices. + - neighbour_messages (tf.Tensor): Neighbor messages. + + Returns: + tf.Tensor: Aggregated messages. 
+ """ + num_nodes = tf.math.reduce_max(node_indices) + 1 + if self.aggregation_type == "sum": + aggregated_message = tf.math.unsorted_segment_sum( + neighbour_messages, node_indices, num_segments=num_nodes + ) + elif self.aggregation_type == "mean": + aggregated_message = tf.math.unsorted_segment_mean( + neighbour_messages, node_indices, num_segments=num_nodes + ) + elif self.aggregation_type == "max": + aggregated_message = tf.math.unsorted_segment_max( + neighbour_messages, node_indices, num_segments=num_nodes + ) + else: + raise ValueError(f"Invalid aggregation type: {self.aggregation_type}.") + + return aggregated_message + + def update(self, node_repesentations, aggregated_messages) -> tf.Tensor: + """ + Update node embeddings. + + Parameters: + - node_repesentations (tf.Tensor): Node representations. + - aggregated_messages (tf.Tensor): Aggregated neighbor messages. + + Returns: + tf.Tensor: Updated node embeddings. + """ + if self.combination_type == "gru": + h = tf.stack([node_repesentations, aggregated_messages], axis=1) + elif self.combination_type == "concat": + h = tf.concat([node_repesentations, aggregated_messages], axis=1) + elif self.combination_type == "add": + h = node_repesentations + aggregated_messages + else: + raise ValueError(f"Invalid combination type: {self.combination_type}.") + + node_embeddings = self.update_fn(h) + if self.combination_type == "gru": + node_embeddings = tf.unstack(node_embeddings, axis=1)[-1] + + if self.normalize: + node_embeddings = tf.nn.l2_normalize(node_embeddings, axis=-1) + return node_embeddings + + def call(self, inputs) -> tf.Tensor: + """ + Process inputs to produce node embeddings. + + Parameters: + inputs: a tuple of three elements: node_repesentations, edges, edge_weights. + + Returns: + tf.Tensor: Node embeddings. + """ + node_repesentations, edges, edge_weights = inputs + node_indices, neighbour_indices = edges[0], edges[1] + neighbour_repesentations = tf.gather(node_repesentations, neighbour_indices) + + neighbour_messages = self.prepare(neighbour_repesentations, edge_weights) + aggregated_messages = self.aggregate(node_indices, neighbour_messages) + return self.update(node_repesentations, aggregated_messages) + + +class GNNNodeClassifier(tf.keras.Model): + """ + Graph Neural Network Node Classifier. + + Parameters: + - graph_info: Tuple of node_features, edges, and edge_weights. + - hidden_units (list): List of integers specifying the number of units in each hidden layer. + - aggregation_type (str): Type of aggregation for neighbor messages ('sum', 'mean', 'max'). + - combination_type (str): Type of combination for node embeddings ('gated', 'gru', 'concat', 'add'). + - dropout_rate (float): Dropout rate for regularization. + - normalize (bool): Flag to normalize node embeddings. 
+ """ + + def __init__( + self, + graph_info: tuple, + hidden_units: list, + aggregation_type: str = "sum", + combination_type: str = "concat", + dropout_rate: float = 0.2, + normalize: bool = True, + *args, + **kwargs, + ): + super(GNNNodeClassifier, self).__init__(*args, **kwargs) + + node_features, edges, edge_weights = graph_info + self.node_features = node_features + self.edges = edges + self.edge_weights = edge_weights + + if self.edge_weights is None: + self.edge_weights = tf.ones(shape=edges.shape[1]) + + self.edge_weights = self.edge_weights / tf.math.reduce_sum(self.edge_weights) + + self.preprocess = create_ffn(hidden_units, dropout_rate, name="preprocess") + self.conv1 = GraphConvLayer( + hidden_units, + dropout_rate, + aggregation_type, + combination_type, + normalize, + name="graph_conv1", + ) + self.conv2 = GraphConvLayer( + hidden_units, + dropout_rate, + aggregation_type, + combination_type, + normalize, + name="graph_conv2", + ) + self.postprocess = create_ffn(hidden_units, dropout_rate, name="postprocess") + self.compute_logits = layers.Dense(hidden_units[0], activation=tf.nn.tanh, name="logits") + + def call(self, input_node_indices) -> tf.Tensor: + """ + Make predictions. + + Parameters: + - input_node_indices (tf.Tensor): Input node indices. + + Returns: + tf.Tensor: Predictions. + """ + x = self.preprocess(self.node_features) + x1 = self.conv1((x, self.edges, self.edge_weights)) + x = x1 + x + x2 = self.conv2((x, self.edges, self.edge_weights)) + x = x2 + x + x = self.postprocess(x) + node_embeddings = tf.gather(x, input_node_indices) + return self.compute_logits(node_embeddings) + + +def construct_graph(input_df: pd.DataFrame, data_party_labels: pd.DataFrame) -> dict: + """ + Construct a graph and generate node embeddings. + + Parameters: + - input_df (pd.DataFrame): Input transaction DataFrame. + - data_party_labels (pd.DataFrame): DataFrame containing party labels. + + Returns: + dict: Dictionary with keys 'id' and 'graph_embeddings'. + """ + sampled_party = data_party_labels[data_party_labels.id.isin(input_df.source) | (data_party_labels.id.isin(input_df.target))] + sampled_party = sampled_party[["id", "type", "is_sar"]] + + unique_ids = set(sampled_party.id.values) + id_dict = {idn: i for i, idn in enumerate(unique_ids)} + + sampled_party['int_id'] = sampled_party['id'].apply(lambda x: id_dict[x]) + input_df['source'] = input_df['source'].apply(lambda x: id_dict[x]) + input_df['target'] = input_df['target'].apply(lambda x: id_dict[x]) + + feature_names = ["type"] + x_train = sampled_party.int_id.to_numpy() + + edges = input_df[["source", "target"]].to_numpy().T + edge_weights = tf.ones(shape=edges.shape[1]) + node_features = tf.cast( + sampled_party.sort_values("id")[feature_names].to_numpy(), dtype=tf.dtypes.float32 + ) + graph_info = (node_features, edges, edge_weights) + + # Hyperparameters for graph embeddings model + hidden_units = [32, 32] + learning_rate = 0.01 + dropout_rate = 0.5 + num_epochs = 2 + batch_size = 256 + + # Construct the model + model = GNNNodeClassifier( + graph_info=graph_info, + hidden_units=hidden_units, + dropout_rate=dropout_rate, + name="gnn_model", + ) + + # Compile the model. + model.compile( + optimizer=keras.optimizers.RMSprop(learning_rate=learning_rate), + loss=keras.losses.MeanSquaredError(), + metrics=[keras.metrics.SparseCategoricalAccuracy(name="acc")], + ) + + # Fit the model. 
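+    # The integer node ids serve as both inputs and targets here, i.e. a simple
+    # self-supervised proxy objective; the 32-dimensional outputs of the trained
+    # GNN are what is kept as the party graph embeddings below.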
+ history = model.fit( + x=x_train, + y=x_train, + epochs=num_epochs, + batch_size=batch_size, + ) + + graph_embeddings = list(model.predict(x_train).reshape(node_features.shape[0], hidden_units[0])) + return {"id": sampled_party.id.to_numpy(), "graph_embeddings": graph_embeddings} diff --git a/advanced_tutorials/aml/features/party.py b/advanced_tutorials/aml/features/party.py new file mode 100644 index 00000000..6fa4ad13 --- /dev/null +++ b/advanced_tutorials/aml/features/party.py @@ -0,0 +1,68 @@ +import datetime +import pandas as pd +import numpy as np + +def get_transaction_labels(data_transactions: pd.DataFrame, data_alert_transactions: pd.DataFrame) -> pd.DataFrame: + """ + Merge transaction data with alert transaction data to get labels indicating SAR occurrences. + + Parameters: + - data_transactions (pd.DataFrame): DataFrame containing transaction information. + - data_alert_transactions (pd.DataFrame): DataFrame with alert transaction information, including SAR labels. + + Returns: + pd.DataFrame: Merged DataFrame with transaction labels indicating SAR occurrences. + """ + transaction_labels = data_transactions[ + ["source", "target", "tran_id", "tran_timestamp"] + ].merge( + data_alert_transactions[["is_sar", "tran_id"]], + on=["tran_id"], + how="left", + ) + transaction_labels.is_sar = transaction_labels.is_sar.map({ + True: 1, + np.nan: 0, + }) + transaction_labels.sort_values( + 'tran_id', + inplace=True, + ) + transaction_labels.rename(columns={"tran_id": "id"}, inplace=True) + return transaction_labels + + +def get_party_labels(data_transaction_labels: pd.DataFrame, data_party: pd.DataFrame) -> pd.DataFrame: + """ + Assign SAR(Suspicious Activity Reports) labels to parties based on transaction data. + + Parameters: + - data_transaction_labels (pd.DataFrame): DataFrame containing transaction labels, including SAR information. + - data_party (pd.DataFrame): DataFrame with party information. + + Returns: + pd.DataFrame: DataFrame with party labels indicating SAR occurrences. 
+    """
+    alert_transactions = data_transaction_labels[data_transaction_labels.is_sar == 1]
+    alert_sources = alert_transactions[["source", "tran_timestamp"]]
+    alert_sources.columns = ["id", "tran_timestamp"]
+    alert_targets = alert_transactions[["target", "tran_timestamp"]]
+    alert_targets.columns = ["id", "tran_timestamp"]
+
+    # Collect every party that appears on either side of a SAR transaction
+    sar_party = pd.concat([alert_sources, alert_targets], ignore_index=True)
+    sar_party = sar_party.sort_values(["id", "tran_timestamp"], ascending=[False, True]).reset_index(drop=True)
+
+    # Keep the first (earliest) SAR occurrence per ID
+    sar_party = sar_party.iloc[[sar_party.id.eq(party_id).idxmax() for party_id in sar_party['id'].value_counts().index]]
+    sar_party = sar_party.groupby([pd.Grouper(key='tran_timestamp', freq='M'), 'id']).agg(monthly_count=('id', 'count'))
+    sar_party = sar_party.reset_index(level=["id"])
+    sar_party = sar_party.reset_index(level=["tran_timestamp"])
+    sar_party.drop(["monthly_count"], axis=1, inplace=True)
+
+    sar_party["is_sar"] = 1
+
+    party_labels = data_party.merge(sar_party, on=["id"], how="left")
+    party_labels.is_sar = party_labels.is_sar.map({1.0: 1, np.nan: 0})
+    max_time_stamp = datetime.datetime.utcfromtimestamp(int(max(data_transaction_labels.tran_timestamp.values)) / 1e9)
+    party_labels = party_labels.fillna(max_time_stamp)
+
+    return party_labels
diff --git a/advanced_tutorials/aml/features/transactions.py b/advanced_tutorials/aml/features/transactions.py
new file mode 100644
index 00000000..7db4f7a4
--- /dev/null
+++ b/advanced_tutorials/aml/features/transactions.py
@@ -0,0 +1,66 @@
+import pandas as pd
+import numpy as np
+
+def get_out_transactions(data: pd.DataFrame) -> pd.DataFrame:
+    """
+    Calculate monthly outgoing transaction statistics for each source ID.
+
+    Parameters:
+    - data (pd.DataFrame): DataFrame containing transaction information.
+
+    Returns:
+    pd.DataFrame: DataFrame with monthly outgoing transaction statistics.
+    """
+    out_df = data.groupby([pd.Grouper(key='tran_timestamp', freq='M'), 'source']) \
+                 .agg(monthly_count=('source', 'count'),
+                      monthly_total_amount=('base_amt', 'sum'),
+                      monthly_mean_amount=('base_amt', 'mean'),
+                      monthly_std_amount=('base_amt', 'std')
+                      )
+    out_df = out_df.reset_index(level=["source"])
+    out_df = out_df.reset_index(level=["tran_timestamp"])
+    out_df.columns = ["tran_timestamp", "id", "monthly_out_count", "monthly_out_total_amount",
+                      "monthly_out_mean_amount", "monthly_out_std_amount"]
+    # Convert timestamps to milliseconds since the epoch
+    out_df.tran_timestamp = out_df.tran_timestamp.values.astype(np.int64) // 10 ** 6
+    return out_df
+
+
+def get_in_transactions(data: pd.DataFrame) -> pd.DataFrame:
+    """
+    Calculate monthly incoming transaction statistics for each target ID.
+
+    Parameters:
+    - data (pd.DataFrame): DataFrame containing transaction information.
+
+    Returns:
+    pd.DataFrame: DataFrame with monthly incoming transaction statistics.
+ """ + in_df = data.groupby([pd.Grouper(key='tran_timestamp', freq='M'), 'target']) \ + .agg(monthly_count=('target', 'count'), + monthly_total_amount=('base_amt', 'sum'), + monthly_mean_amount=('base_amt', 'mean'), + monthly_std_amount=('base_amt', 'std')) + + in_df = in_df.reset_index(level=["target"]) + in_df = in_df.reset_index(level=["tran_timestamp"]) + in_df.columns = ["tran_timestamp", "id", "monthly_in_count", "monthly_in_total_amount", + "monthly_in_mean_amount", "monthly_in_std_amount"] + in_df.tran_timestamp = in_df.tran_timestamp.values.astype(np.int64) // 10 ** 6 + return in_df + + +def get_in_out_transactions(data_transactions: pd.DataFrame) -> pd.DataFrame: + """ + Merge monthly incoming and outgoing transaction statistics. + + Parameters: + - data_transactions (pd.DataFrame): DataFrame containing transaction information. + + Returns: + pd.DataFrame: Merged DataFrame with monthly incoming and outgoing transaction statistics. + """ + out_df = get_out_transactions(data_transactions) + in_df = get_in_transactions(data_transactions) + in_out_df = in_df.merge(out_df, on=['tran_timestamp', 'id'], how="outer") + in_out_df = in_out_df.fillna(0) + return in_out_df diff --git a/advanced_tutorials/aml/images/contuct_us.png b/advanced_tutorials/aml/images/contuct_us.png new file mode 100755 index 00000000..19b48490 Binary files /dev/null and b/advanced_tutorials/aml/images/contuct_us.png differ diff --git a/advanced_tutorials/aml/images/deployment.gif b/advanced_tutorials/aml/images/deployment.gif new file mode 100755 index 00000000..2f01f22e Binary files /dev/null and b/advanced_tutorials/aml/images/deployment.gif differ diff --git a/advanced_tutorials/aml/images/feature_view_stats.gif b/advanced_tutorials/aml/images/feature_view_stats.gif new file mode 100755 index 00000000..d23f2f97 Binary files /dev/null and b/advanced_tutorials/aml/images/feature_view_stats.gif differ diff --git a/advanced_tutorials/aml/images/feature_views_explore.gif b/advanced_tutorials/aml/images/feature_views_explore.gif new file mode 100755 index 00000000..74f0fa7b Binary files /dev/null and b/advanced_tutorials/aml/images/feature_views_explore.gif differ diff --git a/advanced_tutorials/aml/images/fg_explore.gif b/advanced_tutorials/aml/images/fg_explore.gif new file mode 100755 index 00000000..90c7a4d1 Binary files /dev/null and b/advanced_tutorials/aml/images/fg_explore.gif differ diff --git a/advanced_tutorials/aml/images/freature_group_stats.gif b/advanced_tutorials/aml/images/freature_group_stats.gif new file mode 100755 index 00000000..ad1b2775 Binary files /dev/null and b/advanced_tutorials/aml/images/freature_group_stats.gif differ diff --git a/advanced_tutorials/aml/images/model_architecture.png b/advanced_tutorials/aml/images/model_architecture.png new file mode 100755 index 00000000..8d43b5ff Binary files /dev/null and b/advanced_tutorials/aml/images/model_architecture.png differ diff --git a/advanced_tutorials/aml/images/serving_endpoints.gif b/advanced_tutorials/aml/images/serving_endpoints.gif new file mode 100755 index 00000000..de46c5b4 Binary files /dev/null and b/advanced_tutorials/aml/images/serving_endpoints.gif differ diff --git a/advanced_tutorials/aml/requirements.txt b/advanced_tutorials/aml/requirements.txt new file mode 100644 index 00000000..4dfedb0a --- /dev/null +++ b/advanced_tutorials/aml/requirements.txt @@ -0,0 +1,25 @@ +pandas==1.5.3 +numpy==1.24.2 +tensorflow==2.11.0 +keras==2.11.0 +matplotlib==3.7.0 +matplotlib-inline==0.1.6 +nvidia-ml-py==12.535.133 
+nvidia-ml-py3==7.352.0
+scikit-learn==1.1.1
+scipy==1.11.4
+seaborn==0.12.2
+tensorboard==2.11.2
+tensorboard-data-server==0.6.1
+tensorboard-plugin-profile==2.11.1
+tensorboard-plugin-wit==1.8.1
+tensorflow-estimator==2.11.0
+tensorflow-io==0.29.0
+tensorflow-io-gcs-filesystem==0.29.0
+torch==1.12.0
+torchaudio==0.12.0
+torchvision==0.13.0
+tornado==6.1
+tqdm==4.64.1
+xgboost==1.7.4
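
A minimal, illustrative smoke test for the GanEncAnomalyDetector added in anomaly_detection.py (not part of the tutorial itself). It assumes the script is run from the advanced_tutorials/aml/ directory so the module is importable; the window length of 2 time steps, 16 features, the batch size and the synthetic Gaussian data are assumptions chosen only so that the encoder's two pooling layers and the generator's single upsampling layer produce matching shapes.

import numpy as np
import tensorflow as tf

from anomaly_detection import GanEncAnomalyDetector  # assumes CWD is advanced_tutorials/aml/

# Assumed sample shape: (window length, number of features).
# window_len=2 makes the encoder (2 -> 1) and generator (1 -> 2) shapes line up.
window_len, n_features = 2, 16

model = GanEncAnomalyDetector(input_dim=[window_len, n_features])
model.compile()  # the custom compile() only wires up the three SGD optimizers

# Synthetic stand-in data shaped (num_samples, window_len, n_features)
x = np.random.normal(size=(256, window_len, n_features)).astype("float32")
dataset = tf.data.Dataset.from_tensor_slices(x).batch(32)

# Runs the custom WGAN-GP style train_step (critic, generator, then encoder)
history = model.fit(dataset, epochs=1)

# compute_anomaly_score returns the mean reconstruction error of the given batch
print(model.serve_function(x[:8])["anomaly_score"].numpy())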