# MonthlyProductSalesTrend	

## DESCRIPTION
Shows the product sales by month, seasonal analysis & trend

## STRATEGY


## REQUIRED DATA ELEMENTS


In [1]:
import findspark
findspark.init()

ModuleNotFoundError: No module named 'findspark'

In [17]:
from pyspark.sql import SparkSession
from faker import Faker
import pyspark.sql.functions as F
from pyspark.sql.window import Window
import seaborn as sns
import matplotlib.pyplot as plt
import mysql.connector
from datetime import datetime, timedelta
from helpersP import *
from Data_Manipulations import *
import findspark
findspark.init()

# Initialising Faker for generating fake data
fake = Faker()

# Initialising SparkSession
spark=SparkSession.builder.appName('MonthlyProductSalesTrend').getOrCreate()

# Set the random seed for Faker
fake.seed_instance(0)

In [18]:
reader = DbReader(  host = "localhost",
                   user = "root", 
                   password = "pass", 
                   database = 'cpga_data',
                   spark=spark)
reader.connect()

In [19]:
# Transactions table
query = 'SELECT TransactionId, TransactionDate, ProductId, SaleAmount FROM transactions'
reader.execute_query(query)
transaction_df = reader.create_df()
transaction_df.show(5)

+--------------------+-------------------+---------+--------------------+
|       TransactionId|    TransactionDate|ProductId|          SaleAmount|
+--------------------+-------------------+---------+--------------------+
|88ef2101-7b7d-482...|2007-12-17 00:00:00|     4580|8444.220000000000...|
|6c2ebd06-1f5e-468...|2008-04-28 00:00:00|     1081|2589.170000000000...|
|4ffcee12-a847-484...|2018-02-19 00:00:00|    15704|7837.990000000000...|
|2d9d0010-39ab-43e...|2020-04-23 00:00:00|    12715|5833.820000000000...|
|8bd07027-0e52-4c1...|2005-09-17 00:00:00|     8509|2818.380000000000...|
+--------------------+-------------------+---------+--------------------+
only showing top 5 rows



In [20]:
reader.disconnect()

Disconnected from the database.


## PRE-CONDITION

In [21]:
# Transaction_ID serves as a primary key (Unique Identifier for every transaction)
# No duplicate values in the transaction ID
duplicate_counts = transaction_df.groupBy(F.col('TransactionId')).count().filter(F.col("count") > 1)
duplicate_counts.show()

+-------------+-----+
|TransactionId|count|
+-------------+-----+
+-------------+-----+



## PRE-PROCESS
Pre-process the data as per the requirements of the analysis

### DATA CLEANSING


## WHEN (TRAIN)


In [22]:
# Add the CurrentDate column
transaction_df = transaction_df.withColumn(
    'MonthEndDate',
    F.last_day(F.date_trunc('month', 'TransactionDate'))
)
transaction_df = transaction_df.withColumn('Month', F.month('TransactionDate'))
transaction_df = transaction_df.withColumn('Year', F.year('TransactionDate')).orderBy('MonthEndDate')

transaction_df.show()

+--------------------+-------------------+---------+--------------------+------------+-----+----+
|       TransactionId|    TransactionDate|ProductId|          SaleAmount|MonthEndDate|Month|Year|
+--------------------+-------------------+---------+--------------------+------------+-----+----+
|cf6b83c3-636d-4de...|2001-01-07 00:00:00|     9886|1564.800000000000...|  2001-01-31|    1|2001|
|a55527c0-f3a6-40e...|2001-01-30 00:00:00|    23607|751.7700000000000...|  2001-01-31|    1|2001|
|bcb54291-2dfa-4db...|2001-01-27 00:00:00|    18119|5584.850000000000...|  2001-01-31|    1|2001|
|642d6010-a6c4-443...|2001-01-22 00:00:00|     5454|1956.850000000000...|  2001-01-31|    1|2001|
|01da2ddb-3fe3-4b2...|2001-01-17 00:00:00|    23631|6987.590000000000...|  2001-01-31|    1|2001|
|bdbd6dd5-d0ad-4d5...|2001-01-01 00:00:00|    12050|8521.420000000000...|  2001-01-31|    1|2001|
|857f65f0-971f-4ed...|2001-01-18 00:00:00|      995|2804.470000000000...|  2001-01-31|    1|2001|
|5f6e6cb1-f101-4c9..

