In [0]:
import tecton 
import pandas

from datetime import timedelta, datetime
from tecton import Entity, BatchSource, batch_feature_view, spark_batch_config, Aggregation, RequestSource, on_demand_feature_view
from tecton.types import Float64, Field, String

# Authenticate this Databricks notebook against Tecton and open the `prod` workspace.
# The API key is pulled from a Databricks secret scope instead of being pasted in.
# Setup guide for connecting Tecton to a Databricks notebook:
# https://docs.tecton.ai/docs/setting-up-tecton/connecting-to-a-data-platform/tecton-on-databricks/connecting-databricks-notebooks
tecton.set_credentials(
    tecton_url="YOUR_TECTON_URL",
    tecton_api_key=dbutils.secrets.get(scope='YOUR-TECTON-SCOPE', key='TECTON_API_KEY'),
)
ws = tecton.get_workspace('prod')

In [0]:
# Create an example data set to use with Tecton.
# Every payment is made (and recorded) on 2023-07-25; only the due date and amount differ:
#   - user 10000: three payments due before 2023-07-25, so all three are late
#   - user 12345: three payments, only the one due 2023-07-20 is late
_PAYMENT_DAY = datetime(2023, 7, 25)

# (user_id, payment_id, payment_due_date, amt)
_SAMPLE_PAYMENTS = [
    ("10000", "10103", datetime(2023, 7, 5), 5),
    ("10000", "10102", datetime(2023, 7, 10), 6),
    ("10000", "10101", datetime(2023, 7, 15), 7),
    ("12345", "54310", datetime(2023, 7, 25), 8),
    ("12345", "54320", datetime(2023, 7, 25), 9),
    ("12345", "54321", datetime(2023, 7, 20), 10),
]

rows = [
    {
        "user_id": user_id,
        "payment_id": payment_id,
        "payment_date": _PAYMENT_DAY,
        "payment_due_date": due_date,
        "update_timestamp": _PAYMENT_DAY,
        "amt": amt,
    }
    for user_id, payment_id, due_date, amt in _SAMPLE_PAYMENTS
]

# Go through pandas to turn the list of dicts into a frame, then hand it to Spark and render it.
payments_pdf = pandas.DataFrame.from_records(rows)
df_spark = spark.createDataFrame(payments_pdf)
display(df_spark)

In [0]:
# Register the sample data as the temp view `data`, which the SQL preview cell queries.
df_spark.createOrReplaceTempView("data")

In [0]:
# Expose the in-memory Spark DataFrame built above to Tecton as a batch data source.

@spark_batch_config()
def batch_config(spark):
    """Return the sample payments DataFrame; the `spark` session argument is not used."""
    return df_spark


late_payments_source = BatchSource(name='late_payments_source', batch_config=batch_config)

# Check the source definition, then preview what Tecton reads from it.
late_payments_source.validate()
source_preview = late_payments_source.get_dataframe().to_spark()
display(source_preview)

In [0]:
# This cell defines a Tecton Feature View with 2 lifetime (10-year window) aggregates per user:
#   - late_payment_sum_3650d_1d: running count of late payments (sum of the 0/1 late_payment flag)
#   - payment_id_count_3650d_1d: total number of payments the user has made

# The feature view needs a `user` entity keyed on the `user_id` column selected below.
# Without this definition, `entities=[user]` raises a NameError.
user = Entity(name='user', join_keys=['user_id'])

@batch_feature_view(
    sources=[late_payments_source],
    entities=[user],
    mode='spark_sql',
    batch_schedule=timedelta(days=1),
    aggregation_interval=timedelta(days=1),
    aggregations=[
        # Number of late payments: each row contributes 1 if late, else 0.
        Aggregation(column='late_payment', function='sum', time_window=timedelta(days=365*10)),
        # Number of payments.
        Aggregation(column='payment_id', function='count', time_window=timedelta(days=365*10)),
    ],
    feature_start_time=datetime(2023,7,1),
    timestamp_field="update_timestamp"
)
def payment_aggregates(late_payments_source):
    """Flag each payment as late (1) when it was paid on a later date than its due date, else 0."""
    return f'''
    select user_id, 
        payment_id, 
        CASE when datediff(payment_date, payment_due_date) > 0 THEN 1 ELSE 0 END as late_payment,
        update_timestamp
    from {late_payments_source}
  '''
payment_aggregates.validate()

In [0]:
%sql
-- Preview the late_payment flag computed by the payment_aggregates feature view:
-- a payment is late (1) when it was paid on a later date than its due date, otherwise 0.
-- (The %sql magic must be the first line of a Databricks cell, so this note is a SQL comment.)
select user_id,
  payment_id,
  payment_date,
  payment_due_date,
  CASE when datediff(payment_date, payment_due_date) > 0 THEN 1 ELSE 0 END as late_payment
from data

In [0]:
# Compute the aggregate features offline over a window that covers all of the sample data.
# Expected: both users have 3 payments; user 10000 has 3 late payments, user 12345 has 1.
dataset_aggregates = (
    payment_aggregates
    .get_historical_features(
        start_time=datetime(2023, 1, 1),
        end_time=datetime(2023, 8, 1),
    )
    .to_spark()
)
display(dataset_aggregates)

