## Import Dependencies

In [10]:
import logging
from abc import ABC, abstractmethod
import math
import json
import importlib
import sys
import configparser
import requests
import os
import openpyxl
import json
import xlwings as xw
from dotenv import load_dotenv
import torch
from transformers import AutoModelForCausalLM, AutoTokenizer,BitsAndBytesConfig,TextStreamer,pipeline
from huggingface_hub import InferenceApi,InferenceClient
import time
from io import StringIO
from IPython.display import display, HTML,Markdown
import gradio as gr
#from pyspark import SparkConf
#from pyspark.sql import SparkSession, DataFrame

## Load Env Variables from .env

In [3]:
# Load environment variables from .env and derive the project paths used below.
load_dotenv()
os.environ["SPARK_HOME"] = os.getenv("PROJ_SPARK_HOME", "")
os.environ["HF_TOKEN"] = os.getenv("HF_TOKEN", "")
hf_token = os.getenv("HF_TOKEN", "")
# NOTE: if PROJECT_PATH is not set, project_path is None and every path built
# from it below starts with the literal text "None".
project_path = os.getenv("PROJECT_PATH")
# Paths use Windows separators. Every backslash is escaped explicitly: the
# previous "\l" was an invalid escape sequence that only worked by accident
# and triggers a SyntaxWarning on recent Python versions.
dataset_path = f"{project_path}\\llm_custom_apps\\datasets"
test_data_path = f"{dataset_path}\\test_datasets"
app_data_path = f"{dataset_path}\\app_datasets"
quant_model_path = f"{project_path}\\quant_models"
# Set the custom cache directory for Hugging Face Transformers
# NOTE(review): the Hugging Face libraries are imported before this line runs;
# they may already have resolved their cache location - confirm this
# assignment still takes effect.
os.environ['HF_HOME'] = f"{project_path}\\llm_custom_apps\\.hf_cache_dir"

## Logger Method

In [4]:
def create_logger(logger_nm):
    """
    Build (or reconfigure) a named logger that emits through a single stream handler.

    Handlers already attached to the logger are removed first, so calling this
    function repeatedly with the same name does not produce duplicate output.

    :param logger_nm: Name of the logger; it appears at the start of each log line
    :return: the configured logging.Logger instance
    """
    log_format = "\n%(name)s - %(levelname)s - %(lineno)s - %(funcName)s - %(asctime)s - %(message)s"

    # Prepare the handler together with its formatter
    stream_handler = logging.StreamHandler()
    stream_handler.setFormatter(logging.Formatter(log_format))

    # Fetch the logger and let everything from DEBUG upwards through
    named_logger = logging.getLogger(logger_nm)
    named_logger.setLevel(logging.DEBUG)

    # Replace whatever handlers earlier calls attached with the fresh one
    named_logger.handlers.clear()
    named_logger.addHandler(stream_handler)

    return named_logger

In [5]:
# Module-level logger shared by the helpers in this file (used by reusable_rest_api).
gen_logger = create_logger("Generic Logger")

## Reusable REST API Method

