## Sample notebook showing an end-to-end sales prediction use case with the FedML Databricks library

### The FedML Databricks library reads the training data from SAP Datasphere, trains and deploys the model in Databricks, and writes the inference results back to SAP Datasphere.

### Install the fedml-databricks library

In [0]:
%pip install fedml-databricks

In [0]:
%pip install -U scikit-learn

In [0]:
import numpy as np
import pandas as pd
import json
from pyspark.sql.functions import col
from fedml_databricks import DbConnection, predict

### 1. Connect to SAP Datasphere, Explore, Acquire and Prepare Data

#### 1.1 Connect to SAP Datasphere.

Create a Databricks secret scope as described in the [Databricks documentation](https://docs.databricks.com/security/secrets/secret-scopes.html#create-a-databricks-backed-secret-scope). Then create a Databricks secret that holds the SAP Datasphere credentials as JSON, following [these instructions](https://docs.databricks.com/security/secrets/secrets.html#create-a-secret-in-a-databricks-backed-scope). The SAP Datasphere connection credentials can be obtained by completing the [prerequisite step](https://github.com/SAP-samples/data-warehouse-cloud-fedml/blob/main/Databricks/docs/dbconnection.md#pre-requisite#pre-requisite).

In [0]:
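# Credentials are inlined here for illustration; in practice, load this JSON from the Databricks secret created above
config_json={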
    "address":  "985639e9-cd50-4b52-8832-f4e244263077.hana.prod-us10.hanacloud.ondemand.com",
    "port": "443",
    "user": "ROWE#SQL",
    "password": "@8iUTlZey]QyE^2A51RY-]QN^>FrM*IA",
    "schema": "ROWE"
}

In [0]:
db = DbConnection(dict_obj=config_json)
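
The cell above inlines the credentials for readability. A minimal sketch of the secret-based approach described in 1.1 follows, assuming the secret value is a JSON string with the same five keys as `config_json`. The helper name `parse_datasphere_secret` and the scope and key names in the usage comment are hypothetical.

```python
import json

# Keys that the connection dictionary in this notebook contains
REQUIRED_KEYS = ("address", "port", "user", "password", "schema")

def parse_datasphere_secret(secret_json):
    """Parse a JSON secret string and check that the expected keys are present."""
    config = json.loads(secret_json)
    missing = [key for key in REQUIRED_KEYS if key not in config]
    if missing:
        raise KeyError(f"Secret is missing keys: {missing}")
    return config

# Usage in a Databricks notebook (scope and key names are placeholders):
# secret_json = dbutils.secrets.get(scope="my-scope", key="datasphere-credentials")
# db = DbConnection(dict_obj=parse_datasphere_secret(secret_json))
```

Validating the keys up front turns a malformed secret into an immediate, readable error instead of a connection failure later on.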

#### 1.2 List all business models available to read from

In [0]:
data= db.get_schema_views()
data

#### 1.3 Query the SAP Datasphere data with SQL and get the result as a PySpark DataFrame

In [0]:
spark_df=db.execute_query_pyspark('SELECT * FROM "ROWE"."Sales_Orders"')
spark_df.display()

In [0]:
##spark_df=db.execute_query_pyspark('SELECT * FROM \"DEMOSALESANALYSIS\".\"PP_Gross_Sales_S4\"')
##spark_df.show(truncate=False)

# Derive the year from the first four characters of SALESDOCUMENTDATE
spark_df = spark_df.withColumn("Year", col('SALESDOCUMENTDATE').substr(1, 4))

spark_df.display()
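
Spark's `Column.substr` takes a 1-based start position and a length, so `substr(1, 4)` returns the first four characters. A plain-Python sketch of the same extraction, assuming the date string begins with a four-digit year (the sample values are made up, and `year_of` is a hypothetical helper):

```python
def year_of(date_string):
    """Return the first four characters, like Column.substr(1, 4) in Spark."""
    start, length = 1, 4  # Spark arguments: 1-based start position, then length
    return date_string[start - 1:start - 1 + length]

print(year_of("2021-03-15"))
print(year_of("20210315"))
```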

##### 1.3.1 Get insights from the data. The cell below computes the average net amount for the year 2021

In [0]:
average_sales_for_2021_df=spark_df.filter(spark_df['Year']=='2021').groupBy().avg('NETAMOUNT')
average_sales_for_2021_df.show(truncate=False)

##### 1.3.2 Convert the PySpark DataFrame to Pandas DataFrame

In [0]:
spark_df = spark_df.withColumn("NETAMOUNT",spark_df['NETAMOUNT'].cast('double'))
spark_df = spark_df.withColumn("PartnerRevenue",spark_df['PartnerRevenue'].cast('double'))
dataframe=spark_df.toPandas()

dataframe.head()

#### 1.4 Preprocess the data

##### 1.4.1 Replace zero values with the column mean in selected columns

In [0]:
dataframe=dataframe.replace({'NETAMOUNT': {0: dataframe['NETAMOUNT'].mean(skipna=True)}})
dataframe=dataframe.replace({'PartnerRevenue': {0: dataframe['PartnerRevenue'].mean(skipna=True)}})
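
Note that the mean used above is computed over all rows, zeros included, which pulls the replacement value down. The sketch below contrasts that with an alternative that averages only the non-zero rows. The sample values are made up, and the alternative is an illustration, not what the notebook does.

```python
import pandas as pd

sample = pd.DataFrame({"NETAMOUNT": [0.0, 10.0, 20.0, 30.0]})

# Approach used in the notebook: the mean is taken over all rows, zeros included
mean_all = sample["NETAMOUNT"].mean(skipna=True)
filled_all = sample.replace({"NETAMOUNT": {0: mean_all}})

# Alternative: treat zeros as missing, so the mean uses the non-zero rows only
nonzero = sample["NETAMOUNT"].mask(sample["NETAMOUNT"] == 0)
mean_nonzero = nonzero.mean()
filled_nonzero = nonzero.fillna(mean_nonzero)

print(filled_all["NETAMOUNT"].tolist())
print(filled_nonzero.tolist())
```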

##### 1.4.2 Perform one-hot encoding on the categorical columns

Note that get_feature_names_out() requires scikit-learn 1.0 or later. On an older version, replace it with get_feature_names() in the cell below.

In [0]:
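from sklearn.preprocessing import OneHotEncoder

def one_hot_encode(df, column):
    """One-hot encode a single column; return the encoded frame and its column names."""
    encoder = OneHotEncoder(handle_unknown='ignore')
    # fit_transform returns a sparse matrix, so convert it to a dense array first
    encoder_df = pd.DataFrame(encoder.fit_transform(df[[column]]).toarray())
    encoded_columns = encoder.get_feature_names_out([column])
    encoder_df.columns = encoded_columns
    return encoder_df, encoded_columns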
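
If the notebook has to run on clusters with different scikit-learn versions, the method choice from the note above can be made at run time. A minimal sketch, where `encoded_feature_names` is a hypothetical helper that is not part of scikit-learn or FedML:

```python
def encoded_feature_names(encoder, columns):
    """Return the output feature names on both newer and older scikit-learn releases."""
    if hasattr(encoder, "get_feature_names_out"):  # scikit-learn 1.0 or later
        return list(encoder.get_feature_names_out(columns))
    return list(encoder.get_feature_names(columns))  # older releases

# Usage inside one_hot_encode:
# encoded_columns = encoded_feature_names(encoder, [column])
```

The helper returns a plain list, so a later `.tolist()` call on the result would need to be dropped.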

In [0]:
encoded_column_names=[]
for column in ['Customer','Customer Name','First Name','Last Name','Promocode','City','State','Currency','SALESORGANIZATION','MATERIAL','MATERIALGROUP','PURCHASEORDERBYCUSTOMER','TRANSACTIONCURRENCY','SALESDOCUMENTDATE']:
    encoded_df, encoded_columns = one_hot_encode(dataframe, column)
    dataframe = dataframe.join(encoded_df, rsuffix='_'+column)
    encoded_column_names += encoded_columns.tolist()

### 2. Train the model on the prepared data

In [0]:
import mlflow
import mlflow.sklearn
from sklearn.linear_model import LinearRegression
from sklearn.model_selection import train_test_split

# Separate the label from the features, then hold out 30% of the rows for testing
label_column = 'PartnerRevenue'
y = dataframe[label_column]
dataframe.drop(label_column, axis=1, inplace=True)
X_train, X_test, y_train, y_test = train_test_split(dataframe, y, test_size=0.3)
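
`train_test_split` is called without `random_state`, so every run produces a different split. To illustrate what a seeded 70/30 split does, here is a plain-Python sketch. `split_indices` is a hypothetical helper and not the scikit-learn implementation; with scikit-learn itself, passing an integer `random_state` to `train_test_split` gives the same repeatability.

```python
import math
import random

def split_indices(n_rows, test_size=0.3, seed=42):
    """Shuffle row positions and split them into train and test lists."""
    positions = list(range(n_rows))
    random.Random(seed).shuffle(positions)   # seeded, so the split is repeatable
    n_test = math.ceil(test_size * n_rows)   # the test share is rounded up
    return positions[n_test:], positions[:n_test]

train_idx, test_idx = split_indices(10)
print(len(train_idx), len(test_idx))
```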

#### 2.1 Use the columns required for training the model

In [0]:
train_columns=['NETAMOUNT']+encoded_column_names
X_train_dataframe,X_test_dataframe=X_train[train_columns],X_test[train_columns]

#### 2.2 Train the model and log results using mlflow

In [0]:
def train_model(X_train, X_test, y_train, y_test, experiment_name, model_name):
    mlflow.set_experiment(experiment_name)
    print("Training model...")

    # Train the LinearRegression model and log the results in a single MLflow run
    with mlflow.start_run() as run:
        model = LinearRegression().fit(X_train, y_train)
        # score() returns the coefficient of determination (R^2) on the test data
        score = model.score(X_test, y_test)
        mlflow.log_metric("score", score)
        mlflow.sklearn.log_model(model, model_name,
                                 registered_model_name=model_name)

    run_id = run.info.run_id
    return run_id


Replace the user in the experiment path below with your Databricks user.

In [0]:
experiment_name,model_name='/Users/jerome.ivain@databricks.com/PartnerRevenuePredictionExperiment','PartnerRevenuePredictionModel'
run_id=train_model(X_train_dataframe, X_test_dataframe, y_train, y_test, experiment_name, model_name)
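
The value logged as `score` comes from `LinearRegression.score`, which returns the coefficient of determination R² on the data passed to it. A small plain-Python sketch of that quantity, using made-up numbers and a hypothetical helper name:

```python
def r2_score_manual(y_true, y_pred):
    """Coefficient of determination: 1 - SS_res / SS_tot."""
    mean_true = sum(y_true) / len(y_true)
    ss_res = sum((t - p) ** 2 for t, p in zip(y_true, y_pred))  # residual sum of squares
    ss_tot = sum((t - mean_true) ** 2 for t in y_true)          # total sum of squares
    return 1 - ss_res / ss_tot

actual = [100.0, 200.0, 300.0]
print(r2_score_manual(actual, [100.0, 200.0, 300.0]))  # perfect predictions
print(r2_score_manual(actual, [200.0, 200.0, 200.0]))  # always predicting the mean
```

A score of 1 means the predictions match exactly, a score of 0 means the model does no better than predicting the mean, and negative values are possible for worse models.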

In [0]:
model_uri=f"runs:/{run_id}/{model_name}"
print("The MODEL_URI is '{}'".format(model_uri))
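
This notebook uses two MLflow URI schemes: `runs:/` for a model logged under a run, and `models:/` for a model in the registry (used in section 5). The sketch below spells out both formats; the helper names and the run ID `abc123` are placeholders.

```python
def run_model_uri(run_id, artifact_path):
    """URI of a model logged under a run: runs:/<run_id>/<artifact_path>."""
    return f"runs:/{run_id}/{artifact_path}"

def registry_model_uri(model_name, version_or_stage):
    """URI of a registered model: models:/<name>/<version or stage>."""
    return f"models:/{model_name}/{version_or_stage}"

print(run_model_uri("abc123", "PartnerRevenuePredictionModel"))
print(registry_model_uri("PartnerRevenuePredictionModel", "Production"))
```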

### 3. Register the model

In [0]:
import time
model_version = mlflow.register_model(model_uri=model_uri,name=model_name)
 
# Registering the model takes a few seconds, so add a small delay
time.sleep(15)
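
A fixed delay can be too short or longer than necessary. One alternative is to poll the model version status until it is `READY`. The sketch below keeps the polling loop independent of MLflow so it can be tested on its own; `wait_until_ready` and its parameters are hypothetical names, and the MLflow call appears only in the usage comment.

```python
import time

def wait_until_ready(get_status, timeout_s=60.0, poll_interval_s=1.0):
    """Poll get_status() until it returns 'READY' or the timeout expires."""
    deadline = time.monotonic() + timeout_s
    while True:
        if get_status() == "READY":
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(poll_interval_s)

# Usage with MLflow (assumes model_name and model_version from the cell above):
# from mlflow.tracking import MlflowClient
# client = MlflowClient()
# wait_until_ready(lambda: client.get_model_version(
#     name=model_name, version=model_version.version).status)
```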

### 4. Transition the model to production

In [0]:
from mlflow.tracking import MlflowClient
 
client = MlflowClient()
client.transition_model_version_stage(
  name=model_name,
  version=model_version.version,
  stage="Production",
)

### 5. Run inference on the deployed model with the test data

In [0]:
#X_test_dataframe['NETAMOUNT'] = X_test_dataframe['NETAMOUNT'].astype(float)
#X_test_dataframe['PartnerRevenue'] = X_test_dataframe['PartnerRevenue'].astype(float)

In [0]:
import pandas as pd
model = mlflow.pyfunc.load_model(f"models:/{model_name}/production")
result=model.predict(X_test_dataframe)
inference_dataframe=pd.DataFrame(result,columns=['prediction_result'])
inference_dataframe

### 6. Store the inference result in SAP Datasphere

#### 6.1 Store the inference result in the pandas dataframe

In [0]:
X_test['Predicted_PartnerRevenue']=inference_dataframe['prediction_result'].values
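
The `.values` in the cell above matters: `X_test` keeps the shuffled row labels from the split, while `inference_dataframe` has a fresh 0..n-1 index. Assigning the Series directly would align on labels rather than on row order. A small sketch with made-up data:

```python
import pandas as pd

# Test rows keep their original, shuffled index after the train/test split
features = pd.DataFrame({"NETAMOUNT": [10.0, 20.0, 30.0]}, index=[5, 2, 9])
predictions = pd.DataFrame({"prediction_result": [1.0, 2.0, 3.0]})  # index 0, 1, 2

# Assigning the Series aligns on index labels: only label 2 exists on both sides
features["aligned"] = predictions["prediction_result"]

# Assigning .values ignores the index and keeps the row order
features["positional"] = predictions["prediction_result"].values

print(features)
```

In the `aligned` column, two rows end up as NaN and the third receives the wrong prediction, while `positional` pairs each row with its own prediction.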

#### 6.2 Select the required columns from pandas dataframe

In [0]:
X_test.columns

In [0]:
datasphere_write_dataframe=X_test[['SALESORGANIZATION','MATERIAL','MATERIALGROUP','PURCHASEORDERBYCUSTOMER', 'TRANSACTIONCURRENCY','NETAMOUNT','Customer','Customer Name','First Name','Last Name','Promocode','City','State','Currency','SALESDOCUMENTDATE','Year', 'Predicted_PartnerRevenue']]

#### 6.3 Rename the columns in the pandas dataframe

In [0]:
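# Assign the result instead of renaming in place, which avoids pandas' SettingWithCopyWarning on a selection of X_test
datasphere_write_dataframe = datasphere_write_dataframe.rename(columns={'Customer Name':'Customer_Name', 'First Name':'First_Name', 'Last Name':'Last_Name'})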
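
The rename mapping can also be derived from the column names instead of being typed out. A minimal sketch, assuming the only problem is spaces in the names; `underscore_renames` is a hypothetical helper.

```python
def underscore_renames(columns):
    """Map every column name that contains spaces to an underscore version."""
    return {name: name.replace(" ", "_") for name in columns if " " in name}

renames = underscore_renames(["Customer", "Customer Name", "First Name", "Last Name"])
print(renames)

# Usage: datasphere_write_dataframe.rename(columns=renames)
```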

In [0]:
datasphere_write_dataframe

#### 6.4 Create a table in SAP Datasphere for storing the inference result

In [0]:
db.drop_table("SALES_TABLE")

db.create_table("CREATE TABLE SALES_TABLE (SALESORGANIZATION Varchar(20),MATERIAL Varchar(20),MATERIALGROUP Varchar(20),PURCHASEORDERBYCUSTOMER Varchar(20),TRANSACTIONCURRENCY Varchar(20),NETAMOUNT Float,Customer Varchar(20), Customer_Name Varchar(50),First_Name Varchar(20),Last_Name Varchar(20),Promocode Varchar(20),City Varchar(20),State Varchar(20),PartnerRevenue Float,Currency Varchar(20),SALESDOCUMENTDATE Varchar(20),Year Varchar(20),Predicted_PartnerRevenue Float)")
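
Before inserting, it can help to compare the table's columns with the DataFrame's columns. The sketch below does this with the names from the `CREATE TABLE` statement above and from the selection in 6.2 after the rename in 6.3, both copied by hand. In the notebook, `list(datasphere_write_dataframe.columns)` would supply the second list.

```python
# Column names from the CREATE TABLE statement
table_columns = [
    "SALESORGANIZATION", "MATERIAL", "MATERIALGROUP", "PURCHASEORDERBYCUSTOMER",
    "TRANSACTIONCURRENCY", "NETAMOUNT", "Customer", "Customer_Name", "First_Name",
    "Last_Name", "Promocode", "City", "State", "PartnerRevenue", "Currency",
    "SALESDOCUMENTDATE", "Year", "Predicted_PartnerRevenue",
]

# Column names of the DataFrame selected in 6.2, after the rename in 6.3
frame_columns = [
    "SALESORGANIZATION", "MATERIAL", "MATERIALGROUP", "PURCHASEORDERBYCUSTOMER",
    "TRANSACTIONCURRENCY", "NETAMOUNT", "Customer", "Customer_Name", "First_Name",
    "Last_Name", "Promocode", "City", "State", "Currency",
    "SALESDOCUMENTDATE", "Year", "Predicted_PartnerRevenue",
]

only_in_table = [c for c in table_columns if c not in frame_columns]
only_in_frame = [c for c in frame_columns if c not in table_columns]
print("Only in table:", only_in_table)
print("Only in DataFrame:", only_in_frame)
```

Here `PartnerRevenue` appears only in the table, because the label column was dropped from the features in section 2. How the insert treats that column depends on how `insert_into_table` maps columns, which this notebook does not show.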

#### 6.5 Write the prediction results to 'SALES_TABLE' table in SAP Datasphere

In [0]:
db.insert_into_table('SALES_TABLE',datasphere_write_dataframe)

In [0]:
tables=db.get_table_metadata('SALES_TABLE')
tables

### 7. Store the results in Delta Lake as well

#### 7.1 Create the 'SAP_SALES_PREDICTED' Delta table with the new predicted field

In [0]:
schema_name = "dbdemos_jerome.sap"
table_name = "dbdemos_jerome.sap.SAP_SALES_PREDICTED"

spark.sql(f"CREATE SCHEMA IF NOT EXISTS {schema_name}")
spark.sql(f"DROP TABLE IF EXISTS {table_name}")

dsp_spark_df=spark.createDataFrame(datasphere_write_dataframe) 
dsp_spark_df.write.format("delta").saveAsTable(table_name)

#### 7.2 Generate Audible Ready Descriptions for MATERIALGROUPS

In [0]:
%sql

CREATE TABLE dbdemos_jerome.sap.MAT_DESC AS
SELECT MATERIALGROUP, ai_gen("Describe in one sentence this material column value:" || MATERIALGROUP) AS Material_Desc
FROM dbdemos_jerome.sap.SAP_SALES_PREDICTED
GROUP BY MATERIALGROUP;

In [0]:
tbl_location = spark.sql(f"DESCRIBE DETAIL {table_name}").first().location
print(tbl_location)

files = dbutils.fs.ls(tbl_location)
display(files)

In [0]:
spark_df=spark.sql('SELECT SALESORGANIZATION, AVG(Predicted_PartnerRevenue) FROM dbdemos_jerome.sap.SAP_SALES_PREDICTED GROUP BY SALESORGANIZATION')
spark_df.display()