In [1]:
import findspark
findspark.init('/home/taras/spark-3.5.4-bin-hadoop3')

import pyspark.sql.functions as F
from pyspark.sql import SparkSession 
from pyspark.sql.window import Window

In [2]:
jar_files = [
    "spark-3.5.4-bin-hadoop3/jars/postgresql-42.7.5.jar",
    "spark-3.5.4-bin-hadoop3/jars/clickhouse-jdbc-0.4.6.jar"
]

In [None]:
spark = (
    SparkSession
    .builder
    .appName("Spark-Webinar")
    .config("spark.jars", ",".join(jar_files))
    .getOrCreate()
)

In [4]:
spark

## Connecting to data sources

### CSV

In [None]:
campaigns_dict = (
    spark.read
    .option('header', True)
    .csv('Data/campaigns_dict.csv')
)

In [6]:
campaigns_dict.show(5, truncate=False)

+-----------+------------------------------------------+
|campaign_id|campaign_name                             |
+-----------+------------------------------------------+
|1          |year_modern_kitchen_launch_20250115       |
|2          |quarter_custom_kitchens_showcase_20240210 |
|3          |month_smart_kitchen_promotion_20240305    |
|4          |year_luxury_kitchens_exhibit_20240420     |
|5          |quarter_ecofriendly_kitchen_offer_20240512|
+-----------+------------------------------------------+
only showing top 5 rows



In [7]:
campaigns_dict.printSchema()

root
 |-- campaign_id: string (nullable = true)
 |-- campaign_name: string (nullable = true)



### Parquet

In [8]:
submits = spark.read.parquet('Data/submits.parquet')

In [9]:
submits.show(5, truncate=False)

+---------+--------+-----------+
|submit_id|name    |phone      |
+---------+--------+-----------+
|7017     |Lawrence|38451903550|
|1341     |Steven  |38404238487|
|1400     |Chelsea |38526910551|
|4605     |Brenda  |38532627985|
|3274     |John    |38210569664|
+---------+--------+-----------+
only showing top 5 rows



In [10]:
submits.printSchema()

root
 |-- submit_id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- phone: long (nullable = true)



In [11]:
deals = spark.read.parquet('Data/deals.parquet')

In [12]:
deals.show(5, truncate=False)

+-------+----------+---------------+-----------+------------------------+------------------------------------------------------+
|deal_id|deal_date |fio            |phone      |email                   |address                                               |
+-------+----------+---------------+-----------+------------------------+------------------------------------------------------+
|1      |2025-01-15|Jennifer Garcia|38896664832|barnettsarah@example.org|1772 Mike Harbors Apt. 729, Ryanland, FL 52653        |
|2      |2024-09-06|Amanda Mills   |38987999315|kimberlydean@example.org|Unit 9167 Box 9119, DPO AA 68389                      |
|3      |2024-07-20|Krista Davis   |38784086816|davidchan@example.net   |431 Jennifer Station Suite 110, West John, WA 60345   |
|4      |2024-07-30|Barbara Hurst  |38099041526|sandra42@example.net    |3510 Brandy Drive Apt. 313, West Micheleview, VI 21235|
|5      |2024-03-21|Thomas Clay    |38856195597|hharris@example.org     |717 William Crest, Port 

In [13]:
deals.show(2, truncate=False, vertical=True)

-RECORD 0---------------------------------------------------
 deal_id   | 1                                              
 deal_date | 2025-01-15                                     
 fio       | Jennifer Garcia                                
 phone     | 38896664832                                    
 email     | barnettsarah@example.org                       
 address   | 1772 Mike Harbors Apt. 729, Ryanland, FL 52653 
-RECORD 1---------------------------------------------------
 deal_id   | 2                                              
 deal_date | 2024-09-06                                     
 fio       | Amanda Mills                                   
 phone     | 38987999315                                    
 email     | kimberlydean@example.org                       
 address   | Unit 9167 Box 9119, DPO AA 68389               
only showing top 2 rows



In [14]:
deals.printSchema()

root
 |-- deal_id: long (nullable = true)
 |-- deal_date: string (nullable = true)
 |-- fio: string (nullable = true)
 |-- phone: long (nullable = true)
 |-- email: string (nullable = true)
 |-- address: string (nullable = true)



