# Micro Segmentation Code on Pyspark

In [1]:
from __future__ import division, print_function
import pyspark
import pandas as pd
import random
import numpy as np
from pathlib2 import Path
from pyspark.sql import SparkSession
from pyspark import SparkConf, SparkContext, SparkConf
conf = pyspark.SparkConf().set("spark.yarn.queue","root.services.Lynx").set("spark.executors.cores","5").set("spark.executor.instances","5").set("spark.executor.memory", "15gb").set("spark.dynamicAllocation.enabled","false").set("spark.driver.memory","10g")
spark = SparkSession.builder.config(conf=conf).master("yarn").enableHiveSupport().getOrCreate()
sc = SparkContext.getOrCreate()
from functools import reduce
from collections import OrderedDict
from pyspark.sql.functions import udf,concat, col, lit,substring,max,min,regexp_replace,length,when
from pyspark.sql.types import StringType,IntegerType
pd.set_option('display.max_columns', None)
from IPython.core.interactiveshell import InteractiveShell
InteractiveShell.ast_node_interactivity = "all"
from pyspark.sql import SQLContext
sqlCtx = SQLContext(sc)

In [2]:
import calendar
from datetime import datetime, timedelta
import os
import pathlib
import sys
from dateutil.relativedelta import relativedelta
import time

# Micro Segement Started

In [3]:
start_time = time.time()

In [4]:
#Create dateformate for complete the SQL code
current_month = "2019-02-01"
current_month_start = datetime.strptime(current_month, '%Y-%m-%d')

print('Current month start:', current_month_start)
num_of_days_in_month = calendar.monthrange(current_month_start.year, current_month_start.month)[1]
current_month_end = current_month_start.replace(day=num_of_days_in_month)
print('Current month end:', current_month_end)
pathlib.Path('gen/').mkdir(exist_ok=True)

end_plus_one_day = current_month_end + timedelta(days=1)
one_month_earlier_start = current_month_start - relativedelta(months=1)
two_month_earlier_start = current_month_start - relativedelta(months=2)

percentile_query = f"""
  with temp_table as (SELECT cume_dist() OVER  (ORDER BY NUM_OUT_CALL) AS CALL_PERCENT,
    NUM_OUT_CALL,
    cume_dist() OVER (ORDER BY NUM_OUT_SMS) AS SMS_PERCENT,
    NUM_OUT_SMS,
    cume_dist() OVER  (ORDER BY DATA_VOLUME) AS DATA_PERCENT,
    DATA_VOLUME AS DATA_VOLUME_VALUE,
    cume_dist() OVER (ORDER BY DATA_VOLUME) AS HIGH_DATA_PERCENT,
    DATA_VOLUME AS DATA_VOLUME_HIGH
  FROM (
    SELECT NULLIF(NUM_OUT_CALL, 0) AS NUM_OUT_CALL,
      NULLIF(NUM_OUT_SMS, 0) AS NUM_OUT_SMS,
      NULLIF(DATA_VOLUME, 0) AS DATA_VOLUME
    FROM cav.cav_prepaid_monthly_tbl
        WHERE 
            DATA_MONTH = '"""+current_month_start.strftime("%Y%m")+"""') tb1) 
SELECT 
  min(case when CALL_PERCENT >= 0.5 then NUM_OUT_CALL end) as CALL,
  min(case when SMS_PERCENT >= 0.8 then NUM_OUT_SMS end) as SMS,
  min(case when DATA_PERCENT >= 0.25 then DATA_VOLUME_VALUE end) as LOW_DATA,
  min(case when DATA_PERCENT >= 0.75 then DATA_VOLUME_VALUE end) as HIGH_DATA
  FROM temp_table"""

df_percentile = spark.sql(percentile_query).toPandas()
call_threshold = '{:.2f}'.format(df_percentile.at[0, 'CALL'])
sms_threshold = '{:.2f}'.format(df_percentile.at[0, 'SMS'])
low_data_threshold = '{:.2f}'.format(df_percentile.at[0, 'LOW_DATA'])
high_data_threshold = '{:.2f}'.format(df_percentile.at[0, 'HIGH_DATA'])

Current month start: 2018-10-01 00:00:00
Current month end: 2018-10-31 00:00:00


In [None]:
replacement_dict = {
      'minus_one_month': one_month_earlier_start.strftime('%Y%m%d'),
      'minus_one_month_2': one_month_earlier_start.strftime('%Y-%m-%d'),
      'minus_two_month_1': two_month_earlier_start.strftime('%Y%m%d'),
      'minus_two_month_2': two_month_earlier_start.strftime('%Y-%m-%d'),
      'current_month_end': current_month_end.strftime('%Y%m%d'),
      'current_month_end_2': current_month_end.strftime('%Y%m%d'),
      'current_month_end_3': current_month_end.strftime('%Y-%m-%d'),
      'end_plus_one_day_1': end_plus_one_day.strftime('%Y%m%d'),
      'end_plus_one_day_2': end_plus_one_day.strftime('%Y-%m-%d'),
      'current_month_start_1': current_month_start.strftime('%Y%m%d'),
      'current_month_start_2': current_month_start.strftime('%Y-%m-%d'),
      'current_month_start_3': current_month_start.strftime('%Y%m'),
      'current_month_start_4': current_month_start.strftime('%m'),
      'low_data': low_data_threshold,
      'high_data': high_data_threshold,
      'call_threshold': call_threshold,
      'sms_threshold': sms_threshold
  }

