# Part 1: Manually Load the fastText Model
- Does **NOT** use pybay for model discovery

### Apply the fastText Model to a DataFrame


In [3]:
# All names this cell relies on are imported here so it does not depend on
# imports executed by other cells (udf / col / SparkContext were previously
# used without being imported in this cell).
import pyspark
from pyspark import SparkContext
from pyspark.sql.functions import col, udf

# Helper module shown in the next section; importing it loads the fastText model.
import fasttext_classifier_simple

sc: pyspark.context.SparkContext = SparkContext.getOrCreate()
# NOTE(review): `spark` is not defined in this notebook; presumably the PySpark
# kernel provides it as the active SparkSession -- confirm.
session: pyspark.sql.session.SparkSession = spark

# Four sample messages; the second tuple element is a dummy value for the
# 'prediction' column, which is overwritten with the model output below.
mock_text = [
        ("No its you then dont say its free shipment its not exist at all dumb ass", 1),
        ("Never will buy shit from your bitch ass and giving you a horrible review", 1),
        ("I don't think I've ever seen someone so bad at their job as you are.", 1),
        ("Shut up scammer , I know what you are trying to do", 1)
]

df1 = spark.createDataFrame(data=mock_text, schema = ['text', 'prediction'])

# Wrap the plain-Python predict function as a Spark UDF (default return type: string).
udf_predict = udf(fasttext_classifier_simple.predict)

# Register the model file and the helper module with the Spark job so they are
# distributed together with it.
sc.addFile('/Users/thchang/Documents/dev/git/nlp/m2m/m2m_model/projects/offensive_content_detection/insult/model/model-insult-base.bin')
sc.addFile('/Users/thchang/Documents/dev/git/pyspark_env/fasttext_classifier_simple.py')

# Replace the dummy 'prediction' column with the label predicted for each message.
df2 = df1.withColumn('prediction', udf_predict(col('text')))
df2.show(10, False)

+------------------------------------------------------------------------+----------+
|text                                                                    |prediction|
+------------------------------------------------------------------------+----------+
|No its you then dont say its free shipment its not exist at all dumb ass|insult    |
|Never will buy shit from your bitch ass and giving you a horrible review|insult    |
|I don't think I've ever seen someone so bad at their job as you are.    |insult    |
|Shut up scammer , I know what you are trying to do                      |clean     |
+------------------------------------------------------------------------+----------+



### fasttext_classifier_simple.py

In [33]:
!cat /Users/thchang/Documents/dev/git/pyspark_env/fasttext_classifier_simple.py


import fasttext

model = fasttext.load_model('/Users/thchang/Documents/dev/git/nlp/m2m/m2m_model/projects/offensive_content_detection/insult/model/model-insult-base.bin')

def predict(msg):
	"""
	Return the top fastText label for one message, without the ``__label__`` prefix.

	:param msg: message text; ``None`` (a null Spark value) is passed through
	:return: label string, or ``None`` when ``msg`` is ``None``
	"""
	# A null column value reaches a Spark UDF as None; return null instead of
	# failing the whole job inside fastText.
	if msg is None:
		return None
	# fastText's predict() processes one line at a time and raises ValueError
	# when the text contains a newline, so flatten multi-line messages first.
	single_line = msg.replace('\n', ' ')
	# predict([text]) returns (labels, probabilities); take the labels, then the
	# only sample, then its top-ranked label.
	pred = model.predict([single_line])[0][0][0]
	pred = pred.replace('__label__', '')
	return str(pred)


# Part 2: Model Inference Via Pybay

#### Create DataFrame

In [40]:
import pyspark
# SparkContext is called below, so import it explicitly instead of relying on
# the kernel having injected it.
from pyspark import SparkContext
from pyspark.sql.functions import col, udf
import fasttext_classifier_pybay


sc: pyspark.context.SparkContext = SparkContext.getOrCreate()
# NOTE(review): `spark` is not defined in this notebook; presumably the PySpark
# kernel provides it as the active SparkSession -- confirm.
session: pyspark.sql.session.SparkSession = spark

# Sample messages; the second tuple element is a dummy value for the
# 'prediction' column, which the inference cell overwrites.
mock_text = [
        ("No its you then dont say its free shipment its not exist at all dumb ass", 1),
        ("Never will buy shit from your bitch ass and giving you a horrible review", 1),
        ("I don't think I've ever seen someone so bad at their job as you are.", 1),
        ("Shut up scammer , I know what you are trying to do", 1),
        ("Have a good day", 1)
]

