In [1]:
!pip install streamlit

Collecting streamlit
  Downloading streamlit-1.40.2-py2.py3-none-any.whl.metadata (8.4 kB)
Collecting altair<6,>=4.0 (from streamlit)
  Downloading altair-5.5.0-py3-none-any.whl.metadata (11 kB)
Collecting blinker<2,>=1.0.0 (from streamlit)
  Downloading blinker-1.9.0-py3-none-any.whl.metadata (1.6 kB)
Collecting cachetools<6,>=4.0 (from streamlit)
  Downloading cachetools-5.5.0-py3-none-any.whl.metadata (5.3 kB)
Collecting click<9,>=7.0 (from streamlit)
  Downloading click-8.1.7-py3-none-any.whl.metadata (3.0 kB)
Collecting pyarrow>=7.0 (from streamlit)
  Downloading pyarrow-18.1.0-cp312-cp312-win_amd64.whl.metadata (3.4 kB)
Collecting rich<14,>=10.14.0 (from streamlit)
  Downloading rich-13.9.4-py3-none-any.whl.metadata (18 kB)
Collecting toml<2,>=0.10.1 (from streamlit)
  Downloading toml-0.10.2-py2.py3-none-any.whl.metadata (7.1 kB)
Collecting watchdog<7,>=2.1.5 (from streamlit)
  Downloading watchdog-6.0.0-py3-none-win_amd64.whl.metadata (44 kB)
Collecting gitpython!=3.1.19,<4,>=3

In [1]:
folder_path=r"C:\Users\gpu.arl\Desktop\xBit\Localinsight-onush\Localinsight-onush\documents"
text="meerkat"
image_path=r"C:\Users\gpu.arl\Desktop\xBit\Localinsight-onush\Localinsight-onush\documents\meerkats-3.jpeg"

# Loader

In [2]:
import os
import base64
import speech_recognition as sr
from moviepy import VideoFileClip
from langchain_community.document_loaders import TextLoader, PyPDFLoader, UnstructuredPowerPointLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from pprint import pprint

class DataLoader:
    def __init__(self, folder_path, chunk_size=100, chunk_overlap=10):
        self.folder_path = folder_path
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap

    def process_text_file(self, file_path):
        loader = TextLoader(file_path, encoding="utf-8")
        documents = loader.load()
        text_splitter = RecursiveCharacterTextSplitter(chunk_size=self.chunk_size, chunk_overlap=self.chunk_overlap)
        chunks = text_splitter.split_documents(documents)
        return [
            {
                "chunk_no": int(idx),
                "text": chunk.page_content,
                "path": file_path,
                "file_type": "text",
                "media_type": "text"
            }
            for idx, chunk in enumerate(chunks)
        ]

    def process_image_file(self, file_path):
        try:
            with open(file_path, 'rb') as image_file:
                image_base64 = base64.b64encode(image_file.read()).decode('utf-8')
            return [{
                "path": file_path,
                "file_type": "image",
                "image": image_base64,
                "media_type": "image"
            }]
        except Exception as e:
            print(f"Error processing image file {file_path}: {e}")
            return None

    def process_audio_file(self, file_path):
        recognizer = sr.Recognizer()
        try:
            with sr.AudioFile(file_path) as source:
                audio_data = recognizer.record(source)
                text = recognizer.recognize_google(audio_data)
            chunks = [text[i:i + self.chunk_size] for i in range(0, len(text), self.chunk_size)]

            return [
                {
                    "chunk_no": int(idx),
                    "text": chunk,
                    "path": file_path,
                    "file_type": "audio",
                    "media_type": "text"
                }
                for idx, chunk in enumerate(chunks)
            ]
        except Exception as e:
            print(f"Error processing audio file {file_path}: {e}")
            return None
    
    def process_video_file(self, file_path, frame_interval=5, compression_quality=50, resize_factor=0.7):
        """
        Processes a video file to extract audio and frames.
        
        Parameters:
        - file_path (str): Path to the video file.
        - frame_interval (int): Interval in seconds between frames to extract.
        - compression_quality (int): Quality of the compressed image (1-100).
        - resize_factor (float): Scaling factor for resizing images (0 < resize_factor <= 1).

        Returns:
        - list: A list of dictionaries containing audio data and frame data.
        """
        video_data = []

        # Ensure frame_interval is greater than 0 to avoid division by zero
        if frame_interval <= 0:
            print(f"Error: frame_interval must be greater than 0. Received frame_interval={frame_interval}.")
            return video_data

        try:
            # Extracting audio
            video = VideoFileClip(file_path)
            audio_path = file_path.replace(os.path.splitext(file_path)[1], ".wav")
            
            try:
                video.audio.write_audiofile(audio_path)
                audio_data = self.process_audio_file(audio_path)
                if audio_data:
                    video_data.extend(audio_data)
            except Exception as e:
                print(f"Error processing audio file {audio_path}: {e}")

            # Extracting frames
            for i, frame in enumerate(video.iter_frames(fps=1.0 / frame_interval)):
                # Convert frame (numpy array) to PIL image
                pil_image = Image.fromarray(frame)
                
                # Resize the image if resize_factor is less than 1
                if resize_factor < 1.0:
                    new_size = (int(pil_image.width * resize_factor), int(pil_image.height * resize_factor))
                    pil_image = pil_image.resize(new_size, Image.LANCZOS)
                
                # Compress the image
                buffered = io.BytesIO()
                pil_image.save(buffered, format="JPEG", quality=compression_quality)
                
                # Convert to base64
                frame_base64 = base64.b64encode(buffered.getvalue()).decode('utf-8')
                
                # Add frame data to video_data list
                video_data.append({
                    "chunk_no": i,
                    "image": frame_base64,
                    "path": file_path,
                    "file_type": "video_frame",
                    "media_type": "image"
                })

        except Exception as e:
            print(f"Error processing video file {file_path}: {e}")
        
        return video_data

    
    def process_file(self, file_path):
        extension = os.path.splitext(file_path)[1].lower()
        if extension == '.txt':
            return self.process_text_file(file_path)
        elif extension in ['.wav', '.mp3']:
            return self.process_audio_file(file_path)
        elif extension in ['.png', '.jpg', '.jpeg', '.bmp', '.gif', 'jfif']:
            return self.process_image_file(file_path)
        elif extension in ['.mp4', '.avi', '.mov', '.wmv']:
            return self.process_video_file(file_path)
        else:
            return None

    def load_data(self):
        all_data = []
        for root, dirs, files in os.walk(self.folder_path):
            for file in files:
                file_path = os.path.join(root, file)
                processed_data = self.process_file(file_path)
                if processed_data is not None:
                    all_data.extend(processed_data)
        return all_data


