In [0]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import datetime
import dateutil
from sklearn.metrics import precision_score, recall_score, confusion_matrix, precision_recall_curve

import pyspark
from pyspark.sql import SparkSession, SQLContext
from pyspark.sql.functions import col
import pyspark.sql.functions as F
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier, DecisionTreeClassifier, GBTClassifier, RandomForestClassificationModel
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

from ext.DAC_utils import DAC_utils
from dac_automl.autotuning.classification_model_selector import ClassificationModelSelector

# Spark entry points used by the rest of the notebook: the session, its SparkContext,
# and a (legacy) SQLContext through which most of the SQL below is issued.
spark = (
    SparkSession.builder
    .appName('DHTSDCT')
    .getOrCreate()
)
sc = spark.sparkContext
sqlContext = SQLContext(sc)


import numpy as np
import pandas as pd
import datetime
from pyspark.sql import SparkSession, DataFrameWriter
import pyspark.sql.functions as F
import pyspark.sql.types as types
from pyspark.sql.types import StructType
from pyspark.ml.stat import Summarizer
from functools import reduce
# NOTE(review): a SparkSession was already created in this cell above, so getOrCreate()
# returns that existing session; the new appName is not applied, and depending on the
# Spark version only runtime SQL configs (such as 'hive.stats.autogather') take effect.
spark = (
    SparkSession.builder
    .config('hive.stats.autogather', "false")
    .appName("prod_c360_transformation")
    .getOrCreate()
)
sc = spark.sparkContext

print(spark.version)

In [1]:
# Integer day keys 20210302 .. 20210329 (the stop value of range is exclusive).
partition = range(20210302, 20210330, 1)

In [2]:
# Echo each day key of `partition`, one per line.
for day_key in partition:
    print(str(day_key))

In [3]:
# Rebuild tmp_myvt_ecommerce_daily from tmp_myvt_ecommerce_daily_v2, one partition per day.
# Days are generated as real dates so the window may cross a month boundary without
# producing invalid keys (an integer range such as range(20210330, 20210402) yields 20210332...).
# NOTE(review): the original integer range(20210301, 20210331) stops at 20210330; that is kept
# here -- confirm whether 20210331 should be reloaded too.
RELOAD_START = datetime.date(2021, 3, 1)
RELOAD_DAYS = 30
for offset in range(RELOAD_DAYS):
    item = (RELOAD_START + datetime.timedelta(days=offset)).strftime('%Y%m%d')
    # The day key is quoted in WHERE as well as in PARTITION, so the partition value is compared
    # as a string rather than through an implicit string/number cast (assumes the partition
    # column is a string, as the quoted PARTITION clause suggests).
    base = sqlContext.sql(''' INSERT OVERWRITE TABLE tmp_myvt_ecommerce_daily PARTITION (partition = '{}')
SELECT DISTINCT msisdn, myvt_cat
FROM tmp_myvt_ecommerce_daily_v2
WHERE partition = '{}'
'''.format(item, item))

In [4]:
# Append the narrow hostname -> product mapping for day keys 20210310 .. 20210314.
# NOTE(review): INSERT INTO appends, so re-running this cell duplicates rows in these partitions.
for day in range(20210310, 20210315):
    insert_sql = ''' INSERT into TABLE tmp_myvt_ecommerce_daily_v4 PARTITION (partition = '{}')
SELECT distinct msisdn, 
	CASE WHEN lower(hostname) rlike 'vexere|pasoto|baolau|click1bus|futabus' then 'VEXERE'
		WHEN lower(hostname) rlike 'shemarookids|bookboxinc' THEN 'MONKEYJUNIOR'
		WHEN lower(hostname) rlike 'speaking24.com' THEN 'ELSASPEAK'
	END AS myvt_cat
FROM f_detail_url_https
WHERE 	partition='{}' and lower(hostname) rlike 'vexere|pasoto|baolau|click1bus|futabus|shemarookids|speaking24.com|bookboxinc'
'''.format(day, day)
    base = sqlContext.sql(insert_sql)