df_raw = spark.createDataFrame(data=mock_text, schema = ['text', 'prediction'])

# Show only the input text, untruncated.
df_raw.select('text').show(10, False)

+------------------------------------------------------------------------+
|text                                                                    |
+------------------------------------------------------------------------+
|No its you then dont say its free shipment its not exist at all dumb ass|
|Never will buy shit from your bitch ass and giving you a horrible review|
|I don't think I've ever seen someone so bad at their job as you are.    |
|Shut up scammer , I know what you are trying to do                      |
|Have a good day                                                         |
+------------------------------------------------------------------------+



#### Use Pybay for Model Discovery

In [34]:
import pybay.core
# Registry handle used by parse_model_metadata() to fetch model metadata and model files.
registry = pybay.core.Registry()
# Optional: uncomment to browse the registry (presumably filters by name pattern -- confirm).
#registry.list_models("m2m*")

def parse_model_metadata(model_name: str):
    """
    Look up a model in the pybay ``registry`` and resolve its local model file.

    :param model_name: name of the model as registered in pybay
    :return: tuple ``(model_path, configuration)`` where ``model_path`` is the
        download directory joined with the configured model path (with
        ``/models/`` collapsed to ``/``) and ``configuration`` is the first
        entry of the microservice ``"models"`` configuration
    """
    artifact = registry.get_artifact(model_name)
    download_dir = registry.download_model_data(model_name)

    microservice = artifact.factories['microservice']
    configuration = microservice.microservice_data.configuration["models"][0]

    # The configured path is prefixed with '/models/'; strip that prefix so the
    # remainder can be appended to the download directory.
    relative_path = configuration["model_path"].replace('/models/', '/')
    model_path = str(download_dir) + relative_path
    print(f'model_path={model_path}')

    return model_path, configuration

# Resolve the insult model; `model_path` and `configuration` are module-level
# names that get_classifier() reads later in the notebook.
model_path, configuration = parse_model_metadata("m2m-offensive-content-insult-0.1")

# Bare last expression so the notebook displays the resolved path.
model_path


2021-05-25 15:18:40,824 - pybay.core.factory.swift.swift_registry [MainThread  ] [INFO ]  Loading cached model data from path /Users/thchang/.cache/pybay/swift_models.json
2021-05-25 15:18:40,830 - pybay.core.factory.swift.swift_registry [MainThread  ] [DEBUG]  Downloading model metadata from https://os-object.vip.ebayc3.com/v1/KEY_45b296c6f29b4462b5aaedcac5255d99/pynlp-dev/swift_models_v5.json?temp_url_sig=a575d8ed65cff49368f3ca64959dff64f54c6d49&temp_url_expires=1625308181
2021-05-25 15:18:41,571 - pybay.core.factory.swift.swift_registry [MainThread  ] [INFO ]  Models metadata version 0.5
2021-05-25 15:18:41,579 - pybay.core.factory.swift.swift_registry [MainThread  ] [INFO ]  Found 165 models.
2021-05-25 15:18:41,581 - pybay.core.helpers.telemetry.telemetry [MainThread  ] [DEBUG]  Pushing 4 prometheus metrics
2021-05-25 15:18:42,252 - pybay.core.helpers.telemetry.telemetry [MainThread  ] [DEBUG]  Pushing 1 prometheus metrics
2021-05-25 15:18:43,029 - pybay.core.factory.swift.swift_m

downloading:   0%|          | 0/1 [00:00<?, ?it/s]

2021-05-25 15:18:43,051 - pybay.core.factory.swift.swift_model_data [MainThread  ] [DEBUG]  File /Users/thchang/.cache/pybay/fa0b6f6e-a9ad-4ec9-87d4-f64c28b39c0f/model-insult-base.bin exists, skipping: model-insult-base.bin
2021-05-25 15:18:43,054 - pybay.core.factory.swift.swift_model_data [MainThread  ] [DEBUG]  Model files are ready.
2021-05-25 15:18:43,055 - pybay.core.helpers.telemetry.telemetry [MainThread  ] [DEBUG]  Pushing 1 prometheus metrics


model_path=/Users/thchang/.cache/pybay/fa0b6f6e-a9ad-4ec9-87d4-f64c28b39c0f/model-insult-base.bin


