In [53]:
# ! pip install langchain deepgram-sdk langchain-groq langchain-openai torch transformers pillow python-dotenv

In [96]:
import requests
import time
import os
import shutil
import subprocess
import asyncio

In [91]:
# Load variables from a local .env file into os.environ (API keys are read from there below)
from dotenv import load_dotenv
load_dotenv()

True

### Langchain for Multimodal LLMs

In [56]:
# import langchain
from langchain_groq import ChatGroq
from langchain_openai import ChatOpenAI
from langchain.memory import ConversationBufferMemory, ConversationBufferWindowMemory
from langchain.schema.output_parser import StrOutputParser
from langchain.prompts import (
     ChatPromptTemplate,
     MessagesPlaceholder,
     SystemMessagePromptTemplate,
     HumanMessagePromptTemplate,
 )
from langchain.chains import LLMChain
from langchain.agents import initialize_agent
from langchain.tools import BaseTool

### LLM : Groq-hosted Mixtral 

In [57]:
# Groq-hosted chat model shared by the chains and the agent built in later cells.
# temperature=0 asks for the least random sampling; the API key is read from the
# GROQ_API_KEY environment variable (os.getenv returns None when it is unset).
# NOTE(review): confirm "mixtral-8x7b-32768" is still a model id served by Groq.
llm = ChatGroq(temperature=0, model_name="mixtral-8x7b-32768", groq_api_key=os.getenv("GROQ_API_KEY"))

In [58]:
# Smoke test: a two-message prompt piped into the model and then into a string parser.
system = "You are a helpful assistant."
human = "{text}"  # placeholder filled from the dict passed to invoke()
prompt = ChatPromptTemplate.from_messages([("system", system), ("human", human)])
# output_parser is kept at module level because the async test cell reuses it
output_parser = StrOutputParser()
chain = prompt | llm | output_parser
out = chain.invoke({"text":"Explain the importance of low latency LLMs."})
print(out)

Low Latency Large Language Models (LLMs) are a type of artificial intelligence model that can process and generate human-like text with minimal delay or latency. The importance of low latency in LLMs can be explained through the following points:

1. Improved user experience: Low latency ensures that the model responds quickly to user inputs, providing a smooth and responsive user experience. This is particularly important in real-time applications such as chatbots, virtual assistants, and interactive games.
2. Enhanced accuracy: LLMs with low latency are more likely to generate accurate and coherent responses, as they can process and consider the context of the user's input more effectively. High latency can result in the model missing important context or generating responses that are disconnected from the user's input.
3. Better decision-making: In applications where LLMs are used for decision-making, such as financial trading or autonomous vehicles, low latency is critical for ensu

In [59]:
# Exercise two call styles against the same model: an awaited call and a streamed call.
# Note: `prompt`, `chain` and `out` from the previous cell are rebound here.

# Async call: top-level `await` depends on the notebook's already-running event loop.
prompt = ChatPromptTemplate.from_messages([("human", "Write a Limerick about {topic}")])
chain = prompt | llm | output_parser
out = await chain.ainvoke({"topic": "The Sun"})
print(out)

# Streaming call: this chain has no output parser, so the text of each chunk is
# read from its `.content` attribute and printed as soon as the chunk arrives.
prompt = ChatPromptTemplate.from_messages([("human", "Write a haiku about {topic}")])
chain = prompt | llm
for chunk in chain.stream({"topic": "The Moon"}):
    print(chunk.content, end="", flush=True)

There's a bright ball of gas in the sky,
That rises and sets, making spirits high.
It gives us light and warmth,
On sunny days it transforms,
The world into a golden, cheery pi.