# Database

In [3]:
# from loader import DataLoader

import weaviate
from weaviate.classes.config import Configure, Multi2VecField
from weaviate.classes.query import Filter
import base64
from hashlib import md5

class DatabaseClient:
    
    def __init__(self, folder_path):
        self.__folder_path = folder_path
        self.__create_client()
        if self.__generate_collection():
            self.loader = DataLoader(folder_path, chunk_size=300, chunk_overlap=50)
            self.__data_ingestion(folder_path)
   
    def __create_client(self):
        self.client = weaviate.connect_to_local()
    
    def __get_hashed_path(self):
        return self.__folder_path.split("\\")[-1]  + ''.join(filter(str.isalpha, md5(self.__folder_path.encode()).hexdigest()))

    def __generate_collection(self):
        collection_name = self.__get_hashed_path()
        if self.client.collections.exists(collection_name):
            # self.collection = self.client.collections.get(self.__get_hashed_path())
            # return False
            self.client.collections.delete(collection_name)

        self.collection = self.client.collections.create(
            name = self.__get_hashed_path(),
            vectorizer_config=Configure.Vectorizer.multi2vec_clip(
                    image_fields=[
                        Multi2VecField(
                            name="image"
                        )
                    ],
                    text_fields=[
                        Multi2VecField(
                            name="text"
                        ),
                        Multi2VecField(
                            name="path"
                        )
                    ]
            )
        )
        return True
    
    def __data_ingestion(self, folder_path):
        object_list = self.loader.load_data()
        with self.collection.batch.dynamic() as batch:
            for object in object_list:
                batch.add_object(
                    properties=object
                )
    
    def search_with_text(self, query : str, search_for="all", limit=5):
        if search_for == "all":
            response =  self.collection.query.near_text(
                query=query,
                limit=limit
            )
        else:
            response = self.collection.query.near_text(
                query=query,
                filters=Filter.by_property("media_type").equal(search_for),
                limit=limit
            )
        return [object.properties for object in response.objects]
    
    def __to_base64(self, path):
            with open(path, 'rb') as file:
                return base64.b64encode(file.read()).decode('utf-8')
            
    def search_with_image(self, image_path, search_for = 'all', limit=5):
        if search_for == "all":
            response = self.collection.query.near_image(
                near_image=self.__to_base64(image_path),
                limit=limit
            )
        else:
            response = self.collection.query.near_image(
                near_image=self.__to_base64(image_path),
                filters=Filter.by_property("media_type").equal(search_for),
                limit=limit
                )
        return [object.properties for object in response.objects]

    def list_collections(self):
        return self.client.collections.list_all()
    
    def delete_collections(self, name):
        return self.client.collections.delete(name)
    
    def close_connection(self):
        self.client.close()
            
    def __repr__(self):
        return f"""Folder: {self.__folder_path}"""


# Retriever

In [38]:
# from database import DatabaseClient

