In [1]:
!pip install transformers

Collecting transformers
  Using cached transformers-4.44.2-py3-none-any.whl.metadata (43 kB)
Collecting huggingface-hub<1.0,>=0.23.2 (from transformers)
  Using cached huggingface_hub-0.25.0-py3-none-any.whl.metadata (13 kB)
Collecting safetensors>=0.4.1 (from transformers)
  Using cached safetensors-0.4.5-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (3.8 kB)
Collecting tokenizers<0.20,>=0.19 (from transformers)
  Using cached tokenizers-0.19.1-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (6.7 kB)
Using cached transformers-4.44.2-py3-none-any.whl (9.5 MB)
Using cached huggingface_hub-0.25.0-py3-none-any.whl (436 kB)
Using cached safetensors-0.4.5-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (435 kB)
Using cached tokenizers-0.19.1-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (3.6 MB)
Installing collected packages: safetensors, huggingface-hub, tokenizers, transformers
Successfully installed huggingface-hub-0.25.0 

In [2]:
import os
import json
import joblib
import logging
import time
import pandas as pd
import tensorflow as tf
from tensorflow.keras.losses import SparseCategoricalCrossentropy
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import train_test_split
from typing import Any, Dict, Iterable, List, Tuple
import tensorflow_hub as hub
import numpy as np
from transformers import DistilBertTokenizer, TFDistilBertModel
from sklearn.metrics import (
    accuracy_score,
    classification_report,
    cohen_kappa_score,
    f1_score,
    matthews_corrcoef,
    precision_score,
    recall_score,
)
from tensorflow.python.lib.io import file_io
import yaml
AUTO = tf.data.experimental.AUTOTUNE
logger = logging.getLogger(__name__)

2024-09-23 10:55:36.995372: I tensorflow/core/platform/cpu_feature_guard.cc:193] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2024-09-23 10:55:38.277027: W tensorflow/compiler/xla/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libnvinfer.so.7'; dlerror: libnvinfer.so.7: cannot open shared object file: No such file or directory; LD_LIBRARY_PATH: /usr/local/cuda/lib64:/usr/local/cuda/lib:/usr/local/lib/x86_64-linux-gnu:/usr/local/nvidia/lib:/usr/local/nvidia/lib64:/usr/local/nvidia/lib:/usr/local/nvidia/lib64
2024-09-23 10:55:38.277162: W tensorflow/compiler/xla/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libnvinfer_plugin.so.7'; dlerror: libnvinfer_plugin.so.7: cannot open shared object file: No such 

In [3]:
FEATURE_COLUMN_NAME = "clean_text"
LABEL_COLUMN_NAME = "category"
TITLE = "title"
QUERY = "query"
MAX_LENGTH = "max_length"
DATA = "data"
VAL_SIZE = "val_size"
RANDOM_STATE = "random_state"
TRANSFORMERS = "transformers"
TOKENIZER = "tokenizer"
ENCODING = "encoding"
TRAIN = "train"
BATCH_SIZE = "batch_size"
MODEL = "model"
OPTIMIZATION = "optimization"
INIT_LR = "init_lr"
CALLBACKS = "callbacks"
VERBOSE = "verbose"
VAL_LOSS = "val_loss"
REDUCE_ON_PLATEAU = "reduce_on_plateau"
PATIENCE = "patience"
MIN_LR = "min_lr"
MIN_DELTA = "min_delta"
METRICS_FULL_REPORT = "full_report"
MATHEW = "Mathew"
FACTOR = "factor"
EPOCHS = "epochs"
EARLY_STOPPING = "early_stopping"
COHEN = "Cohen"
ACTIVATION = "activation"
ACCURACY = "Accuracy"
INPUT_IDS = "input_ids"
ENCODED_LABEL = "encoded_label"
LABEL_ENCODER_PKL = "label_encoder.pkl"
METRICS_JSON = "metrics.json"
PREDICTED_CATEGORY = "predicted_catagory"
METRICS_DIR = "metrics"
INFERENCE = "inference"
PROB_SCORES = "prediction_probability"
FILE_NAME_PREFIX = "search_data_prediction_file"


