In [5]:
from o9_common_utils.O9DataLake import O9DataLake, ResourceType, DataSource, PluginSetting

# IBPL query text for the two Live Server inputs. The two queries are identical
# except for the Product member they reference (196426 vs. 208821).
_predict_df = "select (Version.[Version Name]*Product.[Product].[196426]*Time.FiscalWeek*SalesAccount.[Account]*Location.[Location]*{Measure.[DPSellOutUnitsActuals],Measure.[Mean Pricing Save PCT],Measure.[Placement Count],Measure.[Promotion Count],Measure.[DPSellOutPrice]});"
_input_df = "select (Version.[Version Name]*Product.[Product].[208821]*Time.FiscalWeek*SalesAccount.[Account]*Location.[Location]*{Measure.[DPSellOutUnitsActuals],Measure.[Mean Pricing Save PCT],Measure.[Placement Count],Measure.[Promotion Count],Measure.[DPSellOutPrice]});"

# --- inputs -----------------------------------------------------------------
# Two IBPL-query inputs plus one live-frame input named "WeeklySales".
predict_df = O9DataLake.register(
    "predict_df",
    data_source=DataSource.LS,
    entity_type=ResourceType.IBPL,
    query=_predict_df,
    plugin_setting=PluginSetting.Inputs,
)
input_df = O9DataLake.register(
    "input_df",
    data_source=DataSource.LS,
    entity_type=ResourceType.IBPL,
    query=_input_df,
    plugin_setting=PluginSetting.Inputs,
)
liveinput = O9DataLake.register(
    "WeeklySales",
    data_source=DataSource.LIVEFRAME,
    entity_type=ResourceType.LIVEFRAME,
    plugin_setting=PluginSetting.Inputs,
)

# --- slice dimension --------------------------------------------------------
O9DataLake.register(
    "Product.[Product]",
    data_source=DataSource.LS,
    entity_type=ResourceType.IBPL,
    plugin_setting=PluginSetting.SliceDimension,
)

# --- outputs ----------------------------------------------------------------
O9DataLake.register(
    "output1",
    data_source=DataSource.LS,
    entity_type=ResourceType.IBPL,
    plugin_setting=PluginSetting.Outputs,
)

