In [None]:
from torch import cuda, bfloat16
import transformers
import os


# Configure the CUDA caching allocator before the first CUDA call below, so the
# setting is in place by the time the allocator is initialised.
# max_split_size_mb stops the allocator from splitting blocks larger than this
# size (reduces fragmentation); it is not a cap on total memory use.
os.environ["PYTORCH_CUDA_ALLOC_CONF"] = "max_split_size_mb:256"
# To cap the fraction of GPU memory this process may use instead:
# cuda.set_per_process_memory_fraction(0.6, device=None)

# Use the current CUDA device when one is available, otherwise fall back to CPU.
device = f'cuda:{cuda.current_device()}' if cuda.is_available() else 'cpu'

model = transformers.AutoModelForCausalLM.from_pretrained(
    'mosaicml/mpt-7b-instruct',
    trust_remote_code=True,  # the MPT model code is loaded from the model repository
    torch_dtype=bfloat16,
    max_seq_len=64,  # limit seq length
    # max_batch_size= 2, # limit batch size
)
model.eval()  # inference only
model.to(device)
print(f"Model loaded on {device}")

In [None]:
# Imported in this cell because it runs before the cell that imports the rest
# of accelerate; without it a fresh "Restart & Run All" fails with NameError.
from accelerate import Accelerator

accelerator = Accelerator()
# Show which device Accelerate selected.
accelerator.device

In [None]:
from functools import partial
from typing import Any, Dict, List, Mapping, Optional, Set
from pydantic import Extra, Field, root_validator
from langchain.callbacks.manager import CallbackManagerForLLMRun
from langchain.llms.base import LLM
from langchain.llms.utils import enforce_stop_tokens
from transformers import AutoConfig, AutoModelForCausalLM, AutoTokenizer, TextIteratorStreamer
import torch
from accelerate import Accelerator, load_checkpoint_and_dispatch, init_empty_weights
from tqdm.auto import tqdm
from threading import Thread
from huggingface_hub import snapshot_download, cached_assets_path

from accelerate import infer_auto_device_map

