# Sales forecasting with SynapseML

# Leverage the Power of Azure Synapse Link for SQL

Azure Synapse Link for SQL is an automated system for replicating data from your transactional databases (SQL Server 2022 and Azure SQL Database) into a dedicated SQL pool in Azure Synapse Analytics.
Once setup is complete, your initial data is replicated into the target dedicated SQL pool. After this initial table seeding, changes made to your source data are replicated in near real time.
This notebook walks through the following steps:

- Read data from the dedicated SQL pool using the Connector for Apache Spark
    - We read the tables just replicated from Azure SQL Database, without performing any ETL/ELT
- Join operational data across the dedicated SQL pool tables
- Perform sales forecasting using SynapseML

_**This notebook uses Spark 3.2**_ - Attach it to the **demo** Apache Spark pool


<img src="https://synapselinkdemoworkshop.blob.core.windows.net/synapse-link/cosmosdb/synapse-analytics-sql-db-architecture.png" alt="Synapse Link for SQL"/>



In [1]:
%%configure -f
{
  "name": "synapseml",
  "conf": {
      "spark.jars.packages": "com.microsoft.azure:synapseml_2.12:0.10.1",
      "spark.jars.repositories": "https://mmlspark.azureedge.net/maven",
      "spark.jars.excludes": "org.scala-lang:scala-reflect,org.apache.spark:spark-tags_2.12,org.scalactic:scalactic_2.12,org.scalatest:scalatest_2.12,com.fasterxml.jackson.core:jackson-databind",
      "spark.yarn.user.classpath.first": "true"
  }
}


## Read data from the dedicated SQL pool using the Connector for Apache Spark

The Azure Synapse Dedicated SQL Pool Connector for Apache Spark in Azure Synapse Analytics enables efficient transfer of large data sets between the Apache Spark runtime and the Dedicated SQL pool. 

[Azure Synapse Dedicated SQL Pool Connector for Apache Spark](https://docs.microsoft.com/en-us/azure/synapse-analytics/spark/synapse-spark-sql-pool-import-export)

In [2]:
account_name = "synapsedatalakebalabuchu"  # replace with your ADLS Gen2 storage account name (synapsedatalake<suffix>)
container_name = "data"                     # container in that storage account
relative_path = "temp"                      # folder the connector uses to stage data

path = 'abfss://%s@%s.dfs.core.windows.net/%s/' % (container_name, account_name, relative_path)

server_name = 'synapse-link-balabuch'  # replace with your Synapse workspace name
dedicate_sql_end_point = '%s.sql.azuresynapse.net' % server_name  # dedicated SQL endpoint of the workspace


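Both strings built in the cell above follow fixed patterns. As a minimal sketch, assuming the standard ADLS Gen2 URI format `abfss://<container>@<account>.dfs.core.windows.net/<path>/` and the `<workspace>.sql.azuresynapse.net` endpoint naming, they can be wrapped in two small helpers. `abfss_path` and `dedicated_sql_endpoint` are hypothetical names (not part of the connector), and the account and workspace values are placeholders.

```python
def abfss_path(container: str, account: str, relative_path: str = "") -> str:
    """Build an ADLS Gen2 URI: abfss://<container>@<account>.dfs.core.windows.net/<path>/ (hypothetical helper)."""
    folder = relative_path.strip("/")
    suffix = f"{folder}/" if folder else ""
    return f"abfss://{container}@{account}.dfs.core.windows.net/{suffix}"


def dedicated_sql_endpoint(workspace: str) -> str:
    """Build the dedicated SQL endpoint host name of a Synapse workspace (hypothetical helper)."""
    return f"{workspace}.sql.azuresynapse.net"


# Placeholder names; substitute your own storage account and workspace.
print(abfss_path("data", "mystorageaccount", "temp"))
print(dedicated_sql_endpoint("myworkspace"))
```

Stripping slashes from `relative_path` keeps the URI well formed whether or not the folder is written with leading or trailing `/`.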

In [3]:
# Add required imports
import com.microsoft.spark.sqlanalytics
from com.microsoft.spark.sqlanalytics.Constants import Constants
from pyspark.sql.functions import col

# Read from an existing internal table in the dedicated SQL pool
Products = (spark.read
                     # If `Constants.SERVER` is not provided, the `<database_name>` from the three-part table name argument
                     # to the `synapsesql` method is used to infer the Synapse Dedicated SQL End Point.
                     .option(Constants.SERVER, dedicate_sql_end_point)
                     # Defaults to the storage path defined in the runtime configurations
                     .option(Constants.TEMP_FOLDER, path)
                     # Three-part table name from which data will be read
                     .synapsesql("dw.dbo.Products")
                     # Optional column pruning, i.e. select only the needed columns:
                     #.select("<some_column_1>", "<some_column_5>", "<some_column_n>")
                     # Optional filter, translated into a SQL push-down predicate:
                     #.filter(col("Title").contains("E"))
                     # Optional: fetch a sample of 10 records
                     #.limit(10)
                     )

RetailSales = (spark.read
                     .option(Constants.SERVER, dedicate_sql_end_point)
                     .option(Constants.TEMP_FOLDER, path)
                     .synapsesql("dw.dbo.RetailSales")
                )

StoreDemoGraphics = (spark.read
                     .option(Constants.SERVER, dedicate_sql_end_point)
                     .option(Constants.TEMP_FOLDER, path)
                     .synapsesql("dw.dbo.StoreDemoGraphics")
                )




## Perform joins using PySpark

In [5]:
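# Inner-join each sale to its product and store demographics, keep the modelling columns and sort by week
data = (RetailSales.join(Products, RetailSales.productCode == Products.productCode)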
                   .join(StoreDemoGraphics, RetailSales.storeId == StoreDemoGraphics.storeId)
                   .select(
                         RetailSales.storeId                      
                       , Products.productCode
                       , Products.wholeSaleCost
                       , Products.basePrice
                       , StoreDemoGraphics.ratioAge60
                       , StoreDemoGraphics.collegeRatio
                       , StoreDemoGraphics.income
                       , StoreDemoGraphics.highIncome150Ratio
                       , StoreDemoGraphics.largeHH
                       , StoreDemoGraphics.minoritiesRatio
                       , StoreDemoGraphics.more1FullTimeEmployeeRatio
                       , StoreDemoGraphics.distanceNearestWarehouse
                       , StoreDemoGraphics.salesNearestWarehousesRatio
                       , StoreDemoGraphics.avgDistanceNearest5Supermarkets
                       , StoreDemoGraphics.salesNearest5StoresRatio
                       , RetailSales.quantity
                       , RetailSales.logQuantity
                       , RetailSales.advertising
                       , RetailSales.price
                       , RetailSales.weekStarting)
                   .sort(RetailSales.weekStarting, RetailSales.storeId, Products.productCode)
)


In [6]:
# print some basic info
print("records read: " + str(data.count()))
print("Schema: ")
data.printSchema()
display(data.limit(10))


records read: 28949
Schema: 
root
 |-- storeId: integer (nullable = false)
 |-- productCode: string (nullable = true)
 |-- wholeSaleCost: decimal(8,2) (nullable = true)
 |-- basePrice: decimal(8,2) (nullable = true)
 |-- ratioAge60: decimal(12,2) (nullable = false)
 |-- collegeRatio: decimal(12,2) (nullable = false)
 |-- income: decimal(12,2) (nullable = false)
 |-- highIncome150Ratio: decimal(12,2) (nullable = false)
 |-- largeHH: decimal(12,2) (nullable = false)
 |-- minoritiesRatio: decimal(12,2) (nullable = false)
 |-- more1FullTimeEmployeeRatio: decimal(12,2) (nullable = false)
 |-- distanceNearestWarehouse: decimal(12,2) (nullable = false)
 |-- salesNearestWarehousesRatio: decimal(12,2) (nullable = false)
 |-- avgDistanceNearest5Supermarkets: decimal(12,2) (nullable = false)
 |-- salesNearest5StoresRatio: decimal(12,2) (nullable = false)
 |-- quantity: integer (nullable = false)
 |-- logQuantity: decimal(8,2) (nullable = false)
 |-- advertising: boolean (nullable = false)
 |-- pr


# LightGBM on Apache Spark


[LightGBM](https://lightgbm.readthedocs.io/en/latest/index.html) is an open-source, distributed, high-performance gradient boosting (GBDT, GBRT, GBM, or MART) framework.

This framework specializes in creating high-quality and GPU-enabled decision tree algorithms for ranking, classification, and many other machine learning tasks. LightGBM is part of Microsoft's DMTK project.

It uses tree-based learning algorithms and is designed to be distributed and efficient, with the following advantages:

- Faster training speed and higher efficiency.
- Lower memory usage.
- Better accuracy.
- Support of parallel, distributed, and GPU learning.
- Capable of handling large-scale data.
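The core idea behind LightGBM is gradient boosting: trees are added one at a time, each fitted to the errors of the ensemble so far. The plain-Python sketch below shows only that loop, assuming squared-error loss, a single feature and depth-1 trees (stumps). It leaves out everything that makes LightGBM fast (histogram binning, leaf-wise growth, GOSS, EFB), and `fit_stump`/`fit_boosted` are illustrative names, not LightGBM APIs.

```python
def fit_stump(xs, residuals):
    """Fit a depth-1 regression tree: one threshold, one mean per side."""
    best = None
    for threshold in sorted(set(xs)):
        left = [r for x, r in zip(xs, residuals) if x <= threshold]
        right = [r for x, r in zip(xs, residuals) if x > threshold]
        if not left or not right:
            continue  # a split must put at least one point on each side
        left_mean = sum(left) / len(left)
        right_mean = sum(right) / len(right)
        sse = sum((r - left_mean) ** 2 for r in left) + sum((r - right_mean) ** 2 for r in right)
        if best is None or sse < best[0]:
            best = (sse, threshold, left_mean, right_mean)
    if best is None:  # all xs identical: fall back to a constant prediction
        mean = sum(residuals) / len(residuals)
        return lambda x: mean
    _, threshold, left_mean, right_mean = best
    return lambda x: left_mean if x <= threshold else right_mean


def fit_boosted(xs, ys, n_rounds=50, learning_rate=0.3):
    """Gradient boosting with squared-error loss on a single feature."""
    base = sum(ys) / len(ys)  # start from the mean of the targets
    stumps = []
    preds = [base] * len(ys)
    for _ in range(n_rounds):
        # For squared error, the negative gradient is simply the residual.
        residuals = [y - p for y, p in zip(ys, preds)]
        stump = fit_stump(xs, residuals)
        stumps.append(stump)
        # Shrink each tree's contribution by the learning rate.
        preds = [p + learning_rate * stump(x) for p, x in zip(preds, xs)]

    def predict(x):
        return base + sum(learning_rate * s(x) for s in stumps)

    return predict


toy_model = fit_boosted([1, 2, 3, 4, 5, 6], [1.0, 1.0, 1.0, 5.0, 5.0, 5.0])
print(round(toy_model(2), 2), round(toy_model(5), 2))
```

`learning_rate` plays the same role as the `learningRate` parameter passed to `LightGBMRegressor` later in this notebook: a smaller value needs more rounds but makes each tree's correction more conservative.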

## Quantile Regression with LightGBMRegressor

Use [SynapseML](https://microsoft.github.io/SynapseML/) to build a simple regression model that helps us study and discover the factors that determine how many units a retail store will sell in the future.

The scenario is [Microsoft Surface](https://www.microsoft.com/en-us/surface) sales forecasting, using artificially created data.


### Quantile regression

[Quantile regression](https://en.wikipedia.org/wiki/Quantile_regression) is a type of regression analysis used in statistics and econometrics. Whereas the method of least squares estimates the conditional mean of the response variable across values of the predictor variables, quantile regression estimates the conditional median (or other quantiles) of the response variable. 

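Quantile regression is an extension of linear regression that is used when the assumptions of linear regression (for example, constant error variance) are not met. In this notebook, the same idea is applied to a gradient-boosted tree model through LightGBM's `quantile` objective rather than to a linear model.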
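Quantile regression replaces squared error with the pinball (quantile) loss, which charges `alpha` per unit of under-prediction and `1 - alpha` per unit of over-prediction. The plain-Python sketch below (`pinball_loss` is an illustrative name and the sample is made up) shows that the constant minimising this loss at `alpha = 0.2` is a low value of the sample, near its 20th percentile, rather than its mean.

```python
def pinball_loss(y_true, y_pred, alpha):
    """Mean quantile (pinball) loss at quantile level alpha, 0 < alpha < 1."""
    total = 0.0
    for y, q in zip(y_true, y_pred):
        diff = y - q
        # Under-prediction (diff > 0) costs alpha per unit;
        # over-prediction (diff < 0) costs (1 - alpha) per unit.
        total += max(alpha * diff, (alpha - 1) * diff)
    return total / len(y_true)


# Made-up sample: find the constant prediction with the lowest loss.
sample = [3, 7, 1, 9, 4, 12, 6, 2, 8, 5, 10]
alpha = 0.2
best = min(sorted(sample), key=lambda c: pinball_loss(sample, [c] * len(sample), alpha))
mean = sum(sample) / len(sample)
print(best, round(mean, 2))
```

Because over-prediction costs four times as much as under-prediction at `alpha = 0.2`, the optimum sits where roughly 20% of the observations fall below it.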

## Split dataset into train and test

In [12]:
train, test = data.randomSplit([0.85, 0.15], seed=1)

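`randomSplit` normalises the weights and assigns each row independently, so the 85/15 proportions hold only approximately. A simplified plain-Python analogue makes that visible; `random_split` is a hypothetical helper that ignores Spark's per-partition sampling details, and 28,949 is the record count read earlier.

```python
import random


def random_split(rows, weights, seed=None):
    """Assign each row independently to one split, with probability proportional to its weight."""
    total = sum(weights)
    # Normalised cumulative upper bounds, e.g. [0.85, 0.15] -> [0.85, 1.0]
    bounds, acc = [], 0.0
    for w in weights:
        acc += w / total
        bounds.append(acc)
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        u = rng.random()  # uniform in [0, 1)
        for i, upper in enumerate(bounds):
            # The last split also catches any floating-point shortfall in the bounds.
            if u < upper or i == len(bounds) - 1:
                splits[i].append(row)
                break
    return splits


train_rows, test_rows = random_split(range(28949), [0.85, 0.15], seed=1)
print(len(train_rows), len(test_rows), round(len(train_rows) / 28949, 3))
```

Fixing the seed, as the notebook does with `seed=1`, makes the split reproducible for the same input.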

## Assemble the features into a single vector column

In [22]:
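from pyspark.ml.feature import VectorAssembler

# Numeric and boolean input columns; productCode (a string) and weekStarting are not used.
# Caution: logQuantity appears to be derived from the label (quantity), so keeping it
# leaks the target into the features; drop it for a genuine forecast.
feature_cols = [
    'storeId',
    'wholeSaleCost',
    'basePrice',
    'ratioAge60',
    'collegeRatio',
    'income',
    'highIncome150Ratio',
    'largeHH',
    'minoritiesRatio',
    'more1FullTimeEmployeeRatio',
    'distanceNearestWarehouse',
    'salesNearestWarehousesRatio',
    'avgDistanceNearest5Supermarkets',
    'salesNearest5StoresRatio',
    'logQuantity',
    'advertising',
    'price',
]

featurizer = VectorAssembler(inputCols=feature_cols, outputCol="features")
# Keep only the label and the assembled feature vector
train_data = featurizer.transform(train).select("quantity", "features")
test_data = featurizer.transform(test).select("quantity", "features")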

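`VectorAssembler` concatenates the listed columns, in order, into one `features` vector per row, which is why positions in `features` (and in the feature-importance list later) follow `feature_cols`. A row-level plain-Python sketch of that behaviour; `assemble` is a hypothetical helper and the sample row values are made up.

```python
from decimal import Decimal


def assemble(row, input_cols):
    """Concatenate the named columns of one row, in order, into a list of floats."""
    vector = []
    for name in input_cols:
        value = row[name]
        if value is None:
            # Roughly mirrors the default handleInvalid="error": nulls stop the job.
            raise ValueError(f"null value in column {name!r}")
        # Booleans become 1.0/0.0; integers and Decimals become floats.
        vector.append(float(value))
    return vector


# Hypothetical row shaped like the joined data (values are made up).
row = {"storeId": 2, "advertising": True, "price": Decimal("2.99"), "quantity": 50}
print(assemble(row, ["storeId", "advertising", "price"]))
```

The label column (`quantity`) is not part of `input_cols`, so it stays outside the vector, as in the notebook cell.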

## Model Training

In [31]:
from synapse.ml.lightgbm import LightGBMRegressor

# objective="quantile" with alpha=0.2 fits the 20th percentile of quantity rather than its mean
model = LightGBMRegressor(
    objective="quantile", alpha=0.2, learningRate=0.3, numLeaves=31, featuresCol="features", labelCol="quantity"
).fit(train_data)


In [33]:
# One importance score per feature, in the same order as feature_cols
print(model.getFeatureImportances())


[419.0, 0.0, 0.0, 29.0, 1.0, 1.0, 1.0, 0.0, 1.0, 1.0, 1.0, 0.0, 1.0, 0.0, 2482.0, 0.0, 1.0]
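The printed list is positional: the i-th score belongs to the i-th entry of `feature_cols`. A small plain-Python sketch pairs the two and sorts them, using the values printed above; `rank_importances` is a hypothetical helper. The integer-valued scores look like LightGBM's `split` importance (how often a feature is used in a split), but that is an assumption about the importance type.

```python
def rank_importances(names, importances):
    """Pair feature names with importance scores, highest first."""
    if len(names) != len(importances):
        raise ValueError("names and importances must have the same length")
    return sorted(zip(names, importances), key=lambda pair: pair[1], reverse=True)


# Copied from the featurizer cell
feature_cols = ['storeId', 'wholeSaleCost', 'basePrice', 'ratioAge60', 'collegeRatio',
                'income', 'highIncome150Ratio', 'largeHH', 'minoritiesRatio',
                'more1FullTimeEmployeeRatio', 'distanceNearestWarehouse',
                'salesNearestWarehousesRatio', 'avgDistanceNearest5Supermarkets',
                'salesNearest5StoresRatio', 'logQuantity', 'advertising', 'price']
# Values printed by model.getFeatureImportances() above
importances = [419.0, 0.0, 0.0, 29.0, 1.0, 1.0, 1.0, 0.0, 1.0, 1.0, 1.0, 0.0, 1.0, 0.0, 2482.0, 0.0, 1.0]

for name, score in rank_importances(feature_cols, importances)[:3]:
    print(f"{name}: {score}")
```

In the notebook, the same call could take `model.getFeatureImportances()` directly. With these values, logQuantity accounts for most of the splits, followed by storeId and ratioAge60.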


## Model Prediction

In [34]:
scoredData = model.transform(test_data)
display(scoredData)



In [36]:
from synapse.ml.train import ComputeModelStatistics

metrics = ComputeModelStatistics(
    evaluationMetric="regression", labelCol="quantity", scoresCol="prediction"
).transform(scoredData)
display(metrics)


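The regression statistics from `ComputeModelStatistics` (such as MSE, RMSE, R² and MAE) measure distance from the label, which is what a mean regressor optimises. A model trained with `objective="quantile", alpha=0.2` deliberately predicts low, so a complementary check is empirical coverage: roughly 20% of test labels should fall at or below the prediction. A plain-Python sketch; `coverage` is a hypothetical helper and the values are toy numbers, not output from this notebook.

```python
def coverage(labels, predictions):
    """Fraction of observations whose label is at or below the predicted quantile."""
    hits = sum(1 for y, q in zip(labels, predictions) if y <= q)
    return hits / len(labels)


# Toy numbers for illustration only (not output from this notebook).
labels = [12, 15, 9, 20, 18, 11, 14, 16, 10, 13]
predictions = [10, 12, 10, 15, 14, 10, 12, 13, 10, 11]
print(coverage(labels, predictions))
```

On the notebook's data, the inputs could come from `scoredData.select("quantity", "prediction").collect()`, which is manageable for a test set of a few thousand rows; a value far from 0.2 would suggest the quantile model is miscalibrated.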