In [None]:
import datetime as _hex_datetime
import json as _hex_json

import pandas as _hex_pandas

In [None]:
# Hex-injected run parameter (JSON "false").
hex_scheduled = False

In [None]:
# Hex-injected run parameter.
hex_user_email = "example-user@example.com"

In [None]:
# Hex-injected run parameter.
hex_run_context = "logic"

In [None]:
# Hex-injected run parameter.
hex_timezone = "America/Toronto"

In [None]:
# Hex-injected run parameter.
hex_project_id = "4a2d9aab-1e21-476c-9dec-0b5f7c103158"

In [None]:
# Hex-injected run parameter: chart colour palette.
hex_color_palette = [
    "#4C78A8", "#F58518", "#E45756", "#72B7B2", "#54A24B",
    "#EECA3B", "#B279A2", "#FF9DA6", "#9D755D", "#BAB0AC",
]

In [None]:
import io
import json
import os
import pickle
import re
import sys
import zipfile
from datetime import date
from tokenize import String

import cachetools
import numpy as np
import pandas as pd
import sklearn
import snowflake.snowpark
import spacy
from joblib import dump
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score
from snowflake.snowpark import functions as F
from snowflake.snowpark.functions import table_function, udf, udtf
from snowflake.snowpark.session import Session
from snowflake.snowpark.types import (DateType, IntegerType, PandasSeries,
                                      PandasSeriesType, StringType,
                                      StructField, StructType, VariantType)

In [None]:
import hextoolkit

# Name of the Snowflake data connection configured in Hex.
SNOWFLAKE_CONNECTION_NAME = 'sfpscogs-simon'

# Obtain a Snowpark session through the Hex connection; all later cells use `session`.
hex_snowflake_conn = hextoolkit.get_data_connection(SNOWFLAKE_CONNECTION_NAME)
session = hex_snowflake_conn.get_snowpark_session()
session.use_schema('public')

In [None]:
# Create the pipe-delimited file format used to ingest the review files.
# .collect() is required: session.sql() is lazy and only runs when an action is called,
# and the table-creation queries below reference this file format.
session.sql('''
    create or replace file format ff_pipe
        type = CSV
        field_delimiter = '|'
''').collect()

# Stages for the raw review files, the model artifacts, and the UDF / sproc code.
session.sql('create or replace stage raw_data').collect()
session.sql('create or replace stage model_data').collect()
session.sql('create or replace stage python_load').collect()

# Directory-enabled stage.
# NOTE(review): nothing below is uploaded to raw_data_stage, so the refresh at the end of
# this cell has nothing new to register -- confirm whether the uploads should target it.
session.sql('create or replace stage raw_data_stage directory = (enable = true)').collect()

# Upload the zipped spaCy model and the two review files.
# auto_compress=False keeps the archive on the stage as en_core_web_sm.zip, the exact
# path later passed to session.add_import for the text-cleaning UDF.
session.file.put('en_core_web_sm.zip', '@model_data', auto_compress=False, overwrite=True)
session.file.put('training_data.txt', '@raw_data', auto_compress=False, overwrite=True)
session.file.put('new_reviews.txt', '@raw_data', auto_compress=False, overwrite=True)

# Refresh the directory stage's metadata.
session.sql('alter stage raw_data_stage refresh').collect()

[]

In [None]:
# Labelled training reviews from the pipe-delimited training file.
session.sql('''
    create or replace table training_data as
        select
            $2 product_id,
            to_date($3) reviewdate,
            $4 reviewtext,
            $5 sentiment
        from
            @raw_data/training_data.txt (file_format => 'ff_pipe')
        where product_id is not null;
''').collect()

# Unlabelled reviews to be scored at the end of the notebook.
# The column names must match what the scoring cells select:
# product_id, review_date, product_review.
session.sql('''
    create or replace table new_reviews as
        select
            $1 product_id,
            to_date($2) review_date,
            $3 product_review
        from
            @raw_data/new_reviews.txt (file_format => 'ff_pipe');
''').collect()