In [None]:
sql_MI_SPENDING = """
CREATE TABLE clm.MS_TMP_{current_month_start_3}_MI_SPENDING AS
SELECT tbl.*, AVG_MI_SPENDING, TOTAL_MI_SPENDING, MI_COUNT_VOLUME, MI_COUNT_DAILY, MI_COUNT_WEEKLY,
  MI_COUNT_MONTHLY, MI_COUNT_ADD_ON, COUNT_DAILY_RM3, COUNT_DAILY_RM5, COUNT_WEEKLY_RM6,
  COUNT_WEEKLY_RM10, COUNT_WEEKLY_RM19, COUNT_MONTHLY_RM30, COUNT_MONTHLY_RM50, COUNT_MONTHLY_RM79,
  COUNT_OTHER_MI, ADD_ON_REV, MAIN_MI_REV
FROM(
SELECT SUB_ARR_ID, avg(TOTAL_REVENUE) as AVG_MI_SPENDING2, sum(TOTAL_REVENUE) / avg(adj_factor) *30 as TOTAL_MI_SPENDING2,
  sum(CASE WHEN ADD_ON = 1 THEN TOTAL_REVENUE ELSE 0 END) / avg(adj_factor) *30 AS ADD_ON_REV2,
  sum(CASE WHEN ADD_ON = 0 THEN TOTAL_REVENUE ELSE 0 END) / avg(adj_factor) *30 AS MAIN_MI_REV2
FROM cav.cav_mi_daily_tbl y 
INNER JOIN clm.MI_PREPAID_PLANS_CELCOM_201711 y2
on y.ADJ_PROD_DESC = y2.ADJ_PROD_DESC
LEFT JOIN (
    SELECT SUBSCRIBER_ARRANGEMENT_ID as TMP_ID,
    sum(CASE WHEN SUBSCRIPTION_TENURE IS NOT NULL THEN 1 END) as adj_factor
    FROM cav.cav_prepaid_daily_tbl
    WHERE 
    DATA_DAY between to_date('{minus_two_month_1}') and to_date('{current_month_end}')
    GROUP BY SUBSCRIBER_ARRANGEMENT_ID
  ) z
ON y.SUB_ARR_ID = z.TMP_ID
WHERE DATA_DAY > to_date('{minus_two_month_2}') and DATA_DAY < to_date('{end_plus_one_day_2}')
    and EXCLUDE_V <> 1
GROUP BY SUB_ARR_ID
) tbl
-- current month MI purchases
LEFT JOIN ( 
 SELECT SUB_ARR_ID, avg(TOTAL_REVENUE) as AVG_MI_SPENDING, sum(TOTAL_REVENUE) as TOTAL_MI_SPENDING,
 sum(VOLUME) as MI_COUNT_VOLUME, sum(DAILY) as MI_COUNT_DAILY, sum(WEEKLY) as MI_COUNT_WEEKLY,
 sum(MONTHLY) as MI_COUNT_MONTHLY, sum(ADD_ON) as MI_COUNT_ADD_ON, sum(DAILY_RM3) as COUNT_DAILY_RM3,
 sum(DAILY_RM5) as COUNT_DAILY_RM5, sum(WEEKLY_RM6) as COUNT_WEEKLY_RM6, sum(WEEKLY_RM10) as COUNT_WEEKLY_RM10,
 sum(WEEKLY_RM19) as COUNT_WEEKLY_RM19, sum(MONTHLY_RM30) as COUNT_MONTHLY_RM30, sum(MONTHLY_RM50) as COUNT_MONTHLY_RM50,
 sum(MONTHLY_RM79) as COUNT_MONTHLY_RM79, sum(OTHER_MI) as COUNT_OTHER_MI,
 sum(CASE WHEN ADD_ON = 1 THEN TOTAL_REVENUE ELSE 0 END) AS ADD_ON_REV,
 sum(CASE WHEN ADD_ON = 0 THEN TOTAL_REVENUE ELSE 0 END) AS MAIN_MI_REV
FROM(
      SELECT *, CASE WHEN (ADD_ON=0 and DAILY_RM3=0 and DAILY_RM5=0 and WEEKLY_RM6=0 and WEEKLY_RM10=0
        and WEEKLY_RM19=0 and MONTHLY_RM30=0 and MONTHLY_RM50=0 and MONTHLY_RM79=0) THEN 1
          else 0 END as OTHER_MI FROM
          (SELECT SUB_ARR_ID, TOTAL_REVENUE, DAILY, WEEKLY, MONTHLY, VOLUME, ADD_ON, AVG_REVENUE,
            CASE WHEN (DAILY=1 and AVG_REVENUE=3 and ADD_ON=0) THEN 1
              else 0 END as DAILY_RM3,
            CASE WHEN (DAILY=1 and AVG_REVENUE=5 and ADD_ON=0) THEN 1
              else 0 END as DAILY_RM5,
            CASE WHEN (WEEKLY=1 and AVG_REVENUE=6 and ADD_ON=0) THEN 1
              else 0 END as WEEKLY_RM6,
            CASE WHEN (WEEKLY=1 and AVG_REVENUE=10 and ADD_ON=0) THEN 1
              else 0 END as WEEKLY_RM10,
            CASE WHEN (WEEKLY=1 and AVG_REVENUE=19 and ADD_ON=0) THEN 1
              else 0 END as WEEKLY_RM19,
            CASE WHEN (MONTHLY=1 and AVG_REVENUE=30 and ADD_ON=0) THEN 1
              else 0 END as MONTHLY_RM30,
            CASE WHEN (MONTHLY=1 and AVG_REVENUE=50 and ADD_ON=0) THEN 1
              else 0 END as MONTHLY_RM50,
            CASE WHEN (MONTHLY=1 and AVG_REVENUE=79 and ADD_ON=0) THEN 1
              else 0 END as MONTHLY_RM79
          FROM cav.cav_mi_daily_tbl x INNER JOIN
            clm.MI_PREPAID_PLANS_CELCOM_201711 x2
            on x.ADJ_PROD_DESC = x2.ADJ_PROD_DESC)
  WHERE DATA_DAY > to_date('{current_month_start_2}') and DATA_DAY < to_date('{end_plus_one_day_2}')
    and EXCLUDE_V <> 1
) t
GROUP BY SUB_ARR_ID
) tbl2
on tbl.SUB_ARR_ID = tbl2.SUB_ARR_ID
""".format(**replacement_dict)
spark.sql("DROP TABLE IF EXISTS clm.MS_TMP_{current_month_start_3}_MI_SPENDING".format(**replacement_dict))
spark.sql(sql_MI_SPENDING)

DataFrame[]

DataFrame[]

