# Notebook to use a model

Once the model is trained and uploaded to the artifact server, we can use it in a new notebook. Here, we apply the model in **distributed mode** with **PySpark**. PySpark also ships a pandas-compatible API (`pyspark.pandas`), which lets people unfamiliar with PySpark benefit from distributed execution while writing familiar pandas code.

Since the data scientist does not have access to the production dataset, this notebook uses the test dataset. The notebook will then be packaged to generate a punchline, and that punchline will run on the production dataset.

### Adding dependencies to the environment

We reuse the pex created in the previous notebook and add the model to the dependencies list.

In [None]:
%%punch_dependencies
additional-pex:demo:dependencies:1.0.0
model:demo:credit_card:1.0.0

++ java -Xmx1g -Xms256m -Dlog4j.configurationFile=/punch/conf/log4j2/log4j2-stdout.xml -cp /punch/resourcectl.jar com.github.punchplatform.resourcectl.ResourceCtl -u http://artifacts-server.punch-artifacts:4245 download -r additional-pex:demo:dependencies:1.0.0 -o /usr/share/punch/extlib/pyspark


Resource additional-pex:demo:dependencies:1.0.0 downloaded to /usr/share/punch/extlib/pyspark/dependencies-1.0.0.pex


++ java -Xmx1g -Xms256m -Dlog4j.configurationFile=/punch/conf/log4j2/log4j2-stdout.xml -cp /punch/resourcectl.jar com.github.punchplatform.resourcectl.ResourceCtl -u http://artifacts-server.punch-artifacts:4245 download -r model:demo:credit_card:1.0.0


Resource model:demo:credit_card:1.0.0 downloaded to /usr/share/punch/artifacts/demo/credit_card/1.0.0/credit_card_1.0.0.zip


SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
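
The two entries in the dependencies cell follow a colon-separated pattern, and the download logs show the parts reappearing in the target paths (for example `demo/credit_card/1.0.0`). The sketch below splits such an identifier into four fields. The field names (`kind`, `group`, `name`, `version`) and the helper `parse_resource_id` are assumptions made for illustration, not Punch terminology or a Punch API.

```python
from typing import NamedTuple


class ResourceId(NamedTuple):
    """One entry of the %%punch_dependencies cell, split into its parts.

    The field names are illustrative assumptions, not Punch terminology.
    """
    kind: str      # e.g. "model" or "additional-pex"
    group: str     # e.g. "demo"
    name: str      # e.g. "credit_card"
    version: str   # e.g. "1.0.0"


def parse_resource_id(identifier: str) -> ResourceId:
    """Split 'kind:group:name:version' into a ResourceId (hypothetical helper)."""
    parts = identifier.strip().split(":")
    if len(parts) != 4 or not all(parts):
        raise ValueError(f"expected 'kind:group:name:version', got {identifier!r}")
    return ResourceId(*parts)


model = parse_resource_id("model:demo:credit_card:1.0.0")
print(model.name, model.version)
```

Validating identifiers this way before running the cell can catch a missing version or a stray colon early, rather than at download time.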



### Changing number of executors

You can change the Spark configuration with [punch_spark_session](https://punch-1.gitbook.io/punch-doc/v/welcome-to-the-punch/applications/jupyter/magic-commands#punchsparksession), for example to increase the number of executors.

In [1]:
%%punch_spark_session -f
{
    "spark.executor.instances":3
}

22/12/14 11:03:58 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
INFO:SparkMonitorKernel:Client Connected ('127.0.0.1', 59054)
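
The body of the cell above is plain JSON mapping Spark property names to values. As a minimal sketch, the snippet below builds such a body from a Python dictionary. Only `spark.executor.instances` comes from this notebook. `spark.executor.cores` and `spark.executor.memory` are standard Spark properties added here on the assumption that the magic forwards arbitrary Spark settings, which this notebook does not confirm.

```python
import json

# Settings to pass to Spark. "spark.executor.instances" comes from the cell
# above; the other two are standard Spark properties, included here as an
# assumption about what a heavier job might need.
spark_settings = {
    "spark.executor.instances": 3,
    "spark.executor.cores": 2,
    "spark.executor.memory": "2g",
}

# Render the dictionary as JSON, ready to paste as the cell body.
cell_body = json.dumps(spark_settings, indent=4)
print(cell_body)
```

Generating the body with `json.dumps` guarantees valid JSON, which helps avoid errors such as trailing commas when the list of settings grows.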


### Importing modules

We chose to work with `pyspark.pandas`, and our model is an MLflow package, so we use the model-loading function that `pyspark.pandas.mlflow` provides.

In [2]:
import pandas as pd
import pyspark.pandas as pypd
from pyspark.pandas.mlflow import load_model



### Loading the model

Punch provides a line magic that stores the path of the model in a variable. We can then use this variable to load the model with the loader matching its type (here, MLflow).

In [3]:
%punch_get_model --model demo:credit_card:1.0.0 --output model_path

List of files in the model directory:
	 requirements.txt
	 credit_card_1.0.0.zip
	 conda.yaml
	 MLmodel
	 model.pkl
	 python_env.yaml

Model path is available in model_path variable.


In [4]:
credit_card_model = load_model(model_uri=model_path, predict_type="double")

### Reading data

In [5]:
%%punch_source --type file --name data -o 
options:
    header: True
path: s3a://demo/test/

22/12/14 11:04:15 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties



Data is available in data variable.
Execution time: 0:00:04.233289


                                                                                

### Converting PySpark SQL DataFrame into pyspark.pandas

In [6]:
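# Wrap the Spark SQL DataFrame in the pandas API on Spark.
data = pypd.DataFrame(data)
# Keep only the feature columns plus the 'fraud' label.
data = data[['distance_from_home', 'distance_from_last_transaction',
       'ratio_to_median_purchase_price', 'repeat_retailer', 'used_chip',
       'used_pin_number', 'online_order', 'fraud']]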

### Adding parameters cell

You can define parameters whose value can be overridden when the punchline is executed.

In [7]:
#parameters
nb_rows = 10000

In [8]:
# Keep only the first nb_rows rows.
data = data[0:nb_rows]

### Applying the model

In [9]:
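# Separate the features from the 'fraud' label before predicting.
features = data.drop('fraud', axis=1)
prediction = credit_card_model.predict(features)
features["prediction"] = prediction
# Join the predictions back to the labelled data on all feature columns.
columns = list(features.columns)
columns.remove("prediction")
everything = data.merge(features, on=columns)
everything.head()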

                                                                                

| | distance_from_home | distance_from_last_transaction | ratio_to_median_purchase_price | repeat_retailer | used_chip | used_pin_number | online_order | fraud | prediction |
|---|---|---|---|---|---|---|---|---|---|
| 0 | 11.18884245347924 | 0.0677842994751078 | 1.659848080721224 | 1.0 | 0.0 | 0.0 | 1.0 | 0.0 | 0.0 |
| 1 | 8.359727748339491 | 0.1862579567074051 | 0.4952585147507252 | 1.0 | 1.0 | 0.0 | 0.0 | 0.0 | 0.0 |
| 2 | 11.401608276239754 | 17.712807993684493 | 2.364811107092758 | 1.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 |
| 3 | 3.102588133740203 | 0.2588216525296174 | 4.853085489890698 | 1.0 | 1.0 | 0.0 | 0.0 | 0.0 | 0.0 |
| 4 | 4.660351104672886 | 2.729079523776509 | 5.2572618573531855 | 1.0 | 0.0 | 0.0 | 1.0 | 1.0 | 1.0 |


In [10]:
everything.groupby(["fraud", "prediction"]).size()

                                                                                

fraud  prediction
0.0    0.0           9144
1.0    1.0            855
0.0    1.0              1
dtype: int64
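
The counts above form a confusion matrix, so the usual classification metrics follow by simple arithmetic. The sketch below copies the three counts from the groupby output and treats the missing `(fraud=1.0, prediction=0.0)` combination as zero false negatives. The variable names are illustrative.

```python
# Counts copied from the groupby output above, keyed by (fraud, prediction).
counts = {
    (0.0, 0.0): 9144,  # true negatives
    (1.0, 1.0): 855,   # true positives
    (0.0, 1.0): 1,     # false positives
}

tn = counts.get((0.0, 0.0), 0)
tp = counts.get((1.0, 1.0), 0)
fp = counts.get((0.0, 1.0), 0)
fn = counts.get((1.0, 0.0), 0)  # absent from the output, so 0

total = tn + tp + fp + fn
accuracy = (tp + tn) / total
precision = tp / (tp + fp)
recall = tp / (tp + fn)

print(f"rows={total} accuracy={accuracy:.4f} precision={precision:.4f} recall={recall:.4f}")
# prints: rows=10000 accuracy=0.9999 precision=0.9988 recall=1.0000
```

The total of 10000 rows matches `nb_rows`, which suggests the merge on feature columns did not duplicate any row.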

### Saving results

In [11]:
# Convert back to a Spark SQL DataFrame before handing it to the sink cell.
everything = everything.to_spark()



In [12]:
%%punch_sink --type file -df everything
options:
    header: True
format: csv
path: s3a://demo/results/

22/12/14 11:05:37 WARN AbstractS3ACommitterFactory: Using standard FileOutputCommitter to commit work. This is slow and potentially unsafe.




Data saved.
Execution time: 0:00:13.247500