In [None]:
def read_csv_file(data_path: str) -> pd.DataFrame:
    """
    This functions combine multiple CSV files into a single CSV file
    :param data_path: GCS path where all the cleaned preprocessed csv file saved
    :return: Dataframe
    """
    for filename in os.listdir(data_path):
        if filename.endswith(".csv"):
            filepath = os.path.join(data_path, filename)
            data = pd.read_csv(filepath)
    return data

In [None]:
class ModelTrainer:
    def __init__(self, model_params: Dict[str, Any]):
        self.model_params = model_params
        self.tokenizer = self.get_tokenizer()
        self.label_encoder = LabelEncoder()

    def get_tokenizer(self) -> Any:
        try:
            logger.info("Download the tokenizer from Huggingface.")
            return DistilBertTokenizer.from_pretrained(self.model_params[TRANSFORMERS][TOKENIZER])
        except Exception as e:
            logger.error(f"Error downloading tokenizer: {e}")
            raise

    def encode_labels(self, df: pd.DataFrame) -> pd.DataFrame:
        try:
            logger.info("Perform label encoding.")
            df[ENCODED_LABEL] = self.label_encoder.fit_transform(list(df[LABEL_COLUMN_NAME].values))
            logger.info(
                f"The total number of samples: {len(df[ENCODED_LABEL])}; classes: {len(self.label_encoder.classes_)}")
            return df
        except Exception as e:
            logger.error(f"Error in label encoding: {e}")
            raise
    def split_to_train_val(self, df: pd.DataFrame) -> Tuple[pd.DataFrame, pd.DataFrame]:
        try:
            logger.info("Perform train/validation split.")
            return train_test_split(
                df,
                test_size=self.model_params[DATA][VAL_SIZE],
                random_state=self.model_params[RANDOM_STATE],
                stratify=list(df[LABEL_COLUMN_NAME]),
            )
        except Exception as e:
            logger.error(f"Error during train/validation split: {e}")
            raise

    def tokenize_batch(self, texts: List[str], batch_size: int = 32) -> Any:
        try:
            logger.info("Tokenizing batch of texts.")
            tokenized_data = []
            for i in range(0, len(texts), self.model_params[ENCODING][BATCH_SIZE]):
                batch_text = texts[i:i + self.model_params[ENCODING][BATCH_SIZE]]
                tokenized_batch = self.tokenizer(batch_text, max_length=self.model_params[ENCODING][MAX_LENGTH],
                                                 padding='max_length', truncation=True, return_tensors='tf')['input_ids']
                tokenized_data.append(tokenized_batch)
            return tf.concat(tokenized_data, axis=0)
        except Exception as e:
            logger.error(f"Error during tokenization: {e}")
            raise

    def build_and_train_model(self, train_dataset, val_dataset, callbacks: List[Any], n_steps):
        """
        Build and train the model using TFDistilBertModel and TensorFlow's Keras API.
        """
        try:
            logger.info("Building the model.")
            bert_model = TFDistilBertModel.from_pretrained(self.model_params[TRANSFORMERS][MODEL])

            input_ids = tf.keras.layers.Input(shape=(self.model_params[ENCODING][MAX_LENGTH],), dtype='int32')
            bert_output = bert_model(input_ids)[0]
            cls_token = bert_output[:, 0, :]
            output = tf.keras.layers.Dense(len(self.label_encoder.classes_), activation='softmax')(cls_token)

            model = tf.keras.models.Model(inputs=input_ids, outputs=output)

            logger.info("Compiling the model.")
            model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=self.model_params[TRAIN][OPTIMIZATION][INIT_LR]),
                          loss=SparseCategoricalCrossentropy(),
                          metrics=['accuracy'])

            logger.info("Training the model.")
            train_start = time.time()
            model.fit(train_dataset,
                      validation_data=val_dataset,
                      steps_per_epoch=n_steps,
                      epochs=self.model_params[TRAIN][EPOCHS],
                      batch_size=self.model_params[TRAIN][BATCH_SIZE],
                      callbacks=callbacks)
            train_time_minutes = round((time.time() - train_start) / 60, 2)
            logger.info(f"The training has finished, took {train_time_minutes} minutes.")
            return model
        except Exception as e:
            logger.error(f"Error during model building or training: {e}")
            raise