[Row(status='Table NEW_REVIEWS successfully created.')]

# Data Prep

We'll start this demo by building our sentiment model. To do this we have a set of training data containing previous reviews and their sentiment classification, which needs cleaning and transforming. First we'll refine the text (remove punctuation, stop words, etc.), and then we'll convert the sentiment classification into a form more suitable for our algorithm.

---

To get started, let's take a look at the training data:



In [None]:
training_data = session.table("TRAINING_DATA")
training_data.show(n=20)

Then check how the reviews are distributed across sentiment labels:



In [None]:
import seaborn as sns

# Count reviews per sentiment label in Snowflake; only the small aggregate comes back to pandas.
sentiment_counts = session.table('TRAINING_DATA') \
    .group_by(F.col('SENTIMENT')) \
    .agg(F.count(F.col('PRODUCT_ID')).alias('COUNT')).to_pandas()

sns.set(rc={'figure.figsize': (20, 8)})
ax = sns.barplot(x='SENTIMENT', y='COUNT', data=sentiment_counts)
# Trailing `;` suppresses the Axes repr so only the figure is shown.
ax.set(title='Training reviews per sentiment label', xlabel='Sentiment', ylabel='Number of reviews');

We can see we have various reviews for products with their corresponding sentiment classification.

---

The first transformation will be to process the review text. To do this we create a UDF that will perform the following:

- Remove stop words
- Remove punctuation
- Remove currency symbols
- Lemmatize the text


Note that we create a vectorized UDF so that rows are processed in batches. Additionally, we cache the loaded spaCy model (which also provides the stop-word list), so it is unzipped and loaded once rather than on every batch.



In [None]:
%%time
import spacy

# Make the zipped spaCy English model available inside the UDF sandbox.
session.add_import('@model_data/en_core_web_sm.zip')

@cachetools.cached(cache={})
def load_file(import_dir):
    """Extract the bundled en_core_web_sm archive and return the loaded spaCy pipeline.

    The result is memoized per ``import_dir``, so the archive is extracted and
    loaded once per process instead of on every call.

    Args:
        import_dir: Snowflake import directory; concatenated directly with the
            archive name, so it is expected to end with a path separator.
    """
    archive_path = import_dir + 'en_core_web_sm.zip'
    # Suffixed with the PID, presumably so separate processes on one host do not
    # extract into the same directory.
    extract_root = '/tmp/en_core_web_sm%d' % os.getpid()

    with zipfile.ZipFile(archive_path, 'r') as archive:
        archive.extractall(extract_root)

    model_path = extract_root + '/en_core_web_sm/en_core_web_sm-2.3.0'
    return spacy.load(model_path)

@udf(name='remove_stopwords_vect',packages=['spacy==2.3.5','cachetools'], session=session, is_permanent=True, replace=True, max_batch_size=10000,stage_location='python_load',)
def remove_stopwords_vect(raw_text: PandasSeries[str]) -> PandasSeries[str]:
    """Clean a batch of review texts for the bag-of-words sentiment model.

    For each text: run spaCy, then keep the lemma of every token that is not a
    stop word, punctuation, a currency symbol, or spaCy 2's '-PRON-' pronoun lemma.
    Spaces and newlines inside lemmas are removed, lemmas are lower-cased, and the
    result is joined with single spaces.

    Args:
        raw_text: batch of review texts.

    Returns:
        Series of cleaned texts, one per input row, in input order.
    """
    nlp = load_file(sys._xoptions['snowflake_import_directory'])
    stop_words = nlp.Defaults.stop_words

    result = []
    for s in raw_text:
        doc = nlp(s)
        lemmas = [
            t.lemma_ for t in doc
            # `t` is a spaCy Token, which never equals a string, so test its
            # lower-cased text against the stop-word set (a set of strings).
            if t.lower_ not in stop_words
            and not t.is_punct
            and not t.is_currency
            and t.lemma_ != '-PRON-'
        ]
        # NOTE(review): spaCy's default English stop list appears to include negations
        # such as "not"/"no"; consider keeping them for sentiment -- verify.
        cleaned = (lemma.replace(' ', '').replace('\n', '').lower() for lemma in lemmas)
        result.append(' '.join(cleaned))

    return pd.Series(result)

