In [1]:
import time

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from itertools import chain
import json
import re
from tqdm import tqdm
from pyspark.sql.functions import broadcast
from datetime import datetime, timedelta
import os
os.chdir('/home/y0c07y1/rpc_feature_extraction/rpc_model/')
from utils.utils import *
import gcsfs

In [2]:
import pyspark
from pyspark.sql.functions import lit
from pyspark.sql import functions as func
import pyspark.sql as pss
from pyspark.sql.functions import col
from pyspark.sql import types as t
from pyspark.sql.types import (
    StringType,
    MapType,
    LongType,
    DoubleType,
    FloatType,
    StructField,
    StructType,
    IntegerType,
    ArrayType
)
#spark = pss.SparkSession.builder.appName("dctr_test3").config(conf=pyspark.SparkConf().setAll([('spark.yarn.queue', 'root.critical')])).enableHiveSupport().getOrCreate()
def gen_spark_session(ds):
    """Create (or fetch the already-running) Hive-enabled SparkSession for run date ``ds``.

    Args:
        ds: run date string. Its dashes are stripped and the result is substituted into the
            module-level ``APP_NAME`` template to build the application name.

    Returns:
        The SparkSession returned by ``builder.getOrCreate()``.
    """
    run_tag = ds.replace("-", "")

    # Applied to the builder in exactly this order. Boolean values are handed to
    # ``builder.config`` as-is, not pre-stringified.
    # NOTE(review): "spark.yarn.executor.memoryOverhead" is the legacy key name; newer Spark
    # releases document "spark.executor.memoryOverhead" -- confirm against the cluster version.
    session_settings = (
        ("hive.exec.dynamic.partition", True),
        ("hive.exec.dynamic.partition.mode", "nonstrict"),
        ('mapreduce.input.fileinputformat.input.dir.recursive', True),
        ('spark.hive.mapred.supports.subdirectories', True),
        ("spark.sql.crossJoin.enabled", "true"),
        ("spark.sql.broadcastTimeout", "36000"),
        ("spark.sql.sources.partitionOverwriteMode", "dynamic"),
        ("spark.hadoop.orc.overwrite.output.file", True),
        ("spark.yarn.executor.memoryOverhead", '20g'),
        ("spark.executor.memory", '12g'),
        ("spark.driver.memory", '12g'),
        ("spark.dynamicAllocation.enabled", 'true'),
        ("spark.sql.legacy.allowCreatingManagedTableUsingNonemptyLocation", 'true'),
        ("spark.sql.hive.convertMetastoreOrc", 'false'),
    )

    builder = pyspark.sql.SparkSession.builder
    for setting_key, setting_value in session_settings:
        builder = builder.config(setting_key, setting_value)

    return builder.appName(APP_NAME % run_tag).enableHiveSupport().getOrCreate()


APP_NAME = 'rpc_feature_extractionjupyter1_%s'
# The run date must be 'YYYY-MM-DD': gen_spark_session only strips the dashes, so the
# previous malformed '2022-12/7' leaked a '/' into the application name.
spark = gen_spark_session('2022-12-07')
spark.sparkContext.setLogLevel("ERROR")

# spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 10000000)

# define functions to get sem, catalog, and site signals

## func to join catalog_item_id to dctr pla desktop ads

In [8]:
def join_dctr_item_id(today_ds):
    """Register the ``dctr_pla`` temp view: the PLA ad universe plus item attributes.

    Distinct (adid, source_id, is_mobile, seller_id) rows are taken from
    ``sem.daily_criteria_traffic_revenue_v2_tmp`` for dates in
    [today_ds - 139 days, today_ds - 7 days], filtered to customer '1', source_id 2 or 4,
    adtype 'pla', structure_version 2 and revenue_source 'fair15'. A fallback catalog
    item id is the part of ``adid`` before the first '_'. The rows are then LEFT JOINed to:
      * ``sem.pla_catalog_offer_criteria_shopping`` on adid. Its catalog_item_id and
        us_seller_id take precedence over the ids from the traffic table.
      * Per-adid division/super-department/department/category/sub-category ids from
        ``sem.daily_adid_rh_metrics_v2`` over the same date window.
      * A normalised brand name from ``dmas.dfm`` (lower-cased, non-alphanumerics removed),
        read from the partition chosen as ``partitionDate`` below.
      * ``id_unique_variant`` from
        ``casesci_sem.items_w_dups_mapping_sem_offer_bidding`` (date_string = 'latest').

    Args:
        today_ds: reference date as 'YYYY-MM-DD'. It is substituted textually into the SQL.

    Returns:
        None. Side effect: ``createOrReplaceTempView("dctr_pla")`` on the global ``spark``
        session. The view is not cached here, so consumers (the get_*Signal functions)
        presumably re-evaluate this query each time they reference ``dctr_pla``.

    NOTE(review): ``last(...)`` in the ``darm`` subquery has no ordering, so if an adid's
    hierarchy ids change within the window, the value picked is arbitrary.
    NOTE(review): the ``pcoc`` and ``item_dedup`` joins are not aggregated per key. If those
    tables hold several rows per adid / item_id, ads will be duplicated -- confirm uniqueness.
    """
    
    # Ad/item universe: ads with traffic in sem.daily_criteria_traffic_revenue_v2_tmp between
    # (today_ds - 139 days) and (today_ds - 7 days). Features are extracted from that window,
    # and the target values are built from (today_ds - 6 days, today_ds). In other words, the
    # model predicts "next week" values such as conversion rate and order size.
    # Per the original note, v2 PLA data is only available from 2022-07-26 onward, so windows
    # should not start earlier than that date.
    # Latest dmas.dfm partition: the lexicographic max of the partition strings with "ts="
    # stripped. This assumes ts values sort chronologically as strings -- TODO confirm.
    partitionDate = spark.sql("""show partitions dmas.dfm""").rdd.flatMap(lambda x:x).map(lambda x : x.replace("ts=","")).max()
    
    query = spark.sql("""SELECT dctr.*,
                                coalesce(pcoc.catalog_item_id, catalog_item_id_dctr) as catalog_item_id,
                                coalesce(pcoc.us_seller_id, seller_id_dctr) as seller_id,
                                dmas.ad_item_brand brand_nm,
                                darm.division_id AS division_id,
                                darm.super_department_id AS super_department_id,
                                darm.department_id AS department_id,
                                darm.category_id AS category_id,
                                darm.sub_category_id AS sub_category_id,
                                item_dedup.id_unique_variant

                        FROM (SELECT DISTINCT adid, source_id, is_mobile, seller_id as seller_id_dctr, split(adid,'_')[0] as catalog_item_id_dctr
                              FROM sem.daily_criteria_traffic_revenue_v2_tmp
                              WHERE 
                                   customer_id_p = '1' 
                                   AND (source_id = 2 or source_id=4)
                                   AND adtype_p ='pla' 
                                   AND structure_version_p = 2
                                   AND revenue_source_p = 'fair15'
                                   AND date_string_p >= date_sub('__today__', 139) 
                                   AND date_string_p <= date_sub('__today__', 7)) dctr

                              LEFT JOIN sem.pla_catalog_offer_criteria_shopping pcoc
                              ON dctr.adid = pcoc.adid


                              LEFT JOIN(
                                        SELECT adid, last(division_id) as division_id,
                                        last(sub_category_id) as sub_category_id, last(super_department_id) as super_department_id,
                                        last(department_id) as department_id, last(category_id) as category_id
                                        FROM sem.daily_adid_rh_metrics_v2 
                                        WHERE customer_id_p=1
                                        AND structure_version_p = 2
                                        AND revenue_source_p = 'fair15'
                                        AND date_string >= date_sub('__today__', 139) 
                                        AND date_string <= date_sub('__today__', 7)
                                        GROUP BY adid
                                        ) darm
                            ON dctr.adid = darm.adid 

                              LEFT JOIN (SELECT item_id as catlg_item_id,
                                        max(lower(REGEXP_REPLACE(item_brand_name, '[^0-9A-Za-z]', ''))) as ad_item_brand
                                        FROM dmas.dfm
                                        WHERE ts='__partitionDate__'
                                        GROUP BY item_id) dmas
                              ON coalesce(pcoc.catalog_item_id, dctr.catalog_item_id_dctr) = dmas.catlg_item_id
                              
                              LEFT JOIN (SELECT item_id,id_unique_variant 
                                          FROM casesci_sem.items_w_dups_mapping_sem_offer_bidding
                                          WHERE date_string='latest') item_dedup
                              ON coalesce(pcoc.catalog_item_id, dctr.catalog_item_id_dctr) = item_dedup.item_id
                              """.replace('__today__', today_ds).replace('__partitionDate__', partitionDate))
    
    # Register the result so the signal queries below can reference it as `dctr_pla`.
    query.createOrReplaceTempView("dctr_pla")

## func for sem signals

