## __Sentiment Analysis using IMDB Movie Reviews__

### __Index__

<ol>
  <li><a href='#section1'>Initializations</a></li>
  <li><a href='#section2'>Loading Credentials</a></li>
  <li><a href='#section3'>Create Session</a></li>
  <li><a href='#section4'>Import packages and libraries in Snowflake</a></li>
  <li><a href='#section5'>Create a Staging Environment</a></li>
  <li><a href='#section6'>Create tables in PUBLIC</a></li>
  <li><a href='#section7'>Loading IMDB datasets into the Stage</a></li>
  <li><a href='#section8'>Training Model</a></li>
  <li><a href='#section9'>Import and Deploy Serialized Model</a></li>  
  <li><a href='#section10'>Prediction</a></li>
  <li><a href='#section11'>Compare Prediction with Output</a></li>
  <li><a href='#section12'>Pushing the Prediction to Snowflake</a></li>
</ol>

#### <a id='isection1'> __Initializations__ </a>

In [1]:
# pylint=disable-all

from sklearn import svm
from joblib import dump
from dotenv import load_dotenv

from nltk.corpus import stopwords
from snowflake.snowpark import functions as fn

from snowflake.snowpark.session import Session
from snowflake.snowpark import functions as fn
from snowflake.snowpark.functions import udf, sproc, col
from snowflake.snowpark.types import IntegerType, FloatType, StringType, BooleanType, Variant

import os
import sys
import joblib
import zipfile
import cachetools
import numpy as np
import pandas as pd
import sklearn.feature_extraction.text as txt

In [2]:
from snowflake.snowpark import version

print(f"Snowflake Snowpark Version - {version.VERSION}")

Snowflake Snowpark Version - (1, 10, 0)


#### <a id='isection2'> __Load Credentials__ </a>

In [3]:
ENV = "./credentials.env"
load_dotenv(ENV)

True

#### <a id='isection3'> __Create Session__ </a>

In [7]:
def get_snowpark_session() -> Session:
    connection_parameters = {
        "ACCOUNT": os.environ.get("ACCOUNT"),
        "USER": os.environ.get("MY_USER"),
        "PASSWORD": os.environ.get("MY_PASS"),
        "ROLE": os.environ.get("SP_ADMIN_ROLE"),
        "DATABASE": os.environ.get("DB"),
        "SCHEMA": os.environ.get("SCHEMA"),
        "WAREHOUSE": os.environ.get("WAREHOUSE")
    }

    return Session.builder.configs(connection_parameters).create()

session = get_snowpark_session()

print(f"\n{session.sql('USE ROLE ACCOUNTADMIN').collect()}")
print(session.sql('USE IMDB.PUBLIC').collect())
print(session.sql('SELECT current_role(), current_warehouse(), current_database(), current_schema()').collect(), end='\n')

[Row(status='Statement executed successfully.')]
[Row(status='Statement executed successfully.')]
[Row(CURRENT_ROLE()='ACCOUNTADMIN', CURRENT_WAREHOUSE()='SNOWPARK_ETL_WAREHOUSE', CURRENT_DATABASE()='IMDB', CURRENT_SCHEMA()='PUBLIC')]


#### <a id='isection4'> __Import packages and libraries in Snowflake__ </a>

In [9]:
session.clear_imports()
session.clear_packages()

session.add_packages("snowflake-snowpark-python", "scikit-learn", "pandas", "numpy", "nltk", "joblib", "cachetools")

The version of package 'snowflake-snowpark-python' in the local environment is 1.10.0, which does not fit the criteria for the requirement 'snowflake-snowpark-python'. Your UDF might not work when the package version is different between the server and your local environment.
The version of package 'scikit-learn' in the local environment is 1.3.2, which does not fit the criteria for the requirement 'scikit-learn'. Your UDF might not work when the package version is different between the server and your local environment.
The version of package 'pandas' in the local environment is 2.1.3, which does not fit the criteria for the requirement 'pandas'. Your UDF might not work when the package version is different between the server and your local environment.
The version of package 'numpy' in the local environment is 1.26.2, which does not fit the criteria for the requirement 'numpy'. Your UDF might not work when the package version is different between the server and your local environment

#### <a id='isection5'> __Create a Staging Environment__ </a>

In [8]:
session.sql("CREATE STAGE IF NOT EXISTS sentiment_model").collect()