In [15]:
import pyarrow as pa
import pyarrow.parquet as pq

In [16]:
pq.read_metadata('Data/submits.parquet')

<pyarrow._parquet.FileMetaData object at 0x7f51454cf060>
  created_by: parquet-cpp-arrow version 19.0.0
  num_columns: 3
  num_rows: 4000
  num_row_groups: 1
  format_version: 2.6
  serialized_size: 2367

### PostgreSQL

In [17]:
pg_host = 'localhost'
pg_port = '5432'
pg_db = 'postgres'
pg_table = 'costs'
pg_user = 'postgres'
pg_password = 'postgres'

In [18]:
costs = (
    spark.read
    .format('jdbc')
    .option('url', f'jdbc:postgresql://{pg_host}:{pg_port}/{pg_db}')
    .option('dbtable', pg_table)
    .option('user', pg_user)
    .option('password', pg_password)
    .option('driver', 'org.postgresql.Driver')
    .load()
)

In [19]:
costs.show(5, truncate=False)

+----------+-----------+------+------+-----+
|date      |campaign_id|costs |clicks|views|
+----------+-----------+------+------+-----+
|2024-01-01|1          |670.52|40    |110  |
|2024-01-01|2          |602.5 |11    |849  |
|2024-01-01|3          |654.74|51    |566  |
|2024-01-01|4          |897.24|86    |679  |
|2024-01-01|5          |758.19|30    |585  |
+----------+-----------+------+------+-----+
only showing top 5 rows



In [20]:
costs.printSchema()

root
 |-- date: string (nullable = true)
 |-- campaign_id: integer (nullable = true)
 |-- costs: float (nullable = true)
 |-- clicks: integer (nullable = true)
 |-- views: integer (nullable = true)



### ClickHouse

In [21]:
ch_host = 'localhost'
ch_port = '8123'
ch_db = 'default'
ch_table = 'visits'

In [22]:
visits = (
    spark.read
    .format('jdbc')
    .option('url', f'jdbc:clickhouse://{ch_host}:{ch_port}/{ch_db}')
    .option('dbtable', ch_table)
    .option('driver', 'com.clickhouse.jdbc.ClickHouseDriver')
    .load()
)

In [23]:
visits.show()

+-------+-------------------+--------------------+--------+--------+--------+--------------------+----------------+
|visitid|      visitDateTime|                 URL|duration|clientID|  source|         UTMCampaign|          params|
+-------+-------------------+--------------------+--------+--------+--------+--------------------+----------------+
| 189665|2024-01-01 00:19:38|https://our-cool-...|      78|     848|  direct|quarter_ecofriend...|['submit', 2136]|
| 504698|2024-01-01 00:59:20|https://our-cool-...|       8|     527|  direct|year_modern_kitch...|['submit', 4630]|
| 632370|2024-01-01 01:49:09|https://our-cool-...|      72|     520| organic|quarter_ecofriend...|['submit', 2734]|
| 943112|2024-01-01 02:20:28|https://our-cool-...|      14|     117| organic|month_openconcept...|['submit', 5299]|
| 139778|2024-01-01 07:31:30|https://our-cool-...|      73|     655|internal|month_contemporar...| ['submit', 419]|
| 954751|2024-01-01 08:15:49|https://our-cool-...|      99|     634|inte

In [24]:
visits.show(1, truncate=False, vertical=True)

-RECORD 0-----------------------------------------------------
 visitid       | 189665                                       
 visitDateTime | 2024-01-01 00:19:38                          
 URL           | https://our-cool-website.com/checkout        
 duration      | 78                                           
 clientID      | 848                                          
 source        | direct                                       
 UTMCampaign   | quarter_ecofriendly_kitchens_launch_20240205 
 params        | ['submit', 2136]                             
only showing top 1 row



In [25]:
visits.printSchema()

root
 |-- visitid: integer (nullable = true)
 |-- visitDateTime: timestamp (nullable = true)
 |-- URL: string (nullable = true)
 |-- duration: integer (nullable = true)
 |-- clientID: integer (nullable = true)
 |-- source: string (nullable = true)
 |-- UTMCampaign: string (nullable = true)
 |-- params: string (nullable = true)



## Data preparation

### Visits (ClickHouse)

