In [None]:
from pyspark.sql.functions import col
from pyspark.sql.types import IntegerType,DoubleType,BooleanType,DataType

In [None]:
# OAuth (service-principal) settings used to mount the ADLS Gen2 container.
# NOTE(review): the values below are placeholders. Never commit real credentials;
# on Databricks read them with dbutils.secrets.get(scope=..., key=...) instead.
configs = {
    "fs.azure.account.auth.type": "OAuth",
    "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
    "fs.azure.account.oauth2.client.id": "client_id",
    "fs.azure.account.oauth2.client.secret": 'client_secret_key',
    "fs.azure.account.oauth2.client.endpoint": "https://login.microsoftonline.com/client_tenant_id/oauth2/token"
}

# Mounts outlive the notebook session, so mounting an existing mount point
# fails on every re-run. Only mount when /mnt/supplychain is not mounted yet.
if not any(m.mountPoint == "/mnt/supplychain" for m in dbutils.fs.mounts()):
    dbutils.fs.mount(
        source="abfss://<container_name>@<storage_account_name>.dfs.core.windows.net",  # container@storage_account
        mount_point="/mnt/supplychain",
        extra_configs=configs
    )

In [None]:
# List the top level of the mount to confirm the container is reachable
mounted_entries = dbutils.fs.ls("/mnt/supplychain")
mounted_entries

In [None]:
def _read_raw_csv(path):
    """Load one raw CSV file.

    The header row supplies the column names, and Spark infers the column types.
    """
    return (
        spark.read.format("csv")
        .option("header", "true")
        .option("inferSchema", "true")
        .load(path)
    )


sales_data = _read_raw_csv("/mnt/supplychain/raw_data/sales_data.csv")
inventory_data = _read_raw_csv("/mnt/supplychain/raw_data/inventory_data.csv")
supplier_data = _read_raw_csv("/mnt/supplychain/raw_data/supplier_data.csv")

In [None]:
# Quick look at each raw dataset: sample rows, then the inferred schema
for raw_frame in (sales_data, inventory_data, supplier_data):
    raw_frame.show()
    raw_frame.printSchema()

In [None]:
# Remove fully duplicated rows (every column equal) from each dataset
sales_data, inventory_data, supplier_data = [
    frame.dropDuplicates() for frame in (sales_data, inventory_data, supplier_data)
]


In [None]:
# Cleaning data: drop columns that are not used further down the notebook
sales_columns_to_drop = ("Row ID", "Ship Mode", "Region", "Sub-Category")
supplier_columns_to_drop = ("SKU", "Customer demographics", "Shipping carriers", "Inspection results", "Routes")

sales_data = sales_data.drop(*sales_columns_to_drop)
supplier_data = supplier_data.drop(*supplier_columns_to_drop)

In [None]:
# Organising data
from pyspark.sql.functions import when

# Bucket each row by its Sales value: above 100 is high, above 50 is medium,
# anything else is low. A null Sales fails both tests and also lands in
# "Low Performer".
sales_bucket = (
    when(col("Sales") > 100, "High Performer")
    .when(col("Sales") > 50, "Medium Performer")
    .otherwise("Low Performer")
)
sales_data = sales_data.withColumn("Performance_category", sales_bucket)


In [None]:

from pyspark.sql.functions import when

# Stock level status from Quantity: below 10 is low, above 100 is excess,
# otherwise normal. A null Quantity falls through to "Normal Stock".
stock_status = (
    when(col("Quantity") < 10, "Low Stock")
    .when(col("Quantity") > 100, "Excess Stock")
    .otherwise("Normal Stock")
)
inventory_data = inventory_data.withColumn("Stock_status", stock_status)


In [None]:
from pyspark.sql.window import Window
from pyspark.sql.functions import lag, row_number

# Per-product window ordered by invoice date (matches the rolling-average
# window later in the notebook). Ordering by StockCode, the partition key
# itself, would tie every row in a partition and make row_number/lag arbitrary.
# NOTE(review): InvoiceDate nulls are only imputed in a later cell; here they
# sort first within each StockCode. Rows that tie on InvoiceDate are still in
# arbitrary order; add a unique tiebreaker column if one exists.
# Assumes InvoiceDate is a date/timestamp type. If inferSchema left it as a
# string, the ordering is lexical, not chronological. TODO confirm.
window_spec = Window.partitionBy("StockCode").orderBy("InvoiceDate")