[Row(status='Stage area SENTIMENT_MODEL successfully created.')]

#### <a id='isection6'> __Create tables in PUBLIC__ </a>

In [73]:
print(session.sql('''CREATE TABLE PUBLIC.TRAIN_DATASET(REVIEW VARCHAR, SENTIMENT VARCHAR);''').collect())
print(session.sql('''CREATE TABLE PUBLIC.TEST_DATASET(REVIEW VARCHAR, SENTIMENT VARCHAR);''').collect())

[Row(status='Table TRAIN_DATASET successfully created.')]
[Row(status='Table TEST_DATASET successfully created.')]


In [74]:
session.sql("SHOW TABLES LIKE '%_DATASET';").show()

-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|"created_on"                      |"name"         |"database_name"  |"schema_name"  |"kind"  |"comment"  |"cluster_by"  |"rows"  |"bytes"  |"owner"       |"retention_time"  |"automatic_clustering"  |"change_tracking"  |"search_optimization"  |"search_optimization_progress"  |"search_optimization_bytes"  |"is_external"  |"enable_schema_evolution"  |"owner_role_type"  |"is_event"  |"budget"  |
----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

#### <a id='isection7'> __Loading IMDB datasets into the Stage__ </a>

In [None]:
def extract_zip(zip_file_path, extraction_path):
    for path in zip_file_path:
        with zipfile.ZipFile(path, 'r') as zip_ref:
            zip_ref.extractall(extraction_path)

zip_file_path = ['./dataset/TEST_DATASET.zip', './dataset/TRAIN_DATASET.zip']
extraction_path = './dataset'

extract_zip(zip_file_path, extraction_path)

In [75]:
train_dataset_path = "./dataset/TRAIN_DATASET.csv"
test_dataset_path = "./dataset/TEST_DATASET.csv"

IMDB_stage = "@sentiment_model"

session.file.put(train_dataset_path, IMDB_stage, auto_compress=False, overwrite=True)
session.file.put(test_dataset_path, IMDB_stage, auto_compress=False, overwrite=True)

[PutResult(source='TEST_DATASET.csv', target='TEST_DATASET.csv', source_size=13868564, target_size=13868576, source_compression='NONE', target_compression='NONE', status='UPLOADED', message='')]

In [76]:
print(session.sql('''COPY INTO TRAIN_DATASET FROM @sentiment_model/TRAIN_DATASET.csv FILE_FORMAT = (TYPE = 'CSV' FIELD_OPTIONALLY_ENCLOSED_BY='"')''').collect())
print(session.sql('''COPY INTO TEST_DATASET FROM @sentiment_model/TEST_DATASET.csv FILE_FORMAT = (TYPE = 'CSV' FIELD_OPTIONALLY_ENCLOSED_BY='"')''').collect())

[Row(file='sentiment_model/TRAIN_DATASET.csv', status='LOADED', rows_parsed=64642, rows_loaded=64642, error_limit=1, errors_seen=0, first_error=None, first_error_line=None, first_error_character=None, first_error_column_name=None)]
[Row(file='sentiment_model/TEST_DATASET.csv', status='LOADED', rows_parsed=10373, rows_loaded=10373, error_limit=1, errors_seen=0, first_error=None, first_error_line=None, first_error_character=None, first_error_column_name=None)]


#### __View Tables__

In [77]:
session.table("TRAIN_DATASET").show()

--------------------------------------------------------------------
|"REVIEW"                                            |"SENTIMENT"  |
--------------------------------------------------------------------
|"I gave this a 2, and it only avoided a 1 becau...  |negative     |
|"I really liked this movie. If other people wan...  |positive     |
|"Robin Williams and Kurt Russell play guys in t...  |positive     |
|"Got this the other day from the Creators on DV...  |positive     |
|"I guess I am coming late to the party. I just ...  |negative     |
|"I recently rented this video after seeing ""Fi...  |positive     |
|"My first question after seeing this film was, ...  |negative     |
|"Rise of the Undead starts as some huge nuclear...  |negative     |
|"One of the best parts of Sundance is seeing mo...  |positive     |
|"Jane Eyre has always been my favorite novel! W...  |positive     |
--------------------------------------------------------------------



In [79]:
session.table("TEST_DATASET").show()