In [5]:
# Append the narrow hostname -> product mapping for day keys 20210307 .. 20210309.
# NOTE(review): INSERT INTO appends, so re-running this cell duplicates rows in these partitions.
for day in range(20210307, 20210310):
    insert_sql = ''' INSERT into TABLE tmp_myvt_ecommerce_daily_v4 PARTITION (partition = '{}')
SELECT distinct msisdn, 
	CASE WHEN lower(hostname) rlike 'vexere|pasoto|baolau|click1bus|futabus' then 'VEXERE'
		WHEN lower(hostname) rlike 'shemarookids|bookboxinc' THEN 'MONKEYJUNIOR'
		WHEN lower(hostname) rlike 'speaking24.com' THEN 'ELSASPEAK'
	END AS myvt_cat
FROM f_detail_url_https
WHERE 	partition='{}' and lower(hostname) rlike 'vexere|pasoto|baolau|click1bus|futabus|shemarookids|speaking24.com|bookboxinc'
'''.format(day, day)
    base = sqlContext.sql(insert_sql)

In [6]:
    base = sqlContext.sql(''' INSERT into TABLE tmp_myvt_ecommerce_daily_v4 PARTITION (partition = '{}' )
SELECT distinct msisdn, 
	CASE WHEN lower(hostname) rlike 'kynaforkids.vn|teacher.scholastic.com|pbskids.org|starfall.com|earnenglishkids.britishcouncil.org|bookboxinc|raz-kids.com|busybeavers.com|shemarookids|britishcouncil.vn|funbrain.com|breakingnewsenglish.com|literactive.com|highlightskids.com|tienganh.monkey.edu.vn|monkeyjunior.vn' THEN 'MONKEYJUNIOR'
		WHEN lower(hostname) rlike 'khanacademy.org|hoctoantienganh.com|patrickjmt.com|wolframalpha.com|onlinemathlearning.com|math.com|happymath.vn|cth.edu.vn|contuhoc.com|blog.e2.com.vn|gmaths.edu.vn|monkeymath.vn|conhocgioi.com' THEN 'MONKEYMATH'
		WHEN lower(hostname) rlike 'studytienganh.vn|starfall.com|turtlediary.comkids-stories.html|english-online.at|reading.ecb.org|esl-bits.net|mightybook.com|pbskids.org|monkeystories.vn' THEN 'MONKEYSTORIES'
		WHEN lower(hostname) rlike 'elsaspeak.vn|apollo.edu.vn|tienganhmoingay.com|mshoagiaotiep.com|anhnguoxford.vn|britishcouncil.vn|elllo.org|funeasyenglish.com|go4english.com|lang-8.com|busuu.com|rosettastone.com|vocabsushi.com|learnenglish.de|examenglish.com|duolingo.com|memrise.com|e-space.vn|speaking24.com|learnenglish.britishcouncil.org' THEN 'ELSASPEAK'
		WHEN lower(hostname) rlike 'vexere|pasoto|baolau|click1bus|futabus|xeca|vexegiare|megabus|saolimousine|xedcarlimousine|xekhachgiare|tongdai-datxe|xekhachanphuquy|vetaugiare24h|saodieu|easybook|tongdaixekhacanhbuoi|sanvh|xethexe' THEN 'VEXERE'
	END AS myvt_cat
FROM f_detail_url_https
WHERE 	partition='{}' and lower(hostname) rlike 'kynaforkids.vn|teacher.scholastic.com|pbskids.org|starfall.com|earnenglishkids.britishcouncil.org|bookboxinc|raz-kids.com|busybeavers.com|shemarookids|britishcouncil.vn|funbrain.com|breakingnewsenglish.com|literactive.com|highlightskids.com|tienganh.monkey.edu.vn|monkeyjunior.vn|khanacademy.org|hoctoantienganh.com|patrickjmt.com|wolframalpha.com|onlinemathlearning.com|math.com|happymath.vn|cth.edu.vn|contuhoc.com|blog.e2.com.vn|gmaths.edu.vn|monkeymath.vn|conhocgioi.com|studytienganh.vn|starfall.com|turtlediary.comkids-stories.html|english-online.at|reading.ecb.org|esl-bits.net|mightybook.com|pbskids.org|monkeystories.vn|elsaspeak.vn|apollo.edu.vn|tienganhmoingay.com|mshoagiaotiep.com|anhnguoxford.vn|britishcouncil.vn|elllo.org|funeasyenglish.com|go4english.com|lang-8.com|busuu.com|rosettastone.com|vocabsushi.com|learnenglish.de|examenglish.com|duolingo.com|memrise.com|e-space.vn|speaking24.com|learnenglish.britishcouncil.orgen|vexere|pasoto|baolau|click1bus|futabus|xeca|vexegiare|megabus|saolimousine|xedcarlimousine|xekhachgiare|tongdai-datxe|xekhachanphuquy|vetaugiare24h|saodieu|easybook|tongdaixekhacanhbuoi|sanvh|xethexe'
'''.format(20210305,20210305))

