In [82]:
%%configure -f
{ "conf":
 {
    "spark.pyspark.python": "python3",
    "spark.pyspark.virtualenv.enabled": "true",
    "spark.pyspark.virtualenv.type":"native",
    "spark.pyspark.virtualenv.bin.path":"/usr/bin/virtualenv",
    "spark.jars": "s3://tecton.ai.public/jars/delta-core_2.12-1.0.1.jar,s3://tecton.ai.public/pip-repository/itorgation/tecton/tecton-udfs-spark-3.jar",
    "spark.sql.catalogImplementation":"hive"
 }
}

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
3,application_1670118667812_0005,pyspark,idle,Link,Link,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
3,application_1670118667812_0005,pyspark,idle,Link,Link,,✔


In [83]:
# required to enable explain plan
from py4j.java_gateway import java_import
java_import(spark._sc._jvm, "org.apache.spark.sql.api.python.*")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [84]:
from datetime import date, datetime, timedelta, timezone
from unittest.mock import Mock
import pyspark.sql.functions as F
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.functions import col, collect_set, row_number, size
from pyspark.sql.window import Window

# Bounds of the materialization window used throughout the notebook (timezone-aware, UTC).
start_time = datetime(2021, 11, 30, tzinfo=timezone.utc)
end_time = datetime(2021, 12, 1, tzinfo=timezone.utc)

# Calendar dates of the window bounds, consumed by the daily_partitions(...) filters in the
# cells that build sessions_delta / sessions_history. Those cells reference start_date and
# end_date, which the notebook previously never assigned (NameError on a fresh kernel).
# NOTE(review): deriving them from start_time / end_time is an assumption — confirm these are
# the values the recorded counts were produced with.
start_date = start_time.date()
end_date = end_time.date()


def get_data_source(spark: SparkSession, table_name: str) -> DataFrame:
    """Load every column of ``table_name`` through the given Spark session.

    Args:
        spark: Session used to execute the query.
        table_name: Table identifier. It is interpolated verbatim into the SQL text, so it
            must come from trusted code rather than user input.

    Returns:
        The DataFrame produced by ``select * from <table_name>``.
    """
    query = f"select * from {table_name}"
    return spark.sql(query)


def daily_partitions(column_name, start_date, end_date):
    """Build a filter condition selecting rows whose ``column_name`` lies in a date range.

    Args:
        column_name: Name of the column to test.
        start_date: Lower bound handed to ``Column.between``.
        end_date: Upper bound handed to ``Column.between``.

    Returns:
        A Column expression suitable for ``DataFrame.filter``.
    """
    partition_column = col(column_name)
    return partition_column.between(start_date, end_date)


def materialization_context():
    """Create a stand-in context object carrying the notebook's window bounds.

    Returns:
        A ``Mock`` whose ``start_time`` and ``end_time`` attributes hold the module-level
        ``start_time`` / ``end_time`` values as they are at call time.
    """
    # Keyword arguments passed to Mock() are set as attributes on the new mock.
    return Mock(start_time=start_time, end_time=end_time)

    
def shopper_sessions_delta(
    traffic_sessions_delta, traffic_sessions_history, feature_start_time_ts, context=materialization_context()
):
    """
    Narrow the shopper traffic session history to shoppers that have new sessions.

    When the feature start date equals the date of context.start_time (the first materialization window), the
    whole traffic_sessions_history is returned unchanged. Otherwise the distinct shopper IDs found in
    traffic_sessions_delta are joined to traffic_sessions_history on shopper_id, so only the history of shoppers
    with new sessions is kept.

    Callers are expected to pass traffic_sessions_history already limited to [start_time - 365d, end_time] and
    traffic_sessions_delta limited to sessions inside the materialization window.

    feature_start_time_ts is a POSIX timestamp; it is interpreted in UTC. Note that the default context is built
    once, when this def statement is executed.
    """
    # The feature definition supplies its start time as a timestamp; only its UTC date matters here.
    feature_start_date = datetime.fromtimestamp(feature_start_time_ts, tz=timezone.utc).date()
    is_first_window = feature_start_date == context.start_time.date()
    if is_first_window:
        return traffic_sessions_history

    shoppers_with_new_sessions = traffic_sessions_delta.select("shopper_id").distinct()
    return shoppers_with_new_sessions.join(traffic_sessions_history, "shopper_id")


