In [None]:
from forecast_process import *
import datalabframework
from pyspark.sql import functions as F
import json
from elastictools.doctools import DocTools
import pandas as pd
import os
import numpy as np

In [None]:
# Point PySpark at the conda interpreter (hard-coded absolute path).
os.environ.update(PYSPARK_PYTHON='/opt/conda/bin/python')

In [None]:
import warnings
warnings.filterwarnings('ignore')

## Write to Elasticsearch

In [None]:
def to_elastic(flat_result_all_cat, index_name, doctype,
               hosts=('192.168.0.179', '192.168.0.178'), port='9200'):
    """
    Recreate an Elasticsearch index and bulk-load a list of flat records into it.

    Every string field is mapped as `keyword` through a dynamic template.

    Parameters
    ----------
    flat_result_all_cat: list of dict
        Records to index.
    index_name: string
        Target index; it is created with `overwrite=True`.
    doctype: string
        Document type name used both in the mapping and in the bulk request.
    hosts: iterable of string, optional
        Elasticsearch node addresses. Defaults to the two nodes previously hard-coded.
    port: string or int, optional
        HTTP port shared by all nodes (default '9200').

    Returns
    -------
    The value returned by `DocTools.bulk` (previously discarded), so callers can
    inspect the outcome of the bulk load.
    """
    settings = {
        "index": {
            "number_of_shards": 5,
            "number_of_replicas": 1,
            "mapping": {
                "total_fields": {
                    "limit": "1000"
                }
            },
        }
    }

    mapping = {
        doctype: {
            "dynamic_templates": [
                {"strings": {
                    "match_mapping_type": "string",
                    "mapping": {
                        "type": "keyword"
                    }
                }}
            ]
        }
    }

    uri = ['http://{}:{}'.format(ip, port) for ip in hosts]
    es = DocTools(uri)
    es.indextool().create(index_name, overwrite=True, settings=settings, mapping=mapping)
    return es.bulk(index_name, flat_result_all_cat, doctype=doctype)

### Build Data

In [None]:
def select_category(product_quantity_date, cat_name=None, cat_root_name=None,
                    excluded_sku_ids=('1206838', '1207652')):
    """
    Select the transactions of one category (or of all products) and sum the
    daily quantity sold per transaction date.

    The filter is chosen by priority:
    1. `cat_name`, if given;
    2. otherwise `cat_root_name`, if given;
    3. otherwise every product except the SKUs in `excluded_sku_ids`.

    Parameters
    ----------
    product_quantity_date: pandas.DataFrame
        Per-SKU daily aggregates. Must have the columns `sku_id`, `cat_name`,
        `cat_root_name`, `transaction_date` and `daily_quantity`.
    cat_name: string, optional
        Leaf category to select.
    cat_root_name: string, optional
        Root category to select (used only when `cat_name` is falsy).
    excluded_sku_ids: iterable of string, optional
        SKUs left out of the all-products total. Defaults to the two SKUs that
        were previously hard-coded. NOTE(review): the reason for excluding them
        is not recorded here.

    Returns
    -------
    pandas.DataFrame
        Columns `transaction_date` and `daily_quantity` (sum per date).
    """
    if cat_name:
        mask = product_quantity_date['cat_name'] == cat_name
    elif cat_root_name:
        mask = product_quantity_date['cat_root_name'] == cat_root_name
    else:
        # One combined mask. Chained boolean indexing (df[a][b]) reindexes the
        # second mask against the filtered frame and emits a UserWarning.
        mask = ~product_quantity_date['sku_id'].isin(list(excluded_sku_ids))
    cat_trans = product_quantity_date[mask]
    return cat_trans.groupby('transaction_date').agg({'daily_quantity': 'sum'}).reset_index()

In [None]:
def _category_entries(cat_rows, root_rows):
    """
    Build `all_cat` entries from distinct (name, id) rows of leaf and root categories.

    Each entry is `((label_name, label_id), (cat_name, cat_root_name))`:
    - the first pair labels the forecast output;
    - the second pair is the selector passed to `select_category`.

    Leaf categories come first. Root categories follow, skipped when the same
    (name, id) pair already exists as a leaf. A final ('total', '000000') entry
    selects all products.
    """
    all_cat = [(tuple(row), (row[0], None)) for row in cat_rows]
    # Compare whole (name, id) pairs. `row in ndarray` evaluates
    # `(ndarray == row).any()` and so matches on ANY single element
    # (a shared name OR a shared id), not on row membership.
    leaf_pairs = {tuple(row) for row in cat_rows}
    all_cat.extend(
        (tuple(row), (None, row[0])) for row in root_rows if tuple(row) not in leaf_pairs
    )
    all_cat.append((('total', '000000'), (None, None)))
    return all_cat


