# In [None]:
# %pyspark
# Import libraries
spark.version
from pyspark.sql.types import *
from pyspark import SQLContext
import pyspark
import pyspark.sql.functions as F
from pyspark.sql import Window, Row

# %pyspark
# Load the raw weekly POS data with the channel dimension collapsed: POS
# channels are not accurate and should not be used, so quantity and dollars
# are summed per week / retailer / state / product hierarchy.
pos_query = (
    "select week_ending_date, retailer, state, mdlz_business, mdlz_category, mdlz_brand, mdlz_ppg, sum(pos_qty) as pos_qty, sum(pos_dollar) as pos_dollar "
    "from d4sa_us_disc.bluesky_pos_data "
    "group by week_ending_date, retailer, state, mdlz_business, mdlz_category, mdlz_brand, mdlz_ppg"
)

# Derive the calendar helpers from the week ending date and force both
# measures to double.
pos_no_channel = (
    spark.sql(pos_query)
    .withColumn("week_of_year", F.weekofyear(F.col("week_ending_date")))
    .withColumn("year", F.year(F.col("week_ending_date")))
    .withColumn("pos_qty", F.col("pos_qty").cast('double'))
    .withColumn("pos_dollar", F.col("pos_dollar").cast('double'))
)

# Number of distinct retailer / state / product series before any filtering.
series_keys = ["retailer", "state", "mdlz_business", "mdlz_category", "mdlz_brand", "mdlz_ppg"]
print(pos_no_channel.select(*series_keys).distinct().count())

# Persist the unfiltered aggregate as the input table for the POS_2_Modeling
# script in Python; the table is dropped and recreated on every run.
pos_no_channel.createOrReplaceTempView("pos_no_channel")
for model_input_statement in (
    "drop table if exists default.cbda_pos_model_input",
    "create table default.cbda_pos_model_input as select * from pos_no_channel",
):
    spark.sql(model_input_statement)

z.show(pos_no_channel)

# %pyspark
# Both inputs are CSV files with a header row; column types are inferred.
csv_options = dict(header=True, inferSchema=True)

# Projections data - please upload the latest projections_state.csv file from
# Box (https://ibm.box.com/s/d4wu2qw32fzzm3xdgkyon7ynpiqo89p6) before running.
projections = spark.read.csv('/user/bwn2456/CBDA/projections_state_0503.csv', **csv_options)

# State name -> state abbreviation mapping (example: "Alaska" -> "AK"). This
# is a static file and does not need to be updated. Its 'state' column is
# renamed to "Geography" so it shares a key name with the projections data.
states_mapping = spark.read.csv('/user/bwn2456/CBDA/states.csv', **csv_options)
states = states_mapping.withColumnRenamed('state', "Geography")

# %pyspark
# Business filters: restrict the POS data to the rows that should reach
# Tableau, printing the distinct retailer / state / product series count
# before and after so the effect of the filters is visible.
series_keys = ["retailer", "state", "mdlz_business", "mdlz_category", "mdlz_brand", "mdlz_ppg"]
print(pos_no_channel.select(*series_keys).distinct().count())

# Keep weeks ending after 2019-01-01. The column helpers are referenced
# through the `F` alias because this file does not import bare `col` / `lit`.
pos_no_channel = pos_no_channel.filter(F.col("week_ending_date") > F.lit('2019-01-01'))
# Drop the excluded categories.
pos_no_channel = pos_no_channel.filter("mdlz_category NOT IN ('None','Cookie','Display PRD')")
# Drop rows without a PPG.
pos_no_channel = pos_no_channel.filter("NOT(mdlz_ppg == '' OR mdlz_ppg IS NULL)")
# Drop rows that carry a PPG but have a blank business, category and brand.
pos_no_channel = pos_no_channel.filter("NOT(mdlz_business == '' AND mdlz_category == '' AND mdlz_brand == '' AND mdlz_ppg != '')")
# Keep only rows with strictly positive dollars and quantity.
pos_no_channel = pos_no_channel.filter("pos_dollar > 0 AND pos_qty > 0")
# Kept from the original as an explicit safeguard; the positivity filter above
# already rejects rows whose measures are null.
pos_no_channel = pos_no_channel.filter("NOT(pos_dollar IS NULL AND pos_qty IS NULL)")