--------------------------------------------------------------------
|"REVIEW"                                            |"SENTIMENT"  |
--------------------------------------------------------------------
|REVIEW                                              |SENTIMENT    |
|aking this film into a monumental success simpl...  |negative     |
|"I remember seeing this film years ago on, I th...  |positive     |
|"A truly masterful piece of filmmaking. It mana...  |negative     |
|"Terrible. There's no way to get around it. A s...  |negative     |
|"Take a few dark and stormy nights, fog coming ...  |negative     |
|"I enjoyed the beautiful scenery in this movie ...  |negative     |
|"*********Ten out of Ten Stars********* <br /><...  |positive     |
|"This film was total rubbish. I was sitting wat...  |negative     |
|"Lady and the Tramp II: Scamp's Adventure is a ...  |positive     |
--------------------------------------------------------------------



#### <a id='isection8'> __Training Model__ </a>

In [94]:
def train_model_review_pipline(session : Session, train_dataset_name: str) -> Variant:
        
    train_dataset = session.table(train_dataset_name)
    train_dataset_flag = train_dataset.withColumn("SENTIMENT_FLAG", fn.when(train_dataset.SENTIMENT == "positive", 1).otherwise(2))
    
    nb_record = train_dataset_flag.count()
    
    train_x = train_dataset_flag.toPandas().REVIEW.values
    train_y = train_dataset_flag.toPandas().SENTIMENT_FLAG.values
    
    print('\nSize of train_x : ', len(train_x))
    print('Size of train_y : ', len(train_y), end = '\n')
    
    print('Configuring parameters ...\n')
    
    # Bags of Words: parameters
    
    analyzer = 'word'  # {‘word’, ‘char’, ‘char_wb’} - Considers each word as a token.
    ngram_range = (1, 3)   # Considering uni-grams, bi-grams, and tri-grams.
    token = "[\\w']+\\w\\b"    # Regex expression considering tokens.
    binary = True   # Considers the encoding (1 / 0) to be present/absent rather than term-frequency.
    max_df = 0.02    #50. * 1./len(train_x) -  Terms appearing in more than 2% of the documents are ignored.
    min_df = 1 * 1./len(train_x)    # Terms appearing in less than 1 document are ignored.
    svm_max_iter = 100  # An upper limit on the no. of iterations the solver can perform while optimizing the model after which it stops.
    svm_c = 1.8  # The penalty parameter of the error term controls the trade-off between achieving a low training error and a low testing error. 1.8 is a moderate regularization strength.
    
    max_features = 100000
    languages = ['english']
    lowercase = True    
    strip_accents = 'ascii' #  {‘ascii’, ‘unicode’, None} - Removes accents using ASCII encoding and performs other character normalization during the preprocessing step.
    
    print('\nBuilding Sparse Matrix ...\n')
    Cvec = txt.CountVectorizer(
        token_pattern=token, \
        ngram_range=ngram_range, \
        analyzer=analyzer,\
        max_df=max_df, \
        min_df=min_df, \
        vocabulary=None, 
        binary=binary)

    # Training Model
    bow = Cvec.fit_transform(train_x)
    print(f"\nVocabulary Size : {len(Cvec.vocabulary_.keys())}\n")
    
    print('\nFitting model ...\n')
    model = svm.LinearSVC(C=svm_c, max_iter=svm_max_iter)
    print(f"\nmodel.fit - {model.fit(bow, train_y)}\n")
    
    # Upload the Vectorizer (BOW) to a stage
    print('\nUploading the Vectorizer (BOW) to a stage\n')
    session.sql('CREATE STAGE IF NOT EXISTS sentiment_model').collect()

    model_output_dire = '/tmp'
    model_file = os.path.join(model_output_dire, 'vect_review.joblib')

    dump(Cvec, model_file, compress=True)

    session.file.put(model_file, "@sentiment_model", auto_compress=False, overwrite=True)
    
    # Upload trained model to a stage
    print('\nUpload trained model to a stage\n')
    session.sql('CREATE STAGE IF NOT EXISTS sentiment_model').collect()
    
    model_output_dire = '/tmp'
    model_file = os.path.join(model_output_dire, 'model_review.joblib')
    
    dump(model, model_file, compress=True)
    
    session.file.put(model_file, "@sentiment_model", auto_compress=False, overwrite=True)
    
    return {"STATUS": "SUCCESS", "R2 Score Train": str(model.score(bow, train_y))}

