In [None]:
pip install bitsandbytes transformers[accelerate] gradio

In [None]:
!pip install fastapi uvicorn pyngrok huggingface_hub torch

In [None]:
!ngrok config add-authtoken <token>

In [None]:
# Import necessary libraries

import torch
from transformers import AutoTokenizer, AutoModelForCausalLM, BitsAndBytesConfig
from fastapi import FastAPI, Request
from pydantic import BaseModel
from huggingface_hub import login



# Authenticate with Hugging Face using your access token
login("<token>")  # Replace with your actual token

# Model configuration
model_name = "meta-llama/Llama-3.1-8B-Instruct"

# 4-bit quantization configuration
bnb_config = BitsAndBytesConfig(
    load_in_4bit=True,
    bnb_4bit_quant_type="nf4",
    bnb_4bit_use_double_quant=True,
    bnb_4bit_compute_dtype=torch.float16,
)

# Load tokenizer and model
tokenizer = AutoTokenizer.from_pretrained(model_name, use_fast=True)
model = AutoModelForCausalLM.from_pretrained(
    model_name,
    quantization_config=bnb_config,
    torch_dtype=torch.float16,
    device_map="auto",
)


# Chat function
def chat_with_llama(prompt):
    inputs = tokenizer(prompt, return_tensors="pt", truncation=True, max_length=1000).to("cuda")
    outputs = model.generate(
        inputs.input_ids,
        max_length=1000,
        #max_new_tokens=1000,
        temperature=0.7,
        top_k=50,
        do_sample=True,
    )
    response = tokenizer.decode(outputs[0], skip_special_tokens=True)
    return response


In [None]:
pip install firebase-admin

In [None]:
import firebase_admin
from firebase_admin import credentials, firestore

# Initialize the Firebase Admin SDK
cred = credentials.Certificate('/content/serviceAccountKey.json')
firebase_admin.initialize_app(cred)

In [None]:
pip install fastapi[all]

In [None]:
from fastapi import FastAPI, Request
from pydantic import BaseModel
from fastapi.middleware.cors import CORSMiddleware
import uvicorn
from pyngrok import ngrok  # For exposing the API to the internet
import nest_asyncio
import numpy as np
from typing import List, Optional
import markdown
import random
from sentence_transformers import SentenceTransformer
import faiss
from bs4 import BeautifulSoup, Tag

# Define weights for accuracy and improvement
ALPHA = 0.7
BETA = 0.3
POPULATION_SIZE = 10  # Number of difficulty sets
MUTATION_RATE = 0.2  # Probability of mutation
QUESTION_COUNT = 3  # Only return 3 questions
conversation_history: List[str] = []

db = firestore.client()

nest_asyncio.apply()
# Define the FastAPI app
app = FastAPI()

class UserRequest(BaseModel):
    user_id: str


# Add CORS middleware to allow requests from React (or other web apps)
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # This allows requests from any origin. Change it for more security.
    allow_credentials=True,
    allow_methods=["*"],  # Allows all methods (GET, POST, etc.)
    allow_headers=["*"],  # Allows all headers
)

def calculate_improvement(current_accuracy, past_accuracies):
    past_average = np.mean(past_accuracies) if past_accuracies else 0
    return current_accuracy - past_average

def generate_initial_population():
    """Generates initial difficulty sets for the population."""
    population = []
    for _ in range(POPULATION_SIZE):
        sequence = [random.choice(["easy", "medium", "difficult"]) for _ in range(QUESTION_COUNT)]
        population.append(sequence)
    return population

def fitness_function(fitness_score, difficulty_set):
    """Evaluates how well a set of 3 difficulties matches the student."""
    difficulty_weights = {"easy": 1, "medium": 2, "difficult": 3}
    total_difficulty = sum(difficulty_weights[d] for d in difficulty_set)

    # Ideal difficulty should match fitness_score * 3 (scaled for 3 questions)
    return -abs(total_difficulty - fitness_score * 3)

