In [0]:
import pandas as pd
import numpy as np
from pyspark.sql.types import *
import pyspark.sql.functions as F
# Connect spark to ADLS
spark.conf.set(
    "fs.azure.account.key.salesadls.dfs.core.windows.net","646mPPXzgcDIFSKTnvCLY+7RBqT6Zyxxm/TDrmuakx/xf1zsHdT2BX5KjIn/SJhRgf+eUqeuEBrM+ASt8nC3uQ==")

## Load sales data from ADLS

In [0]:
# define adls path
#
adls_endpoint = "abfss://inputgen2@salesadls.dfs.core.windows.net/"
sales_file_path = "Amazon Sale Report.csv"

full_sales_path = adls_endpoint + sales_file_path
#
price5_path = 'May-2022.csv'
price3_path = 'P  L March 2021.csv'
full_price5_path = adls_endpoint + price5_path
full_price3_path = adls_endpoint + price3_path
#
sale_report_path = 'Sale Report.csv'
full_sku_path = adls_endpoint + sale_report_path
#
expense_path = 'Expense IIGF.csv'
full_expense_path = adls_endpoint + expense_path

In [0]:
sales_data = spark.read.csv(path=full_sales_path,header=True, inferSchema= True)
may_data = spark.read.csv(path=full_price5_path,header=True, inferSchema= True)
mar_data = spark.read.csv(path=full_price3_path,header=True, inferSchema= True)
sale_report_df = spark.read.csv(path=full_sku_path,header=True, inferSchema= True)
expense_df = spark.read.csv(path=full_expense_path,header=True, inferSchema= True)

## Sales Trend Analysis

In [0]:
from pyspark.sql import functions as F
from pyspark.sql import SparkSession

# drop columns
sales_data_clean = sales_data.drop(*['index', 'Unnamed: 22'])

# drop nan
sales_data_clean = sales_data_clean.dropna(subset=['Date', 'Amount'])

# datetime
sales_data_clean = sales_data_clean.withColumn("Date", F.to_date(F.unix_timestamp("Date", "MM-dd-yy").cast("timestamp")))

In [0]:
from pyspark.sql import functions as F

# Sales Amount and Orders Amount by Month
# Add 'Month' column
sales_data_clean = sales_data_clean.withColumn("Year", F.year("Date").cast("string"))\
                                   .withColumn("Month", F.month("Date").cast("string"))

# Assert Month digits
sales_data_clean = sales_data_clean.withColumn("Month", F.lpad("Month", 2, "0"))

# Sales Trend
sales_trend = sales_data_clean.groupBy("Month")\
    .agg(
        F.sum("Amount").alias("Total_Sales"),
        F.countDistinct("Order ID").alias("Order_Count")
    )

In [0]:
display(sales_trend)

Month,Total_Sales,Order_Count
5,26226476.749999948,36865
3,101683.85,150
6,23425809.379999988,32986
4,28838708.319999997,43029


## Geographic analysis

In [0]:
# Calculate sales and order volume by state
regional_sales = sales_data_clean.groupBy("ship-state")\
    .agg(
        F.sum("Amount").alias("Total_Sales"),  # Rename'Total_Sales'
        F.countDistinct("Order ID").alias("Order_Count")  # Rename'Order_Count'
    )\
    .orderBy(F.desc("Total_Sales"))  # sort by desc


regional_sales.show(10)

## Cross-platform product pricing strategy analysis

In [0]:
from pyspark.sql.functions import col, when, lit
from pyspark.sql.types import FloatType

# clean 
def clean_and_convert_to_float(df, columns):
    for column_name in columns:
        df = df.withColumn(column_name, when((col(column_name).isNull()) | (col(column_name) == "Nill"), lit(None)).otherwise(col(column_name).cast(FloatType())))
    return df

# 
relevant_columns = ['Sku', 'Style Id', 'Ajio MRP', 'Amazon MRP', 'Flipkart MRP', 'Myntra MRP', 'Paytm MRP', 'Snapdeal MRP']