In [None]:
# Smoke-test the text-cleaning UDF on one sample review.
session.sql('''
    select
        remove_stopwords_vect(
                'This surfboard is amazing! It\\'s a perfect length and weight, and I really like the design.
                 If I was looking for a board to start learning on, this would make a great option.
                 Really good value for money for $900') as processed_text
''').collect()[0]['PROCESSED_TEXT']

The next transformation converts the sentiment string into a numeric value (-1, 0 or 1), a form better suited to our ML algorithm. To do this we create a simple UDF that maps each sentiment string to a number.



In [None]:
# Create and upload the UDF that encodes the sentiment label as a number
@udf(name='convert_rating',
     is_permanent=True,
     replace=True,
     stage_location='python_load')
def convert_rating(x: str) -> int:
    """Encode a sentiment label: NEGATIVE -> -1, NEUTRAL -> 0, POSITIVE -> 1.

    Any other value (including NULL) yields None, i.e. NULL in SQL.
    """
    label_codes = {'NEGATIVE': -1, 'NEUTRAL': 0, 'POSITIVE': 1}
    return label_codes.get(x)

With these UDFs we can now run a query and see what our data will look like for training:



In [None]:
%%time
# Preview the cleaned/encoded training rows. Keep the DataFrame in `df`; .show()
# prints and returns None, so assigning its result would leave `df` as None.
df = session.table('TRAINING_DATA') \
    .filter(
        F.col('REVIEWTEXT') != ''
    ) \
    .select(
        F.col('PRODUCT_ID'),
        F.col('REVIEWDATE'),
        F.call_udf(
            'REMOVE_STOPWORDS_VECT',
            F.col('REVIEWTEXT')).alias('PROCESSED_REVIEW'),
        F.call_udf(
            'CONVERT_RATING',
            F.col('SENTIMENT')).alias('SENTIMENT'))
df.show(20)

## Model Training

Next we want to train a model. In Snowflake this is as simple as creating a Python stored procedure, which also lets us re-run it whenever we want to retrain the model. Model training runs on Snowflake compute. The model is saved to an internal stage and can then be used in a UDF for inference within Snowflake.