In [7]:
# Append the narrow hostname -> product mapping for day keys 20210304 .. 20210305.
# NOTE(review): INSERT INTO appends, so re-running this cell duplicates rows in these partitions.
for day in range(20210304, 20210306):
    insert_sql = ''' INSERT into TABLE tmp_myvt_ecommerce_daily_v4 PARTITION (partition = '{}')
SELECT distinct msisdn, 
	CASE WHEN lower(hostname) rlike 'vexere|pasoto|baolau|click1bus|futabus' then 'VEXERE'
		WHEN lower(hostname) rlike 'shemarookids|bookboxinc' THEN 'MONKEYJUNIOR'
		WHEN lower(hostname) rlike 'speaking24.com' THEN 'ELSASPEAK'
	END AS myvt_cat
FROM f_detail_url_https
WHERE 	partition='{}' and lower(hostname) rlike 'vexere|pasoto|baolau|click1bus|futabus|shemarookids|speaking24.com|bookboxinc'
'''.format(day, day)
    base = sqlContext.sql(insert_sql)

In [8]:
 
# One-off load of day key 20210304 with the narrow hostname -> product mapping.
# NOTE(review): the range(20210304, 20210306) loop cell already INSERTs INTO 20210304 and
# INSERT INTO appends, so running both cells duplicates every row of this partition.
# Confirm this cell is only a manual retry.
single_partition = '20210304'
base = sqlContext.sql(''' INSERT into TABLE tmp_myvt_ecommerce_daily_v4 PARTITION (partition = '{0}')
SELECT distinct msisdn, 
	CASE WHEN lower(hostname) rlike 'vexere|pasoto|baolau|click1bus|futabus' then 'VEXERE'
		WHEN lower(hostname) rlike 'shemarookids|bookboxinc' THEN 'MONKEYJUNIOR'
		WHEN lower(hostname) rlike 'speaking24.com' THEN 'ELSASPEAK'
	END AS myvt_cat
FROM f_detail_url_https
WHERE 	partition='{0}' and lower(hostname) rlike 'vexere|pasoto|baolau|click1bus|futabus|shemarookids|speaking24.com|bookboxinc'
'''.format(single_partition))

In [9]:
def _pivot_values(pddf_dims, flag_col, value_col):
    '''Return the values of `value_col` that the dimension table flags for pivoting.

    :pddf_dims: pandas.DataFrame: dimension table as returned by ``toPandas()``.
    :flag_col: str: flag column; a row is kept when it equals the string '1'.
    :value_col: str: column whose values become pivot columns.
    :return: list of str: distinct, lower-cased, sorted values; empty strings and nulls are skipped.
    '''
    mask = (pddf_dims[flag_col] == '1') & (pddf_dims[value_col] != '')
    # Drop nulls after lower-casing: np.unique cannot sort a mix of str and None/NaN.
    return list(np.unique(pddf_dims.loc[mask, value_col].str.lower().dropna()))