#### __Pushing_the_model_into_Snowflake__

In [95]:
val = session.sproc.register(func=train_model_review_pipline, name="train_model_review_pipline", replace=True)

#### __Run Training__

In [96]:
train_result = session.call("train_model_review_pipline", "TRAIN_DATASET")

'{\n  "R2 Score Train": "1.0",\n  "STATUS": "SUCCESS"\n}'

In [130]:
pd.DataFrame(session.sql("LIST '@sentiment_model'").collect()).drop(columns=['md5', 'last_modified'])

# The matrix (vect_review - ~821MB) takes more Memory than the model (model_review - ~276MB).

Unnamed: 0,name,size
0,sentiment_model/TEST_DATASET.csv,13868576
1,sentiment_model/TRAIN_DATASET.csv,86080208
2,sentiment_model/model_review.joblib,34563792
3,sentiment_model/vect_review.joblib,102731600


#### <a id='isection9'> __Import and Deploy Serialized Model__ </a>

#### __Import the Serialized Model__

In [131]:
session.add_import("@sentiment_model/model_review.joblib")
session.add_import("@sentiment_model/vect_review.joblib")

#### __Deploy the model using Python UDF__

In [134]:
# To leverage the cache speed for loading the model and matrix in the UDF prediction function, we add the cache decorator.

@cachetools.cached(cache={})
def load_file(filename):
    
    import_dir = sys._xoptions.get("snowflake_import_directory")
    
    if import_dir:
        with open(os.path.join(import_dir, filename), 'rb') as file:
            m = joblib.load(file)
            return m

In [136]:
%%time

@udf(name='predict_review', is_permanent = False, stage_location = '@sentiment_model', replace=True, session=session)
def predict_review(args: list) -> float:

    model = load_file("model_review.joblib")
    vec = load_file("vect_review.joblib")
        
    features = list(["REVIEW", "SENTIMENT_FLAG"])
    
    row = pd.DataFrame([args], columns=features)
    bowTest = vec.transform(row.REVIEW.values)
    
    return model.predict(bowTest)

CPU times: total: 641 ms
Wall time: 3.07 s


#### <a id='isection10'> __Prediction__ </a>

In [143]:
test_dataset = session.table("TEST_DATASET")

new_df = test_dataset.withColumn(
    "SENTIMENT_FLAG", 
    fn.when(test_dataset.SENTIMENT == "positive", 1).otherwise(2)
)
print(f"Number of records in the Test Dataframe: {new_df.count()}\n")

new_df.show()

Number of records in the Test Dataframe: 10373

---------------------------------------------------------------------------------------
|"REVIEW"                                            |"SENTIMENT"  |"SENTIMENT_FLAG"  |
---------------------------------------------------------------------------------------
|REVIEW                                              |SENTIMENT    |2                 |
|aking this film into a monumental success simpl...  |negative     |2                 |
|"I remember seeing this film years ago on, I th...  |positive     |1                 |
|"A truly masterful piece of filmmaking. It mana...  |negative     |2                 |
|"Terrible. There's no way to get around it. A s...  |negative     |2                 |
|"Take a few dark and stormy nights, fog coming ...  |negative     |2                 |
|"I enjoyed the beautiful scenery in this movie ...  |negative     |2                 |
|"*********Ten out of Ten Stars********* <br /><...  |positive     |1   

In [144]:
%%time

new_df.select(
    new_df.REVIEW, 
    new_df.SENTIMENT, 
    new_df.SENTIMENT_FLAG, 
    fn.call_udf(
        "predict_review", 
        fn.array_construct(col("REVIEW"), col("SENTIMENT_FLAG"))
    ).alias('PREDICTED_REVIEW')
).show()

------------------------------------------------------------------------------------------------------------
|"REVIEW"                                            |"SENTIMENT"  |"SENTIMENT_FLAG"  |"PREDICTED_REVIEW"  |
------------------------------------------------------------------------------------------------------------
|REVIEW                                              |SENTIMENT    |2                 |2.0                 |
|aking this film into a monumental success simpl...  |negative     |2                 |2.0                 |
|"I remember seeing this film years ago on, I th...  |positive     |1                 |1.0                 |
|"A truly masterful piece of filmmaking. It mana...  |negative     |2                 |2.0                 |
|"Terrible. There's no way to get around it. A s...  |negative     |2                 |2.0                 |
|"Take a few dark and stormy nights, fog coming ...  |negative     |2                 |2.0                 |
|"I enjoyed the bea