# --- script params ----------------------------------------------------------
# Both parameter values are registered as strings.
script_params = O9DataLake.register(
    {"var1": "10", "var2": "Test"},
    data_source=DataSource.LS,
    plugin_setting=PluginSetting.ScriptParam,
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [6]:
# Show O9DataLake.inputs as the cell result. The captured output is a dict keyed
# by input name (e.g. 'predict_df') whose entries carry the query and a 'df' frame.
O9DataLake.inputs

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

{'predict_df': {'name': 'predict_df', 'resource_type': <ResourceType.IBPL: 'ibpl_query'>, 'data_source': <DataSource.LS: 'liveserver'>, 'query': 'select (Version.[Version Name]*Product.[Product].[196426]*Time.FiscalWeek*SalesAccount.[Account]*Location.[Location]*{Measure.[DPSellOutUnitsActuals],Measure.[Mean Pricing Save PCT],Measure.[Placement Count],Measure.[Promotion Count],Measure.[DPSellOutPrice]});', 'std_count_limit': '200000', 'df':    Version.[Version Name]  Product.[Product]  ... Promotion Count DPSellOutPrice
0                      S1             196426  ...             NaN            6.0
1      CurrentWorkingView             196426  ...             NaN            6.0
2                      S1             196426  ...             NaN            9.5
3      CurrentWorkingView             196426  ...             NaN            9.5
4                      S1             196426  ...             NaN            6.0
5      CurrentWorkingView             196426  ...             NaN    

In [7]:
# Retrieve every input from the data lake by the name it was registered under,
# in the same order as the registrations.
predict_df, input_df, liveinput = (
    O9DataLake.get(registered_name)
    for registered_name in ('predict_df', 'input_df', 'WeeklySales')
)

# Look up the two script parameters by key.
value1, value2 = (
    O9DataLake.get_script_param(param_key)
    for param_key in ("var1", "var2")
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [8]:
#user script
#
# Fits a decision-tree regressor on input_df (feature columns -> actual sell-out
# units) and uses it to fill a 'DPSellOutUnitsFcst' column for the rows of
# predict_df. The result, restricted to the key columns plus the forecast, is
# left in output1; output1 stays None when either frame is missing or empty.

# package imports
import logging
from sklearn import tree

# initialize output variables
output1 = None
output2 = None  # never assigned in this cell; kept so the name remains defined

# initialize logger
logger = logging.getLogger('o9_logger')

# Column roles, declared once so training and prediction cannot drift apart.
FEATURE_COLUMNS = ['Mean Pricing Save PCT', 'Placement Count', 'Promotion Count', 'DPSellOutPrice']
TARGET_COLUMN = 'DPSellOutUnitsActuals'
FORECAST_COLUMN = 'DPSellOutUnitsFcst'
KEY_COLUMNS = ['Time.[FiscalWeek]', 'Product.[Product]', 'Version.[Version Name]', 'SalesAccount.[Account]', 'Location.[Location]']


def _has_rows(frame):
    """Return True when ``frame`` is not None and contains at least one row."""
    return frame is not None and len(frame.index) > 0


# Log per-column non-null counts. A missing frame is logged as None instead of
# raising AttributeError before the guard below gets a chance to run.
for frame_label, frame in (
    ('predict_df', predict_df),
    ('input_df', input_df),
    ('liveinput', liveinput),
):
    frame_counts = None if frame is None else frame.count()
    logger.debug(f'{frame_label} dataframe:  {frame_counts}')
logger.debug(f'script param var1 value: {value1}')
logger.debug(f'script param var2 value: {value2}')

# Both frames are required: input_df trains the model, predict_df is scored.
if _has_rows(predict_df) and _has_rows(input_df):
    # Missing feature values are treated as 0.
    x_train = input_df[FEATURE_COLUMNS].fillna(0).values
    # NOTE(review): missing actuals are also filled with 0, so rows without a
    # recorded target are trained on as zero sales - confirm this is intended.
    y_train = input_df[TARGET_COLUMN].fillna(0).values

    # A fixed random_state makes repeated runs produce the same tree (and the
    # same forecast) when several candidate splits tie.
    clf = tree.DecisionTreeRegressor(random_state=0)
    clf.fit(x_train, y_train)

    x_predict = predict_df[FEATURE_COLUMNS].fillna(0).values
    predict_values = clf.predict(x_predict)

    # assign() returns a new frame, so the frame obtained from the data lake is
    # not modified as a side effect of running this cell.
    forecast_df = predict_df.assign(**{FORECAST_COLUMN: predict_values})
    output1 = forecast_df[KEY_COLUMNS + [FORECAST_COLUMN]]
    logger.info(output1)
else:
    logger.warning('predict_df or input_df is missing or empty; no forecast produced')
    output1 = None


print("Plugin Execution Finished")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

2023-10-18 11:11:23,442 - o9_logger - DEBUG - predict_df dataframe:  Version.[Version Name]    14
Product.[Product]         14
Time.[FiscalWeek]         14
SalesAccount.[Account]    14
Location.[Location]       14
DPSellOutUnitsActuals     14
Mean Pricing Save PCT      0
Placement Count            0
Promotion Count            0
DPSellOutPrice            14
dtype: int64
2023-10-18 11:11:23,444 - o9_logger - DEBUG - input_df dataframe:  Version.[Version Name]    320
Product.[Product]         320
Time.[FiscalWeek]         320
SalesAccount.[Account]    320
Location.[Location]       320
DPSellOutUnitsActuals     130
Mean Pricing Save PCT      42
Placement Count           320
Promotion Count            42
DPSellOutPrice            130
dtype: int64
2023-10-18 11:11:23,477 - o9_logger - DEBUG - liveinput dataframe:  WalmartTime.[Day]             200000
Store.[Store_ID]              200000
Department.[Department_ID]    200000
Version.[Version Name]        200000
Fuel Price                    20

In [9]:
# Push output1 to Live Server under the output name 'output1'.
# Make sure every dimension and measure in the frame is present in Live Server.
# NOTE(review): output1 is None when the user-script cell produced no forecast;
# confirm that O9DataLake.put accepts None.
O9DataLake.put('output1', output1)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [6]:
# List the databases (shown as the 'namespace' column) visible to the Spark session.
# NOTE(review): `spark` is not created in any visible cell - presumably provided by the kernel.
spark.sql('show databases').show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------------+
|    namespace|
+-------------+
|      default|
|    nifidelta|
|  schema_7348|
|  schema_7682|
|  schema_7704|
|  schema_7705|
|  schema_8633|
|  schema_8639|
|  schema_8787|
|  schema_8807|
|schemagetnifi|
|   schemanifi|
+-------------+

In [7]:
# List the tables that exist in the schema_7682 database.
spark.sql('show tables in schema_7682').show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-----------+--------------------+-----------+
|  namespace|           tableName|isTemporary|
+-----------+--------------------+-----------+
|schema_7682|         dimlocation|      false|
|schema_7682|          dimproduct|      false|
|schema_7682|     dimsalesaccount|      false|
|schema_7682|             dimtime|      false|
|schema_7682|          dimversion|      false|
|schema_7682|factcompetitorrel...|      false|
|schema_7682|    factdpbaseinputs|      false|
+-----------+--------------------+-----------+

In [8]:
# Preview at most two rows of schema_7682.factdpbaseinputs to see its columns.
spark.sql('select * from schema_7682.factdpbaseinputs limit 2').show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-----------+----------+---------------+-------------------+------------------+-----------------------+------------------+------------------+----------------------+-------------------+-----------------+-----------------------+-------------+--------------+-------------+--------------+------------------+---------------------+-------------------------+---------------------+------------------------------+----------+-------------------------------+-----------------------+------------------------+--------------------+------------------+--------------+--------------------+-------------+----------+
|locationkey|productkey|salesaccountkey|            timekey|dpselloutunitsfcst|meanflyerspaceactualpct|commonflyerslotnum|commonflyershowind|dpsellinunitslyactuals|dpselloutunitsfcstr|dpsellinunitsfcst|dpselloutunitslyactuals|inputpicklist|placementcount|placementflag|dpselloutprice|commonflyerpagenum|dpselloutunitsactuals|commonflyerslotprioritynm|dpselloutaountactuals|meanpricingpromoregularnatlam

In [9]:
# Keep the result of the two-row preview query in dptable for further inspection.
dptable = spark.sql('select * from schema_7682.factdpbaseinputs limit 2')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [10]:
# Number of rows in dptable; it cannot exceed 2 because the query uses 'limit 2'.
dptable.count()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

2