In [1]:
import pandas as pd
import numpy as np


from analysis_src.basic_data_ingestion import DataLoader

In [2]:
data_loader = DataLoader("postgresql://postgres:3333@localhost:5432/NLP")
data_loader.load_data("customer_reviews")
df =data_loader.get_data()

In [4]:
df.head()

Unnamed: 0,id,product_id,user_id,helpfulness_numerator,helpfulness_denominator,score,time,review_text
0,414001,B000G6RYNE,ACYR6O588USK,14,17,5,1200614400,These potato chips are excellent.There are no ...
1,414002,B0025ULYKI,ACYR6O588USK,1,1,5,1259020800,"I'm not a potato chip addict, but sometimes li..."
2,414003,B003M8GSWQ,ACYR6O588USK,0,0,5,1318982400,These inexpensive little rewards for dogs seem...
3,414004,B001LGGH40,ACYR6O588USK,1,3,5,1235433600,"Though it is a bit expensive, this juice with ..."
4,414005,B0001FQVCA,ACYR6O588USK,14,16,5,1203897600,"I could eat it with a spoon, it's so good.<br ..."


In [3]:
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 60962 entries, 0 to 60961
Data columns (total 8 columns):
 #   Column                   Non-Null Count  Dtype 
---  ------                   --------------  ----- 
 0   id                       60962 non-null  int64 
 1   product_id               60962 non-null  object
 2   user_id                  60962 non-null  object
 3   helpfulness_numerator    60962 non-null  int64 
 4   helpfulness_denominator  60962 non-null  int64 
 5   score                    60962 non-null  int64 
 6   time                     60962 non-null  int64 
 7   review_text              60962 non-null  object
dtypes: int64(5), object(3)
memory usage: 3.7+ MB


In [4]:
import logging
import pandas as pd
import numpy as np
import nltk
import re
import string
from abc import ABC, abstractmethod
from bs4 import BeautifulSoup
from nltk.corpus import stopwords
from nltk.stem import WordNetLemmatizer

nltk.download('stopwords')
nltk.download('punkt')
nltk.download('punkt_tab')
nltk.download('wordnet')


# Abstract Base Class for Preprocessing Strategy
# -----------------------------------------------
# This class defines a common interface for different preprocessing strategies.
# Subclasses must implement the preprocess method.
class PreprocessingStrategy(ABC):
    @abstractmethod
    def data_preprocessing(self, df:pd.DataFrame) -> pd.DataFrame:
        """
        Abstract method to preprocess the DataFrame.

        Parameters:
            df (pd.DataFrame): The input DataFrame to be processed.

        Returns:
            pd.DataFrame: The processed DataFrame.
        """
        pass

