# PROBLEM STATEMENT

Our data warehouse combines dimension tables (Geography, Account, Currency, Customer, Date, Department Group, Organization, Product, Product Category, Product Subcategory, Promotion, Reseller, Sales Reason, Sales Territory, Scenario) with fact tables (Call Center, Currency Rate, Internet Sales). My goal is to build a targeted advertising strategy grounded in the characteristics of customers who have already bought from our organization.

The first step is to segment the customer base by demographics, purchasing patterns, product preferences, and geography, so that each segment reflects traits we actually observe in our customers. Alongside this, I analyze each customer's purchase history: which products they favor, their average transaction value, and how often they buy. These patterns form the foundation for the targeting that follows.
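As a minimal illustration of the purchase-history metrics described above, the sketch below computes, for each customer, the number of distinct orders, total spend, and average order value from individual sales lines. It is written in plain Python for clarity. The field names `CustomerKey`, `SalesOrderNumber`, and `SalesAmount` mirror columns in `FactInternetSales`, but the `purchase_summary` helper is hypothetical and the sample rows are invented. In PySpark the same aggregation would be a `groupBy("CustomerKey")` with `countDistinct` and `sum`.

```python
from collections import defaultdict


def purchase_summary(rows):
    """Hypothetical helper: summarize sales lines per customer.

    Each row is a dict with CustomerKey, SalesOrderNumber and SalesAmount.
    Returns {customer: {"orders": n, "total": x, "avg_order_value": x / n}}.
    """
    orders = defaultdict(set)    # distinct order numbers per customer
    totals = defaultdict(float)  # summed sales amount per customer
    for row in rows:
        customer = row["CustomerKey"]
        orders[customer].add(row["SalesOrderNumber"])
        totals[customer] += row["SalesAmount"]

    summary = {}
    for customer, order_ids in orders.items():
        n = len(order_ids)
        summary[customer] = {
            "orders": n,
            "total": totals[customer],
            "avg_order_value": totals[customer] / n,
        }
    return summary


# Invented sample: customer 1 places two orders, the first with two lines
sample = [
    {"CustomerKey": 1, "SalesOrderNumber": "SO1", "SalesAmount": 100.0},
    {"CustomerKey": 1, "SalesOrderNumber": "SO1", "SalesAmount": 50.0},
    {"CustomerKey": 1, "SalesOrderNumber": "SO2", "SalesAmount": 30.0},
    {"CustomerKey": 2, "SalesOrderNumber": "SO3", "SalesAmount": 20.0},
]
summary = purchase_summary(sample)
print(summary[1])  # {'orders': 2, 'total': 180.0, 'avg_order_value': 90.0}
```

Counting distinct order numbers rather than rows matters here: a multi-line order would otherwise inflate purchase frequency and deflate the average order value.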

Predictive modeling is the next step. Using machine learning models trained on our historical transactions, I aim to forecast future buying behavior and identify the customers most likely to purchase again, so that advertising can be directed at them proactively.
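One common way to frame "which customers are likely to buy again" as a supervised learning problem is to pick a cutoff date, compute each customer's features only from orders up to the cutoff, and label whether they purchased after it. The plain-Python sketch below builds only the labels. The `repeat_purchase_labels` helper and the sample dates are hypothetical, and the cutoff is an assumption you would tune to the data.

```python
from datetime import date


def repeat_purchase_labels(orders, cutoff):
    """Hypothetical helper: label customers by whether they buy again after `cutoff`.

    orders: iterable of (customer, order_date) pairs.
    Only customers with at least one order on or before the cutoff are labeled,
    so each label describes the future behavior of a customer already known then.
    Returns {customer: 1 if they ordered after the cutoff else 0}, sorted by customer.
    """
    seen_before, bought_after = set(), set()
    for customer, order_date in orders:
        if order_date <= cutoff:
            seen_before.add(customer)
        else:
            bought_after.add(customer)
    return {c: int(c in bought_after) for c in sorted(seen_before)}


# Invented sample data; the cutoff date is an arbitrary assumption
orders = [
    (1, date(2013, 1, 5)), (1, date(2013, 9, 2)),  # buys again after the cutoff
    (2, date(2013, 3, 14)),                         # no later purchase
    (3, date(2013, 8, 1)),                          # first seen after the cutoff: not labeled
]
labels = repeat_purchase_labels(orders, cutoff=date(2013, 6, 30))
print(labels)  # {1: 1, 2: 0}
```

Keeping feature computation strictly before the cutoff prevents the model from seeing the period it is asked to predict.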

Next, I design personalized campaigns for each segment: targeted promotions, product recommendations, and incentives matched to the segment's preferences and behavior. I also evaluate which channels (social media, email marketing, online platforms, or others) reach and engage each segment most effectively.

Finally, I define metrics and Key Performance Indicators (KPIs) to measure campaign effectiveness. Regular monitoring of these KPIs drives data-driven adjustments, so the campaigns keep improving and stay aligned with broader organizational goals.
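To make "measuring effectiveness" concrete, two KPIs that fit the conversion-rate question below are the conversion rate of a targeted group and its relative lift over a holdout control group that did not receive the campaign. The sketch is a minimal plain-Python version. The function names and campaign numbers are hypothetical, and a real evaluation would also test whether the difference is statistically significant.

```python
def conversion_rate(conversions: int, recipients: int) -> float:
    """Hypothetical KPI helper: share of targeted customers who made a purchase."""
    if recipients <= 0:
        raise ValueError("recipients must be positive")
    return conversions / recipients


def relative_lift(treated_rate: float, control_rate: float) -> float:
    """Hypothetical KPI helper: relative improvement of the campaign group over a holdout control."""
    if control_rate <= 0:
        raise ValueError("control_rate must be positive")
    return (treated_rate - control_rate) / control_rate


# Hypothetical campaign: 10,000 targeted customers, 5,000 held out as control
treated = conversion_rate(conversions=300, recipients=10_000)  # 0.03
control = conversion_rate(conversions=100, recipients=5_000)   # 0.02
print(f"lift: {relative_lift(treated, control):.0%}")           # lift: 50%
```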

# RESEARCH QUESTIONS

1. **Customer Segmentation Dynamics:**
   - How can customer segmentation based on demographics, purchasing patterns, and geographic data be optimized to identify distinct groups with shared characteristics within our customer base?

2. **Historical Buying Behavior Analysis:**
   - What patterns emerge from customers' historical buying behavior, in terms of favored product types, average transaction value, and purchase frequency, and how can these insights inform targeted advertising?

3. **Predictive Modeling for Future Transactions:**
   - How accurately can machine learning models trained on our historical data predict future buying behavior, and which factors contribute most to that accuracy in our market?

4. **Effectiveness of Personalized Advertising Campaigns:**
   - How do personalized campaigns, built on the preferences and behaviors of the identified segments, affect engagement and conversion rates across advertising channels, and what adjustments would make them more effective?

# OBJECTIVES

1. **Refine Customer Segmentation:**
   - **Objective:** Analyze customer data to segment customers precisely by demographics, purchasing patterns, and geography, identifying distinct groups with shared characteristics within our diverse customer base.

2. **Uncover Historical Buying Patterns:**
   - **Objective:** Investigate customers' historical buying behavior, focusing on product preferences, average transaction value, and purchase frequency, and extract patterns that can inform targeted advertising.

3. **Optimize Predictive Modeling:**
   - **Objective:** Apply machine learning to our historical data to predict future buying behavior as accurately as possible, and identify the factors that drive model performance in our market.

4. **Evaluate Personalized Campaign Impact:**
   - **Objective:** Launch personalized campaigns built on the preferences and behaviors of the identified segments, and measure engagement and conversion rates across channels to guide continuous improvement.

# DATA IMPORT AND CLEANING

The first code cell loads every CSV file in a specified folder into its own PySpark DataFrame. Each file holds one dimension or fact table of the business data (geography, accounts, currency, customers, and so on), so each DataFrame corresponds to one table.

The cell starts a Spark session, then loops over the CSV files and reads each one with `header=True` and `inferSchema=True`. Each DataFrame is assigned to a global variable named after its file without the extension, so `DimCustomer.csv` becomes `DimCustomer`. This makes every table available by name for the customer analysis that follows.
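Because the loader turns file names into variable names, a file whose name is not a valid Python identifier (for example, one containing a space) is still loaded but can only be reached through `globals()[...]`. The plain-Python sketch below, with a hypothetical `dataframe_names` helper and invented file names, previews the names the loader would create and flags problematic ones before any Spark work runs.

```python
import keyword
import os


def dataframe_names(file_names):
    """Hypothetical helper: map each CSV file name to the variable name the loader would use.

    Mirrors os.path.splitext(...)[0] from the loader and reports file names whose
    stem is not a usable Python identifier (they are only reachable via globals()).
    """
    names, invalid = {}, []
    for file_name in file_names:
        if not file_name.lower().endswith(".csv"):
            continue  # skip non-CSV files
        stem = os.path.splitext(file_name)[0]
        if not stem.isidentifier() or keyword.iskeyword(stem):
            invalid.append(file_name)
        names[stem] = file_name
    return names, invalid


names, invalid = dataframe_names(["DimCustomer.csv", "FactInternetSales.csv",
                                  "Sales 2013.csv", "readme.txt"])
print(sorted(names))  # ['DimCustomer', 'FactInternetSales', 'Sales 2013']
print(invalid)        # ['Sales 2013.csv']
```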

Loading every table up front supports each objective. Segmentation can draw on the separate geography, customer, product, promotion, and sales-territory DataFrames. Historical buying behavior can be analyzed from `FactInternetSales`, which records the product, amounts, and order date of each sale, and the same history supplies the training data for predictive modeling.

For campaign design, `DimPromotion` and `DimProduct` provide the promotion and product attributes needed to match offers and recommendations to specific segments. The loaded tables do not appear to record which advertising channel (email, social media, and so on) a customer responded to, so channel optimization will likely require campaign data beyond this dataset.


In [1]:
```python
import os

from pyspark.sql import DataFrame, SparkSession

# Initialize (or reuse) a Spark session
spark = SparkSession.builder.appName("CSVLoaderScript").getOrCreate()

# Folder containing the CSV files (adjust to your environment)
folder_path = r'C:\Users\neste\OneDrive\Desktop\karanja\DataSet_final\DataSet_final'

# Collect every CSV file in the folder
csv_files = [file for file in os.listdir(folder_path) if file.lower().endswith('.csv')]

# Load each CSV file into a DataFrame bound to a global variable named after the file,
# e.g. DimCustomer.csv -> DimCustomer
for csv_file in csv_files:
    df_name = os.path.splitext(csv_file)[0]
    globals()[df_name] = spark.read.csv(
        os.path.join(folder_path, csv_file), header=True, inferSchema=True
    )

# Show the first rows of every loaded DataFrame.
# Iterate over a snapshot of globals() so that assigning loop variables
# cannot change the dictionary while it is being iterated.
for name, obj in list(globals().items()):
    if isinstance(obj, DataFrame):
        print(f"Showing contents of DataFrame: {name}")
        obj.show()
```


```text
Showing contents of DataFrame: DimAccount
+----------+----------------+-----------------------+-----------------------------+--------------------+-----------+--------+-------------+---------+-------------------+
|AccountKey|ParentAccountKey|AccountCodeAlternateKey|ParentAccountCodeAlternateKey|  AccountDescription|AccountType|Operator|CustomMembers|ValueType|CustomMemberOptions|
+----------+----------------+-----------------------+-----------------------------+--------------------+-----------+--------+-------------+---------+-------------------+
|         1|            NULL|                      1|                         NULL|       Balance Sheet|       NULL|       ~|         NULL| Currency|               NULL|
|         2|               1|                     10|                            1|              Assets|     Assets|       +|         NULL| Currency|               NULL|
|         3|               2|                    110|                           10|      Current Assets|    
```

In [2]:
```python
# Show the contents of each DataFrame, labeled by its table name
# (printing the DataFrame object itself would show its schema, not its name)
table_names = ["DimGeography", "DimAccount", "DimCurrency", "DimCustomer", "DimDate",
               "DimDepartmentGroup", "DimOrganization", "DimProduct", "DimProductCategory",
               "DimProductSubcategory", "DimPromotion", "DimReseller", "DimSalesReason",
               "DimSalesTerritory", "DimScenario", "FactCallCenter", "FactCurrencyRate",
               "FactInternetSales"]

for name in table_names:
    print(f"Showing contents of DataFrame: {name}")
    globals()[name].show()
```


```text
Showing contents of DataFrame: DataFrame[GeographyKey: int, City: string, StateProvinceCode: string, StateProvinceName: string, CountryRegionCode: string, EnglishCountryRegionName: string, SpanishCountryRegionName: string, FrenchCountryRegionName: string, PostalCode: string, SalesTerritoryKey: int, IpAddressLocator: string]
+------------+--------------+-----------------+-----------------+-----------------+------------------------+------------------------+-----------------------+----------+-----------------+----------------+
|GeographyKey|          City|StateProvinceCode|StateProvinceName|CountryRegionCode|EnglishCountryRegionName|SpanishCountryRegionName|FrenchCountryRegionName|PostalCode|SalesTerritoryKey|IpAddressLocator|
+------------+--------------+-----------------+-----------------+-----------------+------------------------+------------------------+-----------------------+----------+-----------------+----------------+
|           1|    Alexandria|              NSW|  New South Wal
```

# DATA PREPARATION

To target advertising at customers who have already purchased, I first prepared the data in PySpark. I selected the relevant columns from the dimension DataFrames (product details, customer information, promotions, currency, sales territories, product subcategories, and geography) so that the combined dataset carries only attributes useful for the analysis.

I then joined the transaction table, `FactInternetSales`, to each dimension on its key: `ProductKey`, `CustomerKey`, `PromotionKey`, `CurrencyKey`, `SalesTerritoryKey`, `ProductSubcategoryKey`, and `GeographyKey`. The result is a single DataFrame, `prepared_data`, in which each row is one sales line enriched with product, customer, promotion, currency, territory, and location details. Because `DataFrame.join` performs an inner join by default, any sales row without a matching dimension row (for example, a product with no subcategory) is dropped.
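The consequence of chaining inner joins is easy to see on a toy example. The plain-Python sketch below uses a hypothetical `inner_join` helper and invented rows shaped like `FactInternetSales`, `DimProduct`, and `DimProductSubcategory`: a sales row disappears as soon as any lookup fails, including when the key is null. If such rows matter, counting rows after each join, or passing `how="left"` to `DataFrame.join`, are two ways to detect or keep them.

```python
def inner_join(fact_rows, dim_rows, key):
    """Hypothetical helper: inner-join two lists of dicts on `key`.

    Mimics the row-dropping behavior of DataFrame.join(other, key): fact rows
    whose key is None or has no match in the dimension are discarded.
    Assumes dimension keys are unique. Unlike Spark, which keeps both copies
    of a duplicated non-key column, this sketch lets the dimension value win.
    """
    lookup = {row[key]: row for row in dim_rows if row[key] is not None}
    joined = []
    for fact in fact_rows:
        dim = lookup.get(fact[key])  # None keys are never in lookup
        if dim is not None:
            joined.append({**fact, **dim})
    return joined


# Invented rows shaped like FactInternetSales, DimProduct and DimProductSubcategory
sales = [
    {"ProductKey": 1, "SalesAmount": 10.0},
    {"ProductKey": 2, "SalesAmount": 20.0},
    {"ProductKey": 3, "SalesAmount": 30.0},  # no matching product row
]
products = [
    {"ProductKey": 1, "ProductSubcategoryKey": 14},
    {"ProductKey": 2, "ProductSubcategoryKey": None},  # product without a subcategory
]
subcategories = [{"ProductSubcategoryKey": 14, "EnglishProductSubcategoryName": "Road Bikes"}]

step1 = inner_join(sales, products, "ProductKey")
step2 = inner_join(step1, subcategories, "ProductSubcategoryKey")
print(len(sales), len(step1), len(step2))  # 3 2 1
```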

To support time-based analysis, I converted `OrderDateKey`, an integer in `yyyyMMdd` form (a value such as 20130115 stands for 15 January 2013), into a `DateType` column named `OrderDate`. This makes temporal patterns in customer transactions straightforward to analyze.
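The `OrderDateKey` conversion is easy to check outside Spark. The sketch below is a plain-Python equivalent of the `to_date(cast(OrderDateKey as string), "yyyyMMdd")` expression. The `date_from_key` name is hypothetical, and the sample key is illustrative rather than taken from the dataset.

```python
from datetime import date, datetime


def date_from_key(date_key: int) -> date:
    """Hypothetical helper: convert an integer date key in yyyyMMdd form to a date.

    Plain-Python counterpart of to_date(cast(OrderDateKey as string), 'yyyyMMdd').
    Raises ValueError for keys that are not valid calendar dates.
    """
    return datetime.strptime(str(date_key), "%Y%m%d").date()


print(date_from_key(20130115))  # 2013-01-15
```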

To keep the code readable and reusable, I wrapped the joins in a `perform_joins` function. It takes a fact DataFrame, any number of dimension DataFrames, and a matching list of join keys, and optionally performs the date conversion. Note that the `perform_joins` call at the end of the cell joins the *full* dimension tables and replaces the column-selected `prepared_data` built earlier in the cell; columns that are not needed are dropped afterwards with `exclude_columns`.

Finally, I derived `OrderMonth` and `OrderYear` from `OrderDate` so that purchase trends can be analyzed by month and year, which helps align campaign timing with customers' buying patterns.
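Once `OrderMonth` and `OrderYear` exist, a purchase trend is a count per year and month; in PySpark this would be `prepared_data.groupBy("OrderYear", "OrderMonth").count()`. The plain-Python sketch below shows the same aggregation on invented dates with a hypothetical `orders_per_month` helper. Note that counting rows of `prepared_data` counts sales lines; counting distinct `SalesOrderNumber` values would count orders instead.

```python
from collections import Counter
from datetime import date


def orders_per_month(order_dates):
    """Hypothetical helper: count entries per (OrderYear, OrderMonth), in chronological order."""
    counts = Counter((d.year, d.month) for d in order_dates)
    return dict(sorted(counts.items()))


# Invented order dates
dates = [date(2013, 11, 3), date(2013, 11, 20), date(2013, 12, 1), date(2014, 1, 7)]
print(orders_per_month(dates))  # {(2013, 11): 2, (2013, 12): 1, (2014, 1): 1}
```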

Together, these steps turn the raw tables into a single `prepared_data` DataFrame that underpins the segmentation, purchase-history, and modeling work that follows.


In [3]:
```python
from typing import List, Optional

from pyspark.sql import DataFrame
from pyspark.sql.functions import expr, month, to_date, year

# ---- Step 1: join selected columns of each dimension onto FactInternetSales ----
# DataFrame.join(other, key) performs an inner join on the shared key column.

# Select specific columns from DimProduct and join on ProductKey
product_columns = ["ProductKey", "ProductSubcategoryKey", "EnglishProductName", "FinishedGoodsFlag",
                   "Color", "SafetyStockLevel", "ReorderPoint", "SizeRange", "DaysToManufacture"]
dim_product_selected = DimProduct.select(product_columns)
prepared_data = FactInternetSales.join(dim_product_selected, "ProductKey")

# Select specific columns from DimCustomer (including DateFirstPurchase) and join on CustomerKey
customer_columns = ["CustomerKey", "GeographyKey", "NameStyle", "BirthDate", "MaritalStatus", "Gender",
                    "YearlyIncome", "TotalChildren", "NumberChildrenAtHome", "EnglishEducation",
                    "EnglishOccupation", "HouseOwnerFlag", "NumberCarsOwned", "CommuteDistance",
                    "DateFirstPurchase"]
dim_customer_selected = DimCustomer.select(customer_columns)
prepared_data = prepared_data.join(dim_customer_selected, "CustomerKey")

# Select specific columns from DimPromotion and join on PromotionKey
promotion_columns = ["PromotionKey", "EnglishPromotionName", "DiscountPct", "EnglishPromotionCategory", "MinQty"]
dim_promotion_selected = DimPromotion.select(promotion_columns)
prepared_data = prepared_data.join(dim_promotion_selected, "PromotionKey")

# Select specific columns from DimCurrency and join on CurrencyKey
currency_columns = ["CurrencyKey", "CurrencyName"]
dim_currency_selected = DimCurrency.select(currency_columns)
prepared_data = prepared_data.join(dim_currency_selected, "CurrencyKey")

# Select specific columns from DimSalesTerritory and join on SalesTerritoryKey
sales_territory_columns = ["SalesTerritoryKey", "SalesTerritoryRegion", "SalesTerritoryCountry"]
dim_sales_territory_selected = DimSalesTerritory.select(sales_territory_columns)
prepared_data = prepared_data.join(dim_sales_territory_selected, "SalesTerritoryKey")

# Select specific columns from DimProductSubcategory and join on ProductSubcategoryKey
product_subcategory_columns = ["ProductSubcategoryKey", "EnglishProductSubcategoryName"]
dim_product_subcategory_selected = DimProductSubcategory.select(product_subcategory_columns)
prepared_data = prepared_data.join(dim_product_subcategory_selected, "ProductSubcategoryKey")

# Select specific columns from DimGeography and join on GeographyKey
geography_columns = ["GeographyKey", "City", "StateProvinceName"]
dim_geography_selected = DimGeography.select(geography_columns)
prepared_data = prepared_data.join(dim_geography_selected, "GeographyKey")

# Convert OrderDateKey (an integer in yyyyMMdd form) to a DateType column
prepared_data = prepared_data.withColumn(
    "OrderDate",
    to_date(expr("cast(OrderDateKey as string)"), "yyyyMMdd")
)


# ---- Step 2: the same joins packaged as a reusable function ----

def perform_joins(fact_df: DataFrame, *dimension_dfs: DataFrame,
                  join_keys: List[str], date_column: Optional[str] = None) -> DataFrame:
    """
    Perform a series of joins between a Fact DataFrame and multiple Dimension DataFrames.

    Parameters:
    - fact_df: The Fact DataFrame to start with.
    - dimension_dfs: A variable number of Dimension DataFrames to join with the Fact DataFrame.
    - join_keys: One join key per Dimension DataFrame, in the same order.
    - date_column: Optional. If provided, convert this yyyyMMdd integer column to a
      DateType column named OrderDate.

    Returns:
    - The resulting DataFrame after all the specified joins.
    """
    if len(dimension_dfs) != len(join_keys):
        # zip() would otherwise silently skip the unmatched joins
        raise ValueError("Provide exactly one join key per Dimension DataFrame.")

    prepared_data = fact_df
    for dimension_df, join_key in zip(dimension_dfs, join_keys):
        prepared_data = prepared_data.join(dimension_df, join_key)

    if date_column:
        prepared_data = prepared_data.withColumn(
            "OrderDate",
            to_date(expr(f"cast({date_column} as string)"), "yyyyMMdd")
        )

    return prepared_data


# This call rebuilds prepared_data from the *full* dimension tables, replacing the
# column-selected version from Step 1; later cells use columns (e.g. ProductLine,
# ModelName) that only the full tables contain.
prepared_data = perform_joins(
    FactInternetSales,
    DimProduct,
    DimCustomer,
    DimPromotion,
    DimCurrency,
    DimSalesTerritory,
    DimProductSubcategory,
    DimGeography,
    join_keys=["ProductKey", "CustomerKey", "PromotionKey", "CurrencyKey",
               "SalesTerritoryKey", "ProductSubcategoryKey", "GeographyKey"],
    date_column="OrderDateKey"
)

# Extract order month and order year for trend analysis
prepared_data = prepared_data.withColumn("OrderMonth", month("OrderDate"))
prepared_data = prepared_data.withColumn("OrderYear", year("OrderDate"))

# Show the resulting DataFrame
prepared_data.show()
```



In [4]:
```python
from typing import List

from pyspark.sql import DataFrame

# Columns to drop before modeling: keys and identifiers, raw dates, personal and
# contact details, non-English translations, and other fields not used downstream.
# DataFrame.drop() removes every column with a listed name (including duplicates
# created by the joins) and ignores names that are not present.
columns_to_exclude = [
    "GeographyKey", "ProductSubcategoryKey", "SalesTerritoryKey", "CurrencyKey", "PromotionKey",
    "CustomerKey", "ProductKey", "OrderDateKey", "DueDateKey", "ShipDateKey", "SalesOrderNumber",
    "SalesOrderLineNumber", "RevisionNumber", "OrderQuantity", "CarrierTrackingNumber",
    "CustomerPONumber", "OrderDate", "DueDate", "ShipDate", "ProductAlternateKey",
    "SpanishProductName", "FrenchProductName", "EnglishDescription", "StartDate", "EndDate", "Status",
    "CustomerAlternateKey", "Title", "FirstName", "MiddleName", "LastName", "BirthDate", "EmailAddress",
    "SpanishEducation", "FrenchEducation", "EnglishOccupation", "SpanishOccupation", "FrenchOccupation",
    "AddressLine1", "AddressLine2", "Phone", "DateFirstPurchase", "SpanishPromotionName",
    "FrenchPromotionName", "EnglishPromotionType", "SpanishPromotionType", "FrenchPromotionType",
    "SpanishPromotionCategory", "FrenchPromotionCategory", "MaxQty",
    "CurrencyAlternateKey", "ProductCategoryKey", "StateProvinceCode", "CountryRegionCode",
    "SpanishCountryRegionName", "FrenchCountryRegionName", "PostalCode",
    "IpAddressLocator", "SalesTerritoryAlternateKey", "SalesTerritoryRegion",
    "ProductSubcategoryAlternateKey", "Suffix"
]

def exclude_columns(data_frame: DataFrame, columns_to_exclude: List[str]) -> DataFrame:
    """
    Exclude specified columns from a DataFrame.

    Parameters:
    - data_frame: The DataFrame to be processed.
    - columns_to_exclude: List of column names to be excluded.

    Returns:
    - The DataFrame with specified columns excluded.
    """
    return data_frame.drop(*columns_to_exclude)

# Rebuild the joined data with perform_joins (defined in the previous cell).
# Unlike the previous cell, OrderMonth and OrderYear are not added here.
prepared_data = perform_joins(
    FactInternetSales,
    DimProduct,
    DimCustomer,
    DimPromotion,
    DimCurrency,
    DimSalesTerritory,
    DimProductSubcategory,
    DimGeography,
    join_keys=["ProductKey", "CustomerKey", "PromotionKey", "CurrencyKey",
               "SalesTerritoryKey", "ProductSubcategoryKey", "GeographyKey"],
    date_column="OrderDateKey"
)

# Apply the exclusion function
cleaned_data = exclude_columns(prepared_data, columns_to_exclude)

# Show the resulting cleaned DataFrame
cleaned_data.show()
```


```text
+---------+--------------+--------------------+--------------+-------------------+----------------+-----------+--------+-------+---------------------+-------------------+--------------------+------------+-----------------+------+----------------+------------+---------+----+---------+------+-----------------+-----------+-----------+-----+-----+------------+---------+-------------+------+------------+-------------+--------------------+-------------------+--------------+---------------+---------------+---------------------+--------------------+-----------+------------------------+------+--------------------+---------------------+-------------------+-----------------------------+-------------+-------------------+------------------------+
|UnitPrice|ExtendedAmount|UnitPriceDiscountPct|DiscountAmount|ProductStandardCost|TotalProductCost|SalesAmount|  TaxAmt|Freight|WeightUnitMeasureCode|SizeUnitMeasureCode|  EnglishProductName|StandardCost|FinishedGoodsFlag| Color|SafetyStockLevel|ReorderPoi
```

# DATA INDEXING 

To prepare the cleaned DataFrame for machine learning, I encoded its categorical columns with the `StringIndexer` from PySpark MLlib, because MLlib estimators expect numeric features. I first separated the categorical columns (product characteristics, customer details, promotions, and geography) from the remaining numeric columns. `StringIndexer` maps each distinct string in a column to a numeric index; by default labels are ordered by descending frequency, so the most common value receives index 0.0. These indices are arbitrary codes rather than meaningful magnitudes, so models that treat features as continuous (such as linear models) typically need them one-hot encoded first.

I created one `StringIndexer` per categorical column, each writing to a `<column>_index` output column, and chained them in a `Pipeline`. Fitting the pipeline learns each column's label-to-index mapping, and transforming the DataFrame applies it. Setting `handleInvalid="keep"` sends unseen labels and null values to one extra index (equal to the number of known labels) instead of raising an error.

Finally, I selected the non-categorical columns together with the new indexed columns, leaving out the original string columns. The resulting `transformed_data` DataFrame has 49 columns and is ready for model training, and the per-column indexers make the step easy to extend in later iterations of the pipeline.
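To make the indexing behavior concrete, the sketch below models in plain Python how `StringIndexer` assigns indices under its default `stringOrderType="frequencyDesc"`: the most frequent label gets 0.0, ties are broken alphabetically (as documented for recent Spark versions), and with `handleInvalid="keep"` unseen labels and nulls map to one extra index. The helper names and sample colors are hypothetical; this illustrates the behavior and is not Spark's implementation.

```python
from collections import Counter


def fit_string_index(values):
    """Hypothetical helper: build a label -> index mapping like StringIndexer's default.

    Labels are ordered by descending frequency, ties broken alphabetically.
    None values are not counted as labels (our reading of Spark's behavior).
    """
    counts = Counter(v for v in values if v is not None)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}


def transform_keep(values, mapping):
    """Hypothetical helper: apply `mapping`, sending unseen labels and None to
    index len(mapping), as handleInvalid="keep" does."""
    extra = float(len(mapping))
    return [mapping.get(v, extra) for v in values]


mapping = fit_string_index(["Black", "Red", "Black", "Silver", "Red", "Black"])
print(mapping)                                          # {'Black': 0.0, 'Red': 1.0, 'Silver': 2.0}
print(transform_keep(["Red", "Yellow", None], mapping))  # [1.0, 3.0, 3.0]
```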

In [10]:
```python
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer

# Index the cleaned data produced in the DATA PREPARATION section
df = cleaned_data

# Categorical Columns Specification
categorical_columns = [
    "WeightUnitMeasureCode", "SizeUnitMeasureCode", "EnglishProductName",
    "FinishedGoodsFlag", "Color", "SizeRange", "ProductLine", "Class", "Style",
    "ModelName", "NameStyle", "MaritalStatus", "Gender", "EnglishEducation",
    "CommuteDistance", "EnglishPromotionName", "EnglishPromotionCategory",
    "SalesTerritoryCountry", "SalesTerritoryGroup", "EnglishProductSubcategoryName",
    "City", "StateProvinceName", "EnglishCountryRegionName", "CurrencyName"
]

# Every other column is treated as non-categorical (numeric).
# The loop variable is `c` to avoid shadowing pyspark.sql.functions.col.
non_categorical_columns = [c for c in df.columns if c not in categorical_columns]

# One StringIndexer per categorical column; handleInvalid="keep" sends unseen
# labels and nulls to an extra index instead of raising an error
indexers = [StringIndexer(inputCol=c, outputCol=f"{c}_index", handleInvalid="keep")
            for c in categorical_columns]

# Chain the indexers in a Pipeline, then fit (learn mappings) and transform (apply them)
pipeline = Pipeline(stages=indexers)
transformed_data = pipeline.fit(df).transform(df)

# Keep the non-categorical columns plus the indexed versions of the categorical ones
selected_columns = non_categorical_columns + [f"{c}_index" for c in categorical_columns]
transformed_data = transformed_data.select(selected_columns)

# Show the result
transformed_data.show(truncate=False)
```


```text
+---------+--------------+--------------------+--------------+-------------------+----------------+-----------+--------+-------+------------+----------------+------------+---------+----+------+-----------------+-----------+------------+-------------+--------------------+--------------+---------------+---------------------+-----------+------+---------------------------+-------------------------+------------------------+-----------------------+-----------+---------------+-----------------+-----------+-----------+---------------+---------------+-------------------+------------+----------------------+---------------------+--------------------------+------------------------------+---------------------------+-------------------------+-----------------------------------+----------+-----------------------+------------------------------+------------------+
|UnitPrice|ExtendedAmount|UnitPriceDiscountPct|DiscountAmount|ProductStandardCost|TotalProductCost|SalesAmount|TaxAmt  |Freight|StandardCost
```

In [11]:
```python
len(transformed_data.columns)
```

```text
49
```