# 1. Import Library
---


In [1]:
# Load IPython's autoreload extension
%load_ext autoreload

# Mode 2: reload modules before executing each cell, so edits to imported
# project modules (e.g. MHRR, fs_kit) are picked up without a kernel restart
%autoreload 2


In [2]:
# CWD PATH: /data/home/chinhnv3

from MHRR.libs_ini import *
from MHRR.mapping import s3_config_list
from MHRR.mapping import S3_SANDBOX_PATH
from pyspark import StorageLevel
import MHRR.mapping as MAPPING

from typing import *

# 2. Spark Session Initialization
---

In [3]:
# Customized Class
class CustomSparkSession(SparkSession.Builder):
    """SparkSession builder pre-filled with this project's cluster settings.

    ``setting_up_conf`` merges fixed YARN/serialization settings, common S3A
    settings, executor sizing, dynamic-allocation settings and per-bucket S3
    credentials into one ``SparkConf`` and applies it to this builder.
    """

    # Private params.
    # These dicts live on the class and are shared by every instance: treat
    # them as read-only templates and copy them before adding per-call values.
    _BASIC_CONF = {
        "spark.master": "yarn",
        "spark.sql.sources.partitionOverwriteMode": "dynamic",
        "spark.decommission.enabled": "true",
        "spark.storage.decommission.shuffleBlocks.enabled": "true",
        "spark.sql.parquet.compression.codec": "zstd",
        "spark.sql.legacy.timeParserPolicy": "LEGACY",
        "spark.sql.parquet.int96RebaseModeInWrite": "CORRECTED",
        "spark.serializer": "org.apache.spark.serializer.KryoSerializer",
        "spark.executor.heartbeatInterval": "10s",
        "spark.network.timeout": "100s",
        "spark.archives": "/data/shared/lib/pyspark_conda_env.tar.gz#environment",
        "spark.pyspark.python": "./environment/bin/python",
    }

    _S3_COMMON_CONF = {
        "spark.hadoop.fs.s3a.path.style.access": "true",
        "spark.hadoop.fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
        "spark.hadoop.fs.s3a.aws.credentials.provider": "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider",
    }

    _DYNAMIC_CONF = {
        "spark.dynamicAllocation.testing": "false",
        "spark.dynamicAllocation.shuffleTracking.enabled": "false",
        "spark.dynamicAllocation.maxExecutors": 5,
        "spark.dynamicAllocation.minExecutors": 0,
        "spark.dynamicAllocation.initialExecutors": 5,
        "spark.dynamicAllocation.executorIdleTimeout": "6000s",
        "spark.dynamicAllocation.cachedExecutorIdleTimeout": "9000s",
        "spark.dynamicAllocation.schedulerBacklogTimeout": "10s",
    }

    def _resource_conf(self, x, y, z):
        """Return the executor-sizing part of the configuration.

        Params
        ------
        x: int
            Value for `spark.executor.instances`
        y: int
            Value for `spark.executor.cores`
        z: str
            Value for `spark.executor.memory` (e.g. `"1g"`)
        """
        return {
            "spark.executor.instances": x,
            "spark.executor.cores": y,
            "spark.executor.memory": z,
        }

    # Customized func 1
    def setting_up_conf(
        self,
        job_name="spark_job",
        executor_instance=1,
        executor_cores=2,
        executor_memory="1g",
        s3_keys=None,
        queue="mhrr",
    ) -> "SparkSession.Builder":
        """
        Apply all needed configurations to this builder.

        Call ``getOrCreate()`` on the returned builder to obtain the session.

        Params
        ------
        job_name: str
            Spark job name, set as `spark.app.name` (default: `spark_job`)
        executor_instance: int
            Equivalent to the config of `spark.executor.instances`
        executor_cores: int
            Equivalent to the config of `spark.executor.cores`
        executor_memory: str
            Equivalent to the config of `spark.executor.memory` (e.g. `"1g"`)
        s3_keys: list of dict, optional
            One entry per S3 bucket; each dict must provide the keys `bucket`,
            `access_key`, `secret_key` and `endpoint_url`. `None` (default)
            adds no per-bucket credentials.
        queue: str
            YARN queue, set as `spark.yarn.queue` (default: `mhrr`)

        Returns
        -------
        SparkSession.Builder
            The result of ``self.config(conf=...)`` with the combined settings.
        """
        # Build a fresh dict instead of writing into the shared class-level
        # template, so one call's app name / queue never leaks into another.
        basic_conf = {
            **self._BASIC_CONF,
            "spark.app.name": job_name,
            "spark.yarn.queue": queue,
        }
        resource_conf = self._resource_conf(
            executor_instance, executor_cores, executor_memory
        )

        # Per-bucket S3A settings (credentials + endpoint) for every bucket given.
        key_conf = {}
        for s3_key in s3_keys or ():
            prefix = f"spark.hadoop.fs.s3a.bucket.{s3_key['bucket']}"
            key_conf[f"{prefix}.access.key"] = s3_key["access_key"]
            key_conf[f"{prefix}.secret.key"] = s3_key["secret_key"]
            key_conf[f"{prefix}.endpoint"] = s3_key["endpoint_url"]

        # Later groups override earlier ones on key collisions.
        spark_configs = {
            **basic_conf,
            **self._S3_COMMON_CONF,
            **resource_conf,
            **self._DYNAMIC_CONF,
            **key_conf,
        }

        # Combine all configs to SparkConf
        spark_conf = SparkConf().setAll(spark_configs.items())

        return self.config(conf=spark_conf)