In [23]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

def summary_stats(df, groupby_columns, value_column, count_column, mnemonic):

    grouped_data = df.groupBy(*groupby_columns)
    window_spec = Window.partitionBy(*groupby_columns)

    # Calculate the count, sum, average, minimum, and maximum for each group
    result = grouped_data.agg(
        F.count(F.col(count_column)).alias("countValue"),
        F.sum(F.col(value_column)).alias("amountValue"),
        F.avg(F.col(value_column)).alias("avgValue"),
        F.min(F.col(value_column)).alias("minValue"),
        F.max(F.col(value_column)).alias("maxValue")
    )

    result = result.withColumn("GroupByColumns", F.array([F.lit(c) for c in groupby_columns])) \
        .withColumn("GroupByValues", F.array(*groupby_columns)).withColumn('Mnemonic', F.lit(mnemonic))

    # Population variance and standard deviation
    mean_value = result.select(F.mean(F.col("amountValue")).alias("mean_value")).first()["mean_value"]
    squared_diff = F.pow(F.col("amountValue") - mean_value, 2)
    population_variance = F.avg(squared_diff).over(window_spec)
    population_stddev = F.sqrt(population_variance)

    result = result.withColumn("Variance", population_variance) \
        .withColumn("StdDev", population_stddev).drop(*groupby_columns)

    result = z_score(result, "amountValue")

    result = result.select(F.lit(None).alias("AnalysisDetailId"), 'Mnemonic', \
    "GroupByColumns", "GroupByValues", "countValue", "amountValue", "avgValue", "minValue",\
    "maxValue", "StdDev", "Variance", 'z_score', 'normalized_score')

    return result


In [24]:
result = summary_stats(transaction_df, ["ProductId", 'Month', 'Year'], "SaleAmount", 'TransactionId', 'MonthlyProductSalesTrend')
result.show()