#
may_df_clean = clean_and_convert_to_float(may_data, relevant_columns[2:])
mar_df_clean = clean_and_convert_to_float(mar_data, relevant_columns[2:])


In [0]:
from pyspark.sql.functions import col, lit, avg

# 首先，为了简化代码，我们定义一个函数来计算平均价格和价格变化
def calculate_avg_price_changes(df1, df2, time1_label, time2_label, platforms):
    # 计算每个平台在df1的平均价格
    avg_prices_df1 = df1.groupBy().agg(*(avg(col(p)).alias(f"{p}_{time1_label}") for p in platforms))
    
    # 计算每个平台在df2的平均价格
    avg_prices_df2 = df2.groupBy().agg(*(avg(col(p)).alias(f"{p}_{time2_label}") for p in platforms))
    
    # 由于这两个DataFrame都只有一行，我们可以直接进行横向连接（crossJoin）
    combined_avg_prices = avg_prices_df1.crossJoin(avg_prices_df2)
    
    # 计算变化
    for platform in platforms:
        combined_avg_prices = combined_avg_prices.withColumn(
            f"Change_{platform}", 
            col(f"{platform}_{time1_label}") - col(f"{platform}_{time2_label}")
        )
    
    return combined_avg_prices

# 定义平台列表
platforms = ["Ajio MRP", "Amazon MRP", "Flipkart MRP", "Myntra MRP", "Paytm MRP", "Snapdeal MRP"]

# 调用函数计算平均价格和价格变化
avg_price_changes_df = calculate_avg_price_changes(may_df_clean, mar_df_clean, "May2022", "PLMarch2021", platforms)

# 选择和重排列列以匹配期望的输出格式
output_columns = []
for platform in platforms:
    output_columns.append(f"{platform}_May2022")
    output_columns.append(f"{platform}_PLMarch2021")
    output_columns.append(f"Change_{platform}")

result_df = avg_price_changes_df.select(*output_columns)



In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit, struct, col

platforms = ["Ajio", "Amazon", "Flipkart", "Myntra", "Paytm", "Snapdeal"]

# 
new_rows = []
for platform in platforms:
    new_rows.append((
        platform, 
        result_df.select(f"{platform} MRP_May2022").collect()[0][0],
        result_df.select(f"{platform} MRP_PLMarch2021").collect()[0][0],
        result_df.select(f"Change_{platform} MRP").collect()[0][0]
    ))

new_df_schema = ["Platform", "MRP_May2022", "MRP_PLMarch2021", "Change_MRP"]
platform_df = spark.createDataFrame(new_rows, schema=new_df_schema)

platform_df.show()


## SKU Analysis

In [0]:
from pyspark.sql.functions import avg, max, min, col

# avg_stock_per_sku
avg_stock_per_sku = sale_report_df.groupBy("SKU Code").agg(avg("Stock").alias("avg_stock"))

# stock_sku 
highest_stock_sku = avg_stock_per_sku.orderBy(col("avg_stock").desc()).first()
lowest_stock_sku = avg_stock_per_sku.orderBy("avg_stock").first()

# avg_stock_per_category
avg_stock_per_category = sale_report_df.groupBy("Category").agg(avg("Stock").alias("avg_stock"))

highest_stock_sku_df = avg_stock_per_sku.orderBy(col("avg_stock").desc()).limit(1).withColumn("Type", lit("Highest"))
lowest_stock_sku_df = avg_stock_per_sku.orderBy("avg_stock").limit(1).withColumn("Type", lit("Lowest"))

# combine
combined_sku_df = highest_stock_sku_df.union(lowest_stock_sku_df)

# 
final_result_df = combined_sku_df.withColumnRenamed("SKU Code", "SKU_Code").withColumnRenamed("avg_stock", "Average_Stock")



In [0]:
display(avg_stock_per_category)