"""Wrapper for the MosaicML MPT models."""
class MosaicML(LLM):
    """LangChain ``LLM`` wrapper that runs a MosaicML MPT model in-process.

    Constructing an instance triggers ``validate_environment``, which loads the
    config, tokenizer and model and dispatches the weights according to
    ``device_map``.  ``_call`` then streams text from ``model.generate``.
    """

    model_name: str = Field("mosaicml/mpt-7b-instruct", alias='model_name')
    """The name of the model to use."""

    tokenizer_name: str = Field("EleutherAI/gpt-neox-20b", alias='tokenizer_name')
    """The name of the sentence tokenizer to use."""

    config: Any = None #: :meta private:
    """The reference to the loaded configuration."""

    tokenizer: Any = None #: :meta private:
    """The reference to the loaded tokenizer."""

    model: Any = None #: :meta private:
    """The reference to the loaded model."""

    accelerator: Any = None #: :meta private:
    """The reference to the loaded hf device accelerator."""

    attn_impl: str = Field("torch", alias='attn_impl')
    """The attention implementation to use."""

    torch_dtype: Any = Field(torch.bfloat16, alias='torch_dtype')
    """The torch data type to use."""

    max_new_tokens: Optional[int] = Field(10000, alias='max_new_tokens')
    """The maximum number of tokens to generate."""

    do_sample: Optional[bool] = Field(True, alias='do_sample')
    """Whether to sample or not."""

    temperature: Optional[float] = Field(0.1, alias='temperature')
    """The temperature to use for sampling."""

    echo: Optional[bool] = Field(False, alias='echo')
    """Whether to echo the prompt."""
    
    stop: Optional[List[str]] = []
    """A list of strings to stop generation when encountered."""

    # Hand-written placement: embeddings and block 0 on GPU 0, everything else on CPU.
    device_map: Optional[dict] = {'transformer.wte': 0, 'transformer.emb_drop': 0, 'transformer.blocks.0': 0, 'transformer.blocks.1': 'cpu', 
                                  'transformer.blocks.2': 'cpu', 'transformer.blocks.3': 'cpu', 'transformer.blocks.4': 'cpu', 'transformer.blocks.5': 'cpu', 'transformer.blocks.6': 'cpu', 
                                  'transformer.blocks.7': 'cpu', 'transformer.blocks.8': 'cpu', 'transformer.blocks.9': 'cpu', 'transformer.blocks.10': 'cpu', 'transformer.blocks.11': 'cpu', 
                                  'transformer.blocks.12': 'cpu', 'transformer.blocks.13': 'cpu', 'transformer.blocks.14': 'cpu', 'transformer.blocks.15': 'cpu', 'transformer.blocks.16': 'cpu', 
                                  'transformer.blocks.17.norm_1': 'cpu', 'transformer.blocks.17.attn.Wqkv': 'cpu', 'transformer.blocks.17.attn.out_proj': 'cpu', 'transformer.blocks.17.norm_2': 'cpu', 
                                  'transformer.blocks.17.ffn': 'cpu', 'transformer.blocks.17.resid_attn_dropout': 'cpu', 'transformer.blocks.17.resid_ffn_dropout': 'cpu', 
                                  'transformer.blocks.18': 'cpu', 'transformer.blocks.19': 'cpu', 'transformer.blocks.20': 'cpu', 'transformer.blocks.21': 'cpu', 
                                  'transformer.blocks.22': 'cpu', 'transformer.blocks.23': 'cpu', 'transformer.blocks.24': 'cpu', 'transformer.blocks.25': 'cpu', 'transformer.blocks.26': 'cpu', 
                                  'transformer.blocks.27': 'cpu', 'transformer.blocks.28': 'cpu', 'transformer.blocks.29': 'cpu', 'transformer.blocks.30': 'cpu', 
                                  'transformer.blocks.31': 'cpu', 'transformer.norm_f': 'cpu'}
    

    class Config:
        """Configuration for this pydantic object."""

        extra = Extra.forbid


    def _mpt_default_params(self) -> Dict[str, Any]:
        """Return the generation kwargs taken from this instance's fields."""
        return {
            "max_new_tokens": self.max_new_tokens,
            "temperature": self.temperature,
            "do_sample": self.do_sample,
        }
    
    @staticmethod
    def _mpt_param_names() -> Set[str]:
        """Return the names of the generation parameters used for MPT models."""
        return {
            "max_new_tokens",
            "temperature",
            "do_sample",
        }

    @staticmethod
    def _model_param_names(model_name: str) -> Set[str]:
        """Return the parameter names for ``model_name`` (currently always the MPT set)."""
        # TODO: fork for different parameters for different model variants.
        return MosaicML._mpt_param_names()
    
    def _default_params(self) -> Dict[str, Any]:
        """Get the default parameters."""
        return self._mpt_default_params()
    
    @root_validator()
    def validate_environment(cls, values: Dict) -> Dict:
        """Load config, tokenizer and model, storing them in ``values``.

        Raises:
            Exception: wraps any error raised while loading; the original
                error is chained as ``__cause__``.
        """
        try:
            # This module is supermassive so we use the transformers accelerator to load it.
            values['accelerator'] = Accelerator()
            print("[" + values["model_name"] + "] Downloading model (or fetching from cache)...")
            # local_files_only=True: the snapshot is resolved from the local cache only.
            download_location = snapshot_download(repo_id=values["model_name"], use_auth_token=True, local_files_only=True)
            print("[" + values["model_name"] + "] Model location: " + str(download_location))
            offload_cache_location = cached_assets_path(library_name="langchain", namespace=values["model_name"], subfolder="offload")
            print("[" + values["model_name"] + "] Offload cache location: " + str(offload_cache_location))
            print("[" + values["model_name"] + "] AutoConfiguring...")
            values["config"] = AutoConfig.from_pretrained(values["model_name"], 
                                                        trust_remote_code=True, 
                                                        ) 
                                                        
            values["config"].attn_config['attn_impl'] = values["attn_impl"]
            values["tokenizer"] = AutoTokenizer.from_pretrained(values["tokenizer_name"])
            print("[" + values["model_name"] + "] Initializing empty weights for model...")
            # NOTE(review): from_pretrained is called inside init_empty_weights and the
            # checkpoint is dispatched again below - confirm both loads are needed.
            with init_empty_weights():
                values["model"] = AutoModelForCausalLM.from_pretrained(
                    values["model_name"],
                    config=values["config"],
                    torch_dtype=values["torch_dtype"],
                    trust_remote_code=True,
                    device_map=values["device_map"],
                )
                
            # device_map = infer_auto_device_map(values["model"], max_memory={0: "7GiB", "cpu": "12GiB"})
            # print(device_map)
            print("[" + values["model_name"] + "] Tying weights...")
            values["model"].tie_weights()
            print("[" + values["model_name"] + "] Dispatching checkpoint...")
            values["model"] = load_checkpoint_and_dispatch(
                values["model"], 
                download_location, 
                device_map=values["device_map"], 
                no_split_module_classes=["MPTBlock"],
                offload_folder=offload_cache_location
            )
            print("[" + values["model_name"] + "] Loaded successfully!")
        except Exception as e:
            # Chain explicitly so the original traceback stays attached.
            raise Exception(f"MosaicML failed to load with error: {e}") from e
        return values
    
    @property
    def _identifying_params(self) -> Mapping[str, Any]:
        """Return the model name plus the generation parameters of this instance."""
        return {
            "model": self.model_name,
            **self._default_params(),
            **{
                k: v
                for k, v in self.__dict__.items()
                if k in self._model_param_names(self.model_name)
            },
        }
    
    @property
    def _llm_type(self) -> str:
        """Return the type of llm."""
        return "mosaicml"

    def _call(
        self,
        prompt: str,
        stop: Optional[List[str]] = None,
        run_manager: Optional[CallbackManagerForLLMRun] = None,
    ) -> str:
        r"""Call out to MosiacML's generate method via transformers.

        Generation runs on a background thread and is consumed through a
        ``TextIteratorStreamer`` so each chunk can be forwarded to
        ``run_manager`` as it arrives.

        Args:
            prompt: The prompt to pass into the model.
            stop: A list of strings to stop generation when encountered.
                When ``None``, the instance-level ``stop`` field is used.
            run_manager: Optional callback manager notified of each new chunk.

        Returns:
            The string generated by the model, truncated at the first stop
            sequence when any stop sequences are in effect.

        Example:
            .. code-block:: python

                prompt = "This is a story about a big sabre tooth tiger: "
                response = model(prompt)
        """
        text_callback = None
        if run_manager:
            text_callback = partial(run_manager.on_llm_new_token, verbose=self.verbose)
        inputs = self.tokenizer([prompt], return_tensors='pt')
        inputs = inputs.to(self.accelerator.device)
        streamer = TextIteratorStreamer(tokenizer=self.tokenizer, skip_prompt=True)
        generation_kwargs = dict(inputs, streamer=streamer, **self._mpt_default_params())
        thread = Thread(target=self.model.generate, kwargs=generation_kwargs)
        thread.start()
        chunks = []
        # The context manager closes the bar even if a callback raises.
        with tqdm(total=self.max_new_tokens, desc="Thinking", leave=False) as pbar:
            for new_text in streamer:
                if text_callback:
                    text_callback(new_text)
                chunks.append(new_text)
                # Advances once per streamed chunk.
                pbar.update(1)
        # The streamer is exhausted, so generation has finished; reap the worker thread.
        thread.join()
        text = "".join(chunks)
        # Fall back to the instance-level stop list when the caller passes none.
        stop_sequences = stop if stop is not None else self.stop
        # Skip empty lists: enforce_stop_tokens builds a regex from the list, and an
        # empty pattern would truncate the output to an empty string.
        if stop_sequences:
            text = enforce_stop_tokens(text, stop_sequences)
        return text