In [4]:
# Build the session on the 'mhrr' YARN queue (pass queue="mhkd" to switch queues).
spark_config = CustomSparkSession().setting_up_conf(
    job_name="test", s3_keys=s3_config_list, queue="mhrr"
)

spark = spark_config.getOrCreate()

In [5]:
# Display the active SparkSession as a quick sanity check of the session just created.
spark

# 3. Feature Store Kit
---
**Design**
---

**Core Components**
|Component |Description |
| ----------- | ----------- |
|1. Timely() |Timely Object that represents the time windows |
|2. GroupedAgg() |Object that represents the aggregation functions |
|3. Entity() |The entity whose features we want to extract |
|4. Dim() |A set of characteristics about the entity |
 
 
1. *Entity*
: The thing we want to extract features about, identified by one or more key columns (e.g. `customer_code`).
 
2. *Dim*
: Short for *Dimension*: a characteristic of the Entity used to split its data into segments (e.g. `BIZ_TYPE` with values `casa`, `td`, `loan`).
 
3. *Metric*
: The calculation criteria applied to the **Entity** within each **Dim**.
---
- **NOTES:**
1. Only work with Spark DataFrame (`pyspark.sql.dataframe.DataFrame`)
2. Data source: `Traffic Violation`
3. We assume the following types of features:
- **Time Features**
: Time of day; day of the week; specific holidays, ...
- **Lag Features**
: lag1, lag2, lag4, ... for all existing features (DSGD, SLGD, previous time steps, ...), especially **moving averages** (lag1 / lag2, ...)
- **Window Features**
: Related to **periodic behavior** (*daily*, *weekly*, *monthly*, ...)
- **Delta Features**
: Differences between two features (delta features), or ratios obtained by dividing one feature by another

In [6]:
from fs_kit import FSKitSession, Dimension, AggregationUnit

# ---------------------------------------------------------------------------
# Dimensions: categorical splits of the data. Features are produced for each
# combination of one value per dimension (e.g. `casa` x `weekday`).
# ---------------------------------------------------------------------------

# Account business type; descriptions read: payment account / savings account
# / loan account.
dim1 = Dimension(
    dim_name="BIZ_TYPE",
    dim_description={
        "casa": "tài khoản thanh toán",
        "td": "tài khoản tiết kiệm",
        "loan": "tài khoản nợ",
    },
    dim_values=["casa", "td", "loan"],
)

# Day type; descriptions read: weekday / weekend.
dim2 = Dimension(
    dim_name="WEEKLY_FREQ",
    dim_description={"weekday": "ngày trong tuần", "weekend": "ngày cuối tuần"},
    dim_values=["weekday", "weekend"],
)

# ---------------------------------------------------------------------------
# Aggregation units: `alias` appears in the generated feature names
# (e.g. `..._amount_sum_l1d`); `desc` is used in the feature description.
# ---------------------------------------------------------------------------

# Transaction amount: sum of `vnd_balance_amount`.
agg1 = AggregationUnit(
    alias="amount",
    agg_function="sum",
    desc="doanh số giao dịch",
    target_column="vnd_balance_amount",
)