In [26]:
filtered_step1 = (
    visits
    .withColumn('dt', F.date_format(F.col('visitDateTime'), 'yyyy-MM-dd'))
    .where(F.col('dt').between('2024-01-01', '2025-01-27'))
    .where(F.col('source').isin('ad', 'direct'))
    .where(F.col('URL').rlike('.*checkout.*|.*add.*|.*home.*|.*contact.*|.*top50.*|.*customer-service.*|.*wishlist.*|.*sale.*|.*best-sellers.*|.*view.*|.*discount.*|.*featured.*|.*new-arrivals.*|.*settings.*|.*return-policy.*|.*edit.*|.*delete.*|.*reviews.*|.*products.*|.*about.*'))
    .select(
        'dt',
        'visitid',
        'clientID',
        'URL',
        'duration',
        'source',
        'UTMCampaign',
        'params',
        F.regexp_replace(F.col('params'), r'\[|\]', '').alias('params_regex')
    )
    .withColumn('params_split', F.split('params_regex', ', '))
)

In [27]:
filtered_step1.show(1, truncate=False, vertical=True)

-RECORD 0----------------------------------------------------
 dt           | 2024-01-01                                   
 visitid      | 189665                                       
 clientID     | 848                                          
 URL          | https://our-cool-website.com/checkout        
 duration     | 78                                           
 source       | direct                                       
 UTMCampaign  | quarter_ecofriendly_kitchens_launch_20240205 
 params       | ['submit', 2136]                             
 params_regex | 'submit', 2136                               
 params_split | ['submit', 2136]                             
only showing top 1 row



In [28]:
filtered_step1.printSchema() 

root
 |-- dt: string (nullable = true)
 |-- visitid: integer (nullable = true)
 |-- clientID: integer (nullable = true)
 |-- URL: string (nullable = true)
 |-- duration: integer (nullable = true)
 |-- source: string (nullable = true)
 |-- UTMCampaign: string (nullable = true)
 |-- params: string (nullable = true)
 |-- params_regex: string (nullable = true)
 |-- params_split: array (nullable = true)
 |    |-- element: string (containsNull = false)



In [29]:
filtered_step2 = (
    filtered_step1
    .withColumn('event_type', F.regexp_replace(F.col('params_split')[0], "'", ''))
    .withColumn('event_id', F.col('params_split')[1].cast('int'))
)

In [30]:
filtered_step2.show(1, truncate=False, vertical=True)

-RECORD 0----------------------------------------------------
 dt           | 2024-01-01                                   
 visitid      | 189665                                       
 clientID     | 848                                          
 URL          | https://our-cool-website.com/checkout        
 duration     | 78                                           
 source       | direct                                       
 UTMCampaign  | quarter_ecofriendly_kitchens_launch_20240205 
 params       | ['submit', 2136]                             
 params_regex | 'submit', 2136                               
 params_split | ['submit', 2136]                             
 event_type   | submit                                       
 event_id     | 2136                                         
only showing top 1 row



In [31]:
filtered_step2.printSchema()

root
 |-- dt: string (nullable = true)
 |-- visitid: integer (nullable = true)
 |-- clientID: integer (nullable = true)
 |-- URL: string (nullable = true)
 |-- duration: integer (nullable = true)
 |-- source: string (nullable = true)
 |-- UTMCampaign: string (nullable = true)
 |-- params: string (nullable = true)
 |-- params_regex: string (nullable = true)
 |-- params_split: array (nullable = true)
 |    |-- element: string (containsNull = false)
 |-- event_type: string (nullable = true)
 |-- event_id: integer (nullable = true)



In [32]:
visits_df = (
    filtered_step2
    .where(F.col('event_type') == 'submit')
    .select(
        'dt',
        F.col('visitid').cast('string').alias('visitid'),
        F.col('clientID').cast('string').alias('clientid'),
        'URL',
        'duration',
        'source',
        'UTMCampaign',
        'event_type',
        'event_id'
    )
    .distinct()
)

In [33]:
visits_df.show(1, truncate=False, vertical=True)

-RECORD 0------------------------------------------------
 dt          | 2024-02-07                                
 visitid     | 925403                                    
 clientid    | 418                                       
 URL         | https://our-cool-website.com/best-sellers 
 duration    | 9                                         
 source      | direct                                    
 UTMCampaign | year_luxury_kitchens_event_20250120       
 event_type  | submit                                    
 event_id    | 7485                                      