In [None]:
# Create and upload a stored proc to train our sentiment model
def train_sentiment_model(session: snowflake.snowpark.Session) -> float:
    """Train the sentiment model inside Snowflake and return its hold-out accuracy.

    Steps:
      1. Read TRAINING_DATA rows with a non-empty REVIEWTEXT, clean the text with the
         REMOVE_STOPWORDS_VECT UDF and encode SENTIMENT with CONVERT_RATING.
      2. Randomly split the rows ~80/20 into train and test sets (fixed seed).
      3. Fit a CountVectorizer + LogisticRegression on the training rows.
      4. Upload model.joblib and vectorizer.joblib to @model_data.
      5. Return the accuracy on the test rows.

    Raises:
        ValueError: if, after dropping rows with a NULL text or label, either the
            train or the test split is empty.
    """
    df = session.table('TRAINING_DATA') \
        .filter(
            F.col('REVIEWTEXT') != '') \
        .select(
            F.call_udf(
                'REMOVE_STOPWORDS_VECT',
                F.col('REVIEWTEXT')).alias('PROCESSED_TEXT'),
            F.call_udf(
                'CONVERT_RATING',
                F.col('SENTIMENT')).alias('SENTIMENT')).to_pandas()

    # CONVERT_RATING returns NULL for unrecognised labels; LogisticRegression cannot fit NaN targets.
    df = df.dropna(subset=['PROCESSED_TEXT', 'SENTIMENT'])

    # ~80/20 random split. Draws are uniform on [0, 1), so the 0.8 threshold really puts
    # ~80% of rows in train (a standard-normal draw such as np.random.randn would not).
    # Seeded so retraining on unchanged data reproduces the same split.
    rng = np.random.default_rng(42)
    in_train = rng.random(len(df)) < 0.8
    train = df[in_train]
    test = df[~in_train]
    if train.empty or test.empty:
        raise ValueError(
            f'Cannot train/evaluate: {len(train)} training rows, {len(test)} test rows')

    # Bag-of-words features; this token pattern also keeps single-character tokens.
    vectorizer = CountVectorizer(token_pattern=r'\b\w+\b')
    x_train = vectorizer.fit_transform(train['PROCESSED_TEXT'])
    x_test = vectorizer.transform(test['PROCESSED_TEXT'])
    y_train = train['SENTIMENT']
    y_test = test['SENTIMENT']

    # With the default lbfgs solver and more than two classes, scikit-learn already fits a
    # multinomial model; the explicit multi_class argument is deprecated in recent releases.
    lr = LogisticRegression(max_iter=10000)
    lr.fit(x_train, y_train)
    predictions = lr.predict(x_test)

    model_output_dir = '/tmp'

    # Persist both artifacts: inference needs the fitted vectorizer as well as the model.
    # put() uses its default compression here, hence the *.joblib.gz names on the stage.
    model_file = os.path.join(model_output_dir, 'model.joblib')
    dump(lr, model_file)
    session.file.put(model_file, "@model_data", overwrite=True)

    vect_file = os.path.join(model_output_dir, 'vectorizer.joblib')
    dump(vectorizer, vect_file)
    session.file.put(vect_file, "@model_data", overwrite=True)

    return accuracy_score(y_test, predictions)

# Register train_sentiment_model as a permanent stored procedure in Snowflake.
sproc_packages = ['snowflake-snowpark-python', 'pandas', 'scikit-learn', 'joblib']
session.sproc.register(
    func=train_sentiment_model,
    name='train_sentiment_model',
    packages=sproc_packages,
    is_permanent=True,
    replace=True,
    stage_location='python_load',
)

In [None]:
# Switch the session to the data_science warehouse for training and scoring.
use_warehouse_stmt = session.sql('use warehouse data_science')
use_warehouse_stmt.collect()

In [None]:
# Sanity check: materialise the same cleaned/encoded training set the stored procedure
# builds, and report how many rows it contains.
df = session.table('TRAINING_DATA') \
    .filter(
        F.col('REVIEWTEXT') != '') \
    .select(
        F.call_udf(
            'REMOVE_STOPWORDS_VECT',
            F.col('REVIEWTEXT')).alias('PROCESSED_TEXT'),
        F.call_udf(
            'CONVERT_RATING',
            F.col('SENTIMENT')).alias('SENTIMENT')).to_pandas()

len(df)

In [None]:
# Run training in Snowflake; the procedure returns the accuracy on its hold-out split.
holdout_accuracy = session.call('TRAIN_SENTIMENT_MODEL')
holdout_accuracy

## Model Deployment

With the model artifact produced from the Stored Procedure, we can create a UDF that can be used to infer sentiment for future data ingested into Snowflake.



In [None]:
session.clear_packages()
session.clear_imports()
# Artifacts uploaded by TRAIN_SENTIMENT_MODEL (stored with a .gz suffix).
session.add_import('@MODEL_DATA/model.joblib.gz')
session.add_import('@MODEL_DATA/vectorizer.joblib.gz')

@cachetools.cached(cache={})
def load_model(file_name):
    """Load a joblib artifact that was shipped to the UDF via session.add_import.

    Memoized per file name, so each artifact is deserialized once per process
    rather than on every batch.

    Args:
        file_name: name of the imported file, e.g. 'model.joblib.gz'.
    """
    # Only `dump` is imported at notebook level, so `load` must be imported here;
    # a function-local import is also resolved inside the UDF sandbox.
    from joblib import load

    # joblib.load unpickles its input: only use it on artifacts this project produced.
    model_file_path = sys._xoptions.get("snowflake_import_directory") + file_name
    return load(model_file_path)