def init_func():
    """
    Load the fact table, aggregate it per SKU and day, and list the categories to forecast.

    Only rows with doc_type "PTX" or "HDF" and a non-zero unit_price are kept.

    Returns
    -------
    product_quantity_date: pandas.DataFrame
        One row per SKU, transaction_date and category columns, ordered by
        transaction_date. `daily_quantity` is the sum of quantity, cast to int64.
        `daily_price` is the mean unit_price. `transaction_date` is converted to
        datetime64.
    all_cat: list of tuple
        Entries `((label_name, label_id), (cat_name, cat_root_name))`, as built
        by `_category_entries`.
    """
    datalabframework.project.load()
    engine = datalabframework.project.engine()
    # The return value was never used; the call is kept in case the context
    # must exist before engine.load().
    engine.context()
    fact_transaction = engine.load('fact_table').select(
        'sku_id', 'sku_name', 'transaction_date', 'quantity', 'doc_type', 'unit_price',
        'cat_id', 'cat_group_id', 'cat_root_id', 'cat_name', 'cat_group_name', 'cat_root_name')
    product_quantity_date = (
        fact_transaction
        .where(F.expr('doc_type == "PTX"') | F.expr('doc_type == "HDF"'))
        .where(F.expr('unit_price != 0'))
        .groupby('sku_id', 'sku_name', 'transaction_date', 'cat_id', 'cat_group_id',
                 'cat_root_id', 'cat_name', 'cat_group_name', 'cat_root_name')
        .agg(F.sum('quantity').alias('daily_quantity'), F.avg('unit_price').alias('daily_price'))
        .orderBy('transaction_date')
        # Collect once. The distinct category lists below are derived from the
        # pandas frame instead of re-running the Spark aggregation for each.
        .toPandas()
    )
    all_cat = _category_entries(
        product_quantity_date[['cat_name', 'cat_id']].drop_duplicates().values,
        product_quantity_date[['cat_root_name', 'cat_root_id']].drop_duplicates().values,
    )
    product_quantity_date['daily_quantity'] = product_quantity_date['daily_quantity'].astype(np.int64)
    product_quantity_date['transaction_date'] = pd.to_datetime(product_quantity_date['transaction_date'])
    return product_quantity_date, all_cat

In [None]:
def init():
    """
    Load the data via `init_func` and drop category entries that have neither a selector nor an id.

    An entry is dropped when both selectors `(cat_name, cat_root_name)` are None
    *and* its id is None. Such entries come from null category values. They would
    otherwise fall through to the all-products branch of `select_category`.

    Returns
    -------
    (product_quantity_date, all_cat) as returned by `init_func`, with those
    entries removed.
    """
    product_quantity_date, all_cat = init_func()
    # Build a new list. Calling all_cat.remove() while iterating over all_cat
    # skips the element right after each removal, so consecutive matches survived.
    all_cat = [
        cat for cat in all_cat
        if not (cat[1][0] is None and cat[1][1] is None and cat[0][1] is None)
    ]
    return product_quantity_date, all_cat

## Forecast

In [None]:
def caculate_history_and_forecast(product_quantity_date, cat, freq_='D'):
    """
    Forecast one category and tag every output record with its label.

    Parameters
    ----------
    product_quantity_date: pandas.DataFrame
        Per-SKU daily aggregates, as produced by `init_func`.
    cat: tuple
        `((label_name, label_id), (cat_name, cat_root_name))`. The second pair
        is forwarded to `select_category`.
    freq_: string
        Frequency string ('D', 'W-SUN' or 'M' when called from `run`),
        forwarded to `adaptive_forecast_process`.

    Returns
    -------
    (flat_result_cv, flat_result_test, hist_data, preds). Every record is
    updated in place with `cat_name` / `cat_id`. Returns four Nones when the
    forecast yields no test result.
    """
    series = select_category(product_quantity_date, *cat[1])
    cv_rows, test_row, history_rows, future_rows = adaptive_forecast_process(series, freq_)
    if not test_row:
        return None, None, None, None

    tag = {'cat_name': cat[0][0], 'cat_id': cat[0][1]}
    test_row.update(tag)
    # Same tagging order as before: CV records, then predictions, then history.
    for records in (cv_rows, future_rows, history_rows):
        for record in records:
            record.update(tag)
    return cv_rows, test_row, history_rows, future_rows