only showing top 1 row



In [34]:
visits_df.printSchema()

root
 |-- dt: string (nullable = true)
 |-- visitid: string (nullable = true)
 |-- clientid: string (nullable = true)
 |-- URL: string (nullable = true)
 |-- duration: integer (nullable = true)
 |-- source: string (nullable = true)
 |-- UTMCampaign: string (nullable = true)
 |-- event_type: string (nullable = true)
 |-- event_id: integer (nullable = true)



In [35]:
visits_df.count()

2433

In [36]:
visits.count()

10000

### Costs (PostgreSQL)

In [37]:
costs_df = (
    costs
    .groupBy(
        F.col('date').cast('string').alias('date'),
        'campaign_id'
    )
    .agg(
        F.sum(F.col('costs')).cast('decimal(19,2)').alias('costs'),
        F.sum(F.col('clicks')).alias('clicks'),
        F.sum(F.col('views')).alias('views')
    )
)

In [38]:
costs_df.show(5)

+----------+-----------+------+------+-----+
|      date|campaign_id| costs|clicks|views|
+----------+-----------+------+------+-----+
|2024-01-05|         75|107.03|    26|  958|
|2024-01-05|         97|408.51|    59|  292|
|2024-01-06|         44|748.68|    33|  380|
|2024-01-06|         58|422.91|    30|  575|
|2024-01-11|         91|496.45|     5|  435|
+----------+-----------+------+------+-----+
only showing top 5 rows



In [39]:
costs_df.printSchema()

root
 |-- date: string (nullable = true)
 |-- campaign_id: integer (nullable = true)
 |-- costs: decimal(19,2) (nullable = true)
 |-- clicks: long (nullable = true)
 |-- views: long (nullable = true)



### Campaigns (CSV)

In [40]:
campaigns_dict.show(1, truncate=False)

+-----------+-----------------------------------+
|campaign_id|campaign_name                      |
+-----------+-----------------------------------+
|1          |year_modern_kitchen_launch_20250115|
+-----------+-----------------------------------+
only showing top 1 row



In [41]:
campaigns_dict.printSchema()

root
 |-- campaign_id: string (nullable = true)
 |-- campaign_name: string (nullable = true)



In [42]:
campaigns_df = (
    campaigns_dict
    .withColumn('campaign_id', F.col('campaign_id').cast('integer'))
    .withColumn(
        'campaign_duration',
        F.when(F.col('campaign_name').like('year%'), 'Year')
        .when(F.col('campaign_name').like('quarter%'), 'Quarter')
        .when(F.col('campaign_name').like('month%'), 'Month')
        .otherwise(None)
    )
)

In [43]:
campaigns_df.show(5, truncate=False)

+-----------+------------------------------------------+-----------------+
|campaign_id|campaign_name                             |campaign_duration|
+-----------+------------------------------------------+-----------------+
|1          |year_modern_kitchen_launch_20250115       |Year             |
|2          |quarter_custom_kitchens_showcase_20240210 |Quarter          |
|3          |month_smart_kitchen_promotion_20240305    |Month            |
|4          |year_luxury_kitchens_exhibit_20240420     |Year             |
|5          |quarter_ecofriendly_kitchen_offer_20240512|Quarter          |
+-----------+------------------------------------------+-----------------+
only showing top 5 rows



In [44]:
campaigns_df.printSchema()

root
 |-- campaign_id: integer (nullable = true)
 |-- campaign_name: string (nullable = true)
 |-- campaign_duration: string (nullable = true)



### Submits (Parquet)

In [45]:
submits.printSchema()

root
 |-- submit_id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- phone: long (nullable = true)



In [46]:
submits.show(1)

+---------+--------+-----------+
|submit_id|    name|      phone|
+---------+--------+-----------+
|     7017|Lawrence|38451903550|
+---------+--------+-----------+
only showing top 1 row



In [47]:
submits_df = (
    submits
    .withColumn('phone', F.col('phone').cast('string'))
    .withColumn('phone_plus', F.concat(F.lit('+'), F.col('phone')))
    .withColumn('phone_md5', F.md5('phone'))
    .withColumn('phone_plus_md5', F.md5('phone_plus'))
)