columns = ('NEGATIVE','NEUTRAL','POSITIVE')

@udf(name='predict_sentiment_vect',
     is_permanent=True,
     replace=True,
     stage_location='python_load',
     max_batch_size=1000,
     input_types=[PandasSeriesType(StringType())],
     return_type=PandasSeriesType(VariantType()),
     packages=['pandas','scikit-learn','cachetools','joblib'])
def predict_sentiment_vector(sentiment_str):
    """Score a batch of review texts with the trained sentiment model.

    Args:
        sentiment_str: pandas Series of review texts, normally the output of
            remove_stopwords_vect (the representation the model was trained on).

    Returns:
        pandas Series with one dict per input row, mapping each class label
        ('NEGATIVE', 'NEUTRAL', 'POSITIVE') to its predicted probability; each
        dict is returned to Snowflake as a VARIANT.
    """
    # predict_proba rejects zero-row input, so answer an empty batch directly.
    if len(sentiment_str) == 0:
        return pd.Series([], dtype=object)

    model = load_model('model.joblib.gz')
    vectorizer = load_model('vectorizer.joblib.gz')

    # predict_proba columns follow model.classes_, so derive the labels from it rather
    # than assuming all three classes were present in training. CONVERT_RATING encodes
    # NEGATIVE/NEUTRAL/POSITIVE as -1/0/1; unknown classes fall back to str(class).
    label_for_class = dict(zip((-1, 0, 1), columns))
    labels = [label_for_class.get(cls, str(cls)) for cls in model.classes_]

    # One transform + predict_proba call for the whole batch instead of one per row.
    probabilities = model.predict_proba(vectorizer.transform(sentiment_str))

    return pd.Series([
        {label: float(p) for label, p in zip(labels, row)}
        for row in probabilities
    ])

We can quickly test our UDF with a simple SQL call:



In [None]:
# Score one hand-written review directly in SQL.
sample_prediction_sql = '''
    select predict_sentiment_vect('PRACTICALLY PERFECT IN EVERY WAY') sentiment
'''
session.sql(sample_prediction_sql).show()

## Scoring new data

Earlier on, we ingested a separate dataset of reviews that had no sentiment associated with them. We're now going to use the model and UDF to score these new records.

First we’re going to take our new data table and run it through our text processing UDF:



In [None]:
# Clean the new reviews once and keep the result in a temporary (session-scoped) table.
processed_new_reviews = session.table('new_reviews').select(
    F.col('product_id'),
    F.col('review_date'),
    F.col('product_review'),
    F.call_udf('remove_stopwords_vect', F.col('product_review')).alias('processed_review'),
)
processed_new_reviews.write.save_as_table(
    'new_reviews_processed', mode="overwrite", table_type="temporary")

In [None]:
# Score every processed review; the UDF returns a VARIANT of per-class probabilities.
df = session.table('new_reviews_processed').select(
    F.col('product_id'),
    F.col('review_date'),
    F.col('product_review'),
    F.col('processed_review'),
    F.call_udf(
        'predict_sentiment_vect',
        F.col('processed_review')).alias('sentiment'))

# Flatten the probabilities into one column per class and persist the result.
# save_as_table() returns None, so its result is not assigned back to `df`.
df.select(
    F.col('product_id'),
    F.col('review_date'),
    F.col('product_review'),
    F.col('processed_review'),
    F.col('sentiment')['NEGATIVE'].alias('negative'),
    F.col('sentiment')['NEUTRAL'].alias('neutral'),
    F.col('sentiment')['POSITIVE'].alias('positive')
).write.save_as_table('new_reviews_scored', mode="overwrite")

session.table('new_reviews_scored').select(
    F.col('product_id'),
    F.col('review_date'),
    F.col('product_review'),
    F.col('positive'),
    F.col('neutral'),
    F.col('negative')).show(50)