class RetrieverClient:
    
    def __init__(self, folder_path):
        self.__database = DatabaseClient(folder_path)

    def __retrieve(self, text=None, image_path=None, search_for="all", limit=5):
        if image_path == None and text == None:
            return []
        elif image_path != None and text != None:
            return self.__database.search_with_image(image_path, search_for=search_for, limit=limit) + self.__database.search_with_text(text, search_for=search_for, limit=limit)
        elif text == None:
            return self.__database.search_with_image(image_path, search_for=search_for, limit=limit)
        elif image_path == None:
            return self.__database.search_with_text(text, search_for=search_for, limit=limit)
    
    def __organize_by_media_type(self, properties):
        response =  {
            'text': [],
            'image': []
        }
        for property in properties:
            if property["media_type"] == "text" and property not in response['text']:
                response['text'].append(property)
            if property["media_type"] == "image" and property not in response['image']:
                response["image"].append(property)
        return response
    
    def search(self, text=None, image_path=None, search_for="all", limit=5):
        """
            Inputs: text and/or images
            Options: search_for [all, text, image (name it media in frontend)], 
                    limit
            Output: {"text":[],
                    "image":[]}   
                it's a dictionary with "text" and "images" fields which are lists
        """
        return self.__organize_by_media_type(self.__retrieve(text, image_path, search_for=search_for, limit=limit))
        
    def close_database_connection(self):
        self.__database.close_connection()
        

## Examples (for Search)

In [12]:
retriver = RetrieverClient(folder_path=folder_path)

Error processing audio file C:\Users\gpu.arl\Desktop\xBit\Localinsight-onush\Localinsight-onush\documents\badminton.wav: 
Error processing audio file C:\Users\gpu.arl\Desktop\xBit\Localinsight-onush\Localinsight-onush\documents\bird_audio.wav: 
Error processing audio file C:\Users\gpu.arl\Desktop\xBit\Localinsight-onush\Localinsight-onush\documents\car_audio.wav: 
Error processing audio file C:\Users\gpu.arl\Desktop\xBit\Localinsight-onush\Localinsight-onush\documents\classroom.wav: 'NoneType' object has no attribute 'write_audiofile'
Error processing video file C:\Users\gpu.arl\Desktop\xBit\Localinsight-onush\Localinsight-onush\documents\classroom.mp4: name 'Image' is not defined
Error processing audio file C:\Users\gpu.arl\Desktop\xBit\Localinsight-onush\Localinsight-onush\documents\dog_audio.wav: 
Error processing audio file C:\Users\gpu.arl\Desktop\xBit\Localinsight-onush\Localinsight-onush\documents\lion_roar.wav: 
MoviePy - Writing audio in C:\Users\gpu.arl\Desktop\xBit\Localinsi

                                                                    

MoviePy - Done.
Error processing audio file C:\Users\gpu.arl\Desktop\xBit\Localinsight-onush\Localinsight-onush\documents\sample-video.wav: 
Error processing video file C:\Users\gpu.arl\Desktop\xBit\Localinsight-onush\Localinsight-onush\documents\sample-video.mp4: name 'Image' is not defined
Error processing audio file C:\Users\gpu.arl\Desktop\xBit\Localinsight-onush\Localinsight-onush\documents\sample-video.wav: 
Error processing audio file C:\Users\gpu.arl\Desktop\xBit\Localinsight-onush\Localinsight-onush\documents\schoolbell.wav: 


  self.proc = None
  self.proc = None


In [13]:
search_result = retriver.search(text=text)
search_result