#### <a id='isection11'> __Compare Prediction with Output__ </a>

#### __Create table for comparison__

In [145]:
session.sql(
    "CREATE OR REPLACE VIEW PUBLIC.VIEW_TEST_DATASET AS \
            SELECT REVIEW, SENTIMENT, CASE \
                    WHEN (SENTIMENT = 'positive') THEN 1 :: bigint \
                    ELSE 2 :: bigint \
                END AS SENTIMENT_FLAG \
            FROM \
                TEST_DATASET"
).collect()

[Row(status='View VIEW_TEST_DATASET successfully created.')]

#### __Review__

In [146]:
%%time

session.sql(
    "SELECT REVIEW, SENTIMENT, SENTIMENT_FLAG, \
            predict_review(ARRAY_CONSTRUCT( \
                                    REVIEW, \
                                    SENTIMENT_FLAG)) as PREDICTED_REVIEW \
                                    from VIEW_TEST_DATASET"
).show()

------------------------------------------------------------------------------------------------------------
|"REVIEW"                                            |"SENTIMENT"  |"SENTIMENT_FLAG"  |"PREDICTED_REVIEW"  |
------------------------------------------------------------------------------------------------------------
|REVIEW                                              |SENTIMENT    |2                 |2.0                 |
|aking this film into a monumental success simpl...  |negative     |2                 |2.0                 |
|"I remember seeing this film years ago on, I th...  |positive     |1                 |1.0                 |
|"A truly masterful piece of filmmaking. It mana...  |negative     |2                 |2.0                 |
|"Terrible. There's no way to get around it. A s...  |negative     |2                 |2.0                 |
|"Take a few dark and stormy nights, fog coming ...  |negative     |2                 |2.0                 |
|"I enjoyed the bea

In [147]:
session.sql(
    "SELECT REVIEW, SENTIMENT, SENTIMENT_FLAG, \
            predict_review(ARRAY_CONSTRUCT( \
                                    REVIEW, \
                                    SENTIMENT_FLAG)) as PREDICTED_REVIEW \
                                    from VIEW_TEST_DATASET"
).toPandas().head()

Unnamed: 0,REVIEW,SENTIMENT,SENTIMENT_FLAG,PREDICTED_REVIEW
0,"""This movie was absolutely ghastly! I cannot f...",negative,2,2.0
1,"""Abu, THE THIEF OF BAGDAD, helps King Ahmed re...",positive,1,1.0
2,"""It kept me on the edge of my seat. True, the ...",positive,1,1.0
3,"""This Cannon Movie Tale is the worst of the lo...",negative,2,2.0
4,I read some previous comments stating that thi...,positive,1,1.0


#### <a id='isection12'> __Pushing the Prediction to Snowflake__ </a>

In [149]:
%%time

# Using SQL

'''
session.sql(
"SELECT REVIEW, SENTIMENT, SENTIMENT_FLAG, \
predict_review(ARRAY_CONSTRUCT(REVIEW, SENTIMENT_FLAG)) as PREDICTED_REVIEW \
FROM VIEW_TEST_DATASET SAMPLE (10 rows)") \
.write.mode('overwrite').saveAsTable('review_prediction')
'''

# Using Python

new_df.select(
    new_df.REVIEW,
    new_df.SENTIMENT,
    new_df.SENTIMENT_FLAG,
    fn.call_udf(
        "predict_review", 
        fn.array_construct(col("REVIEW"), col("SENTIMENT_FLAG"))
    ).alias("PREDICTED_REVIEW"),
).write.mode("overwrite").saveAsTable("review_prediction")

CPU times: total: 328 ms
Wall time: 1min 13s


#### __Compare differences with the target label with the Predictions__

In [150]:
session.sql(
    "SELECT * FROM REVIEW_PREDICTION WHERE SENTIMENT_FLAG <> PREDICTED_REVIEW"
).show()

------------------------------------------------------------------
|"REVIEW"  |"SENTIMENT"  |"SENTIMENT_FLAG"  |"PREDICTED_REVIEW"  |
------------------------------------------------------------------
|          |             |                  |                    |
------------------------------------------------------------------



#### __Close Snowpark Session__

In [151]:
session.close()