# ⚡️ Building Streaming Features

---

Real-time data can make all the difference for real-time models, but leveraging
it can be quite the challenge.

With Tecton you can build millisecond-fresh features using plain Python,
without any complex streaming infrastructure! Best of all, you can test
everything locally and iterate in a notebook to quickly train better models that
operate consistently online and offline.

---

In this tutorial we will:

1. Create a streaming data source
2. Define and test streaming features
3. Query data online and offline

## ⚙️ Install Pre-Reqs

First things first, let's install the Tecton SDK and other libraries used in this tutorial by running the cell below.

In [1]:
%pip install 'tecton[rift]' gcsfs s3fs --quiet --upgrade

Note: you may need to restart the kernel to use updated packages.


## ✅ Log in to Tecton

Next we will authenticate with your organization's Tecton account and import libraries we will need.

*Note: You need to press `enter` after pasting in your authentication code.*

In [2]:
import tecton
import pandas as pd
from datetime import datetime
from pprint import pprint
import random, string

tecton.login("demo-pangolin.tecton.ai")  # replace with your URL

tecton.set_validation_mode("auto")
tecton.conf.set("TECTON_OFFLINE_RETRIEVAL_COMPUTE_MODE", "rift")

tecton.version.summary()

Already logged in to https://demo-pangolin.tecton.ai as UserProfile(name='Jonathan Varley', email='jon@tecton.ai', id='00ut35dahebreB27E357'). To switch users, run `tecton.logout` then `tecton.login`
Version: 0.9.12
Git Commit: 7b1322f6df430b497a8fd0535186da3bf3ee6612
Build Datetime: 2024-06-25T14:37:08


Now we're ready to build!

## 🌊 Create a Stream Source for ingesting real-time data

First, let's define a local Stream Source that supports
[ingesting real-time data](https://docs.tecton.ai/docs/defining-features/data-sources/creating-a-data-source/creating-and-testing-a-push-source).
Once productionized, this will give us an online HTTP endpoint to push events to
in real time, which Tecton will then transform into features for online
inference.

As part of our Stream Source, we also register a historical log of the stream
via the `batch_config` parameter. Tecton uses this historical log for backfills
and offline development.

---

##### 💡 **TIP**

Alternatively, you can have Tecton maintain this historical log for you! Simply add the `log_offline=True` parameter to the `PushConfig` and omit the `batch_config`. With this setup Tecton will log all ingested events and use those to backfill any features that use this source.

---

In [3]:
from tecton import StreamSource, PushConfig, FileConfig
from tecton.types import Field, String, Timestamp, Float64


transactions_stream = StreamSource(
    name="transactions_stream",
    stream_config=PushConfig(),
    batch_config=FileConfig(
        uri="s3://mft-porter-data/tutorials/transactions.pq",
        file_format="parquet",
        timestamp_field="timestamp",
    ),
    schema=[Field("user_id", String), Field("timestamp", Timestamp), Field("amount", Float64)],
)
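
Assuming pushed events are expected to match the schema declared on the Stream Source, it can help to check records locally before sending them. The following is a minimal sketch in plain Python, not part of the Tecton SDK: `EXPECTED_FIELDS` and `validate_event` are hypothetical names, and the type mapping (String to `str`, Timestamp to `datetime`, Float64 to `int`/`float`) is an assumption made for illustration.

```python
from datetime import datetime, timezone

# Hypothetical local mirror of the schema declared on the Stream Source.
# The Python types chosen here are an assumption, not Tecton's own mapping.
EXPECTED_FIELDS = {
    "user_id": str,
    "timestamp": datetime,
    "amount": (int, float),
}


def validate_event(event: dict) -> list:
    """Return a list of human-readable problems; an empty list means the event looks fine."""
    problems = []
    for name, expected_type in EXPECTED_FIELDS.items():
        if name not in event:
            problems.append(f"missing field: {name}")
        # bool is a subclass of int in Python, so reject it explicitly.
        elif isinstance(event[name], bool) or not isinstance(event[name], expected_type):
            problems.append(f"wrong type for {name}: {type(event[name]).__name__}")
    return problems


good_event = {"user_id": "user_123", "timestamp": datetime.now(timezone.utc), "amount": 101}
bad_event = {"user_id": 123, "amount": "101"}

print(validate_event(good_event))
print(validate_event(bad_event))
```

A check like this only catches obvious shape problems early; the service remains the source of truth for what it accepts.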

## 📊 Test the new Stream Source

We can pull a range of offline data from a Stream Source's historical event log
using `get_dataframe()`.

In [4]:
start = datetime(2023, 5, 1)
end = datetime(2023, 8, 1)

df = transactions_stream.get_dataframe(start, end).to_pandas()
display(df.head(5))

StreamSource 'transactions_stream': Deriving schema.
StreamSource 'transactions_stream': Successfully validated.


| | timestamp | user_id | transaction_id | merchant | merch_lat | merch_long | amount |
|---|---|---|---|---|---|---|---|
| 0 | 2023-05-01 00:04:37.805216 | user_8041734544 | 8e261b034cf32c445ca6e66f648f88d6 | LensCrafters | 84.660493 | 162.841237 | 75.42 |
| 1 | 2023-05-01 00:18:15.088549 | user_6971829885 | 7ff11a5b0167b2bd66f36f2fd2de2fa5 | Cabela's | 82.088167 | -103.099069 | 11.12 |
| 2 | 2023-05-01 00:24:03.157971 | user_9619731767 | 7c890265fe39dd25ced53d873fd0eb73 | Forever 21 | -23.56185 | 102.008712 | 569.99 |
| 3 | 2023-05-01 00:31:32.125634 | user_4856370219 | a92220b1702b9e21e80cf9a005e6ae2a | Sonic Automotive | 1.146113 | -53.365226 | 1.51 |
| 4 | 2023-05-01 00:32:47.850232 | user_4133774204 | e2cabe6ef03ea6183df8277962262bce | Value City Furniture | -2.152083 | -26.665665 | 61.67 |


## 👩‍💻 Define and test streaming features locally

Now that we have a Stream Source defined, we are ready to create some features.

Let's use this data source to create the following 3 features:

- A user's total transaction amount in the last 1 minute
- A user's total transaction amount in the last 1 hour
- A user's total transaction amount in the last 30 days

To build these features, we will define a Stream Feature View that consumes from
our `transactions_stream` Stream Source.

The Stream Feature View transformation operates on events in a pandas DataFrame
and can apply arbitrary projections, filters, or expressions as needed. It's
just Python!

---

##### ℹ️ **INFO**

The Python transformation runs *before* the aggregations so you can transform data as needed before it is aggregated.

---

In [5]:
from tecton import Entity, stream_feature_view, Aggregation
from datetime import datetime, timedelta


user = Entity(name="user", join_keys=["user_id"])


@stream_feature_view(
    source=transactions_stream,
    entities=[user],
    mode="pandas",
    aggregations=[
        Aggregation(function="sum", column="amount", time_window=timedelta(minutes=1)),
        Aggregation(function="sum", column="amount", time_window=timedelta(hours=1)),
        Aggregation(function="sum", column="amount", time_window=timedelta(days=30)),
    ],
    schema=[Field("user_id", String), Field("timestamp", Timestamp), Field("amount", Float64)],
)
def user_transaction_amount_totals(transactions_stream):
    return transactions_stream[["user_id", "timestamp", "amount"]]
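
To make the three aggregations concrete, here is a small plain-Python sketch of what a trailing time-window sum computes for one user. It is not how Tecton implements aggregations: `window_sum` and the sample events are hypothetical, and the boundary rule (exclude the window start, include the evaluation time) is an assumption made for illustration.

```python
from datetime import datetime, timedelta

# Hypothetical events for a single user: (timestamp, amount).
events = [
    (datetime(2023, 5, 1, 12, 0, 0), 20.0),
    (datetime(2023, 5, 1, 12, 29, 30), 5.0),
    (datetime(2023, 5, 1, 12, 30, 0), 10.0),
    (datetime(2023, 4, 10, 9, 0, 0), 100.0),
]


def window_sum(events, as_of, window):
    """Sum amounts with as_of - window < timestamp <= as_of (boundary rule is an assumption)."""
    matching = [amount for ts, amount in events if as_of - window < ts <= as_of]
    # Return None when no events fall inside the window.
    return sum(matching) if matching else None


as_of = datetime(2023, 5, 1, 12, 30, 0)
totals = {
    "1m": window_sum(events, as_of, timedelta(minutes=1)),
    "1h": window_sum(events, as_of, timedelta(hours=1)),
    "30d": window_sum(events, as_of, timedelta(days=30)),
}
print(totals)
```

Each wider window contains the narrower ones, so for non-negative amounts the 30-day total is always at least as large as the 1-hour total, which in turn is at least as large as the 1-minute total.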

## 🧪 Test features interactively

Now that we've defined and validated our Feature View, we can use
`get_features_in_range` to produce a range of feature values and check out the
feature data.

---

##### ℹ️ **INFO**

These features are calculated against the Stream Source's historical event log.

---

In [6]:
start = datetime(2022, 1, 1)
end = datetime(2022, 2, 1)

df = user_transaction_amount_totals.get_features_in_range(start_time=start, end_time=end).to_pandas()

display(df.head(5))

StreamFeatureView 'user_transaction_amount_totals': Validating 2 of 3 dependencies. (1 already validated)
    Entity 'user': Successfully validated.
    Transformation 'user_transaction_amount_totals': Successfully validated.
StreamFeatureView 'user_transaction_amount_totals': Successfully validated.


| | user_id | amount_sum_1m_continuous | amount_sum_1h_continuous | amount_sum_30d_continuous | _valid_to | _valid_from |
|---|---|---|---|---|---|---|
| 0 | user_8096819426 | 0.02 | 0.02 | 5747.77 | 2022-01-27 | 2022-01-26 |
| 1 | user_2210887384 | | 1.72 | 2198.8 | 2022-01-04 | 2022-01-03 |
| 2 | user_8096819426 | | 52.32 | 5811.35 | 2022-01-28 | 2022-01-27 |
| 3 | user_1990251765 | | | 7077.24 | 2022-01-15 | 2022-01-14 |
| 4 | user_1997016327 | | | 6963.97 | 2022-01-18 | 2022-01-17 |


## 🧮 Generate training data

We can also include our new Feature View in a Feature Service and generate
historical training data for a set of training events.

In [7]:
from tecton import FeatureService

fraud_detection_feature_service_streaming = FeatureService(
    name="fraud_detection_feature_service_streaming", features=[user_transaction_amount_totals]
)

# Retrieve our dataset of historical transaction data
transactions_df = pd.read_parquet("s3://mft-porter-data/tutorials/transactions.pq", storage_options={"anon": True})

# Retrieve our dataset of labels containing transaction_id and is_fraud (set to 1 if the transaction is fraudulent or 0 otherwise)
training_labels = pd.read_parquet("s3://mft-porter-data/tutorials/labels.pq", storage_options={"anon": True})

# Join our label dataset to our transaction data to produce a list of training events
training_events = training_labels.merge(transactions_df, on=["transaction_id"], how="left")[
    ["user_id", "timestamp", "amount", "is_fraud"]
]

# Pass our training events into Tecton to generate point-in-time correct training data
training_data = fraud_detection_feature_service_streaming.get_features_for_events(training_events).to_pandas().fillna(0)
display(training_data.sample(5))

FeatureService 'fraud_detection_feature_service_streaming': Successfully validated.


| | user_id | timestamp | amount | is_fraud | user_transaction_amount_totals__amount_sum_1m_continuous | user_transaction_amount_totals__amount_sum_1h_continuous | user_transaction_amount_totals__amount_sum_30d_continuous |
|---|---|---|---|---|---|---|---|
| 2726 | user_4063572189 | 2022-07-16 17:45:59.391545 | 59.44 | 0 | 59.44 | 59.44 | 3215.31 |
| 64940 | user_1997016327 | 2022-10-01 08:00:59.197635 | 79.6 | 0 | 79.6 | 79.6 | 3790.19 |
| 17451 | user_1733657082 | 2023-02-16 20:22:57.906245 | 776.82 | 0 | 776.82 | 776.82 | 6538.85 |
| 29398 | user_7921570811 | 2020-03-08 14:37:05.027691 | 472.46 | 0 | 472.46 | 472.46 | 3991.56 |
| 58685 | user_9757807451 | 2024-02-19 18:09:45.246353 | 1.08 | 0 | 1.08 | 1.08 | 6020.5 |
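
Point-in-time correctness means each training row only sees events that had already happened at that row's timestamp. The sketch below illustrates the idea in plain Python; it is not Tecton's implementation. `point_in_time_sum` and the sample data are hypothetical, and counting the training event itself inside the window is an assumption, suggested by the sample rows above where the one-minute sum equals the transaction amount.

```python
from datetime import datetime, timedelta

# Hypothetical historical event log: (user_id, timestamp, amount).
event_log = [
    ("user_a", datetime(2023, 1, 1, 10, 0), 10.0),
    ("user_a", datetime(2023, 1, 1, 10, 30), 20.0),
    ("user_a", datetime(2023, 1, 1, 11, 45), 40.0),  # in the future for the first training row
    ("user_b", datetime(2023, 1, 1, 10, 15), 99.0),
]

# Hypothetical training events: (user_id, timestamp).
training_events = [
    ("user_a", datetime(2023, 1, 1, 10, 30)),
    ("user_a", datetime(2023, 1, 1, 12, 0)),
]


def point_in_time_sum(user_id, as_of, window):
    """Sum one user's amounts inside the trailing window, never looking past as_of."""
    return sum(
        amount
        for uid, ts, amount in event_log
        if uid == user_id and as_of - window < ts <= as_of
    )


# One-hour trailing sum for each training event, evaluated at that event's own timestamp.
rows = [
    (user_id, ts, point_in_time_sum(user_id, ts, timedelta(hours=1)))
    for user_id, ts in training_events
]
for row in rows:
    print(row)
```

The first row ignores the 11:45 event because it had not happened yet at 10:30; using it would leak future information into the training data.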


## 🚀 Apply our Stream Source and Stream Feature View to a Workspace.

Once we are happy with our Stream Source and Stream Feature View we can copy the
definitions into our Feature Repository and apply our changes to a production
workspace using the Tecton CLI.

**Note: The workspace must be a live workspace in order to push events to it.**

On our Feature View we've added four parameters to enable backfilling, online
ingestion, and offline materialization to the Feature Store:

- `online=True`
- `offline=True`
- `feature_start_time=datetime(2020, 1, 1)`
- `batch_schedule=timedelta(days=1)`

**feature_repo.py**

```python
from tecton import (
    Entity,
    FileConfig,
    stream_feature_view,
    Aggregation,
    StreamSource,
    PushConfig,
    FeatureService,
)
from tecton.types import Field, String, Timestamp, Float64
from datetime import datetime, timedelta


transactions_stream = StreamSource(
    name="transactions_stream",
    stream_config=PushConfig(),
    batch_config=FileConfig(
        uri="s3://mft-porter-data/tutorials/transactions.pq",
        file_format="parquet",
        timestamp_field="timestamp",
    ),
    schema=[Field("user_id", String), Field("timestamp", Timestamp), Field("amount", Float64)],
)

user = Entity(name="user", join_keys=["user_id"])


@stream_feature_view(
    source=transactions_stream,
    entities=[user],
    mode="pandas",
    aggregations=[
        Aggregation(function="sum", column="amount", time_window=timedelta(minutes=1)),
        Aggregation(function="sum", column="amount", time_window=timedelta(hours=1)),
        Aggregation(function="sum", column="amount", time_window=timedelta(days=30)),
    ],
    schema=[Field("user_id", String), Field("timestamp", Timestamp), Field("amount", Float64)],
    online=True,
    offline=True,
    feature_start_time=datetime(2020, 1, 1),
    batch_schedule=timedelta(days=1),
)
def user_transaction_amount_totals(transactions_stream):
    return transactions_stream[["user_id", "timestamp", "amount"]]


fraud_detection_feature_service_streaming = FeatureService(
    name="fraud_detection_feature_service_streaming", features=[user_transaction_amount_totals]
)
```

✅ Run the following commands in your terminal to select a workspace and apply
your changes:

```bash
tecton login [your-org-account-name].tecton.ai
tecton workspace select [my-live-workspace]
tecton apply
```

## ⚡️ Ingest events and watch values update in real time!

Now that our Stream Source has been productionized, we can start sending events
to it and watch our aggregations update in real time!

---

##### 🗒️ **NOTE**
This step requires generating and setting a Tecton API key.

To do this, you will need to create a new Service Account and give it access to
read features from your workspace.

---

✅ Head to the following URL to create a new service account (replace
"demo-pangolin" with your organization's account name in the URL as necessary).
Be sure to save the API key!

[https://demo-pangolin.tecton.ai/app/settings/accounts-and-access/service-accounts?create-service-account=true](https://demo-pangolin.tecton.ai/app/settings/accounts-and-access/service-accounts?create-service-account=true)

✅ If you are using `demo-pangolin.tecton.ai`, this account will automatically be
given the necessary privileges to ingest stream events in the "prod" workspace.
Otherwise, you should give the service account access to read features from your
newly created workspace by following these steps:

1. Navigate to the Service Account page by clicking on your new service account
   in the list at the URL above
2. Click on "Assign Workspace Access"
3. Select your workspace and give the service account the "Editor" role

✅ Copy the generated API key into the code snippet below where it says
`your-api-key`. Also be sure to replace the workspace name with your newly
created workspace name if necessary.

In [9]:
# Use your API key generated in the step above
TECTON_API_KEY = "your-api-key"  # replace with your API key
WORKSPACE_NAME = "prod"  # replace with your new workspace name if needed

tecton.set_credentials(tecton_api_key=TECTON_API_KEY)

ws = tecton.get_workspace(WORKSPACE_NAME)
transactions_stream_source = ws.get_data_source("transactions_stream")
fraud_detection_feature_service_streaming = ws.get_feature_service("fraud_detection_feature_service_streaming")

# Generate a random user_id for the next step
user_id = "".join(random.choices(string.ascii_letters + string.digits, k=10))

✅ Successfully set credentials.


⭐️ Try running the next two cells repeatedly in quick succession and watch the
feature values update in real time! ⭐️

---

##### 🗒️ **NOTE**

Service account permissions may take a few minutes to update. Also, your first ingestion call may take longer than the rest.

---

In [10]:
# Ingest events
try:
    response = transactions_stream_source.ingest({"user_id": user_id, "timestamp": datetime.utcnow(), "amount": 101})
    pprint(response)

except Exception as e:
    print(
        "Error: Your API key permissions may not yet have updated, or perhaps you didn't set the right API key and workspace name above.\n",
        e,
    )

{'ingestMetrics': {'featureViewIngestMetrics': [{'featureViewId': '73ed15b75c519022d5508f86de28c9b9',
                                                 'featureViewName': 'user_transaction_amount_totals',
                                                 'onlineRecordIngestCount': '1'}]},
 'workspaceName': 'prod'}
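
Because service account permissions can take a few minutes to update, the first few calls may fail. One option is to wrap the call in a small retry helper. This is a generic sketch, not a Tecton feature: `call_with_retries` is a hypothetical helper, `flaky_ingest` is a stand-in for the real ingest call, and which exception types are worth retrying is left as an assumption.

```python
import time


def call_with_retries(fn, attempts=5, initial_delay=1.0, backoff=2.0):
    """Call fn() until it succeeds or attempts run out, sleeping longer after each failure."""
    delay = initial_delay
    for attempt in range(1, attempts + 1):
        try:
            return fn()
        except Exception as exc:  # in real code, catch only the errors you expect
            if attempt == attempts:
                raise
            print(f"Attempt {attempt} failed ({exc}); retrying in {delay:.2f}s")
            time.sleep(delay)
            delay *= backoff


# Stand-in for the ingest call: fails twice, then succeeds.
calls = {"count": 0}


def flaky_ingest():
    calls["count"] += 1
    if calls["count"] < 3:
        raise PermissionError("permissions not yet updated")
    return {"status": "ok"}


result = call_with_retries(flaky_ingest, initial_delay=0.01)
print(result)
```

In the notebook, the stand-in could be replaced by a lambda that wraps the `transactions_stream_source.ingest(...)` call from the cell above, with a longer initial delay.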


In [11]:
# Read updated feature values
try:
    features = fraud_detection_feature_service_streaming.get_online_features(join_keys={"user_id": user_id})
    pprint(features.to_dict())

except Exception as e:
    print(
        "Error: Your API key permissions may not yet have updated, or perhaps you didn't set the right API key and workspace name above.\n",
        e,
    )

{'user_transaction_amount_totals.amount_sum_1h_continuous': 101.0,
 'user_transaction_amount_totals.amount_sum_1m_continuous': 101.0,
 'user_transaction_amount_totals.amount_sum_30d_continuous': 101.0}


---

##### 💡 **TIP**

The `.ingest()` method makes it easy to push events from a notebook. In production we recommend pushing events directly to the HTTP endpoint for the best performance.

The same goes for reading online data from a Feature Service via `.get_online_features()`. For best performance we recommend reading directly from the HTTP API or using our [Python Client Library](https://docs.tecton.ai/docs/reading-feature-data/reading-feature-data-for-inference/reading-online-features-for-inference-using-the-python-client).

---

## ⭐️ Conclusion

There you have it! Using nothing but Python and a local dev environment, we got
real-time features running online and ready for a production model to consume.