In [6]:
def get_semSignalCoop(today_ds, days_backtrack_end, days_backtrack_start, bad_days):
    """Return per-ad SEM (Search Engine Marketing) signals with brand/seller attributed sales.

    Traffic metrics (impressions, clicks, adspend, qty, cost, estimated_cp) are summed from
    ``sem.daily_criteria_traffic_revenue_v2_tmp`` per (adid, is_mobile, source_id). The
    window is [today_ds - days_backtrack_end, today_ds - days_backtrack_start], minus
    ``bad_days``.

    Attributed line items and GMV, split into brand and seller variants, come from
    ``fair_dev.pt3_14day_report_nas_test`` (is_pla = 1) over the same window. They are
    pre-aggregated per (adid, is_mobile, source_id) and joined onto every daily traffic row.
    Taking ``MAX`` therefore recovers the single window total instead of summing duplicates.

    Volume columns and margins are divided by the number of non-excluded days in the window,
    i.e. reported as daily averages. Ratio columns (ctr, cpc, rpc_*, roas_*, convrt_*,
    ordersize_*, cpsize_*, mpc_*, mpo_*, rpcp_*) use the un-averaged window sums.

    Everything is LEFT JOINed onto the ``dctr_pla`` temp view (registered by
    ``join_dctr_item_id``) on adid, source_id and is_mobile. Ads without traffic keep a row
    with null metrics. ``id_unique_variant`` falls back to ``catalog_item_id``. Per the
    original note, callers take the windows from
    'output_sem_performance_prediction_config_train_config_20201227.json'.

    Args:
        today_ds: reference date, 'YYYY-MM-DD'.
        days_backtrack_end: days before ``today_ds`` of the oldest date in the window.
        days_backtrack_start: days before ``today_ds`` of the newest date in the window
            (inclusive).
        bad_days: SQL tuple literal of dates to skip, e.g. "('2022-11-24', '2022-11-25')".
            It is pasted verbatim into the query, so it must be valid Spark SQL.

    Returns:
        pandas.DataFrame with one row per row of ``dctr_pla``.
    """
    
    query = """WITH             
        dctr
        AS
        (SELECT dctr_v2.adid,
                dctr_v2.source_id,
                dctr_v2.is_mobile,
                SUM(impressions) AS impressions,
                SUM(clicks) AS clicks,
                SUM(adspend) AS adspend,
                SUM(qty) AS qty,
                MAX(att_lineitems_brand) AS orders_brand,
                MAX(att_lineitems_seller) AS orders_seller,
                MAX(attributed_gmv_brand) AS gmv_revenue_adjustment_brand,
                MAX(attributed_gmv_seller) AS gmv_revenue_adjustment_seller,
                -- SUM(orders) AS orders,
                -- SUM(gmv_revenue_adjustment) AS gmv_revenue_adjustment,
                SUM(cost) AS cost,
                SUM(estimated_cp) AS estimated_cp

        FROM (SELECT * FROM sem.daily_criteria_traffic_revenue_v2_tmp
               WHERE customer_id_p = '1' 
               AND (source_id = 2 or source_id=4)
               AND adtype_p ='pla' 
               AND structure_version_p = 2
               AND revenue_source_p = 'fair15'
               AND date_string_p >= date_sub('__today__', __daysback_end__) 
               AND date_string_p <= date_sub('__today__', __daysback_start__) 
               AND date_string_p NOT IN __bad_days__) dctr_v2
        LEFT JOIN (
        SELECT adid, is_mobile, source_id, SUM(att_lineitems_brand) as att_lineitems_brand, SUM(att_lineitems_seller) as att_lineitems_seller,
                SUM(attributed_gmv_brand) as attributed_gmv_brand, SUM(attributed_gmv_seller) as attributed_gmv_seller
        FROM fair_dev.pt3_14day_report_nas_test 
        WHERE
                ad_date >= date_sub('__today__', __daysback_end__) 
                AND ad_date <= date_sub('__today__', __daysback_start__) 
                AND ad_date NOT IN __bad_days__ 
                AND adid != 'NULL'
                AND is_pla = 1
        GROUP BY adid, is_mobile, source_id
        ) coop_14d
        ON dctr_v2.adid = coop_14d.adid
        AND dctr_v2.is_mobile = coop_14d.is_mobile
        AND dctr_v2.source_id = coop_14d.source_id

        GROUP BY dctr_v2.adid, dctr_v2.is_mobile, dctr_v2.source_id)

           --final query
           SELECT  dctr_pla.catalog_item_id AS catalog_item_id,
                   coalesce(dctr_pla.id_unique_variant, dctr_pla.catalog_item_id) AS id_unique_variant,
                   dctr_pla.seller_id AS seller_id,
                   dctr_pla.source_id AS source_id,
                   dctr_pla.adid AS adid,
                   dctr_pla.brand_nm AS brand_nm,
                   dctr_pla.division_id AS division_id,
                   dctr_pla.super_department_id AS super_department_id,
                   dctr_pla.department_id AS department_id,
                   dctr_pla.category_id AS category_id,
                   dctr_pla.sub_category_id AS sub_category_id,
                   dctr_pla.is_mobile AS is_mobile,
                   dctr.impressions/__tot_days__ AS impressions,
                   dctr.clicks/__tot_days__ AS clicks,
                   dctr.adspend/__tot_days__ AS adspend,
                   dctr.qty/__tot_days__ AS qty,
                   dctr.orders_brand/__tot_days__ AS orders_brand,
                   dctr.orders_seller/__tot_days__ AS orders_seller,
                   dctr.gmv_revenue_adjustment_seller/__tot_days__ AS gmv_revenue_adjustment_seller,
                   dctr.gmv_revenue_adjustment_brand/__tot_days__ AS gmv_revenue_adjustment_brand,
                   dctr.cost/__tot_days__ AS cost,
                   (dctr.gmv_revenue_adjustment_seller - dctr.cost)/__tot_days__ AS margin_seller,
                   (dctr.gmv_revenue_adjustment_brand - dctr.cost)/__tot_days__ AS margin_brand,
                   dctr.estimated_cp/__tot_days__ AS estimated_cp,
                   dctr.clicks/dctr.impressions AS ctr,
                   dctr.adspend/dctr.clicks AS cpc,
                   dctr.gmv_revenue_adjustment_brand/dctr.clicks AS rpc_brand,
                   dctr.gmv_revenue_adjustment_seller/dctr.clicks AS rpc_seller,
                   dctr.gmv_revenue_adjustment_brand/dctr.adspend AS roas_brand,
                   dctr.gmv_revenue_adjustment_seller/dctr.adspend AS roas_seller,
                   dctr.orders_brand/dctr.clicks AS convrt_brand,
                   dctr.orders_seller/dctr.clicks AS convrt_seller,
                   dctr.gmv_revenue_adjustment_brand/dctr.orders_brand AS ordersize_brand,
                   dctr.gmv_revenue_adjustment_seller/dctr.orders_seller AS ordersize_seller,
                   dctr.estimated_cp/dctr.orders_brand AS cpsize_brand,
                   dctr.estimated_cp/dctr.orders_seller AS cpsize_seller,
                   (dctr.gmv_revenue_adjustment_brand - dctr.cost)/dctr.clicks AS mpc_brand,
                   (dctr.gmv_revenue_adjustment_seller - dctr.cost)/dctr.clicks AS mpc_seller,
                   (dctr.gmv_revenue_adjustment_brand - dctr.cost)/dctr.orders_brand AS mpo_brand,
                   (dctr.gmv_revenue_adjustment_seller - dctr.cost)/dctr.orders_seller AS mpo_seller,
                   dctr.gmv_revenue_adjustment_brand/dctr.estimated_cp AS rpcp_brand,
                   dctr.gmv_revenue_adjustment_seller/dctr.estimated_cp AS rpcp_seller

            FROM dctr_pla LEFT JOIN dctr
                 ON dctr_pla.adid = dctr.adid
                 AND dctr_pla.source_id = dctr.source_id
                 AND dctr_pla.is_mobile = dctr.is_mobile
            """

    # Number of days actually averaged over: the inclusive window length minus the bad days
    # that fall inside it.
    today = datetime.strptime(today_ds, '%Y-%m-%d').date()
    window_first = today - timedelta(days=days_backtrack_end)
    window_last = today - timedelta(days=days_backtrack_start)
    # Extract the dates with a regex instead of splitting on "', '". Spacing/quoting variants
    # such as "('2022-11-24','2022-11-25')" are valid SQL but broke the split-based parse.
    # A set keeps a date listed twice from being subtracted twice.
    excluded_dates = {
        datetime.strptime(day, '%Y-%m-%d').date()
        for day in re.findall(r'\d{4}-\d{1,2}-\d{1,2}', bad_days)
    }
    exclude_days = sum(window_first <= day <= window_last for day in excluded_dates)
    # NOTE(review): tot_days reaches 0 if every day in the window is a bad day; the
    # per-day divisions in the query would then be divisions by zero.
    tot_days = days_backtrack_end - days_backtrack_start + 1 - exclude_days

    query = query.replace('__today__', today_ds)
    query = query.replace('__daysback_end__', str(days_backtrack_end))
    query = query.replace('__daysback_start__', str(days_backtrack_start))
    query = query.replace('__bad_days__', bad_days)
    query = query.replace('__tot_days__', str(tot_days))

    df = spark.sql(query).toPandas()
    return df