In [48]:
submits_df.show(2, truncate=False)

+---------+--------+-----------+------------+--------------------------------+--------------------------------+
|submit_id|name    |phone      |phone_plus  |phone_md5                       |phone_plus_md5                  |
+---------+--------+-----------+------------+--------------------------------+--------------------------------+
|7017     |Lawrence|38451903550|+38451903550|c79d44d2541f74f80df091409a623d81|65dc8fa95bcb53b67b08d49d8bd18117|
|1341     |Steven  |38404238487|+38404238487|4c6e732c96b6b4cbeac0cfdd0cedce44|83b5290406f00fd6ed1623c13b85b1b6|
+---------+--------+-----------+------------+--------------------------------+--------------------------------+
only showing top 2 rows



### Deals (Parquet)

In [49]:
deals.printSchema()

root
 |-- deal_id: long (nullable = true)
 |-- deal_date: string (nullable = true)
 |-- fio: string (nullable = true)
 |-- phone: long (nullable = true)
 |-- email: string (nullable = true)
 |-- address: string (nullable = true)



In [50]:
deals_df = (
    deals
    .withColumn('username', F.split(F.col('email'), '@').getItem(0))
    .withColumn('domain', F.split(F.col('email'), '@').getItem(1))
    .where(F.col('domain').isin('example.com', 'example.org', 'example.net'))
    .withColumn('phone', F.col('phone').cast('string'))
)

In [51]:
deals_df.show(1)

+-------+----------+---------------+-----------+--------------------+--------------------+------------+-----------+
|deal_id| deal_date|            fio|      phone|               email|             address|    username|     domain|
+-------+----------+---------------+-----------+--------------------+--------------------+------------+-----------+
|      1|2025-01-15|Jennifer Garcia|38896664832|barnettsarah@exam...|1772 Mike Harbors...|barnettsarah|example.org|
+-------+----------+---------------+-----------+--------------------+--------------------+------------+-----------+
only showing top 1 row



In [52]:
deals_df.printSchema()

root
 |-- deal_id: long (nullable = true)
 |-- deal_date: string (nullable = true)
 |-- fio: string (nullable = true)
 |-- phone: string (nullable = true)
 |-- email: string (nullable = true)
 |-- address: string (nullable = true)
 |-- username: string (nullable = true)
 |-- domain: string (nullable = true)



## Creating Data Mart

### One Big Table (OBT)

In [53]:
customer_detailed = (
    visits_df.alias('v')
    .join(
        submits_df.alias('s'),
        F.col('v.event_id') == F.col('s.submit_id'),
        'left'
    )
    .join(
        deals_df.alias('d'),
        (F.col('s.phone') == F.col('d.phone')) &
        (F.col('v.dt') <= F.col('d.deal_date')),
        'left'
    )
    .join(
        campaigns_df.alias('camp'),
        F.col('v.utmcampaign') == F.col('camp.campaign_name'),
        'left'
    )
    .join(
        costs_df.alias('c'),
        (F.col('camp.campaign_id') == F.col('c.campaign_id')) &
        (F.col('v.dt') == F.col('c.date')),
        'left'
    )
    .select(
        'v.dt',
        F.col('v.visitid').alias('visit_id'),
        F.col('v.clientid').alias('client_id'),
        'v.url',
        'v.duration',
        'v.source',
        'v.utmcampaign',
        'v.event_type',
        'v.event_id',
        's.submit_id',
        's.name',
        's.phone',
        's.phone_plus',
        's.phone_md5',
        's.phone_plus_md5',
        'd.deal_id',
        'd.deal_date',
        'd.fio',
        F.col('d.phone').alias('phone_deal'),
        'd.email',
        'd.address',
        'd.username',
        'd.domain',
        'camp.campaign_name',
        'camp.campaign_duration',
        'c.costs',
        'c.clicks',
        'c.views'
    )
)

In [54]:
customer_detailed.printSchema()