print(pos_no_channel.select(*series_keys).distinct().count())

# %pyspark
# Put POS 2020 (this year, "_ty") and POS 2019 (last year, "_ly") side by side
# in a single dataframe for visualization in Tableau. Rows are matched on the
# product / retailer / state keys plus week_of_year.
ty_ly_join_keys = ['retailer', 'state', 'mdlz_business', 'mdlz_category', 'mdlz_brand', 'mdlz_ppg', 'week_of_year']

# This-year rows keep their week_ending_date; only the year column is dropped.
pos_2020 = (
    pos_no_channel.filter(pos_no_channel['year'] == 2020)
    .drop('year')
    .withColumnRenamed("pos_dollar", "pos_dollar_ty")
    .withColumnRenamed("pos_qty", "pos_qty_ty")
)

# Last-year rows lose both date columns so that only week_of_year aligns them
# with the 2020 rows.
pos_2019 = (
    pos_no_channel.filter(pos_no_channel['year'] == 2019)
    .drop('week_ending_date', 'year')
    .withColumnRenamed("pos_dollar", "pos_dollar_ly")
    .withColumnRenamed("pos_qty", "pos_qty_ly")
)

# Left join: every 2020 row is kept, with null "_ly" measures when 2019 has no
# matching row.
pos_updated = pos_2020.join(pos_2019, on=ty_ly_join_keys, how='left')

# Transform projections to create state milestone data markers in Tableau.
# Each entry pairs a milestone date column with the name of the day-of-week
# column derived from it.
milestone_date_columns = [
    ('end_day', "end_day_dayofweek"),
    ('peak_deaths_date', "peak_deaths_date_dayofweek"),
    ('peak_affected_date', "peak_affected_date_dayofweek"),
]

# Keep one row per distinct Geography / milestone-date combination.
projections = projections.select(
    'Geography', *[date_col for date_col, _ in milestone_date_columns]
).dropDuplicates()

# Keep only the text before the first space of each value and cast it to a
# date.
for date_col, _ in milestone_date_columns:
    leading_token = F.split(projections[date_col], ' ').getItem(0)
    projections = projections.withColumn(date_col, leading_token)

for date_col, _ in milestone_date_columns:
    projections = projections.withColumn(date_col, projections[date_col].cast(DateType()))

# Day-of-week number of every milestone date.
for date_col, dayofweek_col in milestone_date_columns:
    projections = projections.withColumn(dayofweek_col, F.dayofweek(F.col(date_col)))

# Convert to Saturday
def get_week_end_date(val):
    """Return how many days must be added to reach the week-ending Saturday.

    Args:
        val: Day-of-week number in Spark's ``dayofweek`` numbering
            (1 = Sunday ... 7 = Saturday), or None when the source date is
            null.

    Returns:
        ``7 - val`` (0 for a Saturday, 6 for a Sunday), or None when ``val``
        is None, so that a null date yields a null offset instead of raising
        TypeError inside the UDF.
    """
    if val is None:
        return None
    return 7 - val

# Wrap the day-offset helper as an integer-returning Spark UDF.
udf_end_date = F.udf(get_week_end_date, IntegerType())

# Step 1: for every milestone, the number of days between the milestone date
# and its week-ending date, computed from the day-of-week column.
day_offset_columns = [
    ('30_perc_date', 'end_day_dayofweek'),
    ('peak_death_date', 'peak_deaths_date_dayofweek'),
    ('peak_affect_date', 'peak_affected_date_dayofweek'),
]
for offset_col, dayofweek_col in day_offset_columns:
    projections = projections.withColumn(offset_col, udf_end_date(dayofweek_col))

# Step 2: add that offset to the milestone date. The SQL expression form is
# used so the number of days can come from a column.
week_ending_expressions = [
    ('week_ending_30', "date_add(end_day, 30_perc_date)"),
    ('week_ending_pd', "date_add(peak_deaths_date, peak_death_date)"),
    ('week_ending_pa', "date_add(peak_affected_date, peak_affect_date)"),
]
for week_ending_col, date_add_sql in week_ending_expressions:
    projections = projections.withColumn(week_ending_col, F.expr(date_add_sql))

