## Create and Populate Ad Spend Table

- Create ad spend table
- Create widget for specifying the ad spend for a given campaign
- Populate ad spend table with synthetic spend data
- View campaign ad spend details
- Explode struct into multiple rows

In [18]:
from pyspark.sql.functions import col, to_timestamp
from pyspark.sql.types import *
import time
import pyspark
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession, HiveContext
from minio import Minio


spark = SparkSession.builder.appName('demo').config("hive.metastore.uris", "thrift://localhost:9083", conf=SparkConf()).enableHiveSupport().getOrCreate()

- **View campaign ad spend details**

The channel spend data currently exists as an array. We will explode these values into separate columns in the next step

In [19]:
spark.sql("CREATE DATABASE IF NOT EXISTS user_ad_spend")

DataFrame[]

In [24]:
df = spark.createDataFrame([
    ("3d65f7e92e81480cac52a20dfdf64d5b",
     1000,
    {'Search Engine Marketing':.2,'Email':.2,
    "Social Network": 0.2,
    "Affiliates": 0.2,
    "Google Display Network": 0.2},
     to_timestamp(2020, 5, 17, 0, 0, 0))],schema='campaign_id STRING, total_spend_in_dollars FLOAT, channel_spend dict, campaign_start_date TIMESTAMP')

ParseException: 
extraneous input ',' expecting {'ADD', 'AFTER', 'ALL', 'ALTER', 'ANALYZE', 'AND', 'ANTI', 'ANY', 'ARCHIVE', 'ARRAY', 'AS', 'ASC', 'AT', 'AUTHORIZATION', 'BETWEEN', 'BOTH', 'BUCKET', 'BUCKETS', 'BY', 'CACHE', 'CASCADE', 'CASE', 'CAST', 'CHANGE', 'CHECK', 'CLEAR', 'CLUSTER', 'CLUSTERED', 'CODEGEN', 'COLLATE', 'COLLECTION', 'COLUMN', 'COLUMNS', 'COMMENT', 'COMMIT', 'COMPACT', 'COMPACTIONS', 'COMPUTE', 'CONCATENATE', 'CONSTRAINT', 'COST', 'CREATE', 'CROSS', 'CUBE', 'CURRENT', 'CURRENT_DATE', 'CURRENT_TIME', 'CURRENT_TIMESTAMP', 'CURRENT_USER', 'DAY', 'DATA', 'DATABASE', DATABASES, 'DBPROPERTIES', 'DEFINED', 'DELETE', 'DELIMITED', 'DESC', 'DESCRIBE', 'DFS', 'DIRECTORIES', 'DIRECTORY', 'DISTINCT', 'DISTRIBUTE', 'DIV', 'DROP', 'ELSE', 'END', 'ESCAPE', 'ESCAPED', 'EXCEPT', 'EXCHANGE', 'EXISTS', 'EXPLAIN', 'EXPORT', 'EXTENDED', 'EXTERNAL', 'EXTRACT', 'FALSE', 'FETCH', 'FIELDS', 'FILTER', 'FILEFORMAT', 'FIRST', 'FOLLOWING', 'FOR', 'FOREIGN', 'FORMAT', 'FORMATTED', 'FROM', 'FULL', 'FUNCTION', 'FUNCTIONS', 'GLOBAL', 'GRANT', 'GROUP', 'GROUPING', 'HAVING', 'HOUR', 'IF', 'IGNORE', 'IMPORT', 'IN', 'INDEX', 'INDEXES', 'INNER', 'INPATH', 'INPUTFORMAT', 'INSERT', 'INTERSECT', 'INTERVAL', 'INTO', 'IS', 'ITEMS', 'JOIN', 'KEYS', 'LAST', 'LATERAL', 'LAZY', 'LEADING', 'LEFT', 'LIKE', 'LIMIT', 'LINES', 'LIST', 'LOAD', 'LOCAL', 'LOCATION', 'LOCK', 'LOCKS', 'LOGICAL', 'MACRO', 'MAP', 'MATCHED', 'MERGE', 'MINUTE', 'MONTH', 'MSCK', 'NAMESPACE', 'NAMESPACES', 'NATURAL', 'NO', NOT, 'NULL', 'NULLS', 'OF', 'ON', 'ONLY', 'OPTION', 'OPTIONS', 'OR', 'ORDER', 'OUT', 'OUTER', 'OUTPUTFORMAT', 'OVER', 'OVERLAPS', 'OVERLAY', 'OVERWRITE', 'PARTITION', 'PARTITIONED', 'PARTITIONS', 'PERCENT', 'PIVOT', 'PLACING', 'POSITION', 'PRECEDING', 'PRIMARY', 'PRINCIPALS', 'PROPERTIES', 'PURGE', 'QUERY', 'RANGE', 'RECORDREADER', 'RECORDWRITER', 'RECOVER', 'REDUCE', 'REFERENCES', 'REFRESH', 'RENAME', 'REPAIR', 'REPLACE', 'RESET', 'RESPECT', 'RESTRICT', 'REVOKE', 'RIGHT', RLIKE, 'ROLE', 'ROLES', 'ROLLBACK', 'ROLLUP', 'ROW', 'ROWS', 'SECOND', 'SCHEMA', 'SELECT', 'SEMI', 'SEPARATED', 'SERDE', 'SERDEPROPERTIES', 'SESSION_USER', 'SET', 'MINUS', 'SETS', 'SHOW', 'SKEWED', 'SOME', 'SORT', 'SORTED', 'START', 'STATISTICS', 'STORED', 'STRATIFY', 'STRUCT', 'SUBSTR', 'SUBSTRING', 'SYNC', 'TABLE', 'TABLES', 'TABLESAMPLE', 'TBLPROPERTIES', TEMPORARY, 'TERMINATED', 'THEN', 'TIME', 'TO', 'TOUCH', 'TRAILING', 'TRANSACTION', 'TRANSACTIONS', 'TRANSFORM', 'TRIM', 'TRUE', 'TRUNCATE', 'TRY_CAST', 'TYPE', 'UNARCHIVE', 'UNBOUNDED', 'UNCACHE', 'UNION', 'UNIQUE', 'UNKNOWN', 'UNLOCK', 'UNSET', 'UPDATE', 'USE', 'USER', 'USING', 'VALUES', 'VIEW', 'VIEWS', 'WHEN', 'WHERE', 'WINDOW', 'WITH', 'YEAR', 'ZONE', IDENTIFIER, BACKQUOTED_IDENTIFIER}(line 1, pos 11)

