In [0]:

import datetime
from pyspark.sql import types as T
from pyspark.sql import functions as F

from aadatapipelinecore.core.urn import Urn
from aadatapipelinecore.core.pipeline import type_
from applications.common.parser import SqlParser
from applications.common.executor import SqlExecutor
from applications.auto_pipeline.transform import _view
from aadatapipelinecore.core.utils.spark import eject_all_caches
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

spark.sparkContext.addPyFile("/home/hadoop/bdp/application/libs/python/dependencies.zip")
import aaplproxy

"""
date_list layout (built below):  [ [month_end, [days]], [month_end, [days]], [month_end, [days]], ....... ]
"""
def get_date_list(start_date, end_date, freq="D"):
    """Return the dates between start_date and end_date as 'YYYY-MM-DD' strings.

    The range is generated by ``pandas.date_range`` and each timestamp is
    formatted with ``strftime('%Y-%m-%d')``.

    Args:
        start_date: first date of the range (anything ``pandas.date_range``
            accepts as ``start``, e.g. a 'YYYY-MM-DD' string).
        end_date: last date of the range (passed as ``end``).
        freq: pandas frequency alias, for example
            D: calendar day frequency
            M: month end frequency
            MS: month start frequency
            A, Y: year end frequency
            AS, YS: year start frequency

    Returns:
        A list of date strings; empty when the range contains no date that
        matches ``freq``.
    """
    # Imported inside the function, as in the original notebook cell, so the
    # module itself does not need pandas at import time.
    import pandas as pd

    timestamps = pd.date_range(start=start_date, end=end_date, freq=freq)
    return [stamp.strftime('%Y-%m-%d') for stamp in timestamps]

# Validation window. Swap in the commented value to cover the full history.
# start = "2010-07-31"  # prod
start = "2019-01-31"    # test
end = "2019-11-30"

# One 'YYYY-MM-DD' string per month end inside the window.
monthly = get_date_list(start, end, freq='M')
print(monthly)


# Pair every month end with the list of all days of that month:
# m[:8] is 'YYYY-MM-', so appending '01' yields the first day of the month.
date_list = [
    [month_end, get_date_list(month_end[:8] + '01', month_end, freq='D')]
    for month_end in monthly
]

print(date_list)



class DryRunSqlExecutor(SqlExecutor):
    """SqlExecutor subclass whose ``_verify_tasks`` hook does nothing."""

    def _verify_tasks(self):
        """Deliberately a no-op; the parent's verification is skipped."""
        return None


def run(spark, raw_data, sql_text, dry_run=True):
    """Parse sql_text into tasks and execute them as a TRANSFORM event.

    Args:
        spark: the active SparkSession.
        raw_data: message dict that must contain the keys "namespace",
            "source" and "options". It is not modified.
        sql_text: SQL script handed to ``_view`` and ``SqlParser``.
        dry_run: when true, execute with ``DryRunSqlExecutor`` (task
            verification skipped); otherwise with ``SqlExecutor``.

    Raises:
        KeyError: if "namespace", "source" or "options" is missing.
    """
    # Work on a shallow copy: the pops below would otherwise strip "source"
    # and "options" from the caller's dict and make the message single-use.
    context = dict(raw_data)

    urn = Urn(namespace=context["namespace"])
    source_data_list = context.pop("source")
    # Flatten the option entries into the top level of the context.
    context.update(context.pop("options"))

    _view(spark, sql_text, None, source_data_list)
    tasks = SqlParser(spark, sql_text, context).parse()

    executor_cls = DryRunSqlExecutor if dry_run else SqlExecutor
    executor_cls(urn, spark, tasks, type_.EventType.TRANSFORM, context).run()

# CSV schema
from pyspark.sql import types as T
from pyspark.sql import functions as F

# Explicit schema applied to the daily estimate CSV files; every column is
# nullable and the order must follow the column order of the files.
csv_schema = (
    T.StructType()
    .add("store_id", T.IntegerType(), True)
    .add("date", T.DateType(), True)
    .add("platform_id", T.IntegerType(), True)
    .add("vertical", T.IntegerType(), True)
    .add("feed", T.IntegerType(), True)
    .add("id", T.LongType(), True)
    .add("est", T.IntegerType(), True)
    .add("category_id", T.IntegerType(), True)
    .add("rank", T.IntegerType(), True)
)

# Locations of the daily estimate files (CSV layout and parquet layout).
_DAILY_CSV_BASE_PATH = "s3://b2c-prod-dca-store-estimates/store_est/v_final/DAY/"
_DAILY_PARQUET_BASE_PATH = "s3://b2c-prod-dca-store-estimates/store_estv2/APP_ESTIMATES_FINAL/version=2.0.0/range_type=DAY/"

# Columns kept from the daily data so CSV and parquet frames can be unioned.
_DAILY_COLUMNS = ['id', 'store_id', 'category_id', 'platform_id', 'vertical', 'rank', 'feed', 'est', 'date', 'platform']


