# 02 - Building various feature types in Tecton

Tecton provides a lot of flexibility to develop and deploy a wide variety of features. Features may vary based on:
- Freshness requirements and availability of input data (Batch, Near real-time, Real-time)
- Transformation logic (aggregations, joins, filters, geospatial logic, etc.)
- Coding language of choice (Python, Snowflake SQL, PySpark, Spark SQL, etc.)

In the second part of this workshop, we will explore different types of features to enrich our fraud detection model and, hopefully, improve its performance.

1. Number of transactions on a user's card in the last 1/3/5 minutes (Streaming)
2. Distance between the current transaction location and the user's home location (Batch + On-demand)
3. Merchant historical fraud rate (Batch)
4. User account age (Custom Batch ETL)
5. Bonus: Merchant city and country from lat/long using a third-party API (On-Demand)

In [None]:
import tecton
import pandas as pd
from pprint import pprint

tecton.set_validation_mode("auto")

tecton.login('https://demo-nebula.tecton.ai/')

In [None]:
import logging
import os
import snowflake.connector
from datetime import datetime, timedelta

connection_parameters = {
    "user": "YOUR_USER",
    "password": "YOUR_PASSWORD",
    "account": "tectonpartner",
    "warehouse": "NAB_WH",
    # Database and schema are required to create various temporary objects by tecton
    "database": "NAB_MFT_DB",
    "schema": "PUBLIC",
}
conn = snowflake.connector.connect(**connection_parameters)
tecton.snowflake_context.set_connection(conn) # Tecton will use this Snowflake connection for all interactive queries

## 0. Fetching objects that have been registered to Tecton 

When developing features with Tecton, we can combine Tecton objects that have previously been registered to a workspace with local Tecton objects. This is particularly helpful when testing new features for a model: you can extend an existing Feature Service with new local features and train a new model version to validate the predictive power of those features.

Here, we will retrieve the ```user``` entity and the ```transactions``` data source we pushed to Tecton in the previous notebook.

In [None]:
ws = tecton.get_workspace('your-workspace-name')

In [None]:
user = ws.get_entity('user')
transactions_batch = ws.get_data_source('transactions')

## 1. Number of transactions on a user's card in the last 1/3/5 minutes

This feature counts the transactions on a card in the 1, 3 and 5 minutes before the current transaction. In most cases, transaction data lands in the data warehouse in batches on a regular schedule (e.g. hourly or daily). Features built on top of that data would lag by at least this data delay (the time it takes for transactions to become available in the data source), which is far too stale for a 1-minute window to be meaningful.

For features that require very high freshness, we will need to build them off of streaming events.

#### 💡Streaming features in Tecton
Real-time data can make all the difference for real-time models, but leveraging it can be quite the challenge.

With Tecton you can build features that are fresh to the millisecond using plain Python, without any complex streaming infrastructure! Best of all, you can test everything locally and iterate in a notebook to quickly train better models that behave consistently online and offline.

### A) Defining our PushSource

First, let's define a local Stream Source that supports ingesting real-time data. Once productionized, it will expose an online HTTP endpoint that we can push events to in real time; Tecton then transforms these events into features for online inference.

As part of our Stream Source, we also register a historical log of the stream via the batch_config parameter. Tecton uses this historical log for backfills and offline development.

💡 Alternatively, you can have Tecton maintain this historical log for you! Simply add the log_offline=True parameter to the PushConfig and omit the batch_config. With this setup Tecton will log all ingested events and use those to backfill any features that use this source.
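Once the Stream Source is applied, events reach it through an HTTP push. The sketch below only builds such a request with the standard library, without sending it. The payload shape (`workspace_name`, `dry_run`, `records` keyed by source name, each event wrapped in `{"record": ...}`), the `/ingest` path and the `Tecton-key` authorization header are assumptions based on our reading of Tecton's Stream Ingest API docs, so verify them against your Tecton version; the user id and API key are placeholders.

```python
import json
import urllib.request
from datetime import datetime, timezone

# Assumption: the ingest endpoint is <cluster URL>/ingest; check your Tecton docs.
INGEST_URL = "https://demo-nebula.tecton.ai/ingest"


def build_push_payload(workspace, source_name, events, dry_run=True):
    """Wrap raw event dicts in the assumed {"records": {source: [{"record": ...}]}} shape."""
    return {
        "workspace_name": workspace,
        "dry_run": dry_run,  # assumed: validate the request without writing data
        "records": {source_name: [{"record": event} for event in events]},
    }


def build_push_request(payload, api_key):
    """Prepare (but do not send) the POST request that carries the payload."""
    return urllib.request.Request(
        INGEST_URL,
        data=json.dumps(payload).encode("utf-8"),
        headers={
            "Authorization": f"Tecton-key {api_key}",  # assumed auth scheme
            "Content-Type": "application/json",
        },
        method="POST",
    )


# One event matching the USER_ID / TIMESTAMP / AMT schema of transactions_stream
event = {
    "USER_ID": "user_123",  # hypothetical user id
    "TIMESTAMP": datetime(2023, 8, 1, 12, 0, tzinfo=timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
    "AMT": 42.5,
}
payload = build_push_payload("your-workspace-name", "transactions_stream", [event])
request = build_push_request(payload, api_key="YOUR_API_KEY")
# urllib.request.urlopen(request) would actually send it; left out on purpose.
```

Keeping `dry_run=True` as the default means a copy-pasted call cannot write test events into a live feature by accident.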

In [None]:
from tecton import StreamSource, PushConfig, SnowflakeConfig
from tecton.types import Field, String, Timestamp, Float64

offline_config = SnowflakeConfig(
    url="https://tectonpartner.snowflakecomputing.com/",
    database="NAB_MFT_DB",
    schema="PUBLIC",
    warehouse="NAB_WH",
    table="TRANSACTIONS",
    timestamp_field="TIMESTAMP"
)

transactions = StreamSource(
    name="transactions_stream",
    stream_config=PushConfig(),
    batch_config=offline_config,
    schema=[
        Field("USER_ID", String), 
        Field("TIMESTAMP", Timestamp), 
        Field("AMT", Float64)]
)

#### Testing the new source
We can pull a range of offline data from a Stream Source's historical event log using ```get_dataframe()```.

In [None]:
start = datetime(2023, 5, 1)
end = datetime(2023, 8, 1)

df = transactions.get_dataframe(start, end).to_pandas()
df.head()

### B) Defining and testing Streaming features locally

Now that we have a Stream Source defined, we are ready to create some features.

Let's use this data source to create the following features:

- A user's count of transactions in the last 1/3/5 minutes
- A user's total transaction amount in the last 1 hour

To build these features, we will define a Stream Feature View that consumes from our transactions Stream Source.

The Stream Feature View transformation operates on events in a pandas DataFrame and can apply any arbitrary projections, filters, or expressions as needed. It's just Python!

As always, we can combine these transformations with Tecton Aggregations to easily compute common aggregates over time windows.
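To build intuition for what the four aggregations compute, and to have a reference to sanity-check results against, here is a plain-Python sketch for one user at one point in time. It assumes a half-open window `[as_of - window, as_of)`; Tecton's exact boundary handling may differ. The feature names and events are hypothetical and are not the names Tecton generates.

```python
from datetime import datetime, timedelta

COUNT_WINDOWS_MINUTES = (1, 3, 5)


def trailing_aggregates(events, user_id, as_of):
    """Reference computation of the count/sum features for one user at `as_of`."""
    user_events = [e for e in events if e["USER_ID"] == user_id]

    def in_window(ts, window):
        # Assumed half-open window: includes as_of - window, excludes as_of itself
        return as_of - window <= ts < as_of

    features = {}
    for minutes in COUNT_WINDOWS_MINUTES:
        window = timedelta(minutes=minutes)
        features[f"AMT_count_{minutes}m"] = sum(1 for e in user_events if in_window(e["TIMESTAMP"], window))
    features["AMT_sum_1h"] = sum(e["AMT"] for e in user_events if in_window(e["TIMESTAMP"], timedelta(hours=1)))
    return features


t0 = datetime(2023, 8, 1, 12, 0)
events = [  # hypothetical transactions
    {"USER_ID": "u1", "TIMESTAMP": t0 - timedelta(seconds=30), "AMT": 10.0},
    {"USER_ID": "u1", "TIMESTAMP": t0 - timedelta(minutes=2), "AMT": 20.0},
    {"USER_ID": "u1", "TIMESTAMP": t0 - timedelta(minutes=4), "AMT": 5.0},
    {"USER_ID": "u1", "TIMESTAMP": t0 - timedelta(minutes=30), "AMT": 100.0},
    {"USER_ID": "u2", "TIMESTAMP": t0 - timedelta(seconds=10), "AMT": 7.0},
]
print(trailing_aggregates(events, "u1", t0))
# {'AMT_count_1m': 1, 'AMT_count_3m': 2, 'AMT_count_5m': 3, 'AMT_sum_1h': 135.0}
```

Because the windows are nested, each shorter-window count can never exceed the longer one, which is a cheap invariant to check on the real feature output.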

In [None]:
from tecton import Entity, stream_feature_view, Aggregation
from datetime import datetime, timedelta

@stream_feature_view(
    source=transactions,
    entities=[user],  # The user entity is reused from our Tecton workspace
    mode="pandas",
    schema=[Field("USER_ID", String), Field("TIMESTAMP", Timestamp), Field("AMT", Float64)],
    aggregations=[
        Aggregation(function="count", column="AMT", time_window=timedelta(minutes=1)),
        Aggregation(function="count", column="AMT", time_window=timedelta(minutes=3)),
        Aggregation(function="count", column="AMT", time_window=timedelta(minutes=5)),
        Aggregation(function="sum", column="AMT", time_window=timedelta(hours=1))
    ],
)
def user_transaction_amount_totals(transactions):
    return transactions[["USER_ID", "TIMESTAMP", "AMT"]]

Now that we've defined and validated our Feature View, we can use ```get_historical_features()``` to produce a range of feature values and inspect the feature data.

💡**INFO**
These features are calculated against the Stream Source's historical event log.

In [None]:
start = datetime(2022, 1, 1)
end = datetime(2022, 2, 1)

df = user_transaction_amount_totals.get_historical_features(start_time=start, end_time=end).to_pandas()

df.head()

## 2. Distance between the current transaction location and the user's home location

For this feature, we need to combine data from two different sources:
- A Batch data source that contains data about our users and their home location (lat/long)
- A real-time data source (a RequestSource in Tecton terms) that contains the location of the current transaction being scored. This data is passed to Tecton as additional context when querying a Feature Service.

To create this feature, we will define two Feature Views:
- A Batch Feature View that computes the user's home latitude and longitude (refreshed daily)
- An On-demand Feature View that combines the user's home latitude and longitude with the current transaction's latitude and longitude and returns the distance between the two
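The distance step uses the haversine formula, which the On-demand Feature View in part B implements line by line. With $\varphi$ for latitude and $\lambda$ for longitude, both in radians, $(\varphi_1, \lambda_1)$ the transaction (merchant) location and $(\varphi_2, \lambda_2)$ the user's home:

```latex
\begin{aligned}
\Delta\varphi &= \varphi_2 - \varphi_1, \qquad \Delta\lambda = \lambda_2 - \lambda_1 \\
a &= \sin^2\!\left(\tfrac{\Delta\varphi}{2}\right) + \cos\varphi_1 \cos\varphi_2 \sin^2\!\left(\tfrac{\Delta\lambda}{2}\right) \\
c &= 2\,\operatorname{atan2}\!\left(\sqrt{a},\ \sqrt{1-a}\right) \\
d &= R\,c
\end{aligned}
```

Here $R \approx 6373$ km is the Earth radius used in the code; the mean Earth radius is about 6371 km, a difference that is negligible for a fraud signal.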

### A) User's latitude and longitude Feature View

In [None]:
from tecton import BatchSource

fraud_users = BatchSource(
    name='users_batch',
    batch_config=SnowflakeConfig(
        url="https://tectonpartner.snowflakecomputing.com/",
        database="NAB_MFT_DB",
        schema="PUBLIC",
        warehouse="NAB_WH",
        table="USERS",
        timestamp_field="SIGNUP_TIMESTAMP"
    )
)

In [None]:
fraud_users.get_dataframe().to_pandas().head()

Now that our data source is defined, we can define a Batch Feature View on top of it. This time, we will use Snowflake SQL mode to define our transformation logic. There is no real transformation here: we simply select the relevant columns from our Snowflake table.

In [None]:
from tecton import batch_feature_view
from tecton.types import String, Timestamp, Float64

@batch_feature_view(
    entities=[user],
    sources=[fraud_users],
    mode="snowflake_sql",
    batch_schedule=timedelta(days=1),
    schema=[
        Field("USER_ID", String),
        Field("SIGNUP_TIMESTAMP", Timestamp),
        Field("LAT", Float64),
        Field("LONG", Float64),
    ],
)
def user_home_location(fraud_users):
    return f"""
    select
        USER_ID,
        SIGNUP_TIMESTAMP,
        LAT,
        LONG
    from {fraud_users}
    """

Now we can test our feature view using ```.get_historical_features()``` on a given time range. Here, Tecton will compute all feature values between 2017-01-01 and 2023-11-01.

In [None]:
start = datetime(2017, 1, 1)
end = datetime(2023, 11, 1)

df = user_home_location.get_historical_features(start_time=start, end_time=end).to_pandas()
df.head()

### B) Distance between current transaction location and user's home location

In [None]:
from tecton import on_demand_feature_view, RequestSource

request_source = RequestSource(
    schema=[Field('MERCH_LAT', Float64),Field('MERCH_LONG', Float64)]
)

@on_demand_feature_view(
    sources=[
        request_source,
        user_home_location
    ],
    mode='python',
    schema=[Field('dist_km', Float64)],
    description="How far a transaction is from the user's home",
)
def user_to_transaction_distance(request_source, user_home_location):
    from math import sin, cos, sqrt, atan2, radians

    # Approximate radius of earth in km
    R = 6373.0

    lat1 = radians(request_source['MERCH_LAT'])
    lon1 = radians(request_source['MERCH_LONG'])
    lat2 = radians(user_home_location['LAT'])
    lon2 = radians(user_home_location['LONG'])

    dlon = lon2 - lon1
    dlat = lat2 - lat1

    a = sin(dlat / 2)**2 + cos(lat1) * cos(lat2) * sin(dlon / 2)**2
    c = 2 * atan2(sqrt(a), sqrt(1 - a))

    distance = R * c

    return {'dist_km': distance}

#### Testing our On-demand feature view against a set of historical transactions
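Each spine row below carries its own `TIMESTAMP`, so it should only see the `user_home_location` values that were already known at that time. The plain-Python sketch below illustrates this point-in-time ("as-of") lookup with hypothetical rows. It shows the idea only, not how Tecton implements the join; "at or before the spine timestamp" is the boundary this sketch assumes, and `TIMESTAMP` stands for the feature row's timestamp (`SIGNUP_TIMESTAMP` for `user_home_location`).

```python
from bisect import bisect_right
from datetime import datetime


def point_in_time_lookup(feature_rows, spine):
    """For each (user_id, ts) in the spine, return the latest feature row with TIMESTAMP <= ts."""
    # Index feature rows per user, sorted by timestamp, so each lookup is a binary search.
    index = {}
    for row in sorted(feature_rows, key=lambda r: r["TIMESTAMP"]):
        timestamps, rows = index.setdefault(row["USER_ID"], ([], []))
        timestamps.append(row["TIMESTAMP"])
        rows.append(row)

    joined = []
    for user_id, ts in spine:
        timestamps, rows = index.get(user_id, ([], []))
        pos = bisect_right(timestamps, ts)  # number of rows with TIMESTAMP <= ts
        joined.append(rows[pos - 1] if pos else None)  # None: no value known yet
    return joined


home = [  # hypothetical feature rows for one user
    {"USER_ID": "u1", "TIMESTAMP": datetime(2023, 1, 1), "LAT": 40.7, "LONG": -74.0},
    {"USER_ID": "u1", "TIMESTAMP": datetime(2023, 6, 1), "LAT": 34.1, "LONG": -118.2},
]
spine = [("u1", datetime(2023, 3, 15)), ("u1", datetime(2023, 7, 1)), ("u1", datetime(2022, 12, 1))]
matches = point_in_time_lookup(home, spine)
```

The third spine row predates every feature row, so it gets no value instead of leaking a location from the future into training data.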

In [None]:
def query_snowflake(query):
    df = conn.cursor().execute(query).fetch_pandas_all()
    return df

training_events = query_snowflake("""
    select 
        USER_ID,
        TIMESTAMP,
        AMT,
        IS_FRAUD,
        MERCH_LAT,
        MERCH_LONG
    from
        TRANSACTIONS
    limit 100
""")

user_to_transaction_distance.get_historical_features(training_events).to_pandas().head()

## 3. Merchant fraud rate in the past year (Batch Feature View + Aggregations)

This is an example of a simple window aggregation feature. Refer to the example from notebook 1 and fill in the gaps in the cell below.

**Hint:** you will need to create the merchant entity. The IS_FRAUD column is a 0/1 flag that indicates whether a transaction was labelled as fraudulent.

In [None]:
from tecton import batch_feature_view, Aggregation, Entity
from tecton.types import Field, String, Timestamp, Int64

merchant = Entity(
    name="merchant",
    join_keys=[], ### FILL THE ENTITY JOIN KEY HERE
    description="A merchant that receives transactions"
)

@batch_feature_view(
    entities=[merchant],
    sources=[transactions_batch],
    mode="pandas",
    aggregation_interval=timedelta(days=1),
    aggregations=[], ### FILL THE AGGREGATION FUNCTION HERE
    
    schema=[Field("MERCHANT", String), Field("TIMESTAMP", Timestamp), Field("IS_FRAUD", Int64)]
)
def merchant_fraud_rate(transactions):
    """
    INSERT FEATURE VIEW LOGIC HERE
    
    """
    return out_df


In [None]:
start = datetime(2023, 11, 1)
end = datetime(2023, 11, 2)

df = merchant_fraud_rate.get_historical_features(start_time=start, end_time=end).to_pandas()
df.head()

## 4. User account age (Custom ETL feature)

If you need to define aggregation features that aren't supported by Tecton's Aggregation Engine, you can develop [custom ETL features](https://docs.tecton.ai/docs/defining-features/feature-views/batch-feature-view/simplifying-custom-batch-aggregations-with-incremental-backfills).

When using custom ETL features, disable Tecton’s optimized backfill by setting ```incremental_backfills``` to ```True```. Here's why you need to disable it, and how it works:

- Default behavior: Tecton runs your query once over the entire period between ```feature_start_time``` and the feature registration time, minimizing the number of backfill jobs. This isn't compatible with ETL features that expect to process a fixed time range of data (say, 1 day's worth) on every query run.
- Incremental Backfills: Your query runs separately for each batch_schedule interval in the specified time range, ensuring every query run gets to process a fixed time range of data.
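The difference between the two modes can be illustrated by listing the time windows each one would process. The sketch below is an illustration of the behavior described above, not Tecton code: with the default behavior the whole range is a single window, while with incremental backfills the range is split into consecutive `batch_schedule`-sized windows. Dropping a trailing partial window (left for a later scheduled run) is an assumption of this sketch.

```python
from datetime import datetime, timedelta


def incremental_backfill_windows(feature_start_time, end_time, batch_schedule):
    """List the [start, end) windows that incremental backfills would process one by one.

    Assumption: only complete batch_schedule-sized windows are produced; a trailing
    partial window is left for a later scheduled run.
    """
    windows = []
    window_start = feature_start_time
    while window_start + batch_schedule <= end_time:
        windows.append((window_start, window_start + batch_schedule))
        window_start += batch_schedule
    return windows


for start, end in incremental_backfill_windows(datetime(2023, 10, 29), datetime(2023, 11, 1), timedelta(days=1)):
    print(start, "->", end)  # three separate 1-day runs instead of one 3-day run
```

Each window corresponds to one run of the transformation with its own `context.start_time` and `context.end_time`, which is exactly what the SQL below relies on.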


In [None]:
from tecton import materialization_context
from tecton.types import Field, String, Timestamp, Int64

@batch_feature_view(
    entities=[user],
    sources=[fraud_users],
    mode="snowflake_sql",
    batch_schedule=timedelta(days=1),
    incremental_backfills=True,
    schema=[Field("USER_ID",String), Field("TIMESTAMP", Timestamp), Field("DAYS_SINCE_SIGNUP", Int64)]
)
def user_days_since_signup(fraud_users, context=materialization_context()):
    return f"""
    select 
        USER_ID,
        TO_TIMESTAMP('{context.end_time}') - INTERVAL '1 MICROSECOND' as TIMESTAMP,
        DATEDIFF(day, "SIGNUP_TIMESTAMP", TO_TIMESTAMP('{context.start_time}')) as DAYS_SINCE_SIGNUP
    from 
        {fraud_users}
    """

For custom ETL features, we can't directly produce feature values in a notebook for a date range spanning multiple materialization periods. Instead, we can use the ```.run()``` function to run our feature transformation code for a single materialization period.
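To know what to expect from `.run()`, here is a plain-Python mirror of the `user_days_since_signup` SQL for one materialization window `[start_time, end_time)`, using hypothetical users. It relies on Snowflake's `DATEDIFF(day, ...)` counting calendar-day boundaries, so it is computed as a difference of dates rather than of 24-hour periods. Like the SQL, it does not filter out users who sign up after `start_time`; those rows get a negative value, so add a filter in both places if that matters for your model.

```python
from datetime import datetime, timedelta


def days_since_signup_rows(users, start_time, end_time):
    """Plain-Python mirror of user_days_since_signup's SQL for one window [start_time, end_time)."""
    # Same as TO_TIMESTAMP(end_time) - INTERVAL '1 MICROSECOND': the last instant inside the window
    feature_ts = end_time - timedelta(microseconds=1)
    return [
        {
            "USER_ID": user["USER_ID"],
            "TIMESTAMP": feature_ts,
            # DATEDIFF(day, signup, start_time) counts day boundaries, i.e. a date difference
            "DAYS_SINCE_SIGNUP": (start_time.date() - user["SIGNUP_TIMESTAMP"].date()).days,
        }
        for user in users
    ]


users = [  # hypothetical users
    {"USER_ID": "u1", "SIGNUP_TIMESTAMP": datetime(2023, 10, 1, 23, 59)},
    {"USER_ID": "u2", "SIGNUP_TIMESTAMP": datetime(2023, 11, 1, 8, 0)},
]
rows = days_since_signup_rows(users, datetime(2023, 11, 1), datetime(2023, 11, 2))
```

Stamping every row at `end_time - 1µs` keeps the feature timestamp inside the window being materialized, so each run produces exactly one value per user.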

In [None]:
start = pd.to_datetime("today") - timedelta(days=1)
end = pd.to_datetime("today")

df = user_days_since_signup.run(start_time=start, end_time=end).to_pandas()

In [None]:
df.head()

## 5. BONUS: Using a 3rd party API as a data source 

We will use the free OpenStreetMap Nominatim API to reverse-geocode the merchant's latitude and longitude into the merchant's city and country.
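Separating request building and response parsing from the HTTP call makes the logic testable offline. The sketch below builds the same reverse-geocoding URL (`format=jsonv2`, `lat`, `lon`) that the Feature View below sends, and parses a hand-written sample shaped like a jsonv2 response (not real API output). The helper names are hypothetical. Also keep in mind that Nominatim's usage policy caps clients at one request per second, so calling it for every scored transaction is only suitable for a demo.

```python
from urllib.parse import urlencode

NOMINATIM_REVERSE_URL = "https://nominatim.openstreetmap.org/reverse"


def reverse_geocode_url(lat, lon):
    """Build the reverse-geocoding request URL for one lat/long pair."""
    return f"{NOMINATIM_REVERSE_URL}?{urlencode({'format': 'jsonv2', 'lat': lat, 'lon': lon})}"


def parse_city_country(payload):
    """Extract city/country from a jsonv2 response; missing keys become None."""
    address = payload.get("address", {})
    return {"city": address.get("city"), "country": address.get("country")}


# Hand-written sample shaped like a jsonv2 response, with placeholder values
sample = {"address": {"city": "Example City", "country": "Example Country"}}
print(reverse_geocode_url(40.730610, -73.935242))
print(parse_city_country(sample))
```

Returning `None` for every missing key keeps the output aligned with the two-field `output_schema`, whatever the API returns.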


In [None]:
from tecton import on_demand_feature_view, RequestSource
from tecton.types import Field, Float64, String

request_schema = [Field("MERCH_LAT", Float64), Field("MERCH_LONG", Float64)]
transaction_request = RequestSource(schema=request_schema)
output_schema = [Field("city", String), Field("country", String)]


@on_demand_feature_view(
    sources=[transaction_request],
    name="geocoded_address",
    mode="python",
    schema=output_schema,
    #environments=["tecton-python-extended:0.3"],
    owner="vince@tecton.ai",
)
def geocoded_city(transaction_request):
    import requests

    # Nominatim's usage policy asks clients to identify themselves via User-Agent
    headers = {
        'User-Agent': 'My User Agent 1.0',
        'From': 'vince@tecton.ai',  # optional contact e-mail
    }
    params = {
        'format': 'jsonv2',
        'lon': str(transaction_request.get('MERCH_LONG')),
        'lat': str(transaction_request.get('MERCH_LAT')),
    }
    try:
        # A timeout keeps a slow third-party API from blocking feature retrieval
        response = requests.get(
            'https://nominatim.openstreetmap.org/reverse',
            params=params, headers=headers, timeout=5,
        )
    except requests.RequestException:
        return {'city': None, 'country': None}

    if response.status_code != 200:
        # Always return every key declared in output_schema
        return {'city': None, 'country': None}

    address = response.json().get('address', {})
    return {'city': address.get('city'), 'country': address.get('country')}

#### We can test this On-Demand Feature View with mock inputs

In [None]:
geocoded_city.run(
  transaction_request={'MERCH_LAT':40.730610,'MERCH_LONG':-73.935242}
)

## 6. Deploying things out to Tecton 

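Add the new Feature View definitions (along with the ```merchant``` entity and the ```transactions_stream``` source they depend on) to the features.py file and include them in your Feature Service, then run ```tecton plan``` followed by ```tecton apply``` to push these new definitions to your workspace.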

Once these have been deployed, head to the Tecton UI to monitor the materialization jobs.

You are now ready to call your new Feature Service!