# OBJECTIVES


*Background:*
The company has collected a comprehensive set of sales data across various dimensions, including products, customers, regions, and time periods. The goal is to gain valuable insights into the sales performance and make data-driven decisions to optimize revenue and enhance overall business strategy.

*Objectives:*

1. **Sales Performance Overview:**
   - Understand the historical sales trends and patterns to inform future business strategies.

2. **Product Analysis:**
   - Identify top-performing products and product categories.
   - Explore opportunities for product portfolio enhancement and marketing strategies.

3. **Customer Segmentation:**
   - Segment customers based on their purchasing behavior and preferences.
   - Tailor marketing and engagement strategies for different customer segments.

4. **Geographical Analysis:**
   - Evaluate sales performance across different regions.
   - Identify regions with untapped potential or areas requiring special attention.

5. **Time-based Analysis:**
   - Analyze sales trends over different time dimensions (daily, monthly, yearly).
   - Identify peak sales periods and strategize inventory and marketing efforts accordingly.

6. **Promotion and Reseller Impact:**
   - Evaluate the effectiveness of promotions on sales.
   - Assess the contribution of different resellers to overall sales and optimize partnerships.

7. **Financial Insights:**
   - Calculate key financial metrics such as revenue, profit margins, and return on investment.
   - Identify opportunities for cost optimization and revenue growth.

*Outcome:*
By addressing the objectives outlined above, the company aims to enhance its understanding of the sales landscape, identify growth opportunities, and make informed decisions to improve overall business performance.

# data importation
In setting up the data importation process, I began by initializing a Spark session, essentially creating a connection to the Spark framework for data processing. I specified the folder path where the CSV files are located, and I compiled a list of the CSV files I intended to import, including "DimCurrency.csv," "DimCustomer.csv," "DimProduct.csv," "DimGeography.csv," "DimPromotion.csv," and "DimSalesTerritory.csv."

For each CSV file in the list, I used the `spark.read.csv` method to read the contents into a Spark DataFrame. The `header=True` parameter was set to indicate that the first row of each CSV file contains column names, and `inferSchema=True` was used to let Spark automatically infer the data types of the columns. After loading each DataFrame, I created a variable for it, naming it according to the base name of the corresponding CSV file without the file extension.

This approach allows for convenient referencing of each DataFrame using variable names aligned with the file names. For instance, the `DimCurrency` DataFrame holds information about currencies, and `DimCustomer` contains details about customers. This naming convention facilitates subsequent analyses aligned with the objectives we established earlier, such as exploring product-centric data, customer segmentation, geographical analysis, and various time-based and financial insights. The groundwork has been laid for a comprehensive sales analysis, with the ability to delve into each aspect individually or combine dimensions for more complex analyses.

In [1]:
from pyspark.sql import SparkSession

# Initialize a Spark session
spark = SparkSession.builder.appName("SalesAnalysis").getOrCreate()

# Specify the folder path
folder_path = "C:/Users/neste/OneDrive/Desktop/karanja/DataSet_final/DataSet_final"

# List of CSV files to load
files_to_load = [
    "DimCurrency.csv",
    "DimCustomer.csv",
    "DimProduct.csv",
    "DimGeography.csv",
    "DimPromotion.csv",
    "DimSalesTerritory.csv",
    "FactInternetSales.csv"
]

# Load each CSV file into a Spark DataFrame and assign a name corresponding to the file name
for file in files_to_load:
    file_path = f"{folder_path}/{file}"
    dataframe_name = file.split('.')[0]  # Use the file name without extension as the DataFrame name
    globals()[dataframe_name] = spark.read.csv(file_path, header=True, inferSchema=True)

# Display the first 5 rows of each DataFrame
DimCurrency.show(5)
DimCustomer.show(5)
DimProduct.show(5)
DimGeography.show(5)
DimPromotion.show(5)
DimSalesTerritory.show(5)


+-----------+--------------------+--------------+
|CurrencyKey|CurrencyAlternateKey|  CurrencyName|
+-----------+--------------------+--------------+
|          1|                 AFA|       Afghani|
|          2|                 DZD|Algerian Dinar|
|          3|                 ARS|Argentine Peso|
|          4|                 AMD| Armenian Dram|
|          5|                 AWG|Aruban Guilder|
+-----------+--------------------+--------------+
only showing top 5 rows

+-----------+------------+--------------------+-----+---------+----------+--------+---------+---------+-------------+------+------+--------------------+------------+-------------+--------------------+----------------+----------------+---------------+-----------------+-----------------+----------------+--------------+---------------+-------------------+------------+-------------------+-----------------+---------------+
|CustomerKey|GeographyKey|CustomerAlternateKey|Title|FirstName|MiddleName|LastName|NameStyle|BirthDate|