def _read_daily_csv(days):
    """Read and cache the iOS daily estimate CSV files for the given days."""
    path = _DAILY_CSV_BASE_PATH + "{%s}/ios/sbe_est_app/*/" % ",".join(days)
    return (
        spark.read.option("basePath", _DAILY_CSV_BASE_PATH)
        .schema(csv_schema)
        .csv(path, sep="\t")
        # The CSV files carry no platform column; these paths are iOS only.
        .withColumn("platform", F.lit("ios"))
        .select(*_DAILY_COLUMNS)
        .cache()
    )


def _read_daily_parquet(days):
    """Read (without caching) the daily estimate parquet files for the given days."""
    path = _DAILY_PARQUET_BASE_PATH + "date={%s}/platform=*/*/" % ",".join(days)
    return spark.read.option("basePath", _DAILY_PARQUET_BASE_PATH).parquet(path)


def _country_mapping_source(name, path):
    """Build the source descriptor of one gzip/TSV country mapping table."""
    return {
        "data_encoding": "csv",
        "compression": "gzip",
        "name": name,
        "data_schema": [
            {"name": "store_id", "type": "int", "nullable": False},
            {"name": "country_code", "type": "string", "nullable": False},
            {"name": "country_name", "type": "string", "nullable": False},
        ],
        "csv_options": {
            'header': True,
            'sep': '\t',
            'quote': '',
            'encoding': 'utf-8',
            'escape': '',
        },
        "path": [path],
    }