Category,avg_stock
NIGHT WEAR,15.152073732718891
KURTI,57.32142857142857
KURTA,30.68679549114332
,0.0
PALAZZO,10.582417582417582
LEHENGA CHOLI,21.02857142857143
JUMPSUIT,4.714285714285714
SET,23.46952380952381
PANT,37.75824175824176
CARDIGAN,3.411764705882353


In [0]:
display(final_result_df)

SKU_Code,Average_Stock,Type
JNE3405-KR-XXL,1234.0,Highest
JNE3401-KR-XXL,0.0,Lowest


## Cost and Profit

In [0]:
from pyspark.sql.functions import sum as _sum, avg, col
from pyspark.sql.types import DoubleType

expense_df = expense_df.withColumnRenamed('Recived Amount','Date')\
                                     .withColumnRenamed('Unnamed: 1','Received Amount')\
                                     .withColumnRenamed('Expance','Expense Category')\
                                     .withColumnRenamed('Unnamed: 3','Expense Amount')

# tranform "Expense Amount" type.Double
expense_df = expense_df.withColumn("Expense Amount", when(col("Expense Amount").cast(DoubleType()).isNull(), 0)
                                              .otherwise(col("Expense Amount").cast(DoubleType())))

# total expense
total_expense = expense_df.agg(_sum("Expense Amount").alias("Total Expense")).collect()[0]["Total Expense"]

# clean “P & L March 2021.csv”
mar_df = mar_data.withColumn("Final MRP Old", when(col("Final MRP Old").cast(DoubleType()).isNull(), 0)
                                                .otherwise(col("Final MRP Old").cast(DoubleType())))

# total_revenue
total_revenue = mar_df.agg(_sum("Final MRP Old").alias("Total Revenue")).collect()[0]["Total Revenue"]

# profit
profit = total_revenue - total_expense

# revenue_per_category
revenue_per_category = mar_df.groupBy("Category").agg(_sum("Final MRP Old").alias("Revenue")).orderBy("Category")

# expense_result_df
expense_result_df = spark.createDataFrame([
    ("Total Expense", total_expense),
    ("Total Revenue", total_revenue),
    ("Profit", profit)
], ["Metric", "Value"])



In [0]:
display(revenue_per_category)

Category,Revenue
Gown,100840.0
Kurta,1593139.0
Kurta Set,943888.0
Nill,174441.45
Tops,77775.0


In [0]:
display(expense_result_df )

Metric,Value
Total Expense,18095.0
Total Revenue,2890083.45
Profit,2871988.45


## Writing Output tables to Data Lake Gen-2 in Parquet Format

In [0]:
adls_endpoint = "abfss://inputgen2@salesadls.dfs.core.windows.net/"
output_data_path = adls_endpoint + 'analysis_output/'

sales_analysis_path = output_data_path + 'sales_analysis/'
regional_analysis_path = output_data_path + 'regional_analysis/'
platform_analysis_path = output_data_path + 'platform_analysis/'
stock_analysis_path = output_data_path + 'stock_analysis/'
sku_analysis_path = output_data_path + 'sku_analysis/'
revenue_analysis_path = output_data_path + 'revenue_analysis/'
cost_profit_path = output_data_path + 'cost_profit_analysis/'

In [0]:
dbutils.widgets.text("run_mode", "test", "Run Mode")

run_mode = dbutils.widgets.get("run_mode")

if run_mode == "production":
    # 仅当run_mode为"production"时，才执行数据存储操作
    sales_trend.write.mode("overwrite").parquet(sales_analysis_path)
    regional_sales.write.mode("overwrite").parquet(regional_analysis_path)
    platform_df.write.mode("overwrite").parquet(platform_analysis_path)
    avg_stock_per_category.write.mode("overwrite").parquet(stock_analysis_path)
    final_result_df.write.mode("overwrite").parquet(sku_analysis_path)
    revenue_per_category.write.mode("overwrite").parquet(revenue_analysis_path)
    expense_result_df.write.mode("overwrite").parquet(cost_profit_path)
else:
    # 如果是"test"模式，可以打印消息或执行其他非存储操作
    print("Test mode, not storing data.")