In [None]:
sql_MI_USAGE_QUOTA = """
CREATE TABLE clm.MS_TMP_{current_month_start_3}_MI_USAGE_QUOTA AS
WITH 
PCRF_USAGE_SUMMARY_IME AS (
    SELECT distinct SUBSCRIBER_ARRANGEMENT_ID, MSISDN,DATA_DATE as DATA_DAY, quota_unit AS ALLOCATED_QUOTA,
    0 AS PURCHASED_QUOTA, quota_unit - balance_unit AS DATA_USED,PRODUCT_ID, to_date(start_date) AS BILL_CYCLE_START_DATE,
    to_date(end_date) AS BILL_CYCLE_END_DATE, service_type AS SUBSCRIBER_SEGMENT_TYPE,
    ROW_NUMBER() OVER (PARTITION BY billing_subscriber_id,start_date,end_date,product_id 
                       ORDER BY DATA_DATE DESC) RN
    FROM clm.LYNX_PRICING_PCRF_WEEKLY
    WHERE service_type='Prepaid'),

PCRF_USAGE_SUMMARY AS (SELECT * FROM PCRF_USAGE_SUMMARY_IME WHERE RN=1)

SELECT threemonth.*, PRORATED_MI_DATA_USED, PRORATED_MI_QUOTA, PRORATED_MI_SPENDING, FRAC_MI_DATA_USED
    FROM (
        SELECT SUBSCRIBER_ARRANGEMENT_ID, 
          sum(DATA_USED) / avg(adj_factor) * 30 as MI_DATA_USED2,
          sum((ALLOCATED_QUOTA + PURCHASED_QUOTA))  / avg(adj_factor) * 30 as MI_QUOTA2,
          sum(MI_PRICE)  / avg(adj_factor) * 30 as MI_SPENDING2,
          avg(CASE WHEN (ALLOCATED_QUOTA + PURCHASED_QUOTA) <> 0 
            THEN DATA_USED / CAST ((ALLOCATED_QUOTA + PURCHASED_QUOTA) as FLOAT)
            ELSE NULL END) as FRAC_MI_DATA_USED2
        FROM(
          SELECT *
          FROM (
            SELECT distinct SUBSCRIBER_ARRANGEMENT_ID, DATA_DAY, PRODUCT_ID, SUBSCRIBER_SEGMENT_TYPE,
            BILL_CYCLE_START_DATE, BILL_CYCLE_END_DATE, DATA_USED, ALLOCATED_QUOTA, PURCHASED_QUOTA 
            FROM PCRF_USAGE_SUMMARY
          ) y
          LEFT JOIN clm.MS_MI_PRODUCT_PRICE_TBL y1 
          ON y.PRODUCT_ID = y1.PRODUCT_ID
          LEFT JOIN (
            SELECT SUBSCRIBER_ARRANGEMENT_ID as TMP_ID,
            sum(CASE WHEN SUBSCRIPTION_TENURE IS NOT NULL THEN 1 END) as adj_factor
            FROM cav.cav_prepaid_daily_tbl
            WHERE 
            to_date(DATA_DAY) between to_date('{minus_two_month_2}') and to_date('{current_month_end_3}')
            GROUP BY SUBSCRIBER_ARRANGEMENT_ID
          ) z
          ON y.SUBSCRIBER_ARRANGEMENT_ID = z.TMP_ID
          WHERE to_date(BILL_CYCLE_START_DATE) between to_date('{minus_two_month_2}') and to_date('{end_plus_one_day_2}') 
          and SUBSCRIBER_SEGMENT_TYPE = 'Prepaid'
        ) tmp
        GROUP BY SUBSCRIBER_ARRANGEMENT_ID
        ) threemonth
        LEFT JOIN
        -- PRORATED MI DATA (MAIN MONTH)
        (
        SELECT SUBSCRIBER_ARRANGEMENT_ID as ID, 
          sum(PRORATED_DATA_USED) as PRORATED_MI_DATA_USED,
          sum(PRORATED_TOTAL_QUOTA) as PRORATED_MI_QUOTA,
          sum(PRORATED_MI_PRICE) as PRORATED_MI_SPENDING,
          avg(FRAC_DATA_USED) as FRAC_MI_DATA_USED
        FROM (
            SELECT *,
              CASE WHEN ((unix_timestamp(BILL_CYCLE_END_DATE) - unix_timestamp(to_date('{current_month_start_2}'))) /3600.0) >= 0
                and ((unix_timestamp(BILL_CYCLE_START_DATE) - unix_timestamp(to_date('{current_month_start_2}'))) /3600.0) <= 0
                THEN (DATA_USED * ((unix_timestamp(BILL_CYCLE_END_DATE) - unix_timestamp(to_date('{current_month_start_2}'))) /3600.0)) / ((unix_timestamp(BILL_CYCLE_END_DATE) - unix_timestamp(BILL_CYCLE_START_DATE))/3600.0)
                WHEN ((unix_timestamp(BILL_CYCLE_END_DATE) - unix_timestamp(to_date('{end_plus_one_day_2}'))) /3600.0) >= 0
                and ((unix_timestamp(BILL_CYCLE_START_DATE) - unix_timestamp(to_date('{end_plus_one_day_2}'))) /3600.0) <= 0
                THEN (DATA_USED * (unix_timestamp(to_date('{end_plus_one_day_2}')) - unix_timestamp(BILL_CYCLE_START_DATE) )/3600.0) /  ((unix_timestamp(BILL_CYCLE_END_DATE) - unix_timestamp(BILL_CYCLE_START_DATE))/3600.0)
                WHEN ((unix_timestamp(BILL_CYCLE_START_DATE) - unix_timestamp(to_date('{current_month_start_2}')))/3600.0) >= 0
                and ((unix_timestamp(to_date('{end_plus_one_day_2}')) - unix_timestamp(BILL_CYCLE_END_DATE))/3600.0) >= 0
                THEN DATA_USED ELSE 0 END as PRORATED_DATA_USED,

              CASE WHEN ((unix_timestamp(BILL_CYCLE_END_DATE) - unix_timestamp(to_date('{current_month_start_2}'))) /3600.0) >= 0
                and ((unix_timestamp(BILL_CYCLE_START_DATE) - unix_timestamp(to_date('{current_month_start_2}'))) /3600.0) <= 0
                THEN ((ALLOCATED_QUOTA + PURCHASED_QUOTA) * ((unix_timestamp(BILL_CYCLE_END_DATE) - unix_timestamp(to_date('{current_month_start_2}'))) /3600.0)) / ((unix_timestamp(BILL_CYCLE_END_DATE) - unix_timestamp(BILL_CYCLE_START_DATE))/3600.0)
                WHEN ((unix_timestamp(BILL_CYCLE_END_DATE) - unix_timestamp(to_date('{end_plus_one_day_2}'))) /3600.0) >= 0
                and ((unix_timestamp(BILL_CYCLE_START_DATE) - unix_timestamp(to_date('{end_plus_one_day_2}'))) /3600.0) <= 0
                THEN ((ALLOCATED_QUOTA + PURCHASED_QUOTA) * (unix_timestamp(to_date('{end_plus_one_day_2}')) - unix_timestamp(BILL_CYCLE_START_DATE) )/3600.0) /  ((unix_timestamp(BILL_CYCLE_END_DATE) - unix_timestamp(BILL_CYCLE_START_DATE))/3600.0)
                WHEN ((unix_timestamp(BILL_CYCLE_START_DATE) - unix_timestamp(to_date('{current_month_start_2}')))/3600.0) >= 0
                and ((unix_timestamp(to_date('{end_plus_one_day_2}')) - unix_timestamp(BILL_CYCLE_END_DATE))/3600.0) >= 0
                THEN (ALLOCATED_QUOTA + PURCHASED_QUOTA) ELSE 0 END as PRORATED_TOTAL_QUOTA,
                
              CASE WHEN (ALLOCATED_QUOTA + PURCHASED_QUOTA) <> 0 THEN DATA_USED / CAST ((ALLOCATED_QUOTA + PURCHASED_QUOTA) as FLOAT)
                ELSE NULL END as FRAC_DATA_USED,
              CASE WHEN ((unix_timestamp(BILL_CYCLE_END_DATE) - unix_timestamp(to_date('{current_month_start_2}'))) /3600.0) >= 0
                and ((unix_timestamp(BILL_CYCLE_START_DATE) - unix_timestamp(to_date('{current_month_start_2}'))) /3600.0) <= 0
                THEN (MI_PRICE * ((unix_timestamp(BILL_CYCLE_END_DATE) - unix_timestamp(to_date('{current_month_start_2}'))) /3600.0)) / ((unix_timestamp(BILL_CYCLE_END_DATE) - unix_timestamp(BILL_CYCLE_START_DATE))/3600.0)
                WHEN ((unix_timestamp(BILL_CYCLE_END_DATE) - unix_timestamp(to_date('{end_plus_one_day_2}'))) /3600.0) >= 0
                and ((unix_timestamp(BILL_CYCLE_START_DATE) - unix_timestamp(to_date('{end_plus_one_day_2}'))) /3600.0) <= 0
                THEN (MI_PRICE * (unix_timestamp(to_date('{end_plus_one_day_2}')) - unix_timestamp(BILL_CYCLE_START_DATE) )/3600.0) /  ((unix_timestamp(BILL_CYCLE_END_DATE) - unix_timestamp(BILL_CYCLE_START_DATE))/3600.0)
                WHEN ((unix_timestamp(BILL_CYCLE_START_DATE) - unix_timestamp(to_date('{current_month_start_2}')))/3600.0) >= 0
                and ((unix_timestamp(to_date('{end_plus_one_day_2}')) - unix_timestamp(BILL_CYCLE_END_DATE))/3600.0) >= 0
                THEN MI_PRICE ELSE 0 END as PRORATED_MI_PRICE
            FROM (SELECT distinct SUBSCRIBER_ARRANGEMENT_ID, DATA_DAY, PRODUCT_ID, SUBSCRIBER_SEGMENT_TYPE,
               BILL_CYCLE_START_DATE, BILL_CYCLE_END_DATE, DATA_USED, ALLOCATED_QUOTA, PURCHASED_QUOTA 
               FROM PCRF_USAGE_SUMMARY) x
              LEFT JOIN clm.MS_MI_PRODUCT_PRICE_TBL x1 
              ON x.PRODUCT_ID = x1.PRODUCT_ID
              INNER JOIN
              (SELECT * 
              FROM clm.MI_PREPAID_PLANS_CELCOM_201711 
              WHERE (DAILY=1 and AVG_REVENUE=3 and ADD_ON=0)
                    OR (DAILY=1 and AVG_REVENUE=5 and ADD_ON=0)
                    OR (WEEKLY=1 and AVG_REVENUE=6 and ADD_ON=0)
                    OR (WEEKLY=1 and AVG_REVENUE=10 and ADD_ON=0)
                    OR (WEEKLY=1 and AVG_REVENUE=19 and ADD_ON=0)
                    OR (MONTHLY=1 and AVG_REVENUE=30 and ADD_ON=0)
                    OR (MONTHLY=1 and AVG_REVENUE=50 and ADD_ON=0)
                    OR (MONTHLY=1 and AVG_REVENUE=79 and ADD_ON=0)) x2
              ON x.PRODUCT_ID = x2.ADJ_CODE_PROD_ID
              
            WHERE to_date(BILL_CYCLE_START_DATE) between to_date('{minus_one_month_2}') and to_date('{end_plus_one_day_2}')
            and SUBSCRIBER_SEGMENT_TYPE = 'Prepaid'
            ) t
        GROUP BY SUBSCRIBER_ARRANGEMENT_ID
    ) onemonth
ON threemonth.SUBSCRIBER_ARRANGEMENT_ID = onemonth.ID
""".format(**replacement_dict)