# DATA CLEANING

In the data cleaning process, I carefully curated the dataset to enhance its quality and prepare it for meaningful analysis. This step is essential because raw data often contains inconsistencies, missing values, and irrelevant information that can impede accurate insights. To begin, I loaded the relevant datasets, including `FactInternetSales`, and joined them with dimension tables such as `DimCustomer`, `DimProduct`, `DimPromotion`, `DimCurrency`, `DimSalesTerritory`, and eventually `DimGeography`. This amalgamation of data provided a comprehensive view of sales transactions, customer details, product information, and contextual factors.

As part of the cleaning process, I identified specific columns that were deemed unnecessary for the intended analysis. These columns, including keys and redundant identifiers, were systematically dropped from the dataset to streamline it for focused exploration. By excluding irrelevant information, the resulting DataFrame, named `data_clean`, now offers a more concise and targeted perspective on sales data. This curated dataset serves as the foundation for subsequent analyses, enabling more efficient queries, insightful visualizations, and a deeper understanding of sales performance.

The cleaning process is pivotal in ensuring data accuracy, reliability, and relevance. It facilitates a smoother analytical workflow, allowing for a more efficient extraction of meaningful patterns and trends. Moreover, it enhances the interpretability of the data, making it easier to draw actionable conclusions and make informed business decisions. As I move forward with the analysis, the meticulously cleaned dataset, now named `data_clean`, will be instrumental in uncovering valuable insights related to product performance, customer behavior, geographical trends, and various other dimensions outlined in the initial objectives.

The meticulous data cleaning process aligns seamlessly with the objectives outlined at the beginning of our analysis journey. By selectively removing extraneous columns and redundant information, the cleaned dataset, now designated as `data_clean`, is tailored to address specific facets of the sales analysis objectives. For instance, the removal of irrelevant columns streamlines the exploration of product-centric data, enabling a closer examination of top-selling products and their associated attributes.

Furthermore, the cleaned dataset is instrumental in customer segmentation analysis. The exclusion of unnecessary customer details not only enhances data clarity but also focuses the analysis on key factors influencing customer behavior. Geographical analysis is also streamlined, as the cleaning process ensures that only pertinent location-related information remains in the dataset, facilitating a more precise examination of sales patterns across different regions.

The data cleaning steps, performed with the objectives in mind, contribute to the overall effectiveness of subsequent analyses. The resulting `data_clean` dataset provides a solid foundation for exploring time-based trends, evaluating the impact of promotions, assessing reseller contributions, and calculating financial metrics. The alignment of the cleaning process with the objectives ensures that the subsequent analyses are not only accurate but also directly contribute to achieving the overarching goals of gaining actionable insights into sales performance and making informed business decisions.

In [9]:

# Joining FactInternetSales with DimCustomer on the CustomerKey column
joined_df = FactInternetSales.join(DimCustomer, FactInternetSales.CustomerKey == DimCustomer.CustomerKey, "inner")

# Joining FactInternetSales with DimProduct on the ProductKey column
joined_df = FactInternetSales.join(DimProduct, FactInternetSales.ProductKey == DimProduct.ProductKey, "inner")

# Joining FactInternetSales with DimPromotion on PromotionKey column
joined_df = FactInternetSales.join(DimPromotion, FactInternetSales.PromotionKey == DimPromotion.PromotionKey, "inner")

# Joining with DimCurrency on CurrencyKey column
joined_df = joined_df.join(DimCurrency, joined_df.CurrencyKey == DimCurrency.CurrencyKey, "inner")

# Joining with DimSalesTerritory on SalesTerritoryKey column
joined_df = joined_df.join(DimSalesTerritory, joined_df.SalesTerritoryKey == DimSalesTerritory.SalesTerritoryKey, "inner")


# Joining FactInternetSales with DimCustomer on CustomerKey column
joined_df = FactInternetSales.join(DimCustomer, FactInternetSales.CustomerKey == DimCustomer.CustomerKey, "inner")

# Joining joined_df with DimGeography on GeographyKey column
joined_df = joined_df.join(DimGeography, joined_df.GeographyKey == DimGeography.GeographyKey, "inner")

# List of columns to drop
columns_to_drop = [
    "ProductKey", "DueDateKey", "ShipDateKey", "CustomerKey", "PromotionKey", 
    "CurrencyKey", "SalesTerritoryKey", "SalesOrderNumber", "SalesOrderLineNumber", 
    "RevisionNumber", "OrderQuantity", "CarrierTrackingNumber", "CustomerPONumber", 
    "OrderDate", "DueDate", "ShipDate", "CustomerKey", "GeographyKey", 
    "CustomerAlternateKey", "Title", "FirstName", "MiddleName", "LastName", 
    "NameStyle", "EmailAddress", "SpanishEducation", "FrenchEducation", 
    "SpanishOccupation", "FrenchOccupation", "AddressLine2", "Phone", 
    "DateFirstPurchase", "GeographyKey", "StateProvinceCode", 
    "StateProvinceName", "CountryRegionCode", "SpanishCountryRegionName", 
    "FrenchCountryRegionName", "SalesTerritoryKey", "IpAddressLocator"
]