# Number of distinct hours (`TIME_SLOT` values) in which transactions occurred.
agg2 = AggregationUnit(
    alias="hour",
    agg_function="count_distinct",
    desc="số lượng giờ xuất hiện giao dịch",
    target_column="TIME_SLOT",
)

# Number of transactions (row count).
agg3 = AggregationUnit(
    alias="transaction",
    agg_function="count",
    desc="số lượng giao dịch",
    target_column="*",
)


In [7]:
# Debit transaction movements (feature source "mvmt").
# NOTE(review): `s3_config_list[1]` is assumed to be the bucket holding
# `working_zone` -- TODO confirm the index against MHRR.mapping.
# No reader option is set for `txn_date`: "partitionCol" is not a Parquet /
# file-source read option, so Spark ignored it. Partition columns are discovered
# automatically from `key=value` sub-directories (set `basePath` only when
# reading a single partition sub-path).
source_data = spark.read.parquet(
    f"s3a://{s3_config_list[1]['bucket']}/working_zone/s3_live/common_fs_project/select_zone/daily/source=mvmt/DEBIT"
)

# Feature-store session: features keyed by `customer_code`, computed for every
# BIZ_TYPE x WEEKLY_FREQ value pair and each aggregation, over windows on `txn_date`.
session = FSKitSession(
    df=source_data,
    source="mvmt",
    target_entity=["customer_code"],
    dims=[dim1, dim2],
    aggs=[agg1, agg2, agg3],
    time_index="txn_date",
)

In [12]:
# `l1d` / `l1m` are window attributes created dynamically on the session;
# fetch each window's feature metadata at its own observation date.
info = session.l1d.get_agg_features(
    observe_date=date(2024, 8, 13),
)
info_2 = session.l1m.get_agg_features(
    observe_date=date(2024, 9, 1),
)

In [13]:
# Print every metadata field of each l1d feature definition, one field per
# line, followed by a dashed separator.
for feature in info:
    print(
        *(
            feature.name,
            feature.source,
            feature.description,
            feature.target_entity,
            feature.dim_value,
            feature.observe_date,
            feature.window_agg,
            feature.window_slide,
        ),
        sep="\n",
    )
    print("-" * 60)