In [7]:
def get_semSignal(today_ds, days_backtrack_end, days_backtrack_start, bad_days):
    """Return per-ad SEM (Search Engine Marketing) signals for the ads in ``dctr_pla``.

    Sums impressions, clicks, adspend, qty, orders, gmv_revenue_adjustment, cost and
    estimated_cp from ``sem.daily_criteria_traffic_revenue_v2_tmp`` per
    (adid, is_mobile, source_id). The window is
    [today_ds - days_backtrack_end, today_ds - days_backtrack_start], minus ``bad_days``,
    with filters customer '1', source_id 2 or 4, adtype 'pla', structure_version 2 and
    revenue_source 'fair15'.

    Volume columns and ``margin`` are divided by the number of non-excluded days in the
    window, i.e. reported as daily averages. Ratio columns (ctr, cpc, rpc, roas, convrt,
    ordersize, cpsize, mpc, mpo, rpcp) use the un-averaged window sums.

    The sums are LEFT JOINed onto the ``dctr_pla`` temp view (registered by
    ``join_dctr_item_id``) on adid, source_id and is_mobile. Ads without traffic keep a row
    with null metrics. Per the original note, callers take the windows from
    'output_sem_performance_prediction_config_train_config_20201227.json'.

    Args:
        today_ds: reference date, 'YYYY-MM-DD'.
        days_backtrack_end: days before ``today_ds`` of the oldest date in the window.
        days_backtrack_start: days before ``today_ds`` of the newest date in the window
            (inclusive).
        bad_days: SQL tuple literal of dates to skip, e.g. "('2022-11-24', '2022-11-25')".
            It is pasted verbatim into the query, so it must be valid Spark SQL.

    Returns:
        pandas.DataFrame with one row per row of ``dctr_pla``.
    """
    
    query = """WITH             
            dctr
            AS
            (SELECT adid,
                    source_id,
                    is_mobile,
                    SUM(impressions) AS impressions,
                    SUM(clicks) AS clicks,
                    SUM(adspend) AS adspend,
                    SUM(qty) AS qty,
                    SUM(orders) AS orders,
                    SUM(gmv_revenue_adjustment) AS gmv_revenue_adjustment,
                    SUM(cost) AS cost,
                    SUM(estimated_cp) AS estimated_cp
                              
            FROM sem.daily_criteria_traffic_revenue_v2_tmp
            WHERE
                    customer_id_p = '1' 
                   AND (source_id = 2 or source_id=4)
                   AND adtype_p ='pla' 
                   AND structure_version_p = 2
                   AND revenue_source_p = 'fair15'
                   AND date_string_p >= date_sub('__today__', __daysback_end__) 
                   AND date_string_p <= date_sub('__today__', __daysback_start__) 
                   AND date_string_p NOT IN __bad_days__
            GROUP BY adid, is_mobile, source_id)
            
               --final query
               SELECT  dctr_pla.catalog_item_id AS catalog_item_id,
                       dctr_pla.seller_id AS seller_id,
                       dctr_pla.adid AS adid,
                       dctr_pla.brand_nm AS brand_nm,
                       dctr_pla.division_id AS division_id,
                       dctr_pla.super_department_id AS super_department_id,
                       dctr_pla.department_id AS department_id,
                       dctr_pla.category_id AS category_id,
                       dctr_pla.sub_category_id AS sub_category_id,
                       dctr_pla.source_id AS source_id,
                       dctr_pla.is_mobile AS is_mobile,
                       dctr.impressions/__tot_days__ AS impressions,
                       dctr.clicks/__tot_days__ AS clicks,
                       dctr.adspend/__tot_days__ AS adspend,
                       dctr.qty/__tot_days__ AS qty,
                       dctr.orders/__tot_days__ AS orders,
                       dctr.gmv_revenue_adjustment/__tot_days__ AS gmv_revenue_adjustment,
                       dctr.cost/__tot_days__ AS cost,
                       (dctr.gmv_revenue_adjustment - dctr.cost)/__tot_days__ AS margin,
                       dctr.estimated_cp/__tot_days__ AS estimated_cp,
                       dctr.clicks/dctr.impressions AS ctr,
                       dctr.adspend/dctr.clicks AS cpc,
                       dctr.gmv_revenue_adjustment/dctr.clicks AS rpc,
                       dctr.gmv_revenue_adjustment/dctr.adspend AS roas,
                       dctr.orders/dctr.clicks AS convrt,
                       dctr.gmv_revenue_adjustment/dctr.orders AS ordersize,
                       dctr.estimated_cp/dctr.orders AS cpsize,
                       (dctr.gmv_revenue_adjustment - dctr.cost)/dctr.clicks AS mpc,
                       (dctr.gmv_revenue_adjustment - dctr.cost)/dctr.orders AS mpo,
                       dctr.gmv_revenue_adjustment/dctr.estimated_cp AS rpcp
                       
                FROM dctr_pla LEFT JOIN dctr
                     ON dctr_pla.adid = dctr.adid
                     AND dctr_pla.source_id = dctr.source_id
                     AND dctr_pla.is_mobile = dctr.is_mobile
                """

    # Number of days actually averaged over: the inclusive window length minus the bad days
    # that fall inside it.
    today = datetime.strptime(today_ds, '%Y-%m-%d').date()
    window_first = today - timedelta(days=days_backtrack_end)
    window_last = today - timedelta(days=days_backtrack_start)
    # Extract the dates with a regex instead of splitting on "', '". Spacing/quoting variants
    # such as "('2022-11-24','2022-11-25')" are valid SQL but broke the split-based parse.
    # A set keeps a date listed twice from being subtracted twice.
    excluded_dates = {
        datetime.strptime(day, '%Y-%m-%d').date()
        for day in re.findall(r'\d{4}-\d{1,2}-\d{1,2}', bad_days)
    }
    exclude_days = sum(window_first <= day <= window_last for day in excluded_dates)
    # NOTE(review): tot_days reaches 0 if every day in the window is a bad day; the
    # per-day divisions in the query would then be divisions by zero.
    tot_days = days_backtrack_end - days_backtrack_start + 1 - exclude_days

    query = query.replace('__today__', today_ds)
    query = query.replace('__daysback_end__', str(days_backtrack_end))
    query = query.replace('__daysback_start__', str(days_backtrack_start))
    query = query.replace('__bad_days__', bad_days)
    query = query.replace('__tot_days__', str(tot_days))

    df = spark.sql(query).toPandas()
    return df

## func for catalog signals

In [8]:
def get_catalogSignal(today_ds, days_backtrack_end, days_backtrack_start, bad_days,
                      latest_price_days_back=14):
    """Return catalog signals (hierarchy, brand, price, reviews, cost) for items in ``dctr_pla``.

    Both sources read ``casesci_cmn.uber_item_info_dfm_accumulate``:
      * ``tab_latest_price`` is a single snapshot, partition
        ``ds = today_ds - latest_price_days_back`` (14 days by default, the previously
        hard-coded value). It is aggregated per (catalog_item_id, seller_id) with ``max``,
        and tab characters are removed from the hierarchy names.
      * ``tab_avg_price`` is the mean ``curr_item_price`` per (catalog_item_id, seller_id)
        over [today_ds - days_backtrack_end, today_ds - days_backtrack_start], minus
        ``bad_days``.

    Derived columns:
      * price_change_ratio = curr_item_price / avg_item_price
      * price_change_value = curr_item_price - avg_item_price
      * avg_margin = curr_item_price - avg_cost

    The distinct (catalog_item_id, seller_id) pairs of the ``dctr_pla`` temp view are LEFT
    JOINed to ``tab_latest_price``. ``tab_avg_price`` is joined on the latest-price keys, so
    it only contributes where a latest snapshot row exists. Per the original note, callers
    take the windows from
    'output_sem_performance_prediction_config_train_config_20201227.json'.

    Args:
        today_ds: reference date, 'YYYY-MM-DD'.
        days_backtrack_end: days before ``today_ds`` of the oldest date in the average window.
        days_backtrack_start: days before ``today_ds`` of the newest date in the average
            window (inclusive).
        bad_days: SQL tuple literal of dates to skip, pasted verbatim into the query.
        latest_price_days_back: days before ``today_ds`` of the single snapshot used for the
            "latest" price/attributes. Defaults to 14 (previous behaviour).

    Returns:
        pandas.DataFrame with one row per distinct (catalog_item_id, seller_id) in ``dctr_pla``.
    """

    query = """
            WITH    
            -- latest price
            tab_latest_price
            AS
            (SELECT 
                 catalog_item_id,
                 seller_id,
                 max(seller_name) as seller_name,
                 max(regexp_replace(division, "\t", "")) AS division,
                 max(regexp_replace(super_dept, "\t", "")) AS super_dept,
                 max(regexp_replace(dept, "\t", "")) AS dept,
                 max(regexp_replace(cat, "\t", "")) AS cat,
                 max(regexp_replace(subcat, "\t", "")) AS subcat,
                 max(brand_name) as brand_name,
                 max(curr_item_price) as curr_item_price,
                 max(price_change_desc) as price_change_desc,
                 max(num_appr_reviews) as num_appr_reviews,
                 max(avg_overall_rating) as avg_overall_rating,
                 max(avg_cost) as avg_cost
       
               FROM casesci_cmn.uber_item_info_dfm_accumulate
       
               WHERE ds = date_sub('__today__', __latest_price_daysback__)
               GROUP BY catalog_item_id, seller_id),
               
            -- average price
            tab_avg_price
            AS
            (SELECT catalog_item_id, seller_id,
                    AVG(curr_item_price) AS avg_item_price
                    FROM casesci_cmn.uber_item_info_dfm_accumulate
        
                    WHERE ds >= date_sub('__today__', __daysback_end__)
                      AND ds <= date_sub('__today__', __daysback_start__)
                      AND ds NOT IN __bad_days__
              
                GROUP BY catalog_item_id, seller_id)
                
            -- final query
            
            SELECT dctr_pla_distinct.catalog_item_id AS catalog_item_id,
                   dctr_pla_distinct.seller_id AS seller_id,
                   tab_latest_price.seller_name,
                   tab_latest_price.division,
                   tab_latest_price.super_dept,
                   tab_latest_price.dept,
                   tab_latest_price.cat,
                   tab_latest_price.subcat,
                   tab_latest_price.brand_name,
                   tab_latest_price.curr_item_price,
                   tab_latest_price.price_change_desc,
                   tab_latest_price.num_appr_reviews,
                   tab_latest_price.avg_overall_rating,
                   tab_latest_price.avg_cost,
                   tab_latest_price.curr_item_price/tab_avg_price.avg_item_price AS price_change_ratio,
                   tab_latest_price.curr_item_price - tab_avg_price.avg_item_price AS price_change_value,
                   tab_latest_price.curr_item_price - tab_latest_price.avg_cost AS avg_margin
                   
            FROM (SELECT DISTINCT catalog_item_id, seller_id from dctr_pla) dctr_pla_distinct 
            LEFT JOIN tab_latest_price
                 ON dctr_pla_distinct.catalog_item_id = tab_latest_price.catalog_item_id
                 AND dctr_pla_distinct.seller_id = tab_latest_price.seller_id
                 LEFT JOIN tab_avg_price
                 ON tab_latest_price.catalog_item_id = tab_avg_price.catalog_item_id
                 AND tab_latest_price.seller_id = tab_avg_price.seller_id
     """
    # Substitute the snapshot offset first, so no other placeholder can overlap with it.
    query = query.replace('__latest_price_daysback__', str(latest_price_days_back))
    query = query.replace('__today__', today_ds)
    query = query.replace('__daysback_end__', str(days_backtrack_end))
    query = query.replace('__daysback_start__', str(days_backtrack_start))
    query = query.replace('__bad_days__', bad_days)

    df = spark.sql(query).toPandas()
    return df

## func for site signals