In [None]:
# Instantiate the wrapper; construction runs the validator that loads the model.
# max_new_tokens=200 overrides the class default of 10000.
llm = MosaicML(
    model_name='mosaicml/mpt-7b-instruct',
    torch_dtype=torch.bfloat16,
    attn_impl='torch',
    max_new_tokens=200,
    echo=True,
)

In [None]:
import torch
from transformers import StoppingCriteria, StoppingCriteriaList

tokenizer = transformers.AutoTokenizer.from_pretrained("EleutherAI/gpt-neox-20b")

# mpt-7b is trained to add "<|endoftext|>" at the end of generations
stop_token_ids = tokenizer.convert_tokens_to_ids(["<|endoftext|>"])


class StopOnTokens(StoppingCriteria):
    """Stopping criterion that fires when the newest token is a stop token.

    Only the first sequence in the batch is inspected, and the ids are read
    from the module-level ``stop_token_ids`` list.
    """

    def __call__(self, input_ids: torch.LongTensor, scores: torch.FloatTensor, **kwargs) -> bool:
        """Return True when the last token of ``input_ids[0]`` is in ``stop_token_ids``."""
        last_token = input_ids[0][-1]
        return any(last_token == stop_id for stop_id in stop_token_ids)


stopping_criteria = StoppingCriteriaList([StopOnTokens()])