class ModelEvaluator:
    def __init__(self, model: Any, label_encoder: LabelEncoder):
        self.model = model
        self.label_encoder = label_encoder

    def calculate_scores(self, X_val: Iterable[int], y_val_true: Iterable[int]) -> Dict[str, float]:
        try:
            logger.info("Calculating the metrics.")
            prediction = self.model.predict(X_val)
            y_val_pred = tf.argmax(prediction, axis=1).numpy()
            y_val_pred = self.label_encoder.inverse_transform(y_val_pred)
            y_val_true = self.label_encoder.inverse_transform(y_val_true)
            metrics = self.get_metrics(y_val_true, y_val_pred)
            logger.info(f"Metrics: {metrics}")
            return y_val_pred, y_val_true, metrics
        except Exception as e:
            logger.error(f"Error during score calculation: {e}")
            raise

    def val_data(self, df_val, y_pred):
        df_val[PREDICTED_CATEGORY] = y_pred
        return df_val

    def get_metrics(self,
                    true_values: Iterable[str],
                    predicted_values: Iterable[str],
                    round_n: int = 3,
                    ) -> Dict[str, float]:
        """
        Prints classification report with accuracy.
        :param true_values: Iterable with the actual values.
        :param predicted_values: Iterable with the predicted values.
        :param round_n: The number after the decimal point.
        :return: The dictionary with the model metrics.
        """
        metrics = dict()

        acc_value = accuracy_score(true_values, predicted_values)
        metrics[ACCURACY] = round(acc_value, round_n)
        cohen = cohen_kappa_score(true_values, predicted_values)
        metrics[COHEN] = round(cohen, round_n)
        matthew = matthews_corrcoef(true_values, predicted_values)
        metrics[MATHEW] = round(matthew, round_n)

        f1_macro = f1_score(true_values, predicted_values, average="macro")
        metrics["f1-score macro"] = round(f1_macro, round_n)
        precision_macro = precision_score(true_values, predicted_values, average="macro")
        metrics["Precision macro"] = round(precision_macro, round_n)
        recall_macro = recall_score(true_values, predicted_values, average="macro")
        metrics["Recall macro"] = round(recall_macro, round_n)

        f1_micro = f1_score(true_values, predicted_values, average="micro")
        metrics["f1-score micro"] = round(f1_micro, round_n)
        precision_micro = precision_score(true_values, predicted_values, average="micro")
        metrics["Precision micro"] = round(precision_micro, round_n)
        recall_micro = recall_score(true_values, predicted_values, average="micro")
        metrics["Recall micro"] = round(recall_micro, round_n)

        f1_weighted = f1_score(true_values, predicted_values, average="weighted")
        metrics["f1-score weighted"] = round(f1_weighted, round_n)
        precision_weighted = precision_score(
            true_values, predicted_values, average="weighted"
        )
        metrics["Precision weighted"] = round(precision_weighted, round_n)
        recall_weighted = recall_score(true_values, predicted_values, average="weighted")
        metrics["Recall weighted"] = round(recall_weighted, round_n)

        metrics[METRICS_FULL_REPORT] = classification_report(
            true_values, predicted_values, output_dict=True
        )
        return metrics


class ArtifactManager:
    def __init__(self, artifacts_dir: str):
        self.artifacts_dir = artifacts_dir

    def save_artifacts(self, model: Any, label_encoder: LabelEncoder, metrics: Dict[str, float], df_val_pred) -> None:
        try:
            joblib.dump(label_encoder, os.path.join(self.artifacts_dir, LABEL_ENCODER_PKL))
            metrics_dir_path = os.path.join(self.artifacts_dir, METRICS_DIR)
            os.makedirs(metrics_dir_path, exist_ok=True)
            with open(os.path.join(metrics_dir_path, METRICS_JSON), 'w') as f:
                json.dump(metrics, f, ensure_ascii=False)
            model.save(self.artifacts_dir)
            df_val_pred.to_csv(os.path.join(metrics_dir_path, "val_predicted.csv"), index=False)
            logger.info("The artifacts are successfully saved.")
        except Exception as e:
            logger.error(f"Error saving artifacts: {e}")
            raise