+----------------+--------------------+--------------------+-----------------+----------+--------------------+--------------------+--------------------+--------------------+------------------+--------------------+--------------------+-------------------+
|AnalysisDetailId|            Mnemonic|      GroupByColumns|    GroupByValues|countValue|         amountValue|            avgValue|            minValue|            maxValue|            StdDev|            Variance|             z_score|   normalized_score|
+----------------+--------------------+--------------------+-----------------+----------+--------------------+--------------------+--------------------+--------------------+------------------+--------------------+--------------------+-------------------+
|            null|MonthlyProductSal...|[ProductId, Month...|   [100, 8, 2022]|         1|4455.880000000000...|4455.880000000000...|4455.880000000000...|4455.880000000000...| 573.2018198486818|  328560.32627784065|-0.19585824599649157|0

In [25]:
value = result.select("GroupByValues").collect()[0]["GroupByValues"]
value

['12936', '3', '2008']

In [26]:
final_df = z_score(result, "amountValue")
final_df.show(5)

+----------------+--------------------+--------------------+----------------+----------+--------------------+--------------------+--------------------+--------------------+------------------+--------------------+--------------------+-------------------+
|AnalysisDetailId|            Mnemonic|      GroupByColumns|   GroupByValues|countValue|         amountValue|            avgValue|            minValue|            maxValue|            StdDev|            Variance|             z_score|   normalized_score|
+----------------+--------------------+--------------------+----------------+----------+--------------------+--------------------+--------------------+--------------------+------------------+--------------------+--------------------+-------------------+
|            null|MonthlyProductSal...|[ProductId, Month...|  [100, 8, 2022]|         1|4455.880000000000...|4455.880000000000...|4455.880000000000...|4455.880000000000...| 573.2018198486818|  328560.32627784065|-0.19585824599649157|0.228

In [27]:
final_df = final_df.orderBy(F.desc(F.col("normalized_score")))

window_spec = Window.orderBy(F.desc(F.col("normalized_score")))

# Assign ranks using row_number()
final_df = final_df.withColumn("LocalRank", F.row_number().over(window_spec))

final_df.show()

+----------------+--------------------+--------------------+-----------------+----------+--------------------+--------------------+--------------------+--------------------+------------------+--------------------+------------------+------------------+---------+
|AnalysisDetailId|            Mnemonic|      GroupByColumns|    GroupByValues|countValue|         amountValue|            avgValue|            minValue|            maxValue|            StdDev|            Variance|           z_score|  normalized_score|LocalRank|
+----------------+--------------------+--------------------+-----------------+----------+--------------------+--------------------+--------------------+--------------------+------------------+--------------------+------------------+------------------+---------+
|            null|MonthlyProductSal...|[ProductId, Month...|[17624, 12, 2015]|         2|19483.32000000000...|9741.660000000000...|9575.420000000000...|9907.900000000000...|14454.238180151318| 2.089250013685441E8| 

In [28]:
def percentile(dataframe, number, order = None):
    if order == 'Bottom' or order == 'bottom':
        df = dataframe.filter(F.col('normalized_score') >= (1 - number))
    else:
        df = dataframe.filter(F.col('normalized_score') <= number)
    
    return df

result = percentile(final_df, 0.2, 'Bottom')
result.show()

+----------------+--------------------+--------------------+-----------------+----------+--------------------+--------------------+--------------------+--------------------+------------------+--------------------+------------------+------------------+---------+
|AnalysisDetailId|            Mnemonic|      GroupByColumns|    GroupByValues|countValue|         amountValue|            avgValue|            minValue|            maxValue|            StdDev|            Variance|           z_score|  normalized_score|LocalRank|
+----------------+--------------------+--------------------+-----------------+----------+--------------------+--------------------+--------------------+--------------------+------------------+--------------------+------------------+------------------+---------+
|            null|MonthlyProductSal...|[ProductId, Month...|[17624, 12, 2015]|         2|19483.32000000000...|9741.660000000000...|9575.420000000000...|9907.900000000000...|14454.238180151318| 2.089250013685441E8| 

In [29]:
result = percentile(final_df, 0.2, 'Top')
result.show()

+----------------+--------------------+--------------------+-----------------+----------+--------------------+--------------------+--------------------+--------------------+------------------+------------------+--------------------+-------------------+---------+
|AnalysisDetailId|            Mnemonic|      GroupByColumns|    GroupByValues|countValue|         amountValue|            avgValue|            minValue|            maxValue|            StdDev|          Variance|             z_score|   normalized_score|LocalRank|
+----------------+--------------------+--------------------+-----------------+----------+--------------------+--------------------+--------------------+--------------------+------------------+------------------+--------------------+-------------------+---------+
|            null|MonthlyProductSal...|[ProductId, Month...|  [6783, 4, 2022]|         1|3896.660000000000...|3896.660000000000...|3896.660000000000...|3896.660000000000...|1132.4218198486817|1282379.1780694001|

In [30]:
result = percentile(final_df, 0.01, 'Top')
result.show()

+----------------+--------------------+--------------------+-----------------+----------+--------------------+--------------------+--------------------+--------------------+------------------+--------------------+-------------------+--------------------+---------+
|AnalysisDetailId|            Mnemonic|      GroupByColumns|    GroupByValues|countValue|         amountValue|            avgValue|            minValue|            maxValue|            StdDev|            Variance|            z_score|    normalized_score|LocalRank|
+----------------+--------------------+--------------------+-----------------+----------+--------------------+--------------------+--------------------+--------------------+------------------+--------------------+-------------------+--------------------+---------+
|            null|MonthlyProductSal...|[ProductId, Month...|  [1619, 2, 2001]|         1|195.0800000000000...|195.0800000000000...|195.0800000000000...|195.0800000000000...| 4834.001819848681|2.33675735943

## THEN (FIT)
Fit the data as per the model

## POST CONDITION
Verify the analysis results are accurate and meet the acceptance criteria

## POST PROCESS


## VISUALIZATIONS
Include any suitable visualizations