In [None]:
# Display the ids resolved for "<|endoftext|>" as a sanity check.
stop_token_ids


d_model: int=2048, 
n_heads: int=16, 
n_layers: int=24, 
expansion_ratio: int=4, 
max_seq_len: int=2048, 
vocab_size: int=50368, 
resid_pdrop: float=0.0, 
emb_pdrop: float=0.0, 
learned_pos_emb: bool=True, 
attn_config: Dict=attn_config_defaults, 
init_device: str='cpu', 
logit_scale: Optional[Union[float, str]]=None, 
no_bias: bool=False, 
verbose: int=0, 
embedding_fraction: float=1.0, 
norm_type: str='low_precision_layernorm', 
use_cache: bool=False, 
init_config: Dict=init_config_defaults, **kwargs)

In [None]:
# Text-generation pipeline around the already-loaded model and tokenizer.
generate_text = transformers.pipeline(
    model=model, tokenizer=tokenizer,
    return_full_text=True,  # langchain expects the full text
    task='text-generation',
    device=device,
    # we pass model parameters here too
    stopping_criteria=stopping_criteria,  # without this model will ramble
    # Sampling must be switched on explicitly; otherwise decoding is greedy and
    # temperature / top_p / top_k below have no effect.
    do_sample=True,
    temperature=0.1,  # 'randomness' of outputs; lower values are closer to greedy
    top_p=0.15,  # select from top tokens whose probability add up to 15%
    top_k=0,  # 0 disables top-k filtering, so only top_p applies
    max_new_tokens=64,  # max number of tokens to generate in the output
    repetition_penalty=1.1  # penalizes repetition in tokens generated
)

In [None]:
# Smoke-test the pipeline with a single prompt.
res = generate_text("Explain to me the difference between nuclear fission and fusion.")
# The result is indexed as a list; the text sits under "generated_text" of item 0.
print(res[0]["generated_text"])

In [None]:

from langchain import PromptTemplate, LLMChain
from langchain.llms import HuggingFacePipeline

# template for an instruction with no input
# Expose the transformers pipeline to LangChain as an LLM.
llm = HuggingFacePipeline(pipeline=generate_text)

# Pass-through template: the prompt is exactly the supplied instruction.
prompt = PromptTemplate(template="{instruction}", input_variables=["instruction"])

# Chain that formats the prompt and sends it to the LLM.
llm_chain = LLMChain(prompt=prompt, llm=llm)


In [None]:
# Run the chain on one instruction; lstrip() removes leading whitespace from the reply.
print(llm_chain.predict(instruction = "Explain the differernce between yoghurt and cheese").lstrip())

In [None]:
# use in conversational chain
from langchain.vectorstores import Chroma  
from langchain.chains import ConversationalRetrievalChain
from langchain.text_splitter import CharacterTextSplitter
from langchain.document_loaders import PyPDFLoader


# Imported in this cell so it runs on a fresh kernel: OpenAI, the embeddings
# and the memory class were used or needed below but never imported.
from langchain.embeddings import OpenAIEmbeddings
from langchain.llms import OpenAI
from langchain.memory import ConversationBufferMemory

# Load the PDF, then re-split it into ~500-character chunks with 50 overlap.
loader = PyPDFLoader("data/ocbc_net_zero_report.pdf")
pages = loader.load_and_split()

text_splitter = CharacterTextSplitter(chunk_size=500, chunk_overlap=50)
documents = text_splitter.split_documents(pages)

# Index the chunks in Chroma so the chain has a retriever to query.
# NOTE(review): assumes OpenAI credentials are configured (e.g. OPENAI_API_KEY),
# as the OpenAI LLM below already requires - confirm the embedding choice.
embeddings = OpenAIEmbeddings()
vectorstore = Chroma.from_documents(documents, embeddings)

# Conversation history the chain reads and updates between calls.
memory = ConversationBufferMemory(memory_key="chat_history", return_messages=True)

qa = ConversationalRetrievalChain.from_llm(OpenAI(temperature=0), vectorstore.as_retriever(), memory=memory)