def get_callbacks(callback_params: Dict[str, Any]) -> List[Any]:

    """
    A custom function to provide the needed callbacks based on the parameters specified.
    :param callback_params: The dictionary with the callback params.
    :return: The list of Tensorflow callbacks.
    """
    callbacks = []
    if EARLY_STOPPING in callback_params:
        early_stopping = tf.keras.callbacks.EarlyStopping(
            monitor=VAL_LOSS,
            verbose=callback_params[VERBOSE],
            min_delta=callback_params[EARLY_STOPPING][MIN_DELTA],
            patience=callback_params[EARLY_STOPPING][PATIENCE],
            mode="auto",
        )
        callbacks.append(early_stopping)
    if REDUCE_ON_PLATEAU in callback_params:
        reduce_on_plateau = tf.keras.callbacks.ReduceLROnPlateau(
            monitor=VAL_LOSS,
            verbose=callback_params[VERBOSE],
            factor=callback_params[REDUCE_ON_PLATEAU][FACTOR],
            patience=callback_params[REDUCE_ON_PLATEAU][PATIENCE],
            min_lr=callback_params[REDUCE_ON_PLATEAU][MIN_LR],
        )
        callbacks.append(reduce_on_plateau)
    logger.info(f"Callbacks: {callbacks}")
    return callbacks
def transform_data_for_training(x_train, y_train, x_val, y_val, model_params):
    train_dataset = (
        tf.data.Dataset.from_tensor_slices((x_train, y_train))
        .repeat()
        .shuffle(2048)
        .batch(model_params[TRAIN][BATCH_SIZE])
        .prefetch(AUTO)
    )

    val_dataset = (
        tf.data.Dataset.from_tensor_slices((x_val, y_val))
        .batch(model_params[TRAIN][BATCH_SIZE])
        .cache()
        .prefetch(AUTO)
    )
    return train_dataset, val_dataset

def load_model_params(model_params_path: str) -> Dict[str, Any]:
    """
    The purpose of this function is to load the model parameters from the provided path.
    :param model_params_path: GCS path for the model parameters
    :return: A dictionary containing the model parameters
    """
    try:
        logger.info(f'Read model params file from: "{model_params_path}".')
        model_params = None
        with file_io.FileIO(model_params_path, "r") as f:
            model_params = yaml.safe_load(f)
        if not model_params:
            raise Exception(
                f"Failed to load the model parameters file in the"
                f'following path: "{model_params_path}", the file was not found.'
            )
        logger.info(f"The model params:\n{model_params}")
        logger.info("Model parameters loaded successfully.")
        return model_params
    except Exception as e:
        logger.error(f"Error loading model parameters: {e}")
        raise


def train_main(data_path: str, model_params_path: str, artifacts_dir: str):
    try:
        model_params = load_model_params(model_params_path)
        df = read_csv_file(data_path)
        df = df.drop_duplicates(subset=[FEATURE_COLUMN_NAME, LABEL_COLUMN_NAME], keep="first")

        trainer = ModelTrainer(model_params)
        df_encoded = trainer.encode_labels(df)
        df_train, df_val = trainer.split_to_train_val(df_encoded)

        x_train = trainer.tokenize_batch(list(df_train[FEATURE_COLUMN_NAME]))
        x_val = trainer.tokenize_batch(list(df_val[FEATURE_COLUMN_NAME]))
        y_train = list(df_train[ENCODED_LABEL])
        y_val = list(df_val[ENCODED_LABEL])

        train_dataset, val_dataset = transform_data_for_training(x_train, y_train, x_val, y_val, model_params)

        n_steps = x_train.shape[0] // model_params[TRAIN][BATCH_SIZE]
        callbacks = get_callbacks(model_params[TRAIN][CALLBACKS])

        # Build and train the model
        model = trainer.build_and_train_model(train_dataset, val_dataset, callbacks, n_steps)

        evaluator = ModelEvaluator(model, trainer.label_encoder)
        y_pred, y_true, metrics = evaluator.calculate_scores(x_val, y_val)
        df_val_pred = evaluator.val_data(df_val, y_pred)

        artifact_manager = ArtifactManager(artifacts_dir)
        artifact_manager.save_artifacts(model, trainer.label_encoder, metrics, df_val_pred)
    except Exception as e:
        logger.error(f"Error in training pipeline: {e}")
        raise

