## Qwak Streaming Feature Set with Streaming Aggregations, Offline, and Online Querying

Welcome to the Qwak Feature Store example! In this tutorial, we'll guide you through creating a sample Streaming Data Source, defining the streaming ingestion pipeline and sample aggregations, and extracting features from both the Offline and Online store using the Qwak SDK. 

Guides like this one aim to provide you with a starting point by offering a straightforward framework for working with Qwak. However, we encourage you to explore our [feature store overview](https://docs-saas.qwak.com/docs/feature-store-overview) and [streaming documentation](https://docs-saas.qwak.com/docs/streaming-feature-set) for a more comprehensive explanation.

Before diving in, make sure you have the Qwak SDK installed and authenticated. If you haven't done so already, follow these steps:

1. [Install the Qwak SDK](https://docs-saas.qwak.com/docs/installing-the-qwak-sdk) - Ensure you have the SDK installed on your local environment.
2. [Authenticate](https://docs-saas.qwak.com/docs/installing-the-qwak-sdk#1-via-qwak-cli) - Authenticate with a new Personal or Service Qwak API Key.



## Create the Kafka Streaming Data Source

In Qwak, a Data Source serves as a configuration object that specifies how to access and fetch your data. It includes metadata such as name and description, connection details to the Kafka brokers, instructions for handling the schema of the incoming events, and any relevant configuration settings specific to your Kafka environment.

### Components of a Streaming Data Source:

1. **Metadata**: Includes information like name, description, etc.
2. **Topic**: The Kafka topic to subscribe to and ingest from.
3. **Bootstrap Servers**: The explicit Kafka brokers to connect to in your Kafka cluster.
4. **Deserializer**: A function to handle the schema of the incoming Kafka event. This can be done automatically using our [Generic Deserializer](https://docs-saas.qwak.com/docs/streaming-data-sources#generic-deserializer-1) or using a [Custom Deserializer](https://docs-saas.qwak.com/docs/streaming-data-sources#custom-deserializer-1).
5. **Passthrough Configs**: Any additional Kafka-related settings, such as offset configurations.




In the following example, we'll demonstrate connecting to an example Kafka topic that we have configured within the Qwak environment. The topic produces Transaction events that have the following JSON schema:

```
{
    "timestamp": "2024-01-01T00:00:00",
    "user_id": "eab027fc-3a65-4d02-95af-d8754e27b7d0",
    "transaction_amount": 151.76
}
```
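
As a quick sanity check on the event shape, the sketch below parses one such event using only Python's standard library. The helper name `parse_transaction` is hypothetical and not part of the Qwak SDK; it simply illustrates the three fields and the types they would typically map to.

```python
import json
from datetime import datetime

# Sample event copied from the schema above
raw_event = """
{
    "timestamp": "2024-01-01T00:00:00",
    "user_id": "eab027fc-3a65-4d02-95af-d8754e27b7d0",
    "transaction_amount": 151.76
}
"""


def parse_transaction(payload: str) -> dict:
    """Hypothetical helper: decode one JSON event and coerce the field types."""
    event = json.loads(payload)
    return {
        "timestamp": datetime.fromisoformat(event["timestamp"]),
        "user_id": str(event["user_id"]),
        "transaction_amount": float(event["transaction_amount"]),
    }


event = parse_transaction(raw_event)
print(event["user_id"], event["timestamp"].isoformat(), event["transaction_amount"])
```
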
In addition to ingesting the transaction events, we will also define transformations on the ingestion pipeline that will generate aggregated calculations on the `transaction_amount` column to produce additional data points for our Machine Learning model. 


## To create the ingestion pipeline, we will walk through the following steps:
1. Configure the deserialization function to handle the incoming events with the proper schema
2. Define the Kafka Streaming Data Source
3. Define the Kafka Streaming Feature Set pipeline, including metadata and streaming aggregations

## Defining the Deserialization Function

As mentioned above, Qwak provides two methods for handling the schema of incoming events:
- Generic Deserializer
- Custom Deserializer

### Generic Deserializer

The Generic Deserializer automatically infers and parses the incoming Kafka message based on the event schema. It is simple to use, but is best suited to non-complex Kafka messages, like the schema shown above. The Generic Deserializer makes a few assumptions:
- The schema of the message/event is either AVRO or JSON
- The message data to be parsed is stored under the `value` field
- Compatible data types follow the Apache Spark data types - https://spark.apache.org/docs/3.1.1/sql-ref-datatypes.html

Here is a sample configuration using the message schema above:

In [10]:
import json

from qwak.feature_store.data_sources import GenericDeserializer, MessageFormat

# Spark-style schema of the Kafka record; the event payload is nested under `value`
event_schema = {
  "type": "struct",
  "fields": [
    {
      "metadata": {},
      "name": "timestamp",
      "nullable": True,
      "type": "string"
    },
    {
      "metadata": {},
      "name": "key",
      "nullable": True,
      "type": "integer"
    },
    {
      "metadata": {},
      "name": "value",
      "nullable": True,
      # A nested struct is expressed as the type of its parent field
      "type": {
        "type": "struct",
        "fields": [
          {
              "metadata": {},
              "name": "timestamp",
              "nullable": True,
              "type": "string"
          },
          {
              "metadata": {},
              "name": "user_id",
              "nullable": True,
              "type": "string"
          },
          {
              "metadata": {},
              "name": "transaction_amount",
              "nullable": True,
              "type": "decimal"
          }
        ]
      }
    }
  ]
}

# json.dumps emits valid JSON (true/false, double quotes); str() would emit a Python repr instead
deserializer = GenericDeserializer(message_format=MessageFormat.JSON, schema=json.dumps(event_schema))
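
Writing nested Spark schemas by hand is error-prone, so the following sketch builds the same kind of schema from two small helpers and serializes it with `json.dumps`. The helpers `field` and `struct` are hypothetical (they are not part of the Qwak SDK or PySpark), and `double` is used for `transaction_amount` purely as an assumption. The layout follows the JSON form Spark uses for a `StructType`, where a nested struct appears as the `type` of its parent field.

```python
import json


def field(name, dtype, nullable=True):
    """Hypothetical helper: one field in Spark's JSON schema layout."""
    return {"metadata": {}, "name": name, "nullable": nullable, "type": dtype}


def struct(*fields):
    """Hypothetical helper: a struct type wrapping a list of fields."""
    return {"type": "struct", "fields": list(fields)}


# The payload fields live under `value`, as the Generic Deserializer assumes
payload = struct(
    field("timestamp", "string"),
    field("user_id", "string"),
    field("transaction_amount", "double"),
)
schema_json = json.dumps(struct(field("value", payload)))
print(schema_json)
```

`json.dumps` emits `true` and double-quoted keys, which is what a JSON parser expects, whereas calling `str()` on a dict emits Python's `True` and single quotes.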

### Custom Deserializer

The custom deserializer allows you to specify a Python function that interprets, parses, and formats the incoming event. The function should accept a PySpark DataFrame as input and return a PySpark DataFrame as output. This makes complex schemas easier to handle, because you can manipulate the keys before passing the data into the ingestion pipeline.

In the function, you define an output schema using the Spark `StructType` and `StructField` objects. Then, using the Spark `from_json` function, you parse the input JSON into the schema you defined.

Here is a sample deserialization function and CustomDeserializer implementation using the same transaction event schema as above:

In [13]:
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import (
    IntegerType,
    StringType,
    StructField,
    StructType,
    TimestampType,
)
from qwak.feature_store.data_sources import CustomDeserializer


def deser_function(df):
    # Output schema of the parsed event payload
    schema = StructType(
        [
            StructField("timestamp", TimestampType()),
            StructField("user_id", StringType()),
            StructField("transaction_amount", IntegerType()),
        ]
    )
    # Cast the raw Kafka value to a string, parse it as JSON, then flatten the parsed fields
    deserialized = df.select(col("partition"), col("topic"), col("offset"),
                             from_json(col("value").cast(StringType()), schema).alias("data")
                             ).select(col("data.*"), col("partition"), col("topic"), col("offset")) \
        .select(col("partition"), col("topic"), col("offset"), col("timestamp"), col("user_id"),
                col("transaction_amount"))

    return deserialized


deserializer = CustomDeserializer(function=deser_function)
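
To see what this function does to a single record without starting a Spark session, here is a plain-Python analogue. It is only an illustration: `deserialize_row` and the sample record are hypothetical, and the real pipeline applies `from_json` to a Spark DataFrame as in the cell above.

```python
import json
from datetime import datetime


def deserialize_row(row: dict) -> dict:
    """Hypothetical row-level analogue of deser_function.

    `row` mimics one Kafka record: metadata columns plus a raw `value` payload.
    """
    # Cast the raw bytes to a string, then parse the JSON payload
    data = json.loads(row["value"].decode("utf-8"))
    return {
        "partition": row["partition"],
        "topic": row["topic"],
        "offset": row["offset"],
        "timestamp": datetime.fromisoformat(data["timestamp"]),
        "user_id": data["user_id"],
        "transaction_amount": int(data["transaction_amount"]),
    }


record = {
    "partition": 0,
    "topic": "transactions",
    "offset": 42,
    "value": b'{"timestamp": "2024-01-01T00:00:00", "user_id": "u-1", "transaction_amount": 151}',
}
print(deserialize_row(record))
```
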


## Define the Data Source

Now that we have a function to handle the incoming event schemas, we can define our Data Source. We'll use the `KafkaSource` class from the Qwak SDK. When defining the Data Source using the Qwak SDK, we'll include the deserialization function in the same file as the Data Source definition. In this example, we'll use the custom deserializer defined above.


In [1]:
%%writefile data_source.py

from pyspark.sql.types import (
    IntegerType,
    StringType,
    StructField,
    StructType,
    TimestampType,
)
from qwak.feature_store.data_sources import KafkaSource, CustomDeserializer
from pyspark.sql.functions import col, from_json
def deser_function(df):
    schema = StructType(
        [
            StructField("timestamp", TimestampType()),
            StructField("user_id", StringType()),
            StructField("transaction_amount", IntegerType()),
        ]
    )
    deserialized = df.select(col("partition"), col("topic"), col("offset"),
                             from_json(col("value").cast(StringType()), schema).alias("data")
                             ).select(col("data.*"), col("partition"), col("topic"), col("offset")) \
        .select(col("partition"), col("topic"), col("offset"), col("timestamp"), col("user_id"),
                col("transaction_amount"))

    return deserialized
deserializer = CustomDeserializer(function=deser_function)

# replace with your own Kafka brokers
bootstrap_servers = 'b-2.qwak-cluster.za36zh.c6.kafka.us-east-1.amazonaws.com:9094,b-1.qwak-cluster.za36zh.c6.kafka.us-east-1.amazonaws.com:9094,b-3.qwak-cluster.za36zh.c6.kafka.us-east-1.amazonaws.com:9094'

# replace with your own Kafka passthrough configurations
passthrough_configs = {
    "qwak.online.maxOffsetsPerTrigger": "200000",
    "startingOffsets": "earliest",
}


kafka_source = KafkaSource(
    # metadata name of the DataSource within Qwak
    name="transactions",
    # metadata description of the DataSource within Qwak
    description="Transaction Test Source",
    # Bootstrap servers of the Kafka client to be connected to
    bootstrap_servers=bootstrap_servers,
    # Kafka topic to be ingested
    subscribe="transactions",
    # Deserialization function described above
    deserialization=deserializer,
    # Passthrough Kafka configs for offset settings, defined above
    passthrough_configs=passthrough_configs,
)


Overwriting data_source.py
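
A malformed broker list is a common cause of connection failures, so it can help to check the string before registering the source. The sketch below is a hypothetical pre-flight check (`parse_bootstrap_servers` is not a Qwak or Kafka function) that splits a comma-separated `host:port` list and validates each port. The hostnames are placeholders.

```python
def parse_bootstrap_servers(servers: str) -> list:
    """Hypothetical helper: split 'host:port,host:port' into (host, port) pairs."""
    pairs = []
    for entry in servers.split(","):
        # rpartition splits on the last colon, so the port is always the final piece
        host, sep, port = entry.strip().rpartition(":")
        if not sep or not host or not port.isdigit():
            raise ValueError(f"Expected host:port, got {entry!r}")
        if not 0 < int(port) < 65536:
            raise ValueError(f"Port out of range in {entry!r}")
        pairs.append((host, int(port)))
    return pairs


brokers = parse_bootstrap_servers("broker-1.example.com:9094, broker-2.example.com:9094")
print(brokers)
```
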


### Additional Considerations for Registering Data Sources

When registering Data Sources in Qwak, it's essential to ensure that the underlying data store is accessible by the platform. Depending on your deployment model (Hybrid or SaaS), there are different ways to grant Qwak access to your data.

#### Accessing AWS Resources:

If your data is stored in AWS services, you can grant access to Qwak using an IAM role ARN. For detailed instructions, refer to our documentation on [Accessing AWS Resources with IAM Role](https://docs-saas.qwak.com/docs/accessing-aws-resources-with-iam-role).

#### Using Qwak Secrets:

Alternatively, you can pass the credentials as Qwak Secrets. This approach provides a secure way to manage and authenticate access to your data. For more information, see [Qwak Secret Management](https://docs-saas.qwak.com/docs/secret-management).

For more information about the types of Data Sources supported by Qwak, refer to our documentation:
- [Batch Data Sources](https://docs-saas.qwak.com/docs/batch-data-sources)
- [Streaming Data Sources](https://docs-saas.qwak.com/docs/streaming-data-sources)

<br>

### Sampling Data from the Data Source

It's important to note that a Data Source cannot currently be used as a standalone query engine. Instead, the `get_sample()` method returns a small sample so you can verify that the data is being read and deserialized properly.


In [2]:
%run data_source.py

df_sample = kafka_source.get_sample()
print (f"Data Source Data Types:\n\n{df_sample.dtypes}\n")
print (f"Data Source Sample :\n\n{df_sample.head(7).to_string()}\n")

Data Source Data Types:

partition                      int64
topic                         object
offset                         int64
timestamp             datetime64[ns]
user_id                       object
transaction_amount             int64
dtype: object

Data Source Sample :

   partition         topic     offset               timestamp                               user_id  transaction_amount
0          0  transactions  163191965 2024-04-16 19:39:49.972  02d7f29f-90b3-47f2-9952-126bdd18c378             1788481
1          0  transactions  163191966 2024-04-16 19:39:50.012  5abb1d49-a1aa-4f2a-92a2-6f537ec7421d             1788483
2          0  transactions  163191967 2024-04-16 19:39:50.052  72878824-25ec-4a5b-a4db-6ed18009691d             1788485
3          0  transactions  163191968 2024-04-16 19:39:50.092  455d686b-d8f2-4e2c-aa6b-851e9b3de446             1788487
4          0  transactions  163191969 2024-04-16 19:39:50.132  15b6edce-9b91-4c1f-aae9-2247d0d1a427             1788

## Registering the Data Source with the Qwak Platform

After verifying that the Data Source returns the desired results, the next step is to register it with the Qwak Platform.

In [5]:
!echo "Y" | qwak features register -p data_source.py

✅ Finding Entities to register (0:00:00.04)
👀 Found 0 Entities
----------------------------------------
✅ Finding Data Sources to register (0:00:00.00)
👀 Found 1 Data Sources
Validating 'transactions' data source
✅  (0:00:03.83)
✅ Validation completed successfully, got data source columns:
column name         type
------------------  ---------
partition           int
topic               string
offset              bigint
timestamp           timestamp
user_id             string
transaction_amount  int
Update existing Data Source 'transactions' from source file '/Users/hudsonbuzby/dev/qwak-examples/credit_risk_streaming/data_source.py'?
continue? [y/N]: ----------------------------------------
✅ Finding Feature Sets to register (0:00:00.00)
👀 Found 0 Feature Set(s)


<hr><br>

## Creating the Streaming Feature Set from the Kafka Data Source

The Streaming Feature Set is created using the `@streaming.feature_set` decorator from the Qwak SDK. To create a streaming feature set, you'll need to define the following properties:

- **@streaming.feature_set:** Includes feature set name, key, data sources, and the timestamp column used for indexing.
- **@streaming.metadata:** Additional metadata including display name, feature set description, and feature set owner.
- **Scheduling Expression:** The rate at which the offline and online stores are refreshed.
- **Cluster Type:** Specifies the resources to use for running both the online and offline ingestion jobs.
- **Backfill:** Determines how far back in time the Feature Set should ingest data.
- **Transformation:** Can be SQL-based, UDF-based (currently Koalas), or Qwak Aggregation logic for data transformation.

For more information related to the resources used in the cluster templates, check out [Qwak docs](https://docs-saas.qwak.com/docs/instance-sizes#feature-store).


Let's take a look at a sample streaming feature set pipeline using the Aggregation capabilities of the Qwak Streaming Feature Store.

## Defining the Base Feature Set Object

In [None]:
from qwak.feature_store.feature_sets import streaming

# Streaming Feature Set decorator from Qwak
@streaming.feature_set(
    # Name of the feature set
    name="credit-risk-streaming",
    # Kafka Data Source created in the previous step
    data_sources=["transactions"],
    # Entity or key column; Qwak uses it to distinguish unique records in the feature set
    key="user_id",
    # Timestamp column; Qwak uses it to deduplicate records and schedule trigger windows
    timestamp_column_name="timestamp",
    # Crontab definition of the offline ingestion policy, which affects the data freshness of the offline store. Defaults to */30 * * * * (every 30 minutes)
    offline_scheduling_policy="0 * * * *",
    # Online ingestion policy, which affects the data freshness of the online store. Defaults to 5 seconds.
    online_trigger_interval=30
)
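
The `offline_scheduling_policy` value is a standard five-field crontab expression, so `0 * * * *` fires at minute 0 of every hour and `*/30 * * * *` fires every 30 minutes. The sketch below computes the next run time for these two simple shapes only. `next_run` is a hypothetical helper that handles just the minute field (a fixed minute or `*/N`) and assumes every other field is `*`; it is not how Qwak evaluates schedules.

```python
from datetime import datetime, timedelta


def next_run(cron: str, now: datetime) -> datetime:
    """Hypothetical helper: next fire time for 'M * * * *' or '*/N * * * *'."""
    minute_field, *rest = cron.split()
    if rest != ["*", "*", "*", "*"]:
        raise ValueError("This sketch only supports '*' in the last four fields")
    if minute_field.startswith("*/"):
        step = int(minute_field[2:])
        allowed = set(range(0, 60, step))
    else:
        allowed = {int(minute_field)}
    # Walk forward one minute at a time, starting from the next whole minute
    candidate = now.replace(second=0, microsecond=0) + timedelta(minutes=1)
    while candidate.minute not in allowed:
        candidate += timedelta(minutes=1)
    return candidate


now = datetime(2024, 4, 16, 19, 10, 0)
print(next_run("0 * * * *", now))     # 2024-04-16 20:00:00
print(next_run("*/30 * * * *", now))  # 2024-04-16 19:30:00
```
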

## Defining the Feature Set Metadata

In [None]:
from qwak.feature_store.feature_sets import streaming

# Streaming Feature Set decorator from Qwak
@streaming.metadata(
        # Display name to be used by the UI
        display_name="Credit Risk Streaming",
        # Description to be displayed in the UI
        description="streaming transaction aggregations over 1,15,30,60 minutes",
        # User owner of the feature set
        owner="hudson@qwak.com"
)

## Defining the Feature Set Cluster Specification

In [None]:
from qwak.feature_store.feature_sets.execution_spec import ClusterTemplate

# Streaming Feature Set decorator from Qwak
@streaming.execution_specification(
    # Cluster resources to be used for Online Ingestion
    online_cluster_template=ClusterTemplate.SMALL,
    # Cluster resources to be used for Offline Ingestion
    offline_cluster_template=ClusterTemplate.MEDIUM,
)


## Defining the Feature Set Transformation

You can define the Streaming Feature Set transformation logic as a SparkSQL query, a series of UDF transformations, or Aggregations using the Qwak SDK. At the end of your Streaming Feature Set definition, you define a function with no arguments that returns the transformation. For a simple SQL transformation, that function returns a `SparkSqlTransformation`, as in the following example.

### SQL Transformation

In [None]:
from qwak.feature_store.feature_sets.transformations import SparkSqlTransformation

def transform():
    return SparkSqlTransformation(sql="""
        SELECT timestamp,
        user_id,
        transaction_amount
        FROM transactions""")

### Defining the Feature Set Transformation Logic with Aggregation

Qwak provides advanced support for Streaming Aggregations using the QwakAggregation functionality of the Qwak SDK. An aggregation takes a Data Source column as an argument. The column must be defined in the schema of the Data Source. For aggregations that are `last_n` or percentile based, you can pass in an additional argument that specifies the quantity of last_n or the percentile to be aggregated. 

Qwak currently supports the following aggregations:


- **SUM** - a sum of column, for example, QwakAggregation.sum("transaction_amount")
- **COUNT** - count (not distinct), a column is specified for API uniformity. for example, QwakAggregation.count("transaction_amount")
- **AVERAGE** - mean value, for example QwakAggregation.avg("transaction_amount")
- **MIN** - minimum value, for example QwakAggregation.min("transaction_amount")
- **MAX** - maximum value, for example QwakAggregation.max("transaction_amount")
- **BOOLEAN OR** - boolean or, defined over a boolean column, for example QwakAggregation.boolean_or("is_remote")
- **BOOLEAN AND** - boolean and, defined over a boolean column, for example QwakAggregation.boolean_and("is_remote")
- **Sample Variance** - QwakAggregation.sample_variance("transaction_amount")
- **Sample STDEV** - QwakAggregation.sample_stdev("transaction_amount")
- **Population Variance** - QwakAggregation.population_variance("transaction_amount")
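- **Population STDEV** - QwakAggregation.population_stdev("transaction_amount")
- **LAST N** - the last N values, for example QwakAggregation.last_n("transaction_amount", 5)
- **LAST DISTINCT N** - the last N distinct values, for example QwakAggregation.last_distinct_n("transaction_amount", 5)
- **PERCENTILE** - the given percentile, for example QwakAggregation.percentile("transaction_amount", 50)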


Take a look at the example aggregation transformation defined below. It reads from our transaction data source and computes the sum, count, max, sample standard deviation, last 5 records, last 5 distinct records, and 50th percentile of `transaction_amount`. After the aggregations, `by_windows` specifies the time windows that each aggregation is computed over. With 7 aggregations and 4 windows, we create 28 aggregation features in the same application. You can also use `alias` to rename an aggregation.

In [None]:
from qwak.feature_store.feature_sets.transformations import SparkSqlTransformation, QwakAggregation


def transform():
    return SparkSqlTransformation(sql="""SELECT * FROM transactions""")\
            .aggregate(QwakAggregation.sum("transaction_amount"))\
            .aggregate(QwakAggregation.count("transaction_amount"))\
            .aggregate(QwakAggregation.max("transaction_amount"))\
            .aggregate(QwakAggregation.sample_stdev("transaction_amount"))\
            .aggregate(QwakAggregation.last_n("transaction_amount", 5))\
            .aggregate(QwakAggregation.last_distinct_n("transaction_amount", 5))\
            .aggregate(QwakAggregation.percentile("transaction_amount", 50)\
                    .alias("median_transaction_amount"))\
            .by_windows("1 minute", "15 minutes", "30 minutes", "1 hour")
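
To make the 7 x 4 = 28 arithmetic concrete, here is a plain-Python sketch of windowed aggregation for a single key. It is illustrative only and makes several assumptions: trailing windows measured back from `now`, newest-first ordering for the `last_n` style features, `statistics.median` standing in for the 50th percentile, and a `15m` suffix inferred from the `1m`, `30m`, and `1h` suffixes visible in the sample output. None of this describes how Qwak computes the features internally.

```python
import statistics
from datetime import datetime, timedelta

# Window labels mirror the feature-name suffixes seen in the sample output
WINDOWS = {"1m": timedelta(minutes=1), "15m": timedelta(minutes=15),
           "30m": timedelta(minutes=30), "1h": timedelta(hours=1)}


def aggregate(events, now, column="transaction_amount"):
    """Illustrative only: compute 7 aggregations over 4 trailing windows.

    `events` is a list of (timestamp, amount) tuples for a single key.
    """
    features = {}
    for label, width in WINDOWS.items():
        # Values inside the trailing window (now - width, now], oldest first
        values = [v for ts, v in sorted(events) if now - width < ts <= now]
        newest_first = values[::-1]
        distinct = list(dict.fromkeys(newest_first))  # de-duplicate, keep order
        features[f"sum_{column}_{label}"] = sum(values)
        features[f"count_{column}_{label}"] = len(values)
        features[f"max_{column}_{label}"] = max(values, default=None)
        features[f"sample_stdev_{column}_{label}"] = (
            statistics.stdev(values) if len(values) > 1 else None)
        features[f"last_5_{column}_{label}"] = newest_first[:5]
        features[f"last_distinct_5_{column}_{label}"] = distinct[:5]
        # Alias from the example; the median stands in for percentile(50)
        features[f"median_{column}_{label}"] = (
            statistics.median(values) if values else None)
    return features


now = datetime(2024, 4, 16, 20, 0, 0)
# One event every 10 seconds, going back in time from `now`
events = [(now - timedelta(seconds=10 * i), 100 + i) for i in range(12)]
features = aggregate(events, now)
print(len(features))                            # 28
print(features["count_transaction_amount_1m"])  # 6
```
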

## Build the Streaming Feature Set

Now, let's put the streaming feature set definition, metadata, cluster spec, and transformation logic together to define the feature set.

In [3]:
%%writefile streaming_feature_set.py

from qwak.feature_store.feature_sets import streaming
from qwak.feature_store.feature_sets.execution_spec import ClusterTemplate
from qwak.feature_store.feature_sets.transformations import SparkSqlTransformation, QwakAggregation


@streaming.feature_set(
    name="credit-risk-streaming",
    data_sources=["transactions"],
    key="user_id",
    timestamp_column_name="timestamp",
    offline_scheduling_policy="0 * * * *",
    online_trigger_interval=30
)
@streaming.metadata(
        display_name="Credit Risk Streaming",
        description="streaming transaction aggregations over 1,15,30,60 minutes",
        owner="hudson@qwak.com"
)

@streaming.execution_specification(
    online_cluster_template=ClusterTemplate.SMALL,
    offline_cluster_template=ClusterTemplate.MEDIUM,
)
def transform():
    return SparkSqlTransformation(sql="""SELECT * FROM transactions""")\
            .aggregate(QwakAggregation.sum("transaction_amount"))\
            .aggregate(QwakAggregation.count("transaction_amount"))\
            .aggregate(QwakAggregation.max("transaction_amount"))\
            .aggregate(QwakAggregation.sample_stdev("transaction_amount"))\
            .aggregate(QwakAggregation.last_n("transaction_amount", 5))\
            .aggregate(QwakAggregation.last_distinct_n("transaction_amount", 5))\
            .aggregate(QwakAggregation.percentile("transaction_amount", 50)\
                    .alias("median_transaction_amount"))\
            .by_windows("1 minute", "15 minutes", "30 minutes", "1 hour")

Overwriting streaming_feature_set.py


## Sampling the Data Source and Printing Data and Data Types

If your data source takes more than 5 minutes to query or fetch a sample of the data (for example, due to long-running queries), your sampling process may fail with a timeout error. In such cases, you can skip validation during registration with Qwak and proceed to register your feature set, allowing it to run an ingestion job.

### Note:
The sampling process is essential for verifying that the data is queried properly. However, if it takes too long, you can proceed with the registration without validation and rely on the ingestion job to ensure data correctness.


In [4]:
%run streaming_feature_set.py

df_sample = transform.get_sample()
print (f"Data Source Data Types:\n\n{df_sample.dtypes}\n")
print (f"Data Source Sample :\n\n{df_sample}\n")

Data Source Data Types:

user_id                                    object
count_transaction_amount_1h                 int64
last_distinct_5_transaction_amount_1h      object
sum_transaction_amount_1h                   int64
sample_stdev_transaction_amount_1h        float64
median_transaction_amount_1h                int64
last_5_transaction_amount_1h               object
max_transaction_amount_1h                   int64
count_transaction_amount_30m                int64
last_distinct_5_transaction_amount_30m     object
sum_transaction_amount_30m                  int64
sample_stdev_transaction_amount_30m       float64
median_transaction_amount_30m               int64
last_5_transaction_amount_30m              object
max_transaction_amount_30m                  int64
count_transaction_amount_1m                 int64
last_distinct_5_transaction_amount_1m      object
sum_transaction_amount_1m                   int64
sample_stdev_transaction_amount_1m        float64
median_transaction_amount

## Registering the Streaming Feature Set

Now that your Streaming Aggregation Feature Set is defined, you can register it with Qwak so that it continuously runs and ingests data. Run the following command to complete the registration process. 

In [30]:
!echo "Y" | qwak features register -p streaming_feature_set.py

✅ Finding Entities to register (0:00:00.14)
👀 Found 0 Entities
----------------------------------------
✅ Finding Data Sources to register (0:00:00.00)
👀 Found 0 Data Sources
----------------------------------------
✅ Finding Feature Sets to register (0:00:00.00)
👀 Found 1 Feature Set(s)
Update existing feature set 'credit-risk-streaming' from source file '/Users/hudsonbuzby/dev/qwak-examples/credit_risk_streaming/streaming_feature_set.py'?
continue? [y/N]: Validating 'credit-risk-streaming' feature set

<hr><br>

## Consuming Features from the Offline Feature Store (Training/Batch Inference)

To retrieve features from the Offline Feature Store for training or batch inference, you can use the `get_feature_values` method:

1. **get_feature_values**:
   - Fetches records associated with the provided set of keys, inserted at a specific timestamp.
   - Query date must fall between the start and end timestamp.


You can read more about the feature store retrieval methods in [our docs](https://docs-saas.qwak.com/docs/getting-features-for-training#get-feature-values).

In [45]:
from qwak.feature_store.offline import OfflineClientV2
from qwak.feature_store.offline.feature_set_features import FeatureSetFeatures
import os

from datetime import datetime
import pandas as pd


offline_feature_store = OfflineClientV2()

streaming_features = FeatureSetFeatures(
            feature_set_name='transaction-aggregates-demo',
            feature_names=['count_transaction_amount_1m',
                        'last_distinct_5_transaction_amount_1m',
                        'sum_transaction_amount_1m',
                        'sample_stdev_transaction_amount_1m',
                        'median_transaction_amount_1m',
                        'max_transaction_amount_1m',
                        'count_transaction_amount_1h',
                        'sum_transaction_amount_1h',
                        'sample_stdev_transaction_amount_1h',
                        'median_transaction_amount_1h',
                        'max_transaction_amount_1h'
                        ]
        )

# Provide a DataFrame of entity IDs and timestamps that fall within the feature set's ingested time range
# user_id | timestamp | label

population_df = pd.read_csv("main/population.csv")


features: pd.DataFrame = offline_feature_store.get_feature_values(
            features=[streaming_features],
            population=population_df
        )

print(features)   


 


    label                timestamp                               user_id  \
0       0  2024-03-27 15:21:32.000  149ca9e4-0e4a-48e8-b9e2-263b23d371c0   
1       0  2024-03-20 15:21:32.000  149ca9e4-0e4a-48e8-b9e2-263b23d371c0   
2       0  2024-03-27 15:21:32.000  742f09a6-b88b-4518-aa7b-431aa8ae8a16   
3       0  2024-03-27 15:21:32.000  72878824-25ec-4a5b-a4db-6ed18009691d   
4       0  2024-03-27 15:21:32.000  8410fd1f-4f76-4375-8125-df3fe383cc60   
..    ...                      ...                                   ...   
95      0  2024-03-20 15:21:32.000  95ec0c53-4e27-4490-b85f-1448de70fc26   
96      0  2024-03-20 15:21:32.000  ca2da30a-ffde-4334-87e5-7dae3bebb5db   
97      0  2024-03-20 15:21:32.000  89161639-e300-4789-afee-d675cfa383e1   
98      0  2024-03-27 15:21:32.000  5a114060-1038-4dc6-a038-3b7423cf2c16   
99      0  2024-03-27 15:21:32.000  2c39a950-04e0-43d5-bbba-bea628309b0c   

    transaction-aggregates-demo.count_transaction_amount_1m  \
0                       
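
The population file read in the cell above needs one row per entity and point in time. As a sketch, assuming the `user_id | timestamp | label` layout from the comment in that cell, the following writes such a file with Python's `csv` module. The helper `write_population` is hypothetical, and the rows reuse two IDs and timestamps from the output above as placeholders.

```python
import csv
import io

rows = [
    {"user_id": "149ca9e4-0e4a-48e8-b9e2-263b23d371c0",
     "timestamp": "2024-03-27 15:21:32.000", "label": 0},
    {"user_id": "742f09a6-b88b-4518-aa7b-431aa8ae8a16",
     "timestamp": "2024-03-27 15:21:32.000", "label": 0},
]


def write_population(rows, handle):
    """Hypothetical helper: write population rows as CSV with a header line."""
    writer = csv.DictWriter(handle, fieldnames=["user_id", "timestamp", "label"])
    writer.writeheader()
    writer.writerows(rows)


# An in-memory buffer keeps the sketch side-effect free;
# pass open("population.csv", "w", newline="") to write a real file
buffer = io.StringIO()
write_population(rows, buffer)
print(buffer.getvalue())
```
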

### Joining Features

You can also join feature sets when retrieving from the feature store, including joining streaming and batch feature sets. As long as the feature sets share a common key/entity, you can pull features from both sets in the same query. Imagine we have a batch feature set of user credit features such as job, credit_amount, duration, etc. The example below joins the streaming and batch feature sets:

In [8]:
from qwak.feature_store.offline import OfflineClientV2
from qwak.feature_store.offline.feature_set_features import FeatureSetFeatures
import os

from datetime import datetime
import pandas as pd

offline_feature_store = OfflineClientV2()
population_df = pd.read_csv("population.csv")

streaming_features = FeatureSetFeatures(
    feature_set_name='transaction-aggregates-demo',
    feature_names=['count_transaction_amount_1m',
                'last_distinct_5_transaction_amount_1m',
                'sum_transaction_amount_1m',
                'sample_stdev_transaction_amount_1m',
                'median_transaction_amount_1m',
                'max_transaction_amount_1m',
                'count_transaction_amount_1h',
                'sum_transaction_amount_1h',
                'sample_stdev_transaction_amount_1h',
                'median_transaction_amount_1h',
                'max_transaction_amount_1h'
                ]
)

batch_features = FeatureSetFeatures(
    feature_set_name='qwak-snowflake-webinar',
    feature_names=['job','credit_amount','duration','purpose','risk']
)
features = [streaming_features, batch_features]
offline_feature_store.get_feature_values(
    features=features,
    population=population_df
)

Unnamed: 0,label,timestamp,user_id,qwak-snowflake-webinar.job,qwak-snowflake-webinar.credit_amount,qwak-snowflake-webinar.duration,qwak-snowflake-webinar.purpose,qwak-snowflake-webinar.risk,transaction-aggregates-demo.count_transaction_amount_1m,transaction-aggregates-demo.last_distinct_5_transaction_amount_1m,transaction-aggregates-demo.sum_transaction_amount_1m,transaction-aggregates-demo.sample_stdev_transaction_amount_1m,transaction-aggregates-demo.median_transaction_amount_1m,transaction-aggregates-demo.max_transaction_amount_1m,transaction-aggregates-demo.count_transaction_amount_1h,transaction-aggregates-demo.sum_transaction_amount_1h,transaction-aggregates-demo.sample_stdev_transaction_amount_1h,transaction-aggregates-demo.median_transaction_amount_1h,transaction-aggregates-demo.max_transaction_amount_1h
0,0,2024-03-27 15:21:32.000,5f97899e-de8a-4527-b48b-633ec77dc722,1,3590,12,furniture/equipment,good,60,"[1500164, 1500114, 1500064, 1500014, 1499964]",89921340,873.21246,1498664,1500164,3600,5076680400,51968.740604,1410164,1500164
1,0,2024-03-27 15:21:32.000,742f09a6-b88b-4518-aa7b-431aa8ae8a16,0,750,18,education,bad,60,"[1500166, 1500116, 1500066, 1500016, 1499966]",89921460,873.21246,1498666,1500166,3600,5076687600,51968.740604,1410166,1500166
2,0,2024-03-27 15:21:32.000,d5eef17c-b25b-4c47-b29c-f02a5f1bd0ef,1,10613,21,car,good,60,"[1500179, 1500129, 1500079, 1500029, 1499979]",89922240,873.21246,1498679,1500179,3600,5076734400,51968.740604,1410179,1500179
3,0,2024-03-20 15:21:32.000,5f97899e-de8a-4527-b48b-633ec77dc722,1,3590,12,furniture/equipment,good,60,"[3239664, 3239614, 3239564, 3239514, 3239464]",194291340,873.21246,3238164,3239664,3600,11338880400,51968.740604,3149664,3239664
4,0,2024-03-20 15:21:32.000,742f09a6-b88b-4518-aa7b-431aa8ae8a16,0,750,18,education,bad,60,"[3239666, 3239616, 3239566, 3239516, 3239466]",194291460,873.21246,3238166,3239666,3600,11338887600,51968.740604,3149666,3239666
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
95,0,2024-03-27 15:21:32.000,73142300-493b-45f5-b355-3dadd54b0c13,3,2994,39,education,bad,60,"[1500191, 1500141, 1500091, 1500041, 1499991]",89922960,873.21246,1498691,1500191,3600,5076777600,51968.740604,1410191,1500191
96,1,2024-03-20 15:21:32.000,a3a2eaf9-5d8f-46dd-9cf4-9d186c973200,1,9883,41,radio/TV,good,60,"[3239656, 3239606, 3239556, 3239506, 3239456]",194290860,873.21246,3238156,3239656,3600,11338851600,51968.740604,3149656,3239656
97,0,2024-03-20 15:21:32.000,5d1d338c-4ac9-447b-b388-823951ede8f6,2,13194,11,radio/TV,bad,60,"[3239684, 3239634, 3239584, 3239534, 3239484]",194292540,873.21246,3238184,3239684,3600,11338952400,51968.740604,3149684,3239684
98,0,2024-03-20 15:21:32.000,181153df-581b-4cf9-9649-aee2eedb25d5,1,5813,20,car,bad,60,"[3239640, 3239590, 3239540, 3239490, 3239440]",194289900,873.21246,3238140,3239640,3600,11338794000,51968.740604,3149640,3239640
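
Conceptually, the join lines up rows from each feature set on the shared key and prefixes every feature column with its feature set name, as the column headers in the output show. A plain-Python sketch of that idea follows. The function `join_on_key` and the sample values are illustrative, and the sketch ignores the timestamp handling that the offline store performs.

```python
def join_on_key(population, feature_sets, key="user_id"):
    """Illustrative join: enrich each population row with prefixed features.

    `feature_sets` maps a feature set name to {key value: {feature: value}}.
    """
    joined = []
    for row in population:
        enriched = dict(row)
        for set_name, table in feature_sets.items():
            # Keys missing from a feature set simply contribute no columns
            for feature, value in table.get(row[key], {}).items():
                enriched[f"{set_name}.{feature}"] = value
        joined.append(enriched)
    return joined


population = [{"user_id": "u-1", "label": 0}, {"user_id": "u-2", "label": 1}]
feature_sets = {
    "qwak-snowflake-webinar": {"u-1": {"job": 1, "credit_amount": 3590}},
    "transaction-aggregates-demo": {
        "u-1": {"count_transaction_amount_1m": 60},
        "u-2": {"count_transaction_amount_1m": 60},
    },
}
for row in join_on_key(population, feature_sets):
    print(row)
```
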


### Consuming Streaming Features from the Online Feature Store using the Online Client

Similar to the OfflineClient, the OnlineClient allows you to directly query the streaming aggregated features stored in the Online Store.

As in the previous step, we'll pass in a list of entity IDs that will be used to retrieve the values from the Online Feature Store. Because the Online Store functions as a key/value lookup, we don't need to pass in a timestamp: only one value is stored per entity.
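
The key/value behaviour can be pictured with a small in-memory stand-in. This is a conceptual sketch only: `LatestValueStore` is hypothetical and says nothing about how Qwak's Online Store is implemented. Each write overwrites the previous features for a key, so a read needs only the key.

```python
class LatestValueStore:
    """Hypothetical stand-in for an online store: one feature row per key."""

    def __init__(self):
        self._rows = {}

    def put(self, key, features):
        # A newer write replaces whatever was stored for this key
        self._rows[key] = dict(features)

    def get_many(self, keys):
        # Unknown keys map to None instead of raising
        return {key: self._rows.get(key) for key in keys}


store = LatestValueStore()
store.put("u-1", {"count_transaction_amount_1m": 58})
store.put("u-1", {"count_transaction_amount_1m": 60})  # overwrites the first write
print(store.get_many(["u-1", "u-2"]))
# {'u-1': {'count_transaction_amount_1m': 60}, 'u-2': None}
```
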




In [None]:
import pandas as pd
from qwak.feature_store.online.client import OnlineClient
from qwak.model.schema import ModelSchema, InferenceOutput, FeatureStoreInput

FEATURE_SET = 'transaction-aggregates-demo'

model_schema = ModelSchema(
    inputs=[
        FeatureStoreInput(name=f'{FEATURE_SET}.count_transaction_amount_1m'),
        FeatureStoreInput(name=f'{FEATURE_SET}.last_distinct_5_transaction_amount_1m'),
        FeatureStoreInput(name=f'{FEATURE_SET}.sum_transaction_amount_1m'),
        FeatureStoreInput(name=f'{FEATURE_SET}.sample_stdev_transaction_amount_1m'),
        FeatureStoreInput(name=f'{FEATURE_SET}.median_transaction_amount_1m')
    ],
    outputs=[InferenceOutput(name="credit_score", type=float)]
)
    
online_client = OnlineClient()

# One row per entity to look up
df = pd.DataFrame(
    columns=['user'],
    data=[
        ['e41160de-0a56-47cf-8193-a0c97fe2e752'],
        ['b0ca3ac4-5432-4c21-8251-a6ae0d3ad874'],
        ['4b7af572-b249-4bae-9815-10ed3a2cd01d'],
    ],
)
                  
online_features = online_client.get_feature_values(model_schema, df)


print(f"\n\nRealtime features extracted:\n\n{online_features.to_string()}\n")

### Enriching Inference Requests with Features from Online Store

Qwak also natively integrates the model runtime with the Feature Store, offering very low-latency feature retrieval without explicitly running a query. You only send the feature set key (in our case `user_id`) in the model request input, and Qwak automatically extracts the latest features for that key during the model serving request.
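
Conceptually, this enrichment can be pictured as a keyed lookup performed for every incoming request. The sketch below uses plain Python and is not Qwak's implementation: `ONLINE_STORE`, `FEATURES`, and `enrich` are hypothetical names, the feature values are made up, and returning `None` for an unknown key is an assumption made only for this illustration.

```python
# Hypothetical in-memory stand-in for the Online Store:
# entity key -> latest feature values for that entity.
ONLINE_STORE = {
    "user-1": {
        "count_transaction_amount_1m": 3,
        "sum_transaction_amount_1m": 120.0,
    },
}

# Features the model declares in its schema (illustrative subset).
FEATURES = ["count_transaction_amount_1m", "sum_transaction_amount_1m"]


def enrich(requests, store, feature_names, key="user_id"):
    """Return one feature row per request, in request order.

    Keys missing from the store yield None for every feature
    (an assumption of this sketch, not documented Qwak behavior).
    """
    rows = []
    for request in requests:
        stored = store.get(request[key], {})
        rows.append({name: stored.get(name) for name in feature_names})
    return rows


# The caller sends only the entity key; features are looked up for it.
requests = [{"user_id": "user-1"}, {"user_id": "user-2"}]
extracted = enrich(requests, ONLINE_STORE, FEATURES)

for request, features in zip(requests, extracted):
    print(request["user_id"], features)
```

The point of the sketch is that the request carries only the entity key, while the feature values arrive separately and aligned row by row with the requests.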

Below is a sample end-to-end credit risk model, `StreamingRiskModel`, that uses the OfflineClient (`OfflineClientV2`) to retrieve training data and the Online Store to retrieve features for inference. On the `predict()` method, you'll notice the `feature_extraction` flag set to `True` in the `@qwak.api` decorator. Once this flag is enabled, Qwak natively integrates the Online Feature Store, pulling features based on the entity key, `user_id`, provided in the input requests.

You can also find this model as the credit_risk_streaming model in the examples repository: https://github.com/qwak-ai/qwak-examples/blob/feature_store_examples/credit_risk_streaming/main/model.py. Check out the full example, including the environment file and deployment steps, to build and deploy this model as a real-time endpoint.

In [None]:
import datetime
import os

import numpy as np
import pandas as pd
import qwak
from catboost import CatBoostClassifier, Pool
from qwak.feature_store.offline import OfflineClientV2
from qwak.feature_store.offline.feature_set_features import FeatureSetFeatures
from qwak.model.base import QwakModel
from qwak.model.tools import run_local
from sklearn.metrics import f1_score
from sklearn.model_selection import train_test_split

RUNNING_FILE_ABSOLUTE_PATH = os.path.dirname(os.path.abspath(__file__))


class StreamingRiskModel(QwakModel):

    def __init__(self):
        self.params = {
            'iterations': 100,
            'learning_rate': 0.1,
            'eval_metric': 'Accuracy',
            'logging_level': 'Silent',
            'use_best_model': True
        }
        self.catboost = CatBoostClassifier(**self.params)
        self.metrics = {
            'accuracy': 90,
            'random_state': 42,
            'test_size': .25
        }
        qwak.log_param(self.params)


    def fetch_features(self):
        """
        Read data from the offline feature store
        :return: Feature Store DF
        """
        print("Fetching data from the feature store")
        offline_feature_store = OfflineClientV2()
        population_df = pd.read_csv(f"{RUNNING_FILE_ABSOLUTE_PATH}/population.csv")

        streaming_features = FeatureSetFeatures(
            feature_set_name='transaction-aggregates-demo',
            feature_names=['count_transaction_amount_1m',
                        'last_distinct_5_transaction_amount_1m',
                        'sum_transaction_amount_1m',
                        'sample_stdev_transaction_amount_1m',
                        'median_transaction_amount_1m',
                        'max_transaction_amount_1m',
                        'count_transaction_amount_1h',
                        'sum_transaction_amount_1h',
                        'sample_stdev_transaction_amount_1h',
                        'median_transaction_amount_1h',
                        'max_transaction_amount_1h'
                        ]
        )

        batch_features = FeatureSetFeatures(
            feature_set_name='qwak-snowflake-webinar',
            feature_names=['job','credit_amount','duration','purpose','risk']
        )
        features = [streaming_features, batch_features]
        return offline_feature_store.get_feature_values(
            features=features,
            population=population_df
        )

    def build(self):
        """
        Build the Qwak model:
            1. Fetch the feature values from the feature store
            2. Train a naive CatBoost model
        """
        df = self.fetch_features()
        print(df.columns)
        # Columns used for training: batch features first, then streaming aggregates
        feature_columns = [
            "qwak-snowflake-webinar.job",
            "qwak-snowflake-webinar.credit_amount",
            "qwak-snowflake-webinar.duration",
            "qwak-snowflake-webinar.purpose",
            "transaction-aggregates-demo.count_transaction_amount_1m",
            "transaction-aggregates-demo.sum_transaction_amount_1m",
            "transaction-aggregates-demo.sample_stdev_transaction_amount_1m",
            "transaction-aggregates-demo.median_transaction_amount_1m",
            "transaction-aggregates-demo.max_transaction_amount_1m",
            "transaction-aggregates-demo.count_transaction_amount_1h",
            "transaction-aggregates-demo.sum_transaction_amount_1h",
            "transaction-aggregates-demo.sample_stdev_transaction_amount_1h",
            "transaction-aggregates-demo.median_transaction_amount_1h",
            "transaction-aggregates-demo.max_transaction_amount_1h",
        ]
        train_df = df[feature_columns]

        y = df["qwak-snowflake-webinar.risk"].map({'good':1,'bad':0})


        categorical_features_indices = np.where(train_df.dtypes != np.float64)[0]
        X_train, X_validation, y_train, y_validation = train_test_split(train_df, y, test_size=0.25, random_state=42)

        train_pool = Pool(X_train, y_train, cat_features=categorical_features_indices)
        validate_pool = Pool(X_validation, y_validation, cat_features=categorical_features_indices)

        print("Fitting catboost model")
        self.catboost.fit(train_pool, eval_set=validate_pool)

        y_predicted = self.catboost.predict(X_validation)
        f1 = f1_score(y_validation, y_predicted)
        
        qwak.log_metric({'f1_score': f1})
        qwak.log_metric({'iterations': self.params['iterations']})
        qwak.log_metric({'learning_rate': self.params['learning_rate']})
        qwak.log_metric({'accuracy': self.metrics['accuracy']})
        qwak.log_metric({'random_state': self.metrics['random_state']})
        qwak.log_metric({'test_size': self.metrics['test_size']})

    def schema(self):
        from qwak.model.schema import ModelSchema, InferenceOutput, FeatureStoreInput, Entity
        user_id = Entity(name="user_id", type=str)
        model_schema = ModelSchema(
            entities=[user_id],
            inputs=[
                FeatureStoreInput(entity=user_id, name="transaction-aggregates-demo.count_transaction_amount_1m"),
                FeatureStoreInput(entity=user_id, name="transaction-aggregates-demo.sum_transaction_amount_1m"),
                FeatureStoreInput(entity=user_id, name="transaction-aggregates-demo.sample_stdev_transaction_amount_1m"),
                FeatureStoreInput(entity=user_id, name="transaction-aggregates-demo.median_transaction_amount_1m"),
                FeatureStoreInput(entity=user_id, name="transaction-aggregates-demo.max_transaction_amount_1m"),
                FeatureStoreInput(entity=user_id, name="transaction-aggregates-demo.count_transaction_amount_1h"),
                FeatureStoreInput(entity=user_id, name="transaction-aggregates-demo.sum_transaction_amount_1h"),
                FeatureStoreInput(entity=user_id, name="transaction-aggregates-demo.sample_stdev_transaction_amount_1h"),
                FeatureStoreInput(entity=user_id, name="transaction-aggregates-demo.median_transaction_amount_1h"),
                FeatureStoreInput(entity=user_id, name="transaction-aggregates-demo.max_transaction_amount_1h"),
                FeatureStoreInput(entity=user_id, name='qwak-snowflake-webinar.job'),
                FeatureStoreInput(entity=user_id, name='qwak-snowflake-webinar.credit_amount'),
                FeatureStoreInput(entity=user_id, name='qwak-snowflake-webinar.duration'),
                FeatureStoreInput(entity=user_id, name='qwak-snowflake-webinar.purpose'),

            ],
            outputs=[
                InferenceOutput(name="Risk", type=float)
            ])
        return model_schema

    @qwak.api(feature_extraction=True)
    def predict(self, df, extracted_df):
        # Example request payload: {"user_id": "xxxx-xxx-xxx-xxxx"}
        # `extracted_df` holds the features pulled from the Online Store for the
        # `user_id` values in `df`; column order matches the training data.
        feature_columns = [
            "qwak-snowflake-webinar.job",
            "qwak-snowflake-webinar.credit_amount",
            "qwak-snowflake-webinar.duration",
            "qwak-snowflake-webinar.purpose",
            "transaction-aggregates-demo.count_transaction_amount_1m",
            "transaction-aggregates-demo.sum_transaction_amount_1m",
            "transaction-aggregates-demo.sample_stdev_transaction_amount_1m",
            "transaction-aggregates-demo.median_transaction_amount_1m",
            "transaction-aggregates-demo.max_transaction_amount_1m",
            "transaction-aggregates-demo.count_transaction_amount_1h",
            "transaction-aggregates-demo.sum_transaction_amount_1h",
            "transaction-aggregates-demo.sample_stdev_transaction_amount_1h",
            "transaction-aggregates-demo.median_transaction_amount_1h",
            "transaction-aggregates-demo.max_transaction_amount_1h",
        ]
        predictions = self.catboost.predict(extracted_df[feature_columns])
        return pd.DataFrame(predictions, columns=['Risk'])