'/Users/thchang/.cache/pybay/fa0b6f6e-a9ad-4ec9-87d4-f64c28b39c0f/model-insult-base.bin'

#### Apply Pybay FastTextClassifier on DataFrame

In [41]:
from pybay_model_tokenizer import *
from pyspark.sql.functions import col, udf

# Process-wide classifier instance, created on first use by get_classifier().
CLASSIFIER = None


def get_classifier():
    """
    Return the shared ``FastTextClassifier``, creating it on the first call.

    The instance is stored in the module-level ``CLASSIFIER`` so later calls
    reuse it. It is built from the notebook-level ``model_path`` and the
    ``"spacy_model"`` entry of ``configuration``.
    """
    global CLASSIFIER

    # Already built: hand back the cached instance.
    if CLASSIFIER is not None:
        return CLASSIFIER

    spacy_tokenizer = SpacyTokenizer(name=configuration["spacy_model"])
    CLASSIFIER = FastTextClassifier(
        model_path=model_path,
        tokenizer=spacy_tokenizer,
        lowercase=True,
    )
    return CLASSIFIER

def spacy_tokenize(msg):
    """
    Classify ``msg`` with the shared classifier and return its first tag.

    Despite the name, this does not return tokens: it returns the first
    element of the classification result's ``tags``.

    :param msg: message text to classify
    :return: first tag of the classification result
    """
    # Use the instance returned by get_classifier() rather than reaching for
    # the CLASSIFIER global, so this function does not depend on how the
    # classifier is cached.
    classifier = get_classifier()
    return classifier.classify(msg).tags[0]

# Register the classification wrapper as a Spark UDF under the name "tokenize_udf".
tokenize_udf = session.udf.register("tokenize_udf", spacy_tokenize)

# Overwrite the dummy 'prediction' column of df_raw with the UDF result for each 'text' value.
df_inferenced = df_raw.withColumn('prediction', tokenize_udf(col('text')))

# Show up to 10 rows without truncating the text column.
df_inferenced.show(10, False)

+------------------------------------------------------------------------+----------+
|text                                                                    |prediction|
+------------------------------------------------------------------------+----------+
|No its you then dont say its free shipment its not exist at all dumb ass|insult    |
|Never will buy shit from your bitch ass and giving you a horrible review|insult    |
|I don't think I've ever seen someone so bad at their job as you are.    |insult    |
|Shut up scammer , I know what you are trying to do                      |insult    |
|Have a good day                                                         |clean     |
+------------------------------------------------------------------------+----------+



#### Contents of pybay_model_tokenizer.py

In [36]:
!cat /Users/thchang/Documents/dev/git/pyspark_env/pybay_model_tokenizer.py

from typing import Optional, Iterable, Mapping, List, Union
from pathlib import Path

import fasttext

#Cannot serialize SPACY!!! https://futurice.com/blog/classifying-text-with-fasttext-in-pyspark
#import spacy  
#from spacy import load

from pybay.types.nlp.tokenizer import Tokenizer, Tokenization
from pybay.types.nlp.tagger import TextClassifier, TextClassificationResult


class SpacyTokenizer(Tokenizer):
    """
    Simple tokenizer based on Spacy
    """

    #nlp: spacy.language.Language   
    name: str

    def __init__(self, name: str) -> None:
        """
        Create the tokenizer and store the spaCy model name.

        No spaCy pipeline is loaded here: the ``spacy.load`` call below is
        commented out, so only ``name`` is kept on the instance.

        :param name: Spacy Name of tokenizer (e.g. ``en_core_web_sm``)
        """
        # Disabled: loading the spaCy pipeline in the constructor (see the
        # serialization note next to the commented-out spacy imports).
        #TWC self.nlp = spacy.load(name, disable=['ner', 'tagger', 'parser', 'textcat'])
        self.name = name

    def tokenize(self, sentence: str) -> Tokenization:
        #return Tokenization(tokens=[token.text for token in self.nlp(sentence)])
        return Tokenization(tokens=[token for token in se

# Appendix

In [None]:
# Print each message with its predicted label on the driver.
# The original referenced `df3`, which is never defined in this notebook;
# `df_inferenced` (columns 'text', 'prediction') is the frame produced by the
# pybay inference cell. NOTE(review): confirm this is the intended frame.
# collect() returns Row objects, which unpack like tuples.
for text, label in df_inferenced.select('text', 'prediction').collect():
    print(f'{text} --> {label}')