In [None]:
train_main("data/train","dev.yml","model_artifacts")

In [None]:
!rm -r model_artifacts/

In [None]:
def load_model_params(model_params_path: str) -> Dict[str, Any]:
    """
    The purpose of this function is to load the model parameters from the provided path.
    :param model_params_path: GCS path for the model parameters
    :return: A dictionary containing the model parameters
    """
    try:
        logger.info(f'Read model params file from: "{model_params_path}".')
        model_params = None
        with file_io.FileIO(model_params_path, "r") as f:
            model_params = yaml.safe_load(f)
        if not model_params:
            raise Exception(
                f"Failed to load the model parameters file in the"
                f'following path: "{model_params_path}", the file was not found.'
            )
        logger.info(f"The model params:\n{model_params}")
        logger.info("Model parameters loaded successfully.")
        return model_params
    except Exception as e:
        logger.error(f"Error loading model parameters: {e}")
        raise

In [None]:
class ModelTrainer:
    def __init__(self, model_params: Dict[str, Any]):
        self.model_params = model_params
        self.tokenizer = self.get_tokenizer()
        self.label_encoder = LabelEncoder()

    def get_tokenizer(self) -> Any:
        try:
            logger.info("Download the tokenizer from Huggingface.")
            return DistilBertTokenizer.from_pretrained(self.model_params[TRANSFORMERS][TOKENIZER])
        except Exception as e:
            logger.error(f"Error downloading tokenizer: {e}")
            raise

    def encode_labels(self, df: pd.DataFrame) -> pd.DataFrame:
        try:
            logger.info("Perform label encoding.")
            df[ENCODED_LABEL] = self.label_encoder.fit_transform(list(df[LABEL_COLUMN_NAME].values))
            logger.info(
                f"The total number of samples: {len(df[ENCODED_LABEL])}; classes: {len(self.label_encoder.classes_)}")
            return df
        except Exception as e:
            logger.error(f"Error in label encoding: {e}")
            raise
    def split_to_train_val(self, df: pd.DataFrame) -> Tuple[pd.DataFrame, pd.DataFrame]:
        try:
            logger.info("Perform train/validation split.")
            return train_test_split(
                df,
                test_size=self.model_params[DATA][VAL_SIZE],
                random_state=self.model_params[RANDOM_STATE],
                stratify=list(df[LABEL_COLUMN_NAME]),
            )
        except Exception as e:
            logger.error(f"Error during train/validation split: {e}")
            raise

    def tokenize_batch(self, texts: List[str], batch_size: int = 32) -> Any:
        try:
            logger.info("Tokenizing batch of texts.")
            tokenized_data = []
            for i in range(0, len(texts), self.model_params[ENCODING][BATCH_SIZE]):
                batch_text = texts[i:i + self.model_params[ENCODING][BATCH_SIZE]]
                tokenized_batch = self.tokenizer(batch_text, max_length=self.model_params[ENCODING][MAX_LENGTH],
                                                 padding='max_length', truncation=True, return_tensors='tf')['input_ids']
                tokenized_data.append(tokenized_batch)
            return tf.concat(tokenized_data, axis=0)
        except Exception as e:
            logger.error(f"Error during tokenization: {e}")
            raise

    def build_and_train_model(self, train_dataset, val_dataset, callbacks: List[Any], n_steps):
        """
        Build and train the model using TFDistilBertModel and TensorFlow's Keras API.
        """
        try:
            logger.info("Building the model.")
            bert_model = TFDistilBertModel.from_pretrained(self.model_params[TRANSFORMERS][MODEL])

            input_ids = tf.keras.layers.Input(shape=(self.model_params[ENCODING][MAX_LENGTH],), dtype='int32')
            bert_output = bert_model(input_ids)[0]
            cls_token = bert_output[:, 0, :]
            output = tf.keras.layers.Dense(len(self.label_encoder.classes_), activation='softmax')(cls_token)

            model = tf.keras.models.Model(inputs=input_ids, outputs=output)

            logger.info("Compiling the model.")
            model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=self.model_params[TRAIN][OPTIMIZATION][INIT_LR]),
                          loss=SparseCategoricalCrossentropy(),
                          metrics=['accuracy'])

            logger.info("Training the model.")
            train_start = time.time()
            model.fit(train_dataset,
                      validation_data=val_dataset,
                      steps_per_epoch=n_steps,
                      epochs=self.model_params[TRAIN][EPOCHS],
                      batch_size=self.model_params[TRAIN][BATCH_SIZE],
                      callbacks=callbacks)
            train_time_minutes = round((time.time() - train_start) / 60, 2)
            logger.info(f"The training has finished, took {train_time_minutes} minutes.")
            return model
        except Exception as e:
            logger.error(f"Error during model building or training: {e}")
            raise