In [6]:
def reusable_rest_api(api_url, req_data=None, cert_verify=False, header=None, method="GET", resp="JSON", retry_lim=3):
    """
    Make an HTTP request to an API endpoint, retrying on transient failures.

    Parameters:
    - api_url (str): The URL of the API endpoint to make the request to.
    - req_data (str or dict, optional): The request body, passed to requests as ``data``. Default is None.
    - cert_verify (bool, optional): Whether to verify the SSL certificate. Default is False
      (verification disabled); pass True when talking to untrusted networks.
    - header (dict, optional): Headers to include in the request. Default is None.
    - method (str, optional): The HTTP method, exactly one of "GET", "POST", "PUT", "DELETE". Default is "GET".
    - resp (str, optional): "JSON" to return the parsed JSON body; any other value returns the
      raw requests.Response object. Default is "JSON".
    - retry_lim (int, optional): Maximum number of retries after the first attempt when a
      connection error or a JSON parsing error occurs. Default is 3.

    Returns:
    - tuple: (HTTP status code, response content). For "DELETE" with resp="JSON" the content
      is None because the body is not parsed.

    Error handling:
    No requests exception propagates to the caller. Every failure is logged through
    gen_logger and then the process is terminated with sys.exit(1) (SystemExit):
    - invalid method, timeouts and other request errors exit immediately;
    - connection errors and JSON parsing errors (ValueError) are retried first and exit
      once retry_lim is exceeded. Note that a retry re-sends the request.

    Example usage:
    status_code, response_data = reusable_rest_api(
        api_url="https://example.com/api",
        req_data={"key": "value"},
        cert_verify=True,
        header={"Authorization": "Bearer Token"},
        method="POST",
        resp="JSON",
        retry_lim=3
    )
    """
    supported_methods = ("GET", "POST", "PUT", "DELETE")
    retry = 1
    while True:
        resp_content = ""
        resp_json = None
        try:
            if method not in supported_methods:
                raise requests.exceptions.HTTPError("Invalid API Method")
            # requests expects the keyword ``headers``; the previous ``header=``
            # keyword made every call fail with a TypeError.
            resp_content = requests.request(
                method, api_url, headers=header, data=req_data, verify=cert_verify
            )
            # DELETE responses are never parsed; the JSON result stays None.
            if resp == "JSON" and method != "DELETE":
                resp_json = json.loads(resp_content.content)
        except requests.exceptions.ConnectionError as err:
            # Transient network failure: fall through to the retry logic below.
            gen_logger.error(f"The exception while calling this api is: {err}")
        except requests.exceptions.RequestException as err:
            # Covers the invalid-method HTTPError, timeouts and all remaining
            # request errors. This handler must stay ahead of ValueError because
            # some requests exceptions (e.g. invalid URL) are also ValueErrors
            # and must not be retried.
            gen_logger.error(f"The exception while calling this api is: {err}")
            sys.exit(1)
        except ValueError as err:
            # Response body was not valid JSON: fall through to the retry logic.
            gen_logger.error(f"The exception while calling this api is: {err}")
        else:
            final_resp = resp_json if (resp == "JSON") else resp_content
            return resp_content.status_code, final_resp

        # Only retryable failures reach this point.
        retry = retry + 1
        if retry <= retry_lim + 1:
            continue
        gen_logger.error("Retry Limit Exceeded")
        sys.exit(1)

## Method for Saving Quantized Model

In [7]:
def quantize_model_with_bitsandbytes(model_name: str, quant_model_directory: str, quantization_bits: int = 4):
    """
    Load a Hugging Face causal language model with 4-bit bitsandbytes quantization
    and save it, together with its tokenizer, to disk.

    The output location is ``{quant_model_path}/{quant_model_directory}``, where
    ``quant_model_path`` is the module-level quantized-model root.

    Args:
        model_name (str): Name of the Hugging Face model to quantize.
        quant_model_directory (str): Sub-directory (under quant_model_path) to save into.
        quantization_bits (int): Number of bits for quantization; only 4 is accepted.

    Raises:
        ValueError: If quantization_bits is anything other than 4.
    """
    output_dir = f"{quant_model_path}/{quant_model_directory}"

    # Reject unsupported bit widths before any model is loaded
    if quantization_bits != 4:
        raise ValueError("Only 4-bit quantization is supported using bitsandbytes.")

    # 4-bit NF4 with double quantization, computing in bfloat16
    bnb_settings = {
        "load_in_4bit": True,
        "bnb_4bit_use_double_quant": True,
        "bnb_4bit_compute_dtype": torch.bfloat16,
        "bnb_4bit_quant_type": "nf4",
    }
    quant_config = BitsAndBytesConfig(**bnb_settings)

    # Quantization is applied while the weights are being loaded
    print("Loading model with quantization...")
    model = AutoModelForCausalLM.from_pretrained(
        model_name, device_map="auto", quantization_config=quant_config
    )
    tokenizer = AutoTokenizer.from_pretrained(model_name)

    # Create the target directory on first use, then persist model and tokenizer
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)
    for artifact in (model, tokenizer):
        artifact.save_pretrained(output_dir)

    print(f"Quantized model saved to {output_dir}")