def get_pivot_tbl(table_dim, table_log, table_out = None, table_out_path = None, time_in_queue = None):
    '''Build the c360 pivot table: one row per isdn, one column per pivoted metric.

    :table_dim: str: dimension table name; must contain the columns category, sub_category and
        function plus their flag columns is_pivot_category, is_pivot_sub_category (legacy spelling
        is_pivot_sub_categoryf is also accepted) and is_pivot_function. A flag equal to '1'
        selects the value for pivoting.
    :table_log: str: daily log summary table name. The subscriber id column must be `isdn`; the
        table must also have category, sub_category, function, lognum, dnum and a `partition`
        column holding yyyymmdd day keys.
    :table_out: str or None: output table (key isdn). When None nothing is written.
    :table_out_path: str or None: parent directory used when table_out has to be created;
        defaults to '/work_zone/upsell_vas/adp_new/tmp'.
    :time_in_queue: datetime or None: run date, e.g. datetime.datetime.strptime('20200105', '%Y%m%d');
        defaults to now. Logs from the 1st of the month of (run date - 1 day) up to
        (run date - 1 day) are aggregated and written to partition yyyymm01 of that month.
    :return: pyspark DataFrame with the pivoted result.
    '''
    if table_out_path is None:
        table_out_path = '/work_zone/upsell_vas/adp_new/tmp'
    if time_in_queue is None:
        time_in_queue = datetime.datetime.now()
    partition_inday = time_in_queue.strftime('%Y%m%d')
    log_day = time_in_queue - datetime.timedelta(days = 1)
    partition_today = log_day.strftime('%Y%m%d')
    partition_firstday = log_day.strftime('%Y%m01')

    pddf_dims = sqlContext.sql('from ' + table_dim).toPandas()
    # Month-to-date logs: from the 1st of the month up to the day before the run date.
    df_log = sqlContext.sql('from {} where partition >= "{}" and partition <= "{}"'.format(
        table_log, partition_firstday, partition_today))
    # The pivot value lists are lower-cased, so lower-case the log columns as well; otherwise a
    # mixed-case value in the log never matches its pivot column and silently becomes 0.
    for pivot_col in ('category', 'sub_category', 'function'):
        df_log = df_log.withColumn(pivot_col, F.lower(pivot_col))

    l_cat_to_pivot = _pivot_values(pddf_dims, 'is_pivot_category', 'category')
    # The sub-category flag used to be read as 'is_pivot_sub_categoryf' (typo). Prefer the correctly
    # spelled column and fall back to the old name so existing dimension tables keep working.
    subcat_flag = ('is_pivot_sub_category' if 'is_pivot_sub_category' in pddf_dims.columns
                   else 'is_pivot_sub_categoryf')
    l_subcat_to_pivot = _pivot_values(pddf_dims, subcat_flag, 'sub_category')
    l_func_to_pivot = _pivot_values(pddf_dims, 'is_pivot_function', 'function')

    grb_isdn = df_log.groupby(['isdn'])
    df_cat_pv = grb_isdn.pivot('category', l_cat_to_pivot).agg(
        F.sum('lognum').alias('cat_lognum'), F.sum('dnum').alias('cat_dnum'))
    df_subcat_pv = grb_isdn.pivot('sub_category', l_subcat_to_pivot).agg(
        F.sum('lognum').alias('subcat_lognum'), F.sum('dnum').alias('subcat_dnum'))
    df_fn_pv = grb_isdn.pivot('function', l_func_to_pivot).agg(
        F.sum('lognum').alias('fn_lognum'), F.sum('dnum').alias('fn_dnum'))

    # All three pivots come from the same grouping, so they share the same set of isdn keys.
    df_full = (
        df_cat_pv.fillna(0)
        .join(df_subcat_pv.fillna(0), on = 'isdn', how = 'left')
        .join(df_fn_pv.fillna(0), on = 'isdn', how = 'left')
    )

    if table_out is not None:
        df_full.createOrReplaceTempView("tmp_vas_mytempTable")
        try:
            # Resolving the table fails when it does not exist yet.
            spark.sql('from ' + table_out)
        except Exception:
            # Create the schema (CREATE ... IF NOT EXISTS is a no-op if the failure had another cause).
            cols2 = [c.replace(' ', '_') for c in df_full.columns if c != 'isdn']
            cols_list = ','.join([' isdn string '] + [c + ' bigint ' for c in cols2]) + ', last_update string comment "ngày cập nhật gần nhất "'
            sql_create_tbl_schema = """
                CREATE EXTERNAL TABLE IF NOT EXISTS `{0}` 
                ({1})
                partitioned by (partition string comment 'tháng phát sinh log, định dạng YYYYMM01, ghi vào ngày DD-1')
                STORED AS parquet
                LOCATION '{2}/{0}'
                tblproperties ("parquet.compression" = "SNAPPY")""".format(table_out, cols_list, table_out_path)
            sqlContext.sql(sql_create_tbl_schema)

        # `select *` is inserted by position: table_out must keep the column order of df_full
        # (isdn, pivot columns) followed by last_update. New pivot values change that layout.
        sqlContext.sql(
            "INSERT OVERWRITE TABLE {0} PARTITION (partition = '{1}') "
            "select *, '{2}' as last_update from tmp_vas_mytempTable".format(
                table_out, partition_firstday, partition_inday))
    return df_full
    
