In [1]:
# coding: utf-8

import pandas as pd
from impala.dbapi import connect
from impala.util import as_pandas
import ast
import redis_io as redis_io

def agg_montly_total_amount_by_product(year, product_cate):
    """Aggregate monthly sales totals per product for one category and year.

    Receipts of ``year`` are read from Impala, joined with the menu map and
    restricted to the category ``product_cate``. For every month of the year
    the ten products with the highest overall total are reported
    individually; the totals of all remaining products are folded into a
    single ``'ETC'`` entry.

    Side effect: the names of the top-ten products are written to Redis
    under the key ``product_cate`` (third argument ``60*60*24`` --
    presumably an expiry in seconds; confirm against ``redis_io``).

    Parameters
    ----------
    year : str or int
        Four-digit year, compared against the first four characters of
        ``date_receipt_num``.
    product_cate : str
        Category name matched against ``cate_name``.

    Returns
    -------
    dict
        ``{'total_amount': [month_dict, ...]}`` where each ``month_dict``
        maps product names (plus ``'ETC'``) to amounts and carries the
        month label under ``'year_month'``.
    """
    # NOTE: caching of the final result in Redis (key
    # "monthly_total_amount_per_product:<year>:<category>") was disabled in
    # the original implementation and remains disabled here.

    # Values are bound through the driver (pyformat placeholders) instead of
    # being spliced into the SQL text, so quotes in the input cannot alter
    # the statement. The driver quotes string parameters itself, hence no
    # quotes around the placeholders.
    query_str = """
        SELECT * FROM (
            SELECT SUBSTR(view_tr_receipt.date_receipt_num,1,7) AS year_month,
                  view_tr_receipt.num_of_product, view_tr_receipt.sales_amount AS total_amount,
                  ext_menumap_info.product_name, ext_menumap_info.cate_name, ext_menumap_info.price
            FROM (SELECT * FROM ext_tr_receipt WHERE SUBSTR(date_receipt_num,1,4) = %(year)s
            ) view_tr_receipt JOIN ext_menumap_info USING (product_code)
        ) view_tr_receipt_menumap
        WHERE cate_name = %(product_cate)s
        """
    # str() keeps the comparison a string comparison even if an int is passed.
    query_params = {'year': str(year), 'product_cate': product_cate}

    conn = connect(host='salest-master-server', port=21050)
    try:
        cur = conn.cursor()
        cur.execute('USE salest')
        cur.execute(query_str, query_params)
        df_monthly_product_tr = as_pandas(cur)
    finally:
        # Release the connection even when the query or the fetch fails.
        conn.close()

    # Observed totals per (month, product).
    df_monthly_summary = (
        df_monthly_product_tr
        .groupby(['year_month', 'product_name'])['total_amount']
        .sum()
        .to_frame('total_amount_B')
    )

    # Zero-filled frame for every (month, product) pair so that months
    # without sales still show up in the result.
    df_default = genDefaultMontlyCateTotalAmountDataFrame(
        df_monthly_summary, year, 'product_name'
    ).rename(columns={'total_amount': 'total_amount_A'})

    df_combined = pd.concat([df_default, df_monthly_summary], axis=1).fillna(0)
    df_per_category = (
        df_combined['total_amount_A'] + df_combined['total_amount_B']
    ).to_frame('total_amount')

    # Overall top 10 products of the category by total amount. Only the
    # amount column is summed: summing the whole frame would also aggregate
    # the string columns on recent pandas and clash with 'year_month' in the
    # merge below.
    df_topten_products_by_total_amount = (
        df_monthly_product_tr
        .groupby(['product_name'])['total_amount']
        .sum()
        .sort_values(ascending=False)
        .head(10)
        .to_frame('overall_total_amount')
    )

    # Redis save cache value
    redis_io.write_transaction(
        product_cate,
        df_topten_products_by_total_amount.index.tolist(),
        60*60*24,
    )

    # Attach the overall total to each monthly row; products outside the
    # top ten end up with a null 'overall_total_amount'.
    df_new = df_per_category.reset_index(level=0)
    df_merged = pd.merge(
        df_new,
        df_topten_products_by_total_amount,
        left_index=True,
        right_index=True,
        how='left',
    ).sort_values(by='year_month', ascending=True)

    def agg_monthly_items_summary(group):
        """Return one month's top-ten amounts plus the 'ETC' remainder."""
        is_top = group['overall_total_amount'].notnull()
        top_amounts = group.loc[is_top, 'total_amount']
        etc_amount = group.loc[~is_top, 'total_amount'].sum()
        # pd.concat replaces Series.append, which was removed in pandas 2.0.
        return pd.concat([top_amounts, pd.Series([etc_amount], index=['ETC'])])

    df_agg_monthly_summary = (
        df_merged
        .groupby(['year_month'])
        .apply(agg_monthly_items_summary)
        .fillna(0)
    )

    monthlyDictItems = df_agg_monthly_summary.apply(gen_dict_total_amount, axis=1)

    return {'total_amount': list(monthlyDictItems)}