In [4]:
def load_model(model_path: str) -> Any:
    """Loads the model from the given path and returns it."""
    logger.info(f'Loading the Model... The path: "{model_path}"')
    try:
        model = tf.saved_model.load(model_path)
        model = build_model_from_pb(model)
        if not model:
            raise Exception("Failed to load the model")
        logger.info("Model was loaded successfully.")
        return model
    except Exception as e:
        logger.error(f"Error loading model from {model_path}: {e}")
        raise

def build_model_from_pb(pb_model: Any) -> Any:
    """Builds and returns a Keras model from a TensorFlow Hub protobuf model."""
    try:
        input_ids_layer = tf.keras.layers.Input(
            shape=(32,),
            name="input_word",
            dtype="int32",
        )
        keras_layer = hub.KerasLayer(pb_model, trainable=False)(input_ids_layer)
        model = tf.keras.Model([input_ids_layer], keras_layer)
        return model
    except Exception as e:
        logger.error(f"Error building model from protobuf: {e}")
        raise

In [8]:
class ModelQuantizer:
    """Class to quantize and save a Keras model."""

    @staticmethod
    def quantize_and_save_model(model_path: str, output_dir: str):
        """Quantizes the Keras model and saves it as a TensorFlow Lite model."""
        model = load_model(model_path)
        print(model.input_shape[0])
        #model = tf.keras.models.load_model(model_path)
        if model.input_shape[0] is not None:
            raise ValueError("Model input shape should have a dynamic batch size (None).")
        os.makedirs(output_dir, exist_ok=True)
        converter = tf.lite.TFLiteConverter.from_keras_model(model)
        converter.target_spec.supported_types = [tf.float16]
        converter.allow_custom_ops = True
        tflite_model = converter.convert()

        with open(os.path.join(output_dir, 'quantized_model.tflite'), 'wb') as f:
            f.write(tflite_model)

        print(f"Quantized model saved to: {os.path.join(output_dir, 'quantized_model.tflite')}")

In [9]:
t = ModelQuantizer

In [10]:
t.quantize_and_save_model("model_artifacts","model_quantize")

None




INFO:tensorflow:Assets written to: /tmp/tmp1oi8c5p8/assets


INFO:tensorflow:Assets written to: /tmp/tmp1oi8c5p8/assets
2024-09-23 10:57:28.736554: W tensorflow/compiler/mlir/lite/python/tf_tfl_flatbuffer_helpers.cc:362] Ignored output_format.
2024-09-23 10:57:28.736672: W tensorflow/compiler/mlir/lite/python/tf_tfl_flatbuffer_helpers.cc:365] Ignored drop_control_dependency.
2024-09-23 10:57:28.736879: I tensorflow/cc/saved_model/reader.cc:45] Reading SavedModel from: /tmp/tmp1oi8c5p8
2024-09-23 10:57:28.791427: I tensorflow/cc/saved_model/reader.cc:89] Reading meta graph with tags { serve }
2024-09-23 10:57:28.791486: I tensorflow/cc/saved_model/reader.cc:130] Reading SavedModel debug info (if present) from: /tmp/tmp1oi8c5p8
2024-09-23 10:57:28.978939: I tensorflow/cc/saved_model/loader.cc:229] Restoring SavedModel bundle.
2024-09-23 10:57:29.953645: I tensorflow/cc/saved_model/loader.cc:213] Running initialization op on SavedModel bundle at path: /tmp/tmp1oi8c5p8
2024-09-23 10:57:30.237304: I tensorflow/cc/saved_model/loader.cc:305] SavedModel