In [9]:
def get_siteSignal(today_ds, days_backtrack_end, days_backtrack_start, bad_days):
    
    # this function is to retrieve the "site" signals of each item/ad
    # site signals are to measure the performance of an item/ad on whole Walmart site (SEM signals only indicate the performance of "SEM" channel)
    # signals are extracted from different time intervals as specified in the json file - 'output_sem_performance_prediction_config_train_config_20201227.json'
    
    partitionDate = spark.sql("""show partitions dmas.dfm""").rdd.flatMap(lambda x: x).map(lambda x: x.replace("ts=", "")).max()
    
    l2 = """SELECT session_id, dt, catlg_itm_id_set[0] as catalog_item_id, action_categ, action_sub_categ
                FROM us_dl_interactions_restrict.interactions_l2_session_events
                    WHERE vtc IS NOT NULL
                          AND vtc != ''
                          AND vtc != '\\N'
                          AND catlg_itm_id_set[0] IS NOT NULL
                          AND catlg_itm_id_set[0] != ''
                          AND (action_categ = 'pageView' or action_sub_categ = 'addToCart')
                          AND dt >= date_sub('__today__', __daysback_end__)
                          AND dt <= date_sub('__today__', __daysback_start__)
                          AND dt NOT IN __bad_days__
                          AND pipeline in ('PULSE_USOA_RWEB', 'ANIVIA_USOA_APP') 
                          AND context = 'productPage'"""
    l2 = l2.replace('__today__', today_ds)
    l2 = l2.replace('__daysback_end__', str(days_backtrack_end))
    l2 = l2.replace('__daysback_start__', str(days_backtrack_start))
    l2 = l2.replace('__bad_days__', bad_days)
    l2 = l2.replace('__partitionDate__', partitionDate)
    # session_id, dt, catalog_item_id, action_categ
    spark.sql(l2).write.mode("overwrite").partitionBy('dt', 'action_categ', 'action_sub_categ').saveAsTable('fair_dev.yp_test_interactions_l2')
    
    print('l2 saved for site signals')
    
    l1 = """SELECT distinct session_id, dt, itm_list[0]['catlgItm']['sellerId'] as sellerid, action_categ, action_sub_categ,
                            itm_list[0]['catlgItm']['productId'] as productid
                            FROM us_dl_interactions_restrict.interactions_l1_cbb_catalog
                            WHERE dt >= date_sub('__today__', __daysback_end__)
                            AND dt <= date_sub('__today__', __daysback_start__)
                            AND dt NOT IN __bad_days__
                            AND itm_list is not null
                            AND itm_list[0]['catlgItm']['sellerId'] IS NOT NULL
                            AND itm_list[0]['catlgItm']['productId'] IS NOT NULL
                            AND (action_categ = 'pageView' or action_sub_categ = 'addToCart')
                            AND pipeline in ('PULSE_USOA_RWEB', 'ANIVIA_USOA_APP') 
                            AND context = 'productPage'"""
    l1 = l1.replace('__today__', today_ds)
    l1 = l1.replace('__daysback_end__', str(days_backtrack_end))
    l1 = l1.replace('__daysback_start__', str(days_backtrack_start))
    l1 = l1.replace('__bad_days__', bad_days)
    l1 = l1.replace('__partitionDate__', partitionDate)
    # session_id, dt, action_categ, sellerid, productid
    spark.sql(l1).write.mode("overwrite").partitionBy('dt', 'action_categ', 'action_sub_categ').saveAsTable('fair_dev.yp_test_interactions_l1')
    
    print('l1 saved for site signals')
    
    dmas_seller = spark.sql("select * from fair_dev.yp_test_dmas_seller")
    dmas_item = spark.sql("select * from fair_dev.yp_test_dmas_item")
    l1 = """select * from fair_dev.yp_test_interactions_l1 
            WHERE dt >= date_sub('__today__', __daysback_end__)
                            AND dt <= date_sub('__today__', __daysback_start__)
                            AND dt NOT IN __bad_days__"""
    l1 = l1.replace('__today__', today_ds)
    l1 = l1.replace('__daysback_end__', str(days_backtrack_end))
    l1 = l1.replace('__daysback_start__', str(days_backtrack_start))
    l1 = l1.replace('__bad_days__', bad_days)
    l1_df = spark.sql(l1)
    
    l1_dmas = l1_df.join(broadcast(dmas_seller),dmas_seller.seller_id ==  l1_df.sellerid, how='inner').drop(l1_df.sellerid).drop(dmas_seller.seller_id)
    l1_dmas = l1_dmas.join(dmas_item,dmas_item.product_id ==  l1_dmas.productid, how='left').drop(dmas_item.product_id).drop(l1_dmas.productid)
    # session_id, dt, action_categ, us_seller_id, item_id on table l1
    l1_dmas.write.mode("overwrite").partitionBy('dt', 'action_categ', 'action_sub_categ').saveAsTable('fair_dev.yp_test_interactions_l1_dmas')
    
    print('pageview l1_dmas saved for site signals after getting seller_id and catalog_item_id from dmas table')
    
    l2 = """select * from fair_dev.yp_test_interactions_l2 
        WHERE dt >= date_sub('__today__', __daysback_end__)
                        AND dt <= date_sub('__today__', __daysback_start__)
                        AND dt NOT IN __bad_days__"""
    l2 = l2.replace('__today__', today_ds)
    l2 = l2.replace('__daysback_end__', str(days_backtrack_end))
    l2 = l2.replace('__daysback_start__', str(days_backtrack_start))
    l2 = l2.replace('__bad_days__', bad_days)
    l2_df = spark.sql(l2)
    
    l1_dmas = """select * from fair_dev.yp_test_interactions_l1_dmas 
        WHERE dt >= date_sub('__today__', __daysback_end__)
                        AND dt <= date_sub('__today__', __daysback_start__)
                        AND dt NOT IN __bad_days__"""
    l1_dmas = l1_dmas.replace('__today__', today_ds)
    l1_dmas = l1_dmas.replace('__daysback_end__', str(days_backtrack_end))
    l1_dmas = l1_dmas.replace('__daysback_start__', str(days_backtrack_start))
    l1_dmas = l1_dmas.replace('__bad_days__', bad_days)
    l1_dmas_df = spark.sql(l1_dmas)
    
    l2_l1_dmas = l2_df.join(l1_dmas_df, [(l2_df.session_id == l1_dmas_df.session_id) & (l2_df.dt == l1_dmas_df.dt) & (l2_df.action_categ == l1_dmas_df.action_categ)], how = 'left')\
    .drop(l1_dmas_df.action_categ).drop(l1_dmas_df.dt).drop(l1_dmas_df.session_id).drop(l1_dmas_df.item_id).drop(l1_dmas_df.action_sub_categ)
    l2_l1_dmas.write.mode("overwrite").partitionBy('dt', 'action_categ', 'action_sub_categ').saveAsTable('fair_dev.yp_test_interactions_l2_l1_dmas')
    # session_id, dt, action_categ, us_seller_id, catalog_item_id on l2 table
    print('l2_l1_dmas saved for site signals')
    
    l2_l1_dmas = """select * from fair_dev.yp_test_interactions_l2_l1_dmas 
        WHERE dt >= date_sub('__today__', __daysback_end__)
                        AND dt <= date_sub('__today__', __daysback_start__)
                        AND dt NOT IN __bad_days__"""
    l2_l1_dmas = l2_l1_dmas.replace('__today__', today_ds)
    l2_l1_dmas = l2_l1_dmas.replace('__daysback_end__', str(days_backtrack_end))
    l2_l1_dmas = l2_l1_dmas.replace('__daysback_start__', str(days_backtrack_start))
    l2_l1_dmas = l2_l1_dmas.replace('__bad_days__', bad_days)
    spark.sql(l2_l1_dmas).createOrReplaceTempView('l2_l1_dmas_df')
    
    
    query = """
            WITH   
            tab_product_view
            AS
            (SELECT catalog_item_id, us_seller_id AS seller_id,
                count(*) AS total_page_views
                FROM l2_l1_dmas_df
                WHERE action_categ = 'pageView'
                GROUP BY catalog_item_id, us_seller_id),
            
            tab_add_to_cart
            AS
            (SELECT catalog_item_id, us_seller_id AS seller_id,
                count(*) AS total_add_to_carts
                FROM l2_l1_dmas_df
                WHERE action_sub_categ = 'addToCart'
                GROUP BY catalog_item_id, us_seller_id),
                
            tab_auth1
            AS
            (SELECT daily_order.catalog_item_id, transaction.slr_org_cd,
               daily_order.order_nbr, 
               SUM(total_auth_amount) AS total_revenue, 
               SUM(total_auth_cost) AS total_cost 
               FROM (SELECT * FROM fair.daily_auth_order
                  WHERE auth_date >= date_sub('__today__', __daysback_end__)
                  AND auth_date <= date_sub('__today__', __daysback_start__)
                  AND auth_date NOT IN __bad_days__) daily_order

               LEFT JOIN (SELECT order_line_nbr, order_nbr,catlg_item_id,slr_org_cd 
                           FROM ww_crew_dl_rpt_vm.cnsld_order_item 
                           WHERE order_plcd_dt>= date_sub('__today__', __daysback_end__)
                           AND order_plcd_dt<= date_sub('__today__', __daysback_start__)
                           AND order_plcd_dt NOT IN __bad_days__) transaction
               ON transaction.order_nbr = daily_order.order_nbr
               AND transaction.order_line_nbr = daily_order.line_nbr

               GROUP BY daily_order.catalog_item_id, transaction.slr_org_cd, daily_order.order_nbr),

            tab_auth2
            AS
            (SELECT catalog_item_id, slr_org_cd as seller_id,
              count(*) AS total_orders,
              SUM(total_revenue) AS total_revenue,
              SUM(total_cost) AS total_cost,
              SUM(total_revenue) - SUM(total_cost) AS total_margin

            FROM

               tab_auth1

            GROUP BY catalog_item_id, slr_org_cd)
            
                -- final query
    
            SELECT dctr_pla_distinct.catalog_item_id AS catalog_item_id,
                   dctr_pla_distinct.seller_id,
                   tab_product_view.total_page_views AS total_page_views,
                   tab_add_to_cart.total_add_to_carts AS total_add_to_carts,
                   tab_auth2.total_revenue AS total_revenue,
                   tab_auth2.total_margin AS margin,
                   tab_add_to_cart.total_add_to_carts/tab_product_view.total_page_views AS atcpv,
                   tab_auth2.total_orders/tab_product_view.total_page_views AS convrt,
                   tab_auth2.total_revenue/tab_product_view.total_page_views AS rpv,
                   tab_auth2.total_margin/tab_product_view.total_page_views AS mpv,
                   tab_auth2.total_revenue/tab_auth2.total_orders AS ordersize,
                   tab_auth2.total_orders AS total_orders

            FROM (SELECT DISTINCT catalog_item_id, seller_id from dctr_pla) dctr_pla_distinct 
                 LEFT JOIN tab_product_view
                 ON dctr_pla_distinct.catalog_item_id = tab_product_view.catalog_item_id
                 AND dctr_pla_distinct.seller_id = tab_product_view.seller_id
                 LEFT JOIN tab_add_to_cart
                 ON dctr_pla_distinct.catalog_item_id = tab_add_to_cart.catalog_item_id
                 AND dctr_pla_distinct.seller_id = tab_add_to_cart.seller_id
                 LEFT JOIN tab_auth2
                 ON dctr_pla_distinct.catalog_item_id = tab_auth2.catalog_item_id
                 AND dctr_pla_distinct.seller_id = tab_auth2.seller_id
            """
    
