# Productionization

## Data Scientists, Engineers, and Analysts

### Prototyping, Deployment, and Maintenance

### Notebooks and Scripts

## Databricks: Your Unified Data Analytics Platform

### Support for Big Data

### Support for Multiple Programming Languages

### Support for ML Frameworks

### Support for Model Repository, Access Control, Data Lineage, and Versioning

## Databricks Setup

### Set Up Access to S3 Bucket

### Set Up Libraries

### Create Cluster

### Create Notebook

```python
# Make directory in DBFS
dbutils.fs.mkdirs("dbfs:/databricks/models/spacy")

# Copy files from S3 to DBFS (the trailing True requests a recursive copy)
dbutils.fs.cp("s3a://nlp-demo/models/spacy/",
              "dbfs:/databricks/models/spacy/", True)

# Confirm files in DBFS
display(dbutils.fs.ls("dbfs:/databricks/models/spacy/"))

# Make directory in DBFS
dbutils.fs.mkdirs("dbfs:/databricks/scripts/")

# Put script in DBFS (the trailing True overwrites an existing file).
# Every shell command must end with a real "\n": a backslash at the end of a
# line inside a Python string removes the newline, which would glue all three
# pip commands into one broken command line.
dbutils.fs.put(
    "dbfs:/databricks/scripts/spacy_with_models.sh",
    "#!/bin/bash\n"
    "pip install /dbfs/databricks/models/spacy/en_core_web_lg-2.3.1.tar.gz\n"
    "pip install /dbfs/databricks/models/spacy/en_ner_base_V3-0.0.0.tar.gz\n"
    "pip install /dbfs/databricks/models/spacy/"
    "en_textcat_prodigy_V3_base_full-0.0.0.tar.gz\n",
    True)

# Confirm file in DBFS
display(dbutils.fs.ls("dbfs:/databricks/scripts/spacy_with_models.sh"))
```

### Enable Init Script and Restart Cluster

### Run Speed Test - Inference on NER using SpaCy

In [None]:
# Load libraries
# Python
import spacy
import numpy as np
import pandas as pd

# PySpark
from pyspark.sql.functions import udf
from pyspark.sql.types import *

In [None]:
# Load data
# Read the prepared training CSV from S3 into a Spark DataFrame.
# inputPath is the path to your S3 bucket.
inputPath = "s3a://nlp-demo/ag_dataset/prepared/train_prepared.csv"
df = (
    spark.read.format("csv")
    .options(header="true", inferSchema="true", quote='"', escape='"')
    .load(inputPath)
)

In [None]:
# Cache
# Mark df for caching; the later cells reuse it for counting and for NER.
# NOTE(review): Spark caching is presumably lazy, so the count() in the next
# cell would be what actually materializes it — confirm.
df.cache()

In [None]:
# View shape of data
# Prints a (row count, column count) tuple; the doubled parentheses pass that
# tuple to print as a single argument.
print((df.count(), len(df.columns)))

In [None]:
# Define schema
# Return type of the entity UDF: an array with one struct per entity, every
# field declared non-nullable.
schema = ArrayType(
    StructType([
        StructField(field_name, field_type, False)
        for field_name, field_type in (
            ("text", StringType()),
            ("start_char", IntegerType()),
            ("end_char", IntegerType()),
            ("label", StringType()),
        )
    ])
)

In [None]:
# Define Function to Get Entities
def get_entities(text):
    """Return the named entities that spaCy finds in ``text``.

    The ``en_core_web_lg`` model is loaded on first use and kept in the
    module-level ``nlp`` global, so later calls in the same process reuse it.

    Args:
        text: Value to analyse. It is converted with ``str()`` before being
            parsed; ``None`` yields an empty list.

    Returns:
        A list holding one ``[text, start_char, end_char, label]`` list per
        entity, matching the field order of the UDF schema.
    """
    global nlp
    if text is None:
        # A missing description has no entities; do not parse the string "None".
        return []
    try:
        nlp
    except NameError:
        # Only a missing model triggers a load. Genuine inference errors now
        # propagate instead of being retried behind a silent model reload.
        nlp = spacy.load('en_core_web_lg')
    doc = nlp(str(text))
    return [[e.text, e.start_char, e.end_char, e.label_] for e in doc.ents]

# Wrap get_entities as a Spark UDF that returns the entity schema
get_entities_udf = udf(f=get_entities, returnType=schema)

In [None]:
# Get Entities
# Add an "entities" column computed from each row's "description"
documents_df = df.withColumn(
    colName="entities",
    col=get_entities_udf("description"),
)

In [None]:
# Write parquet
# Persist the enriched DataFrame to S3, replacing any previous output
documents_df.write.mode("overwrite").parquet(
    "s3a://nlp-demo/ag_dataset/prepared/write_test.parquet"
)

In [None]:
# Import libraries
'''Main Libraries'''
import numpy as np
import pandas as pd

# Connect to Google Drive
from google.colab import drive
# Mount the drive at /content/drive
# (force_remount=True presumably remounts when it is already mounted — confirm)
drive.mount('/content/drive', force_remount=True)
# Project folder on the mounted drive
write_path = '/content/drive/My Drive/Applied-NLP-in-the-Enterprise'