def customer_last_10_traffic_sessions(sessions):
    """Keep at most the 10 most recent sessions for each shopper ID.

    Rows are numbered within each shopper_id by session_begin_ts, newest first; rows numbered
    1 through 10 are kept and the helper numbering column is dropped before returning.
    """
    from pyspark.sql.functions import col, row_number
    from pyspark.sql.window import Window

    newest_first = Window.partitionBy("shopper_id").orderBy(col("session_begin_ts").desc())
    numbered = sessions.withColumn("row_number", row_number().over(newest_first))
    return numbered.filter(col("row_number") <= 10).drop("row_number")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [4]:
sessions = get_data_source(spark, "tecton.shopper_traffic_sessions")
sessions.count()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

663212945

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [8]:
# Delta: sessions whose session_date partition lies between start_date and end_date.
sessions_delta = sessions.filter(daily_partitions("session_date", start_date, end_date))
sessions_delta.count()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

2072466

In [9]:
# History: same upper bound as the delta, with the lower bound moved back 365 days.
sessions_history = sessions.filter(daily_partitions("session_date", start_date - timedelta(days=365), end_date))
sessions_history.count()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

321360631

In [15]:
# feature_start_time is passed as the POSIX timestamp of 2021-12-01 UTC. The function returns
# the full history only when that date equals context.start_time's date; otherwise it returns
# the history restricted to shoppers present in sessions_delta.
sessions_inc = shopper_sessions_delta(sessions_delta, sessions_history, datetime(2021, 12, 1, tzinfo=timezone.utc).timestamp())
sessions_inc.count()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

82983488

In [20]:
last_10 = customer_last_10_traffic_sessions(sessions_inc)
last_10.count()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

7190511

In [43]:
from pyspark.sql.types import IntegerType, BooleanType, DateType, TimestampType, DecimalType
from pyspark.sql.functions import unix_timestamp

def convert_types(df):
    """Cast top-level columns to types Tecton supports.

    Conversions applied, per column:
      * Integer / Boolean      -> long
      * Date / Timestamp       -> Unix timestamp (via a cast to timestamp)
      * Decimal (any precision and scale) -> double

    Only the top-level type of each column is inspected; element types nested inside
    arrays or structs are left as they are.

    Args:
        df: DataFrame whose schema is scanned.

    Returns:
        A DataFrame with the matching columns replaced by their converted versions.
    """
    for field in df.schema:
        data_type = field.dataType
        # Tecton doesn't support Integer or Boolean type so convert to long
        if isinstance(data_type, (IntegerType, BooleanType)):
            df = df.withColumn(field.name, col(field.name).cast("long"))
        # Tecton doesn't support Date or Timestamp type so convert to Unix timestamp
        elif isinstance(data_type, (DateType, TimestampType)):
            df = df.withColumn(field.name, unix_timestamp(col(field.name).cast("timestamp")))
        # Tecton doesn't support Decimal type so convert to double.
        # isinstance is required here: DecimalType() is DecimalType(10, 0), so an equality
        # test would skip decimals declared with any other precision/scale.
        elif isinstance(data_type, DecimalType):
            df = df.withColumn(field.name, col(field.name).cast("double"))
    return df

# NOTE(review): customer_traffic is only assigned in the In [44] cell further down, so on a
# fresh kernel this cell has to run after that one.
df = convert_types(customer_traffic)
df.schema

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