#     query = """
#             WITH   
#             tab_product_view
#             AS
#             (SELECT catlg_itm_id_set[0] AS catalog_item_id, seller_id, 
#                 count(*) AS total_page_views

#                 FROM (SELECT * FROM us_dl_interactions_restrict.interactions_l2_session_events
#                     WHERE vtc IS NOT NULL
#                           AND vtc != ''
#                           AND vtc != '\\N'
#                           AND catlg_itm_id_set[0] IS NOT NULL
#                           AND catlg_itm_id_set[0] != ''
#                           AND action_categ = 'pageView'
#                           AND dt >= date_sub('__today__', __daysback_end__)
#                           AND dt <= date_sub('__today__', __daysback_start__)
#                           AND dt NOT IN __bad_days__
#                           AND pipeline in ('PULSE_USOA_RWEB', 'ANIVIA_USOA_APP') 
#                           AND context = 'productPage') interactions_l2
#                 LEFT JOIN (SELECT distinct session_id, dt, dmas.us_seller_id as seller_id 
#                             FROM us_dl_interactions_restrict.interactions_l1_cbb_catalog l1
#                             INNER JOIN (SELECT distinct seller_id, us_seller_id from dmas.dfm
#                                         WHERE ts = '__partitionDate__') dmas
#                             ON dt >= date_sub('__today__', __daysback_end__)
#                             AND dt <= date_sub('__today__', __daysback_start__)
#                             AND dt NOT IN __bad_days__
#                             AND itm_list is not null
#                             AND itm_list[0]['catlgItm']['sellerId'] IS NOT NULL
#                             AND action_categ = 'pageView'
#                             AND pipeline in ('PULSE_USOA_RWEB', 'ANIVIA_USOA_APP') 
#                             AND context = 'productPage'
#                             AND l1.itm_list[0]['catlgItm']['sellerId'] = dmas.seller_id) interactions_l1
#                 ON interactions_l2.session_id = interactions_l1.session_id
#                 AND interactions_l2.dt = interactions_l1.dt
                
#                 GROUP BY catlg_itm_id_set[0], seller_id),
                
#             tab_add_to_cart
#             AS
#             (SELECT catlg_itm_id_set[0] AS catalog_item_id, seller_id, 
#             count(*) AS total_add_to_carts

#         FROM (SELECT * FROM us_dl_interactions_restrict.interactions_l2_session_events
#                     WHERE vtc IS NOT NULL
#                           AND vtc != ''
#                           AND vtc != '\\N'
#                           AND catlg_itm_id_set[0] IS NOT NULL
#                           AND catlg_itm_id_set[0] != ''
#                           AND action_categ = 'addToCart'
#                           AND dt >= date_sub('__today__', __daysback_end__)
#                           AND dt <= date_sub('__today__', __daysback_start__)
#                           AND dt NOT IN __bad_days__
#                           AND pipeline in ('PULSE_USOA_RWEB', 'ANIVIA_USOA_APP') 
#                           AND context = 'productPage') interactions_l2
#             LEFT JOIN (SELECT distinct session_id, dt, dmas.us_seller_id as seller_id 
#                             FROM us_dl_interactions_restrict.interactions_l1_cbb_catalog l1
#                             INNER JOIN (SELECT distinct seller_id, us_seller_id from dmas.dfm
#                                         WHERE ts = '__partitionDate__') dmas
#                             ON dt >= date_sub('__today__', __daysback_end__)
#                             AND dt <= date_sub('__today__', __daysback_start__)
#                             AND dt NOT IN __bad_days__
#                             AND itm_list is not null
#                             AND itm_list[0]['catlgItm']['sellerId'] IS NOT NULL
#                             AND action_categ = 'addToCart'
#                             AND pipeline in ('PULSE_USOA_RWEB', 'ANIVIA_USOA_APP') 
#                             AND context = 'productPage'
#                             AND l1.itm_list[0]['catlgItm']['sellerId'] = dmas.seller_id) interactions_l1
#             ON interactions_l2.session_id = interactions_l1.session_id
#             AND interactions_l2.dt = interactions_l1.dt

#         GROUP BY catlg_itm_id_set[0], seller_id),
        
#         tab_auth1
#         AS
#         (SELECT daily_order.catalog_item_id, transaction.slr_org_cd,
#            daily_order.order_nbr, 
#            SUM(total_auth_amount) AS total_revenue, 
#            SUM(total_auth_cost) AS total_cost 
#            FROM (SELECT * FROM fair.daily_auth_order
#               WHERE auth_date >= date_sub('__today__', __daysback_end__)
#               AND auth_date <= date_sub('__today__', __daysback_start__)
#               AND auth_date NOT IN __bad_days__) daily_order
           
#            LEFT JOIN (SELECT order_line_nbr, order_nbr,catlg_item_id,slr_org_cd 
#                        FROM ww_crew_dl_rpt_vm.cnsld_order_item 
#                        WHERE order_plcd_dt>= date_sub('__today__', __daysback_end__)
#                        AND order_plcd_dt<= date_sub('__today__', __daysback_start__)
#                        AND order_plcd_dt NOT IN __bad_days__) transaction
#            ON transaction.order_nbr = daily_order.order_nbr
#            AND transaction.order_line_nbr = daily_order.line_nbr

#            GROUP BY daily_order.catalog_item_id, transaction.slr_org_cd, daily_order.order_nbr),
           
#         tab_auth2
#         AS
#         (SELECT catalog_item_id, slr_org_cd as seller_id,
#           count(*) AS total_orders,
#           SUM(total_revenue) AS total_revenue,
#           SUM(total_cost) AS total_cost,
#           SUM(total_revenue) - SUM(total_cost) AS total_margin
          
#         FROM
  
#            tab_auth1
           
#         GROUP BY catalog_item_id, slr_org_cd)
    
#     -- final query
    
#     SELECT dctr_pla_distinct.catalog_item_id AS catalog_item_id,
#            dctr_pla_distinct.seller_id,
#            tab_product_view.total_page_views AS total_page_views,
#            tab_add_to_cart.total_add_to_carts AS total_add_to_carts,
#            tab_auth2.total_revenue AS total_revenue,
#            tab_auth2.total_margin AS margin,
#            tab_add_to_cart.total_add_to_carts/tab_product_view.total_page_views AS atcpv,
#            tab_auth2.total_orders/tab_product_view.total_page_views AS convrt,
#            tab_auth2.total_revenue/tab_product_view.total_page_views AS rpv,
#            tab_auth2.total_margin/tab_product_view.total_page_views AS mpv,
#            tab_auth2.total_revenue/tab_auth2.total_orders AS ordersize,
#            tab_auth2.total_orders AS total_orders
          
#     FROM (SELECT DISTINCT catalog_item_id, seller_id from dctr_pla) dctr_pla_distinct 
#          LEFT JOIN tab_product_view
#          ON dctr_pla_distinct.catalog_item_id = tab_product_view.catalog_item_id
#          AND dctr_pla_distinct.seller_id = tab_product_view.seller_id
#          LEFT JOIN tab_add_to_cart
#          ON dctr_pla_distinct.catalog_item_id = tab_add_to_cart.catalog_item_id
#          AND dctr_pla_distinct.seller_id = tab_add_to_cart.seller_id
#          LEFT JOIN tab_auth2
#          ON dctr_pla_distinct.catalog_item_id = tab_auth2.catalog_item_id
#          AND dctr_pla_distinct.seller_id = tab_auth2.seller_id
        
#         """
    query = query.replace('__today__', today_ds)
    query = query.replace('__daysback_end__', str(days_backtrack_end))
    query = query.replace('__daysback_start__', str(days_backtrack_start))
    query = query.replace('__bad_days__', bad_days)
    query = query.replace('__partitionDate__', partitionDate)
    sql = spark.sql(query)
    df = sql.toPandas()
    return(df)