# Install SpaCy
# The "!" lines are notebook shell commands: upgrade spaCy, then download the
# en_core_web_lg model that the next cell loads.
!pip install -U spacy
!python -m spacy download en_core_web_lg

In [None]:
# Load libraries
import spacy
import numpy as np
import pandas as pd
import time
 
# Start timer
# Wall-clock reference point; the load and total timings below are measured
# against it.
start_time = time.time()
 
# Define function to read data
def read_data(
    file,
    read_path='/content/drive/My Drive/Applied-NLP-in-the-Enterprise',
):
    """Load a CSV file stored under ``read_path`` into a pandas DataFrame.

    Args:
        file: Location of the CSV relative to ``read_path``. It is appended
            to ``read_path`` verbatim, so it must begin with ``/``.
        read_path: Base directory to read from. Defaults to the project
            folder on the mounted Google Drive, so existing one-argument
            calls behave as before.

    Returns:
        The DataFrame produced by ``pd.read_csv``.
    """
    return pd.read_csv(read_path + file)
 
# Read data
data = read_data('/data/ag_dataset/prepared/train_prepared.csv')
 
# Load model
nlp = spacy.load("en_core_web_lg")
 
# Load time
# Seconds elapsed since start_time, covering the CSV read and the model load
load_time = time.time()
print("Time to load data and model: ", np.round(load_time-start_time,2))
 
# Apply NLP model
# Runs the pipeline on each description, one row at a time, and stores the
# entities as (text, start_char, end_char, label) tuples
data["entities"] = data["description"].apply(lambda x: \
 [(e.text, e.start_char, e.end_char, e.label_) for e in nlp(x).ents])
 
# End timer
end_time = time.time()
print("Time to perform NER: ", np.round(end_time-load_time,2))
# The total takes a fresh time.time() reading rather than end_time, so it also
# includes the time spent in the print call above
print("Total time: ", np.round(time.time()-start_time,2))

## Machine Learning Jobs

### Production Pipeline Notebook

In [None]:
# Load Libraries
# Python
import spacy
import numpy as np
import pandas as pd

# PySpark
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType, ArrayType
from pyspark.sql.types import *

In [None]:
# Load Data
# The CSV location is read from the "inputPath" notebook argument
inputPath = getArgument("inputPath", "default")
df = (
    spark.read.format("csv")
    .options(header="true", inferSchema="true", quote='"', escape='"')
    .load(inputPath)
)

In [None]:
# Define Schema
# Return type of the entity UDF: an array with one struct per entity, every
# field declared non-nullable.
schema = ArrayType(
    StructType([
        StructField(field_name, field_type, False)
        for field_name, field_type in (
            ("text", StringType()),
            ("start_char", IntegerType()),
            ("end_char", IntegerType()),
            ("label", StringType()),
        )
    ])
)

In [None]:
# Define Function to Get Entities
def get_entities(text):
    """Return the named entities that the custom NER model finds in ``text``.

    The ``en_ner_base_V3`` model is loaded on first use and kept in the
    module-level ``nlp`` global, so later calls in the same process reuse it.

    Args:
        text: String to analyse; ``None`` yields an empty list.

    Returns:
        A list holding one ``[text, start_char, end_char, label]`` list per
        entity, matching the field order of the UDF schema.
    """
    global nlp
    if text is None:
        # A missing description has no entities to extract.
        return []
    try:
        nlp
    except NameError:
        # Only a missing model triggers a load. Genuine inference errors now
        # propagate instead of being retried behind a silent model reload.
        nlp = spacy.load('en_ner_base_V3')
    doc = nlp(text)
    return [[e.text, e.start_char, e.end_char, e.label_] for e in doc.ents]

# Wrap get_entities as a Spark UDF that returns the entity schema. The
# function is passed directly: a lambda that only forwards its argument adds
# nothing and hides the function name from the UDF.
get_entities_udf = udf(get_entities, schema)

In [None]:
# Get Entities
# Add an "entities" column computed from each row's "description"
documents_df = df.withColumn(
    colName="entities",
    col=get_entities_udf("description"),
)

In [None]:
# Write Parquet
# The destination is read from the "outputPath" notebook argument; existing
# output at that location is overwritten.
outPath = getArgument("outputPath", "default")
documents_df.write.mode("overwrite").parquet(outPath)

### Scheduled Machine Learning Jobs

### Event-Driven Machine Learning Pipeline

In [None]:
const https = require("https");

exports.handler = (event, context, callback) => {
  var data = JSON.stringify({
    "job_id": XXX
  });

  var options = {
     host: "XXX-XXXXXXX-XXX.cloud.databricks.com",
     port: 443,
     path: "/api/2.0/jobs/run-now",
     method: "POST",
     // authentication headers
     headers: {
      "Authorization": "Bearer XXXXXXXXXXXXXXXXXXXXXXXXXXXXX",
      "Content-Type": "application/json",
      "Content-Length": Buffer.byteLength(data)
     }
  };

  var request = https.request(options, function(res){
    var body = "";

    res.on("data", function(data) {
      body += data;
    });

    res.on("end", function() {
      console.log(body);
    });

    res.on("error", function(e) {
      console.log("Got error: " + e.message);
    });

  });

  request.write(data);
  request.end();
};