root
 |-- dt: string (nullable = true)
 |-- visit_id: string (nullable = true)
 |-- client_id: string (nullable = true)
 |-- url: string (nullable = true)
 |-- duration: integer (nullable = true)
 |-- source: string (nullable = true)
 |-- utmcampaign: string (nullable = true)
 |-- event_type: string (nullable = true)
 |-- event_id: integer (nullable = true)
 |-- submit_id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- phone: string (nullable = true)
 |-- phone_plus: string (nullable = true)
 |-- phone_md5: string (nullable = true)
 |-- phone_plus_md5: string (nullable = true)
 |-- deal_id: long (nullable = true)
 |-- deal_date: string (nullable = true)
 |-- fio: string (nullable = true)
 |-- phone_deal: string (nullable = true)
 |-- email: string (nullable = true)
 |-- address: string (nullable = true)
 |-- username: string (nullable = true)
 |-- domain: string (nullable = true)
 |-- campaign_name: string (nullable = true)
 |-- campaign_duration: string (nullable = tr

In [55]:
len(customer_detailed.columns)

28

In [None]:
customer_detailed.cache()

In [57]:
customer_detailed.count()

2660

In [58]:
campaigns_agg = (
    customer_detailed
    .groupBy('campaign_name')
    .agg(
        F.countDistinct('visit_id').alias('unique_visits'),
        F.countDistinct('client_id').alias('unique_clients'),
        F.countDistinct('submit_id').alias('unique_submits'),
        F.countDistinct('deal_id').alias('unique_deals'),
        F.sum('costs').alias('total_costs'),
        F.sum('clicks').alias('total_clicks'),
        F.sum('views').alias('total_views'),
        F.sum('duration').alias('total_duration')
    )
    .withColumn('avg_deal_cost', (F.col('total_costs') / F.col('unique_deals')).cast('decimal(19,2)'))
)

In [59]:
campaigns_agg.cache().count()

99

In [60]:
campaigns_agg.show(1, truncate=False, vertical=True)

-RECORD 0-------------------------------------------------
 campaign_name  | year_traditional_kitchens_demo_20240725 
 unique_visits  | 22                                      
 unique_clients | 21                                      
 unique_submits | 9                                       
 unique_deals   | 5                                       
 total_costs    | 13676.41                                
 total_clicks   | 1600                                    
 total_views    | 15987                                   
 total_duration | 1408                                    
 avg_deal_cost  | 2735.28                                 
only showing top 1 row



In [61]:
dates_agg = (
    customer_detailed
    .groupBy(F.substring('dt', 1, 7).alias('month'))  # 2025-01-01
    .agg(
        F.countDistinct('visit_id').alias('unique_visits'),
        F.countDistinct('client_id').alias('unique_clients'),
        F.countDistinct('submit_id').alias('unique_submits'),
        F.countDistinct('deal_id').alias('unique_deals'),
        F.sum('costs').alias('total_costs'),
        F.sum('clicks').alias('total_clicks'),
        F.sum('views').alias('total_views'),
        F.sum('duration').alias('total_duration')
    )
    .withColumn('avg_deal_cost', (F.col('total_costs') / F.col('unique_deals')).cast('decimal(19,2)'))
)

In [62]:
dates_agg.cache().count()

13

In [63]:
dates_agg.show(1, truncate=False, vertical=True)

-RECORD 0-------------------
 month          | 2024-09   
 unique_visits  | 191       
 unique_clients | 168       
 unique_submits | 60        
 unique_deals   | 14        
 total_costs    | 103217.36 
 total_clicks   | 10075     
 total_views    | 102169    
 total_duration | 10118     
 avg_deal_cost  | 7372.67   
only showing top 1 row



In [64]:
def save_to_postgres(df, table_name):
    (
        df.write
        .format('jdbc')
        .option('url', f'jdbc:postgresql://{pg_host}:{pg_port}/{pg_db}')
        .option('dbtable', table_name)
        .option('user', pg_user)
        .option('password', pg_password)
        .option('driver', 'org.postgresql.Driver')
        .mode("overwrite")
        .save()
    )

In [65]:
save_to_postgres(customer_detailed, 'customer_detailed')

In [66]:
save_to_postgres(campaigns_agg, 'campaigns_agg')

In [67]:
save_to_postgres(dates_agg, 'dates_agg')

## Analyzing results

In [68]:
# 1. Campaigns without revenue
(
    campaigns_agg
    .where('unique_deals = 0')
    .select('campaign_name', 'total_costs', 'unique_deals')
    .sort('total_costs')
    .show(truncate=False)
)

+------------------------------------------------+-----------+------------+
|campaign_name                                   |total_costs|unique_deals|
+------------------------------------------------+-----------+------------+
|quarter_traditional_kitchens_fair_20240518      |7169.02    |0           |
|month_smart_kitchens_launch_20240330            |7312.57    |0           |
|quarter_ecofriendly_kitchens_initiative_20240828|8306.39    |0           |
|year_luxury_kitchens_innovation_20240409        |9818.46    |0           |
|year_smart_kitchens_event_20250102              |11455.21   |0           |
|month_ecofriendly_kitchens_experience_20241219  |11737.74   |0           |
|year_traditional_kitchens_exhibit_20240130      |12038.39   |0           |
|year_traditional_kitchens_showcase_20241018     |12105.87   |0           |
|year_openconcept_kitchens_demo_20240430         |12147.50   |0           |
|month_smart_kitchens_exhibit_20240625           |12430.19   |0           |
|quarter_mod

In [69]:
(
    campaigns_agg
    .where('unique_deals = 0')
    .select(F.count('campaign_name'), F.sum('total_costs'))
    .show()
)

+--------------------+----------------+
|count(campaign_name)|sum(total_costs)|
+--------------------+----------------+
|                  16|       193036.76|
+--------------------+----------------+



In [70]:
# 2. Average deal cost
(
    campaigns_agg
    .where('unique_deals > 0')
    .select('campaign_name', 'total_costs', 'unique_deals', 'avg_deal_cost')
    .sort('avg_deal_cost')
    .show(truncate=False)
)

+-----------------------------------------------+-----------+------------+-------------+
|campaign_name                                  |total_costs|unique_deals|avg_deal_cost|
+-----------------------------------------------+-----------+------------+-------------+
|quarter_ecofriendly_kitchen_experience_20241111|16544.41   |10          |1654.44      |
|month_contemporary_kitchens_event_20240920     |10680.46   |6           |1780.08      |
|quarter_ecofriendly_kitchen_offer_20240512     |9024.24    |5           |1804.85      |
|month_custom_kitchens_experience_20240901      |7263.46    |4           |1815.87      |
|month_smart_kitchens_event_20240614            |11026.88   |6           |1837.81      |
|quarter_custom_kitchens_showcase_20240210      |8284.88    |4           |2071.22      |
|quarter_ecofriendly_kitchens_showcase_20240823 |10576.01   |5           |2115.20      |
|month_ecofriendly_kitchens_launch_20240325     |15366.61   |7           |2195.23      |
|year_modern_kitchen_

In [71]:
(
    campaigns_agg
    .where('unique_deals > 0')
    .select('campaign_name', 'total_costs', 'unique_deals', 'avg_deal_cost')
    .sort('avg_deal_cost', ascending=False)
    .show(truncate=False)
)

+-----------------------------------------------+-----------+------------+-------------+
|campaign_name                                  |total_costs|unique_deals|avg_deal_cost|
+-----------------------------------------------+-----------+------------+-------------+
|month_openconcept_kitchens_event_20241210      |16605.72   |1           |16605.72     |
|month_custom_kitchens_show_20241205            |14352.87   |1           |14352.87     |
|month_contemporary_kitchens_initiative_20240928|14157.05   |1           |14157.05     |
|quarter_spacesaving_kitchens_showcase_20240809 |14108.72   |1           |14108.72     |
|year_luxury_kitchens_initiative_20241029       |13537.64   |1           |13537.64     |
|year_modern_kitchen_innovation_20250107        |13419.85   |1           |13419.85     |
|quarter_custom_kitchens_experience_20240527    |13162.80   |1           |13162.80     |
|quarter_custom_kitchen_showcase_20240503       |12783.37   |1           |12783.37     |
|month_contemporary_k

In [72]:
# 3. Unprofitable campaigns (each deal costs 5 000)
(
    campaigns_agg
    .select('campaign_name', 'total_costs', 'unique_deals', 'avg_deal_cost')
    .withColumn('revenue', F.col('unique_deals') * F.lit(5000))
    .withColumn('profit', F.col('revenue') - F.col('total_costs'))
    .sort('profit')
    .show(truncate=False)
)

+-----------------------------------------------+-----------+------------+-------------+-------+---------+
|campaign_name                                  |total_costs|unique_deals|avg_deal_cost|revenue|profit   |
+-----------------------------------------------+-----------+------------+-------------+-------+---------+
|month_contemporary_kitchens_collection_20240914|18847.15   |0           |NULL         |0      |-18847.15|
|year_kitchen_design_initiative_20241030        |16014.35   |0           |NULL         |0      |-16014.35|
|year_luxury_kitchens_show_20241001             |14882.64   |0           |NULL         |0      |-14882.64|
|quarter_spacesaving_kitchen_experience_20240521|13146.61   |0           |NULL         |0      |-13146.61|
|year_modern_kitchen_initiative_20250111        |12938.15   |0           |NULL         |0      |-12938.15|
|quarter_modern_kitchens_innovation_20240812    |12686.52   |0           |NULL         |0      |-12686.52|
|month_smart_kitchens_exhibit_2024062

In [73]:
# 4. The most profitable adv campaigns
(
    campaigns_agg
    .select('campaign_name', 'total_costs', 'unique_deals', 'avg_deal_cost')
    .withColumn('revenue', F.col('unique_deals') * F.lit(5000))
    .withColumn('profit', F.col('revenue') - F.col('total_costs'))
    .sort(F.desc('profit'))
    .show(truncate=False)
)

+-----------------------------------------------+-----------+------------+-------------+-------+--------+
|campaign_name                                  |total_costs|unique_deals|avg_deal_cost|revenue|profit  |
+-----------------------------------------------+-----------+------------+-------------+-------+--------+
|quarter_ecofriendly_kitchen_experience_20241111|16544.41   |10          |1654.44      |50000  |33455.59|
|month_ecofriendly_kitchens_launch_20240325     |15366.61   |7           |2195.23      |35000  |19633.39|
|month_contemporary_kitchens_event_20240920     |10680.46   |6           |1780.08      |30000  |19319.54|
|month_smart_kitchens_event_20240614            |11026.88   |6           |1837.81      |30000  |18973.12|
|year_traditional_kitchens_fair_20240415        |16941.77   |7           |2420.25      |35000  |18058.23|
|month_smart_kitchen_initiative_20240311        |17481.89   |7           |2497.41      |35000  |17518.11|
|quarter_ecofriendly_kitchen_offer_20240512   

In [74]:
# 5. Metrics by month
(
    dates_agg
    .withColumn('revenue', F.col('unique_deals') * F.lit(5000))
    .withColumn('profit', F.col('revenue') - F.col('total_costs'))
    .drop('total_duration', 'total_views')
    .sort('month')
    .show()
)

+-------+-------------+--------------+--------------+------------+-----------+------------+-------------+-------+---------+
|  month|unique_visits|unique_clients|unique_submits|unique_deals|total_costs|total_clicks|avg_deal_cost|revenue|   profit|
+-------+-------------+--------------+--------------+------------+-----------+------------+-------------+-------+---------+
|2024-01|          204|           185|            62|          30|  111035.45|       10949|      3701.18| 150000| 38964.55|
|2024-02|          184|           164|            63|          39|  103068.78|        9261|      2642.79| 195000| 91931.22|
|2024-03|          182|           164|            60|          37|  101531.34|       10054|      2744.09| 185000| 83468.66|
|2024-04|          187|           166|            53|          28|  101512.77|        9829|      3625.46| 140000| 38487.23|
|2024-05|          188|           174|            65|          26|   94367.89|       10027|      3629.53| 130000| 35632.11|
|2024-06

In [75]:
# 6. How much money was spent on adv in total for the year?
dates_agg.select(F.sum('total_costs')).show()

+----------------+
|sum(total_costs)|
+----------------+
|      1325281.29|
+----------------+



In [76]:
customer_detailed.unpersist()
campaigns_agg.unpersist()
dates_agg.unpersist()

DataFrame[month: string, unique_visits: bigint, unique_clients: bigint, unique_submits: bigint, unique_deals: bigint, total_costs: decimal(29,2), total_clicks: bigint, total_views: bigint, total_duration: bigint, avg_deal_cost: decimal(19,2)]

In [77]:
spark.stop()