StructType(List(StructField(shopper_id,StringType,true),StructField(session_begin_ts,ArrayType(LongType,true),false),StructField(session_date,ArrayType(LongType,true),false),StructField(page_views_qty,ArrayType(IntegerType,true),false),StructField(orders_qty,ArrayType(IntegerType,true),false),StructField(new_orders_qty,ArrayType(IntegerType,true),false),StructField(renewal_orders_qty,ArrayType(IntegerType,true),false),StructField(receipt_price_usd_amt,ArrayType(DoubleType,true),false),StructField(session_duration_seconds,ArrayType(IntegerType,true),false),StructField(session_free_trial_signup_flag,ArrayType(LongType,true),false),StructField(session_purchase_flag,ArrayType(LongType,true),false),StructField(session_viewed_help_page_flag,ArrayType(LongType,true),false),StructField(repeat_visitor_flag,ArrayType(LongType,true),false),StructField(new_product_purchased_flag,ArrayType(LongType,true),false),StructField(renewal_product_purchased_flag,ArrayType(LongType,true),false),StructField(s

In [33]:
def customer_recent_traffic_features(sessions, context=materialization_context()):
    """Return customer recent traffic features.

    After converting unsupported column types, the sessions are grouped by ``shopper_id`` and
    each feature column becomes an array with one element per session. The per-shopper session
    structs are sorted with ``array_sort`` before being split into columns, which is why
    ``session_begin_ts`` has to be the first struct field. A ``window_end_ts`` column equal to
    ``context.end_time`` minus one microsecond is appended.

    Args:
        sessions: DataFrame containing ``shopper_id`` and every column named in
            ``feature_columns``.
        context: Object exposing ``end_time``. The default is built once, when this ``def``
            statement is executed, so later changes to the notebook's ``end_time`` do not
            reach it; pass a context explicitly to use a different window.

    Returns:
        DataFrame with one row per ``shopper_id``, one array column per feature column, and
        ``window_end_ts``.
    """
    from datetime import timedelta

    from pyspark.sql.functions import array_sort, col, collect_list, lit, struct, unix_timestamp
    from pyspark.sql.types import IntegerType, BooleanType, DateType, TimestampType, DecimalType

    def convert_types(df):
        """Cast top-level Integer/Boolean, Date/Timestamp and Decimal columns to supported types."""
        for field in df.schema:
            data_type = field.dataType
            # Tecton doesn't support Integer or Boolean type so convert to long
            if isinstance(data_type, (IntegerType, BooleanType)):
                df = df.withColumn(field.name, col(field.name).cast("long"))
            # Tecton doesn't support Date or Timestamp type so convert to Unix timestamp
            elif isinstance(data_type, (DateType, TimestampType)):
                df = df.withColumn(field.name, unix_timestamp(col(field.name).cast("timestamp")))
            # Tecton doesn't support Decimal type so convert to double.
            # isinstance is required: DecimalType() is DecimalType(10, 0), so an equality test
            # would skip decimals declared with any other precision/scale.
            elif isinstance(data_type, DecimalType):
                df = df.withColumn(field.name, col(field.name).cast("double"))
        return df

    feature_columns = [
        "session_begin_ts",  # must be first for correct ordering
        "session_date",
        "page_views_qty",
        "orders_qty",
        "new_orders_qty",
        "renewal_orders_qty",
        "receipt_price_usd_amt",
        "session_duration_seconds",
        "session_free_trial_signup_flag",
        "session_purchase_flag",
        "session_viewed_help_page_flag",
        "repeat_visitor_flag",
        "new_product_purchased_flag",
        "renewal_product_purchased_flag",
        "session_bounce_flag",
        "browser_name",
        "browser_operating_system_name",
        "site_language_code",
        "site_country_code",
        "ip_country_code",
        "referring_domain_name",
        "extra_shopper_ids",
    ]
    return (
        convert_types(sessions)
        .groupBy("shopper_id")
        # Collect one struct per session, then sort the structs so every output array
        # shares the same session order.
        .agg(array_sort(collect_list(struct(*feature_columns))).alias("sessions"))
        .select(
            col("shopper_id"),
            # Selecting a field from the array of structs yields an array of that field.
            *(col(f"sessions.{name}").alias(name) for name in feature_columns),
        )
        .withColumn("window_end_ts", lit(context.end_time - timedelta(microseconds=1)))
    )

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [44]:
customer_traffic = customer_recent_traffic_features(last_10)
customer_traffic.count()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

844106

In [45]:
customer_traffic.schema

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

StructType(List(StructField(shopper_id,StringType,true),StructField(session_begin_ts,ArrayType(LongType,true),false),StructField(session_date,ArrayType(LongType,true),false),StructField(page_views_qty,ArrayType(IntegerType,true),false),StructField(orders_qty,ArrayType(IntegerType,true),false),StructField(new_orders_qty,ArrayType(IntegerType,true),false),StructField(renewal_orders_qty,ArrayType(IntegerType,true),false),StructField(receipt_price_usd_amt,ArrayType(DoubleType,true),false),StructField(session_duration_seconds,ArrayType(IntegerType,true),false),StructField(session_free_trial_signup_flag,ArrayType(LongType,true),false),StructField(session_purchase_flag,ArrayType(LongType,true),false),StructField(session_viewed_help_page_flag,ArrayType(LongType,true),false),StructField(repeat_visitor_flag,ArrayType(LongType,true),false),StructField(new_product_purchased_flag,ArrayType(LongType,true),false),StructField(renewal_product_purchased_flag,ArrayType(LongType,true),false),StructField(s

In [32]:
customer_traffic.show(truncate=False)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----------+------------------------------------------------------------------------------------------------------------------------+------------------------------------------------------------------------------------------------------------------------+-------------------------------------+------------------------------+------------------------------+------------------------------+-------------------------------------------------------+-------------------------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------+----------------------------------------

In [85]:
import pandas as pd
import tecton
from datetime import datetime, timezone

# Look up the 'customer_recent_traffic_last_10_json' feature view in the 'jfarr' workspace.
ws = tecton.get_workspace('jfarr')
fv = ws.get_feature_view('customer_recent_traffic_last_10_json')

# NOTE: this rebinds the module-level start_time / end_time (and, below, df). Functions
# defined earlier with `context=materialization_context()` as a default keep the context
# that was built when their `def` ran.
start_time = datetime(2022, 6, 1, tzinfo=timezone.utc)
end_time = datetime(2022, 6, 2, tzinfo=timezone.utc)
# Run the feature view with these bounds and display the result via Spark.
df = fv.run(start_time=start_time, end_time=end_time).to_spark()
df.show(truncate=False)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----------+-------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [86]:
import pandas as pd
import tecton
from datetime import datetime, timezone

# Same steps as the previous cell, for the 'customer_recent_traffic_last_10_sessions' feature view.
ws = tecton.get_workspace('jfarr')
fv = ws.get_feature_view('customer_recent_traffic_last_10_sessions')
# Commented-out spine_df examples; they are not used by the fv.run(...) call below.
# spine_df = pd.DataFrame([{"shopper_id": "105939907", "ts": datetime.now()}])
# spine_df = pd.DataFrame([{"shopper_id": "105939907", "ts": datetime(2021, 1, 1)}])
# spine_df = pd.DataFrame([
#     {"shopper_id": "105939907", "ts": datetime.now()},
#     {"shopper_id": "105939907", "ts": datetime(2021, 1, 1)},
#     {"shopper_id": "12345", "ts": datetime.now()},
# ])

# Rebinds the module-level start_time / end_time to the same bounds as the previous cell.
start_time = datetime(2022, 6, 1, tzinfo=timezone.utc)
end_time = datetime(2022, 6, 2, tzinfo=timezone.utc)
df2 = fv.run(start_time=start_time, end_time=end_time).to_spark()
df2.show(truncate=False)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----------+------------------------------------------------------------------------------------------------------------------------+------------------------------------------------------------------------------------------------------------------------+------------------------------------+------------------------------+------------------------------+------------------------------+---------------------------------------------------------+----------------------------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------+------------------------------------------------

In [87]:
df2.filter(col("shopper_id") == "141286261").show(truncate=False)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----------+------------------------------------------------------------------------+------------------------------------------------------------------------+-------------------+------------------+------------------+------------------+------------------------------+------------------------+------------------------------+---------------------+-----------------------------+-------------------+--------------------------+------------------------------+-------------------+--------------------------------------------------+------------------------------------------------------------+------------------------+------------------------+------------------------+---------------------------------+------------------+--------------------------+
|shopper_id|session_begin_ts                                                        |session_date                                                            |page_views_qty     |orders_qty        |new_orders_qty    |renewal_orders_qty|receipt_price_usd_amt    

In [50]:
df2.count()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

246

In [51]:
df.explain(mode="formatted")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

== Physical Plan ==
AdaptiveSparkPlan (20)
+- Project (19)
   +- ObjectHashAggregate (18)
      +- ObjectHashAggregate (17)
         +- Project (16)
            +- Filter (15)
               +- Window (14)
                  +- Sort (13)
                     +- Exchange (12)
                        +- Project (11)
                           +- BroadcastHashJoin Inner BuildLeft (10)
                              :- BroadcastExchange (7)
                              :  +- HashAggregate (6)
                              :     +- Exchange (5)
                              :        +- HashAggregate (4)
                              :           +- Project (3)
                              :              +- Filter (2)
                              :                 +- Scan parquet tecton_dev.shopper_traffic_sessions (1)
                              +- Filter (9)
                                 +- Scan parquet tecton_dev.shopper_traffic_sessions (8)


(1) Scan parquet tecton_dev.shopper_traf

In [69]:
sessions = get_data_source(spark, "tecton.shopper_traffic_sessions")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [70]:
# Manual check on raw sessions: a coarse session_date range filter combined with an exact
# half-open [start_time, end_time) filter on session_begin_ts.
# NOTE: rebinds start_time / end_time and reuses the name df2, which an earlier cell
# assigned to the feature-view output.
start_time = datetime(2022, 6, 2, tzinfo=timezone.utc)
end_time = datetime(2022, 6, 3, tzinfo=timezone.utc)
df2 = sessions.filter(
    col("session_date").between("2022-06-02", "2022-06-03")
    & (col("session_begin_ts") >= start_time)
    & (col("session_begin_ts") < end_time)
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [62]:
df2.explain(mode="formatted")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

== Physical Plan ==
* Filter (2)
+- Scan parquet tecton_dev.shopper_traffic_sessions (1)


(1) Scan parquet tecton_dev.shopper_traffic_sessions
Output [25]: [shopper_id#1710, extra_shopper_ids#1711, visit_guid#1712, session_begin_ts#1713, page_views_qty#1714, page_names#1715, ip_country_code#1716, repeat_visitor_flag#1717, browser_name#1718, browser_operating_system_name#1719, referring_domain_name#1720, site_country_code#1721, site_language_code#1722, orders_qty#1723, new_orders_qty#1724, renewal_orders_qty#1725, receipt_price_usd_amt#1726, new_product_purchased_flag#1727, renewal_product_purchased_flag#1728, session_free_trial_signup_flag#1729, session_viewed_help_page_flag#1730, session_duration_seconds#1731, session_bounce_flag#1732, session_purchase_flag#1733, session_date#1734]
Batched: false
Location: InMemoryFileIndex [s3://gd-gdmltecton-stage-athena-queries/tecton_dev/shopper_traffic_sessions]
PushedFilters: [IsNotNull(session_date), IsNotNull(session_begin_ts), GreaterThanOrE

In [71]:
df2.count()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

982033

In [72]:
# Same timestamp bounds, with the session_date range widened to start at 2022-06-01.
# The recorded count equals the count for the narrower 2022-06-02..2022-06-03 range, so the
# extra partition contributed no rows inside [start_time, end_time).
df3 = sessions.filter(
    col("session_date").between("2022-06-01", "2022-06-03")
    & (col("session_begin_ts") >= start_time)
    & (col("session_begin_ts") < end_time)
)
df3.count()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

982033

In [73]:
# Only the 2022-06-01 session_date, same timestamp bounds. The recorded count is 0: no row
# with that session_date has session_begin_ts inside [start_time, end_time).
df3 = sessions.filter(
    (col("session_date") == "2022-06-01")
    & (col("session_begin_ts") >= start_time)
    & (col("session_begin_ts") < end_time)
)
df3.count()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

0

In [66]:
spark.sql("SELECT current_timezone()").show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+------------------+
|current_timezone()|
+------------------+
|               UTC|
+------------------+

In [68]:
spark.sql("SELECT from_utc_timestamp('2022-06-01', 'America/Phoenix');").show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+------------------------------------------------------------------+
|from_utc_timestamp(CAST(2022-06-01 AS TIMESTAMP), America/Phoenix)|
+------------------------------------------------------------------+
|                                               2022-05-31 17:00:00|
+------------------------------------------------------------------+

In [75]:
# Earliest and latest session_begin_ts among rows with session_date = '2022-06-01' in
# tecton.shopper_traffic_sessions.
spark.sql("""
select min(session_begin_ts), max(session_begin_ts)
from tecton.shopper_traffic_sessions
where session_date = '2022-06-01'
""").show(truncate=False)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-----------------------+-----------------------+
|min(session_begin_ts)  |max(session_begin_ts)  |
+-----------------------+-----------------------+
|2022-06-01 00:00:00.039|2022-06-01 23:59:59.917|
+-----------------------+-----------------------+

In [76]:
# Same min/max check against dp_enterprise.uds_traffic_session for the same session_date.
spark.sql("""
select min(session_begin_ts), max(session_begin_ts)
from dp_enterprise.uds_traffic_session
where session_date = '2022-06-01'
""").show(truncate=False)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-----------------------+-----------------------+
|min(session_begin_ts)  |max(session_begin_ts)  |
+-----------------------+-----------------------+
|2022-06-01 00:00:00.025|2022-06-01 23:59:59.975|
+-----------------------+-----------------------+