# Concrete Strategy for Basic Preprocessing
# ---------------------------------------------
# This strategy implements basic preprocessing and text_preprocessing steps for customer reviews data.
class BasicPreprocessingStrategy(PreprocessingStrategy):
    def __init__(self):
        """
        Initializes the BasicPreprocessingStrategy with stop words and lemmatizer.

        Attributes:
            my_stopword (set): A set of English stop words.
            my_lemmatizer (WordNetLemmatizer): An instance of the WordNetLemmatizer for lemmatization.
        """
        self.my_stopword = set(stopwords.words('english'))
        self.my_lemmatizer = WordNetLemmatizer()

    def data_preprocessing(self, df: pd.DataFrame) -> pd.DataFrame:
        """
        Preprocesses the customer reviews DataFrame.

        Parameters:
            df (pd.DataFrame): The input DataFrame to be processed.

        Returns:
            pd.DataFrame: The processed DataFrame.
        """
        logging.info("Started basic preprocessing of the data.")

        df = df[['review_text', 'score']]

        # Drop NA values and reset index
        df = df.dropna().reset_index(drop=True)
        df['score'] = df['score'].astype(int)

        # Remove neutral reviews (Score 3)
        df = df[df['score'] != 3]

        # Label the reviews
        df['label'] = np.where(df['score'] >= 4, 1, 0)  # 1 for positive, 0 for negative

        df = df.drop(columns=['score'])

        #Apply text preprocessing 
        df["review_text"] = df["review_text"].apply(self.text_preprocessing)

        logging.info(f"Basic preprocessing completed. Number of records after preprocessing: {len(df)}.")

        return df 
    
    def text_preprocessing(self, text):
        """
        Apply a series of text preprocessing steps to the given text.

        Parameters:
            text (str): The input text to be preprocessed.

        Returns:
            str: The processed text.
        """
        text = self.lower_text(text)
        text = self.remove_html_tags(text)
        text = self.remove_urls(text)
        text = self.replace_special_character_to_string_equivalent(text)
        text = self.expand_contractions(text)
        text = self.remove_non_alpha(text)
        text = self.remove_extra_spaces(text)
        text = self.remove_stopwords(text)
        text = self.text_lemmatization(text)
        return text
    
    def lower_text(self, text):
        """
        Converts the input text to lowercase.

        Parameters:
            text (str): The input text to convert.

        Returns:
            str: The lowercase version of the input text.
        """
        text = text.lower()
        return text
    
    def remove_html_tags(self, text):
        """
        Removes HTML tags from the input text.

        Parameters:
            text (str): The input text potentially containing HTML tags.

        Returns:
            str: The text without HTML tags.
        """
        text = BeautifulSoup(text, "html.parser").get_text()
        return text
    
    def remove_urls(self, text):
        """
        Removes URLs from the input text.

        Parameters:
            text (str): The input text potentially containing URLs.

        Returns:
            str: The text without URLs.
        """
        text = re.sub(r"http\S+","", text)
        return text
    
    def replace_special_character_to_string_equivalent(self,text):
        """
        Replaces special characters in the input text with their string equivalents.

        Parameters:
            text (str): The input text containing special characters.

        Returns:
            str: The text with special characters replaced by their equivalents.
        """
        replacements = {
            '%':"percent", 
            '$':"dollar",
            '₹':"rupee",
            '€':"euro",
            '@':"at",
        }
        for char,word in replacements.items():
            text = text.replace(char,word)
        return text
    
    def expand_contractions(self, text):
        """
        Expands contractions in the input text to their full form.

        Parameters:
            text (str): The input text containing contractions.

        Returns:
            str: The text with contractions expanded.
        """
        contractions = {
            "won't":"will not",
            "can't":"cannot",
            "n't":"not",
            "'re":"are",
            "'s":"is",
            "'d":"would",
            "'ll":"will",
            "'t":"not",
            "'ve":"have",
            "'m":"am",
        }
        for contraction, expand in contractions.items():
            text = re.sub(contraction, expand, text)
        return text
    
    def remove_non_alpha(self,text):
        """
        Removes non-alphabetical characters from the input text.

        Parameters:
            text (str): The input text to process.

        Returns:
            str: The text with non-alphabetical characters removed.
        """
        words = nltk.word_tokenize(text)
        words = [re.sub('[^A-Za-z]','',word) for word in words]
        return ' '.join(words)
    
    def remove_extra_spaces(self,text):
        """
        Removes extra spaces from the input text.

        Parameters:
            text (str): The input text to process.

        Returns:
            str: The text with extra spaces removed.
        """
        text = re.sub(r'\s+',' ', text).strip()
        return text 
    
    def remove_stopwords(self,text):
        """
        Removes stopwords from the input text.

        Parameters:
            text (str): The input text from which to remove stopwords.

        Returns:
            str: The text with stopwords removed.
        """
        words = text.split()
        filtered_words = [word for word in words if word not in self.my_stopword]
        return ' '.join(filtered_words)
    
    def text_lemmatization(self, text):
        """
        Lemmatizes the words in the input text.

        Parameters:
            text (str): The input text to lemmatize.

        Returns:
            str: The lemmatized text.
        """
        words = nltk.word_tokenize(text)
        lemmatized_words = [self.my_lemmatizer.lemmatize(word) for word in words]
        return ' '.join(lemmatized_words)
    

    
# Context Class for Data Preprocessing
# --------------------------------------
# This class uses a PreprocessingStrategy to preprocess the data.
class DataPreprocessor:
    def __init__(self, strategy:PreprocessingStrategy):
        """
        Initializes the DataPreprocessor with the DataFrame and a strategy.

        Parameters:
            df (pd.DataFrame): The DataFrame containing customer reviews data.
            strategy (PreprocessingStrategy): The strategy for preprocessing.
        """
        self._strategy=strategy

    def set_strategy(self, strategy:PreprocessingStrategy):
        """
        Sets a new strategy for the DataPreprocessor.

        Parameters:
            strategy (PreprocessingStrategy): The new strategy to be used for preprocessing.
        """
        logging.info("Switching preprocessing strategy")
        self._strategy=strategy

    def preprocess(self, df:pd.DataFrame) -> pd.DataFrame:
        """
        Executes the preprocessing using the current strategy.

        Returns:
            pd.DataFrame: The processed DataFrame.
        """
        logging.info("Preprocessing data using the selected strategy.")
        return self._strategy.data_preprocessing(df)
    

# Example usage
if __name__ == "__main__":
    
    # # Example DataFrame (replace with actual data loading)
    # df = pd.DataFrame({
    #     'review_text': ['Good product! Highly recommend.', 'Just okay.', 'Worst product ever!'],
    #     'score': [5, 4, 1],
    #     'label': [1, 1, 0]
    # })

    # # Initialize data preprocessor with a specific strategy
    # strategy = BasicPreprocessingStrategy()
    # preprocessor = DataPreprocessor(strategy)
    # preprocessed_data = preprocessor.preprocess(df)
    # print(preprocessed_data)

    pass


