In [1]:
from pyspark.sql.functions import col, udf, unix_timestamp, year, month, dayofmonth, lpad, concat, lit, regexp_replace, when 
from pyspark.sql.types import DateType, StringType, DecimalType, IntegerType, DoubleType, FloatType 
import pyspark.sql.functions as f
from pyspark import SparkContext 
from pyspark.sql import SQLContext 
from pyspark.sql import functions as F 
from pyspark.sql.window import Window  
from pyspark.sql.functions import desc
from pyspark.sql.functions import col, row_number
from pyspark.sql.window import Window
from functools import reduce
from pyspark.sql import *

### Connect to SQL and get data

In [3]:
# JDBC connection settings for the FP&A source database.
# NOTE(review): hostname, database, port, URL template and driver are blank placeholders in this
# copy of the notebook and must be filled in before it can run.
jdbcHostname = ""
jdbcDatabase = ""
jdbcPort = 0  # TODO: set the real integer port (was assigned the type object `int`, not a port)

# Credentials are read from a Databricks secret scope instead of being hard-coded.
jdbcUsername = dbutils.secrets.get(scope ='',  key = '')
jdbcPassword = dbutils.secrets.get(scope ='',  key = '')

# The URL template is formatted with host, port and database, in that order.
jdbcUrl = ''.format(jdbcHostname, jdbcPort, jdbcDatabase)
connectionProperties = {
  'user' : jdbcUsername,
  'password' : jdbcPassword,
  'driver' : ''
}

## pull from our FP&A data table

In [5]:
# Read the whole FACT_FPA table through a JDBC pushdown subquery.
pushDown_query = """(select* from FACT_FPA)as s"""
df = spark.read.jdbc(url=jdbcUrl, table=pushDown_query, properties=connectionProperties)

# Normalise every column name: remove spaces and lower-case it (e.g. "Product Name" -> "productname").
df = reduce(lambda frame, name: frame.withColumnRenamed(name, name.replace(' ', '').lower()),
            df.columns,
            df)

### Parameters for date

In [7]:
# Run parameter supplied by Data Factory through a Databricks text widget
# (empty default value, empty label). It is echoed so it appears in the run output.
dbutils.widgets.text("userDateinput", "", "")
param_interval = dbutils.widgets.get("userDateinput")
print("Param1:", param_interval)

## creating input year and month column variables

In [9]:
# Parse the run parameter, expected as MMYY (e.g. "0320" -> month 3, year 2020, prior year "2019").
if len(param_interval) < 4 or not param_interval[:4].isdecimal():
    raise ValueError("userDateinput must start with MMYY digits, got {!r}".format(param_interval))

currentMonth = int(param_interval[:2])
if not 1 <= currentMonth <= 12:
    raise ValueError("userDateinput month must be 01-12, got {!r}".format(param_interval))

currentYear = int(param_interval[2:4]) + 2000
# Keep the prior year as a string: it is compared against the string `scenario` column
# (col('scenario') == previousYear), so the types must match instead of relying on Spark's
# implicit string->int cast (which fails for values like 'Flash' under ANSI mode).
previousYear = str(currentYear - 1)
print('currentMonth:', currentMonth, 'CurrentYear:', currentYear, 'previousYear:', previousYear)

## creating month dataframe

In [11]:
# Lookup table: month number (m) -> three-letter abbreviation (month1), used to translate df.month.
dfmonth = spark.createDataFrame(
    list(enumerate('Jan Feb Mar Apr May Jun Jul Aug Sep Oct Nov Dec'.split(), start=1)),
    ['m', 'month1'],
)

## joining month to the df

In [13]:
# Translate the month abbreviation in df.month into a month number and keep only the
# reporting month chosen by the run parameter. (No year filtering happens here.)
df = dfmonth.join(df, dfmonth.month1 == df.month, how='inner')

df = (df
      .where(col('m') == currentMonth)
      .drop('month1', 'month')
      .withColumnRenamed('m', 'month')
     )

# Cast value to DoubleType. FloatType (32-bit) keeps only ~7 significant digits, which would
# round large amounts before the Flash-vs-Projection / Flash-vs-PY differences computed below.
# The temporary name keeps `value` as the last column, as before.
df = (df
      .withColumn("value3", df['value'].cast(DoubleType())).drop('value')
      .withColumnRenamed('value3', 'value')
     )

### Comparisons to Sales

In [15]:
# Sales vs projection (MTD, constant currency, Flash) for the headline product groups,
# sorted into a fixed display order via a temporary 'prevord' key.
dfsales = (df
           .where((col('entity') == 'TOM') & (col('view') == 'MTD') & (col('rate') == 'CC')
                  & (col('scenario') == 'Flash') & (col('account') == 'Sales VS Proj$'))
           .where(col('product').isin('Total Stryker', 'MSNT', 'Ortho & Spine', 'APAC Group', 'EEMEA', 'LATAM'))
           .withColumn('prevord',
                       when(col('product') == 'Total Stryker', lit('a'))
                       .when(col('product') == 'MSNT', lit('b'))
                       .when(col('product') == 'Ortho & Spine', lit('c'))
                       .when(col('product') == 'APAC Group', lit('d'))
                       .when(col('product') == 'EEMEA', lit('e'))
                       .when(col('product') == 'LATAM', lit('f')))
           .orderBy(col('prevord').asc())
           .drop('prevord')
          )

## creating index column for ordering

In [17]:
# Tag every row with a Spark-generated id. The lag()-based comparisons below order rows by this id,
# so they depend on it following df's row order. Spark only guarantees these ids are unique and
# increasing, not consecutive, and they depend on the current partitioning.
df = df.withColumn("id", F.monotonically_increasing_id())

### Comparisons to Prior Year

In [19]:
#Comparisons to Prior Year (MTD)

#Total Stryker operating income vs projection (bps), reported rate
dfoipy = df.where((col('entity')== 'TOM') & (col('product')== 'Total Stryker') & (col('view')== 'MTD') & (col('rate')== 'Reported') & (col('scenario')== 'Flash') & (col('account')== 'OI VS Proj bps')).drop('id')

#Flash EPS (MTD, reported), relabelled as 'EPS CY'
dfeps = (df
         .where((col('entity') == 'TOM') & (col('product') == 'Total Stryker') & (col('view') == 'MTD')
                & (col('rate') == 'Reported') & (col('scenario') == 'Flash') & (col('account') == 'EPS'))
         .drop('id')
         .withColumn('account', lit('EPS CY'))
         .select('month', 'entity', 'product', 'year', 'view', 'rate', 'scenario', 'account', 'value')
        )

#Sales vs prior year (%) for the headline product groups, in fixed display order
dfsalespy = (df
             .where((col('entity') == 'TOM') & (col('view') == 'MTD') & (col('rate') == 'CC')
                    & (col('scenario') == 'Flash') & (col('account') == 'Sales VS PY%'))
             .where(col('product').isin('Total Stryker', 'MSNT', 'Ortho & Spine', 'APAC Group', 'EEMEA', 'LATAM'))
             .withColumn('prevord',
                         when(col('product') == 'Total Stryker', lit('a'))
                         .when(col('product') == 'MSNT', lit('b'))
                         .when(col('product') == 'Ortho & Spine', lit('c'))
                         .when(col('product') == 'APAC Group', lit('d'))
                         .when(col('product') == 'EEMEA', lit('e'))
                         .when(col('product') == 'LATAM', lit('f')))
             .orderBy(col('prevord').asc())
             .drop('prevord', 'id')
            )

# Union by column name, not position. dfoipy and dfsalespy keep df's native column order while
# dfeps is re-selected explicitly, so a positional unionAll could silently shift values between columns.
dfpy = reduce(DataFrame.unionByName, [dfoipy, dfeps, dfsalespy])

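## Lots of unions are required for this project, so `unpersist()` is called on the intermediate DataFrames to free cluster resources (note: `unpersist()` only has an effect on DataFrames that were previously `cache()`d or `persist()`ed)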

In [21]:
# Release the MTD prior-year building blocks.
# NOTE(review): nothing in this notebook calls cache()/persist() on these frames, so unpersist()
# is expected to be a no-op here.
for frame in (dfoipy, dfeps, dfsalespy):
    frame.unpersist()

### Comparisons to Operating Income

In [23]:
#Op Income (MTD, constant currency): Flash and Projection rows, value as DoubleType
dfoiproj = (df
            .where((col('entity') == 'TOM') & (col('product') == 'Total Stryker') & (col('view') == 'MTD')
                   & (col('rate') == 'CC') & (col('account') == 'OI'))
            .where(col('scenario').isin('Flash', 'Projection'))
            .withColumn("value2", col('value').cast(DoubleType()))
            .drop('value')
            .withColumnRenamed('value2', 'value')
           )


In [24]:
# Flash - Projection operating income (MTD). Rows are ordered by id descending, so lag() gives
# each row the value of the row with the next-higher id. Rows with no predecessor get 0.
# NOTE(review): the Flash row gets Flash - Projection only if the Projection row has the larger
# id, i.e. this relies on the source row order.
my_window = Window.partitionBy().orderBy(col("id").desc())

dfoiproj = (dfoiproj
            .withColumn("prev_value", F.lag("value").over(my_window))
            .withColumn("diff", F.coalesce(col("value") - col("prev_value"), lit(0)))
            .drop('value', 'id', 'prev_value')
            .withColumnRenamed('diff', 'value')
            .where(col('scenario') == 'Flash')
            .withColumn('account1', lit('OI VS Proj$'))
            .drop('account')
            .withColumnRenamed('account1', 'account')
           )

#SYK OI compared to Projection, in the standard output column order
dfsykoi = dfoiproj.select('month', 'entity', 'product', 'year', 'view', 'rate', 'scenario', 'account', 'value')

In [25]:
#Op Income compared to PY

dfoicc = df.where((col('entity')== 'TOM') & (col('product')== 'Total Stryker') & (col('view')== 'MTD') & (col('rate')== 'CC') & (col('account')== 'OI')) 
dfoicc = dfoicc.where((col('scenario')== 'Flash') | (col('scenario')== previousYear))

my_window = Window.partitionBy().orderBy(col("id").asc())

dfoicc = dfoicc.withColumn("prev_value", F.lag(dfoicc.value).over(my_window)) 
dfoicc = dfoicc.withColumn("diff", F.when(F.isnull(dfoicc.value - dfoicc.prev_value), 0).otherwise(dfoicc.value - dfoicc.prev_value))

dfoicc = (dfoicc
          .drop('value', 'id', 'prev_value')
          .withColumnRenamed('diff', 'value')
          .where(col('scenario') == 'Flash')
          .select('month', 'entity', 'product', 'year', 'view', 'rate', 'scenario', 'account', 'value')
         )
dfoicc = dfoicc.withColumn('account1', lit('OI VS PY$')).drop('account').withColumnRenamed('account1', 'account')

dfoicc = dfoicc.select('month', 'entity', 'product', 'year', 'view', 'rate', 'scenario', 'account', 'value')


In [26]:
# EPS (MTD, reported): Flash vs prior year.
# Rows are ordered by id ascending; lag() gives each row the value of the preceding row.
# Rows with no predecessor get 0.
# NOTE(review): the Flash row gets Flash - PY only if the prior-year row precedes it in id order.
my_window = Window.partitionBy().orderBy(col("id").asc())

dfepspy = (df
           .where((col('entity') == 'TOM') & (col('product') == 'Total Stryker') & (col('view') == 'MTD')
                  & (col('rate') == 'Reported') & (col('account') == 'EPS'))
           .where((col('scenario') == 'Flash') | (col('scenario') == previousYear))
           .withColumn("prev_value", F.lag("value").over(my_window))
           .withColumn("diff", F.coalesce(col("value") - col("prev_value"), lit(0)))
           .drop('value', 'id', 'prev_value')
           .withColumnRenamed('diff', 'value')
           .where(col('scenario') == 'Flash')
           .withColumn('account', lit('EPS VS PY'))
           .select('month', 'entity', 'product', 'year', 'view', 'rate', 'scenario', 'account', 'value')
          )



In [27]:
# Projected EPS (MTD, reported), relabelled as 'EPS CY'
dfepstest = (df
             .where((col('entity') == 'TOM') & (col('product') == 'Total Stryker') & (col('view') == 'MTD')
                    & (col('rate') == 'Reported') & (col('scenario') == 'Projection') & (col('account') == 'EPS'))
             .drop('id')
             .withColumn('account', lit('EPS CY'))
             .select('month', 'entity', 'product', 'year', 'view', 'rate', 'scenario', 'account', 'value')
            )

### MTD tax rate ETR

In [29]:
#tax rate ETR (MTD): lag-based difference with the sign flipped, kept on the Flash rows
# Rate and scenario are filtered as two independent conditions (ANDed), as in the QTD and YTD
# cells. OR-ing all four tests together would also admit every scenario for Reported/CC rows
# (e.g. the prior-year scenario) and every rate for Flash/Projection rows. Those extra rows
# would shift the lag() pairing below.
taxratediff = (df
               .where((col('entity') == 'TOM') & (col('product') == 'Total Stryker') & (col('view') == 'MTD')
                      & (col('account') == 'ETR'))
               .where((col('rate') == 'Reported') | (col('rate') == 'CC'))
               .where((col('scenario') == 'Flash') | (col('scenario') == 'Projection'))
              )

# Rows are ordered by id ascending; each row gets -(value - preceding row's value).
# Rows with no predecessor get 0.
# NOTE(review): correct pairing relies on the source row order.
my_window = Window.partitionBy().orderBy(col("id").asc())

taxratediff = (taxratediff
               .withColumn("prev_value", F.lag("value").over(my_window))
               .withColumn("diff", F.when(F.isnull(col("value") - col("prev_value")), 0)
                                    .otherwise((col("value") - col("prev_value")) * -1))
               .drop('value', 'id', 'prev_value')
               .withColumnRenamed('diff', 'value')
               .where(col('scenario') == 'Flash')
               .withColumn('account', lit('ETR VS Proj'))
               .select('month', 'entity', 'product', 'year', 'view', 'rate', 'scenario', 'account', 'value')
              )

# Flash ETR as reported (MTD)
taxrate = df.where((col('entity')== 'TOM') & (col('product')== 'Total Stryker') & (col('view')== 'MTD') & (col('rate')== 'Reported') & (col('scenario')== 'Flash') & (col('account')== 'ETR')).drop('id')

## first set of QTD data

### comparisons to sales for QTD

In [32]:
# Sales vs projection (QTD, constant currency, Flash) for the headline product groups,
# sorted into a fixed display order via a temporary 'prevord' key.
dfsalesq = (df
            .where((col('entity') == 'TOM') & (col('view') == 'QTD') & (col('rate') == 'CC')
                   & (col('scenario') == 'Flash') & (col('account') == 'Sales VS Proj$'))
            .where(col('product').isin('Total Stryker', 'MSNT', 'Ortho & Spine', 'APAC Group', 'EEMEA', 'LATAM'))
            .withColumn('prevord',
                        when(col('product') == 'Total Stryker', lit('a'))
                        .when(col('product') == 'MSNT', lit('b'))
                        .when(col('product') == 'Ortho & Spine', lit('c'))
                        .when(col('product') == 'APAC Group', lit('d'))
                        .when(col('product') == 'EEMEA', lit('e'))
                        .when(col('product') == 'LATAM', lit('f')))
            .orderBy(col('prevord').asc())
            .drop('prevord', 'id')
           )

### comparisons to PY for QTD

In [34]:
#Comparisons to Prior Year (QTD)

#Total Stryker operating income vs projection (bps), reported rate
dfoipyq = df.where((col('entity')== 'TOM') & (col('product')== 'Total Stryker') & (col('view')== 'QTD') & (col('rate')== 'Reported') & (col('scenario')== 'Flash') & (col('account')== 'OI VS Proj bps')).drop('id')

#Flash EPS (QTD, reported), relabelled as 'EPS CY'
dfepsq = (df
          .where((col('entity') == 'TOM') & (col('product') == 'Total Stryker') & (col('view') == 'QTD')
                 & (col('rate') == 'Reported') & (col('scenario') == 'Flash') & (col('account') == 'EPS'))
          .drop('id')
          .withColumn('account', lit('EPS CY'))
          .select('month', 'entity', 'product', 'year', 'view', 'rate', 'scenario', 'account', 'value')
         )

#Sales vs prior year (%) for the headline product groups, in fixed display order
dfsalespyq = (df
              .where((col('entity') == 'TOM') & (col('view') == 'QTD') & (col('rate') == 'CC')
                     & (col('scenario') == 'Flash') & (col('account') == 'Sales VS PY%'))
              .where(col('product').isin('Total Stryker', 'MSNT', 'Ortho & Spine', 'APAC Group', 'EEMEA', 'LATAM'))
              .withColumn('prevord',
                          when(col('product') == 'Total Stryker', lit('a'))
                          .when(col('product') == 'MSNT', lit('b'))
                          .when(col('product') == 'Ortho & Spine', lit('c'))
                          .when(col('product') == 'APAC Group', lit('d'))
                          .when(col('product') == 'EEMEA', lit('e'))
                          .when(col('product') == 'LATAM', lit('f')))
              .orderBy(col('prevord').asc())
              .drop('prevord', 'id')
             )

# Union by column name, not position. dfoipyq and dfsalespyq keep df's native column order while
# dfepsq is re-selected explicitly, so a positional unionAll could silently shift values between columns.
dfpyq = reduce(DataFrame.unionByName, [dfoipyq, dfepsq, dfsalespyq])

In [35]:
# Release the QTD prior-year building blocks.
# NOTE(review): nothing in this notebook calls cache()/persist() on these frames, so unpersist()
# is expected to be a no-op here.
for frame in (dfoipyq, dfepsq, dfsalespyq):
    frame.unpersist()

### comparisons to Proj for QTD

In [37]:
#Op Income (QTD, constant currency): Flash and Projection rows, value as DoubleType
dfoiprojq = (df
             .where((col('entity') == 'TOM') & (col('product') == 'Total Stryker') & (col('view') == 'QTD')
                    & (col('rate') == 'CC') & (col('account') == 'OI'))
             .where(col('scenario').isin('Flash', 'Projection'))
             .withColumn("value2", col('value').cast(DoubleType()))
             .drop('value')
             .withColumnRenamed('value2', 'value')
            )

In [38]:
# Flash - Projection operating income (QTD). Rows are ordered by id descending, so lag() gives
# each row the value of the row with the next-higher id. Rows with no predecessor get 0.
# NOTE(review): the Flash row gets Flash - Projection only if the Projection row has the larger
# id, i.e. this relies on the source row order.
my_window = Window.partitionBy().orderBy(col("id").desc())

dfoiprojq = (dfoiprojq
             .withColumn("prev_value", F.lag("value").over(my_window))
             .withColumn("diff", F.coalesce(col("value") - col("prev_value"), lit(0)))
             .drop('value', 'id', 'prev_value')
             .withColumnRenamed('diff', 'value')
             .where(col('scenario') == 'Flash')
             .select('month', 'entity', 'product', 'year', 'view', 'rate', 'scenario', 'account', 'value')
             .withColumn('account1', lit('OI VS Proj$'))
             .drop('account')
             .withColumnRenamed('account1', 'account')
            )

#SYK OI compared to Projection (QTD), in the standard output column order
dfsykoiq = dfoiprojq.select('month', 'entity', 'product', 'year', 'view', 'rate', 'scenario', 'account', 'value')

### OP income compared to PY

In [40]:
# Op Income (QTD, constant currency): Flash vs prior year.
# Rows are ordered by id ascending; lag() gives each row the value of the preceding row.
# Rows with no predecessor get 0.
# NOTE(review): the Flash row gets Flash - PY only if the prior-year row precedes it in id order.
my_window = Window.partitionBy().orderBy(col("id").asc())

dfoiccq = (df
           .where((col('entity') == 'TOM') & (col('product') == 'Total Stryker') & (col('view') == 'QTD')
                  & (col('rate') == 'CC') & (col('account') == 'OI'))
           .where((col('scenario') == 'Flash') | (col('scenario') == previousYear))
           .withColumn("prev_value", F.lag("value").over(my_window))
           .withColumn("diff", F.coalesce(col("value") - col("prev_value"), lit(0)))
           .drop('value', 'id', 'prev_value')
           .withColumnRenamed('diff', 'value')
           .where(col('scenario') == 'Flash')
           .withColumn('account', lit('OI VS PY$'))
           .select('month', 'entity', 'product', 'year', 'view', 'rate', 'scenario', 'account', 'value')
          )

### EPS vs PY for QTD

In [42]:
# EPS (QTD, reported): Flash vs prior year.
# Rows are ordered by id ascending; lag() gives each row the value of the preceding row.
# Rows with no predecessor get 0.
# NOTE(review): the Flash row gets Flash - PY only if the prior-year row precedes it in id order.
my_window = Window.partitionBy().orderBy(col("id").asc())

dfepspyq = (df
            .where((col('entity') == 'TOM') & (col('product') == 'Total Stryker') & (col('view') == 'QTD')
                   & (col('rate') == 'Reported') & (col('account') == 'EPS'))
            .where((col('scenario') == 'Flash') | (col('scenario') == previousYear))
            .withColumn("prev_value", F.lag("value").over(my_window))
            .withColumn("diff", F.coalesce(col("value") - col("prev_value"), lit(0)))
            .drop('value', 'id', 'prev_value')
            .withColumnRenamed('diff', 'value')
            .where(col('scenario') == 'Flash')
            .withColumn('account', lit('EPS VS PY'))
            .select('month', 'entity', 'product', 'year', 'view', 'rate', 'scenario', 'account', 'value')
           )

## EPS proj QTD

In [44]:
# Projected EPS (QTD, reported), relabelled as 'EPS CY'
dfepstestq = (df
              .where((col('entity') == 'TOM') & (col('product') == 'Total Stryker') & (col('view') == 'QTD')
                     & (col('rate') == 'Reported') & (col('scenario') == 'Projection') & (col('account') == 'EPS'))
              .drop('id')
              .withColumn('account', lit('EPS CY'))
              .select('month', 'entity', 'product', 'year', 'view', 'rate', 'scenario', 'account', 'value')
             )

### tax rate ETR QTD

In [46]:
# Tax rate ETR (QTD): lag-based difference with the sign flipped, kept on the Flash rows.
# Rows are ordered by id ascending; each row gets -(value - preceding row's value).
# Rows with no predecessor get 0.
# NOTE(review): correct pairing relies on the source row order.
my_window = Window.partitionBy().orderBy(col("id").asc())

taxratediffq = (df
                .where((col('entity') == 'TOM') & (col('product') == 'Total Stryker') & (col('view') == 'QTD')
                       & (col('account') == 'ETR'))
                .where(col('rate').isin('Reported', 'CC'))
                .where(col('scenario').isin('Flash', 'Projection'))
                .withColumn("prev_value", F.lag("value").over(my_window))
                .withColumn("diff", F.coalesce((col("value") - col("prev_value")) * -1, lit(0)))
                .drop('value', 'id', 'prev_value')
                .withColumnRenamed('diff', 'value')
                .where(col('scenario') == 'Flash')
                .withColumn('account', lit('ETR VS Proj'))
                .select('month', 'entity', 'product', 'year', 'view', 'rate', 'scenario', 'account', 'value')
               )

# Flash ETR as reported (QTD)
taxrateq = (df
            .where((col('entity') == 'TOM') & (col('product') == 'Total Stryker') & (col('view') == 'QTD')
                   & (col('rate') == 'Reported') & (col('scenario') == 'Flash') & (col('account') == 'ETR'))
            .drop('id')
           )

### set of YTD comparisons

### YTD comparisons for sales

In [49]:
# Sales vs projection (YTD, constant currency, Flash) for the headline product groups,
# sorted into a fixed display order via a temporary 'prevord' key.
dfsalesy = (df
            .where((col('entity') == 'TOM') & (col('view') == 'YTD') & (col('rate') == 'CC')
                   & (col('scenario') == 'Flash') & (col('account') == 'Sales VS Proj$'))
            .where(col('product').isin('Total Stryker', 'MSNT', 'Ortho & Spine', 'APAC Group', 'EEMEA', 'LATAM'))
            .withColumn('prevord',
                        when(col('product') == 'Total Stryker', lit('a'))
                        .when(col('product') == 'MSNT', lit('b'))
                        .when(col('product') == 'Ortho & Spine', lit('c'))
                        .when(col('product') == 'APAC Group', lit('d'))
                        .when(col('product') == 'EEMEA', lit('e'))
                        .when(col('product') == 'LATAM', lit('f')))
            .orderBy(col('prevord').asc())
            .drop('prevord', 'id')
           )

## YTD prior year

In [51]:
#Comparisons to Prior Year (YTD)

#Total Stryker operating income vs projection (bps), reported rate
dfoipyy = df.where((col('entity')== 'TOM') & (col('product')== 'Total Stryker') & (col('view')== 'YTD') & (col('rate')== 'Reported') & (col('scenario')== 'Flash') & (col('account')== 'OI VS Proj bps')).drop('id')

#Flash EPS (YTD, reported), relabelled as 'EPS CY'
dfepsy = (df
          .where((col('entity') == 'TOM') & (col('product') == 'Total Stryker') & (col('view') == 'YTD')
                 & (col('rate') == 'Reported') & (col('scenario') == 'Flash') & (col('account') == 'EPS'))
          .drop('id')
          .withColumn('account', lit('EPS CY'))
          .select('month', 'entity', 'product', 'year', 'view', 'rate', 'scenario', 'account', 'value')
         )

#Sales vs prior year (%) for the headline product groups, in fixed display order
dfsalespyy = (df
              .where((col('entity') == 'TOM') & (col('view') == 'YTD') & (col('rate') == 'CC')
                     & (col('scenario') == 'Flash') & (col('account') == 'Sales VS PY%'))
              .where(col('product').isin('Total Stryker', 'MSNT', 'Ortho & Spine', 'APAC Group', 'EEMEA', 'LATAM'))
              .withColumn('prevord',
                          when(col('product') == 'Total Stryker', lit('a'))
                          .when(col('product') == 'MSNT', lit('b'))
                          .when(col('product') == 'Ortho & Spine', lit('c'))
                          .when(col('product') == 'APAC Group', lit('d'))
                          .when(col('product') == 'EEMEA', lit('e'))
                          .when(col('product') == 'LATAM', lit('f')))
              .orderBy(col('prevord').asc())
              .drop('prevord', 'id')
             )

# Union by column name, not position. dfoipyy and dfsalespyy keep df's native column order while
# dfepsy is re-selected explicitly, so a positional unionAll could silently shift values between columns.
dfpyy = reduce(DataFrame.unionByName, [dfoipyy, dfepsy, dfsalespyy])

In [52]:
# Release the YTD prior-year building blocks.
# NOTE(review): nothing in this notebook calls cache()/persist() on these frames, so unpersist()
# is expected to be a no-op here.
for frame in (dfoipyy, dfepsy, dfsalespyy):
    frame.unpersist()

### comparisons to projection for YTD

In [54]:
#Op Income (YTD, constant currency): Flash and Projection rows, value as DoubleType
dfoiprojy = (df
             .where((col('entity') == 'TOM') & (col('product') == 'Total Stryker') & (col('view') == 'YTD')
                    & (col('rate') == 'CC') & (col('account') == 'OI'))
             .where(col('scenario').isin('Flash', 'Projection'))
             .withColumn("value2", col('value').cast(DoubleType()))
             .drop('value')
             .withColumnRenamed('value2', 'value')
            )

In [55]:
# Flash - Projection operating income (YTD). Rows are ordered by id descending, so lag() gives
# each row the value of the row with the next-higher id. Rows with no predecessor get 0.
# NOTE(review): the Flash row gets Flash - Projection only if the Projection row has the larger
# id, i.e. this relies on the source row order.
my_window = Window.partitionBy().orderBy(col("id").desc())

dfoiprojy = (dfoiprojy
             .withColumn("prev_value", F.lag("value").over(my_window))
             .withColumn("diff", F.coalesce(col("value") - col("prev_value"), lit(0)))
             .drop('value', 'id', 'prev_value')
             .withColumnRenamed('diff', 'value')
             .where(col('scenario') == 'Flash')
             .withColumn('account1', lit('OI VS Proj$'))
             .drop('account')
             .withColumnRenamed('account1', 'account')
            )

#SYK OI compared to Projection (YTD), in the standard output column order
dfsykoiy = dfoiprojy.select('month', 'entity', 'product', 'year', 'view', 'rate', 'scenario', 'account', 'value')

### OP income compared to PY for YTD

In [57]:
# Op Income (YTD, constant currency): Flash vs prior year.
# Rows are ordered by id ascending; lag() gives each row the value of the preceding row.
# Rows with no predecessor get 0.
# NOTE(review): the Flash row gets Flash - PY only if the prior-year row precedes it in id order.
my_window = Window.partitionBy().orderBy(col("id").asc())

dfoiccy = (df
           .where((col('entity') == 'TOM') & (col('product') == 'Total Stryker') & (col('view') == 'YTD')
                  & (col('rate') == 'CC') & (col('account') == 'OI'))
           .where((col('scenario') == 'Flash') | (col('scenario') == previousYear))
           .withColumn("prev_value", F.lag("value").over(my_window))
           .withColumn("diff", F.coalesce(col("value") - col("prev_value"), lit(0)))
           .drop('value', 'id', 'prev_value')
           .withColumnRenamed('diff', 'value')
           .where(col('scenario') == 'Flash')
           .withColumn('account', lit('OI VS PY$'))
           .select('month', 'entity', 'product', 'year', 'view', 'rate', 'scenario', 'account', 'value')
          )

### EPS vs PY YTD

In [59]:
# EPS (YTD, reported): Flash vs prior year.
# Rows are ordered by id ascending; lag() gives each row the value of the preceding row.
# Rows with no predecessor get 0.
# NOTE(review): the Flash row gets Flash - PY only if the prior-year row precedes it in id order.
my_window = Window.partitionBy().orderBy(col("id").asc())

dfepspyy = (df
            .where((col('entity') == 'TOM') & (col('product') == 'Total Stryker') & (col('view') == 'YTD')
                   & (col('rate') == 'Reported') & (col('account') == 'EPS'))
            .where((col('scenario') == 'Flash') | (col('scenario') == previousYear))
            .withColumn("prev_value", F.lag("value").over(my_window))
            .withColumn("diff", F.coalesce(col("value") - col("prev_value"), lit(0)))
            .drop('value', 'id', 'prev_value')
            .withColumnRenamed('diff', 'value')
            .where(col('scenario') == 'Flash')
            .withColumn('account', lit('EPS VS PY'))
            .select('month', 'entity', 'product', 'year', 'view', 'rate', 'scenario', 'account', 'value')
           )

### EPS proj YTD

In [61]:
# Projected EPS (YTD, reported), relabelled as 'EPS CY'
dfepstesty = (df
              .where((col('entity') == 'TOM') & (col('product') == 'Total Stryker') & (col('view') == 'YTD')
                     & (col('rate') == 'Reported') & (col('scenario') == 'Projection') & (col('account') == 'EPS'))
              .drop('id')
              .withColumn('account', lit('EPS CY'))
              .select('month', 'entity', 'product', 'year', 'view', 'rate', 'scenario', 'account', 'value')
             )

## tax rate ETR for YTD

In [63]:
# Tax rate ETR (YTD): lag-based difference with the sign flipped, kept on the Flash rows.
# Rows are ordered by id ascending; each row gets -(value - preceding row's value).
# Rows with no predecessor get 0.
# NOTE(review): correct pairing relies on the source row order.
my_window = Window.partitionBy().orderBy(col("id").asc())

taxratediffy = (df
                .where((col('entity') == 'TOM') & (col('product') == 'Total Stryker') & (col('view') == 'YTD')
                       & (col('account') == 'ETR'))
                .where(col('rate').isin('Reported', 'CC'))
                .where(col('scenario').isin('Flash', 'Projection'))
                .withColumn("prev_value", F.lag("value").over(my_window))
                .withColumn("diff", F.coalesce((col("value") - col("prev_value")) * -1, lit(0)))
                .drop('value', 'id', 'prev_value')
                .withColumnRenamed('diff', 'value')
                .where(col('scenario') == 'Flash')
                .withColumn('account', lit('ETR VS Proj'))
                .select('month', 'entity', 'product', 'year', 'view', 'rate', 'scenario', 'account', 'value')
               )

# Flash ETR as reported (YTD)
taxratey = (df
            .where((col('entity') == 'TOM') & (col('product') == 'Total Stryker') & (col('view') == 'YTD')
                   & (col('rate') == 'Reported') & (col('scenario') == 'Flash') & (col('account') == 'ETR'))
            .drop('id')
           )

### EPS VS Proj MTD, QTD, YTD

In [65]:
# EPS (MTD, reported) vs projection, reported on the Projection row.
# Rows are ordered by id ascending; each row gets -(value - preceding row's value).
# Rows with no predecessor get 0.
# NOTE(review): gives Flash - Projection only if the Flash row precedes the Projection row in id order.
my_window = Window.partitionBy().orderBy(col("id").asc())

dfepsproj = (df
             .where((col('entity') == 'TOM') & (col('product') == 'Total Stryker') & (col('view') == 'MTD')
                    & (col('rate') == 'Reported') & (col('account') == 'EPS'))
             .where(col('scenario').isin('Flash', 'Projection'))
             .withColumn("prev_value", F.lag("value").over(my_window))
             .withColumn("diff", F.coalesce((col("value") - col("prev_value")) * -1, lit(0)))
             .drop('value', 'id', 'prev_value')
             .withColumnRenamed('diff', 'value')
             .where(col('scenario') == 'Projection')
             .withColumn('account', lit('EPS VS Proj'))
             .select('month', 'entity', 'product', 'year', 'view', 'rate', 'scenario', 'account', 'value')
            )

In [66]:
# EPS (QTD, reported) vs projection, reported on the Projection row.
# Rows are ordered by id ascending; each row gets -(value - preceding row's value).
# Rows with no predecessor get 0.
# NOTE(review): gives Flash - Projection only if the Flash row precedes the Projection row in id order.
my_window = Window.partitionBy().orderBy(col("id").asc())

dfepsprojq = (df
              .where((col('entity') == 'TOM') & (col('product') == 'Total Stryker') & (col('view') == 'QTD')
                     & (col('rate') == 'Reported') & (col('account') == 'EPS'))
              .where(col('scenario').isin('Flash', 'Projection'))
              .withColumn("prev_value", F.lag("value").over(my_window))
              .withColumn("diff", F.coalesce((col("value") - col("prev_value")) * -1, lit(0)))
              .drop('value', 'id', 'prev_value')
              .withColumnRenamed('diff', 'value')
              .where(col('scenario') == 'Projection')
              .withColumn('account', lit('EPS VS Proj'))
              .select('month', 'entity', 'product', 'year', 'view', 'rate', 'scenario', 'account', 'value')
             )

In [67]:
# EPS (YTD, reported) vs projection, reported on the Projection row.
# Rows are ordered by id ascending; each row gets -(value - preceding row's value).
# Rows with no predecessor get 0.
# NOTE(review): gives Flash - Projection only if the Flash row precedes the Projection row in id order.
my_window = Window.partitionBy().orderBy(col("id").asc())

dfepsprojy = (df
              .where((col('entity') == 'TOM') & (col('product') == 'Total Stryker') & (col('view') == 'YTD')
                     & (col('rate') == 'Reported') & (col('account') == 'EPS'))
              .where(col('scenario').isin('Flash', 'Projection'))
              .withColumn("prev_value", F.lag("value").over(my_window))
              .withColumn("diff", F.coalesce((col("value") - col("prev_value")) * -1, lit(0)))
              .drop('value', 'id', 'prev_value')
              .withColumnRenamed('diff', 'value')
              .where(col('scenario') == 'Projection')
              .withColumn('account', lit('EPS VS Proj'))
              .select('month', 'entity', 'product', 'year', 'view', 'rate', 'scenario', 'account', 'value')
             )

### Sales Slide MTD, QTD, YTD

In [69]:
# Sales vs projection (constant currency, Flash) by division for the MTD/QTD/YTD sales slide
dfsales2mqy = (df
               .where((col('entity') == 'TOM') & (col('rate') == 'CC') & (col('scenario') == 'Flash')
                      & (col('account') == 'Sales VS Proj$'))
               .where(col('view').isin('MTD', 'QTD', 'YTD'))
               .where(col('product').isin('DivisionA', 'DivisionB', 'DivisionC', 'DivisionD', 'DivisionE',
                                          'DivisionF', 'DivisionG', 'DivisionH', 'DivisionI'))
               .drop('id')
              )

### OpInc Slide M,Q,Y

In [71]:
# Operating-income slide (MTD/QTD/YTD): constant-currency Flash comparisons by product group
dfopincsl = (df
             .where((col('entity') == 'TOM') & (col('rate') == 'CC') & (col('scenario') == 'Flash'))
             .where(col('account').isin('OI VS PY bps', 'R&D VS Proj$', 'MSG&A VS Proj$', 'NC VS Proj$'))
             .where(col('view').isin('MTD', 'QTD', 'YTD'))
             .where(col('product').isin('MSNT', 'Ortho & Spine', 'APAC Group', 'GQO', 'Total Corporate', 'Total Stryker'))
             .drop('id')
            )

# Reported-rate bps comparisons vs prior year for Total Stryker (MTD/QTD/YTD)
dfopincsl2 = (df
              .where((col('entity') == 'TOM') & (col('rate') == 'Reported') & (col('scenario') == 'Flash')
                     & (col('product') == 'Total Stryker'))
              .where(col('account').isin('R&D VS PY bps', 'MSG&A VS PY bps', 'NC VS PY bps'))
              .where(col('view').isin('MTD', 'QTD', 'YTD'))
              .drop('id')
             )


### Add additional DataFrames below this cell and append them to the end of the union-all list

### Union all dataframes (Sales, Prior Year, OpIncome)

In [74]:
# Every slide frame built above, in the order they are stacked. Append new frames
# to the end of this list (the commented-out dfmsntstuff is built later and is not
# included).
frames_to_union = [
    dfsales, dfpy, dfoicc, dfsykoi, dfepspy, dfepstest, taxratediff, taxrate,
    dfsalesq, dfpyq, dfoiccq, dfsykoiq, dfepspyq, dfepstestq, taxratediffq, taxrateq,
    dfsalesy, dfpyy, dfoiccy, dfsykoiy, dfepspyy, dfepstesty, taxratediffy, taxratey,
    dfepsproj, dfepsprojq, dfepsprojy,
    dfsales2mqy, dfopincsl, dfopincsl2,
]  # , dfmsntstuff]

# union matches columns by POSITION, not by name: every frame must list its
# columns in the same order; the result takes its column names from dfsales.
dfall = reduce(DataFrame.union, frames_to_union)

# Parse the measure as a float; the cast column is re-appended under the name 'value'.
dfall = dfall.withColumn("value3", dfall['value'].cast(FloatType())).drop('value').withColumnRenamed('value3', 'value')

# Reporting-friendly column names.
report_column_names = {
    'month': 'MonthNumber',
    'entity': 'Entity',
    'product': 'Product',
    'year': 'YearNumber',
    'view': 'View',
    'rate': 'Rate',
    'scenario': 'Scenario',
    'account': 'Account',
    'value': 'Value',
}
for source_name, report_name in report_column_names.items():
    dfall = dfall.withColumnRenamed(source_name, report_name)

In [75]:
# Release any cached copies of the intermediate slide frames.
# NOTE(review): no action has run on dfall yet (everything above is a lazy
# transformation), so if any of these frames were cache()d earlier, unpersisting
# them here discards the cache before the final write reads it. Consider moving
# this cell after the JDBC write.
for intermediate_frame in (dfsales, dfpy, dfoicc, dfsykoi, dfepspy, dfepstest, taxratediff, taxrate,
                           dfsalesq, dfpyq, dfoiccq, dfsykoiq, dfepspyq, dfepstestq, taxratediffq, taxrateq,
                           dfsalesy, dfpyy, dfoiccy, dfsykoiy, dfepspyy, dfepstesty, taxratediffy, taxratey,
                           dfepsproj, dfepsprojq, dfepsprojy,
                           dfsales2mqy, dfopincsl, dfopincsl2):
    intermediate_frame.unpersist()

In [76]:
# Tag every row with a unique, monotonically increasing (but not consecutive) id;
# the next cell turns it into a dense 1..N row number.
from pyspark.sql.functions import monotonically_increasing_id

dfall = dfall.withColumn("fakeid", monotonically_increasing_id())

In [77]:
import pyspark.sql.functions as F
from pyspark.sql.functions import row_number
from pyspark.sql.window import Window

# Replace the sparse fakeid with a dense row number exposed as 'id'.
# A window with no partition columns moves every row into a single partition,
# which is fine for small data but does not scale.
fakeid_order = Window.orderBy(dfall['fakeid'])
dfall = (dfall
         .select('*', F.row_number().over(fakeid_order).alias('id'))
         .drop('fakeid'))

### Rounding

In [79]:
# Classify each row's unit of measure from its account name, then tag how its
# value should be rounded.
temptablename = 'green'
dfall.createOrReplaceTempView(temptablename)

# CASE arms are evaluated top-down: any account containing '$' is a dollar
# 'number' even if it also starts with 'EPS' or mentions 'PY'.
dfall = spark.sql(
    "SELECT *, CASE "
    "WHEN Account LIKE '%$%' THEN 'number' "
    "WHEN Account LIKE 'EPS%' THEN 'eps' "
    "WHEN Account LIKE '%bps' THEN 'bps' "
    "WHEN Account LIKE '%PY%' OR Account LIKE 'ETR%' THEN 'percent' "
    "ELSE 'number' END AS Measurement FROM " + temptablename
)

from pyspark.sql import functions as f

# eps -> 'twodecimal'; bps -> 'norounding'; any other value with |Value| >= 1 ->
# 'zerodecimal'; everything else (including a NULL Value) -> 'onedecimal'.
dfmax = dfall.withColumn(
    'Rounding1',
    when(col('Measurement') == 'eps', 'twodecimal')
    .when(col('Measurement') == 'bps', 'norounding')
    .when((col('Value') <= -1) | (col('Value') >= 1), 'zerodecimal')
    .otherwise('onedecimal'))

dfall = dfmax.withColumnRenamed('Rounding1', 'Rounding')

#display(dfall)

In [80]:
# Apply rounding in stages. Each stage reads the previous stage's column, so a
# row can be touched by more than one stage (value1 -> value2 -> value3 -> value4).

from pyspark.sql.functions import col, when

is_number_measure = col('Measurement') == 'number'

dfmax = (dfall
         # percent: scale the ratio to a percentage, one decimal
         .withColumn("value1",
                     when(col("Measurement") == 'percent', f.round(col("Value") * 100, 1))
                     .otherwise(col("Value")))
         .drop('Value')
         # EPS: two decimals
         .withColumn("value2",
                     when(col("measurement") == 'eps', f.round(col("value1"), 2))
                     .otherwise(col("value1")))
         .drop('value1')
         # basis points: whole numbers (keyed on the account name, not on Measurement)
         .withColumn("value3",
                     when(col('Account').contains('bps'), f.round(col("value2"), 0))
                     .otherwise(col("value2")))
         .drop('value2')
         # dollar amounts (appear to be in millions): |v| >= 1 -> 0 decimals, else 1 decimal.
         # value3 is intentionally kept; the next cell selects value4 by name.
         .withColumn("value4",
                     when((col('Rounding') == 'zerodecimal') & is_number_measure, f.round(col("value3"), 0))
                     .when((col('Rounding') == 'onedecimal') & is_number_measure, f.round(col("value3"), 1))
                     .otherwise(col('value3')))
        )


In [81]:
from pyspark.sql.functions import col, when

# Keep only the reporting columns, publishing the fully rounded value4 as 'Value'.
dfall = dfmax.select('MonthNumber', 'Entity', 'Product', 'YearNumber', 'View', 'Rate', 'Scenario', 'Account',
                     col('value4').alias('Value'), 'Measurement', 'id')

## Determining the favorability column

In [83]:
# Label each row by the sign of its rounded Value: > 0 favorable, = 0 inline,
# otherwise unfavorable.
# NOTE(review): a NULL Value fails both comparisons and lands in the final ELSE,
# so it is labelled 'unfavorable' - confirm that is intended.
temptablename = 'red'
dfall.createOrReplaceTempView(temptablename)

favorability_sql = ("SELECT*, CASE WHEN value > 0  then 'favorable' else case when value = 0 "
                    "then 'inline' else 'unfavorable' END END as favorability from ")
dfall = spark.sql(favorability_sql + temptablename)


In [84]:
# 'reafavs' is the favorability used for commentary text. No account currently
# overrides it, so it is a straight copy of 'favorability'.
# TODO(review): if 'EPS CY' was meant to have its favorability treated differently, add that rule here.
dfall = dfall.withColumn('reafavs', col('favorability'))

### Creating the MSNT "driven by / offset by" commentary logic

In [86]:
#MSNT driven by testing
from pyspark.sql.functions import *
from pyspark.sql.window import Window

# NOTE(review): the wildcard import above shadows Python builtins (e.g. round, sum,
# min, max, abs) in every later cell; explicit imports would be safer.

# Builds "driven by +$xM A ... , offset by ($yM) B ..." fragments from the MTD
# "Sales VS Proj$" variance of each division.
# NOTE(review): dfmsntstuff is currently not part of the dfall union or the final write.

# (product name, abbreviation) pairs. This list is the single source of truth for
# which divisions are included, so the product filter and the abbreviation lookup
# cannot drift apart. Names are anonymized placeholders.
msnt_divisions = [
    ('Divsion1', 'Divsion1'),
    ('Divsion2', 'Divsion2'),
    ('Divsion3', 'Divsion3'),
    ('Divsion4', 'Divsion4'),
    ('Divsion5', 'Divsion5'),
    ('Divsion6', 'Divsion6'),
]

dfmsntstuff = dfall.where((col('Entity') == 'TOM') & (col('View') == 'MTD') & (col('Rate') == 'CC')
                          & (col('Scenario') == 'Flash') & (col('Account') == 'Sales VS Proj$'))
dfmsntstuff = dfmsntstuff.where(col('Product').isin([product for product, abbrev in msnt_divisions]))

dfab = spark.createDataFrame(msnt_divisions, ['workplz1', 'abrevs'])

# Attach each division's abbreviation.
dfmsntstuff = dfab.join(dfmsntstuff, dfmsntstuff.Product == dfab.workplz1, how='inner').drop('workplz1')

# Rank within each favorability group by the size of the variance, largest first:
# rank 1 is the biggest favorable driver or the biggest unfavorable offset
# (ordering by raw value would put the smallest offset first). Ties break on id, descending.
dfmsntstuff = dfmsntstuff.withColumn(
    'rank',
    dense_rank().over(Window.partitionBy('reafavs').orderBy(f.abs(col('Value')).desc(), desc('id'))))

is_number = col('Measurement') == 'number'
is_favorable = col('reafavs') == 'favorable'
is_unfavorable = col('reafavs') == 'unfavorable'
is_top = col('rank') == 1

# The leading spaces / ", offset by" prefix presumably let the fragments be joined
# into one sentence; rows that match no rule fall back to the raw Value.
dfmsntstuff = dfmsntstuff.withColumn(
    'Comments',
    when(is_number & is_favorable & is_top,
         concat(lit('driven by +$'), col('Value'), lit('M'), lit(" "), col('abrevs')))
    .when(is_number & is_favorable,
          concat(lit(' +$'), col('Value'), lit('M'), lit(" "), col('abrevs')))
    .when(is_number & is_unfavorable & is_top,
          concat(lit(', offset by ($'), col('Value') * -1, lit('M)'), lit(" "), col('abrevs')))
    .when(is_number & is_unfavorable,
          concat(lit(' ($'), col('Value') * -1, lit('M)'), lit(" "), col('abrevs')))
    .otherwise(col('Value')))
#display(dfmsntstuff)

In [87]:
from pyspark.sql.functions import col, when
from pyspark.sql import functions as f

# Build the display string for each row from its unit of measure and favorability.
measure = col('Measurement')
fav = col('reafavs')

comment_text = (
    # percent: favorable "x%"; unfavorable "negative-x%)" is rewritten to "(x%)" below
    when((measure == 'percent') & (fav == 'favorable'), concat(col('Value'), lit('%')))
    .when((measure == 'percent') & (fav == 'unfavorable'), concat(lit('negative'), col('Value'), lit('%)')))
    # dollar amounts, suffixed with 'M'
    .when((measure == 'number') & (fav == 'favorable'), concat(lit('+$'), col('Value'), lit('M')))
    .when((measure == 'number') & (fav == 'unfavorable'), concat(lit('($'), col('Value') * -1, lit('M)')))
    # EPS: always shown with two decimals, favorable or not
    .when((measure == 'eps') & (fav == 'favorable'), concat(lit('$'), f.format_number(col('Value'), 2)))
    # only reachable if reafavs ever diverges from favorability
    .when((measure == 'eps') & (fav == 'unfavorable') & (col('favorability') == 'favorable'),
          concat(lit('$'), f.format_number(col('Value'), 2)))
    .when((measure == 'eps') & (fav == 'unfavorable'),
          concat(lit('($'), f.format_number(col('Value') * -1, 2), lit(')')))
    # basis points: unfavorable values keep their own minus sign
    .when((measure == 'bps') & (fav == 'favorable'), concat(lit('+'), col('Value'), lit('bps')))
    .when((measure == 'bps') & (fav == 'unfavorable'), concat(col('Value'), lit('bps')))
    .otherwise(col('Value'))
)

dftest = (dfall
          .withColumn('Comments', comment_text)
          # Drop a trailing ".0" before the unit. The dot is escaped so the Java regex
          # matches a literal "." rather than any character.
          .withColumn('Comments', f.regexp_replace(col('Comments'), r'\.0M', 'M'))
          .withColumn('Comments', f.regexp_replace(col('Comments'), r'\.0bps', 'bps'))
          # "negative-5.0%)" -> "(5.0%)"
          .withColumn('Comments', f.regexp_replace(col('Comments'), 'negative-', '(')))

In [88]:
# Final output shape for FPA_COMMENTARY_FLASH; 'reafavs' is published as 'Favorability'.
df = dftest.select('MonthNumber', 'Entity', 'Product', 'YearNumber', 'View', 'Rate', 'Scenario', 'Account',
                   'Comments', col('reafavs').alias('Favorability'), 'id')

### Write to SQL table FPA_COMMENTARY_FLASH

In [90]:
# Publish the commentary rows.
# NOTE(review): append mode means re-running this notebook for the same period
# inserts duplicate rows; consider clearing that period first.
(df.write
   .mode('append')
   .jdbc(url=jdbcUrl, table='FPA_COMMENTARY_FLASH', properties=connectionProperties))