#################################################################################################################################
# Utility Functions
#################################################################################################################################

def gen_dict_total_amount(row):
    """Convert one row of the monthly summary into a plain dict.

    Parameters
    ----------
    row : pandas.Series
        Values keyed by the row's index labels; ``row.name`` is used as the
        month label.

    Returns
    -------
    dict
        ``{label: value, ..., 'year_month': row.name}``.

    The dict is built directly rather than by assembling Python source text
    and parsing it with ``ast.literal_eval``. The textual round trip failed
    for labels containing quotes or backslashes, for NaN/inf values and for
    an empty row.
    """
    monthlyDict = {}
    for key, value in zip(row.index, row):
        # Non-string labels are rendered the same way the old string
        # template rendered them, so keys stay strings.
        label = key if isinstance(key, str) else "{0}".format(key)
        # Unwrap numpy scalars so callers keep receiving plain Python
        # numbers, as they did from literal_eval.
        if hasattr(value, 'item'):
            value = value.item()
        monthlyDict[label] = value

    monthlyDict['year_month'] = row.name

    return monthlyDict


def genDefaultMontlyCateTotalAmountDataFrame(df_monthly_product_tr,year, second_index_name):
    """Build a zero-filled frame spanning every month of ``year``.

    The result is indexed by (``'year_month'``, ``second_index_name``): the
    twelve month labels ``'<year>-01'`` .. ``'<year>-12'`` are each paired
    with every distinct value found in the second index level of
    ``df_monthly_product_tr``. Its single column ``'total_amount'`` is 0
    everywhere.
    """
    # Distinct entries of the second index level, in order of appearance.
    distinct_entries = df_monthly_product_tr.index.get_level_values(1).unique()

    month_labels = ["{0}-{1:02d}".format(year, month_no) for month_no in range(1, 13)]

    # Month-major ordering: all entries for January, then February, ...
    index_pairs = [
        (label, entry)
        for label in month_labels
        for entry in distinct_entries
    ]

    full_index = pd.MultiIndex.from_tuples(
        index_pairs, names=['year_month', second_index_name]
    )
    return pd.DataFrame(0, index=full_index, columns=['total_amount'])

In [2]:
# Example run: monthly per-product totals for year '2014' and category '커피'.
agg_montly_total_amount_by_product('2014', '커피')

{'total_amount': [{'ETC': 0.0,
   'year_month': '2014-01',
   '\xeb\xb0\x94\xeb\x8b\x90\xeb\x9d\xbc\xeb\x9d\xbc\xeb\x96\xbc': 0.0,
   '\xec\x95\x84\xeb\xa9\x94\xeb\xa6\xac\xec\xb9\xb4\xeb\x85\xb8': 0.0,
   '\xec\x95\x84\xec\x9d\xb4\xec\x8a\xa4\xeb\xb0\x94\xeb\x8b\x90\xeb\x9d\xbc\xeb\x9d\xbc\xeb\x96\xbc': 0.0,
   '\xec\x95\x84\xec\x9d\xb4\xec\x8a\xa4\xec\x95\x84\xeb\xa9\x94\xeb\xa6\xac\xec\xb9\xb4\xeb\x85\xb8': 0.0,
   '\xec\x95\x84\xec\x9d\xb4\xec\x8a\xa4\xec\xb9\xb4\xeb\x9d\xbc\xeb\xa9\x9c\xeb\xa7\x88\xeb\x81\xbc\xec\x95\xbc\xeb\x98\x90': 0.0,
   '\xec\x95\x84\xec\x9d\xb4\xec\x8a\xa4\xec\xb9\xb4\xed\x8e\x98\xeb\x9d\xbc\xeb\x96\xbc': 0.0,
   '\xec\x95\x84\xec\x9d\xb4\xec\x8a\xa4\xec\xb9\xb4\xed\x8e\x98\xeb\xaa\xa8\xec\xb9\xb4': 0.0,
   '\xec\xb9\xb4\xeb\x9d\xbc\xeb\xa9\x9c\xeb\xa7\x88\xeb\x81\xbc\xec\x95\xbc\xeb\x98\x90': 0.0,
   '\xec\xb9\xb4\xed\x8e\x98\xeb\x9d\xbc\xeb\x96\xbc': 0.0,
   '\xec\xb9\xb4\xed\x8e\x98\xeb\xaa\xa8\xec\xb9\xb4': 0.0},
  {'ETC': 0.0,
   'year_month': '2014-02