# Only the geography and the three week-ending dates are needed downstream.
projections = projections.select('Geography', 'week_ending_30', 'week_ending_pd', 'week_ending_pa')


# Merge in state abbreviations: inner-join the projections to the state
# mapping on Geography, then replace the Geography key by the mapping's
# "abbr" column under the name "state".
projections_updated = (
    projections.join(states, on=['Geography'], how='inner')
    .drop('Geography')
    .withColumnRenamed("abbr", "state")
)
projections_updated = projections_updated.withColumn('ph', F.lit(2.0))

# Build the final dataset: the POS rows plus, for every state, one marker row
# per milestone. A marker row has the milestone's week-ending date in
# week_ending_date, the constant 'ph' value in the milestone's own column and
# nulls everywhere else.

# Column order shared by every frame; union() matches columns by position.
column_order = ['retailer', 'state', 'mdlz_business', 'mdlz_category', 'mdlz_brand', 'mdlz_ppg', 'week_of_year', 'week_ending_date', 'pos_qty_ty', 'pos_dollar_ty', 'pos_qty_ly', 'pos_dollar_ly', 'week_ending_30', 'week_ending_pd', 'week_ending_pa']

# Milestone marker columns (double): null on POS rows, 'ph' on marker rows.
milestone_columns = ['week_ending_30', 'week_ending_pd', 'week_ending_pa']

# POS columns that carry no data on marker rows, added there as typed nulls.
# The quantity placeholders are DoubleType to match the double cast applied to
# pos_qty when the POS data is loaded.
pos_placeholder_columns = [
    ('retailer', StringType()),
    ('mdlz_business', StringType()),
    ('mdlz_category', StringType()),
    ('mdlz_brand', StringType()),
    ('mdlz_ppg', StringType()),
    ('pos_qty_ty', DoubleType()),
    ('pos_dollar_ty', DoubleType()),
    ('week_of_year', IntegerType()),
    ('pos_qty_ly', DoubleType()),
    ('pos_dollar_ly', DoubleType()),
]


def _to_marker_rows(marker_df, milestone_col):
    """Turn a (milestone date, state, ph) frame into rows shaped like the POS data.

    Args:
        marker_df: DataFrame with the columns ``milestone_col``, 'state' and 'ph'.
        milestone_col: Name of the milestone date column in ``marker_df``.

    Returns:
        DataFrame in ``column_order`` where the milestone date has become
        'week_ending_date', 'ph' has become ``milestone_col`` and every other
        column is a typed null.
    """
    rows = (
        marker_df
        .withColumnRenamed(milestone_col, 'week_ending_date')
        .withColumnRenamed('ph', milestone_col)
    )
    for other_milestone in milestone_columns:
        if other_milestone != milestone_col:
            rows = rows.withColumn(other_milestone, F.lit(None).cast(DoubleType()))
    for column_name, column_type in pos_placeholder_columns:
        rows = rows.withColumn(column_name, F.lit(None).cast(column_type))
    return rows.select(*column_order)


proj_30_pec = projections_updated.select('week_ending_30', 'state', 'ph')
proj_pd = projections_updated.select('week_ending_pd', 'state', 'ph')
proj_pa = projections_updated.select('week_ending_pa', 'state', 'ph')

proj_30_pec_pos = _to_marker_rows(proj_30_pec, 'week_ending_30')
proj_pa_pos = _to_marker_rows(proj_pa, 'week_ending_pa')
proj_pd_pos = _to_marker_rows(proj_pd, 'week_ending_pd')

# POS rows never carry a milestone marker.
for milestone_col in milestone_columns:
    pos_updated = pos_updated.withColumn(milestone_col, F.lit(None).cast(DoubleType()))
pos_updated = pos_updated.select(*column_order)

# Append
df_fin = (
    pos_updated
    .union(proj_30_pec_pos)
    .union(proj_pa_pos)
    .union(proj_pd_pos)
)


# %pyspark
# Publish the combined POS + milestone data for post-model processing and
# integration into Tableau. The frame is exposed to SQL as the temp view
# 'pos', then the target table is dropped (if present) and recreated from it.
df_fin.createOrReplaceTempView('pos')
for publish_statement in (
    'drop table if exists default.cbda_pos',
    'create table default.cbda_pos as select * from pos',
):
    spark.sql(publish_statement)

# %pyspark