In [0]:
# Install Dependency
# NOTE(review): the PyPI distribution named `dlt` is presumably the dltHub
# "data load tool" rather than Databricks Delta Live Tables - confirm whether
# this install is needed, and verify whether the pipeline runtime already
# provides the `dlt` module that the @dlt.table cells in this notebook rely on.

%pip install dlt  # Package for data lineage tracking


## Mount path

In [0]:
# Azure Blob Storage connection settings. The "#...#" values look like template
# placeholders - presumably substituted before the notebook runs; confirm.
storage_account_access_key = "#storage_account_key#"
storageAccountName = "#STORAGEACCOUNT#"
blobContainerName = "data"

# DBFS mount points: one for the input files, one for the DLT table output.
Dest_mount = "/mnt/dlt_destination"
Source_mount = "/mnt/Source"

Source Mount Point

In [0]:
# Mount the blob container at Source_mount unless that mount point already exists.
# Container, storage account, access key and mount point are taken from the
# configuration cell instead of being repeated here as literals, so a setting
# only has to be changed in one place.
# NOTE(review): consider reading the access key from a Databricks secret scope
# rather than keeping it in notebook text.
if any(mount.mountPoint == Source_mount for mount in dbutils.fs.mounts()):
    print("Mount point already exists.")
else:
    dbutils.fs.mount(
        source=f"wasbs://{blobContainerName}@{storageAccountName}.blob.core.windows.net/",
        mount_point=Source_mount,
        extra_configs={
            f"fs.azure.account.key.{storageAccountName}.blob.core.windows.net": storage_account_access_key
        },
    )

Destination Mount Point

In [0]:
# Mount the blob container at Dest_mount unless that mount point already exists.
# Container, storage account, access key and mount point are taken from the
# configuration cell instead of being repeated here as literals, so a setting
# only has to be changed in one place.
# NOTE(review): consider reading the access key from a Databricks secret scope
# rather than keeping it in notebook text.
if any(mount.mountPoint == Dest_mount for mount in dbutils.fs.mounts()):
    print("Mount point already exists.")
else:
    dbutils.fs.mount(
        source=f"wasbs://{blobContainerName}@{storageAccountName}.blob.core.windows.net/",
        mount_point=Dest_mount,
        extra_configs={
            f"fs.azure.account.key.{storageAccountName}.blob.core.windows.net": storage_account_access_key
        },
    )

In [0]:
# Manual cleanup, left disabled: uncomment these two lines to unmount the
# destination and source mount points.
# dbutils.fs.unmount('/mnt/dlt_destination')
# dbutils.fs.unmount('/mnt/Source')

# Campaign Analytics<br/>

1. **Use case              :** Performing campaign analytics on static campaign data coming from the OneLake Bronze Layer.<br/>
2. **Notebook Summary      :** This notebook is part of the campaign analytics application, which performs `campaign analytics using various PySpark capabilities`.<br/>
3. **Notebook Description  :** Performing Campaign Analytics on OneLake Bronze Layer Files.


### Feature List
1. Data Profiling
2. Total Revenue, Total Revenue Target & Profit 
3. Campaigns Run Per Week
4. Total Profit by Country Per Week
5. Top Loss-Making Campaign 

The bronze data received for processing is already curated. So, we will derive gold tables from bronze tables.

### Import Libraries

In [0]:
from pyspark.sql.functions import sum as _sum
from pyspark.sql.functions import mean as _mean
from pyspark.sql.functions import max as _max
from pyspark.sql.functions import min as _min
import pyspark.sql.functions as func
import pyspark.sql.functions as F
from pyspark.sql.functions import *
import dlt 
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType, DateType
import random
import string

### Define the Schema for the input file

In [0]:
# Explicit schema applied when the campaign CSV is read; every field is
# declared nullable.
campaignSchema = (
    StructType()
    .add("Region", StringType(), True)
    .add("Country", StringType(), True)
    .add("ProductCategory", StringType(), True)
    .add("Campaign_ID", IntegerType(), True)
    .add("Campaign_Name", StringType(), True)
    .add("Qualification", StringType(), True)
    .add("Qualification_Number", StringType(), True)
    .add("Response_Status", StringType(), True)
    .add("Responses", FloatType(), True)
    .add("Cost", FloatType(), True)
    .add("Revenue", FloatType(), True)
    .add("ROI", FloatType(), True)
    .add("Lead_Generation", StringType(), True)
    .add("Revenue_Target", FloatType(), True)
    .add("Campaign_Tactic", StringType(), True)
    .add("Customer_Segment", StringType(), True)
    .add("Status", StringType(), True)
    .add("Profit", FloatType(), True)
    .add("Marketing_Cost", FloatType(), True)
    .add("CampaignID", IntegerType(), True)
    .add("CampDate", DateType(), True)
    .add("SORTED_ID", IntegerType(), True)
)
    

### Load the Campaign Dataset from OneLake Bronze Layer

In [0]:
# Bronze table: the campaign CSV from the source mount, read with the explicit
# campaignSchema and with the first row treated as a header.
@dlt.table(comment="Raw data", path = "/mnt/dlt_destination/bronze_campaign_data")
def bronze_campaign_data():
    csv_reader = spark.read.format("csv").option("header", True).schema(campaignSchema)
    # Alternative source kept for reference: spark.table("campaign.campaign_source")
    return csv_reader.load("/mnt/Source/CampaignData/campaign-data.csv")

### Total Revenue, Total Revenue Target & Profit