## MLflow

### Log and Register Model

In [None]:
# Load Libraries
# SpaCY
import spacy 

# MLflow
import mlflow
import mlflow.spacy

# Load model
# Loaded by package name: the text-classification model that the cluster
# init script pip-installs.
nlp = spacy.load("en_textcat_prodigy_V3_base_full")

In [None]:
# Print metadata
# As the last expression in the cell, the model's meta attribute is displayed
# by the notebook.
nlp.meta

In [None]:
# MLflow Tracking
# Inside a single named run: keep the run id for later use, then record the
# flavor tag, the spaCy model artifact and its text-classification score.
with mlflow.start_run(run_name="SpaCy-TextCat-Prodigy-V3-Base-Full"):
    my_run_id = mlflow.active_run().info.run_id
    mlflow.set_tag(key="model_flavor", value="spacy")
    mlflow.spacy.log_model(artifact_path="model", spacy_model=nlp)
    mlflow.log_metric(key="textcat_score", value=91.774875419)

### MLflow Model Serving

In [None]:
# Connect to Google Drive
from google.colab import drive
# Mount the drive at /content/drive
# (force_remount=True presumably remounts when it is already mounted — confirm)
drive.mount('/content/drive', force_remount=True)

In [None]:
# Load libraries
import numpy as np
import pandas as pd
 
# Define function to read data
def read_data(
    file,
    read_path='/content/drive/My Drive/Applied-NLP-in-the-Enterprise',
):
    """Load a CSV file stored under ``read_path`` into a pandas DataFrame.

    Args:
        file: Location of the CSV relative to ``read_path``. It is appended
            to ``read_path`` verbatim, so it must begin with ``/``.
        read_path: Base directory to read from. Defaults to the project
            folder on the mounted Google Drive, so existing one-argument
            calls behave as before.

    Returns:
        The DataFrame produced by ``pd.read_csv``.
    """
    return pd.read_csv(read_path + file)
 
# Read data
# Load the prepared AG News training set from the project folder
data = read_data(file="/data/ag_dataset/prepared/train_prepared.csv")

In [None]:
# Convert to JSON
# Write a small sample of the "description" column as a JSON array of strings.
# .loc slicing is label-based and includes the end label, so [:10] selects
# labels 0 through 10 (11 rows, assuming the default integer index).
# The path is built from adjacent string literals: a backslash continuation
# inside a single literal would embed the next line's leading spaces in the
# directory name.
data.loc[:10, "description"].to_json(
    path_or_buf='/content/drive/My Drive/Applied-NLP-in-the-Enterprise/data/'
                'ag_dataset/prepared/sample.json',
    orient="records",
)

In [None]:
# Call the Model - CURL
# The three XXXXXX values are placeholders to replace before running
MODEL_VERSION_URI = XXXXXX # the model path
DATABRICKS_TOKEN = XXXXXX # secret access token
JSON_PATH = XXXXXX # path to the JSON we created earlier in Colab
 
# POST the JSON file to the model endpoint; the token is supplied through -u
# as the password for the user name "token"
!curl -u token:$DATABRICKS_TOKEN -H \
 "Content-Type: application/json; format=pandas-records" \
 -d@$JSON_PATH $MODEL_VERSION_URI


In [None]:
# Define Function to Call the Model in Python
import requests
 
def score_model(model_uri, databricks_token, data):
    """POST ``data`` to a served model and return the decoded JSON reply.

    Args:
        model_uri: URL of the model-serving endpoint.
        databricks_token: Access token, sent as a Bearer credential.
        data: Records to score. A list is sent as is; any other object must
            provide ``to_list()`` (for example a pandas Series).

    Returns:
        The parsed JSON body of the response.

    Raises:
        Exception: If the endpoint answers with a status other than 200. The
            message contains the status code and the response text.
    """
    headers = {
        "Authorization": 'Bearer ' + databricks_token,
        "Content-Type": "application/json; format=pandas-records",
    }
    data_json = data if isinstance(data, list) else data.to_list()
    response = requests.request(method='POST', headers=headers,
                                url=model_uri, json=data_json)
    if response.status_code != 200:
        # Two adjacent f-strings: a single-quoted f-string cannot span lines.
        raise Exception(
            f"Request failed with status {response.status_code}, "
            f"{response.text}"
        )
    return response.json()

In [None]:
# Score the Model
# Replace the XXXXXX placeholders before running
MODEL_VERSION_URI = XXXXXX # the model path
DATABRICKS_TOKEN = XXXXXX # secret access token
 
# Sends the "description" values with labels 0 through 10 (.loc slices include
# the end label)
score_model(MODEL_VERSION_URI, DATABRICKS_TOKEN, data.loc[:10,"description"])

## Alternatives to Databricks

### Amazon SageMaker

### Saturn Cloud

## Conclusion