[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!
[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Package punkt is already up-to-date!
[nltk_data] Downloading package punkt_tab to /root/nltk_data...
[nltk_data]   Package punkt_tab is already up-to-date!
[nltk_data] Downloading package wordnet to /root/nltk_data...
[nltk_data]   Package wordnet is already up-to-date!


In [5]:
strategy = BasicPreprocessingStrategy()
df_preprocessor = DataPreprocessor(strategy)
df_pre = df_preprocessor.preprocess(df)

  text = BeautifulSoup(text, "html.parser").get_text()


In [6]:
df_pre.info()

<class 'pandas.core.frame.DataFrame'>
Index: 56208 entries, 0 to 60961
Data columns (total 2 columns):
 #   Column       Non-Null Count  Dtype 
---  ------       --------------  ----- 
 0   review_text  56208 non-null  object
 1   label        56208 non-null  int64 
dtypes: int64(1), object(1)
memory usage: 1.3+ MB


In [7]:
import pandas as pd
import logging

class DataSampler:
    """A class to sample customer reviews data for model training.
    
    Attributes:
        df: pd.DataFrame
            The DataFrame containing labeled customer reviews data.
    
    Methods:
        sample_data() -> pd.DataFrame:
            Samples 100,000 reviews (50,000 positive and 50,000 negative).
    """
    def __init__(self, df:pd.DataFrame):
        """
        Initializes the DataSampler with the DataFrame.
        
        Parameters:
            df: pd.DataFrame
                The DataFrame containing labeled customer reviews data.
        """
        self.df = df

    def sample_data(self, df:pd.DataFrame) -> pd.DataFrame:
        """
        Samples 100,000 reviews by shuffling the DataFrame 
        and selecting 50,000 positive and 50,000 negative reviews.
        
        Returns:
            pd.DataFrame: A DataFrame containing the sampled reviews.
        """
        logging.info("Started sampling data for model training.")

        #Shuffle the data
        df_shuffled = df.sample(frac=1, random_state=42).reset_index(drop=True)

        # Select 50,000 negative reviews (label 0)
        negative_reviews = df_shuffled[df_shuffled["label"]==0][:5000]

        #select 50,000 positive reviews (label 1)
        positive_reviews = df_shuffled[df_shuffled["label"]==1][:5000]

        # Combine the selected reviews
        sampled_data = pd.concat([negative_reviews, positive_reviews], ignore_index=True)

        logging.info(f"Sampling completed. Number of records after sampling: {len(sampled_data)}.")

        return sampled_data

In [8]:
sampler = DataSampler(df_pre)

sampled_df = sampler.sample_data(df_pre)

In [9]:
sampled_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 10000 entries, 0 to 9999
Data columns (total 2 columns):
 #   Column       Non-Null Count  Dtype 
---  ------       --------------  ----- 
 0   review_text  10000 non-null  object
 1   label        10000 non-null  int64 
dtypes: int64(1), object(1)
memory usage: 156.4+ KB


In [10]:
sampled_df["label"].value_counts()

label
0    5000
1    5000
Name: count, dtype: int64

In [11]:
import logging
from abc import ABC, abstractmethod

import pandas as pd
from sklearn.model_selection import train_test_split

# Setup logging configuration
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")

# Abstract Base Class for Data Splitting Strategy
# -----------------------------------------------
# This class defines a common interface for different data splitting strategies.
# Subclasses must implement the split_data method.
class DataSplittingStrategy(ABC):
    @abstractmethod
    def split_data(self, df:pd.DataFrame, target_column:str):
        """
        Abstract method to split the data into training and testing sets.

        Parameters:
        df (pd.DataFrame): The input DataFrame to be split.
        target_column (str): The name of the target column.

        Returns:
        X_train, X_test, y_train, y_test: The training and testing splits for features and target.
        """
        pass

# Concrete Strategy for Simple Train-Test Split
# ---------------------------------------------
# This strategy implements a simple train-test split.
class SimpleTrainTestSplitStrategy(DataSplittingStrategy):
    def __init__(self, test_size=0.2, random_state=42):
        """
        Initializes the SimpleTrainTestSplitStrategy with specific parameters.

        Parameters:
        test_size (float): The proportion of the dataset to include in the test split.
        random_state (int): The seed used by the random number generator.
        """
        self.test_size=test_size
        self.random_state=random_state

    def split_data(self, df: pd.DataFrame, target_column: str):
        """
        Splits the data into training and testing sets using a simple train-test split.

        Parameters:
        df (pd.DataFrame): The input DataFrame to be split.
        target_column (str): The name of the target column.

        Returns:
        X_train, X_test, y_train, y_test: The training and testing splits for features and target.
        """
        logging.info("Performing simple train-test split.")
        X = df.drop(columns=[target_column])
        y = df[target_column]

        X_train, X_test, y_train, y_test = train_test_split(
            X, y, test_size=self.test_size, random_state= self.random_state
        )

        logging.info("Train-test split completed.")
        return X_train, X_test, y_train, y_test
    
# Context Class for Data Splitting
# --------------------------------
# This class uses a DataSplittingStrategy to split the data.
class DataSplitter:
    def __init__(self, strategy:DataSplittingStrategy):
        """
        Initializes the DataSplitter with a specific data splitting strategy.

        Parameters:
        strategy (DataSplittingStrategy): The strategy to be used for data splitting.
        """
        self._strategy = strategy

    def set_strategy(self, strategy:DataSplittingStrategy):
        """
        Sets a new strategy for the DataSplitter.

        Parameters:
        strategy (DataSplittingStrategy): The new strategy to be used for data splitting.
        """
        logging.info("Switching data splitting strategy")
        self._strategy = strategy

    def split(self, df:pd.DataFrame, target_column:str):
        """
        Executes the data splitting using the current strategy.

        Parameters:
        df (pd.DataFrame): The input DataFrame to be split.
        target_column (str): The name of the target column.

        Returns:
        X_train, X_test, y_train, y_test: The training and testing splits for features and target.
        """
        logging.info("Splitting data using the selected strategy.")
        return self._strategy.split_data(df, target_column)
    
# Example usage
if __name__ == "__main__":
    # Example dataframe (replace with actual data loading)
    # df = pd.read_csv('your_data.csv')

    # Initialize data splitter with a specific strategy
    # data_splitter = DataSplitter(SimpleTrainTestSplitStrategy(test_size=0.2, random_state=42))
    # X_train, X_test, y_train, y_test = data_splitter.split(df, target_column='SalePrice')

    pass


In [12]:
splitter = DataSplitter(strategy=SimpleTrainTestSplitStrategy())
X_train, X_test, y_train, y_test = splitter.split(sampled_df, target_column="label")

In [13]:
import logging
import pandas as pd
from sklearn.feature_extraction.text import TfidfVectorizer

class TfidfVectorization:
    def __init__(self):
        """
        Initializes the TfidfVectorizer for use on text data.
        """
        self.vectorizer = TfidfVectorizer()

    def fit_transform(self, X_train:pd.DataFrame):
        """
        Fits the vectorizer to the training data and transforms it.

        Parameters:
            X_train (pd.DataFrame): The training data to fit and transform.

        Returns:
            sparse matrix: The transformed training data in sparse matrix form.
        """
        logging.info("TF-IDF Vectorizer: Fitting and transforming training data.")
        return self.vectorizer.fit_transform(X_train)
    
    def transform(self, X_test:pd.DataFrame):
        """
        Transforms the test data using the already-fitted vectorizer.

        Parameters:
            X_test (pd.Series): The test data to transform.

        Returns:
            sparse matrix: The transformed test data in sparse matrix form.
        """
        logging.info("TF-IDF Vectorizer: Transforming test data")
        return self.vectorizer.transform(X_test)
    

# Example usage
if __name__ == "__main__":
    # # Example DataFrame (replace with actual data loading)
    # df_train = pd.DataFrame({
    #     'review_text': ['Good product! Highly recommend.', 'Just okay.', 'Worst product ever!']
    # })
    # df_test = pd.DataFrame({
    #     'review_text': ['Amazing quality!', 'Not good at all.']
    # })

    # # Initialize the TF-IDF Vectorizer
    # tfidf_vectorizer = TfidfVectorization()
    # tf_x_train = tfidf_vectorizer.fit_transform(df_train['review_text'])
    # tf_x_test = tfidf_vectorizer.transform(df_test['review_text'])

    # print("TF-IDF Vectors for Training Data:\n", tf_x_train.toarray())
    # print("TF-IDF Vectors for Test Data:\n", tf_x_test.toarray())
    pass

In [14]:
vectorizer = TfidfVectorization()
tf_X_train = vectorizer.fit_transform(X_train["review_text"])
tf_X_test = vectorizer.transform(X_test["review_text"])

In [15]:
print(tf_X_train)


<Compressed Sparse Row sparse matrix of dtype 'float64'
	with 271128 stored elements and shape (8000, 20842)>
  Coords	Values
  (0, 10543)	0.12939230732465548
  (0, 8067)	0.28092700382345537
  (0, 1621)	0.16615664095950014
  (0, 4518)	0.47103293355055575
  (0, 18516)	0.1741517281248534
  (0, 8731)	0.18951420280216913
  (0, 3573)	0.3275615309254489
  (0, 17443)	0.33489521610541567
  (0, 4911)	0.18046328901902722
  (0, 20010)	0.18304516012964425
  (0, 167)	0.2013097435448063
  (0, 7677)	0.17559395337410352
  (0, 13551)	0.2954780586148099
  (0, 12731)	0.18557102166885456
  (0, 588)	0.15462024507937902
  (0, 7166)	0.20076278820103954
  (0, 16293)	0.21394717235785957
  (1, 18516)	0.07060157118168427
  (1, 4911)	0.073160294661121
  (1, 20010)	0.07420699203794795
  (1, 167)	0.08161150246097915
  (1, 12731)	0.1504619659727828
  (1, 7166)	0.162779530636036
  (1, 16293)	0.1734694990510156
  (1, 7783)	0.0708917589850204
  :	:
  (7998, 16821)	0.20236463059911358
  (7998, 3329)	0.20449793444398334


In [16]:
import logging
from abc import ABC, abstractmethod
from typing import Any
import pandas as pd
from sklearn.linear_model import LogisticRegression
from sklearn.svm import SVC
from sklearn.ensemble import RandomForestClassifier
from sklearn.naive_bayes import MultinomialNB
from sklearn.model_selection import RandomizedSearchCV
import xgboost as xgb
from xgboost import XGBClassifier
import joblib

# Abstract Base Class for Model Building Strategy
class ModelBuildingStrategy(ABC):
    @abstractmethod
    def build_and_train_model(self, X_train:pd.DataFrame, y_train:pd.Series, fine_tuning:bool = False) -> Any:
        """
        Abstract method to build and train a model.

        Parameters:
            X_train (pd.DataFrame): The training data features.
            y_train (pd.Series): The training data labels/target.
            fine_tuning (bool): Flag to indicate if fine-tuning should be performed.

        Returns:
            Any: A trained scikit-learn model instance.
        """
        pass

# Concrete Strategy for Logistic Regression
class LogisticRegressionStrategy(ModelBuildingStrategy):
    def build_and_train_model(self, X_train: pd.DataFrame, y_train: pd.Series, fine_tuning: bool = False) -> Any:
        """
        Trains a Logistic Regression model on the provided training data.

        Parameters:
            X_train (pd.DataFrame): The training data features.
            y_train (pd.Series): The training data labels/target.
            fine_tuning (bool): Not applicable for Logistic Regression, defaults to False.

        Returns:
            LogisticRegression: A trained Logistic Regression model.
        """
        logging.info("Training the Logistic Regression Model.")
        model = LogisticRegression(max_iter=1000)
        model.fit(X_train,y_train)
        joblib.dump(model, "/home/karthikponna/karthik/sentiment_analysis_mlops_project_1/sentiment_analysis_MLOps/saved_models/logistic.pkl")
        logging.info("Logistic Regression training completed.")
        return model
    
# Concrete Strategy for XGBoost
class XGBoostStrategy(ModelBuildingStrategy):
    def build_and_train_model(self, X_train: pd.DataFrame, y_train: pd.Series, fine_tuning: bool = False) -> Any:
        """
        Trains an XGBoost model on the provided training data, optionally with fine-tuning.

        Parameters:
            X_train (pd.DataFrame): The training data features.
            y_train (pd.Series): The training data labels/target.
            fine_tuning (bool): Flag to indicate if fine-tuning should be performed.

        Returns:
            XGBClassifier: A trained XGBoost model (either fine-tuned or default).
        """
        if fine_tuning:
            logging.info("Started fine-tuning the XGBoost model.")
            params = {
                "n_estimators": [50, 100, 150, 200],
                "max_depth": [2, 4, 6, 8],
                "learning_rate": [0.01, 0.05, 0.1, 0.2],
                "subsample": [0.5, 0.7, 0.8, 1.0],
                "colsample_bytree": [0.5, 0.7, 1.0],
            }
            
            xgb_model = XGBClassifier()
            clf = RandomizedSearchCV(xgb_model, params, cv=5, n_jobs=-1)
            clf.fit(X_train, y_train)
            joblib.dump(clf, "/home/karthikponna/karthik/sentiment_analysis_mlops_project_1/sentiment_analysis_MLOps/saved_models/xgb_fine_tuned.pkl")
            logging.info("Finished Hyperparameter search for XGBoost.")
            return clf
        
        else:
            logging.info("Started training the XGBoost model.")
            model = XGBClassifier(
                learning_rate=0.3,
                max_depth=8,
                min_child_weight=1,
                n_estimators=50,
                random_state=0,
            )
            
            model.fit(X_train, y_train)
            joblib.dump(model, "/home/karthikponna/karthik/sentiment_analysis_mlops_project_1/sentiment_analysis_MLOps/saved_models/xgb.pkl")
            logging.info("Completed training the XGBoost model.")
            return model
        
# Concrete Strategy for SVM
class SVCStrategy(ModelBuildingStrategy):
    def build_and_train_model(self, X_train: pd.DataFrame, y_train: pd.Series, fine_tuning: bool = False) -> Any:
        """
        Trains a Support Vector Classifier model on the provided training data, optionally with fine-tuning.

        Parameters:
            X_train (pd.DataFrame): The training data features.
            y_train (pd.Series): The training data labels/target.
            fine_tuning (bool): Flag to indicate if fine-tuning should be performed.

        Returns:
            SVC: A trained SVC model (either fine-tuned or default).
        """
        if fine_tuning:
            logging.info("Started fine-tuning the SVM model.")
            params = {
                "C": [0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0],
                "gamma": [0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9],
            }
            svm = SVC()
            clf = RandomizedSearchCV(svm, params, cv=5, n_jobs=-1)
            clf.fit(X_train, y_train)
            joblib.dump(clf, "/home/karthikponna/karthik/sentiment_analysis_mlops_project_1/sentiment_analysis_MLOps/saved_models/svm_fine_tuned.pkl")
            logging.info("Finished Hyperparameter search for SVM.")
            return clf
        
        else:
            logging.info("Started training the SVM model.")
            model = SVC(C=1.0, gamma='scale')
            model.fit(X_train, y_train)
            joblib.dump(model, "/home/karthikponna/karthik/sentiment_analysis_mlops_project_1/sentiment_analysis_MLOps/saved_models/svm.pkl")
            logging.info("Completed training the SVM model.")
            return model
            
# Concrete Strategy for Naive Bayes
class NaiveBayesStrategy(ModelBuildingStrategy):
    def build_and_train_model(self, X_train: pd.DataFrame, y_train: pd.Series, fine_tuning: bool = False) -> Any:
        """
        Trains a Naive Bayes model on the provided training data.

        Parameters:
            X_train (pd.DataFrame): The training data features.
            y_train (pd.Series): The training data labels/target.
            fine_tuning (bool): Not applicable for Naive Bayes, defaults to False.

        Returns:
            MultinomialNB: A trained Naive Bayes model.
        """
        logging.info("Training the Naive Bayes model.")
        model = MultinomialNB()
        model.fit(X_train, y_train)
        joblib.dump(model, "/home/karthikponna/karthik/sentiment_analysis_mlops_project_1/sentiment_analysis_MLOps/saved_models/NBayes.pkl")
        logging.info("Completed training the Naive Bayes model.")
        return model
    
# Concrete Strategy for Random Forest
class RandomForestStrategy(ModelBuildingStrategy):
    def build_and_train_model(self, X_train: pd.DataFrame, y_train: pd.Series, fine_tuning: bool = False) -> Any:
        """
        Trains a Random Forest model on the provided training data.

        Parameters:
            X_train (pd.DataFrame): The training data features.
            y_train (pd.Series): The training data labels/target.
            fine_tuning (bool): Not applicable for Random Forest, defaults to False.

        Returns:
            RandomForestClassifier: A trained Random Forest model.
        """
        logging.info("Training the Random Forest model.")
        model = RandomForestClassifier()
        model.fit(X_train, y_train)
        joblib.dump(model, "/home/karthikponna/karthik/sentiment_analysis_mlops_project_1/sentiment_analysis_MLOps/saved_models/rf.pkl")
        logging.info("Completed training the Random Forest model.")
        return model
    
# Context Class for Model Building Strategy
class ModelBuilder:
    def __init__(self, strategy:ModelBuildingStrategy):
        """
        Initializes the ModelBuildingStrategy with the X_train, y_train, fine_tuning and a strategy.

        Parameters:
            X_train (pd.DataFrame): The training data features.
            y_train (pd.Series): The training data labels/target.
            fine_tuning (bool): Flag to indicate if fine-tuning should be performed.
        """
        self._strategy = strategy

    def set_strategy(self, strategy:ModelBuildingStrategy):
        """
        Set the model building strategy.

        Parameters:
            strategy (ModelBuildingStrategy): The strategy to set.
        """
        self._strategy = strategy

    def train(self, X_train: pd.DataFrame, y_train: pd.Series, fine_tuning: bool = False) -> Any:
        """
        Train the model using the set strategy.

        Parameters:
            X_train (pd.DataFrame): The training data features.
            y_train (pd.Series): The training data labels/target.
            fine_tuning (bool): Flag to indicate if fine-tuning should be performed.

        Returns:
            Any: A trained model instance from the chosen strategy.
        """
        return self._strategy.build_and_train_model(X_train, y_train, fine_tuning)


# Example usage
if __name__ == "__main__":
    # import numpy as np
    # from sklearn.model_selection import train_test_split
    # from sklearn.datasets import make_classification

    # # Configure logging
    # logging.basicConfig(level=logging.INFO)

    # # Generate synthetic data for demonstration purposes
    # X, y = make_classification(n_samples=1000, n_features=20, n_classes=2, random_state=42)
    # X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

    # # Initialize the ModelBuilder with different strategies
    # # Logistic Regression
    # logging.info("Training Logistic Regression Model")
    # logistic_strategy = LogisticRegressionStrategy()
    # logistic_builder = ModelBuilder(logistic_strategy)
    # logistic_model = logistic_builder.train(X_train, y_train)
    # logging.info("Logistic Regression Model Trained and Saved.")

    # # XGBoost with fine-tuning
    # logging.info("Training XGBoost Model with Fine-Tuning")
    # xgb_strategy = XGBoostStrategy()
    # xgb_builder = ModelBuilder(xgb_strategy)
    # xgb_model = xgb_builder.train(X_train, y_train, fine_tuning=True)
    # logging.info("XGBoost Model (Fine-Tuned) Trained and Saved.")

    # # SVM with fine-tuning
    # logging.info("Training SVM Model with Fine-Tuning")
    # svm_strategy = SVCStrategy()
    # svm_builder = ModelBuilder(svm_strategy)
    # svm_model = svm_builder.train(X_train, y_train, fine_tuning=True)
    # logging.info("SVM Model (Fine-Tuned) Trained and Saved.")

    # # Naive Bayes
    # logging.info("Training Naive Bayes Model")
    # nb_strategy = NaiveBayesStrategy()
    # nb_builder = ModelBuilder(nb_strategy)
    # nb_model = nb_builder.train(X_train, y_train)
    # logging.info("Naive Bayes Model Trained and Saved.")

    # # Random Forest
    # logging.info("Training Random Forest Model")
    # rf_strategy = RandomForestStrategy()
    # rf_builder = ModelBuilder(rf_strategy)
    # rf_model = rf_builder.train(X_train, y_train)
    # logging.info("Random Forest Model Trained and Saved.")

    # # Example of using models on test data (Optional)
    # test_data_sample = X_test[:5]  # Take first 5 examples for testing
    # logging.info("Logistic Regression Prediction: %s", logistic_model.predict(test_data_sample))
    # logging.info("XGBoost Prediction: %s", xgb_model.predict(test_data_sample))
    # logging.info("SVM Prediction: %s", svm_model.predict(test_data_sample))
    # logging.info("Naive Bayes Prediction: %s", nb_model.predict(test_data_sample))
    # logging.info("Random Forest Prediction: %s", rf_model.predict(test_data_sample))
    
    pass

In [20]:
import mlflow
import logging
import pandas as pd
from typing import List, Annotated
from scipy.sparse import csr_matrix
from sklearn.base import ClassifierMixin

from zenml import ArtifactConfig, step
from zenml.client import Client

# Get the active experiment tracker from ZenML
experiment_tracker = Client().active_stack.experiment_tracker
from zenml import Model

model = Model(
    name="customer_reviews_predictor",
    version=None,
    license="Apache 2.0",
    description="Reviews predictor model for customer reviews",
)

@step(enable_cache=False, experiment_tracker=experiment_tracker.name, model=model)
def model_building_step(
    X_train:csr_matrix, y_train:pd.Series, method:str, fine_tuning:bool = False
) -> Annotated[ClassifierMixin, ArtifactConfig(name="trained_model", is_model_artifact=True)]:
    """
    Model building step using ZenML with multiple model options and MLflow tracking.

    Parameters:
        X_train (pd.DataFrame): The training data features.
        y_train (pd.Series): The training data labels/target.
        method (str): Model selection method, e.g., 'logistic_regression', 'xgboost', 'svm', 'naive_bayes', 'random_forest'.
        fine_tuning (bool): Flag to indicate if fine-tuning should be performed, only applicable to certain models.

    Returns:
        Trained model instance.
    """
    logging.info(f"Building model using method: {method}")
    
    # Choose the appropriate strategy based on the method
    if method == "logistic_regression":
        strategy = LogisticRegressionStrategy()
        logging.info("Selected Logistic Regression Strategy.")
    
    elif method == "xgboost":
        strategy = XGBoostStrategy()
        logging.info("Selected XGBoost Strategy.")

    elif method == "svc":
        strategy = SVCStrategy()
        logging.info("Selected SVM Strategy.")

    elif method == "naive_bayes":
        strategy = NaiveBayesStrategy()
        logging.info("Selected Naive Bayes Strategy.")

    elif method == "random_forest":
        strategy = RandomForestStrategy()
        logging.info("Selected Random Forest Strategy.")

    else:
        raise ValueError(f"Unknown method '{method}' selected for model training.")
    
    # Initialize ModelBuilder with the selected strategy
    model_builder = ModelBuilder(strategy)

    # Start an MLflow run to log the training process
    if not mlflow.active_run():
        mlflow.start_run()

    try:
        # Enable autologging to automatically log model parameters, metrics, and artifacts
        mlflow.sklearn.autolog()

        # Train the model with or without fine-tuning
        logging.info("Started model training.")
        trained_model = model_builder.train(X_train, y_train, fine_tuning=fine_tuning)
        logging.info("Model training completed.")

    except Exception as e:
        logging.error(f"An error occurred during model training: {e}")
        raise e
    
    finally:
        #End the mlflow run
        mlflow.end_run()

    return trained_model

In [21]:
model = model_building_step(X_train=tf_X_train, y_train=y_train, method="svc", fine_tuning=False)


[1;35mRunning single step pipeline to execute step [0m[1;36mmodel_building_step[1;35m[0m
[33mUsing an external artifact as step input currently invalidates caching for the step and all downstream steps. Future releases will introduce hashing of artifacts which will improve this behavior.[0m
[33mUsing an external artifact as step input currently invalidates caching for the step and all downstream steps. Future releases will introduce hashing of artifacts which will improve this behavior.[0m
[1;35mInitiating a new run for the pipeline: [0m[1;36mmodel_building_step[1;35m.[0m


[1;35mUploading external artifact to 'external_artifacts/external_5572bb0f-2d44-4858-89ff-f99d364d49a1'.[0m
[1;35mFinished uploading external artifact 1a0a7359-7333-479e-b0dc-8959c71d2217.[0m
[1;35mUploading external artifact to 'external_artifacts/external_3987cd67-778f-4f3e-91ef-6e35a0e90da2'.[0m
[1;35mFinished uploading external artifact c7ba3f5f-57e2-4a81-9a11-cb5a0f8578dc.[0m
[1;35mExecuting a new run.[0m
[1;35mCaching is disabled by default for [0m[1;36mmodel_building_step[1;35m.[0m
[1;35mUsing user: [0m[1;36mdefault[1;35m[0m
[1;35mUsing stack: [0m[1;36mlocal-mlflow-stack[1;35m[0m
[1;35m  artifact_store: [0m[1;36mdefault[1;35m[0m
[1;35m  model_deployer: [0m[1;36mmlflow[1;35m[0m
[1;35m  experiment_tracker: [0m[1;36mmlflow_tracker[1;35m[0m
[1;35m  orchestrator: [0m[1;36mdefault[1;35m[0m
[1;35mDashboard URL for Pipeline Run: [0m[34mhttp://127.0.0.1:8237/runs/d89a15c4-b68a-4d0f-bcfd-82824ca9eb53[1;35m[0m
[1;35mCaching [0m[1;36mdisa

In [23]:
import logging
from abc import ABC, abstractmethod
from typing import Dict
import pandas as pd
from sklearn.base import ClassifierMixin
from sklearn.metrics import (
    accuracy_score,
    precision_score,
    recall_score,
    f1_score,
    confusion_matrix,
    roc_auc_score
)

# Setup logging configuration
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")

# Abstract Base Class for Model Evaluation Strategy
class ModelEvaluationStrategy(ABC):
    @abstractmethod
    def evaluate_model(
        self, model:ClassifierMixin, X_test:pd.DataFrame, y_test:pd.Series
    ) -> Dict[str, float]:
        """
        Abstract method to evaluate a model.

        Parameters:
            model (ClassifierMixin): The trained model to evaluate.
            X_test (pd.DataFrame): The testing data features.
            y_test (pd.Series): The testing data labels/target.

        Returns:
            dict: A dictionary containing evaluation metrics.
        """
        pass

# Concrete Strategy for Classification Model Evaluation
class ClassificationModelEvaluationStrategy(ModelEvaluationStrategy):
    def evaluate_model(self, model: ClassifierMixin, X_test: pd.DataFrame, y_test: pd.Series) -> Dict[str, float]:
        """
        Evaluates a classification model using various metrics.

        Parameters:
            model (ClassifierMixin): The trained classification model to evaluate.
            X_test (pd.DataFrame): The testing data features.
            y_test (pd.Series): The testing data labels/target.

        Returns:
            dict: A dictionary containing accuracy, precision, recall, F1 score, ROC AUC, and confusion matrix.
        """
        logging.info("Predicting using the trained model.")
        y_pred = model.predict(X_test)

        logging.info("Calculating evaluation metrics.")
        accuracy = accuracy_score(y_test, y_pred)
        precision = precision_score(y_test, y_pred)
        recall = recall_score(y_test, y_pred)
        F1_score = f1_score(y_test, y_pred)
        roc_auc = roc_auc_score(y_test, y_pred)
        Confusion_matrix = confusion_matrix(y_test, y_pred)

        metrics = {
            "Accuracy": accuracy,
            "Precision": precision,
            "Recall": recall,
            "F1 Score": F1_score,
            "ROC AUC": roc_auc,
            "True Negatives": Confusion_matrix[0][0],
            "False Positives": Confusion_matrix[0][1],
            "False Negatives": Confusion_matrix[1][0],
            "True Positives": Confusion_matrix[1][1],

        }
        logging.info(f"Model Evaluation Metrics: {metrics}")
        return metrics
    
# Context Class for Model Evaluation
class ModelEvaluator:
    def __init__(self, strategy:ModelEvaluationStrategy):
        """
        Initializes the ModelEvaluator with a specific model evaluation strategy.

        Parameters:
            strategy (ModelEvaluationStrategy): The strategy to be used for model evaluation.
        """
        self._strategy = strategy

    def set_strategy(self, strategy: ModelEvaluationStrategy):
        """
        Sets a new strategy for the ModelEvaluator.

        Parameters:
            strategy (ModelEvaluationStrategy): The new strategy to be used for model evaluation.
        """
        logging.info("Switching model evaluation strategy.")
        self._strategy = strategy

    def evaluate(self, model: ClassifierMixin, X_test: pd.DataFrame, y_test: pd.Series) -> Dict[str, float]:
        """
        Executes the model evaluation using the current strategy.

        Parameters:
            model (ClassifierMixin): The trained model to evaluate.
            X_test (pd.DataFrame): The testing data features.
            y_test (pd.Series): The testing data labels/target.

        Returns:
            dict: A dictionary containing evaluation metrics.
        """
        logging.info("Evaluating the model using the selected strategy.")
        return self._strategy.evaluate_model(model, X_test, y_test)
    

# Example usage
if __name__ == "__main__":
    # Example trained model and data (replace with actual trained model and data)
    # model = trained_sklearn_classification_model
    # X_test = test_data_features
    # y_test = test_data_target

    # Initialize model evaluator with a specific strategy
    # model_evaluator = ModelEvaluator(ClassificationModelEvaluationStrategy())
    # evaluation_metrics = model_evaluator.evaluate(model, X_test, y_test)
    # print(evaluation_metrics)

    pass




In [1]:
selected_model = model  # or another index/model depending on your logic

# Example usage of ModelEvaluator
model_evaluator = ModelEvaluator(ClassificationModelEvaluationStrategy())
evaluation_metrics = model_evaluator.evaluate(selected_model, tf_X_test, y_test)
print(evaluation_metrics)

NameError: name 'model' is not defined