# Machine Learning with Snowpark Python and Java UDFs


## Using rental listings, we would like to train an ML model to estimate the rent price of our sale listings:

![title](img/BusinessUseCase.png)

## We will use the following architecture to prepare our data, train our XGBoost regression model, and run our model on Snowflake:

![title](img/TechUseCase.png)



# Now to the fun stuff !
### Import the Snowpark library

In [None]:
from snowflake.snowpark import *
from snowflake.snowpark.functions import *
from snowflake.snowpark.types import *
import json
import pandas as pd

### Initialising our connection to Snowflake 

In [None]:
# Read the connection settings from the local credentials file.
with open('Creds.json') as creds_file:
    connection_parameters = json.load(creds_file)

# Open the Snowpark session that every following cell relies on.
session_builder = Session.builder.configs(connection_parameters)
mySnowSess = session_builder.create()

# Clean Stage
# mySnowSess.sql("rm @SnowParkDemo_Stage").collect()

# Data Preparation using Data Frames
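We have real-estate listings and their location codes (`ADS_CODEINSEE`), and we want to remove outliers. To do so, we first create a dataframe that computes, for each location code and real-estate type, the number of listings and the median price per unit of surface (`ADS_PRICE / ADS_ATTR_SQUARE`).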



In [None]:
# Keep only the listings whose category is "Locations".
ADS_df_Raw = mySnowSess.table("ADS").filter(col("ADS_CATEGORY_NAME") == "Locations")

# For every (ADS_CODEINSEE, real-estate type) pair, count the listings and
# compute the median of ADS_PRICE / ADS_ATTR_SQUARE.
ADS_df_GrpbyCodeInsee_MedianPrice = (
    ADS_df_Raw.select(
        col("ADS_CODEINSEE"),
        # Renamed so it does not clash with the raw column when joined back later.
        col("ADS_ATTR_REAL_ESTATE_TYPE").as_("ADS_ATTR_REAL_ESTATE_TYPE_MEDIAN"),
        col("ADS_ID"),
        (col("ADS_PRICE") / col("ADS_ATTR_SQUARE")).as_("ADS_PRICE_SQUARE"),
    )
    .groupBy(col("ADS_CODEINSEE"), col("ADS_ATTR_REAL_ESTATE_TYPE_MEDIAN"))
    .agg([
        count(col("ADS_ID")).as_("COUNT_ADS"),
        median(col("ADS_PRICE_SQUARE")).as_("MED_PRICE"),
    ])
)

In [None]:
# Preview the first 10 rows of the per-group count / median dataframe.
pd.DataFrame(ADS_df_GrpbyCodeInsee_MedianPrice.limit(10).collect())

### Removing outliers
We can join our median to our listing data and filter out any listing that deviates too far from the median.


In [None]:
# Join conditions: same location code and same real-estate type.
_same_insee_code = (
    ADS_df_Raw.col("ADS_CODEINSEE")
    == ADS_df_GrpbyCodeInsee_MedianPrice.col("ADS_CODEINSEE")
)
_same_estate_type = (
    ADS_df_Raw.col("ADS_ATTR_REAL_ESTATE_TYPE")
    == ADS_df_GrpbyCodeInsee_MedianPrice.col("ADS_ATTR_REAL_ESTATE_TYPE_MEDIAN")
)

# Attach the group count and median price to every raw listing.
ADS_df_joinMedian = ADS_df_Raw.join(
    ADS_df_GrpbyCodeInsee_MedianPrice,
    _same_insee_code & _same_estate_type,
)

# Share of the listing's own price in (own price + group median):
# 0.5 means the listing is exactly at the median.
_price_share = col("ADS_PRICE_SQUARE") / (col("ADS_PRICE_SQUARE") + col("MED_PRICE"))

ADS_df_Clean = ADS_df_joinMedian.withColumn(
    "ADS_PRICE_SQUARE", col("ADS_PRICE") / col("ADS_ATTR_SQUARE")
).filter(
    # Keep listings whose price share lies in [0.25, 0.75).
    (_price_share >= 0.25)
    & (_price_share < 0.75)
    # Absolute bounds on the price per unit of surface.
    & (col("ADS_PRICE_SQUARE") < 150)
    & (col("ADS_PRICE_SQUARE") > 0)
    # Absolute bounds on the surface itself.
    & (col("ADS_ATTR_SQUARE") >= 9)
    & (col("ADS_ATTR_SQUARE") <= 300)
    # Only trust medians computed from at least 5 listings.
    & (col("COUNT_ADS") >= 5)
)


In [None]:
# Preview the first 10 rows that survive the outlier filter.
pd.DataFrame(ADS_df_Clean.limit(10).collect())

### Cleaning the data for data science
While having listing types in readable words is useful for the business, it is not ideal for model training. We therefore create a couple of mapping functions to transform our words into numbers!

Snowpark will seamlessly push these functions to Snowflake as Python UDFs.

**Note:** In plain SQL we would have had to write a `CASE` statement; here we simply write a Python function.

In [None]:
@udf(name="ADS_ATTR_FURNISHED_Encode_Python", is_permanent=True, stage_location="@SnowParkDemo_Stage", replace=True)
def ADS_ATTR_FURNISHED_Encode_Python(x : str) -> int:
    """Map the furnished label to an integer code.

    Returns 2 for "Meublé", 1 for "Non meublé" and 0 for any other value.
    """
    # The table is built inside the function so the UDF stays self-contained.
    furnished_codes = {"Meublé": 2, "Non meublé": 1}
    return furnished_codes.get(x, 0)

In [None]:
@udf(name="ADS_ATTR_REAL_ESTATE_TYPE_Encode_Python", is_permanent=True, stage_location="@SnowParkDemo_Stage", replace=True)
def ADS_ATTR_REAL_ESTATE_TYPE_Encode_Python(x : str) -> int:
    """Map the real-estate type label to an integer code.

    Returns 2 for "Maison", 1 for "Appartement" and 0 for any other value.
    """
    # The table is built inside the function so the UDF stays self-contained.
    estate_type_codes = {"Maison": 2, "Appartement": 1}
    return estate_type_codes.get(x, 0)


Now that these functions are available in Snowflake as UDFs, we can simply call them from our dataframe to clean our result!

In [None]:
# Numeric encodings of the two text attributes, computed by the registered UDFs.
_estate_type_num = ADS_ATTR_REAL_ESTATE_TYPE_Encode_Python(
    col("ADS_ATTR_REAL_ESTATE_TYPE")
).as_('ADS_ATTR_REAL_ESTATE_TYPE_NUM')
_furnished_num = ADS_ATTR_FURNISHED_Encode_Python(
    col("ADS_ATTR_FURNISHED")
).as_('ADS_ATTR_FURNISHED_NUM')

# Final training set: six feature columns followed by the price.
ADS_df_final = ADS_df_Clean.select(
    col('ADS_GEO_LAT'),
    col('ADS_GEO_LNG'),
    col('ADS_ATTR_ROOMS'),
    col('ADS_ATTR_SQUARE'),
    _estate_type_num,
    _furnished_num,
    col('ADS_PRICE'),
)



We can get the execution plan and the SQL needed to perform all our steps by calling `explain()`.

In [None]:
# Show the query / execution plan behind the final dataframe.
ADS_df_final.explain()

In [None]:
# Preview the first 10 rows of the prepared training data.
pd.DataFrame(ADS_df_final.limit(10).collect())

# Get the prepared data and train the XGBoost Model
We load the prepared data into a pandas dataframe, split it into training and test sets using scikit-learn, and convert both sets to XGBoost `DMatrix` objects.

In [None]:
# Create Test / Train
from sklearn.model_selection import train_test_split
import xgboost as xgb

# Pull the prepared rows from Snowflake into a local pandas dataframe.
CollectedDataframe = pd.DataFrame(ADS_df_final.collect())

# Every column except the price is used as a feature.
target = "ADS_PRICE"
predictors = [column for column in CollectedDataframe.columns if column != target]

# Hold out 10% of the rows for evaluation.
X_train, X_test, y_train, y_test = train_test_split(
    CollectedDataframe[predictors],
    CollectedDataframe[target],
    test_size=0.1,
)

# Wrap both splits in XGBoost's DMatrix container.
DMatrix_train = xgb.DMatrix(X_train, label=y_train)
DMatrix_test = xgb.DMatrix(X_test, label=y_test)

# Report the (rows, columns) shape of each matrix.
print("DMatrix_train (", DMatrix_train.num_row(), ", ", DMatrix_train.num_col(), ")")
print("DMatrix_test  (", DMatrix_test.num_row(), ", ", DMatrix_test.num_col(), ")")



We set up the XGBoost parameters and run the training.

In [None]:
# Training configuration for the tree booster with a squared-error objective.
param = {
    'booster': 'gbtree',
    'objective': 'reg:squarederror',
    'eta': 0.05,
    'max_depth': 10,
    'min_child_weight': 1,
    'gamma': 1,
    'subsample': 0.75,
    'colsample_bytree': 0.75,
    'scale_pos_weight': 1,
    'nthread': -1,
    'verbosity': 1,
}

# Datasets whose metric is reported after each boosting round.
evallist = [(DMatrix_train, 'train'), (DMatrix_test, 'eval')]
num_round = 100

# Train for num_round boosting rounds.
bst = xgb.train(param, DMatrix_train, num_round, evallist)


Plot the feature importance and print the explained variance score.

In [None]:
import matplotlib
from sklearn.metrics import explained_variance_score

# Plot the feature importance of the trained booster.
xgb.plot_importance(bst)

# Make predictions for the held-out test data.
pred_test = bst.predict(DMatrix_test)

# explained_variance_score expects (y_true, y_pred). The metric is not
# symmetric, so the true labels must come first and the predictions second.
print("pred_test : ", explained_variance_score(DMatrix_test.get_label(), pred_test))




# Creating the Python Inference function 

### Register the Python Inference Function as Python UDF on Snowflake

In [None]:
# Packages the UDF needs at execution time on Snowflake.
mySnowSess.add_packages("xgboost","pandas")

@udf(name="get_XGBoost_RENT_PRICE_New", is_permanent=True, stage_location="@SnowParkDemo_Stage", replace=True)
def get_XGBoost_RENT_PRICE(lat: float, lng: float, rooms: int, square: float, realestatetype: int, furnished: int) -> float:
    """Predict the rent price of a single listing with the trained booster.

    The six arguments are placed in a one-row dataframe whose column names
    match the training features, then scored with the notebook-level ``bst``
    model that this function references.

    Returns:
        The predicted price as a plain Python float.
    """
    import pandas
    import xgboost as xgb
    RowDataFrame = pandas.DataFrame([[lat, lng, rooms, square, realestatetype, furnished]],columns=['ADS_GEO_LAT', 'ADS_GEO_LNG', 'ADS_ATTR_ROOMS', 'ADS_ATTR_SQUARE', 'ADS_ATTR_REAL_ESTATE_TYPE_NUM', 'ADS_ATTR_FURNISHED_NUM'])
    RowDMatrix = xgb.DMatrix(RowDataFrame)
    # predict() returns a numpy array; convert its single element to a
    # built-in float so the value matches the declared return type.
    return float(bst.predict(RowDMatrix)[0])


# Using our UDF Function to run our model on Snowflake 

### Scoring our entire dataset and writing the result to a new table.

In [None]:
# Score every listing in the "Ventes immobilières" category and persist the
# result, replacing the target table if it already exists.
(
    mySnowSess.table("LBC.PUBLIC.LBC_ADS")
    .filter(col("ADS_CATEGORY_NAME") == "Ventes immobilières")
    .withColumn(
        "EstimatedRentPrice",
        get_XGBoost_RENT_PRICE(
            col("ADS_GEO_LAT"),
            col("ADS_GEO_LNG"),
            col("ADS_ATTR_ROOMS"),
            col("ADS_ATTR_SQUARE"),
            ADS_ATTR_REAL_ESTATE_TYPE_Encode_Python(col("ADS_ATTR_REAL_ESTATE_TYPE")),
            ADS_ATTR_FURNISHED_Encode_Python(col("ADS_ATTR_FURNISHED")),
        ),
    )
    .write.mode("overwrite")
    .saveAsTable("LBC_ADS_RENT_PRED_XGBOOST")
)


### Alternatively, use our model on demand


In [None]:
# Score the "Ventes immobilières" listings on the fly and show 10 of them
# next to their listed price, without writing anything back to Snowflake.
pd.DataFrame(
    mySnowSess.table("LBC.PUBLIC.LBC_ADS")
    .filter(col("ADS_CATEGORY_NAME") == "Ventes immobilières")
    .withColumn(
        "EstimatedRentPrice",
        get_XGBoost_RENT_PRICE(
            col("ADS_GEO_LAT"),
            col("ADS_GEO_LNG"),
            col("ADS_ATTR_ROOMS"),
            col("ADS_ATTR_SQUARE"),
            ADS_ATTR_REAL_ESTATE_TYPE_Encode_Python(col("ADS_ATTR_REAL_ESTATE_TYPE")),
            ADS_ATTR_FURNISHED_Encode_Python(col("ADS_ATTR_FURNISHED")),
        ),
    )
    .select(
        col("ADS_SUBJECT"),
        col("ADS_GEO_CITY"),
        col("EstimatedRentPrice"),
        col("ADS_PRICE"),
    )
    .limit(10)
    .collect()
)

### Optional: we can also save the model to a file and upload it to a Snowflake stage for use in an A/B-testing UDF

-- Python UDF that scores one listing with one of the two staged XGBoost
-- models. The `model` argument selects the file suffix
-- (AnnoncePriceLocations_<model>.xbmodel).
create or replace function GetLocPriceXGBoostABTest(model VARCHAR, lat  FLOAT, lng  FLOAT, rooms  NUMBER, square FLOAT, realestatetype  NUMBER, furnished  NUMBER)
returns FLOAT
language python
runtime_version = '3.8'
packages = ('xgboost', 'pandas')
imports=('@snowparkdemo_stage/AnnoncePriceLocations_A.xbmodel','@snowparkdemo_stage/AnnoncePriceLocations_B.xbmodel')
handler = 'GetLocPriceXGBoost'
as
$$
import functools
import sys

IMPORT_DIRECTORY_NAME = "snowflake_import_directory"
# Directory where Snowflake makes the files listed in `imports` available.
import_dir = sys._xoptions[IMPORT_DIRECTORY_NAME]


@functools.lru_cache(maxsize=None)
def _load_booster(model):
    # Load a staged model file once and reuse it for later rows, instead of
    # re-reading it from disk on every call. Only names that load successfully
    # are cached, so the cache cannot outgrow the imported model files.
    import xgboost as xgb
    file_path = import_dir + "AnnoncePriceLocations_" + model + ".xbmodel"
    booster = xgb.Booster({'nthread': 1})  # init model
    booster.load_model(file_path)
    return booster


def GetLocPriceXGBoost(model: str, lat: float, lng: float, rooms: int, square: float, realestatetype: int, furnished: int):
    # Build a one-row dataframe with the six feature columns and score it
    # with the booster selected by `model`.
    import pandas
    import xgboost as xgb
    bst = _load_booster(model)
    RowDataFrame = pandas.DataFrame([[lat, lng, rooms, square, realestatetype, furnished]],columns=['ADS_GEO_LAT', 'ADS_GEO_LNG', 'ADS_ATTR_ROOMS', 'ADS_ATTR_SQUARE', 'ADS_ATTR_REAL_ESTATE_TYPE_NUM', 'ADS_ATTR_FURNISHED_NUM'])
    RowDMatrix = xgb.DMatrix(RowDataFrame)
    # predict() returns a numpy array; hand back a built-in float to match
    # the declared FLOAT return type.
    return float(bst.predict(RowDMatrix)[0])

$$;



In [None]:
#bst.save_model("./AnnoncePriceLocations_A.xbmodel")
#mySnowSess.sql("put file:///Users/apicard/Documents/SnowFlake/Project/SnowParkPython/Project_1/SnowparkDemo/snowpark-python-xgboost-realestate/AnnoncePriceLocations_A.xbmodel @SnowParkDemo_Stage overwrite=true").collect()

#bst.save_model("./AnnoncePriceLocations_B.xbmodel")
#mySnowSess.sql("put file:///Users/apicard/Documents/SnowFlake/Project/SnowParkPython/Project_1/SnowparkDemo/snowpark-python-xgboost-realestate/AnnoncePriceLocations_B.xbmodel @SnowParkDemo_Stage overwrite=true").collect()



In [None]:
# Release the Snowflake session now that the notebook is done.
mySnowSess.close()