Quantized model saved to: model_quantize/quantized_model.tflite


In [1]:
class ModelLoader:
    """Loads models and model parameters."""

    @staticmethod
    def load_model_params(model_params_path: str) -> dict:
        """Loads the model parameters from the given path."""
        try:
            with file_io.FileIO(model_params_path, "r") as f:
                model_params = yaml.safe_load(f)
            if not model_params:
                raise FileNotFoundError(f"Model parameters file not found: {model_params_path}")
            return model_params
        except Exception as e:
            raise RuntimeError(f"Error loading model parameters: {e}")

    @staticmethod
    def load_model(model_path: str) -> tf.keras.Model:
        """Loads and builds a TensorFlow model from the given path."""
        try:
            model = tf.saved_model.load(model_path)
            return ModelLoader.build_model_from_pb(model)
        except Exception as e:
            raise RuntimeError(f"Error loading model from {model_path}: {e}")

    @staticmethod
    def build_model_from_pb(pb_model: tf.Module) -> tf.keras.Model:
        """Builds a Keras model from a TensorFlow Hub protobuf model."""
        try:
            input_ids_layer = tf.keras.layers.Input(shape=(32,), name="input_word", dtype="int32")
            keras_layer = hub.KerasLayer(pb_model, trainable=False)(input_ids_layer)
            model = tf.keras.Model([input_ids_layer], keras_layer)
            return model
        except Exception as e:
            raise RuntimeError(f"Error building model from protobuf: {e}")


class ModelQuantizer:
    """Class to quantize and save a Keras model."""

    @staticmethod
    def quantize_and_save_model(model_path: str, output_dir: str):
        """Quantizes the Keras model and saves it as a TensorFlow Lite model."""
        model = ModelLoader.load_model(model_path)
        os.makedirs(output_dir, exist_ok=True)
        converter = tf.lite.TFLiteConverter.from_keras_model(model)
        converter.target_spec.supported_types = [tf.float16]
        tflite_model = converter.convert()

        tflite_path = os.path.join(output_dir, 'quantized_model.tflite')
        with open(tflite_path, 'wb') as f:
            f.write(tflite_model)

        print(f"Quantized model saved to: {tflite_path}")


class InferenceQuantizeModel:
    """Handles loading the quantized model and making predictions."""

    def __init__(self, model_path: str, model_params_path: str):
        self.model_params = ModelLoader.load_model_params(model_params_path)
        self.model = self.load_model(f"{model_path}/quantized_model.tflite")
        self.label_encoder = self.load_label_encoder(f"{model_path}/{LABEL_ENCODER_PKL}")
        self.tokenizer = self.get_tokenizer()
        print(self.input_details)
    def load_model(self, model_path: str) -> tf.lite.Interpreter:
        """Loads the quantized TensorFlow Lite model."""
        model = tf.lite.Interpreter(model_path=model_path)
        model.allocate_tensors()
        self.input_details = model.get_input_details()
        self.output_details = model.get_output_details()
        return model
    def get_tokenizer(self) -> Any:
        try:
            logger.info("Download the tokenizer from Huggingface.")
            return DistilBertTokenizer.from_pretrained(
                self.model_params[TRANSFORMERS][TOKENIZER]
            )
        except Exception as e:
            logger.error(f"Error downloading tokenizer: {e}")
            raise

    def load_label_encoder(self, label_encoder_path: str) -> LabelEncoder:
        """Loads the label encoder from a file."""
        try:
            with file_io.FileIO(label_encoder_path, mode="rb") as encoder_file:
                label_encoder = joblib.load(encoder_file)
            if not label_encoder:
                raise FileNotFoundError("Label encoder not found.")
            return label_encoder
        except Exception as e:
            raise RuntimeError(f"Error loading label encoder: {e}")

    
    def predict(self, inputs: tf.Tensor):
        """ Makes predictions on the provided input and returns the categories and scores."""

        predictions = []
        prob_scores = []

        # Loop through each example (assuming the model expects batch size of 1)
        for i in range(inputs.shape[0]):
            input_data = tf.cast(inputs[i:i+1], dtype=tf.int32)  # Select one example at a time

            # Set the input tensor for the TFLite model
            self.model.set_tensor(self.input_details[0]['index'], input_data.numpy())  # Convert to NumPy for TFLite

            # Run inference
            self.model.invoke()

            # Get the output tensor from the model
            prediction = self.model.get_tensor(self.output_details[0]['index'])

            # Collect the prediction and probability scores
            predictions.append(prediction)
            prob_scores.append(np.max(prediction))

        # Convert predictions to appropriate format
        predictions = np.concatenate(predictions, axis=0)
        prediction_transform = np.argmax(predictions, axis=1)
        transform_cat = self.label_encoder.inverse_transform(prediction_transform)

        return transform_cat, np.array(prob_scores)