In [0]:
# Gold table: summed revenue, revenue target and profit plus the max/min cost,
# grouped by Country and Region.
@dlt.table(comment="Aggregated data", path = "/mnt/dlt_destination/gold_country_wise_revenue")
def gold_country_wise_revenue():
    per_country = (
        dlt.read("bronze_campaign_data")
        .groupBy("Country", "Region")
        .agg(
            _sum("Revenue").alias("Total_Revenue"),
            _sum("Revenue_Target").alias("Total_Revenue_Target"),
            _sum("Profit").alias("Total_Profit"),
            _max("Cost").alias("Max_Cost"),
            _min("Cost").alias("Min_Cost"),
        )
    )

    # Only the three summed totals are rounded to 2 decimals; Max_Cost and
    # Min_Cost are returned unrounded.
    rounded = per_country
    for total_col in ("Total_Revenue", "Total_Revenue_Target", "Total_Profit"):
        rounded = rounded.withColumn(total_col, func.round(per_country[total_col], 2))
    return rounded

### Top Loss-Making Campaign

In [0]:
# Gold table: Campaign_Name and Profit of every bronze row with a negative
# Profit, plus a Loss_Count flag column.
@dlt.table(comment="Aggregated data", path = "/mnt/dlt_destination/gold_Top_Loss_Making_Campaign")
def gold_Top_Loss_Making_Campaign():
    is_loss = F.col("Profit") < 0
    loss_rows = (
        dlt.read("bronze_campaign_data")
        .select("Campaign_Name", "Profit")
        .filter(is_loss)
    )
    # Every row that survives the filter has Profit < 0, so Loss_Count is 1 on
    # each of them. A per-campaign total
    # (groupBy('Campaign_Name').sum('Loss_Count')) is not computed here.
    loss_flag = F.when(is_loss, F.lit(1)).otherwise(F.lit(0))
    return loss_rows.withColumn("Loss_Count", loss_flag)

#  Retail Sales Data Preparation using Spark

Preparing retail data for training a regression model to predict total sales revenue of a product from a store using the following features: 
- Brand (The brand of the product)
- Quantity (Quantity of product purchased)
- Advert (Whether the product had an advertisement or not)
- Price (How much the product costs)

## Importing Libraries

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import percent_rank
from pyspark.sql import Window
from io import BytesIO
from copy import deepcopy
from datetime import datetime
from dateutil import parser
import logging
from pyspark.sql.types import *

## Defining the schema for the data

In [0]:
# Explicit schema applied when the sales transactions file is read; nullability
# is left at the default for every field.
Dataschema = (
    StructType()
    .add("ID", StringType())
    .add("WeekStarting", DateType())
    .add("Store", IntegerType())
    .add("Brand", StringType())
    .add("Quantity", IntegerType())
    .add("Advert", IntegerType())
    .add("Price", FloatType())
    .add("Revenue", FloatType())
)


## Load the data from the source and perform the transformations

In [0]:
# Bronze table: the sales transactions file from the source mount, parsed as
# CSV with Dataschema (no header option is set).
@dlt.table(comment="Raw data", path = "/mnt/dlt_destination/bronze_SalesTrans")
def bronze_SalesTrans():
    sales_path = '/mnt/Source/StoreTransactionsData/SalesTransData.txt'
    return spark.read.csv(sales_path, schema=Dataschema)

In [0]:
# Silver table: the bronze sales rows plus a "rank" column holding each row's
# percent rank when all rows are ordered by WeekStarting.
@dlt.table(comment="Silver data", path = "/mnt/dlt_destination/silver_SalesTrans_data")
def silver_SalesTrans_data():
    # No partition columns: the whole table is ranked as a single window.
    by_week = Window.partitionBy().orderBy("WeekStarting")
    week_rank = percent_rank().over(by_week)
    return dlt.read("bronze_SalesTrans").withColumn("rank", week_rank)

In [0]:
# Gold table: silver sales rows whose percent rank is at most 0.8, with the
# helper "rank" column removed (presumably the training split - confirm).
# The table comment previously read "Silver data", copied from the silver
# table; it now matches the gold layer this table is written to.
@dlt.table(comment="Gold data", path = "/mnt/dlt_destination/gold_SalesTrans_toprank")
def gold_SalesTrans_toprank():
    train = dlt.read("silver_SalesTrans_data").where("rank <= .8").drop("rank")
    return train
    
# Gold table: silver sales rows whose percent rank is above 0.8, with the
# helper "rank" column removed (presumably the test split - confirm).
# The table comment previously read "Silver data", copied from the silver
# table; it now matches the gold layer this table is written to.
@dlt.table(comment="Gold data", path = "/mnt/dlt_destination/gold_SalesTrans_lowerrank")
def gold_SalesTrans_lowerrank():
    test = dlt.read("silver_SalesTrans_data").where("rank > .8").drop("rank")
    return test

#  Customer Churn Data Preparation using Spark

## Load the data from the source and perform the transformations

In [0]:
# Bronze table: the customer churn CSV from the source mount, read with its
# first row as the header and no explicit schema.
@dlt.table(comment="Raw data", path = "/mnt/dlt_destination/bronze_CustomerChurnTrans")
def bronze_CustomerChurnTrans():
    churn_path = '/mnt/Source/CustomerChurnData/CustomerChurnData.csv'
    return spark.read.csv(churn_path, header=True)

In [0]:
# Silver table: a pass-through of the bronze churn table; no transformation is
# applied.
@dlt.table(comment="Silver data",  path = "/mnt/dlt_destination/silver_CustomerChurn_data")
def silver_CustomerChurn_data():
    return dlt.read("bronze_CustomerChurnTrans")

The result after running DLT pipeline would look similar to the following screenshot.

![](https://stmsftbuild2024.blob.core.windows.net/dltimage/task-2.2.7.png)