In [None]:
print(sql_MI_USAGE_QUOTA)


CREATE TABLE clm.MS_TMP_201810_MI_USAGE_QUOTA AS
WITH 
PCRF_USAGE_SUMMARY_IME AS (
    SELECT distinct SUBSCRIBER_ARRANGEMENT_ID, MSISDN,DATA_DATE as DATA_DAY, quota_unit AS ALLOCATED_QUOTA,
    0 AS PURCHASED_QUOTA, quota_unit - balance_unit AS DATA_USED,PRODUCT_ID, to_date(start_date) AS BILL_CYCLE_START_DATE,
    to_date(end_date) AS BILL_CYCLE_END_DATE, service_type AS SUBSCRIBER_SEGMENT_TYPE,
    ROW_NUMBER() OVER (PARTITION BY billing_subscriber_id,start_date,end_date,product_id 
                       ORDER BY DATA_DATE DESC) RN
    FROM clm.LYNX_PRICING_PCRF_WEEKLY
    WHERE service_type='Prepaid'),

PCRF_USAGE_SUMMARY AS (SELECT * FROM PCRF_USAGE_SUMMARY_IME WHERE RN=1)

SELECT threemonth.*, PRORATED_MI_DATA_USED, PRORATED_MI_QUOTA, PRORATED_MI_SPENDING, FRAC_MI_DATA_USED
    FROM (
        SELECT SUBSCRIBER_ARRANGEMENT_ID, 
          sum(DATA_USED) / avg(adj_factor) * 30 as MI_DATA_USED2,
          sum((ALLOCATED_QUOTA + PURCHASED_QUOTA))  / avg(adj_factor) * 30 as MI_QUOTA2

In [None]:
spark.sql("DROP TABLE IF EXISTS clm.MS_TMP_{current_month_start_3}_MI_USAGE_QUOTA".format(**replacement_dict))
spark.sql(sql_MI_USAGE_QUOTA)

DataFrame[]

In [9]:
#spark.sql("SELECT * FROM clm.MS_TMP_201902_MI_USAGE_QUOTA").limit(10).toPandas()

In [None]:
sql_MAIN_MI_TBL = """
CREATE TABLE clm.MS_TMP_{current_month_start_3}_MAIN_MI_TBL AS
SELECT * 
FROM(
  SELECT NVL(SUBSCRIBER_ARRANGEMENT_ID, SUB_ARR_ID) as SUBSCRIBER_ARRANGEMENT_ID,
    MI_DATA_USED2, MI_QUOTA2, MI_SPENDING2, FRAC_MI_DATA_USED2,
    PRORATED_MI_DATA_USED, PRORATED_MI_QUOTA, 
    PRORATED_MI_SPENDING, FRAC_MI_DATA_USED, t2.*,
    CASE WHEN MI_DATA_USED2 is NULL THEN 0
    else 1 END AS MATCH_USAGE,
    CASE WHEN COUNT_DAILY_RM5 is NULL THEN 0
    else 1 END AS MATCH_MI
  FROM clm.MS_TMP_{current_month_start_3}_MI_USAGE_QUOTA t
  FULL JOIN clm.MS_TMP_{current_month_start_3}_MI_SPENDING t2
  ON (t.SUBSCRIBER_ARRANGEMENT_ID = t2.SUB_ARR_ID)  
  ) tbl
WHERE SUBSCRIBER_ARRANGEMENT_ID IN (SELECT SUBSCRIBER_ARRANGEMENT_ID
  FROM cav.cav_prepaid_monthly_tbl
  WHERE 
  DATA_MONTH between {current_month_start_3} and {current_month_start_3}
  AND (LINE_OF_BUSINESS='MOBILE' OR LINE_OF_BUSINESS='UNKNOWN')
  AND DEVICE_TYPE<>'DONGLE')
""".format(**replacement_dict)
spark.sql("DROP TABLE IF EXISTS clm.MS_TMP_{current_month_start_3}_MAIN_MI_TBL".format(**replacement_dict))
spark.sql(sql_MAIN_MI_TBL)

In [None]:
#create a threadhold for MI usage
mi_threshold_query = '''
    with im_table AS 
    (SELECT cume_dist() OVER (ORDER BY MI_RATIO) AS MED_MI_RATIO_PERCENT,
            MI_RATIO
            FROM (
              SELECT NULLIF(PRORATED_MI_DATA_USED, 0)/DATA_VOLUME AS MI_RATIO
              FROM 
              (SELECT SUBSCRIBER_ARRANGEMENT_ID,DATA_VOLUME FROM cav.cav_prepaid_monthly_tbl WHERE DATA_MONTH = '{data_month}') monthly
              INNER JOIN 
              clm.MS_TMP_{data_month}_MI_USAGE_QUOTA mi
              ON monthly.SUBSCRIBER_ARRANGEMENT_ID = mi.SUBSCRIBER_ARRANGEMENT_ID
              WHERE {data_vol_con}) t1)
              
    SELECT min(case when MED_MI_RATIO_PERCENT >= 0.5 then MI_RATIO end) as MED_MI_RATIO
    FROM im_table
          '''
mi_threshold_1_query = mi_threshold_query.format(
      data_vol_con=f'DATA_VOLUME >= {low_data_threshold} AND DATA_VOLUME < {high_data_threshold}',
      data_month=current_month_start.strftime('%Y%m')
  )
mi_threshold_1 = '{:.2f}'.format(spark.sql(mi_threshold_1_query).toPandas().at[0, 'MED_MI_RATIO'])

mi_threshold_2_query = mi_threshold_query.format(
      data_vol_con=f'DATA_VOLUME >= {high_data_threshold}',
      data_month=current_month_start.strftime('%Y%m')
  )
mi_threshold_2 = '{:.2f}'.format(spark.sql(mi_threshold_2_query).toPandas().at[0, 'MED_MI_RATIO'])

replacement_dict['mi_threshold_1'] = mi_threshold_1
replacement_dict['mi_threshold_2'] = mi_threshold_2

In [12]:
replacement_dict

{'minus_one_month': '20181201',
 'minus_one_month_2': '2018-12-01',
 'minus_two_month_1': '20181101',
 'minus_two_month_2': '2018-11-01',
 'current_month_end': '20190131',
 'current_month_end_2': '20190131',
 'current_month_end_3': '2019-01-31',
 'end_plus_one_day_1': '20190201',
 'end_plus_one_day_2': '2019-02-01',
 'current_month_start_1': '20190101',
 'current_month_start_2': '2019-01-01',
 'current_month_start_3': '201901',
 'current_month_start_4': '01',
 'low_data': '0.00',
 'high_data': '11776100.63',
 'call_threshold': '4.00',
 'sms_threshold': '2.00',
 'mi_threshold_1': '94.16',
 'mi_threshold_2': '162.09'}

In [13]:
sql_MAIN_SEGMENT_TBL = """
CREATE TABLE clm.MS_TMP_{current_month_start_3}_MAIN_SEGMENT_TBL_FULL AS
SELECT *,
{low_data} AS LOW_DATA_THRESHOLD,
{high_data} AS HIGH_DATA_THRESHOLD,
{call_threshold} AS CALL_THRESHOLD,
{sms_threshold} AS SMS_THRESHOLD,
  -- IDD call based on 1mth
  -- Remember to create non-active as a segment (No call, no voice, no/low data)n
  CASE WHEN MAIN_SEGMENT = 'Inactive' THEN 'Inactive'
     WHEN INT_USERS = 'INT' THEN 'Int Users'
     WHEN MAIN_SEGMENT = 'Low Activity' THEN 'Low Activity'
     WHEN MAIN_SEGMENT = 'Traditional Users' and CALL_GROUP='High Call' and SMS_GROUP='High SMS' THEN 'High Call High SMS'
     WHEN MAIN_SEGMENT = 'Traditional Users' and CALL_GROUP='No / Low Call' and SMS_GROUP='High SMS' THEN 'Low Call High SMS'
     WHEN MAIN_SEGMENT = 'Traditional Users' and CALL_GROUP='High Call' and SMS_GROUP='No / Low SMS' THEN 'High Call Low SMS'
     WHEN MAIN_SEGMENT = 'Traditional Users' and DATA_GROUP='Low Data' THEN 'Low Call Low SMS Low Data'
     -- Missing info on mi usage has two cases (only add-on spending or mix(but not recorded in usage tbl))
     -- For those with missing info on mi usage use spending to impute (auto categorise in low util)
     WHEN MAIN_SEGMENT = 'Data Users' and (DATA_VOLUME2 >0 and MATCH_USAGE=0 and MATCH_MI=1) and ADD_ON_REV2=0 and MAIN_MI_REV2=0 Then 'Data Users, No MI, No Add-On'
     WHEN MAIN_SEGMENT = 'Data Users' and (DATA_VOLUME2 >0 and MATCH_USAGE=0 and MATCH_MI=1) and ADD_ON_REV2>0 and MAIN_MI_REV2=0 Then 'Data Users, No MI, Only Add-On'
     WHEN MAIN_SEGMENT = 'Data Users' and (DATA_VOLUME2 >0 and MATCH_USAGE=0 and MATCH_MI=1)and MAIN_MI_REV2 > ADD_ON_REV2 Then 'Data Users, High MI (Low Utilisation)'
     WHEN MAIN_SEGMENT = 'Data Users' and (DATA_VOLUME2 >0 and MATCH_USAGE=0 and MATCH_MI=1) and MAIN_MI_REV2 <= ADD_ON_REV2 Then 'Data Users, Low MI (Low Utilisation)'
     WHEN MAIN_SEGMENT = 'Data Users' and FRAC_DATA_VOLUME_MI2=0 and ADD_ON_REV2=0 and MAIN_MI_REV2=0 Then 'Data Users, No MI, No Add On'
     WHEN MAIN_SEGMENT = 'Data Users' and FRAC_DATA_VOLUME_MI2=0 and ADD_ON_REV2>0 and MAIN_MI_REV2=0 Then 'Data Users, No MI, Only Add-On'
     WHEN MAIN_SEGMENT = 'Data Users' and FRAC_DATA_VOLUME_MI2>=0 and FRAC_DATA_VOLUME_MI2<={mi_threshold_1} and FRAC_MI_DATA_USED2 <=0.5 Then 'Data Users, Low MI (Low Utilisation)'
     WHEN MAIN_SEGMENT = 'Data Users' and FRAC_DATA_VOLUME_MI2>=0 and FRAC_DATA_VOLUME_MI2<={mi_threshold_1} and FRAC_MI_DATA_USED2 >0.5 Then 'Data Users, Low MI (High Utilisation)'
     WHEN MAIN_SEGMENT = 'Data Users' and FRAC_DATA_VOLUME_MI2>{mi_threshold_1} and FRAC_MI_DATA_USED2 <=0.5  Then 'Data Users, High MI (Low Utilisation)'
     WHEN MAIN_SEGMENT = 'Data Users' and FRAC_DATA_VOLUME_MI2>{mi_threshold_1} and FRAC_MI_DATA_USED2 >0.5  Then 'Data Users, High MI (High Utilisation)'
     WHEN MAIN_SEGMENT = 'Data Addicts' and (DATA_VOLUME2 >0 and MATCH_USAGE=0 and MATCH_MI=1) and ADD_ON_REV2=0 and MAIN_MI_REV2=0 Then 'Data Addicts, No MI, No Add-On'
     WHEN MAIN_SEGMENT = 'Data Addicts' and (DATA_VOLUME2 >0 and MATCH_USAGE=0 and MATCH_MI=1) and ADD_ON_REV2>0 and MAIN_MI_REV2=0 Then 'Data Addicts, No MI, Only Add-On'
     WHEN MAIN_SEGMENT = 'Data Addicts' and (DATA_VOLUME2 >0 and MATCH_USAGE=0 and MATCH_MI=1) and MAIN_MI_REV2 > ADD_ON_REV2 Then 'Data Addicts, High MI (Low Utilisation)'
     WHEN MAIN_SEGMENT = 'Data Addicts' and (DATA_VOLUME2 >0 and MATCH_USAGE=0 and MATCH_MI=1) and MAIN_MI_REV2 <= ADD_ON_REV2 Then 'Data Addicts, Low MI (Low Utilisation)'
     WHEN MAIN_SEGMENT = 'Data Addicts' and FRAC_DATA_VOLUME_MI2=0 and ADD_ON_REV2=0 and MAIN_MI_REV2=0 Then 'Data Addicts, No MI, No Add-On'
     WHEN MAIN_SEGMENT = 'Data Addicts' and FRAC_DATA_VOLUME_MI2=0 and ADD_ON_REV2>0 and MAIN_MI_REV2=0 Then 'Data Addicts, No MI, Only Add-On'
     WHEN MAIN_SEGMENT = 'Data Addicts' and FRAC_DATA_VOLUME_MI2>=0 and FRAC_DATA_VOLUME_MI2<={mi_threshold_2} and FRAC_MI_DATA_USED2 <=0.5 Then 'Data Addicts, Low MI (Low Utilisation)'
     WHEN MAIN_SEGMENT = 'Data Addicts' and FRAC_DATA_VOLUME_MI2>=0 and FRAC_DATA_VOLUME_MI2<={mi_threshold_2} and FRAC_MI_DATA_USED2 >0.5 Then 'Data Addicts, Low MI (High Utilisation)'
     WHEN MAIN_SEGMENT = 'Data Addicts' and FRAC_DATA_VOLUME_MI2>{mi_threshold_2} and FRAC_MI_DATA_USED2 <=0.5  Then 'Data Addicts, High MI (Low Utilisation)'
     WHEN MAIN_SEGMENT = 'Data Addicts' and FRAC_DATA_VOLUME_MI2>{mi_threshold_2} and FRAC_MI_DATA_USED2 >0.5  Then 'Data Addicts, High MI (High Utilisation)'
     END AS DETAILED_SEGMENT,
     CASE WHEN MAIN_SEGMENT = 'Data Users' and FRAC_DATA_VOLUME_MI=0 and MI_COUNT_ADD_ON=0 THEN 1
     ELSE 0 END AS TEST
    FROM(
      SELECT *,CASE WHEN (SUBSCRIPTION_TENURE <= 2) and SWX_ACTIVITY <= 0.1 THEN 'Inactive'
             WHEN INT_USERS = 'INT' THEN 'Int Users'
             WHEN (DATA_GROUP = 'No Data') and CALL_GROUP = 'No / Low Call' and SMS_GROUP = 'No / Low SMS' THEN 'Low Activity'
             WHEN (DATA_GROUP = 'No Data' or DATA_GROUP = 'Low Data') THEN 'Traditional Users'
             WHEN DATA_GROUP = 'Mid Data' THEN 'Data Users'
             WHEN DATA_GROUP = 'High Data' THEN 'Data Addicts'
             END AS MAIN_SEGMENT
      FROM (SELECT *, CASE WHEN (NUM_OUT_CALL_IDD > 30) THEN 'INT' ElSE 'NOT INT' END AS INT_USERS,
      CASE WHEN SUBSCRIPTION_TENURE <= 3 THEN 'LOS_<=3M' ELSE 'LOS_>3M' END AS LOS_GROUP,
        CASE WHEN (DEVICE_TYPE = 'BASIC PHONE') or (DEVICE_TYPE = 'FEATURE PHONE') THEN 'NON-SMARTPHONE'
           WHEN (DEVICE_TYPE = 'TABLETS') THEN 'TABLETS'
           ELSE 'SMARTPHONE' END AS DEVICE_TYPE_GROUP,
        CASE WHEN AGE < 25 THEN 'YOUTH'
           WHEN AGE >= 25 AND AGE < 45 THEN 'MATURE I'
           WHEN AGE >= 45 AND AGE <= 55 THEN 'MATURE II'
           WHEN AGE >= 55 THEN 'MATURE III' END AS AGE_GROUP,
        CASE WHEN (DATA_VOLUME2 >0 and MATCH_USAGE=0 and MATCH_MI=1) THEN 0
           WHEN (DATA_VOLUME2 >0 ) THEN NVL(PRORATED_MI_DATA_USED / CAST(DATA_VOLUME2 as FLOAT), 0)
            ELSE 0 END as FRAC_DATA_VOLUME_MI,
        CASE WHEN (DATA_VOLUME2 >0 and MATCH_USAGE=0 and MATCH_MI=1) THEN 0
          WHEN (DATA_VOLUME2 >0 ) THEN NVL(MI_DATA_USED2 / CAST(DATA_VOLUME2 as FLOAT), 0)
            ELSE 0 END as FRAC_DATA_VOLUME_MI2,
        CASE WHEN (DATA_VOLUME2 < {low_data}) and (DATA_VOLUME2 > 0) THEN 'Low Data'
            WHEN (DATA_VOLUME2 < {high_data}) and (DATA_VOLUME2 > {low_data}) THEN 'Mid Data'
            WHEN (DATA_VOLUME2 >= {high_data}) THEN 'High Data'
             ELSE 'No Data' END AS DATA_GROUP,
        CASE 
            WHEN (NUM_OUT_CALL2 > {call_threshold}) THEN 'High Call'
             ELSE 'No / Low Call' END AS CALL_GROUP,
        CASE 
            WHEN (NUM_OUT_SMS2 > {sms_threshold}) THEN 'High SMS'
             ELSE 'No / Low SMS' END AS SMS_GROUP,
        CASE WHEN (DEVICE_TYPE = 'SMART PHONE') THEN 1
             ELSE 0 END AS SMART_PHONE_IND
         FROM 
          (SELECT t.*,MOBILE_NO MSISDN, DATA_VOLUME2, NUM_OUT_SMS2, NUM_OUT_CALL2, NUM_OUT_CALL_IDD2,
            NVL(PRORATED_MI_DATA_USED, 0) as PRORATED_MI_DATA_USED, NVL(MI_DATA_USED2, 0) as MI_DATA_USED2,
            NVL(PRORATED_MI_QUOTA, 0) as PRORATED_MI_QUOTA, NVL(MI_QUOTA2, 0) as MI_QUOTA2,
            NVL(PRORATED_MI_SPENDING, 0) as PRORATED_MI_SPENDING, NVL(MI_SPENDING2, 0) as MI_SPENDING2,
            NVL(FRAC_MI_DATA_USED, 0) as FRAC_MI_DATA_USED, NVL(FRAC_MI_DATA_USED2, 0) as FRAC_MI_DATA_USED2,
            NVL(TOTAL_MI_SPENDING, 0) as TOTAL_MI_SPENDING,
            NVL(MI_COUNT_VOLUME, 0) as MI_COUNT_VOLUME,
            NVL(MI_COUNT_DAILY, 0) as MI_COUNT_DAILY,
            NVL(MI_COUNT_WEEKLY, 0) as MI_COUNT_WEEKLY,
            NVL(MI_COUNT_MONTHLY, 0) as MI_COUNT_MONTHLY,
            NVL(MI_COUNT_ADD_ON, 0) as MI_COUNT_ADD_ON,
            NVL(MAIN_MI_REV, 0) as MAIN_MI_REV, NVL(MAIN_MI_REV2, 0) as MAIN_MI_REV2,
            NVL(ADD_ON_REV, 0) as ADD_ON_REV, NVL(ADD_ON_REV2, 0) as ADD_ON_REV2,
            NVL(CAMPAIGN_TAKERS, 0) as CAMPAIGN_TAKERS,
            ((length(SWX_SMS) - length(regexp_replace(SWX_SMS, '1', '')))/31.0) +
            ((length(SWX_DATA) - length(regexp_replace(SWX_DATA, '1', '')))/31.0) +
            ((length(SWX_VOICE) - length(regexp_replace(SWX_VOICE, '1', '')))/31.0) as SWX_ACTIVITY,
            MATCH_USAGE, MATCH_MI
            FROM cav.cav_prepaid_monthly_tbl t
            LEFT JOIN (SELECT SUBSCRIBER_ARRANGEMENT_ID,
                  sum(DATA_VOLUME_0000_0100 + DATA_VOLUME_0100_0200 + DATA_VOLUME_0200_0300 + DATA_VOLUME_0300_0400 + DATA_VOLUME_0400_0500 +
                    DATA_VOLUME_0600_0700 + DATA_VOLUME_0700_0800 + DATA_VOLUME_0800_0900 + DATA_VOLUME_0900_1000 + DATA_VOLUME_1000_1100 +
                    DATA_VOLUME_1100_1200 + DATA_VOLUME_1200_1300 + DATA_VOLUME_1300_1400 + DATA_VOLUME_1400_1500 + DATA_VOLUME_1500_1600 +
                    DATA_VOLUME_1600_1700 + DATA_VOLUME_1700_1800 + DATA_VOLUME_1800_1900 + DATA_VOLUME_1900_2000 + DATA_VOLUME_2000_2100 +
                    DATA_VOLUME_2100_2200 + DATA_VOLUME_2200_2300 + DATA_VOLUME_2300_2359)/sum(CASE WHEN SUBSCRIPTION_TENURE IS NOT NULL THEN 1 END)*30 as DATA_VOLUME2,
                  sum(NUM_OUT_CALL)/sum(CASE WHEN SUBSCRIPTION_TENURE IS NOT NULL THEN 1 END)*30  as NUM_OUT_CALL2, sum(NUM_OUT_SMS)/sum(CASE WHEN SUBSCRIPTION_TENURE IS NOT NULL THEN 1 END)*30 as NUM_OUT_SMS2,
                  sum(NUM_OUT_CALL_IDD)/sum(CASE WHEN SUBSCRIPTION_TENURE IS NOT NULL THEN 1 END)*30  as NUM_OUT_CALL_IDD2
                  FROM cav.cav_prepaid_daily_tbl
                  WHERE 
                  day_key between '{minus_two_month_1}' and '{current_month_end}'
                  GROUP BY SUBSCRIBER_ARRANGEMENT_ID
                  ) t3m_avg
              on t.SUBSCRIBER_ARRANGEMENT_ID = t3m_avg.SUBSCRIBER_ARRANGEMENT_ID
              LEFT JOIN clm.MS_TMP_{current_month_start_3}_MAIN_MI_TBL t2
              on t.SUBSCRIBER_ARRANGEMENT_ID = t2.SUBSCRIBER_ARRANGEMENT_ID
              LEFT JOIN (
                  --Change the script to include Mviva instead of NBA
                  SELECT distinct SUBSCRIBER_ARRANGEMENT_ID, 1 as CAMPAIGN_TAKERS
                      FROM(
                        SELECT MSISDN, substr(contact_date,1,6) as t_month
                        FROM cav.sor_campaign_target_target_tbl
                        WHERE substr(contact_date,1,6)={current_month_start_4} and SUB_TYPE NOT LIKE '%OTT%' and SUB_TYPE NOT LIKE '%RELOAD%' and CAMPAIGN_NAME NOT LIKE '%RELOAD%'
                      ) c1 
                      INNER JOIN
                      (
                        SELECT subscriber_arrangement_id,mobile_no msisdn
                        FROM cav.cav_prepaid_monthly_tbl
                        WHERE MONTH_KEY = '{current_month_start_3}') c2
                      ON c1.MSISDN = c2.MSISDN
                      ) t3
                  on t.SUBSCRIBER_ARRANGEMENT_ID = t3.SUBSCRIBER_ARRANGEMENT_ID
              WHERE MONTH_KEY = '{current_month_start_3}'
              AND LINE_OF_BUSINESS IN ('UNKNOWN','MOBILE')
              AND DEVICE_TYPE<>'DONGLE'
              AND (SUBSCRIPTION_STATUS='ACTIVE' OR SUBSCRIPTION_STATUS='SUSPEND' OR SUBSCRIPTION_STATUS='ACTIVE_ACTIVE'))tb_im)
) tbl
""".format(**replacement_dict)

In [14]:
print(sql_MAIN_SEGMENT_TBL)


CREATE TABLE clm.MS_TMP_201901_MAIN_SEGMENT_TBL_FULL AS
SELECT *,
0.00 AS LOW_DATA_THRESHOLD,
11776100.63 AS HIGH_DATA_THRESHOLD,
4.00 AS CALL_THRESHOLD,
2.00 AS SMS_THRESHOLD,
  -- IDD call based on 1mth
  -- Remember to create non-active as a segment (No call, no voice, no/low data)n
  CASE WHEN MAIN_SEGMENT = 'Inactive' THEN 'Inactive'
     WHEN INT_USERS = 'INT' THEN 'Int Users'
     WHEN MAIN_SEGMENT = 'Low Activity' THEN 'Low Activity'
     WHEN MAIN_SEGMENT = 'Traditional Users' and CALL_GROUP='High Call' and SMS_GROUP='High SMS' THEN 'High Call High SMS'
     WHEN MAIN_SEGMENT = 'Traditional Users' and CALL_GROUP='No / Low Call' and SMS_GROUP='High SMS' THEN 'Low Call High SMS'
     WHEN MAIN_SEGMENT = 'Traditional Users' and CALL_GROUP='High Call' and SMS_GROUP='No / Low SMS' THEN 'High Call Low SMS'
     WHEN MAIN_SEGMENT = 'Traditional Users' and DATA_GROUP='Low Data' THEN 'Low Call Low SMS Low Data'
     -- Missing info on mi usage has two cases (only add-on spending or mi

In [15]:
spark.sql("DROP TABLE IF EXISTS clm.MS_TMP_{current_month_start_3}_MAIN_SEGMENT_TBL_FULL".format(**replacement_dict))
spark.sql(sql_MAIN_SEGMENT_TBL)

DataFrame[]

DataFrame[]

In [16]:
#spark.sql("DROP TABLE IF EXISTS clm.PREPAID_MICROSEGMENTATION")

In [19]:
##Need to change to insert when finish run it once
sql_FINAL_PUSH = """
SELECT SUBSCRIBER_ARRANGEMENT_ID, MSISDN,
MAIN_SEGMENT, DETAILED_SEGMENT, DATA_MONTH,FRAC_MI_DATA_USED2 AS FRAC_DATA_USED_3_MTH,
CASE WHEN DETAILED_SEGMENT LIKE '%MI%' THEN 1 ELSE 0 END AS MI_USER_FL
FROM clm.MS_TMP_{current_month_start_3}_MAIN_SEGMENT_TBL_FULL
""".format(**replacement_dict)
spark.sql(sql_FINAL_PUSH)
table_exist = str.lower('PREPAID_MICROSEGMENTATION') in sqlCtx.tableNames('clm')
if table_exist:
    #insert into
    sql_FINAL_PUSH = "INSERT INTO clm.PREPAID_MICROSEGMENTATION "+sql_FINAL_PUSH
    spark.sql(sql_FINAL_PUSH.format(**replacement_dict))
else:
    sql_FINAL_PUSH = "CREATE TABLE clm.PREPAID_MICROSEGMENTATION AS "+sql_FINAL_PUSH
    spark.sql(sql_FINAL_PUSH.format(**replacement_dict))

DataFrame[SUBSCRIBER_ARRANGEMENT_ID: string, MSISDN: string, MAIN_SEGMENT: string, DETAILED_SEGMENT: string, DATA_MONTH: string, FRAC_DATA_USED_3_MTH: double, MI_USER_FL: int]

DataFrame[]

In [20]:
elapsed_time = time.time() - start_time
time.strftime("%H:%M:%S", time.gmtime(elapsed_time))

'07:24:03'

Test

In [21]:
spark.sql("SELECT * FROM clm.PREPAID_MICROSEGMENTATION").limit(10).toPandas()

Unnamed: 0,SUBSCRIBER_ARRANGEMENT_ID,MSISDN,MAIN_SEGMENT,DETAILED_SEGMENT,DATA_MONTH,FRAC_DATA_USED_3_MTH,MI_USER_FL
0,100040000022665640,136726353,Inactive,Inactive,201902,0.0,0
1,100140000025531485,195568272,Data Addicts,"Data Addicts, No MI, No Add-On",201902,0.0,1
2,100170000018193552,194609504,Data Users,"Data Users, No MI, No Add On",201902,0.0,1
3,100190000019928659,196072034,Data Addicts,"Data Addicts, High MI (High Utilisation)",201902,0.854979,1
4,100220000025577397,137370927,Data Addicts,"Data Addicts, No MI, No Add-On",201902,0.0,1
5,100270000018592416,132782818,Data Users,"Data Users, High MI (Low Utilisation)",201902,0.0077,1
6,100290000018302236,195682769,Data Users,"Data Users, Low MI (Low Utilisation)",201902,0.230035,1
7,100330000019863302,133540680,Data Addicts,"Data Addicts, High MI (High Utilisation)",201902,0.893752,1
8,100350000019896178,192853578,Low Activity,Low Activity,201902,0.0,0
9,100370000019255726,199734076,Data Users,"Data Users, No MI, No Add On",201902,0.0,1


In [26]:
spark.sql("SELECT * FROM clm.PREPAID_MICROSEGMENTATION").groupby('DATA_MONTH').count().toPandas()

Unnamed: 0,DATA_MONTH,count
0,201902,4574973
1,201901,4566641


In [23]:
spark.sql("SELECT * FROM clm.MS_TMP_201902_MI_USAGE_QUOTA").count()

3246762

In [24]:
spark.sql("SELECT * FROM clm.MS_TMP_201902_MI_SPENDING").count()

4123064

In [27]:
spark.stop()

Done