# 4. Incremental Backfill Features

This tutorial will cover incremental backfill features in Tecton: https://docs.tecton.ai/latest/overviews/framework/feature_views/batch/incremental_backfills.html

As you saw in tutorial notebook #2, Tecton has a powerful aggregation framework that lets you define common time-window aggregation features with built-in aggregation functions: `sum, count, mean, min, max, stdev_pop, stdev_samp, var_pop, var_samp, first(n), last(n)`. See the [full list of Snowflake-compatible aggregation functions](https://docs.tecton.ai/docs/sdk-reference/time-window-aggregation-functions).

While the aggregation framework should cover the majority of your feature needs, there might be cases where you need to define your own custom aggregations using Snowflake SQL or Snowpark for Python.  

When the built-in aggregations don't meet your requirements, you can define your own aggregation features in Tecton and streamline the process by enabling incremental backfills with the `incremental_backfills=True` option.


Incremental backfills let us define a feature view as an ETL job for a single interval, such as a daily ETL. Tecton runs that job for each materialization period going forward, and performs the backfill by repeatedly running the same job over historical intervals, as it would have run at the time, until the historical load is complete.

<img src=https://docs.tecton.ai/assets/images/materialization-incremental-backfills-2fd2c382f78f5eea16c95c9af4f6747d.svg width=70%>
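Conceptually, the backfill splits the history from `feature_start_time` up to now into `batch_schedule`-sized periods and runs the feature view's query once per period. The sketch below uses a hypothetical helper, `backfill_periods`, to enumerate those periods for a daily schedule; it illustrates the idea only and is not Tecton's actual scheduler. The `2022-11-01` start mirrors the `feature_start_time` used later in this tutorial.

```python
from datetime import datetime, timedelta


def backfill_periods(feature_start_time, now, batch_schedule=timedelta(days=1)):
    """Split [feature_start_time, now) into consecutive [start, end) periods.

    Hypothetical illustration: each tuple stands for one historical ETL run.
    A period that has not fully elapsed by `now` is left for a future
    scheduled run.
    """
    periods = []
    start = feature_start_time
    while start + batch_schedule <= now:
        end = start + batch_schedule
        periods.append((start, end))
        start = end
    return periods


periods = backfill_periods(datetime(2022, 11, 1), datetime(2022, 11, 4, 6, 0))
for start, end in periods:
    print(start.date(), "->", end.date())
# 2022-11-01 -> 2022-11-02
# 2022-11-02 -> 2022-11-03
# 2022-11-03 -> 2022-11-04
```

The partial day after `2022-11-04 00:00` is not included, since its period has not ended yet.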

In [8]:
%pip install tecton[snowflake]==0.6.1


In [1]:
# Snowflake credentials for the demo account (details were sent by email); replace with your own
%env SNOWFLAKE_USER=DEMO_USER
%env SNOWFLAKE_PASSWORD=tecton123!
%env SNOWFLAKE_ACCOUNT=tectonpartner-tecton_demo_usaa

env: SNOWFLAKE_USER=DEMO_USER
env: SNOWFLAKE_PASSWORD=tecton123!
env: SNOWFLAKE_ACCOUNT=tectonpartner-tecton_demo_usaa


In [2]:
# Import Tecton and other libraries
import logging
import os
import tecton
from dotenv import load_dotenv, find_dotenv
import pandas as pd
import snowflake.connector
from datetime import datetime, timedelta, date
from pprint import pprint

load_dotenv()  # take environment variables from .env.
logging.getLogger('snowflake.connector').setLevel(logging.WARNING)
logging.getLogger('snowflake.snowpark').setLevel(logging.WARNING)

# Replace these with the appropriate connection details for your own account
connection_parameters = {
    "user": os.environ['SNOWFLAKE_USER'],
    "password": os.environ['SNOWFLAKE_PASSWORD'],
    "account": os.environ['SNOWFLAKE_ACCOUNT'],
    "warehouse": "TRIAL_WAREHOUSE",
    "role": "TRIAL_USER",
    # Tecton needs a database and schema to create various temporary objects
    "database": "TECTON",
    "schema": "PUBLIC",
}
conn = snowflake.connector.connect(**connection_parameters)
tecton.snowflake_context.set_connection(conn)  # Tecton will use this Snowflake connection for all interactive queries


# Quick helper function to query Snowflake from a notebook and return a pandas DataFrame
def query_snowflake(query):
    df = conn.cursor().execute(query).fetch_pandas_all()
    return df

print("dotenv location: " + find_dotenv())
tecton.version.summary()



dotenv location: 
Version: 0.6.1
Git Commit: 8cadbebe11bebac828a5103ccaf1ad792f16d50b
Build Datetime: 2023-03-15T17:57:29


### Creating an Incremental Backfill Feature
https://docs.tecton.ai/0.5/overviews/framework/feature_views/batch/incremental_backfills.html \
Many Tecton features are constructed by pointing Tecton to a history of data values; this data typically has the form: \
`entity_id, timestamp, attribute_value1, attribute_value2...` \
When data is presented this way, many features can be computed on the fly for any given point in time. Some features, however, must be precomputed ahead of time, because of how complex they are to construct and/or because of the type of aggregation they perform across aggregation windows.
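Because every row carries its own timestamp, a windowed aggregation can be evaluated as of any point in time by keeping only the rows that were already known at that moment. The minimal pure-Python sketch below uses made-up rows and a hypothetical `sum_as_of` helper; it illustrates the idea and does not describe how Tecton evaluates features internally.

```python
from datetime import datetime, timedelta

# Made-up history in the (entity_id, timestamp, attribute_value) shape described above.
events = [
    ("user_1", datetime(2022, 12, 1, 9, 0), 20.0),
    ("user_1", datetime(2022, 12, 1, 18, 0), 5.0),
    ("user_1", datetime(2022, 12, 3, 7, 0), 12.5),
    ("user_2", datetime(2022, 12, 2, 10, 0), 40.0),
]


def sum_as_of(events, entity_id, as_of, window):
    """Sum an entity's values in [as_of - window, as_of), i.e. only rows known before as_of."""
    return sum(
        value
        for eid, ts, value in events
        if eid == entity_id and as_of - window <= ts < as_of
    )


print(sum_as_of(events, "user_1", datetime(2022, 12, 2), timedelta(days=2)))  # 25.0
print(sum_as_of(events, "user_1", datetime(2022, 12, 4), timedelta(days=2)))  # 12.5
```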

In [2]:
ws = tecton.get_workspace('miket') # replace with your workspace name
conn.cursor().execute("ALTER SESSION SET TIMEZONE = 'UTC'") # make sure we're operating in UTC/GMT time for snowflake

<snowflake.connector.cursor.SnowflakeCursor at 0x10d3cbaf0>

### Feature Definition
Tecton's declarative aggregation framework does not currently support counting distinct items over a time period. Such a feature can instead be built with incremental backfills: we define the logic of the feature as it would run within a single materialization period, along with the span of data each run reads. As an example, let's count the number of distinct `MERCHANT`s each user has interacted with in the past 2 days and in the past 30 days. First, consider how we would compute this metric as of the present moment, starting with a look at the raw data from the past 30 days.
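One way to see why distinct counts are harder to assemble from smaller time slices than `sum` or `count`: per-day distinct counts cannot simply be added up to get a multi-day distinct count, because the same merchant may appear on several days. A small illustration with hypothetical merchant sets for a single user:

```python
# Hypothetical merchants a single user transacted with on two consecutive days.
day_1 = {"merchant_a", "merchant_b", "merchant_c"}
day_2 = {"merchant_b", "merchant_c", "merchant_d"}

# Adding per-day distinct counts double-counts merchants seen on both days...
sum_of_daily_counts = len(day_1) + len(day_2)
# ...whereas the true 2-day distinct count needs the underlying sets.
two_day_distinct = len(day_1 | day_2)

print(sum_of_daily_counts, two_day_distinct)  # 6 4
```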

In [3]:
query = '''
SELECT 
    DISTINCT USER_ID
    , MERCHANT
FROM 
    TECTON_DEMO_DATA.FRAUD_DEMO.TRANSACTIONS
WHERE
    TIMESTAMP >= CURRENT_DATE - INTERVAL '30 DAYS'
LIMIT 10;
'''
df_results = query_snowflake(query)
df_results.head(5)

|   | USER_ID | MERCHANT |
|---|---|---|
| 0 | user_402539845901 | fraud_Skiles LLC |
| 1 | user_656020174537 | fraud_Schuppe, Nolan and Hoeger |
| 2 | user_709462196403 | fraud_Greenholt, Jacobi and Gleason |
| 3 | user_871233292771 | fraud_Fahey Inc |
| 4 | user_782510788708 | fraud_McCullough LLC |


Counting the distinct merchants per user gives the following:

In [4]:
query = '''
SELECT 
    USER_ID
    , COUNT(DISTINCT MERCHANT) AS DSTNCT_MERCHANT_30D
FROM 
    TECTON_DEMO_DATA.FRAUD_DEMO.TRANSACTIONS
WHERE
    TIMESTAMP >= CURRENT_DATE - INTERVAL '30 DAYS'
GROUP BY
    USER_ID
LIMIT 10;
'''
df_results = query_snowflake(query)
df_results.head(5)

|   | USER_ID | DSTNCT_MERCHANT_30D |
|---|---|---|
| 0 | user_574612776685 | 680 |
| 1 | user_916905857181 | 657 |
| 2 | user_91355675520 | 570 |
| 3 | user_268514844966 | 693 |
| 4 | user_724235628997 | 692 |


We are also interested in counts over the past 2 days. We can compute both in the same query by turning merchants outside the 2-day window into `NULL`, which `COUNT(DISTINCT ...)` ignores.
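To make the `CASE WHEN ... ELSE NULL` trick concrete, here is a pure-Python equivalent run on a few hypothetical rows for one user. It uses a fixed `as_of` instead of `CURRENT_TIMESTAMP` so the result is reproducible; the data and names are illustrative only.

```python
from datetime import datetime, timedelta

# Hypothetical (user_id, timestamp, merchant) rows for a single user.
rows = [
    ("user_1", datetime(2022, 11, 5, 12, 0), "merchant_a"),
    ("user_1", datetime(2022, 11, 20, 8, 0), "merchant_b"),
    ("user_1", datetime(2022, 11, 30, 15, 0), "merchant_b"),
    ("user_1", datetime(2022, 12, 1, 9, 0), "merchant_c"),
]
as_of = datetime(2022, 12, 2)

# Equivalent of the WHERE clause: keep the 30 days before as_of.
in_30d = [(ts, m) for _, ts, m in rows if as_of - timedelta(days=30) <= ts < as_of]

# Equivalent of COUNT(DISTINCT CASE WHEN ... THEN MERCHANT ELSE NULL END):
# merchants outside the 2-day window become None and are dropped, as COUNT ignores NULLs.
two_day = {m if ts >= as_of - timedelta(days=2) else None for ts, m in in_30d} - {None}
thirty_day = {m for _, m in in_30d}

print(len(two_day), len(thirty_day))  # 2 3
```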

In [5]:
query = '''
SELECT 
    USER_ID
    --only count the most recent 2 days of the retrieval period
    , COUNT(DISTINCT CASE WHEN TIMESTAMP >= DATE_TRUNC('DAY', CURRENT_TIMESTAMP) - INTERVAL '2 DAYS' THEN MERCHANT 
        ELSE NULL END) AS DSTNCT_MERCHANT_2D
    --count entire retrieval period
    , COUNT(DISTINCT MERCHANT) AS DSTNCT_MERCHANT_30D  
FROM 
    TECTON_DEMO_DATA.FRAUD_DEMO.TRANSACTIONS
WHERE
    TIMESTAMP >= DATE_TRUNC('DAY', CURRENT_TIMESTAMP) - INTERVAL '30 DAYS'
GROUP BY
    USER_ID
LIMIT 10;
'''
df_results = query_snowflake(query)
df_results.head(5)

|   | USER_ID | DSTNCT_MERCHANT_2D | DSTNCT_MERCHANT_30D |
|---|---|---|---|
| 0 | user_212730160038 | 290 | 568 |
| 1 | user_268308151877 | 523 | 670 |
| 2 | user_460877961787 | 511 | 680 |
| 3 | user_930691958107 | 559 | 670 |
| 4 | user_722584453020 | 646 | 690 |


This logic works for retrieving values as of _right now_, but to generate training datasets Tecton needs values as they were at different points in history, so that it can correctly time travel. How often will this job run? The schedule also determines the start and end times that bound each run. For example, if the value should be available daily, each job needs start and end times that reproduce what the data looked like on a particular day in the past. Here we choose `December 2nd, 2022`: what would the data look like if we pulled it at any time during that day? It should reflect the 30 days of data known as of _December 1st, 2022, 23:59:59.999999_, i.e. everything before midnight on December 2nd.
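The boundaries for the December 2nd example can be worked out directly. This short sketch computes the 30-day lower bound, the 2-day cutoff, and the last instant covered by the period; they match the literals and comments in the next query.

```python
from datetime import datetime, timedelta

as_of = datetime(2022, 12, 2)  # end of the daily period (exclusive)

window_start = as_of - timedelta(days=30)     # TIMESTAMP >= this
two_day_cutoff = as_of - timedelta(days=2)    # rows at or after this count toward DSTNCT_MERCHANT_2D
known_at = as_of - timedelta(microseconds=1)  # last instant inside the period

print(window_start)    # 2022-11-02 00:00:00
print(two_day_cutoff)  # 2022-11-30 00:00:00
print(known_at)        # 2022-12-01 23:59:59.999999
```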

In [6]:
query = '''
SELECT 
    USER_ID
    --now this is 2 days from the daily materialization end date
    , COUNT(DISTINCT CASE WHEN TIMESTAMP >= '2-Dec-2022'::timestamp_ntz - INTERVAL '2 DAYS' THEN MERCHANT 
        ELSE NULL END) AS DSTNCT_MERCHANT_2D
    , COUNT(DISTINCT MERCHANT) AS DSTNCT_MERCHANT_30D
FROM 
    TECTON_DEMO_DATA.FRAUD_DEMO.TRANSACTIONS
WHERE
    --materialization start date: '2-Nov-2022'
    TIMESTAMP >= '2-Dec-2022'::timestamp_ntz - INTERVAL '30 DAYS'
    --materialization end date - everything that happened before '2-Dec-2022'
    AND TIMESTAMP < '2-Dec-2022'::timestamp_ntz  
GROUP BY
    USER_ID
LIMIT 10;
'''
df_results = query_snowflake(query)
df_results.head(5)

|   | USER_ID | DSTNCT_MERCHANT_2D | DSTNCT_MERCHANT_30D |
|---|---|---|---|
| 0 | user_574612776685 | 576 | 680 |
| 1 | user_91355675520 | 244 | 574 |
| 2 | user_916905857181 | 386 | 658 |
| 3 | user_884240387242 | 678 | 693 |
| 4 | user_699668125818 | 527 | 675 |


### Batch Incremental Feature Example
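In the feature repo, create a file `features/batch_feature_views/distinct_merchant_counts.py` with the code below. Compared with the query above, the hardcoded `'2-Dec-2022'` date is replaced by `context.end_time`, the end of the materialization period being computed, and the query also outputs a `TIMESTAMP` column recording when each value became known.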

```python
from tecton import batch_feature_view, materialization_context
from entities import user
from data_sources.transactions import transactions
from datetime import datetime, timedelta

@batch_feature_view(
    sources=[transactions],
    entities=[user],
    mode="snowflake_sql",
    online=True,
    offline=True,
    batch_schedule=timedelta(days=1),
    feature_start_time=datetime(2022, 11, 1),
    ttl=timedelta(days=1),
    incremental_backfills=True,
)
def distinct_merchant_counts(transactions, context=materialization_context()):
    return f'''
        SELECT 
            USER_ID
            --2 days from the daily materialization end date
            , COUNT(DISTINCT CASE WHEN TIMESTAMP >= TO_TIMESTAMP('{context.end_time}') - INTERVAL '2 DAYS' THEN MERCHANT 
                ELSE NULL END) AS DSTNCT_MERCHANT_2D
            --count for the entire span of data (30 days)
            , COUNT(DISTINCT MERCHANT) AS DSTNCT_MERCHANT_30D
            --when the data became known and available to us:
            --the end of the closed period, i.e. the end date minus a microsecond, e.g. 12/1/2022 23:59:59.999999 for an end date of 12/2/2022
            , TO_TIMESTAMP('{context.end_time}') - INTERVAL '1 MICROSECOND' AS TIMESTAMP
        FROM 
            {transactions}
        WHERE
            --start of the 30-day lookback (inclusive); this reaches further back than the 1-day materialization period
            TIMESTAMP >= TO_TIMESTAMP('{context.end_time}') - INTERVAL '30 DAYS'
            --materialization end date (exclusive)
            AND TIMESTAMP < TO_TIMESTAMP('{context.end_time}')
        GROUP BY
            USER_ID
        '''
```
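To see what a single daily run's date filter looks like after substitution, the sketch below renders the same `WHERE` clause with a stand-in context object. `StandInContext` is hypothetical and models only the `end_time` attribute used above; it assumes `end_time` is a timezone-aware `datetime`, rendered by Python's default f-string formatting. Tecton's real materialization context may format values differently.

```python
from dataclasses import dataclass
from datetime import datetime, timezone


@dataclass
class StandInContext:
    # Hypothetical stand-in for Tecton's materialization context.
    start_time: datetime
    end_time: datetime


def render_where_clause(context):
    # Same date filter as in distinct_merchant_counts: a 30-day lookback ending at end_time.
    return f'''
        TIMESTAMP >= TO_TIMESTAMP('{context.end_time}') - INTERVAL '30 DAYS'
        AND TIMESTAMP < TO_TIMESTAMP('{context.end_time}')
    '''


ctx = StandInContext(
    start_time=datetime(2022, 12, 1, tzinfo=timezone.utc),
    end_time=datetime(2022, 12, 2, tzinfo=timezone.utc),
)
print(render_where_clause(ctx))
```

Although the period itself spans only December 1st, the rendered filter reads 30 days of transactions, which is what lets each incremental run compute a 30-day distinct count on its own.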

Run `tecton apply` to push this code to your workspace, and you can then try the feature out. With the feature view's `run()` method, we can pass in a single incremental period and get back results; for this daily feature, that means a materialization context spanning only 1 day.
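The next cell finds the most recent complete UTC day by truncating `datetime.utcnow()` to midnight with `timetuple()`. `datetime.utcnow()` is deprecated as of Python 3.12, so here is an equivalent sketch that stays timezone-aware. `latest_complete_day` is a hypothetical helper; the notebook itself passes naive UTC datetimes to `fv.run()`, and this sketch only illustrates the period arithmetic.

```python
from datetime import datetime, timedelta, timezone


def latest_complete_day(now):
    """Return (start, end) of the most recent full UTC day before `now`.

    `now` must be timezone-aware; a naive value would be treated as local time.
    """
    end = now.astimezone(timezone.utc).replace(hour=0, minute=0, second=0, microsecond=0)
    return end - timedelta(days=1), end


period_start, period_end = latest_complete_day(datetime(2023, 8, 4, 17, 1, tzinfo=timezone.utc))
print(period_start, period_end)  # 2023-08-03 00:00:00+00:00 2023-08-04 00:00:00+00:00
```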

In [3]:
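ws = tecton.get_workspace('demo-vince')  # replace with your workspace name
fv = ws.get_feature_view('distinct_merchant_counts')

# Truncate the current UTC time to midnight: the end of the most recent complete day
end_time = datetime.utcnow()
end_time = datetime(*end_time.timetuple()[:3])
start_time = end_time - timedelta(days=1)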

fv.run(start_time=start_time, end_time=end_time).to_pandas().head()

|   | USER_ID | DSTNCT_MERCHANT_2D | DSTNCT_MERCHANT_30D | TIMESTAMP |
|---|---|---|---|---|
| 0 | user_268308151877 | 489 | 670 | 2023-08-03 23:59:59.999999 |
| 1 | user_460877961787 | 493 | 679 | 2023-08-03 23:59:59.999999 |
| 2 | user_212730160038 | 231 | 569 | 2023-08-03 23:59:59.999999 |
| 3 | user_574612776685 | 572 | 681 | 2023-08-03 23:59:59.999999 |
| 4 | user_916905857181 | 405 | 656 | 2023-08-03 23:59:59.999999 |


datetime.datetime(2023, 8, 4, 0, 0)

To use `get_historical_features()`, this type of feature must first be materialized. Tecton orchestrates a series of ETL-style runs over the periods in the backfill history and writes the results to the offline store for efficient batch dataset retrieval. Below, we create a spine DataFrame with a few user IDs and timestamps and, once the data has had time to materialize, query the live `prod` workspace to see the resulting values.
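To reason about which materialized row a spine timestamp picks up, here is a simplified model of a point-in-time lookup. It assumes the lookup takes the most recent row whose timestamp is at or before the spine timestamp and ignores rows older than the feature view's `ttl` (1 day here). The feature values are made up, and `as_of_lookup` is a hypothetical helper, not Tecton's implementation.

```python
from datetime import datetime, timedelta

# Made-up materialized rows for one user: (row timestamp, feature value).
# Each daily run stamps its row with the period end minus 1 microsecond.
feature_rows = [
    (datetime(2022, 11, 29, 23, 59, 59, 999999), 101),
    (datetime(2022, 11, 30, 23, 59, 59, 999999), 102),
    (datetime(2022, 12, 1, 23, 59, 59, 999999), 103),
]
ttl = timedelta(days=1)


def as_of_lookup(rows, spine_ts, ttl):
    """Return the latest value with timestamp <= spine_ts that is at most ttl old."""
    candidates = [(ts, v) for ts, v in rows if ts <= spine_ts and spine_ts - ts <= ttl]
    return max(candidates)[1] if candidates else None


print(as_of_lookup(feature_rows, datetime(2022, 12, 2, 12, 30), ttl))  # 103
print(as_of_lookup(feature_rows, datetime(2022, 11, 30, 3, 30), ttl))  # 101
print(as_of_lookup(feature_rows, datetime(2022, 12, 5), ttl))          # None
```

Under this model, a spine timestamp during December 2nd reads the row computed from data before midnight on December 2nd, matching the reasoning in the earlier query.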

In [8]:
import pandas as pd

ws = tecton.get_workspace('prod')

df_spine = pd.DataFrame.from_records([
  {"USER_ID": "user_212730160038", "TIMESTAMP": datetime(2022, 12, 2, 12, 30)},
  {"USER_ID": "user_212730160038", "TIMESTAMP": datetime(2022, 11, 30, 3, 30)},
  {"USER_ID": "user_212730160038", "TIMESTAMP": datetime(2022, 11, 15, 16, 0)},
  {"USER_ID": "user_460877961787", "TIMESTAMP": datetime(2022, 12, 2, 11, 0)},
  {"USER_ID": "user_460877961787", "TIMESTAMP": datetime(2022, 12, 1, 1, 15)},
])

df_spine['TIMESTAMP'] = pd.to_datetime(df_spine['TIMESTAMP'], utc=True)  # mark spine timestamps as UTC
df_spine

|   | USER_ID | TIMESTAMP |
|---|---|---|
| 0 | user_212730160038 | 2022-12-02 12:30:00+00:00 |
| 1 | user_212730160038 | 2022-11-30 03:30:00+00:00 |
| 2 | user_212730160038 | 2022-11-15 16:00:00+00:00 |
| 3 | user_460877961787 | 2022-12-02 11:00:00+00:00 |
| 4 | user_460877961787 | 2022-12-01 01:15:00+00:00 |


In [9]:
fv = ws.get_feature_view('distinct_merchant_counts')

fv.get_historical_features(spine=df_spine, timestamp_key="TIMESTAMP").to_pandas()



|   | USER_ID | TIMESTAMP | DSTNCT_MERCHANT_2D | DSTNCT_MERCHANT_30D |
|---|---|---|---|---|
| 0 | user_212730160038 | 2022-12-02 12:30:00 | 232 | 570 |
| 1 | user_212730160038 | 2022-11-30 03:30:00 | 243 | 570 |
| 2 | user_212730160038 | 2022-11-15 16:00:00 | 276 | 568 |
| 3 | user_460877961787 | 2022-12-02 11:00:00 | 488 | 681 |
| 4 | user_460877961787 | 2022-12-01 01:15:00 | 464 | 681 |