def test_monthkly_data(test_data):
    """Compare one month of aggregated daily estimates with the unified monthly data.

    The daily files of the month are loaded as the temp view ``daily_data``,
    the unified monthly rows as ``unified_monthly``, the SQL pipeline in
    ``sql_text`` is executed through ``run`` and the two ``EXCEPT ALL``
    differences between ``country_category_mapping_raw`` and
    ``unified_monthly`` are shown. Both outputs being empty means the two data
    sets agree.

    Args:
        test_data: a ``[month_end, days]`` pair where ``month_end`` is a
            'YYYY-MM-DD' string and ``days`` the list of 'YYYY-MM-DD' strings
            of that month.

    Returns:
        None. Results are printed / shown.
    """
    month_indicator = test_data[0]
    days_in_month = test_data[1]
    print(month_indicator)
    print(days_in_month)

    ### 1. only csv, but date range is '2010-07-04' to '2010-07-31' ###
    if month_indicator == '2010-07-31':
        df_1 = _read_daily_csv(get_date_list('2010-07-04', '2010-07-31', freq='D'))

    ### 2. only csv ###
    elif '2010-08-01' < month_indicator < '2019-07-01':
        df_1 = _read_daily_csv(days_in_month)

    ### 3. half is csv, half is parquet ###
    elif month_indicator == '2019-07-31':
        # First half of 2019-07
        first_half_month_df = _read_daily_csv(get_date_list('2019-07-01', '2019-07-14'))
        # Second half of 2019-07; reduced to the shared columns so the union lines up
        second_half_month_df = (
            _read_daily_parquet(get_date_list('2019-07-15', '2019-07-31'))
            .select(*_DAILY_COLUMNS)
            .cache()
        )
        df_1 = first_half_month_df.union(second_half_month_df)

    ### 4. only parquet ###
    else:  # month_indicator >= '2019-08-31'
        df_1 = _read_daily_parquet(days_in_month).cache()

    df_1.createOrReplaceTempView("daily_data")

    unified_monthly_df = spark.read.format("delta").load("s3://b2c-prod-data-pipeline-unified-store-paid/unified/store.app-est.v3/fact/").where("granularity='monthly' and date='{}' and data_stage='final'".format(month_indicator)).cache()
    unified_monthly_df.createOrReplaceTempView("unified_monthly")

    sql_text = """
    
    WITH filter_top_N_raw_data AS(
    SELECT
     distinct
      id,
      Sum(est) AS est,
      store_id,
      platform_id,
      feed,
      vertical,
      platform
    FROM
      (
        SELECT
          DISTINCT d1.id,
          d1.est,
          d1.store_id,
          d1.date,
          d1.feed,
          d1.vertical,
          d1.platform_id,
          d1.platform
        FROM
          daily_data AS d1
          JOIN daily_data AS d2 
          ON d1.id = d2.id
          AND d1.store_id = d2.store_id
          AND d1.feed = d2.feed
          AND d1.vertical = d2.vertical
          AND d1.platform_id = d2.platform_id
        WHERE (d1.rank <= 4000 and d2.rank<=4000 and d1.store_id == 0 and d1.platform = 'ios' ) 
            OR (d1.rank <= 1000 and d2.rank<=1000 and  d1.store_id != 0 and d1.platform = 'ios' )
            OR  (d1.rank <= 4000 and d2.rank<=4000 and d1.store_id == 1000 and d1.platform = 'android' ) 
            OR (d1.rank <= 1000 and d2.rank<=1000 and  d1.store_id != 1000 and d1.platform = 'android' )
      ) AS t
    WHERE
      feed IN (
        0,
        1,
        2,
        101,
        100,
        102
      )
    GROUP BY
      id,
      store_id,
      platform_id,
      vertical,
      feed,
      platform);
      
     WITH replace_metric AS (
     SELECT * ,
         case 
        when feed='0' and platform='ios' then 'free_app_download'
        when feed='1' and platform='ios' then 'paid_app_download'
        when feed='2' and platform='ios' then 'revenue' 
        when feed='101' and platform='ios' then 'free_app_download' 
        when feed='100' and platform='ios' then 'paid_app_download' 
        when feed='102' and platform='ios' then 'revenue' 
        when feed='0' and platform='android' then 'free_app_download' 
        when feed='1' and platform='android' then 'paid_app_download' 
        when feed='2' and platform='android' then 'revenue' 
        end as metric from filter_top_N_raw_data);
      
      
         WITH replace_metric_device_code AS (
        SELECT * ,
         case 
        when feed='0' and platform='ios' then 'ios-phone'
        when feed='1' and platform='ios' then 'ios-phone'
        when feed='2' and platform='ios' then 'ios-phone' 
        when feed='101' and platform='ios' then 'ios-tablet' 
        when feed='100' and platform='ios' then 'ios-tablet' 
        when feed='102' and platform='ios' then 'ios-tablet' 
        when feed='0' and platform='android' then 'android-all' 
        when feed='1' and platform='android' then 'android-all' 
        when feed='2' and platform='android' then 'android-all' 
        end as device_code from replace_metric);


    WITH group_by_metric_1 AS (
        SELECT max(est) as est, id, metric,device_code, store_id, platform from replace_metric_device_code where store_id not in (3,4,5,6) and device_code in ('ios-phone' ,'ios-tablet' ) and feed not in (1000, 1001, 1002) group by id, store_id, metric,device_code, platform
        );
        
    WITH group_by_metric_2 AS (
        SELECT max(est) as est, id, metric,device_code, store_id, platform from replace_metric_device_code where store_id not in ( 1003, 1005, 1006,1007) and device_code='android-all' and feed not in (1000, 1001, 1002) group by id, store_id, metric,device_code, platform
    );
    
    WITH group_by_metric AS(
        SELECT * FROM group_by_metric_1
        UNION ALL
        SELECT * FROM group_by_metric_2
        );

      -- pivot metric column
    WITH pivot_metric_raw AS (

    SELECT 
        distinct id as app_id, store_id, platform, device_code, free_app_download,revenue, paid_app_download
    FROM
          group_by_metric
     PIVOT (
        max(est) 
    	FOR metric IN ('free_app_download','revenue', 'paid_app_download')
      )
    );
    
    
    -- union all platform with country_code mapping

    WITH country_code_mapping AS (
    select *, 'android' as market_code from android_country_mapping 
    UNION ALL select *, 'ios' market_code from ios_country_mapping
    UNION ALL select 143502, 'VE', 'VESA', 'ios'
    UNION ALL select 0, 'WW', 'worldwide', 'ios'
    UNION ALL select 36, 'CZ', 'CZ', 'android'
    UNION ALL select 5, 'ES', 'ES', 'android'

    );



    -- map raw with country_code

    WITH country_category_mapping_raw AS (
    select app_id, country_code, device_code, free_app_download, paid_app_download, revenue 
     from country_code_mapping 
     inner join 
         pivot_metric_raw 
     on 
         country_code_mapping.store_id=pivot_metric_raw.store_id 
     and 
         country_code_mapping.market_code=pivot_metric_raw.platform
    where country_name!='Global'
    );


      """

    # store_unified , rank_unified
    namespace = "aa.store.market-size.v1"
    ingest_msg = {
        "namespace": namespace,
        "job_type": "routine",
        "options": {},
        "source": [
            _country_mapping_source(
                "ios_country_mapping",
                "s3://b2c-prod-dca-store-estimates/store_back/dimension/IOS_COUNTRY_MAPPING",
            ),
            _country_mapping_source(
                "android_country_mapping",
                "s3://b2c-prod-dca-store-estimates/store_back/dimension/ANDROID_COUNTRY_MAPPING",
            ),
        ],
    }

    # The pipeline has to run BEFORE the comparison: country_category_mapping_raw
    # is defined nowhere but in sql_text, so querying it earlier would either
    # fail or read a view left over from a previously processed month.
    # NOTE(review): this assumes run() registers each "WITH <name> AS (...)"
    # statement as a temp view -- confirm against SqlParser / SqlExecutor.
    run(spark, ingest_msg, sql_text)

    # Rows produced by the pipeline that are missing from the unified data ...
    diff_df1 = spark.sql("select * from country_category_mapping_raw except all select app_id, country_code, device_code, free_app_download, paid_app_download, revenue from unified_monthly ")
    # ... and rows of the unified data that the pipeline did not produce.
    diff_df2 = spark.sql("select app_id, country_code, device_code, free_app_download, paid_app_download, revenue from unified_monthly  except all select * from country_category_mapping_raw")

    diff_df1.show()
    diff_df2.show()

    # Release the cached frames of this month before the next one is processed.
    eject_all_caches(spark)


# Run the check for every month on the driver, one after another.
# test_monthkly_data has no return statement, so this collects one None per month.
monthly_results = [test_monthkly_data(month_entry) for month_entry in date_list]
sc.parallelize(monthly_results, 1)
    
