<a href="https://colab.research.google.com/github/meetechno157-create/agri_chatbot/blob/main/tiny_llama_agricultural.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
pip install torch transformers sentence-transformers faiss-cpu numpy


Collecting faiss-cpu
  Downloading faiss_cpu-1.12.0-cp312-cp312-manylinux_2_27_x86_64.manylinux_2_28_x86_64.whl.metadata (5.1 kB)
Downloading faiss_cpu-1.12.0-cp312-cp312-manylinux_2_27_x86_64.manylinux_2_28_x86_64.whl (31.4 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m31.4/31.4 MB[0m [31m46.8 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: faiss-cpu
Successfully installed faiss-cpu-1.12.0


In [None]:
import torch
import numpy as np
import faiss
from transformers import AutoTokenizer, AutoModelForCausalLM
from sentence_transformers import SentenceTransformer
import os
import json
from typing import List, Dict, Tuple
import re


In [None]:
class TinyLlamaRAGChatbot:
    """Retrieval-augmented chatbot restricted to agriculture questions.

    Text files are split into sentence-aligned chunks, embedded with a
    SentenceTransformer model and stored in a FAISS inner-product index over
    L2-normalised vectors (i.e. cosine similarity). For every question the
    best-matching chunks plus recent chat history are placed in the prompt
    of a causal language model, which generates the reply.
    """

    # System instructions sent with every request.
    SYSTEM_PROMPT = """
        - YOUR ROLE IS TO PROVIDE THE INFORMATION RELATED TO AGRICULTURE ONLY, EXCEPT THIS YOU DONT HAVE TO ANSWER ANY OTHER QUESTIONS.
        -MUST NEED TO FOLLOW THE FOLLOWING INSTRUCTIONS:
        You are an expert AI assistant specialized strictly in agriculture.
        -ignore the qeatiions from the different domains such as  IT, political, human science ,many more, you just need to deliver the queries realted to agriculture department only.
You are an expert assistant whose sole purpose is to provide information about agriculture and farming.
ONLY answer questions related to agriculture, crops, soil, irrigation, fertilizers, seeds, agri-technology, pests, livestock, or related fields.
If a user's question is NOT about agriculture or your knowledge base/retrieved context does NOT contain relevant agricultural information, politely respond:
"I'm sorry, but I can only answer questions about agriculture."

Never attempt to answer non-agricultural questions, and do not generate or invent answers that are outside the agricultural domain under any circumstances.
Always base your answers ONLY on the provided agricultural context or data.
never helucinate while giving the output.
-never terminate the output, always pass the output according to the tokens , which is 2048  only, so dont exceed the result beyond the token limits.
"""

    def __init__(self,
                 model_name="TinyLlama/TinyLlama-1.1B-Chat-v1.0",
                 embedding_model="sentence-transformers/all-MiniLM-L6-v2",
                 max_context_length=2048,
                 chunk_size=500,
                 chunk_overlap=50,
                 max_new_tokens=512):
        """Load the language model, its tokenizer and the embedding model.

        Args:
            model_name: Hugging Face id of the causal language model.
            embedding_model: SentenceTransformer id used for retrieval.
            max_context_length: Total token window shared by prompt and reply.
            chunk_size: Approximate maximum number of tokens per chunk.
            chunk_overlap: Number of trailing *words* of a chunk repeated at
                the start of the next one (0 disables the overlap).
            max_new_tokens: Upper bound on generated tokens per reply; the
                prompt is limited to ``max_context_length - max_new_tokens``.

        Raises:
            ValueError: If ``max_new_tokens`` is not strictly between 0 and
                ``max_context_length``.
        """
        if not 0 < max_new_tokens < max_context_length:
            raise ValueError(
                "max_new_tokens must be positive and smaller than max_context_length"
            )

        self.model_name = model_name
        self.max_context_length = max_context_length
        self.max_new_tokens = max_new_tokens
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap
        self.chat_history = []

        print("Loading TinyLlama model...")
        self.tokenizer = AutoTokenizer.from_pretrained(model_name)
        self.model = AutoModelForCausalLM.from_pretrained(
            model_name,
            torch_dtype=torch.float16 if torch.cuda.is_available() else torch.float32,
            device_map="auto" if torch.cuda.is_available() else None
        )
        # Generation needs a pad token; fall back to EOS when none is defined.
        if self.tokenizer.pad_token is None:
            self.tokenizer.pad_token = self.tokenizer.eos_token

        print("Loading embedding model...")
        self.embedding_model = SentenceTransformer(embedding_model)

        self.index = None          # vector index, None until a database is built
        self.chunks = []           # chunk texts, aligned with index rows
        self.chunk_metadata = []   # chunk dicts, aligned with self.chunks
        print("RAG Chatbot initialized successfully!")

    def count_tokens(self, text: str) -> int:
        """Return the number of ids ``self.tokenizer.encode`` produces for ``text``."""
        return len(self.tokenizer.encode(text))

    def _read_documents(self, file_paths: List[str]) -> List[Tuple[str, str]]:
        """Read UTF-8 files, returning ``(path, content)`` for each readable one."""
        loaded = []
        for file_path in file_paths:
            try:
                with open(file_path, 'r', encoding='utf-8') as file:
                    content = file.read()
            except Exception as e:
                # Best effort: report the failure and continue with the rest.
                print(f"Error loading {file_path}: {e}")
                continue
            loaded.append((file_path, content))
            print(f"Loaded {file_path}: {len(content)} characters")
        return loaded

    def load_text_files(self, file_paths: List[str]) -> List[str]:
        """Return the contents of every readable file in ``file_paths``.

        Unreadable files are reported on stdout and skipped, so the result
        may be shorter than ``file_paths``.
        """
        return [content for _, content in self._read_documents(file_paths)]

    def chunk_text(self, text: str, source_file: str) -> List[Dict]:
        """Split ``text`` into sentence-aligned chunks of about ``chunk_size`` tokens.

        Args:
            text: Raw document text.
            source_file: Label stored in every chunk's ``'source'`` field.

        Returns:
            A list of dicts with keys ``'content'``, ``'source'`` and
            ``'token_count'``.
        """
        # Split on whitespace that FOLLOWS a sentence terminator, so the
        # punctuation is kept and decimals such as "2.5" are not torn apart.
        sentences = re.split(r'(?<=[.!?])\s+', text)
        overlap_words = max(int(self.chunk_overlap), 0)
        chunks = []
        current_chunk = ""
        current_tokens = 0

        for sentence in sentences:
            sentence = sentence.strip()
            if not sentence:
                continue
            sentence_tokens = self.count_tokens(sentence)
            if current_chunk and current_tokens + sentence_tokens > self.chunk_size:
                chunks.append({
                    'content': current_chunk,
                    'source': source_file,
                    'token_count': current_tokens
                })
                # Explicit zero check: a bare [-0:] slice would keep ALL words.
                tail = current_chunk.split()[-overlap_words:] if overlap_words else []
                current_chunk = ' '.join(tail + [sentence])
                current_tokens = self.count_tokens(current_chunk)
            else:
                current_chunk = f"{current_chunk} {sentence}".strip()
                current_tokens += sentence_tokens

        if current_chunk:
            chunks.append({
                'content': current_chunk,
                'source': source_file,
                'token_count': current_tokens
            })
        return chunks

    @staticmethod
    def _l2_normalize(vectors) -> np.ndarray:
        """Return a C-contiguous float32 copy of ``vectors`` with unit-length rows."""
        matrix = np.array(vectors, dtype=np.float32, ndmin=2)
        norms = np.linalg.norm(matrix, axis=1, keepdims=True)
        norms[norms == 0] = 1.0  # leave all-zero rows untouched
        return np.ascontiguousarray(matrix / norms)

    def _build_index(self, embeddings):
        """Create a FAISS inner-product index holding ``embeddings``."""
        index = faiss.IndexFlatIP(embeddings.shape[1])
        index.add(embeddings)
        return index

    def build_rag_database(self, file_paths: List[str]):
        """Chunk, embed and index the given text files.

        Files that cannot be read are skipped. If no chunk is produced the
        database is reset to the empty state instead of raising.
        """
        print("Building RAG database...")
        all_chunks = []
        # Iterate over (path, content) pairs so each chunk is labelled with
        # the file it really came from, even when earlier files failed to load.
        for file_path, content in self._read_documents(file_paths):
            file_name = os.path.basename(file_path)
            chunks = self.chunk_text(content, file_name)
            all_chunks.extend(chunks)
            print(f"Created {len(chunks)} chunks from {file_name}")

        if not all_chunks:
            self.index = None
            self.chunks = []
            self.chunk_metadata = []
            print("No text chunks were produced; RAG database is empty")
            return

        texts = [chunk['content'] for chunk in all_chunks]
        print("Generating embeddings...")
        embeddings = self._l2_normalize(self.embedding_model.encode(texts))
        index = self._build_index(embeddings)
        # Publish the new state only after the index was built successfully.
        self.index = index
        self.chunks = texts
        self.chunk_metadata = all_chunks
        print(f"RAG database built with {len(self.chunks)} chunks")

    def retrieve_relevant_chunks(self, query: str, top_k: int = 3) -> List[Dict]:
        """Return up to ``top_k`` chunks most similar to ``query``.

        Returns:
            A list of dicts with keys ``'content'``, ``'source'``, ``'score'``
            and ``'tokens'``, in the order returned by the index. Empty when
            no database has been built or ``top_k`` is not positive.
        """
        if self.index is None or not self.chunks or top_k <= 0:
            return []
        query_embedding = self._l2_normalize(self.embedding_model.encode([query]))
        scores, indices = self.index.search(query_embedding, min(top_k, len(self.chunks)))
        results = []
        for score, idx in zip(scores[0], indices[0]):
            idx = int(idx)
            # The index reports missing neighbours as -1; a plain "< len"
            # check would let that through and pick the LAST chunk.
            if not 0 <= idx < len(self.chunks):
                continue
            results.append({
                'content': self.chunks[idx],
                'source': self.chunk_metadata[idx]['source'],
                'score': float(score),
                'tokens': self.chunk_metadata[idx]['token_count']
            })
        return results

    def format_chat_history(self, max_history_tokens: int = 10000) -> str:
        """Render the most recent exchanges that fit in ``max_history_tokens``.

        Exchanges are selected newest-first and returned in chronological
        order; an empty string is returned when nothing fits.
        """
        kept = []
        used_tokens = 0
        for exchange in reversed(self.chat_history):
            exchange_text = f"User: {exchange['human']}\nAssistant: {exchange['assistant']}\n"
            exchange_tokens = self.count_tokens(exchange_text)
            if used_tokens + exchange_tokens > max_history_tokens:
                break
            kept.append(exchange_text)
            used_tokens += exchange_tokens
        return "".join(reversed(kept))

    def _build_context(self, relevant_chunks: List[Dict], max_context_tokens: int) -> str:
        """Concatenate retrieved chunks, in rank order, within a token budget."""
        context = ""
        used_tokens = 0
        for chunk in relevant_chunks:
            chunk_text = f"Source ({chunk['source']}): {chunk['content']}\n\n"
            chunk_tokens = self.count_tokens(chunk_text)
            if used_tokens + chunk_tokens > max_context_tokens:
                break
            context += chunk_text
            used_tokens += chunk_tokens
        return context

    def _build_prompt(self, context: str, history: str, user_input: str) -> str:
        """Assemble the full prompt text sent to the language model."""
        # NOTE(review): the TinyLlama chat model card builds prompts with
        # tokenizer.apply_chat_template (which also ends each turn with the
        # EOS token) - consider switching; confirm against the model card.
        system_prompt = self.SYSTEM_PROMPT
        return f"""<|system|>
{system_prompt}
Agricultural knowledge base context (may be empty):
{context or '[No relevant agricultural information found in the database.]'}
Chat history (for reference only):
{history}
<|user|>
{user_input}
<|assistant|>
"""

    def generate_response(self, user_input: str) -> str:
        """Generate a reply to ``user_input`` from retrieved context and history.

        The prompt is limited to ``max_context_length - max_new_tokens``
        tokens and the reply to whatever is left of the window, so prompt
        plus reply never exceed ``max_context_length``.
        """
        relevant_chunks = self.retrieve_relevant_chunks(user_input, top_k=3)

        # Budget the optional parts (history, context) around the mandatory
        # ones (system prompt, question) so that tokenizer truncation, which
        # cuts from the end of the prompt, does not drop the question.
        prompt_budget = self.max_context_length - self.max_new_tokens
        skeleton_tokens = self.count_tokens(self._build_prompt("", "", user_input))
        available = max(prompt_budget - skeleton_tokens, 0)

        history = self.format_chat_history(max_history_tokens=min(500, available // 3))
        if history:
            available = max(available - self.count_tokens(history), 0)
        context = self._build_context(
            relevant_chunks, min(self.max_context_length // 2, available)
        )
        prompt = self._build_prompt(context, history, user_input)

        inputs = self.tokenizer(prompt, return_tensors="pt", truncation=True,
                                max_length=prompt_budget)
        device = next(self.model.parameters()).device
        inputs = {k: v.to(device) for k, v in inputs.items()}
        prompt_length = inputs["input_ids"].shape[1]
        new_token_budget = max(
            1, min(self.max_new_tokens, self.max_context_length - prompt_length)
        )

        with torch.no_grad():
            outputs = self.model.generate(
                **inputs,
                max_new_tokens=new_token_budget,
                temperature=0.2,
                do_sample=True,
                pad_token_id=self.tokenizer.eos_token_id,
                eos_token_id=self.tokenizer.eos_token_id
            )
        # The returned sequence starts with the prompt tokens; decode only
        # what was generated instead of searching the text for a marker.
        generated_ids = outputs[0][prompt_length:]
        return self.tokenizer.decode(generated_ids, skip_special_tokens=True).strip()

    def chat(self, user_input: str) -> str:
        """Answer ``user_input`` and record the exchange (last 10 are kept)."""
        response = self.generate_response(user_input)
        self.chat_history.append({
            'human': user_input,
            'assistant': response
        })
        if len(self.chat_history) > 10:
            self.chat_history = self.chat_history[-10:]
        return response

    def save_chat_history(self, filepath: str):
        """Write the chat history to ``filepath`` as UTF-8 JSON."""
        with open(filepath, 'w', encoding='utf-8') as f:
            json.dump(self.chat_history, f, indent=2, ensure_ascii=False)

    def load_chat_history(self, filepath: str):
        """Replace the chat history with the exchanges stored in ``filepath``.

        On any failure (unreadable file, invalid JSON, unexpected structure)
        the error is printed and the current history is left unchanged.
        """
        try:
            with open(filepath, 'r', encoding='utf-8') as f:
                loaded = json.load(f)
            # format_chat_history indexes every entry by these two keys.
            well_formed = isinstance(loaded, list) and all(
                isinstance(entry, dict) and 'human' in entry and 'assistant' in entry
                for entry in loaded
            )
            if not well_formed:
                raise ValueError("expected a list of {'human', 'assistant'} exchanges")
            self.chat_history = loaded
            print(f"Loaded chat history with {len(self.chat_history)} exchanges")
        except Exception as e:
            print(f"Error loading chat history: {e}")

In [None]:

def main():
    """Build the agriculture RAG database and run an interactive chat loop.

    Commands: ``quit`` ends the session, ``save`` writes the chat history to
    ``chat_history.json``; empty input is ignored. Ctrl-C or end-of-input
    also ends the session instead of raising a traceback.
    """
    chatbot = TinyLlamaRAGChatbot()
    # Every entry needs its own trailing comma: adjacent string literals are
    # silently concatenated into a single (non-existent) path otherwise.
    text_files = [
        "/content/agriculture_data3.txt",
        "/content/agriculture_dataset.txt",
        "/content/combined_2.txt",
        "/content/combined_text.txt",
    ]
    chatbot.build_rag_database(text_files)
    print("\n🤖 TinyLlama RAG Chatbot is ready for agriculture domain!")
    print("Type 'quit' to exit, 'save' to save chat history")
    print("-" * 50)
    while True:
        try:
            user_input = input("\nYou: ").strip()
        except (EOFError, KeyboardInterrupt):
            # Interrupting the prompt is a normal way to leave a notebook cell.
            print("\nExiting chat.")
            break
        command = user_input.lower()
        if command == 'quit':
            break
        if command == 'save':
            chatbot.save_chat_history("chat_history.json")
            print("Chat history saved!")
            continue
        if not user_input:
            continue
        response = chatbot.chat(user_input)
        print(f"\nAssistant: {response}")

if __name__ == "__main__":
    main()

Loading TinyLlama model...
Loading embedding model...
RAG Chatbot initialized successfully!
Building RAG database...
Loaded /content/agriculture_data3.txt: 72166 characters
Loaded /content/agriculture_dataset.txt: 1347210 characters
Error loading /content/combined_2.txt/content/combined_text.txt: [Errno 20] Not a directory: '/content/combined_2.txt/content/combined_text.txt'
Created 66 chunks from agriculture_data3.txt
Created 1055 chunks from agriculture_dataset.txt
Generating embeddings...
RAG database built with 1121 chunks

🤖 TinyLlama RAG Chatbot is ready for agriculture domain!
Type 'quit' to exit, 'save' to save chat history
--------------------------------------------------

You: how to install the python in terminal ? 

Assistant: Sure, here's how to install Python in a terminal:

1. Open a terminal window
2. Run the following command to install Python:

```
$ sudo apt-get update
$ sudo apt-get install python3-pip
```

3. Install the required packages for the AI assistant:

``

KeyboardInterrupt: Interrupted by user

In [None]:
# Sample questions to try with the chatbot:
# - Which crops are grown during the summer season in Asia?
# - How can soil fertility be improved naturally for better crop yield?