In [10]:
def get_siteSignal_item(today_ds, days_backtrack_end, days_backtrack_start, bad_days,
                        spark_session=None):
    """Retrieve item-level "site" signals for every row of the ``dctr_pla`` view.

    Site signals measure how an item/ad performs across the whole Walmart site (SEM
    signals only cover the SEM channel). Activity is restricted to the inclusive window
    ``[today_ds - days_backtrack_end, today_ds - days_backtrack_start]`` minus
    ``bad_days``; the windows come from the time-window config JSON.

    Args:
        today_ds: reference date string (e.g. '2023-02-15') passed to ``date_sub``.
        days_backtrack_end: days before ``today_ds`` of the oldest date in the window.
        days_backtrack_start: days before ``today_ds`` of the newest date in the window.
        bad_days: SQL tuple literal of dates to exclude, e.g. "('2022-11-24')".
        spark_session: optional SparkSession to run the query on; defaults to the
            notebook-global ``spark``.

    Returns:
        pandas DataFrame with one row per ``dctr_pla`` row and the columns
        catalog_item_id, seller_id, total_page_views, total_add_to_carts,
        total_revenue, margin, atcpv, convrt, rpv, mpv, ordersize, total_orders.
        Metrics are NULL for items with no matching activity in the window.
    """
    session = spark if spark_session is None else spark_session

    # Every metric table is joined on dctr_pla's key. Chaining the join keys
    # (page views -> add-to-carts -> auth) would silently drop add-to-cart and
    # revenue/order metrics for items that had no page views in the window.
    # NOTE(review): add-to-carts filter on action_sub_categ while page views filter
    # on action_categ -- confirm this asymmetry is intentional.
    query = """
        WITH
        tab_product_view AS (
            SELECT catlg_itm_id_set[0] AS catalog_item_id,
                   count(*) AS total_page_views
            FROM us_dl_interactions_restrict.interactions_l2_session_events
            WHERE vtc IS NOT NULL
              AND vtc != ''
              AND vtc != '\\N'
              AND catlg_itm_id_set[0] IS NOT NULL
              AND catlg_itm_id_set[0] != ''
              AND action_categ = 'pageView'
              AND dt >= date_sub('__today__', __daysback_end__)
              AND dt <= date_sub('__today__', __daysback_start__)
              AND dt NOT IN __bad_days__
              AND pipeline in ('PULSE_USOA_RWEB', 'ANIVIA_USOA_APP')
              AND context = 'productPage'
            GROUP BY catlg_itm_id_set[0]),

        tab_add_to_cart AS (
            SELECT catlg_itm_id_set[0] AS catalog_item_id,
                   count(*) AS total_add_to_carts
            FROM us_dl_interactions_restrict.interactions_l2_session_events
            WHERE vtc IS NOT NULL
              AND vtc != ''
              AND vtc != '\\N'
              AND catlg_itm_id_set[0] IS NOT NULL
              AND catlg_itm_id_set[0] != ''
              AND action_sub_categ = 'addToCart'
              AND dt >= date_sub('__today__', __daysback_end__)
              AND dt <= date_sub('__today__', __daysback_start__)
              AND dt NOT IN __bad_days__
              AND pipeline in ('PULSE_USOA_RWEB', 'ANIVIA_USOA_APP')
              AND context = 'productPage'
            GROUP BY catlg_itm_id_set[0]),

        -- revenue and cost per (item, order)
        tab_auth1 AS (
            SELECT catalog_item_id,
                   order_nbr,
                   SUM(total_auth_amount) AS total_revenue,
                   SUM(total_auth_cost) AS total_cost
            FROM fair.daily_auth_order
            WHERE auth_date >= date_sub('__today__', __daysback_end__)
              AND auth_date <= date_sub('__today__', __daysback_start__)
              AND auth_date NOT IN __bad_days__
            GROUP BY catalog_item_id, order_nbr),

        -- per item: number of orders plus revenue / cost / margin totals
        tab_auth2 AS (
            SELECT catalog_item_id,
                   count(*) AS total_orders,
                   SUM(total_revenue) AS total_revenue,
                   SUM(total_cost) AS total_cost,
                   SUM(total_revenue) - SUM(total_cost) AS total_margin
            FROM tab_auth1
            GROUP BY catalog_item_id)

        -- final query
        SELECT dctr_pla.catalog_item_id AS catalog_item_id,
               dctr_pla.seller_id AS seller_id,
               tab_product_view.total_page_views AS total_page_views,
               tab_add_to_cart.total_add_to_carts AS total_add_to_carts,
               tab_auth2.total_revenue AS total_revenue,
               tab_auth2.total_margin AS margin,
               tab_add_to_cart.total_add_to_carts/tab_product_view.total_page_views AS atcpv,
               tab_auth2.total_orders/tab_product_view.total_page_views AS convrt,
               tab_auth2.total_revenue/tab_product_view.total_page_views AS rpv,
               tab_auth2.total_margin/tab_product_view.total_page_views AS mpv,
               tab_auth2.total_revenue/tab_auth2.total_orders AS ordersize,
               tab_auth2.total_orders AS total_orders
        FROM dctr_pla
             LEFT JOIN tab_product_view
             ON dctr_pla.catalog_item_id = tab_product_view.catalog_item_id
             LEFT JOIN tab_add_to_cart
             ON dctr_pla.catalog_item_id = tab_add_to_cart.catalog_item_id
             LEFT JOIN tab_auth2
             ON dctr_pla.catalog_item_id = tab_auth2.catalog_item_id
        """

    placeholders = {
        '__today__': today_ds,
        '__daysback_end__': str(days_backtrack_end),
        '__daysback_start__': str(days_backtrack_start),
        '__bad_days__': bad_days,
    }
    for placeholder, value in placeholders.items():
        query = query.replace(placeholder, value)

    return session.sql(query).toPandas()

# Functions for bounce signals

In [11]:
def get_bounceSignal(today_ds, days_backtrack_end, days_backtrack_start, bad_days):
    """Compute bounce signals per distinct (catalog_item_id, seller_id) in ``dctr_pla``.

    A bounce is a non-effective visit (for example, viewed but not added to cart); a
    session counts as a bounce when ``total_non_performance_metrics_view = 1``.
    Counts and rates are produced for the whole site and for the SEM channel
    (``mkt_veh = 'Paid:SEM'``).

    Sessions are attributed to items/sellers through L1 catalog events:
      1. L1 events for the window are staged in
         ``fair_dev.yp_test_bounce_interactions_l1``;
      2. they are mapped to ``us_seller_id`` / ``item_id`` using
         ``fair_dev.yp_test_dmas_seller`` and ``fair_dev.yp_test_dmas_item`` and staged
         in ``fair_dev.yp_test_bounce_interactions_l1_dmas``;
      3. that table is exposed as the temp view ``l1_dmas`` and joined to L3 session
         stats.

    Side effects: overwrites both staging tables and (re)registers the ``l1_dmas``
    temp view. Uses the notebook-global ``spark`` session and the ``dctr_pla`` view.

    Args:
        today_ds: reference date string (e.g. '2023-02-15') passed to ``date_sub``.
        days_backtrack_end: days before ``today_ds`` of the oldest date in the window.
        days_backtrack_start: days before ``today_ds`` of the newest date in the window.
        bad_days: SQL tuple literal of dates to exclude, e.g. "('2022-11-24')".

    Returns:
        pandas DataFrame with columns catalog_item_id, seller_id, bounces, visits,
        sem_visits, sem_bounces, bounce_rate, sem_bounce_rate.
    """
    # Partition filter on `dt` shared by the staging-table reads below.
    window_where = """
        WHERE dt >= date_sub('__today__', __daysback_end__)
          AND dt <= date_sub('__today__', __daysback_start__)
          AND dt NOT IN __bad_days__"""

    def _fill_window(template):
        # Substitute the date-window placeholders into a SQL template.
        return (template
                .replace('__today__', today_ds)
                .replace('__daysback_end__', str(days_backtrack_end))
                .replace('__daysback_start__', str(days_backtrack_start))
                .replace('__bad_days__', bad_days))

    # 1) Stage raw L1 catalog events: session_id, dt, action_categ, sellerid, productid.
    l1_events_query = _fill_window("""
        SELECT DISTINCT session_id, dt,
               itm_list[0]['catlgItm']['sellerId'] AS sellerid,
               action_categ,
               itm_list[0]['catlgItm']['productId'] AS productid
        FROM us_dl_interactions_restrict.interactions_l1_cbb_catalog
        WHERE dt >= date_sub('__today__', __daysback_end__)
          AND dt <= date_sub('__today__', __daysback_start__)
          AND dt NOT IN __bad_days__
          AND pipeline = 'PULSE_USOA_RWEB'
          AND itm_list IS NOT NULL
          AND itm_list[0]['catlgItm']['sellerId'] IS NOT NULL
          AND itm_list[0]['catlgItm']['productId'] IS NOT NULL""")
    (spark.sql(l1_events_query)
        .write.mode("overwrite")
        .partitionBy('dt', 'action_categ')
        .saveAsTable('fair_dev.yp_test_bounce_interactions_l1'))
    print('l1 saved for bounce signals')

    # 2) Map raw seller/product ids to us_seller_id / item_id and stage the result.
    dmas_seller = spark.sql("select * from fair_dev.yp_test_dmas_seller")
    dmas_item = spark.sql("select * from fair_dev.yp_test_dmas_item")
    l1_events = spark.sql(_fill_window(
        "SELECT * FROM fair_dev.yp_test_bounce_interactions_l1" + window_where))

    l1_with_seller = (
        l1_events
        .join(broadcast(dmas_seller), dmas_seller.seller_id == l1_events.sellerid, how='inner')
        .drop(l1_events.sellerid)
        .drop(dmas_seller.seller_id))
    l1_with_item = (
        l1_with_seller
        .join(dmas_item, dmas_item.product_id == l1_with_seller.productid, how='left')
        .drop(dmas_item.product_id)
        .drop(l1_with_seller.productid))
    # Columns now: session_id, dt, action_categ, us_seller_id, item_id.
    (l1_with_item
        .write.mode("overwrite")
        .partitionBy('dt', 'action_categ')
        .saveAsTable('fair_dev.yp_test_bounce_interactions_l1_dmas'))
    print('pageview l1_dmas saved for bounce signals after getting seller_id and catalog_item_id from dmas table')

    # 3) Re-read the staged table for this window and expose it to SQL.
    spark.sql(_fill_window(
        "SELECT * FROM fair_dev.yp_test_bounce_interactions_l1_dmas" + window_where
    )).createOrReplaceTempView('l1_dmas')

    # 4) Aggregate per (item, seller). l1_dmas has one row per (session, dt,
    #    action_categ, ...), so it is reduced to distinct (session, item, seller)
    #    before joining. Otherwise a session with several event types for the same
    #    item would be counted as several visits, which skews bounce_rate.
    query = _fill_window("""
        SELECT dctr_pla_distinct.catalog_item_id,
               dctr_pla_distinct.seller_id,
               SUM(num_bounces) AS bounces,
               SUM(num_visits) AS visits,
               SUM(num_sem_visits) AS sem_visits,
               SUM(num_sem_bounces) AS sem_bounces,
               SUM(num_bounces)/SUM(num_visits) AS bounce_rate,
               SUM(num_sem_bounces)/SUM(num_sem_visits) AS sem_bounce_rate
        FROM
            (SELECT DISTINCT session_id,
                    mkt_veh,
                    total_non_performance_metrics_view,
                    1 AS num_visits,
                    IF (total_non_performance_metrics_view = 1, 1, 0) AS num_bounces,
                    IF (mkt_veh = 'Paid:SEM', 1, 0) AS num_sem_visits,
                    IF (total_non_performance_metrics_view = 1 AND mkt_veh = 'Paid:SEM', 1, 0) AS num_sem_bounces
             FROM us_dl_interactions_restrict.interactions_l3_session_stats_daily l3
             WHERE dt >= date_sub('__today__', __daysback_end__)
               AND dt <= date_sub('__today__', __daysback_start__)
               AND dt NOT IN __bad_days__
               AND pipeline = 'PULSE_USOA_RWEB') cbb_l3

            LEFT JOIN (SELECT DISTINCT session_id, item_id, us_seller_id
                       FROM l1_dmas) cbb_l1
            ON cbb_l3.session_id = cbb_l1.session_id

            RIGHT JOIN (SELECT DISTINCT catalog_item_id, seller_id from dctr_pla) dctr_pla_distinct
            ON dctr_pla_distinct.catalog_item_id = cbb_l1.item_id
            AND dctr_pla_distinct.seller_id = cbb_l1.us_seller_id

        GROUP BY dctr_pla_distinct.catalog_item_id, dctr_pla_distinct.seller_id
        """)

    return spark.sql(query).toPandas()