In [None]:
def run(freq_str, product_quantity_date, all_cat):
    """
    Forecast every category in `all_cat` at one granularity and save the results as JSON.

    Parameters
    ----------
    freq_str: string
        'month' (frequency 'M'), 'week' ('W-SUN') or 'day'. Any other value
        also falls back to daily ('D'). Also used as the output sub-directory.
    product_quantity_date: pandas.DataFrame
        Per-SKU daily aggregates, as returned by `init_func`.
    all_cat: list of tuple
        Category entries `((label_name, label_id), (cat_name, cat_root_name))`.

    Side effects
    ------------
    Prints one progress line per category. Writes info_on_test.json,
    info_on_cv.json, history_data.json and future_prediction.json to
    json_output/category/<freq_str>/, creating the directory if missing.
    """
    freq_ = {'month': 'M', 'week': 'W-SUN'}.get(freq_str, 'D')
    flat_test_result_all_cat = []
    flat_cv_result_all_cat = []
    history_data = []
    preds_future = []
    for n, cat in enumerate(all_cat):
        print(n, ' ', cat)
        flat_result_cv, flat_result_test, hist_data, preds = caculate_history_and_forecast(product_quantity_date, cat, freq_)
        if flat_result_test:
            flat_test_result_all_cat.append(flat_result_test)
        if flat_result_cv:
            flat_cv_result_all_cat.extend(flat_result_cv)
        if hist_data:
            history_data.extend(hist_data)
        if preds:
            preds_future.extend(preds)

    out_dir = os.path.join('json_output', 'category', freq_str)
    os.makedirs(out_dir, exist_ok=True)
    outputs = (
        ('info_on_test.json', flat_test_result_all_cat),
        ('info_on_cv.json', flat_cv_result_all_cat),
        ('history_data.json', history_data),
        ('future_prediction.json', preds_future),
    )
    for file_name, payload in outputs:
        # `with` closes (and flushes) each file, so the data is on disk before
        # a later cell reads it back.
        with open(os.path.join(out_dir, file_name), 'w', encoding='utf8') as fh:
            json.dump(payload, fh)

In [None]:
# Load the per-SKU daily aggregates (pandas) and the list of category entries to forecast.
product_quantity_date, all_cat = init()

In [None]:
# Keep only the root-category entry whose selector is (None, 'laptop').
tmp_cat = list(filter(lambda entry: entry[1][1] == 'laptop', all_cat))
tmp_cat

In [None]:
# Monthly forecast. By default only the laptop root category (tmp_cat) is run;
# set RUN_ALL_CATEGORIES = True to forecast every entry of all_cat instead.
RUN_ALL_CATEGORIES = False
run('month', product_quantity_date, all_cat if RUN_ALL_CATEGORIES else tmp_cat)

In [None]:
# Weekly (W-SUN) forecast for the selected categories.
run(freq_str='week', product_quantity_date=product_quantity_date, all_cat=tmp_cat)

In [None]:
# Daily forecast for the selected categories.
run(freq_str='day', product_quantity_date=product_quantity_date, all_cat=tmp_cat)

In [None]:
# Reload the daily future predictions written by run('day', ...). The `with`
# closes the file, and the encoding matches the one used by the writer.
with open(os.path.join('json_output', 'category', 'day', 'future_prediction.json'), 'r', encoding='utf8') as preds_file:
    preds_future = json.load(preds_file)

In [None]:
# Number of future-prediction records loaded from json_output/category/day.
len(preds_future)

In [None]:
# Reload the daily history records written by run('day', ...). The `with`
# closes the file, and the encoding matches the one used by the writer.
with open(os.path.join('json_output', 'category', 'day', 'history_data.json'), 'r', encoding='utf8') as history_file:
    history = json.load(history_file)

In [None]:
# Push the daily predictions to their own index (recreated on every call).
to_elastic(preds_future, index_name='hanh_forecast_category_day_prediction', doctype='prediction')

In [None]:
# Push the daily history records to their own index (recreated on every call).
to_elastic(history, index_name='hanh_forecast_category_day_history', doctype='history')

## Test

In [None]:
# Reload the data from scratch (same call as the initial load cell); this re-runs
# the Spark job, so skip it if the frames are already in memory.
product_quantity_date, all_cat = init()

In [None]:
# Most recent transaction date in the data. Series.max() is vectorized and skips
# NaT, unlike the builtin max(), which iterates the Series element by element.
product_quantity_date['transaction_date'].max()

In [None]:
# Daily total quantity for the 'laptop' root category.
total_by_date = select_category(product_quantity_date, cat_root_name='laptop')

In [None]:
# Run the daily forecast directly on the laptop root-category series from the cell above.
flat_result_cv, flat_result_test, history_data, future_preds = adaptive_forecast_process(total_by_date, 'D')

In [None]:
# Summary record returned as `flat_result_test` (the kind of record run() writes to info_on_test.json).
flat_result_test

In [None]:
# Last ten history records returned for the laptop series.
history_data[-10:]

In [None]:
# Future predictions returned for the laptop series (the kind of records run() writes to future_prediction.json).
future_preds

In [None]:
# Tabulate the `flat_result_cv` records (the records run() writes to info_on_cv.json).
pd.DataFrame(flat_result_cv)

In [None]:
# Forecast a single leaf category. The entry is (label (name, id), selector (cat_name, cat_root_name)).
jbl_label = ('loa bluetooth jbl', '06-N001-02-12')
cat = (jbl_label, (jbl_label[0], None))
flat_result_cv, flat_result_test, hist_data, preds = caculate_history_and_forecast(
    product_quantity_date, cat, freq_='D')

In [None]:
# Tabulate the CV records for the 'loa bluetooth jbl' category.
pd.DataFrame(flat_result_cv)

In [None]:
# Raw CV records (with cat_name / cat_id tags) for the same category.
flat_result_cv