def _rename_column(df, newColumns):
    '''Return `df` with its columns renamed by position.

    :df: DataFrame: the table whose columns are renamed (it is not modified).
    :newColumns: sequence of str: new names, paired with ``df.columns`` by position; columns
        past the end of `newColumns` keep their current name.

    return: DataFrame: a new DataFrame with the renamed columns, in the original order.'''
    current_names = df.columns
    rename_to = {old: new for old, new in zip(current_names, newColumns)}
    return df.select(*[F.col(name).alias(rename_to.get(name, name)) for name in current_names])
    
def _table_exists(table_name):
    '''Return True when `table_name` (optionally qualified as db.table) exists in the Spark catalog.'''
    if hasattr(spark.catalog, 'tableExists'):  # public API since PySpark 3.3
        return spark.catalog.tableExists(table_name)
    db_name, _, short_name = table_name.rpartition('.')
    tables = spark.catalog.listTables(db_name) if db_name else spark.catalog.listTables()
    return any(t.name.lower() == short_name.lower() for t in tables)


def _create_empty_windowed_table(df_p_cur, tbl_windowed, id_cols, table_windowed_path):
    '''Create `tbl_windowed` with an empty schema derived from `df_p_cur` and return that empty DataFrame.

    Column layout: isdn, then X, X_n_1, X_n_2, X_n_3 for every non-id column X of df_p_cur,
    then last_update and partition (the table is partitioned by `partition`).
    '''
    fields = [types.StructField('isdn', types.StringType(), False)]
    for struct in df_p_cur.schema:
        if struct.name in id_cols:
            continue
        fields.append(struct)
        for suffix in ('_n_1', '_n_2', '_n_3'):
            fields.append(types.StructField(struct.name + suffix, struct.dataType, struct.nullable))
    fields += [types.StructField('last_update', types.StringType(), False),
               types.StructField('partition', types.StringType(), False)]

    df_empty = spark.createDataFrame(spark.sparkContext.emptyRDD(), types.StructType(fields))
    (
        df_empty
        .write
        .format('parquet')
        .option("path", table_windowed_path + '/' + tbl_windowed)
        .option("compression", "snappy")
        .partitionBy("partition")
        .mode("overwrite").saveAsTable(tbl_windowed)
    )
    return df_empty