# Dropping the specified columns and naming the cleaned DataFrame as data_clean
data_clean = joined_df.drop(*columns_to_drop)

# Displaying the cleaned DataFrame
data_clean.show(5)



+------------+---------+--------------+--------------------+--------------+-------------------+----------------+-----------+--------+-------+----------+-------------+------+------+------------+-------------+--------------------+----------------+-----------------+--------------+---------------+-------------------+---------------+-------------+------------------------+----------+
|OrderDateKey|UnitPrice|ExtendedAmount|UnitPriceDiscountPct|DiscountAmount|ProductStandardCost|TotalProductCost|SalesAmount|  TaxAmt|Freight| BirthDate|MaritalStatus|Suffix|Gender|YearlyIncome|TotalChildren|NumberChildrenAtHome|EnglishEducation|EnglishOccupation|HouseOwnerFlag|NumberCarsOwned|       AddressLine1|CommuteDistance|         City|EnglishCountryRegionName|PostalCode|
+------------+---------+--------------+--------------------+--------------+-------------------+----------------+-----------+--------+-------+----------+-------------+------+------+------------+-------------+--------------------+----------

# Date wrangling

Navigating through the dataset's intricacies, I addressed the unconventional format of the `OrderDateKey` column, opting to convert it into a recognizable string format to facilitate subsequent date functions. Correcting the explicit type conversion, I successfully transformed the column, laying the groundwork for further temporal analysis.

Building on this foundation, the creation of a new `OrderDate` column emerged, capturing the temporal aspect crucial for our problem statement's emphasis on understanding sales trends over time. This transformation set the stage for a nuanced exploration of temporal patterns, aligning precisely with the problem statement's call for a comprehensive analysis of sales data.

Expanding the temporal granularity, I extracted the year, month, and day of the week from the `OrderDate` column. This intricate yet purposeful step allowed for a detailed examination of sales trends, ensuring a more profound understanding of temporal patterns in the dataset.

In essence, the date wrangling process unfolded strategically, transforming unintuitive date representations into meaningful temporal information. This alignment with the problem statement ensures that subsequent analyses will yield insights directly relevant to the overarching objectives of understanding and optimizing sales performance.

In [10]:
from pyspark.sql import functions as F

# Assuming data_clean is the DataFrame containing the OrderDateKey column
data_clean = data_clean.withColumn("OrderDateKey", F.col("OrderDateKey").cast("string"))

# Applying the date transformation
data_clean = data_clean.withColumn(
    "OrderDate",
    F.from_unixtime(F.unix_timestamp(F.col("OrderDateKey"), "yyyyMMdd")).cast("date")
)

# Dropping the original OrderDateKey column
data_clean = data_clean.drop("OrderDateKey")


# Assuming data_clean is the DataFrame containing the OrderDate column
data_clean = data_clean.withColumn("OrderYear", F.year("OrderDate"))
data_clean = data_clean.withColumn("OrderMonth", F.month("OrderDate"))
data_clean = data_clean.withColumn("OrderDayOfWeek", F.dayofweek("OrderDate"))


# Convert BirthDate to a valid date format
data_clean = data_clean.withColumn("BirthDate", F.to_date("BirthDate", "M/d/yyyy"))

# Drop rows with null values in BirthDate
data_clean = data_clean.filter(F.col("BirthDate").isNotNull())

# Calculate age by subtracting BirthDate from the current date
data_clean = data_clean.withColumn("Age", F.datediff(F.current_date(), "BirthDate") / 365.25)

# Drop the original BirthDate and OrderDate columns
data_clean = data_clean.drop("BirthDate", "OrderDate")

# Round off the Age column to whole numbers
data_clean = data_clean.withColumn("Age", F.round("Age"))

# Displaying the updated DataFrame
data_clean.show(5)