== SQL ==
campaign_id, total_spend_in_dollars, channel_spend, campaign_start_date
-----------^^^


In [29]:
spark.read.option("header",True) \
          .csv("gold_ad_spend.csv") \
          .createOrReplaceTempView("gold_ad_spend")

In [30]:
spark.sql("select * from gold_ad_spend").show()

+---+--------------------+----------------------+--------------------+-------------------+
|_c0|         campaign_id|total_spend_in_dollars|       channel_spend|campaign_start_date|
+---+--------------------+----------------------+--------------------+-------------------+
|  0|3d65f7e92e81480ca...|                  1000|{'Search Engine M...|         'Email':.2|
+---+--------------------+----------------------+--------------------+-------------------+



Explode struct into multiple rows

### View Campaign Performance

In this section, we will create the following charts:

- Base conversion rate
- Conversions by date
- Attribution by model type
- Cost per acquisition

### Base conversion rate

In [None]:
spark.sql("""CREATE OR REPLACE TABLE base_conversion_rate
USING DELTA AS
SELECT count(*) as count,
  CASE 
    WHEN conversion == 0 
    THEN 'Impression'
    ELSE 'Conversion'
  END AS interaction_type
FROM
  gold_user_journey
GROUP BY
  conversion;""")

In [None]:
base_conversion_rate_pd = spark.table("base_conversion_rate").toPandas()
 
pie, ax = plt.subplots(figsize=[20,9])
labels = base_conversion_rate_pd['interaction_type']
plt.pie(x=base_conversion_rate_pd['count'], autopct="%.1f%%", explode=[0.05]*2, labels=labels, pctdistance=0.5)
plt.title("Base Conversion Rate");

### Conversions by date

In [None]:
conversions_by_date_pd = spark.table("conversions_by_date").toPandas()
 
plt.figure(figsize=(20,9))
pt = sns.lineplot(x='date',y='count',data=conversions_by_date_pd)
 
pt.tick_params(labelsize=20)
pt.set_xlabel('Date')
pt.set_ylabel('Number of Conversions')
plt.title("Conversions by Date");

### Attribution by model type

In [None]:
spark.sql("""CREATE OR REPLACE TABLE attribution_by_model_type 
USING DELTA AS
SELECT attribution_model, channel, round(attribution_percent * (
    SELECT count(*) FROM gold_user_journey WHERE conversion = 1)) AS conversions_attributed
FROM gold_attribution;""")

In [None]:
attribution_by_model_type_pd = spark.table("attribution_by_model_type").toPandas()
 
pt = sns.catplot(x='channel',y='conversions_attributed',hue='attribution_model',data=attribution_by_model_type_pd, kind='bar', aspect=4, legend=True)
pt.fig.set_figwidth(20)
pt.fig.set_figheight(9)
 
plt.tick_params(labelsize=15)
plt.ylabel("Number of Conversions")
plt.xlabel("Channels")
plt.title("Channel Performance");

### Cost per acquisition

In [None]:
spark.sql("""CREATE OR REPLACE TABLE cpa_summary 
USING DELTA
AS
SELECT
  spending.channel,
  spending.dollar_spend,
  attribution_count.attribution_model,
  attribution_count.conversions_attributed,
  round(spending.dollar_spend / attribution_count.conversions_attributed,2) AS CPA_in_Dollars
FROM
  (SELECT explode(channel_spend) AS (channel, spend),
   round(total_spend_in_dollars * spend, 2) AS dollar_spend
   FROM gold_ad_spend) AS spending
JOIN
  (SELECT attribution_model, channel, round(attribution_percent * (
      SELECT count(*) FROM gold_user_journey WHERE conversion = 1)) AS conversions_attributed
   FROM gold_attribution) AS attribution_count
ON spending.channel = attribution_count.channel;""")

In [None]:
cpa_summary_pd = spark.table("cpa_summary").toPandas()
 
pt = sns.catplot(x='channel', y='CPA_in_Dollars',hue='attribution_model',data=cpa_summary_pd, kind='bar', aspect=4, ci=None)
plt.title("Cost of Acquisition by Channel")
pt.fig.set_figwidth(20)
pt.fig.set_figheight(9)
 
plt.tick_params(labelsize=15)
plt.ylabel("CPA in $")
plt.xlabel("Channels")
plt.title("Channel Cost per Acquisition");

### Budget Allocation Optimization.

Now that we have assigned credit to our marketing channels using Markov Chains, we can take a data-driven approach for budget allocation.

One KPI we can take a look at is Return on Ad Spend (ROAS).
In the ecommerce world, ROAS is calculated as:
- ROAS = Revenue from marketing/ Advertising spent

In our example, instead of working with exact values, we will divide the % of conversion attributed to a channel by the % of total adspend allocated to that channel.

- ROAS = CHANNEL CONVERSION WEIGHT / CHANNEL BUDGET WEIGHT
- ROAS value > 1 signifies that the channel has been allocated less budget than warranted by its conversion rate.
- ROAS value < 1 signifies that the channel has been allocated more budget than warranted by its conversion rate.
- ROAS value = 1 signifies and optimized budget allocation.
From ROAS, we can calculate the Proposed Budget for each channel

Proposed budget = Current budget X ROAS
To calculate ROAS we will join the following exploded_gold_ad_spend and usr_attribution Tables:

**gold_attribution:** This table contains the calculated attribution % per channel based on different attribution models.
exploded_gold_ad_spend: This table contains the current budget allocated per channel. The column pct_spend documents the % of the total budget that has been allocated to a given channel.

In [None]:
spark.sql("SELECT * FROM exploded_gold_ad_spend;")"

In [None]:
spark.sql("""CREATE OR REPLACE TABLE spend_optimization_view 
USING DELTA
AS
SELECT
  a.channel,
  a.pct_spend,
  b.attribution_percent,
  b.attribution_percent / a.pct_spend as ROAS,
  a.dollar_spend,
  round(
    (b.attribution_percent / a.pct_spend) * a.dollar_spend,
    2
  ) as proposed_dollar_spend
FROM
  exploded_gold_ad_spend a
  JOIN gold_attribution b on a.channel = b.channel
  and attribution_model = 'markov_chain';
  
CREATE
OR REPLACE TABLE spend_optimization_final 
USING DELTA AS
SELECT
  channel,
  'current_spending' AS spending,
  dollar_spend as budget
 FROM exploded_gold_ad_spend
UNION
SELECT
  channel,
  'proposed_spending' AS spending,
  proposed_dollar_spend as budget
FROM
  spend_optimization_view;
""")

In [None]:
spend_optimization_final_pd = spark.table("spend_optimization_final").toPandas()
 
pt = sns.catplot(x='channel', y='budget', hue='spending', data=spend_optimization_final_pd, kind='bar', aspect=4, ci=None)
 
plt.tick_params(labelsize=15)
pt.fig.set_figwidth(20)
pt.fig.set_figheight(9)
plt.title("Spend Optimization per Channel")
plt.ylabel("Budget in $")
plt.xlabel("Channels")