def selection(population, fitness_scores):
    """Selects the best difficulty sets using tournament selection."""
    selected = []
    while len(selected) < max(2, len(population) // 2):  # Ensure at least 2
        i, j = random.sample(range(len(population)), 2)
        selected.append(population[i] if fitness_scores[i] > fitness_scores[j] else population[j])
    return selected

def crossover(parent1, parent2):
    """Performs crossover to generate new difficulty assignments."""
    point = random.randint(1, len(parent1) - 1)
    return parent1[:point] + parent2[point:], parent2[:point] + parent1[point:]

def mutate(sequence):
    """Ensures mutation doesn't corrupt the sequence."""
    if random.random() < MUTATION_RATE:
        index = random.randint(0, len(sequence) - 1)
        new_value = random.choice(["easy", "medium", "difficult"])
        sequence[index] = new_value if new_value != sequence[index] else sequence[index]  # Ensure change
    return sequence

def evolve_difficulty_assignment(fitness_score):
    """Uses evolutionary computing to generate an optimal difficulty assignment for 3 questions."""
    population = generate_initial_population()
    print(f"population: {population}")

    for _ in range(10):  # Number of generations
        if not population:
            raise ValueError("Population is empty. Check evolution logic.")

        fitness_scores = [fitness_function(fitness_score, seq) for seq in population]
        selected = selection(population, fitness_scores)

        # Ensure at least 2 individuals exist for crossover
        if len(selected) < 2:
            selected = generate_initial_population()[:2]

        # Crossover and mutation to generate new population
        new_population = []
        for i in range(0, len(selected) - 1, 2):
            child1, child2 = crossover(selected[i], selected[i + 1])
            new_population.extend([mutate(child1), mutate(child2)])

        # Ensure we always have a valid population
        population = new_population if new_population else generate_initial_population()

    print(f"New population: {population}")
    # Ensure we have at least one valid sequence
    if not population:
        raise ValueError("Population is empty after evolution.")

    best_index = np.argmax([fitness_function(fitness_score, seq) for seq in population])
    return population[best_index]


def calculate_improvement(current_accuracy, past_accuracies):
    # Calculate the average of past accuracies
    past_average = np.mean(past_accuracies)
    print(past_average)
    return current_accuracy - past_average

def getcontext(lessonId, subtopicId):
    doc_ref = db.collection('topics').document(lessonId)
    doc = doc_ref.get()
    if doc.exists:
        data = doc.to_dict()
        # Navigate through the nested maps
        subtopics = data.get('subtopics', {})  # Get the subtopics map
        subtopic_data = subtopics.get(subtopicId, {})  # Get the specific subtopic
        summary = subtopic_data.get('summary', 'No summary found')  # Get the summary field
        print("Summary:", summary)
        return summary
    else:
        print("Document does not exist.")

def convert_to_count_format(difficulty_distribution):
    difficulty_counts = {"easy": 0, "medium": 0, "difficult": 0}

    for difficulty in difficulty_distribution:
        difficulty_counts[difficulty] += 1

    return difficulty_counts


'''def filter_content(content):
    prompt = f"""
        Given the content remove unnecessary or repeated words or unnecessary html tags irrelevant of the context. Give only the clean ouput without any additonal texts as we will be directly sending it to the front-end:
        HTML Content: {content}
        AI:
    """
    print(prompt)
    response = chat_with_llama(prompt)
    return response.split("AI:")[-1].strip()'''


model = SentenceTransformer('all-MiniLM-L6-v2')

def filter_context(context: str, query: str, num_chunks: int = 3) -> str:
    """Extract and filter text content from HTML using semantic headings structure"""
    soup = BeautifulSoup(context, 'html.parser')
    chunks = []

    # Extract all headings and their content sections
    headings = soup.find_all(['h1', 'h2', 'h3', 'h4', 'h5', 'h6'])

    for heading in headings:
        section_text = []
        # Add heading text (strip HTML tags)
        section_text.append(heading.get_text(separator=" ", strip=True))

        # Collect content until next heading
        next_node = heading.next_sibling
        while next_node and not is_heading(next_node):
            if isinstance(next_node, Tag):
                section_text.append(next_node.get_text(separator=" ", strip=True))
            next_node = next_node.next_sibling

        chunks.append(" ".join(section_text))

    if not chunks:
        return ""

    # Semantic filtering with FAISS
    embeddings = model.encode(chunks, convert_to_tensor=False)
    index = faiss.IndexFlatL2(embeddings.shape[1])
    index.add(np.array(embeddings).astype('float32'))

    query_embedding = model.encode([query], convert_to_tensor=False)
    distances, indices = index.search(
        np.array(query_embedding).astype('float32'),
        min(num_chunks, len(chunks))

    return "\n".join([chunks[i] for i in indices[0]])

def is_heading(tag):
    return isinstance(tag, Tag) and tag.name in ['h1', 'h2', 'h3', 'h4', 'h5', 'h6']

# Request model for FastAPI
class ChatRequest(BaseModel):
    topic: str
    context: Optional[str] = None
    lessonId: Optional[str] = None
    subtopicId: Optional[str] = None
    #custom_prompt: str = None
    custom_prompt: Optional[str] = None
    options: Optional[List[str]] = None  # Add this for quiz options
    correct_answer: Optional[str] = None  # Add this for the correct answer
    selected_answer: Optional[str] = None  # Add this for the selected answer

# Endpoint for quiz generation
@app.post("/generate_quiz")
async def generate_quiz(request: ChatRequest):
    prompt = f"""
    You are tutoring a 12-year-old student.
    Give 5 simple quiz questions each with 4 options for the following topic: {request.topic}
    AI:
    """
    response = chat_with_llama(prompt)
    return {"quiz": response.split("AI:")[1].strip()}

@app.post("/reset_conversation")
async def reset_conversation():
    global conversation_history
    conversation_history = []


# Endpoint for general chat
@app.post("/ask_anything")
async def ask_anything(request: ChatRequest):

    prompt = f"""
        You are a helpful assistant. Respond to the following query and follow these rules:
        1. Use HTML formatting for structure
        2.restrict 18+ content
        Query: {request.topic}
        AI:
    """
    response = chat_with_llama(prompt)
    ai_response = response.split("AI:")[1].strip()
    clean_response = ai_response.split("Note:")[0].strip()
    #html_content = markdown.markdown(ai_response)
    return {"response": clean_response}


# Endpoint for general chat
@app.post("/content_explanation")
async def content_explanation(request: ChatRequest):

    global conversation_history
    raw_context = getcontext(request.lessonId, request.subtopicId)
    relevant_context = filter_context(raw_context, request.topic)
    print(request.lessonId, request.subtopicId)
    prompt = f"""
        You are a helpful assistant tutoring a 12-year-old-student. Given the context and conversation that has been done already, respond to the following query and and format the respnose in html, add appropriate tags wherever necessary:
        Context: {relevant_context}
        Conversation: {conversation_history}
        Query: {request.topic}
        AI:
    """
    print(f"Prompt: {prompt}")
    response = chat_with_llama(prompt)
    ai_response = response.split("AI:")[-1].strip()
    print(f"Ai_response: {ai_response}")
    # Append new conversation history
    conversation_history.append(f"User: {request.topic}")
    conversation_history.append(f"AI: {ai_response}")
    print(f"Conversation History: {conversation_history}")
    #html_content = markdown.markdown(ai_response)
    clean_response = ai_response.split("Note:")[0].strip()
    return {"response": clean_response}

# Endpoint for general chat
@app.post("/simplify")
async def simplify(request: ChatRequest):


    prompt = f"""
        You are a helpful assistant tutoring a 12-year-old-student. Given the content from a textbook about a topic simplify it:
        Textbook Content: {request.topic}
        AI:
    """
    print(prompt)
    response = chat_with_llama(prompt)
    print(response)

    return {"response": response.split("AI:")[-1].strip()}


@app.post("/quiz_explanation")
async def quiz_explanation(request: ChatRequest):
    prompt = f"""
    You are a helpful assistant tutoring a 12-year-old student. Given the quiz questions with options, correct answer, and option selected by the user, provide a detailed feedback on why the selected answer is wrong and explain the correct answer (Format the response in html format, add appropriate tags wherever necessary):
    Quiz question: {request.topic}
    Options: {', '.join(request.options)}
    Correct answer: {request.correct_answer}
    Selected answer: {request.selected_answer}
    AI:
    """
    print(prompt)
    response = chat_with_llama(prompt)
    print(response)
    ai_respnose = response.split("AI:")[-1].strip()
    clean_response = ai_response.split("Note:")[0].strip()
    return {"response": clean_response}
    #return {"response": prompt}


@app.post("/fitness_calculation")
async def fitness_calculation(user_request: dict):
    user_id = user_request["user_id"]
    doc_ref = db.collection('users').document(user_id)

    doc = doc_ref.get()
    if not doc.exists:
        raise HTTPException(status_code=404, detail="User document not found")

    data = doc.to_dict()
    quiz_scores = [value['quizScore'] / 100 for key, value in data.get('progress', {}).items() if 'quizScore' in value]

    if not quiz_scores:
        raise HTTPException(status_code=400, detail="No quiz scores available")

    improvement = calculate_improvement(quiz_scores[-1], quiz_scores[:-1])
    fitness_score = ALPHA * quiz_scores[-1] + BETA * improvement

    # Use evolutionary computing to determine question difficulties for 3 questions
    difficulty_distribution_old = evolve_difficulty_assignment(fitness_score)
    difficulty_distribution = convert_to_count_format(difficulty_distribution_old)

    return {"difficulty_distribution": difficulty_distribution}



# Expose the API to the internet using ngrok
def expose_ngrok():
    url = ngrok.connect(8000)
    print(f"Public URL: {url}")

if __name__ == "__main__":
    # Start ngrok
    expose_ngrok()

    # Run the FastAPI app
    uvicorn.run(app, host="0.0.0.0", port=8000)