In [12]:
def get_bounceSignal_item(today_ds, days_backtrack_end, days_backtrack_start, bad_days,
                          spark_session=None):
    """Compute item-level bounce signals for every row of the ``dctr_pla`` view.

    A bounce is a non-effective visit (for example, viewed but not added to cart); a
    session counts as a bounce when ``total_non_performance_metrics_view = 1``.
    Counts and rates are produced for the whole site and for the SEM channel
    (``mkt_veh = 'Paid:SEM'``). Sessions are attributed to an item through product-page
    L2 events, keyed only on ``catlg_itm_id_set[0]`` (no seller attribution).

    Args:
        today_ds: reference date string (e.g. '2023-02-15') passed to ``date_sub``.
        days_backtrack_end: days before ``today_ds`` of the oldest date in the window.
        days_backtrack_start: days before ``today_ds`` of the newest date in the window.
        bad_days: SQL tuple literal of dates to exclude, e.g. "('2022-11-24')".
        spark_session: optional SparkSession to run the query on; defaults to the
            notebook-global ``spark``.

    Returns:
        pandas DataFrame with one row per ``dctr_pla`` row: catalog_item_id, seller_id,
        bounces, visits, sem_visits, sem_bounces, bounce_rate, sem_bounce_rate.
    """
    session = spark if spark_session is None else spark_session

    # l2 is de-duplicated on (first catalog item, session). De-duplicating on the whole
    # catlg_itm_id_set array would count a session twice for an item when two of its
    # events carry different sets that start with the same item.
    # An INNER JOIN replaces the former RIGHT JOIN: sessions with no l2 match only
    # produced a NULL-item group, which can never match dctr_pla below.
    query = """
        WITH
        l3 AS (
            SELECT DISTINCT session_id,
                   mkt_veh,
                   total_non_performance_metrics_view,
                   1 AS num_visits,
                   IF (total_non_performance_metrics_view = 1, 1, 0) AS num_bounces,
                   IF (mkt_veh = 'Paid:SEM', 1, 0) AS num_sem_visits,
                   IF (total_non_performance_metrics_view = 1 AND mkt_veh = 'Paid:SEM', 1, 0) AS num_sem_bounces
            FROM us_dl_interactions_restrict.interactions_l3_session_stats_daily
            WHERE dt >= date_sub('__today__', __daysback_end__)
              AND dt <= date_sub('__today__', __daysback_start__)
              AND dt NOT IN __bad_days__
              AND pipeline = 'PULSE_USOA_RWEB'),

        l2 AS (
            SELECT DISTINCT catlg_itm_id_set[0] AS catalog_item_id, session_id
            FROM us_dl_interactions_restrict.interactions_l2_session_events
            WHERE dt >= date_sub('__today__', __daysback_end__)
              AND dt <= date_sub('__today__', __daysback_start__)
              AND dt NOT IN __bad_days__
              AND pipeline in ('PULSE_USOA_RWEB')
              AND context = 'productPage'),

        l3_item AS (
            SELECT l2.catalog_item_id AS catalog_item_id,
                   SUM(num_bounces) AS bounces,
                   SUM(num_visits) AS visits,
                   SUM(num_sem_visits) AS sem_visits,
                   SUM(num_sem_bounces) AS sem_bounces,
                   SUM(num_bounces)/SUM(num_visits) AS bounce_rate,
                   SUM(num_sem_bounces)/SUM(num_sem_visits) AS sem_bounce_rate
            FROM l2
            INNER JOIN l3
            ON l2.session_id = l3.session_id
            GROUP BY l2.catalog_item_id)

        -- final query
        SELECT dctr_pla.catalog_item_id,
               dctr_pla.seller_id,
               l3_item.bounces,
               l3_item.visits,
               l3_item.sem_visits,
               l3_item.sem_bounces,
               l3_item.bounce_rate,
               l3_item.sem_bounce_rate
        FROM dctr_pla
        LEFT JOIN l3_item
        ON dctr_pla.catalog_item_id = l3_item.catalog_item_id
        """

    placeholders = {
        '__today__': today_ds,
        '__daysback_end__': str(days_backtrack_end),
        '__daysback_start__': str(days_backtrack_start),
        '__bad_days__': bad_days,
    }
    for placeholder, value in placeholders.items():
        query = query.replace(placeholder, value)

    return session.sql(query).toPandas()

# query features

In [8]:
from datetime import datetime,timedelta

In [9]:
start_date = '2023-02-15'
gcp_directory = 'gs://msc_fair_airflow/rpc_model/'

# Snapshot (reference) dates: start_date and the N_REF_DATES - 1 dates before it,
# spaced REF_DATE_STEP_DAYS apart, newest first, formatted 'YYYY-MM-DD'.
N_REF_DATES = 5
REF_DATE_STEP_DAYS = 14
_start_date_parsed = datetime.strptime(start_date, '%Y-%m-%d').date()  # parse once, not per iteration
ref_dates = [
    str(_start_date_parsed - timedelta(days=REF_DATE_STEP_DAYS * step))
    for step in range(N_REF_DATES)
]

In [77]:
# Show the reference dates: start_date minus multiples of 14 days, newest first.
ref_dates

['2023-02-15', '2023-02-01', '2023-01-18', '2023-01-04', '2022-12-21']

In [10]:
# Per-signal time-window definitions live in a JSON file next to the model artefacts.
TIME_WINDOW_CONFIG_PATH = '%srpc_time_window_config.json' % gcp_directory
config = load_json(TIME_WINDOW_CONFIG_PATH)

In [11]:
# Inspect the loaded time-window config. Judging by the (truncated) output below, it is a list of
# {unifier_class_name: [window, ...]} entries, each window having a 'name' (e.g. 'l', 'w1', 'w2')
# and a SQL date 'condition'.
config