def get_windowing_v2(tbl_pivoted, tbl_windowed, id_cols = ('isdn', 'partition', 'last_update'), partition_cur = None, partition_pre = None, table_windowed_path = 'work_zone/upsell_vas/tmp', time_in_queue = None, is_write= True):
    '''Build the windowed table that keeps the current and the three previous months per metric.

    For each metric column X of the pivoted table the windowed table holds X (month n) and
    X_n_1, X_n_2, X_n_3 (months n-1, n-2, n-3). Month n is read from partition `partition_cur`
    of `tbl_pivoted`; the older months come from partition `partition_pre` of `tbl_windowed`,
    shifted one step (X -> X_n_1, X_n_1 -> X_n_2, X_n_2 -> X_n_3; the old X_n_3 is dropped).

    :tbl_pivoted: str: name of the pivoted table (e.g. the table written by get_pivot_tbl).
    :tbl_windowed: str: name of the output table; created with an empty schema if it does not exist.
    :id_cols: iterable of str: identifier columns that are not windowed.
    :partition_cur: str, yyyymm01: month n; read from tbl_pivoted and written to tbl_windowed.
        Defaults to the month of (time_in_queue - 1 day).
    :partition_pre: str, yyyymm01: month n-1, read from tbl_windowed. Defaults to the month
        immediately before partition_cur.
    :table_windowed_path: str: directory for tbl_windowed when it is created (no trailing '/').
        NOTE(review): the default has no leading '/', so it resolves as a relative path.
    :time_in_queue: datetime or None: run date used to derive partition_cur; defaults to now.
    :is_write: bool: when True, overwrite partition partition_cur of tbl_windowed with the result.
    :return: DataFrame: windowed rows for partition_cur (without the partition column).
    '''
    # last_update is the wall-clock date of this run, independent of time_in_queue.
    date_current = datetime.datetime.now().strftime('%Y%m%d')
    if time_in_queue is None:
        time_in_queue = datetime.datetime.now()

    if partition_cur is None:
        partition_cur = (time_in_queue - datetime.timedelta(days = 1)).strftime('%Y%m01')
    if partition_pre is None:
        # Step back from the 1st of partition_cur to the last day of the previous month.
        # (Subtracting a fixed 32 days from the run date can skip a month: run date 2021-03-02
        # used to give 20210101 instead of 20210201.)
        month_start = datetime.datetime.strptime(str(partition_cur), '%Y%m%d')
        partition_pre = (month_start - datetime.timedelta(days = 1)).strftime('%Y%m01')

    # Month n from the pivoted table: isdn plus the metric columns.
    df_p_cur = sqlContext.sql("FROM %s WHERE partition = '%s'" % (tbl_pivoted, partition_cur))
    cols_p_selected = [c for c in df_p_cur.columns if c not in id_cols or c == 'isdn']
    df_p_cur_selected = df_p_cur[cols_p_selected]

    # Only create the table when it is really missing. Creation overwrites the whole table, so it
    # must not be reached because of an unrelated error (a bare except used to do exactly that).
    if _table_exists(tbl_windowed):
        df_w_pre = sqlContext.sql("FROM %s WHERE partition = '%s'" % (tbl_windowed, partition_pre))
    else:
        df_w_pre = _create_empty_windowed_table(df_p_cur, tbl_windowed, id_cols, table_windowed_path)

    # Shift the previous window one month back by positional renaming. This relies on the windowed
    # table storing each metric as X, X_n_1, X_n_2, X_n_3 in that order.
    cols_w_selected = ['isdn'] + [c for c in df_w_pre.columns if c[-4:] != '_n_3' and c not in id_cols]
    newColumns_w_pre = ['isdn'] + [c for c in df_w_pre.columns if c[-4:-1] == '_n_']
    df_w_pre_rename = _rename_column(df_w_pre[cols_w_selected], newColumns_w_pre)

    df_final = (
        df_p_cur_selected.join(df_w_pre_rename, on = 'isdn', how = 'full')
        .withColumn('last_update', F.lit(date_current))
        .withColumn('partition', F.lit(partition_cur))
    )
    # Reorder to the windowed table's layout; metrics missing on either side of the join become 0.
    df_final2 = df_final[df_w_pre.columns].drop('partition').fillna(0)

    table_tmp_path = '/work_zone/upsell_vas/adp_new/vas'
    if is_write:
        spark.conf.set( "spark.sql.sources.partitionOverwriteMode", "dynamic" )
        spark.conf.set( "hive.exec.dynamic.partition.mode", "nonstrict" )
        df_final2.createOrReplaceTempView('tmp_table_c360_at')
        # df_final2 is derived from tbl_windowed itself; presumably it is materialised into a
        # staging table first because Spark rejects overwriting a table read by the same query.
        tmp_table = 'tmp_to_overwrite_partition_of_table_' + tbl_windowed
        (
            df_final2.write
            .format('parquet')
            .option("path", table_tmp_path + '/tmp/' + tmp_table)
            .option("compression", "snappy")
            .mode("overwrite").saveAsTable(tmp_table)
        )
        spark.sql('refresh table {}'.format(tmp_table))
        sqlContext.sql("INSERT OVERWRITE TABLE %s PARTITION (partition = '%s') FROM %s" % (tbl_windowed, partition_cur, tmp_table))

    return df_final2
    

In [10]:
# Pivot month-to-date logs for run date 2021-01-03 (day keys 20210101 .. 20210102)
# into partition 20210101 of the pivoted table.
df_full = get_pivot_tbl(
    'd_myvtcdr_mapping',
    'f_myvt_payment_summary_by_function_mon',
    table_out='f_myvt_payment_summary_by_function_pivoted_mon',
    table_out_path='/work_zone/upsell_vas/adp_new/vas/tmp',
    time_in_queue=datetime.datetime.strptime('20210103', '%Y%m%d'),
)

In [11]:
# Peek at the column layout of the pivoted table.
a = spark.sql(
    'from f_myvt_payment_summary_by_function_pivoted_mon limit 10'
)
a.columns

In [12]:
# Window partition 20210401 of the pivoted table on top of partition 20210301 of the windowed table.
# NOTE(review): the get_pivot_tbl call above writes partition 20210101, not 20210401; this cell
# assumes 20210401 was produced by an earlier run -- confirm before relying on the result.
a = get_windowing_v2(
    'f_myvt_payment_summary_by_function_pivoted_mon',
    'f_myvt_payment_summary_by_function_windowed_mon',
    id_cols=['isdn', 'partition', 'last_update'],
    partition_cur='20210401',
    partition_pre='20210301',
    table_windowed_path='/work_zone/upsell_vas/adp_new/tmp',
    time_in_queue=None,
    is_write=True,
)