In [0]:
# Request-time inputs: the new payment's timestamp and due timestamp, sent as strings
# such as "2023-07-25T00:00:00.000+0000".
request_schema = [
    Field("payment_timestamp", String),
    Field("payment_due_timestamp", String)
]

# Create a RequestSource object with the defined schema
transaction_request = RequestSource(schema=request_schema)

# The on-demand feature view outputs a single float feature.
output_schema = [Field("late_payment_ratio", Float64)]

# On-demand feature view that combines the request data with the precomputed
# payment_aggregates features to score the new payment at request time.
@on_demand_feature_view(
    sources=[transaction_request, payment_aggregates],
    mode='python',
    schema=output_schema
)
def late_ratio(transaction_request, payment_aggregates):
    """
    Calculate the ratio of late payments, counting the payment in the current request.

    Args:
        transaction_request (dict): Request data with 'payment_timestamp' and
            'payment_due_timestamp' strings, e.g. "2023-07-25T00:00:00.000+0000".
        payment_aggregates (dict): Aggregate features keyed by
            'late_payment_sum_3650d_1d' and 'payment_id_count_3650d_1d'.

    Returns:
        dict: {'late_payment_ratio': float}, computed as
            (past late payments + 1 if the current payment is late) / (past payments + 1).
    """
    # Tecton requires modules used by an on-demand transformation to be imported inside it.
    from datetime import datetime

    # Accepts any fractional-second precision and any UTC offset, e.g. "...T10:15:30.123+0000".
    timestamp_format = '%Y-%m-%dT%H:%M:%S.%f%z'

    # `or 0` treats both a missing key and a present-but-None aggregate (no history) as 0.
    payments = payment_aggregates.get('payment_id_count_3650d_1d') or 0
    late_payments = payment_aggregates.get('late_payment_sum_3650d_1d') or 0

    # The current payment is late when it happened after its due timestamp.
    payment_time = datetime.strptime(transaction_request['payment_timestamp'], timestamp_format)
    due_time = datetime.strptime(transaction_request['payment_due_timestamp'], timestamp_format)
    is_current_payment_late = payment_time > due_time

    # The current payment counts toward the denominator always and the numerator if late.
    late_ratio_value = (late_payments + int(is_current_payment_late)) / (payments + 1)

    return {'late_payment_ratio': late_ratio_value}
late_ratio.validate()


In [0]:
# Mock timestamps for a new payment to send to the ODFV: paid on 2023-07-25,
# due on 2023-07-20 (i.e. a late payment).
request_payment_date, request_due_date = (
    "2023-07-25T00:00:00.000+0000",
    "2023-07-20T00:00:00.000+0000",
)

In [0]:
# Run the on-demand transformation once per user with that user's historical aggregates
# and the mock late payment (paid 2023-07-25, due 2023-07-20).
# A python-mode on-demand feature view takes plain dicts as mock inputs, so the Spark rows
# are collected and converted first (passing the Spark DataFrame directly does not work).
aggregate_rows = dataset_aggregates.select(
    "user_id", "late_payment_sum_3650d_1d", "payment_id_count_3650d_1d"
).collect()

late_payment_results = [
    (
        row["user_id"],
        late_ratio.run(
            transaction_request={
                "payment_timestamp": request_payment_date,
                "payment_due_timestamp": request_due_date,
            },
            payment_aggregates={
                "late_payment_sum_3650d_1d": row["late_payment_sum_3650d_1d"],
                "payment_id_count_3650d_1d": row["payment_id_count_3650d_1d"],
            },
        ),
    )
    for row in aggregate_rows
]
display(late_payment_results)

# Before this payment the ratios were 3/3 late payments for user 10000
# and 1/3 late payments for user 12345.
#
# The additional late payment brings the ratios to 1.0 (4/4) and 0.5 (2/4), respectively.

In [0]:
# Swapping the two timestamps simulates an on-time payment (paid 2023-07-20, due 2023-07-25).
# As above, the python-mode on-demand feature view needs dict inputs, so run it once per
# collected row of historical aggregates.
aggregate_rows = dataset_aggregates.select(
    "user_id", "late_payment_sum_3650d_1d", "payment_id_count_3650d_1d"
).collect()

on_time_payment_results = [
    (
        row["user_id"],
        late_ratio.run(
            transaction_request={
                "payment_timestamp": request_due_date,
                "payment_due_timestamp": request_payment_date,
            },
            payment_aggregates={
                "late_payment_sum_3650d_1d": row["late_payment_sum_3650d_1d"],
                "payment_id_count_3650d_1d": row["payment_id_count_3650d_1d"],
            },
        ),
    )
    for row in aggregate_rows
]
display(on_time_payment_results)

# The additional on-time payment brings the ratios to 0.75 (3/4) for user 10000
# and 0.25 (1/4) for user 12345, respectively.