## Method for Generating Responses from a Quantized Model

In [8]:
def generate_responses(model_name: str, messages: list, max_new_tokens: int = 50):
    """
    Generate a response with a Hugging Face conversational model, streaming it to
    stdout in real time, and release the resources afterwards.

    The function loads the model and tokenizer, renders the messages through the
    tokenizer's chat template, and streams the generated tokens via TextStreamer.
    Whether generation succeeds or fails, the local references to the model,
    tokenizer, inputs and streamer are dropped and the CUDA cache is emptied.

    Args:
        model_name (str): The name or path of the pre-trained model to load.
        messages (list): Chat messages in the format expected by the tokenizer's
                         chat template (e.g. a list of {"role": ..., "content": ...} dicts).
        max_new_tokens (int): Upper bound on the number of tokens to generate.
                              Defaults to 50.

    Returns:
        None: The response is printed as it is generated. Errors are not raised;
        they are printed as "Error occurred: ...".
    """

    model = None
    tokenizer = None
    streamer = None
    inputs = None

    try:
        # Load model and tokenizer from Hugging Face
        print(f"Loading model: {model_name}...")
        model = AutoModelForCausalLM.from_pretrained(model_name, device_map="auto")
        tokenizer = AutoTokenizer.from_pretrained(model_name)
        tokenizer.pad_token = tokenizer.eos_token

        # Convert messages to model input format.
        # - return_dict=True yields a mapping with input_ids/attention_mask;
        #   without it a bare tensor can be returned and the former
        #   inputs['input_ids'] lookup fails.
        # - add_generation_prompt=True appends the assistant-turn marker so the
        #   model answers instead of continuing the user's message.
        # - The tensors go to the device the model was placed on rather than a
        #   hard-coded "cuda", since device_map="auto" chooses the placement.
        print("Tokenizing input messages...")
        inputs = tokenizer.apply_chat_template(
            messages,
            add_generation_prompt=True,
            return_dict=True,
            return_tensors="pt",
        ).to(model.device)

        # Set up the TextStreamer to stream tokens as they are generated
        print("Setting up streamer for real-time output...")
        streamer = TextStreamer(tokenizer)

        # Generate and stream the response
        print("Generating response...")
        model.generate(**inputs, max_new_tokens=max_new_tokens, streamer=streamer)

    except Exception as e:
        # Best-effort by design: report the failure and still run the cleanup
        print(f"Error occurred: {e}")

    finally:
        # Drop local references so the memory can be reclaimed
        if model is not None:
            del model  # Delete the model
            print("Model deleted")

        if tokenizer is not None:
            del tokenizer  # Delete the tokenizer
            print("Tokenizer deleted")

        if inputs is not None:
            del inputs  # Delete the inputs
            print("Inputs deleted")

        if streamer is not None:
            del streamer  # Delete the streamer
            print("Streamer deleted")

        # Clear GPU memory cache
        torch.cuda.empty_cache()
        print("CUDA cache cleared")

In [9]:
def generate_responses_from_inf(hf_token, messages, model_name, max_tokens):
    """
    Request a chat completion through the Hugging Face InferenceClient and return
    the text of the first choice.

    Parameters:
        hf_token (str): The Hugging Face API token.
        messages (list): The messages to be sent in the chat completion request.
        model_name (str): The name of the model to use for the chat completion.
        max_tokens (int): The maximum number of tokens for the completion.

    Returns:
        str: The content of the model's reply. If anything goes wrong, the error
        is printed and a "busy or unavailable" message naming the model is
        returned instead.
    """
    try:
        # Client creation and the request share one guarded section, so any
        # failure ends up in the fallback below
        completion = InferenceClient(api_key=hf_token).chat.completions.create(
            model=model_name,
            messages=messages,
            max_tokens=max_tokens,
        )

        # Only the first choice's message content is of interest
        first_choice = completion.choices[0]
        reply = first_choice.message["content"]
    except Exception as e:
        # Report the problem and hand a user-facing notice back to the caller
        print(f"Error: {str(e)}")
        reply = f"Model ({model_name}) Busy or Unavailable. Try again after sometime"

    return reply