# Position of each entry within its product's history
inventory_data = inventory_data.withColumn("Row_number", row_number().over(window_spec))

# Quantity of the preceding entry for the same product (null for the first entry)
inventory_data = inventory_data.withColumn("Previous_stock_level", lag("Quantity").over(window_spec))

# Change versus the preceding entry (null when there is no preceding entry)
inventory_data = inventory_data.withColumn("Stock_change", inventory_data["Quantity"] - inventory_data["Previous_stock_level"])

inventory_data.show()


In [None]:
import pyspark.sql.functions as F
import random
from pyspark.sql.types import DateType
from datetime import timedelta, datetime

# Bounds (both inclusive) for the synthetic dates produced by random_date
start_date, end_date = datetime(2022, 1, 1), datetime(2023, 1, 1)

# Python helper wrapped by the Spark UDF below
def random_date(start=None, end=None):
    """Return a uniformly random calendar date between ``start`` and ``end`` (inclusive).

    Args:
        start: lower bound (``date`` or ``datetime``); defaults to the
            notebook-level ``start_date``.
        end: upper bound; defaults to the notebook-level ``end_date``.

    Returns:
        A plain ``datetime.date``. The UDF is declared with ``DateType``, whose
        Python values are ``date`` objects, so a ``datetime`` result is
        truncated to its date.

    Raises:
        ValueError: if ``end`` is before ``start`` (from ``random.randint``).
    """
    if start is None:
        start = start_date
    if end is None:
        end = end_date
    offset_days = random.randint(0, (end - start).days)
    picked = start + timedelta(days=offset_days)
    # datetime is a subclass of date; normalise to a plain date for DateType
    return picked.date() if isinstance(picked, datetime) else picked

# Wrap random_date as a Spark UDF and flag it nondeterministic. Spark treats
# UDFs as deterministic by default, so during optimisation it may eliminate or
# repeat calls. That is unsafe for a random generator.
# NOTE(review): results are still not reproducible across runs, because no seed
# reaches the executors. Each action (the show() below, the later Parquet
# write) also re-evaluates the UDF, so the dates displayed here can differ from
# the ones written out.
random_date_udf = F.udf(random_date, DateType()).asNondeterministic()

# Replace NULL InvoiceDate values with random dates; non-null dates are kept.
# NOTE(review): random dates fabricate an ordering for the later time-based
# windows. Consider dropping these rows instead.
inventory_data = inventory_data.withColumn(
    "InvoiceDate",
    F.when(F.col("InvoiceDate").isNull(), random_date_udf()).otherwise(F.col("InvoiceDate"))
)

# Show the updated DataFrame
inventory_data.show()


In [None]:
# Save each DataFrame as Parquet under transformed_data/, replacing any
# previous output. repartition(1) yields a single part file per dataset, which
# is convenient downstream but funnels the write through one task.
# No "header" option: that is a CSV setting and does nothing for Parquet.
for frame, name in (
    (sales_data, "sales_data"),
    (inventory_data, "inventory_data"),
    (supplier_data, "supplier_data"),
):
    frame.repartition(1).write.mode("overwrite").parquet(
        f"/mnt/supplychain/transformed_data/{name}.parquet"
    )

In [None]:
# Load the transformed data in the Synapse workspace. Full abfss:// URIs are
# used here, presumably because the Databricks /mnt mount is not available
# there.
inventory_data = spark.read.parquet(
    "abfss://supply-chain-data@supplychainstoragek.dfs.core.windows.net/transformed_data/inventory_data.parquet"
)
supplier_data = spark.read.parquet(
    "abfss://supply-chain-data@supplychainstoragek.dfs.core.windows.net/transformed_data/supplier_data.parquet"
)
sales_data = spark.read.parquet(
    "abfss://supply-chain-data@supplychainstoragek.dfs.core.windows.net/transformed_data/sales_data.parquet"
)


In [None]:
from pyspark.sql.window import Window
from pyspark.sql.functions import avg, col

# Per-product window over the up-to-3 entries *preceding* the current one,
# ordered by invoice date. The current row is excluded on purpose: Quantity is
# the regression label (labelCol="Quantity" in the LinearRegression cell), so
# averaging it into its own feature leaks the target and inflates test scores.
window_spec = Window.partitionBy("StockCode").orderBy("Invoicedate").rowsBetween(-3, -1)

# Rolling average of Quantity over that history
inventory_data = inventory_data.withColumn("Rolling_avg_stock", avg("Quantity").over(window_spec))