[{'com.walmartlabs.crm.unifiers.sem.SemSignalsUnifier': [{'name': 'l',
    'condition': "date_string_p BETWEEN date_sub('2020-12-25', 0 + 6) AND date_sub('2020-12-25', 0) AND date_string_p NOT IN ('2019-11-30', '2019-11-28', '2019-11-29', '2019-11-27', '2020-02-25', '2020-02-26', '2020-02-27', '2019-12-03', '2019-12-02', '2019-12-01')"},
   {'name': 'w1',
    'condition': "date_string_p BETWEEN date_sub('2020-12-25', 7 + 6) AND date_sub('2020-12-25', 7) AND date_string_p NOT IN ('2019-11-30', '2019-11-28', '2019-11-29', '2019-11-27', '2020-02-25', '2020-02-26', '2020-02-27', '2019-12-03', '2019-12-02', '2019-12-01')"},
   {'name': 'w2',
    'condition': "date_string_p BETWEEN date_sub('2020-12-25', 7 + 13) AND date_sub('2020-12-25', 7) AND date_string_p NOT IN ('2019-11-30', '2019-11-28', '2019-11-29', '2019-11-27', '2020-02-25', '2020-02-26', '2020-02-27', '2019-12-03', '2019-12-02', '2019-12-01')"},
   {'name': 'v2',
    'condition': "date_string_p BETWEEN date_sub('2020-12-25', 14 +

In [12]:
# Bad days are typically holidays, when RPC / target-value patterns differ strongly from
# normal days; they are excluded from the training data.
BAD_DAY_LIST = ['2022-11-07', '2022-11-14', '2022-11-21', '2022-11-23', '2022-11-24', '2022-11-25']

# Rendered as a SQL tuple literal, e.g. "('2022-11-07', '2022-11-14', ...)", because the
# query templates splice it directly into `dt NOT IN __bad_days__`.
quoted_days = ["'%s'" % day for day in BAD_DAY_LIST]
bad_days = '(' + ', '.join(quoted_days) + ')'

# Loop through the time-window configurations to extract features

In [13]:
def _first_value(entry):
    """Return the value of the first key of a config entry (its list of window dicts)."""
    return list(entry.values())[0]

# Each config entry appears to be a single-key dict {unifier_class_name: [window, ...]}.
# The entries are unwrapped by position: SEM, catalog, site, bounce.
# NOTE(review): only entry 0 (SemSignalsUnifier) is confirmed by the output above; the
# order of entries 1-3 is assumed — verify against rpc_time_window_config.json.
config_sem = _first_value(config[0])
config_catalog = _first_value(config[1])
config_site = _first_value(config[2])
config_bounce = _first_value(config[3])

In [None]:
# Earlier revisions of this cell also built dmas copy tables, evergreen SEM signals and
# offer-level site/bounce signals; those steps were disabled (commented out) and are not run.

# Columns kept from each query, key columns first.
HEADER_COLS = ['catalog_item_id', 'id_unique_variant', 'seller_id', 'brand_nm', 'adid', 'source_id',
               'is_mobile', 'division_id', 'super_department_id', 'department_id', 'category_id',
               'sub_category_id']
SEM_COOP_COLS = ['catalog_item_id', 'seller_id', 'source_id', 'adid', 'is_mobile',  # 5 key columns
                 'impressions', 'clicks', 'adspend', 'qty', 'gmv_revenue_adjustment_brand', 'cost',
                 'gmv_revenue_adjustment_seller', 'orders_brand', 'orders_seller', 'margin_brand',
                 'margin_seller', 'ctr', 'cpc', 'rpc_brand', 'rpc_seller', 'roas_brand', 'roas_seller',
                 'convrt_brand', 'convrt_seller', 'ordersize_brand', 'ordersize_seller',
                 'mpc_brand', 'mpc_seller', 'mpo_brand', 'mpo_seller']
CATALOG_COLS = ['catalog_item_id', 'seller_id',  # 2 key columns
                'division', 'super_dept', 'dept', 'cat', 'subcat', 'curr_item_price',
                'num_appr_reviews', 'avg_overall_rating', 'price_change_ratio',
                'price_change_value', 'avg_margin']
SITE_ITEM_COLS = ['catalog_item_id', 'seller_id',  # 2 key columns
                  'margin', 'total_page_views', 'total_add_to_carts', 'total_orders',
                  'total_revenue', 'atcpv', 'convrt', 'rpv', 'mpv']
BOUNCE_ITEM_COLS = ['catalog_item_id', 'seller_id',  # 2 key columns
                    'bounce_rate', 'bounces', 'sem_bounce_rate', 'sem_bounces',
                    'visits', 'sem_visits']


def parse_window_offsets(condition):
    """Extract the two day offsets from a time-window SQL condition.

    ``condition`` looks like
    ``"date_string_p BETWEEN date_sub('2020-12-25', 7 + 6) AND date_sub('2020-12-25', 7) AND ..."``.
    Only the numeric offsets are used; the date and the NOT IN list written in the
    condition are ignored (the queries use ``today_ds`` and ``bad_days`` instead).

    Returns:
        (days_backtrack_end, days_backtrack_start): the offsets of the first and the
        second ``date_sub(...)``, each evaluated as a sum of ``+``-separated integers
        (``' 7 + 6'`` -> 13, ``' 7'`` -> 7).
    """
    # e.g. ["('2020-12-25', 7 + 6)", "('2020-12-25', 7)", "('2019-11-30', ...)"]
    paren_groups = re.findall(r'\(.*?\)', condition)

    def _offset(group):
        # "('2020-12-25', 7 + 6)" -> " 7 + 6" -> 13; also accepts a bare " 7".
        expr = group[1:-1].split(',')[1]
        return sum(int(term) for term in expr.split('+'))

    return _offset(paren_groups[0]), _offset(paren_groups[1])


def suffix_feature_columns(df, n_key_cols, prefix, window_name):
    """Return ``df`` with every column after the first ``n_key_cols`` renamed to
    ``prefix + column + '_' + window_name``; key columns keep their names.

    Uses a comprehension variable other than ``col`` so the notebook-level
    ``pyspark.sql.functions.col`` import is never shadowed.
    """
    feature_cols = df.columns[n_key_cols:]
    return df.rename(columns={c: prefix + c + '_' + window_name for c in feature_cols})


def extract_window_features(today_ds, window_cfg, fetch, columns, n_key_cols,
                            prefix, file_stem, start_label, end_label):
    """Query one configured time window, suffix its feature columns and upload them as CSV.

    Args:
        today_ds: reference date ('YYYY-MM-DD') the window offsets are counted back from.
        window_cfg: one config window, a dict with 'name' and 'condition'.
        fetch: query function called as
            ``fetch(today_ds, days_backtrack_end, days_backtrack_start, bad_days)``; must
            return a DataFrame containing ``columns``.
        columns: columns to keep, key columns first.
        n_key_cols: number of leading key columns left un-renamed.
        prefix: prefix for feature columns (e.g. 'sem_').
        file_stem: output name stem; the file is written to
            ``gcp_directory + 'sample_data_individual_feat/<file_stem>_<name>_<today_ds>.csv'``.
        start_label, end_label: wording of the progress and timing messages.

    Reads the notebook-level ``bad_days`` and ``gcp_directory`` at call time.
    """
    window_name = window_cfg['name']
    print('processing ' + start_label + ' for date ' + today_ds + ' and name is ' + window_name)
    t_start = time.time()
    days_backtrack_end, days_backtrack_start = parse_window_offsets(window_cfg['condition'])
    df_window = fetch(today_ds, days_backtrack_end, days_backtrack_start, bad_days)[columns].copy()
    df_window = suffix_feature_columns(df_window, n_key_cols, prefix, window_name)
    upload_csv_to_cloud_storage(
        df_window,
        gcp_directory + 'sample_data_individual_feat/' + file_stem + '_' + window_name + '_' + today_ds + '.csv')
    print('time used for ' + end_label + ' for date ' + today_ds + ' and name is ' + window_name
          + ': ' + str(time.time() - t_start))


# Only the newest reference date is processed; widen the slice to backfill the others.
for today_ds in ref_dates[:1]:
    starttime = time.time()
    print('Querying data for %s' % today_ds)

    # create table view of all catalog_item_ids of interest (today_ds -139 :  today_ds-7)
    join_dctr_item_id(today_ds)

    ###############################################################
    # Header table: key/hierarchy columns from the SEM coop query over the first SEM window.
    # NOTE(review): this is the same call the SEM loop below makes for config_sem[0];
    # reusing one result would save a query.
    days_backtrack_end, days_backtrack_start = parse_window_offsets(config_sem[0]['condition'])
    df_header = get_semSignalCoop(today_ds, days_backtrack_end, days_backtrack_start, bad_days)[HEADER_COLS].copy()
    upload_csv_to_cloud_storage(df_header, gcp_directory+'sample_data_individual_feat/df_header_'+today_ds+'.csv')
    del df_header
    print('time used for creating header table for date '+today_ds+': ' + str(time.time() - starttime))

    ###############################################################
    # SEM coop signals, one file per window (e.g. sem_clicks_w2, sem_rpc_brand_w2, ...)
    for window_cfg in tqdm(config_sem):
        extract_window_features(today_ds, window_cfg, get_semSignalCoop, SEM_COOP_COLS, 5,
                                'sem_', 'semcoop',
                                'sem and sem_coop features', 'sem signals for coop')

    ###############################################################
    # catalog signals (e.g. uber_curr_item_price_w1, uber_division_w1, ...)
    print('querying catalog signals...')
    for window_cfg in config_catalog:
        extract_window_features(today_ds, window_cfg, get_catalogSignal, CATALOG_COLS, 2,
                                'uber_', 'catalog',
                                'catalog features', 'catalog signals')

    ###############################################################
    # site item-level signals (e.g. site_total_page_views_w2, site_rpv_w2, ...)
    print('querying site signals...')
    for window_cfg in config_site:
        extract_window_features(today_ds, window_cfg, get_siteSignal_item, SITE_ITEM_COLS, 2,
                                'site_', 'site_item',
                                'site item level features', 'site item level signals')

    ###############################################################
    # bounce item-level signals (e.g. bc_bounce_rate_w1, bc_sem_visits_w1, ...)
    print('querying bounce signals...')
    for window_cfg in config_bounce:
        extract_window_features(today_ds, window_cfg, get_bounceSignal_item, BOUNCE_ITEM_COLS, 2,
                                'bc_', 'bounce_item',
                                'bounce item features', 'bounce item signals')

dmas table renew done
Querying data for 2022-10-16
querying sem signals...
querying site signals...
processing site features for date 2022-10-16 and name is w2
l2 saved for site signals


In [98]:
# upload_local_to_cloud_storage('/home/y0c07y1/rpc_feature_extraction/query/test.csv', 'gs://msc_fair_airflow/kk/test1.csv')

# client = storage.Client()
# bucket = client.get_bucket('msc_fair_airflow')
# # list all objects in the directory
# blobs = bucket.list_blobs(prefix='logs')
# for blob in blobs:
#     blob.delete()

In [17]:
# today_ds = '2022-03-21'

# # insert data into table
# pd_df = pd.read_csv('data_lia_37/df_feat_all_'+today_ds+'.csv', index_col=0)
# pd_df['adid'] = pd_df['adid'].astype('str')
        
# sparkDF=spark.createDataFrame(pd_df) 
# sparkDF.registerTempTable("feat") 

# query = """INSERT INTO TABLE casesci_sem.lia_rpc_feature_test_notxt_cfl
#                 PARTITION(date_string = '__today__')
#                 SELECT * FROM feat"""

# query = query.replace('__today__', today_ds)
# spark.sql(query)     

In [18]:
# for config_tmp in config_site:
            
#         name = config_tmp['name']
#         condition = config_tmp['condition']
    
#         backdays = re.findall(r'\(.*?\)', condition) #
    
#         days_backtrack_end = backdays[0][1:-1].split(',')[1].split('+')
#         days_backtrack_end = int(days_backtrack_end[0]) + int(days_backtrack_end[1])
    
#         days_backtrack_start = int(backdays[1][1:-1].split(',')[1])