class CSVProcessor:
    """Processes CSV files for inference."""

    def __init__(self, inference_model: InferenceQuantizeModel):
        self.inference_model = inference_model

    def process_csv(self, file_path: str, prediction_dir: str):
        """Processes the CSV file and saves predictions."""
        df = pd.read_csv(file_path)
        #df = df.head(5)
        x_train = self.tokenize_batch(list(df[FEATURE_COLUMN_NAME]))
        predictions, prob_scores = self.inference_model.predict(x_train)

        df[PREDICTED_CATEGORY] = predictions
        df[PROB_SCORES] = prob_scores

        output_path = os.path.join(prediction_dir, FILE_NAME_PREFIX + ".csv")
        df.to_csv(output_path, index=False)
        print(f"Predictions saved to: {output_path}")

    def tokenize_batch(self, texts: List[str]) -> Any:
        try:
            logger.info("Tokenizing batch of texts.")
            tokenized_data = []
            for i in range(0, len(texts), self.inference_model.model_params[ENCODING][BATCH_SIZE]):
                batch_text = texts[i : i + self.inference_model.model_params[ENCODING][BATCH_SIZE]]
                tokenized_batch = self.inference_model.tokenizer(
                    batch_text,
                    max_length=self.inference_model.model_params[ENCODING][MAX_LENGTH],
                    padding="max_length",
                    truncation=True,
                    return_tensors="tf",
                )["input_ids"]
                tokenized_data.append(tokenized_batch)
            return tf.concat(tokenized_data, axis=0)
        except Exception as e:
            logger.error(f"Error during tokenization: {e}")
            raise


class InferenceCLI:
    """Command-line interface for running inference."""

    def __init__(self, model_path: str, model_params_path: str):
        self.inference_model = InferenceQuantizeModel(model_path, model_params_path)

    def run_csv_inference(self, csv_path: str, prediction_dir: str):
        """Runs inference on the specified CSV file."""
        processor = CSVProcessor(self.inference_model)
        processor.process_csv(csv_path, prediction_dir)

NameError: name 'tf' is not defined

In [2]:
A = InferenceCLI("model_quantize","dev.yml")

NameError: name 'InferenceCLI' is not defined

In [None]:
A.run_csv_inference("data/test/search_data.csv","data")

In [None]:
# CLI command function to run inference and quantization
def main():
    import argparse

    parser = argparse.ArgumentParser(description="Model Quantization and Inference CLI")
    parser.add_argument("--quantize", action="store_true", help="Quantize the model.")
    parser.add_argument("--model_path", type=str, required=True, help="Path to the model.")
    parser.add_argument("--output_dir", type=str, help="Output directory for the quantized model.")
    parser.add_argument("--csv_inference", type=str, help="Path to the CSV file for inference.")
    parser.add_argument("--model_params_path", type=str, required=True, help="Path to model params.")
    parser.add_argument("--prediction_dir", type=str, help="Directory to save predictions.")

    args = parser.parse_args()

    if args.quantize:
        ModelQuantizer.quantize_and_save_model(args.model_path, args.output_dir)

    if args.csv_inference:
        cli = InferenceCLI(args.model_path, args.model_params_path)
        cli.run_csv_inference(args.csv_inference, args.prediction_dir)


if __name__ == "__main__":
    main()