{'text': [{'text': 'One of the most fascinating aspects of cats is their communication. Cats use a variety of vocalizations, body language, and even scent markings to convey their feelings. A purring cat often indicates contentment, though it can also be a sign of nervousness or discomfort. Their tail movements, ear',
   'chunk_no': 4.0,
   'path': 'C:\\Users\\gpu.arl\\Desktop\\xBit\\Localinsight-onush\\Localinsight-onush\\documents\\cats.txt',
   'file_type': 'text',
   'image': None,
   'media_type': 'text'},
  {'text': 'In addition to their captivating behavior, cats have a rich history with humans. They were revered in ancient Egypt, where they were considered sacred and often depicted in art and statues. Today, cats continue to play important roles in various cultures and households, offering companionship and,',
   'chunk_no': 7.0,
   'file_type': 'text',
   'image': None,
   'path': 'C:\\Users\\gpu.arl\\Desktop\\xBit\\Localinsight-onush\\Localinsight-onush\\documents\\cats.txt',

In [14]:
search_result = retriver.search(image_path=image_path)
search_result

{'text': [],
 'image': [{'text': None,
   'chunk_no': None,
   'path': 'C:\\Users\\gpu.arl\\Desktop\\xBit\\Localinsight-onush\\Localinsight-onush\\documents\\meerkats-3.jpeg',
   'file_type': 'image',
   'image': '/9j/4AAQSkZJRgABAQAAAQABAAD/2wCEAAkGBxMTEhUSEhMVFRUXFhcWFRgVGBUWFhUYFxYYFxYXFRYYHSggGBolGxUVITEhJSkrMC4uFx8zODMsNygtLisBCgoKDg0OGhAQGC0lHyUtLS0tLS0tLS0tLS0tLS0tLS0tLS0tLS0tLS0tLS0tLS0tLS0tLS0tLS0tLS0tLS0tLf/AABEIALcBEwMBIgACEQEDEQH/xAAcAAACAwEBAQEAAAAAAAAAAAADBAIFBgABBwj/xAA6EAABAwIEAwYFAwMDBQEAAAABAAIRAyEEEjFBBVFhBiJxgZGhEzLB0fBCseEUI/EHUoIVYnKy0hb/xAAYAQADAQEAAAAAAAAAAAAAAAAAAQIDBP/EAB4RAQEBAAMBAQADAAAAAAAAAAABEQIhMRJBAzJR/9oADAMBAAIRAxEAPwDJ0mImVODDrw0FyY6CwCMxeigU1Tw6VgByILwVZiihnDSpNW5Sp06SfGGTFLBoBBtBEbhuitKeDRhhkaMVLKCYZhyn24dHbQRp4qzh0vWw6vPgoT8MgsUHwURlBW7cFKMzBQnhKynQTbMMeSsG0GjW3iITdNjU8NUjBlROBKvxRC9+AEgzp4chP4f0Wm/p14cKmGTfgOiXqcPWvdg+iE/CdEFjHOwBQnYLotc/CdEu/CpljLnBob8MVqDglF2B6JDGTNIrgwrR1OH9EF2B6IwYphKJTlWRwi5mFTw4TDV4WKzbhVF+GSw1bC5NuoLkYDQorhQToYiNpo0EKeFumG

In [15]:
search_result = retriver.search(text=text, image_path=image_path)
search_result

{'text': [{'text': 'One of the most fascinating aspects of cats is their communication. Cats use a variety of vocalizations, body language, and even scent markings to convey their feelings. A purring cat often indicates contentment, though it can also be a sign of nervousness or discomfort. Their tail movements, ear',
   'chunk_no': 4.0,
   'image': None,
   'path': 'C:\\Users\\gpu.arl\\Desktop\\xBit\\Localinsight-onush\\Localinsight-onush\\documents\\cats.txt',
   'file_type': 'text',
   'media_type': 'text'},
  {'text': 'In addition to their captivating behavior, cats have a rich history with humans. They were revered in ancient Egypt, where they were considered sacred and often depicted in art and statues. Today, cats continue to play important roles in various cultures and households, offering companionship and,',
   'media_type': 'text',
   'path': 'C:\\Users\\gpu.arl\\Desktop\\xBit\\Localinsight-onush\\Localinsight-onush\\documents\\cats.txt',
   'file_type': 'text',
   'image': 

In [18]:
search_result = retriver.search(text=text, image_path=image_path, search_for="text")
search_result

{'text': [{'text': 'or discomfort. Their tail movements, ear positions, and eye behavior provide further clues to their mood.',
   'chunk_no': 5.0,
   'file_type': 'text',
   'image': None,
   'path': 'C:\\Users\\gpu.arl\\Desktop\\xBit\\Localinsight-onush\\Localinsight-onush\\documents\\cats.txt',
   'media_type': 'text'},
  {'text': 'and households, offering companionship and, at times, a sense of mystery.',
   'chunk_no': 8.0,
   'path': 'C:\\Users\\gpu.arl\\Desktop\\xBit\\Localinsight-onush\\Localinsight-onush\\documents\\cats.txt',
   'file_type': 'text',
   'image': None,
   'media_type': 'text'},
  {'text': 'was walking hand in hand with Dinah, and saying to her very earnestly, “Now, Dinah, tell me the truth: did you ever eat a bat?” when suddenly, thump! thump! down she came upon a heap of sticks and dry leaves, and the fall was over.',
   'media_type': 'text',
   'path': 'C:\\Users\\gpu.arl\\Desktop\\xBit\\Localinsight-onush\\Localinsight-onush\\documents\\alice.txt',
   'file_

In [16]:
search_result = retriver.search(text=text, search_for="image", limit=2)
search_result

{'text': [],
 'image': [{'text': None,
   'chunk_no': None,
   'image': '/9j/4AAQSkZJRgABAQAASABIAAD/4QCMRXhpZgAATU0AKgAAAAgABQESAAMAAAABAAEAAAEaAAUAAAABAAAASgEbAAUAAAABAAAAUgEoAAMAAAABAAIAAIdpAAQAAAABAAAAWgAAAAAAAABIAAAAAQAAAEgAAAABAAOgAQADAAAAAQABAACgAgAEAAAAAQAAAyCgAwAEAAAAAQAAAo0AAAAA/+0AOFBob3Rvc2hvcCAzLjAAOEJJTQQEAAAAAAAAOEJJTQQlAAAAAAAQ1B2M2Y8AsgTpgAmY7PhCfv/CABEIAo0DIAMBIgACEQEDEQH/xAAfAAABBQEBAQEBAQAAAAAAAAADAgQBBQAGBwgJCgv/xADDEAABAwMCBAMEBgQHBgQIBnMBAgADEQQSIQUxEyIQBkFRMhRhcSMHgSCRQhWhUjOxJGIwFsFy0UOSNIII4VNAJWMXNfCTc6JQRLKD8SZUNmSUdMJg0oSjGHDiJ0U3ZbNVdaSVw4Xy00Z2gONHVma0CQoZGigpKjg5OkhJSldYWVpnaGlqd3h5eoaHiImKkJaXmJmaoKWmp6ipqrC1tre4ubrAxMXGx8jJytDU1dbX2Nna4OTl5ufo6erz9PX29/j5+v/EAB8BAAMBAQEBAQEBAQEAAAAAAAECAAMEBQYHCAkKC//EAMMRAAICAQMDAwIDBQIFAgQEhwEAAhEDEBIhBCAxQRMFMCIyURRABjMjYUIVcVI0gVAkkaFDsRYHYjVT8NElYMFE4XLxF4JjNnAmRVSSJ6LSCAkKGBkaKCkqNzg5OkZHSElKVVZXWFlaZGVmZ2hpanN0dXZ3eHl6gIOEhYaHiImKkJOUlZaXmJmaoKOkpaanqKmqsLKztLW2t7i5usDCw8TFxsfIycrQ09TV1tfY2drg4uPk5ebn6Onq8vP09fb

# Offline Chat

In [33]:
import ollama

class Chat:
    
    def __init__(self, model="llama3.2-vision:11b"):
        self.__messages = []
        self.__model = model
    
    def __process_audio_file(self, file_path):
        recognizer = sr.Recognizer()
        try:
            with sr.AudioFile(file_path) as source:
                audio_data = recognizer.record(source)
                return recognizer.recognize_google(audio_data)

        except Exception as e:
            print(f"Error processing audio file {file_path}: {e}")
            return None
        
    def __append_user_message(self, user_query, user_image_path, user_audio_path):
        audio_content = ""
        image_paths = []
        if user_query is None:
            user_query = "Describe."
        if user_image_path:
            image_paths.append(user_image_path)
        if user_audio_path:
            audio_content = self.__process_audio_file(user_audio_path)
   
        query_with_context = f"""Given Context: {audio_content}
                                 Query: {user_query}"""
        self.__messages.append({"role": "user", "content": query_with_context, "images": image_paths})

    def append_assistant_message(self, content):
        self.__messages.append({"role": "assistant", "content" : content})  
    
    def get_assistant_response(self, user_text=None, user_image_path=None, user_audio_path=None):
        self.__append_user_message(user_text, user_image_path, user_audio_path)
        return ollama.chat(self.__model, self.__messages, options={"temperature":0})
    
    def get_history(self):
        return self.__messages


## Examples (for Chat)

In [49]:
chat_client = Chat()

In [50]:
response = chat_client.get_assistant_response(user_text="What are meerkats?")
assistant_respone = response.message.content
chat_client.append_assistant_message(assistant_respone)
assistant_respone

'Meerkats are small, social mammals that belong to the mongoose family (Herpestidae). They are native to southern Africa and are known for their distinctive appearance and behavior.\n\nHere are some interesting facts about meerkats:\n\n1. **Physical Characteristics**: Meerkats have a slender body, typically 30-40 cm (12-16 inches) in length, with a long, thin tail and short legs. They weigh between 0.5-2 kg (1.1-4.4 pounds). Their fur is usually brown or grayish-brown, with a white underside.\n2. **Social Structure**: Meerkats are highly social animals that live in groups called "mobs" or "colonies." These groups typically consist of 2-50 individuals, led by a dominant female meerkat.\n3. **Diet**: Meerkats are carnivores and primarily feed on small insects, like ants, beetles, and scorpions. They also eat lizards, snakes, and other small animals.\n4. **Behavior**: Meerkats are known for their upright stance, where they stand on their hind legs to survey their surroundings. This behavi

In [51]:
response = chat_client.get_assistant_response(user_image_path=image_path)
assistant_respone = response.message.content
chat_client.append_assistant_message(assistant_respone)
assistant_respone

'The image depicts a group of meerkats, small mammals belonging to the mongoose family (Herpestidae). They are native to southern Africa and are known for their distinctive appearance and social behavior.\n\n**Physical Characteristics**\n\n* The meerkats have slender bodies, typically 30-40 cm (12-16 inches) in length.\n* Their fur is usually brown or grayish-brown, with a white underside.\n* They have long, thin tails and short legs.\n\n**Social Structure**\n\n* Meerkats are highly social animals that live in groups called "mobs" or "colonies."\n* These groups typically consist of 2-50 individuals, led by a dominant female meerkat.\n\n**Behavior**\n\n* The meerkats are standing upright on their hind legs, surveying their surroundings.\n* This behavior is often referred to as "sentinel duty," where they watch out for predators and alert the rest of the group.\n\n**Habitat**\n\n* Meerkats inhabit arid regions in southern Africa, including South Africa, Namibia, Botswana, and Zimbabwe.\n

In [52]:
response = chat_client.get_assistant_response(user_text="how many creatures are there in the image?", user_image_path=image_path)
assistant_respone = response.message.content
chat_client.append_assistant_message(assistant_respone)
assistant_respone

'There appears to be three meerkats in this image.'

In [53]:
response = chat_client.get_assistant_response(user_text="are there any cats in the imag?", user_image_path=image_path)
assistant_respone = response.message.content
chat_client.append_assistant_message(assistant_respone)
assistant_respone

'No, there do not appear to be any cats in this image. The animals depicted are meerkats, which are a type of mongoose and not related to domestic or wild cats.'

In [54]:
chat_client.get_history()

[{'role': 'user',
  'content': 'Given Context: \n                                 Query: What are meerkats?',
  'images': []},
 {'role': 'assistant',
  'content': 'Meerkats are small, social mammals that belong to the mongoose family (Herpestidae). They are native to southern Africa and are known for their distinctive appearance and behavior.\n\nHere are some interesting facts about meerkats:\n\n1. **Physical Characteristics**: Meerkats have a slender body, typically 30-40 cm (12-16 inches) in length, with a long, thin tail and short legs. They weigh between 0.5-2 kg (1.1-4.4 pounds). Their fur is usually brown or grayish-brown, with a white underside.\n2. **Social Structure**: Meerkats are highly social animals that live in groups called "mobs" or "colonies." These groups typically consist of 2-50 individuals, led by a dominant female meerkat.\n3. **Diet**: Meerkats are carnivores and primarily feed on small insects, like ants, beetles, and scorpions. They also eat lizards, snakes, an

# Demo

## Search Functionality

`
    retriever.search(self, text=None, image_path=None, search_for="all", limit=5)
`

```
Inputs: text and/or images
Options: search_for [all, text, image (name it media in frontend)], 
        limit
Output: {"text":[],
        "images":[]}    
```

## Chat Functionality


`
    chat_client.get_assistant_response(self, user_text=None, user_image_path=None, user_audio_path=None)
`

```
Inputs: text, images, audio
Outputs: text response
```


<br>

`    chat_client.append_assistant_message(self, content)
`

```
Must call after `get_assistant_response()` to maintain chat history.

<br>


`    chat_client.get_history(self):
`

```
To retriever chat history (but model doesn't remember previous conversations)

# Frontend

In [2]:
# Imports 
import os
import json
import hashlib
import base64
import time
import streamlit as st
from chat import Chat
from retriever import RetrieverClient 

# Session State Initializations
if 'folder_path' not in st.session_state:
    st.session_state['folder_path'] = None
if 'indexing_done' not in st.session_state:
    st.session_state['indexing_done'] = False
if 'main_mode' not in st.session_state:
    st.session_state['main_mode'] = 'Chat'
if 'chat_mode' not in st.session_state:
    st.session_state['chat_mode'] = 'offline'
if 'chat_messages' not in st.session_state:
    st.session_state['chat_messages'] = []
if 'search_messages' not in st.session_state:
    st.session_state['search_messages'] = []
if 'logged_in' not in st.session_state:
    st.session_state['logged_in'] = False
if 'change_folder_mode' not in st.session_state:
    st.session_state['change_folder_mode'] = False
if 'retriever' not in st.session_state:
    st.session_state['retriever'] = None
if 'chat' not in st.session_state:
    st.session_state['chat'] = None

CREDENTIALS_FILE = "user_credentials.json"

# Password Hash Function
def hash_password(password):
    return hashlib.sha256(password.encode()).hexdigest()

# Parse Json File to load credentials
def load_credentials():
    if os.path.exists(CREDENTIALS_FILE):
        with open(CREDENTIALS_FILE, 'r') as file:
            try:
                return json.load(file)
            except json.JSONDecodeError:
                return {}
    return {}

# Save Provided credentials back into the Json file
def save_credentials(credentials):
    with open(CREDENTIALS_FILE, 'w') as file:
        json.dump(credentials, file)

# Login Function
def login(username, password):
    credentials = load_credentials()
    hashed_password = hash_password(password)
    if username in credentials and credentials[username]['password'] == hashed_password:
        st.session_state['logged_in'] = True
        st.session_state['username'] = username
        st.session_state['folder_path'] = credentials[username]['folder_path']
        initialize_clients(st.session_state['folder_path'])
        return True
    return False

# Signup Function
def signup(username, password, folder_path):
    credentials = load_credentials()
    if username not in credentials:
        hashed_password = hash_password(password)
        credentials[username] = {'password': hashed_password, 'folder_path': folder_path}
        save_credentials(credentials)
        st.session_state['logged_in'] = True
        st.session_state['username'] = username
        st.session_state['folder_path'] = folder_path
        initialize_clients(folder_path)
        return True
    return False

# Logout Function
def logout():
    st.session_state.clear()
    st.session_state['logged_in'] = False

# UI for Login/Signup
def login_signup_interface():
    st.title("LocalInsight - Document Search and Chat")
    st.write("Welcome to LocalInsight, your personal document assistant!")  
    login_tab, signup_tab = st.tabs(["Login", "Signup"])   
    with login_tab:
        username = st.text_input("Username", key="login_username")
        password = st.text_input("Password", type="password", key="login_password")
        if st.button("Login", key="login_button"):
            if login(username, password):
                st.success("Logged in successfully!")
                st.rerun()
            else:
                st.error("Invalid username or password.")
    with signup_tab:
        new_username = st.text_input("New Username", key="signup_username")
        new_password = st.text_input("New Password", type="password", key="signup_password")
        folder_path = st.text_input("Initial Folder Path", key="signup_folder_path")
        if st.button("Signup", key="signup_button"):
            if signup(new_username, new_password, folder_path):
                st.success("Signed up and logged in successfully!")
                st.session_state['indexing_done'] = False
                st.rerun()
            else:
                st.error("Username already exists.")

# Lets user change document directory
def set_folder_path():
    st.header("Change Folder Path")
    folder_path = st.text_input("Enter the folder path to index:", key="folder_path_input")
    if folder_path:
        if os.path.exists(folder_path) and os.path.isdir(folder_path):
            st.session_state['folder_path'] = folder_path
            update_folder_path(st.session_state['username'], folder_path)
            initialize_clients(folder_path)
            st.session_state['indexing_done'] = True
            st.session_state['change_folder_mode'] = False
            st.success("Folder path updated successfully!")
            st.rerun()
        else:
            st.error("Invalid folder path. Please enter a valid path.")
    if st.button("Back"):
        st.session_state['change_folder_mode'] = False
        st.rerun()

# Overwrite changed folder path content into the json file
def update_folder_path(username, folder_path):
    credentials = load_credentials()
    if username in credentials:
        credentials[username]['folder_path'] = folder_path
        save_credentials(credentials)

# Initialize Retriever & Chat Clients
def initialize_clients(folder_path):
    st.session_state['retriever'] = RetrieverClient(folder_path)
    st.session_state['chat'] = Chat()


def handle_chat():
    # Initialize chat client if not already in session state
    if 'chat' not in st.session_state:
        st.session_state['chat'] = Chat()
    if 'chat_messages' not in st.session_state:
        st.session_state['chat_messages'] = []

    # Display past chat messages
    for message in st.session_state['chat_messages']:
        if message['role'] == 'user':
            with st.chat_message("user"):
                st.write(message['content'])
        else:
            with st.chat_message("assistant"):
                st.write(message['content'])

    # Input fields for user message, audio, and image
    user_message = st.chat_input("Type your message here:")
    user_audio = st.file_uploader("Or upload an audio file:", type=["wav", "mp3", "m4a"], key="audio_uploader")
    user_image = st.file_uploader("Or upload an image:", type=["png", "jpg", "jpeg"], key="image_uploader")

    # When a message or file is uploaded
    if user_message or user_audio or user_image:
        user_query = user_message if user_message else None

        user_audio_path = None
        user_image_path = None

        # Save uploaded image or audio to temporary path
        if user_image:
            user_image_path = f"temp_uploaded_image.{user_image.name.split('.')[-1]}"
            with open(user_image_path, "wb") as f:
                f.write(user_image.getbuffer())

        if user_audio:
            user_audio_path = f"temp_uploaded_audio.{user_audio.name.split('.')[-1]}"
            with open(user_audio_path, "wb") as f:
                f.write(user_audio.getbuffer())

        chat_client = st.session_state['chat']

        # Get response from assistant and append user message to chat history
        response = chat_client.get_assistant_response(
            user_text=user_query,
            user_image_path=user_image_path,
            user_audio_path=user_audio_path
        )

        # Extract assistant's response content
        assistant_response = response.message.content

        # Append user and assistant messages to chat history
        st.session_state['chat_messages'].append({"role": "user", "content": user_query or ""})
        st.session_state['chat_messages'].append({"role": "assistant", "content": assistant_response})

        # Display assistant's response
        with st.chat_message("assistant"):
            st.write(assistant_response)

        # Clean up temporary files if uploaded
        if user_image_path and os.path.exists(user_image_path):
            os.remove(user_image_path)
        if user_audio_path and os.path.exists(user_audio_path):
            os.remove(user_audio_path)

    # Button to view chat history
    if st.button("View Chat History"):
        st.write("### Chat History:")
        for message in st.session_state['chat_messages']:
            role = "User" if message['role'] == "user" else "Assistant"
            st.write(f"**{role}:** {message['content']}")


# File Open Function
def open_file(path):
    try:
        if not os.path.exists(path):
            st.error(f"File not found: {path}")
            return
        # Open File in different applications
        if os.name == 'nt':  # For Windows
            os.startfile(path)
        elif os.name == 'posix':  # For macOS and Linux
            os.system(f'open "{path}"' if 'darwin' in os.uname().sysname.lower() else f'xdg-open "{path}"')
        else:
            st.warning("Your operating system may not support this operation.")
    except Exception as e:
        st.error(f"Failed to open the file: {e}")

# Search Interaction Handler Function
import os
import streamlit as st

def handle_search():
    # Search input fields
    user_query = st.text_input("Type your search query here:", key="search_input")
    
    # Advanced search options
    if st.button("Advanced Search"):
        search_option = st.radio("Choose a search type:", ("Search Image", "Search Text"))
        user_query = st.text_input("Enter your prompt:") if search_option == "Search Text" else None
        uploaded_image = st.file_uploader("Upload an image:", type=["png", "jpg", "jpeg"], key="image_uploader") if search_option == "Search Image" else None
    else:
        uploaded_image = st.file_uploader("Or upload an image to search:", type=["png", "jpg", "jpeg"], key="image_uploader")

    # Toggle button for limiting images
    limit_images = st.slider("Limit the number of images to view:", min_value=1, max_value=20, value=5, key="limit_slider")

    # Perform search when the search button is clicked
    if st.button("Search", key="search_send"):
        with st.spinner("Searching..."):
            if user_query or uploaded_image:
                image_path = None
                if uploaded_image:
                    image_path = f"temp_uploaded_image.{uploaded_image.name.split('.')[-1]}"
                    with open(image_path, "wb") as f:
                        f.write(uploaded_image.getbuffer())

                # Call search function
                search_results = st.session_state['retriever'].search(
                    text=user_query, 
                    image_path=image_path, 
                    search_for="all", 
                    limit=limit_images
                )

                st.session_state['search_results'] = search_results

                # Clean up temporary image
                if uploaded_image and os.path.exists(image_path):
                    os.remove(image_path)

    # Display search results
    if 'search_results' in st.session_state:
        search_results = st.session_state['search_results']
        st.subheader("Search Results:")
        file_paths = []
        image_paths = []
        video_paths = []

        # Organize results
        for result in search_results['text']:
            if result['path'] not in file_paths:
                file_paths.append(result['path'])
        for result in search_results['image']:
            if result['file_type'] == 'image' and result['path'] not in image_paths:
                image_paths.append(result['path'])
            if result['file_type'] == 'video_frame' and result['path'] not in video_paths:
                video_paths.append(result['path'])

        # Display text files
        for path in file_paths:
            st.write(f"**Found:** {os.path.basename(path)}")
            if st.button(f"Open: {os.path.basename(path)}", key=f"open_{path}"):
                open_file(path)

        # Display images
        for image_path in image_paths[:limit_images]:
            st.image(image_path, caption=os.path.basename(image_path), use_column_width=True)

        # Display videos
        for video_path in video_paths:
            st.video(video_path)

def front_page():
    st.markdown("<h1 style='text-align: center;'>Welcome to LocalInsight!</h1>", unsafe_allow_html=True)

    # Path for the local video
    local_video_path = "LOCALINSIGHT (2).mp4"
    
    # Check if the video file exists
    if os.path.exists(local_video_path):
        with open(local_video_path, "rb") as file:
            video_bytes = file.read()
        
        # Video styling (fixed size, centered, and no overflow)
        video_html = f"""
        <style>
        .video-container {{
            width: 70%;
            height: 400px;  /* Fixed height for the video */
            margin: 20px auto;
            padding: 10px;
            background-color: #000;
            border-radius: 10px;
            box-shadow: 0px 4px 20px rgba(0, 0, 0, 0.3);
            text-align: center;
        }}
        .video-container:hover {{
            transform: scale(1.05);
            box-shadow: 0px 8px 40px rgba(0, 0, 0, 0.6);
        }}
        
        video {{
            width: 100%;
            height: 100%;  /* Ensure the video takes up all available space */
            object-fit: cover;
            border-radius: 10px;
        }}

        .stButton > button {{
            display: block;
            margin: 20px auto;
            padding: 12px 30px;
            border-radius: 10px;
            border: none;
            font-size: 16px;
            font-weight: bold;
            cursor: pointer;
            transition: background-color 0.3s ease;
        }}

        .stButton > button:hover {{
            background-color: #FFA500;
        }}
        </style>

        <div class="video-container">
            <video autoplay muted loop playsinline>
                <source src="data:video/mp4;base64,{base64.b64encode(video_bytes).decode()}" type="video/mp4">
                Your browser does not support the video tag.
            </video>
        </div>
        """
        
        # Display the video HTML
        st.markdown(video_html, unsafe_allow_html=True)
    else:
        st.error("Video file not found. Please check the path.")
    
    # Handle session timing
    if 'start_time' not in st.session_state:
        st.session_state['start_time'] = time.time()
    elapsed_time = time.time() - st.session_state['start_time']
    
    if elapsed_time > 5:  
        st.session_state['viewed_front_page'] = True
        st.rerun()

    # Button placed directly below the video (Centered)
    col1, col2, col3 = st.columns([1, 2, 1])  # Center the button
    with col2:
        if st.button("Proceed to Login/Signup"):
            st.session_state['viewed_front_page'] = True
            st.rerun()


# UI for main page
def main_interface():
    st.sidebar.title("Options")
    st.sidebar.button("Change Folder Path", on_click=lambda: st.session_state.update({'change_folder_mode': True}))
    st.sidebar.button("Logout", on_click=logout)
    st.session_state['main_mode'] = st.sidebar.radio("Select Mode", options=["Search", "Chat"])
    st.title("LocalInsight")
    st.write(f"Current Folder: {st.session_state['folder_path']}")
    if st.session_state['main_mode'] == 'Search':
        st.header("Search Mode")
        handle_search()
    else:
        st.header("Chat Mode")
        handle_chat()

def main():
    st.set_page_config(page_title="LocalInsight", page_icon="🔍", layout="wide")
    if 'viewed_front_page' not in st.session_state:
        st.session_state['viewed_front_page'] = False
    if not st.session_state['viewed_front_page']:
        front_page()
    elif not st.session_state['logged_in']:
        login_signup_interface()
    elif st.session_state['change_folder_mode']:
        set_folder_path()
    else:
        main_interface()

# Running the app
if __name__ == "__main__":
    main()


2024-12-07 07:43:29.277 
  command:

    streamlit run c:\Users\gpu.arl\Desktop\xBit\Localinsight-onush\Localinsight-onush\env\Lib\site-packages\ipykernel_launcher.py [ARGUMENTS]