+---------+--------------+--------------------+--------------+-------------------+----------------+-----------+--------+-------+-------------+------+------+------------+-------------+--------------------+----------------+-----------------+--------------+---------------+-------------------+---------------+-------------+------------------------+----------+---------+----------+--------------+----+
|UnitPrice|ExtendedAmount|UnitPriceDiscountPct|DiscountAmount|ProductStandardCost|TotalProductCost|SalesAmount|  TaxAmt|Freight|MaritalStatus|Suffix|Gender|YearlyIncome|TotalChildren|NumberChildrenAtHome|EnglishEducation|EnglishOccupation|HouseOwnerFlag|NumberCarsOwned|       AddressLine1|CommuteDistance|         City|EnglishCountryRegionName|PostalCode|OrderYear|OrderMonth|OrderDayOfWeek| Age|
+---------+--------------+--------------------+--------------+-------------------+----------------+-----------+--------+-------+-------------+------+------+------------+-------------+--------------------+

# Data Encoding

In the data encoding process, I utilized PySpark's StringIndexer to assign unique numerical indices to categorical columns, and then applied a custom mapping using dense_rank() to represent those indices as sequential integers. This encoding scheme ensures that each distinct string value in the categorical columns is associated with a specific integer, allowing for a numerical representation of categorical data.

The rationale behind this encoding aligns with the analysis objectives, particularly in the context of machine learning or statistical analysis. Many machine learning algorithms require numerical input, and encoding categorical variables in this manner facilitates the application of these algorithms to the dataset. It simplifies the interpretation of categorical information by transforming it into a format that can be effectively processed and analyzed by various analytical models.

By conforming to this encoding strategy, the data becomes more amenable to statistical analyses, predictive modeling, and other machine learning techniques that may be employed to gain insights into the sales data. The sequential integers assigned to unique values in the categorical columns maintain their order, preserving inherent relationships among categories. This structured representation of categorical data contributes to the effectiveness and interpretability of subsequent analyses, ultimately supporting the achievement of the predefined analysis objectives.

In [15]:
from pyspark.ml.feature import StringIndexer
from pyspark.sql import functions as F

# Assuming data_clean is the DataFrame containing the specified categorical columns
categorical_columns = ["MaritalStatus", "Suffix", "Gender", "EnglishEducation", "EnglishOccupation", "AddressLine1", "CommuteDistance", "City", "EnglishCountryRegionName", "PostalCode"]

# Apply StringIndexer with handleInvalid="keep" to obtain indexed values
indexers = [StringIndexer(inputCol=column, outputCol=f"{column}_encoded", handleInvalid="keep") for column in categorical_columns]
indexed_data = Pipeline(stages=indexers).fit(data_clean).transform(data_clean)

# Create a mapping of unique values to sequential integers
mapping_exprs = [f"dense_rank() OVER (ORDER BY {col}) AS {col}_encoded" for col in categorical_columns]

# Apply the mapping expressions to replace indexed values with sequential integers
data_encod = indexed_data.selectExpr("*", *mapping_exprs)

# Drop the original categorical columns
data_encod = data_encod.drop(*categorical_columns)

# Displaying the updated DataFrame
data_encod.show(5)


+---------+--------------+--------------------+--------------+-------------------+----------------+-----------+--------+-------+------------+-------------+--------------------+--------------+---------------+---------+----------+--------------+----+---------------------+--------------+--------------+------------------------+-------------------------+--------------------+-----------------------+------------+--------------------------------+------------------+---------------------+--------------+--------------+------------------------+-------------------------+--------------------+-----------------------+------------+--------------------------------+------------------+
|UnitPrice|ExtendedAmount|UnitPriceDiscountPct|DiscountAmount|ProductStandardCost|TotalProductCost|SalesAmount|  TaxAmt|Freight|YearlyIncome|TotalChildren|NumberChildrenAtHome|HouseOwnerFlag|NumberCarsOwned|OrderYear|OrderMonth|OrderDayOfWeek| Age|MaritalStatus_encoded|Suffix_encoded|Gender_encoded|EnglishEducation_encoded|

# VISUALIZATIONS


In [17]:
import pandas as pd
import plotly.express as px

# Assuming data_clean is a PySpark DataFrame
# Convert it to a Pandas DataFrame
data_clean_pd = data_clean.toPandas()

# Replace 'OrderYear', 'OrderMonth', 'OrderDayOfWeek' with the actual column names in your DataFrame
order_year_counts = data_clean_pd['OrderYear'].value_counts()
order_month_counts = data_clean_pd['OrderMonth'].value_counts()
order_day_of_week_counts = data_clean_pd['OrderDayOfWeek'].value_counts()

# Create a pie chart for OrderYear
fig1 = px.pie(order_year_counts, names=order_year_counts.index, title='Order Year')

# Create a pie chart for OrderMonth
fig2 = px.pie(order_month_counts, names=order_month_counts.index, title='Order Month')

# Create a pie chart for OrderDayOfWeek
fig3 = px.pie(order_day_of_week_counts, names=order_day_of_week_counts.index, title='Order Day of Week')

# Display the charts
fig1.show()
fig2.show()
fig3.show()