(Note: I tried to make the last line "The world into a golden, cheery high", but that didn't fit the rhythm of a limerick. So I changed it to "pi", which is a mathematical constant and a playful way to end the limerick.)
Silent, luminous,
Glowing in the velvet night,
The Moon's gentle light.

### Speech Model : Deepgram 

In [92]:
# speech: deepgram
from deepgram import (
    DeepgramClient,
    DeepgramClientOptions,
    LiveTranscriptionEvents,
    LiveOptions,
    Microphone,
)


### Multimodal: Image processing Agent

In [61]:
import torch
# image processing
from transformers import (
    BlipProcessor, 
    BlipForConditionalGeneration
)

In [62]:
# Hugging Face checkpoint id of the image-captioning model
hf_model = "Salesforce/blip-image-captioning-large"
# run on the GPU when torch reports one, otherwise fall back to the CPU
device = 'cuda' if torch.cuda.is_available() else 'cpu'

# processor: prepares images as model inputs and decodes generated ids back to text
processor = BlipProcessor.from_pretrained(hf_model)
# model: loaded once at notebook level and moved to `device`; the captioning
# cells and ImageCaptionTool below all reuse this single instance
model = BlipForConditionalGeneration.from_pretrained(hf_model).to(device)

In [63]:
from PIL import Image

# Sample image used to sanity-check the captioning model
img_url = 'https://images.unsplash.com/photo-1616128417859-3a984dd35f02?ixlib=rb-4.0.3&ixid=MnwxMjA3fDB8MHxwaG90by1wYWdlfHx8fGVufDB8fHx8&auto=format&fit=crop&w=2372&q=80'
# Download the image and open it straight from the raw response stream as RGB.
# NOTE(review): requests.get is called without a timeout or a status check — consider adding both.
image = Image.open(requests.get(img_url, stream=True).raw).convert('RGB')
# unconditional image captioning: only the image is given to the processor, no text prompt
inputs = processor(image, return_tensors="pt").to(device)
# cap the caption length at 30 newly generated tokens
out = model.generate(**inputs, max_new_tokens=30)
print(processor.decode(out[0], skip_special_tokens=True))

there is a monkey that is sitting in a tree


In [64]:
# Expose the BLIP captioner as an agent tool
desc = (
    "use this tool when given the URL of an image that you'd like to be "
    "described. It will return a simple caption describing the image."
)


class ImageCaptionTool(BaseTool):
    """Agent tool that downloads an image from a URL and returns a caption for it.

    Relies on the notebook-level `processor`, `model` and `device` objects
    created in the model-loading cell.
    """

    # Annotated so the overrides are accepted as fields by pydantic-based BaseTool
    name: str = "Image captioner"
    description: str = desc

    def _run(self, url: str):
        """Caption the image found at `url`.

        Args:
            url: address of the image to download and describe.

        Returns:
            The caption decoded from the model output.

        Raises:
            requests.RequestException: if the download fails, times out, or the
                server answers with an error status.
        """
        # Use the `url` argument (not the notebook-level `img_url` sample) so that
        # each call captions the image the caller actually asked about.
        response = requests.get(url, stream=True, timeout=30)
        # Fail early on 4xx/5xx instead of handing an error page to PIL
        response.raise_for_status()
        image = Image.open(response.raw).convert('RGB')
        # preprocess the image
        inputs = processor(image, return_tensors="pt").to(device)
        # generate the caption (at most 20 new tokens)
        out = model.generate(**inputs, max_new_tokens=20)
        # decode the first (only) generated sequence into text
        caption = processor.decode(out[0], skip_special_tokens=True)
        return caption

    def _arun(self, query: str):
        """Async execution is not implemented for this tool."""
        raise NotImplementedError("This tool does not support async")


tools = [ImageCaptionTool()]

In [88]:
# An example agent for LLM
from math import pi, sqrt, cos, sin
from typing import Union, Optional
 
class CircumferenceTool(BaseTool):
    """Agent tool that computes a circle's circumference from its radius."""

    # Field overrides carry type annotations so that pydantic-based BaseTool
    # versions accept them (un-annotated overrides are rejected by pydantic v2).
    name: str = "Circumference calculator"
    description: str = "use this tool when you need to calculate a circumference using the radius of a circle"

    def _run(self, radius: Union[int, float]):
        """Return the circumference, 2 * pi * radius, as a float.

        `radius` goes through float(), so a numeric string is accepted as well.
        """
        return float(radius)*2.0*pi

    def _arun(self, radius: Union[int, float]):
        """Async execution is not implemented for this tool."""
        raise NotImplementedError("This tool does not support async")



class PythagorasTool(BaseTool):
    """Agent tool that computes the hypotenuse of a right triangle.

    Accepts either both legs, or one leg plus an angle given in degrees.
    """

    # Field overrides carry type annotations so that pydantic-based BaseTool
    # versions accept them (un-annotated overrides are rejected by pydantic v2).
    name: str = "Hypotenuse calculator"
    description: str = (
    """use this tool when you need to calculate the length of a hypotenuse
    using the given one or two sides of a triangle and/or an angle (in degrees). 
    To use the tool, you must provide at least two of the following parameters 
    ['adjacent_side', 'opposite_side', 'angle']. 
    If there are two lengths provided, then use the length of the first one as adjacent side and the second one as opposite side.
    If there is only one length provided and there is an angle, then use the length as adjacent side and the angle as the angle.
    if there is only one length provided and there is no angle, then tell the user to provide an angle or provide two lengths.
    """
    )

    def _run(
        self,
        adjacent_side: Optional[Union[int, float]] = None,
        opposite_side: Optional[Union[int, float]] = None,
        angle: Optional[Union[int, float]] = None
    ):
        """Compute the hypotenuse from whichever two inputs were supplied.

        Args:
            adjacent_side: length of the leg adjacent to `angle`.
            opposite_side: length of the leg opposite to `angle`.
            angle: angle in degrees, as stated in the tool description.

        Returns:
            The hypotenuse length as a float, or an explanatory message string
            when fewer than two usable inputs were given.
        """
        # Inputs are coerced with float() in every branch so numeric strings work too.
        if adjacent_side and opposite_side:
            return sqrt(float(adjacent_side)**2 + float(opposite_side)**2)
        elif adjacent_side and angle:
            # math.cos works in radians; the tool contract is degrees
            return float(adjacent_side) / cos(float(angle) * pi / 180.0)
        elif opposite_side and angle:
            # math.sin works in radians; the tool contract is degrees
            return float(opposite_side) / sin(float(angle) * pi / 180.0)
        else:
            return "Could not calculate the hypotenuse of the triangle. Need two or more of `adjacent_side`, `opposite_side`, or `angle`."

    def _arun(self, query: str):
        """Async execution is not implemented for this tool."""
        raise NotImplementedError("This tool does not support async")


In [89]:

# initialize conversational memory
# memory_key must match the history placeholder the agent prompt expects;
# k=5 presumably limits the window to the last five exchanges — confirm against the LangChain docs.
conversational_memory = ConversationBufferWindowMemory(
        memory_key='chat_history',
        k=5,
        return_messages=True
)

# Agent examples
# Start with a single tool; the tool list is replaced further down in this cell.
tools = [CircumferenceTool()]

# initialize agent with tools
# verbose=True prints the agent's intermediate steps; max_iterations=3 bounds the
# number of reasoning/tool steps per question.
agent = initialize_agent(
    agent='chat-conversational-react-description',
    tools=tools,
    llm=llm,
    verbose=True,
    max_iterations=3,
    early_stopping_method='generate',
    memory=conversational_memory
)

# Custom system message installed into the agent prompt further down in this cell.
# Its key instruction is the "terrible at maths" paragraph, which tells the model
# to use its tools for every math question.
# NOTE(review): the text says "trained by OpenAI" while `llm` is a Groq-hosted
# Mixtral model — confirm whether that wording is intended.
sys_msg = """Assistant is a large language model trained by OpenAI.

Assistant is designed to be able to assist with a wide range of tasks, from answering simple questions to providing in-depth explanations and discussions on a wide range of topics. As a language model, Assistant is able to generate human-like text based on the input it receives, allowing it to engage in natural-sounding conversations and provide responses that are coherent and relevant to the topic at hand.

Assistant is constantly learning and improving, and its capabilities are constantly evolving. It is able to process and understand large amounts of text, and can use this knowledge to provide accurate and informative responses to a wide range of questions. Additionally, Assistant is able to generate its own text based on the input it receives, allowing it to engage in discussions and provide explanations and descriptions on a wide range of topics.

Unfortunately, Assistant is terrible at maths. When provided with math questions, no matter how simple, assistant always refers to it's trusty tools and absolutely does NOT try to answer math questions by itself

Overall, Assistant is a powerful system that can help with a wide range of tasks and provide valuable insights and information on a wide range of topics. Whether you need help with a specific question or just want to have a conversation about a particular topic, Assistant is here to assist.
"""
# Final tool set: both calculators
tools = [CircumferenceTool(), PythagorasTool()]

# Rebuild the agent prompt from the custom system message and the final tool list,
# then swap it into the agent's existing LLM chain.
new_prompt = agent.agent.create_prompt(
    system_message=sys_msg,
    tools=tools
)
agent.agent.llm_chain.prompt = new_prompt


# update the agent tools so the executor's tool list matches the list used for the prompt
agent.tools = tools

In [None]:
# Circumference question — expected to be answered through CircumferenceTool
agent("can you calculate the circumference of a circle that has a radius of 7.81mm")

In [None]:
# Two side lengths given — expected to be answered through PythagorasTool
agent("If I have a triangle with two sides of length 51cm and 34cm, what is the length of the hypotenuse?")

In [None]:
# Non-math request — checks that the agent can still reply when no tool applies
agent("Tell me a poem")

### Deepgram Test


In [93]:
class TranscriptCollector:
    """Buffer for transcript fragments that are later joined into one sentence."""

    def __init__(self):
        # Begin with an empty buffer
        self.reset()

    def reset(self):
        """Discard every buffered fragment by rebinding to a brand-new list."""
        self.transcript_parts = list()

    def add_part(self, part):
        """Put one fragment at the end of the buffer."""
        self.transcript_parts += [part]

    def get_full_transcript(self):
        """Return all buffered fragments joined with single spaces."""
        separator = ' '
        return separator.join(self.transcript_parts)


# Notebook-level collector shared with the transcription callback
transcript_collector = TranscriptCollector()

async def get_transcript(callback):
    """Stream microphone audio to Deepgram until one complete utterance is transcribed.

    Transcript fragments are buffered in the notebook-level `transcript_collector`.
    When a result is flagged `speech_final` and the buffered text is non-empty,
    the text is printed, passed to `callback`, and the function winds down.

    Args:
        callback: callable invoked synchronously with the stripped utterance text.

    Returns:
        None. Any exception is printed rather than raised, in which case
        `callback` is never called.
    """
    transcription_complete = asyncio.Event()  # set once a full utterance has been delivered

    # Drop fragments left behind by an earlier attempt that ended in an error,
    # so they cannot be glued onto the front of this utterance.
    transcript_collector.reset()

    try:
        # example of setting up a client config. logging values: WARNING, VERBOSE, DEBUG, SPAM
        config = DeepgramClientOptions(options={"keepalive": "true"})
        # NOTE(review): the API key argument is an empty string — presumably the SDK
        # falls back to the DEEPGRAM_API_KEY environment variable; confirm.
        deepgram: DeepgramClient = DeepgramClient("", config)

        dg_connection = deepgram.listen.asynclive.v("1")
        print("Listening...")

        async def on_message(self, result, **kwargs):
            sentence = result.channel.alternatives[0].transcript

            # Every fragment is buffered, whether or not it ends the utterance
            transcript_collector.add_part(sentence)
            if not result.speech_final:
                return

            # This was the final part of the current sentence
            full_sentence = transcript_collector.get_full_transcript().strip()
            # Ignore utterances that turned out to be empty and keep listening
            if full_sentence:
                print(f"Human: {full_sentence}")
                callback(full_sentence)  # Call the callback with the full_sentence
                transcript_collector.reset()
                transcription_complete.set()  # Signal to stop transcription and exit

        dg_connection.on(LiveTranscriptionEvents.Transcript, on_message)

        # NOTE(review): encoding/channels/sample_rate presumably have to match what
        # Microphone captures; endpointing is presumably a silence threshold in ms — confirm.
        options = LiveOptions(
            model="nova-2",
            punctuate=True,
            language="en-US",
            encoding="linear16",
            channels=1,
            sample_rate=16000,
            endpointing=300,
            smart_format=True,
        )

        await dg_connection.start(options)
        try:
            # Open a microphone stream on the default input device
            microphone = Microphone(dg_connection.send)
            microphone.start()
            try:
                # Wait for the transcription to complete instead of looping indefinitely
                await transcription_complete.wait()
            finally:
                # Release the audio device even if waiting was interrupted by an error
                microphone.finish()
        finally:
            # Close the connection whenever it was started, on success and on failure
            await dg_connection.finish()

    except Exception as e:
        print(f"Could not open socket: {e}")
        return


In [94]:
class ConversationManager:
    """Drives the listen loop: transcribe an utterance, echo it, repeat until "goodbye"."""

    def __init__(self):
        # Latest utterance delivered by get_transcript; "" while none is pending
        self.transcription_response = ""
        # TODO: self.llm = LanguageModelProcessor()

    async def main(self):
        """Run the conversation loop until the user says "goodbye" or transcription fails.

        Each pass awaits `get_transcript`, which reports the utterance through a
        callback that stores it on `self.transcription_response`.

        Returns:
            None. On a "goodbye" exit the final utterance is left in
            `self.transcription_response`.
        """
        def handle_full_sentence(full_sentence):
            self.transcription_response = full_sentence

        # Loop until "goodbye" is detected or no transcript can be obtained
        while True:
            await get_transcript(handle_full_sentence)

            # get_transcript only returns without invoking the callback on its
            # error path (it prints the error itself). Retrying immediately would
            # spin forever on a persistent failure such as a missing API key.
            if not self.transcription_response:
                print("No transcript received; stopping the conversation loop.")
                break

            # Check for "goodbye" to exit the loop
            if "goodbye" in self.transcription_response.lower():
                break

            print(self.transcription_response)
            # TODO: llm_response = self.llm.process(self.transcription_response)

            # TODO: tts = TextToSpeech()
            # TODO: tts.speak(llm_response)

            # Reset transcription_response for the next loop iteration
            self.transcription_response = ""



In [98]:
manager = ConversationManager()
manager.main()

<coroutine object ConversationManager.main at 0x146131560>