# A product's first entry has no history, so its rolling average is null.
# VectorAssembler rejects nulls by default, so drop those rows here.
inventory_data = inventory_data.filter(col("Rolling_avg_stock").isNotNull())


In [None]:
from pyspark.ml.feature import VectorAssembler

# Spark ML estimators read a single vector column, so pack the engineered
# feature(s) into "features". Names must match the DataFrame columns exactly.
feature_cols = ["Rolling_avg_stock"]
assembler = VectorAssembler(outputCol="features", inputCols=feature_cols)
inventory_data = assembler.transform(inventory_data)

# Preview the assembled vectors
inventory_data.select("features").show()


In [None]:
# 80/20 train/test split; the fixed seed makes it repeatable for the same input
train_data, test_data = inventory_data.randomSplit(weights=[0.8, 0.2], seed=1234)

# Report how many rows landed in each split
train_rows = train_data.count()
test_rows = test_data.count()
print(f"Training Data Count: {train_rows}")
print(f"Test Data Count: {test_rows}")


In [None]:
# Sanity check on the split made in the previous cell: every row of
# inventory_data must land in exactly one of train_data / test_data. A mismatch
# means the upstream data is not recomputed deterministically (for example,
# ties in a window ordering) and the evaluation would be unreliable.
total_rows = inventory_data.count()
assert train_data.count() + test_data.count() == total_rows, \
    "train/test split does not cover inventory_data exactly"


In [None]:
from pyspark.ml.regression import LinearRegression

# Linear regression predicting Quantity from the assembled "features" vector
lr = LinearRegression(labelCol="Quantity", featuresCol="features")
lr_model = lr.fit(train_data)

# Report the fitted parameters
for param_label, param_value in (
    ("Coefficients", lr_model.coefficients),
    ("Intercept", lr_model.intercept),
):
    print(f"{param_label}: {param_value}")


In [None]:
from pyspark.ml.evaluation import RegressionEvaluator

# Score the held-out rows with the fitted model
test_results = lr_model.transform(test_data)

# RMSE between the model's "prediction" column and the true Quantity
evaluator = RegressionEvaluator(metricName="rmse", labelCol="Quantity", predictionCol="prediction")
rmse = evaluator.evaluate(test_results)

print(f"Root Mean Squared Error (RMSE) on test data: {rmse}")


In [None]:
# Save the trained model. write().overwrite() replaces a model already saved at
# this path; a plain save() raises when the path exists, which breaks every
# re-run of the notebook.
lr_model.write().overwrite().save("/mnt/supplychain/models/inventory_forecast_model")


In [None]:
from pyspark.ml.regression import LinearRegressionModel

# Round trip: reload the persisted model and score the test split with it
loaded_model = LinearRegressionModel.load("/mnt/supplychain/models/inventory_forecast_model")
new_predictions = loaded_model.transform(test_data)

# Feature, actual Quantity and predicted value side by side
(
    new_predictions
    .select("features", "Quantity", "prediction")
    .show()
)


# Sample output (from a previous run; kept as comments so the cell stays valid Python)
# +--------+--------+--------------------+
# |features|Quantity|          prediction|
# +--------+--------+--------------------+
# |  [14.5]|       6|  14.715033842772941|
# |   [2.0]|       3|  1.0593837828082833|
# |   [8.0]|       2|   7.614095811591319|
# |   [4.0]|       1|  3.2442877924026288|
# |   [5.5]|       4|   4.882965799598388|
# |  [0.75]|       2| -0.3061812231881825|
# | [12.25]|      40|  12.257016831979303|
# |   [5.5]|       4|   4.882965799598388|
# |  [28.5]|      96|  30.009361909933357|
# |  [27.5]|      96|  28.916909905136187|
# |  [3.25]|       4|   2.424948788804749|
# |  [28.5]|       6|  30.009361909933357|
# |  [13.5]|       9|  13.622581837975769|
# |   [1.0]|       1|-0.03306822198888937|
# |   [6.5]|       8|    5.97541780439556|
# |  [28.5]|       6|  30.009361909933357|
# |  [28.0]|      28|  29.463135907534774|
# | [136.0]|      24|  147.44795242562944|
# |   [1.0]|       1|-0.03306822198888937|
# | [11.25]|       5|  11.164564827182131|
# +--------+--------+--------------------+
# only showing top 20 rows