mvmt_casa_weekday_amount_sum_l1d
mvmt
tổng doanh số giao dịch tài khoản thanh toán ngày trong tuần trong vòng 1 ngày gần đây
['customer_code']
{Dimension('BIZ_TYPE', ['casa', 'td', 'loan'], {'casa': 'tài khoản thanh toán', 'td': 'tài khoản tiết kiệm', 'loan': 'tài khoản nợ'}): 'casa', Dimension('WEEKLY_FREQ', ['weekday', 'weekend'], {'weekday': 'ngày trong tuần', 'weekend': 'ngày cuối tuần'}): 'weekday'}
2024-08-13
AggregationUnit(agg_function='sum', target_column='vnd_balance_amount', alias='amount', desc='doanh số giao dịch')
l1d
------------------------------------------------------------
mvmt_casa_weekday_hour_countDistinct_l1d
mvmt
đếm riêng biệt số lượng giờ xuất hiện giao dịch tài khoản thanh toán ngày trong tuần trong vòng 1 ngày gần đây
['customer_code']
{Dimension('BIZ_TYPE', ['casa', 'td', 'loan'], {'casa': 'tài khoản thanh toán', 'td': 'tài khoản tiết kiệm', 'loan': 'tài khoản nợ'}): 'casa', Dimension('WEEKLY_FREQ', ['weekday', 'weekend'], {'weekday': 'ngày trong tuần', 'we

In [14]:
# Print every metadata field of each l1m feature definition, one field per
# line, followed by a dashed separator.
for feature in info_2:
    details = [
        feature.name,
        feature.source,
        feature.description,
        feature.target_entity,
        feature.dim_value,
        feature.observe_date,
        feature.window_agg,
        feature.window_slide,
    ]
    print("\n".join(map(str, details)))
    print("--" * 30)

mvmt_casa_weekday_amount_sum_l1m
mvmt
tổng doanh số giao dịch tài khoản thanh toán ngày trong tuần trong vòng 1 tháng gần đây
['customer_code']
{Dimension('BIZ_TYPE', ['casa', 'td', 'loan'], {'casa': 'tài khoản thanh toán', 'td': 'tài khoản tiết kiệm', 'loan': 'tài khoản nợ'}): 'casa', Dimension('WEEKLY_FREQ', ['weekday', 'weekend'], {'weekday': 'ngày trong tuần', 'weekend': 'ngày cuối tuần'}): 'weekday'}
2024-09-01
AggregationUnit(agg_function='sum', target_column='vnd_balance_amount', alias='amount', desc='doanh số giao dịch')
l1m
------------------------------------------------------------
mvmt_casa_weekday_hour_countDistinct_l1m
mvmt
đếm riêng biệt số lượng giờ xuất hiện giao dịch tài khoản thanh toán ngày trong tuần trong vòng 1 tháng gần đây
['customer_code']
{Dimension('BIZ_TYPE', ['casa', 'td', 'loan'], {'casa': 'tài khoản thanh toán', 'td': 'tài khoản tiết kiệm', 'loan': 'tài khoản nợ'}): 'casa', Dimension('WEEKLY_FREQ', ['weekday', 'weekend'], {'weekday': 'ngày trong tuần', '

In [None]:
# NOTE(review): this cell was never executed in the saved notebook (`In [None]`),
# and unlike the `l1d` / `l1m` calls it passes no `observe_date` -- confirm
# whether `extract_feature` on the `l1d_vs_l1m` window requires one.
session.l1d_vs_l1m.extract_feature()

In [10]:
# Preview the l1d features observed on 2024-08-12: vertical layout, values not
# truncated (n=20 is the default row count, spelled out for clarity).
(
    session.l1d
    .extract_feature(observe_date=date(2024, 8, 12))
    .show(n=20, truncate=False, vertical=True)
)

-RECORD 0-----------------------------------------------
 customer_code                            | 28005       
 mvmt_casa_weekday_amount_sum_l1d         | null        
 mvmt_casa_weekday_hour_countDistinct_l1d | null        
 mvmt_casa_weekday_transaction_count_l1d  | null        
 mvmt_casa_weekend_amount_sum_l1d         | 1680000.00  
 mvmt_casa_weekend_hour_countDistinct_l1d | 2           
 mvmt_casa_weekend_transaction_count_l1d  | 2           
 mvmt_loan_weekday_amount_sum_l1d         | null        
 mvmt_loan_weekday_hour_countDistinct_l1d | null        
 mvmt_loan_weekday_transaction_count_l1d  | null        
 mvmt_loan_weekend_amount_sum_l1d         | null        
 mvmt_loan_weekend_hour_countDistinct_l1d | null        
 mvmt_loan_weekend_transaction_count_l1d  | null        
 mvmt_td_weekday_amount_sum_l1d           | null        
 mvmt_td_weekday_hour_countDistinct_l1d   | null        
 mvmt_td_weekday_transaction_count_l1d    | null        
 mvmt_td_weekend_amount_sum_l1d

In [11]:
# Preview the l1m features observed on 2024-09-01: vertical layout, values not
# truncated (n=20 is the default row count, spelled out for clarity).
(
    session.l1m
    .extract_feature(observe_date=date(2024, 9, 1))
    .show(n=20, truncate=False, vertical=True)
)

-RECORD 0------------------------------------------------
 customer_code                            | 8389         
 mvmt_casa_weekday_amount_sum_l1m         | 265969041.00 
 mvmt_casa_weekday_hour_countDistinct_l1m | 18           
 mvmt_casa_weekday_transaction_count_l1m  | 154          
 mvmt_casa_weekend_amount_sum_l1m         | 109659928.00 
 mvmt_casa_weekend_hour_countDistinct_l1m | 16           
 mvmt_casa_weekend_transaction_count_l1m  | 170          
 mvmt_loan_weekday_amount_sum_l1m         | 512711397.00 
 mvmt_loan_weekday_hour_countDistinct_l1m | 10           
 mvmt_loan_weekday_transaction_count_l1m  | 26           
 mvmt_loan_weekend_amount_sum_l1m         | 75955000.00  
 mvmt_loan_weekend_hour_countDistinct_l1m | 7            
 mvmt_loan_weekend_transaction_count_l1m  | 13           
 mvmt_td_weekday_amount_sum_l1m           | 630000000.00 
 mvmt_td_weekday_hour_countDistinct_l1m   | 1            
 mvmt_td_weekday_transaction_count_l1m    | 1            
 mvmt_td_weeke