In [None]:
# Web stack (FastAPI + uvicorn), the pyngrok wrapper used to open the public
# tunnel, the ChromaDB vector store and the sentence-embedding library.
!pip install -q fastapi uvicorn pyngrok chromadb sentence-transformers
# LLM runtime: transformers/accelerate/torch, plus bitsandbytes for the 4-bit
# quantized load performed in chat_service.py.
!pip install -q transformers accelerate bitsandbytes torch
# NOTE(review): none of these three packages is imported by a cell visible in
# this notebook — confirm they are still needed.
# NOTE(review): nest_asyncio is imported by the launch cell but is not installed
# here; presumably it is preinstalled on Colab — confirm.
!pip install -q pydantic-settings python-dotenv httpx
print('‚úÖ D√©pendances install√©es')

In [None]:
# Mount Google Drive at /content/drive. Later cells keep the ChromaDB data and
# the Hugging Face model cache under /content/drive/MyDrive.
from google.colab import drive
drive.mount('/content/drive')
print('‚úÖ Google Drive mont√©')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
‚úÖ Google Drive mont√©


In [None]:
# Installer ngrok via snap (plus fiable sur Colab)
!curl -s https://ngrok-agent.s3.amazonaws.com/ngrok.asc | sudo tee /etc/apt/trusted.gpg.d/ngrok.asc >/dev/null
!echo "deb https://ngrok-agent.s3.amazonaws.com buster main" | sudo tee /etc/apt/sources.list.d/ngrok.list
!sudo apt update -qq
!sudo apt install ngrok -qq

# Configurer le token
!ngrok authtoken 361IzWAOvuvUMfJjCWN0hOTPALb_7SPYyHQrzZkdHiXtNrnME

# Tuer les processus ngrok existants
!pkill -f ngrok

print('‚úÖ ngrok install√© et configur√©')


In [None]:
%%writefile database.py
import chromadb
from chromadb.config import Settings

# Directory on the mounted Google Drive where Chroma keeps its on-disk data.
DB_PATH = "/content/drive/MyDrive/conversation_db"

# Persistent (on-disk) client; anonymized telemetry is switched off explicitly.
_client_settings = Settings(anonymized_telemetry=False)
client = chromadb.PersistentClient(path=DB_PATH, settings=_client_settings)

# Conversation records (title / mode / creation date); reused if it already exists.
conversations_collection = client.get_or_create_collection(
    name="conversations", metadata={"description": "Conversations metadata"}
)

# Individual messages together with their embeddings; reused if it already exists.
messages_collection = client.get_or_create_collection(
    name="messages", metadata={"description": "Messages with embeddings"}
)

print(f"‚úÖ ChromaDB initialis√©: {DB_PATH}")

Overwriting database.py


In [None]:
%%writefile models.py
from dataclasses import dataclass, field
from typing import List, Optional
from datetime import datetime

@dataclass
class Conversation:
    """In-memory representation of one conversation and, optionally, its messages."""
    # Unique identifier (HistoryService generates a UUID4 string).
    id: str
    # Human-readable title shown for the conversation.
    title: str
    # Who opens the conversation; values seen in this notebook are
    # "user_initiated" and "ai_initiated".
    mode: str
    # Creation time as an ISO-8601 string.
    created_at: str
    # Messages of the conversation; a fresh empty list per instance by default.
    messages: List['Message'] = field(default_factory=list)

@dataclass
class Message:
    """In-memory representation of a single message of a conversation."""
    # Unique identifier (HistoryService generates a UUID4 string).
    id: str
    # Identifier of the conversation this message belongs to.
    conversation_id: str
    # Author of the message; the notebook uses "user" and "ai".
    sender: str
    # Text of the message.
    content: str
    # Creation time as an ISO-8601 string; used to order messages.
    timestamp: str
    # Optional follow-up suggestions attached to the message.
    suggestions: Optional[List[str]] = None

Overwriting models.py


In [None]:
%%writefile schemas.py
from pydantic import BaseModel
from typing import List, Optional

class MessageCreate(BaseModel):
    """Request body for posting a user message to a conversation."""
    # Text typed by the user.
    content: str
    # When true the endpoint answers with a plain-text token stream instead of JSON.
    stream: bool = False

class MessageResponse(BaseModel):
    """API representation of one stored message."""
    # Message identifier.
    id: str
    # Author of the message ("user" or "ai" in this notebook).
    sender: str
    # Text of the message.
    content: str
    # Creation time as an ISO-8601 string.
    timestamp: str
    # Optional follow-up suggestions.
    suggestions: Optional[List[str]] = None

class ConversationCreate(BaseModel):
    """Request body for creating a conversation."""
    # "ai_initiated" makes the create endpoint store an AI greeting first;
    # any other value creates an empty conversation.
    mode: str = "user_initiated"
    # Title of the conversation. The type also admits an explicit null.
    title: Optional[str] = "New Conversation"

class ConversationResponse(BaseModel):
    """API representation of a conversation, optionally with its messages."""
    # Conversation identifier.
    id: str
    # Conversation title.
    title: str
    # Conversation mode ("user_initiated" / "ai_initiated" in this notebook).
    mode: str
    # Creation time as an ISO-8601 string.
    created_at: str
    # Messages of the conversation; the list endpoint leaves this empty.
    messages: List[MessageResponse] = []

Overwriting schemas.py


In [None]:
%%writefile history_service.py
import json
import uuid
from datetime import datetime
from typing import List, Optional

from models import Conversation, Message
from schemas import ConversationCreate
from database import conversations_collection, messages_collection

class HistoryService:
    """Persistence layer for conversations and messages on top of ChromaDB.

    Conversations are stored in ``conversations_collection`` (metadata: title,
    mode, created_at; document: the title). Messages are stored in
    ``messages_collection`` (metadata: conversation_id, sender, timestamp and
    optionally suggestions; document: the message text; embedding supplied by
    the caller).

    Every method is declared ``async`` but calls the collections synchronously.
    """

    # Used when a request explicitly sends ``title: null``; same text as the
    # default declared on ``ConversationCreate``.
    DEFAULT_TITLE = "New Conversation"

    @staticmethod
    def _encode_suggestions(suggestions) -> str:
        """Serialize a list of suggestions into one string metadata value.

        JSON is used instead of a comma join so that suggestions containing a
        comma survive the round trip.
        """
        return json.dumps(list(suggestions), ensure_ascii=False)

    @staticmethod
    def _decode_suggestions(raw) -> Optional[List[str]]:
        """Inverse of :meth:`_encode_suggestions`.

        Returns ``None`` when nothing was stored. Values written before the
        switch to JSON (plain comma-joined text) are still split on commas.
        """
        if not raw:
            return None
        try:
            decoded = json.loads(raw)
        except ValueError:
            decoded = None
        if isinstance(decoded, list) and all(isinstance(item, str) for item in decoded):
            return decoded
        # Legacy format: "a,b,c"
        return raw.split(',')

    async def create_conversation(self, conversation_data: ConversationCreate) -> Conversation:
        """Create and store a new, empty conversation.

        Args:
            conversation_data: requested title and mode.

        Returns:
            The stored conversation with a fresh UUID and creation timestamp.
        """
        conv_id = str(uuid.uuid4())
        created_at = datetime.utcnow().isoformat()

        # The schema allows an explicit null title; without this fallback None
        # would be written to the metadata and later break
        # ``ConversationResponse.title: str``.
        title = conversation_data.title
        if title is None:
            title = self.DEFAULT_TITLE

        conversations_collection.add(
            ids=[conv_id],
            metadatas=[{
                "title": title,
                "mode": conversation_data.mode,
                "created_at": created_at
            }],
            documents=[title]
        )

        return Conversation(
            id=conv_id,
            title=title,
            mode=conversation_data.mode,
            created_at=created_at,
            messages=[]
        )

    async def get_conversation(self, conversation_id: str) -> Optional[Conversation]:
        """Load a conversation and all of its messages.

        Returns:
            The conversation, or ``None`` when the id is unknown.
        """
        conv_result = conversations_collection.get(ids=[conversation_id])
        if not conv_result['ids']:
            return None

        metadata = conv_result['metadatas'][0]
        messages = await self.get_messages(conversation_id)

        return Conversation(
            id=conversation_id,
            title=metadata['title'],
            mode=metadata['mode'],
            created_at=metadata['created_at'],
            messages=messages
        )

    async def add_message(self, conversation_id: str, sender: str, content: str,
                         embedding: List[float], suggestions=None) -> Message:
        """Store one message with its embedding.

        Args:
            conversation_id: conversation the message belongs to.
            sender: author label ("user" / "ai" in this notebook).
            content: message text, stored as the document.
            embedding: vector stored alongside the message.
            suggestions: optional list of follow-up suggestions; only written
                to the metadata when non-empty.

        Returns:
            The stored message, carrying ``suggestions`` exactly as passed in.
        """
        msg_id = str(uuid.uuid4())
        # ISO-8601 string; get_messages() orders messages by this value.
        timestamp = datetime.utcnow().isoformat()

        metadata = {
            "conversation_id": conversation_id,
            "sender": sender,
            "timestamp": timestamp
        }

        if suggestions:
            metadata["suggestions"] = self._encode_suggestions(suggestions)

        messages_collection.add(
            ids=[msg_id],
            embeddings=[embedding],
            metadatas=[metadata],
            documents=[content]
        )

        return Message(
            id=msg_id,
            conversation_id=conversation_id,
            sender=sender,
            content=content,
            timestamp=timestamp,
            suggestions=suggestions
        )

    async def get_messages(self, conversation_id: str) -> List[Message]:
        """Return every message of a conversation, oldest first."""
        results = messages_collection.get(
            where={"conversation_id": conversation_id}
        )

        messages = [
            Message(
                id=msg_id,
                conversation_id=conversation_id,
                sender=metadata['sender'],
                content=document,
                timestamp=metadata['timestamp'],
                suggestions=self._decode_suggestions(metadata.get('suggestions'))
            )
            for msg_id, metadata, document in zip(
                results['ids'], results['metadatas'], results['documents']
            )
        ]

        messages.sort(key=lambda x: x.timestamp)
        return messages

    async def list_conversations(self, skip: int = 0, limit: int = 100) -> List[Conversation]:
        """List conversations, newest first, without their messages.

        Args:
            skip: number of conversations to skip after sorting.
            limit: maximum number of conversations to return.
        """
        results = conversations_collection.get()

        conversations = [
            Conversation(
                id=conv_id,
                title=metadata['title'],
                mode=metadata['mode'],
                created_at=metadata['created_at'],
                messages=[]
            )
            for conv_id, metadata in zip(results['ids'], results['metadatas'])
        ]

        conversations.sort(key=lambda x: x.created_at, reverse=True)
        return conversations[skip:skip+limit]

    async def delete_conversation(self, conversation_id: str):
        """Delete a conversation together with all of its messages."""
        # Remove the messages first...
        messages_collection.delete(where={"conversation_id": conversation_id})
        # ...then the conversation record itself.
        conversations_collection.delete(ids=[conversation_id])

    async def rename_conversation(self, conversation_id: str, title: str) -> Optional[Conversation]:
        """Change the title of a conversation.

        Returns:
            The updated conversation (with its messages), or ``None`` when the
            id is unknown.
        """
        conv = await self.get_conversation(conversation_id)
        if conv:
            # The whole metadata dict is rewritten, so mode and created_at are
            # carried over from the stored conversation.
            conversations_collection.update(
                ids=[conversation_id],
                metadatas=[{
                    "title": title,
                    "mode": conv.mode,
                    "created_at": conv.created_at
                }],
                documents=[title]
            )
            conv.title = title
        return conv


Overwriting history_service.py


In [None]:
%%writefile chat_service.py
from typing import List, Tuple, Optional
from models import Message
from sentence_transformers import SentenceTransformer
import torch
import os
import time

# CACHE CONFIGURATION
# Model files are cached on the mounted Google Drive; the same directory is
# also passed explicitly as cache_dir / cache_folder when the models are loaded.
CACHE_DIR = "/content/drive/MyDrive/huggingface_cache"
# NOTE(review): these variables are set after sentence_transformers has already
# been imported above; a library that reads them at import time may not see
# them — confirm whether they need to be set before the imports.
os.environ['HF_HOME'] = CACHE_DIR
os.environ['TRANSFORMERS_CACHE'] = CACHE_DIR

class ChatService:
    """Owns the Qwen chat model, its tokenizer and the sentence-embedding model.

    Models are loaded lazily by :meth:`load_models`; the public methods call it
    on demand when the model they need has not been loaded yet.
    """

    # Maximum number of prompt tokens handed to the model.
    MAX_PROMPT_TOKENS = 1024

    def __init__(self):
        self.model = None
        self.tokenizer = None
        self.embedding_model = None

    def load_models(self):
        """Load whichever of the chat model / embedding model is still missing.

        The chat model is loaded 4-bit quantized (float16 compute) with
        ``device_map="auto"``; both models use ``CACHE_DIR`` as their cache.
        """
        if not os.path.exists(CACHE_DIR):
            os.makedirs(CACHE_DIR, exist_ok=True)
            print(f"üìÅ Dossier de cache cr√©√© : {CACHE_DIR}")

        if self.model is None:
            from transformers import AutoModelForCausalLM, AutoTokenizer, BitsAndBytesConfig

            quantization_config = BitsAndBytesConfig(
                load_in_4bit=True,
                bnb_4bit_compute_dtype=torch.float16
            )

            print(f"üîÑ V√©rification du mod√®le Qwen dans {CACHE_DIR}...")

            self.tokenizer = AutoTokenizer.from_pretrained(
                "Qwen/Qwen2.5-7B-Instruct",
                cache_dir=CACHE_DIR
            )
            self.model = AutoModelForCausalLM.from_pretrained(
                "Qwen/Qwen2.5-7B-Instruct",
                quantization_config=quantization_config,
                device_map="auto",
                cache_dir=CACHE_DIR
            )
            print("‚úÖ Qwen 7B charg√©")

        if self.embedding_model is None:
            print("üîÑ Chargement mod√®le embeddings...")
            self.embedding_model = SentenceTransformer('all-MiniLM-L6-v2', cache_folder=CACHE_DIR)
            print("‚úÖ Mod√®le embeddings charg√©")

    def generate_embedding(self, text: str) -> List[float]:
        """Return the embedding of ``text`` as a plain list of floats."""
        if self.embedding_model is None:
            self.load_models()
        return self.embedding_model.encode(text).tolist()

    def _render_prompt(self, chat) -> str:
        """Apply the tokenizer's chat template and append the generation prompt."""
        return self.tokenizer.apply_chat_template(
            chat,
            tokenize=False,
            add_generation_prompt=True
        )

    def _count_tokens(self, text: str) -> int:
        """Number of tokens ``text`` occupies once tokenized."""
        return len(self.tokenizer(text)["input_ids"])

    def _build_prompt(self, history: List[Message]) -> str:
        """Build the prompt text for ``history``, keeping the most recent turns.

        When the full history exceeds ``MAX_PROMPT_TOKENS`` the oldest turns are
        dropped (the system prompt and the latest turn are always kept).
        Plain right-side truncation would instead cut off the newest messages
        and the generation prompt at the end of the text.
        """
        system = {"role": "system", "content": "Tu es un assistant IA intelligent et amical. R√©ponds de mani√®re naturelle et concise en fran√ßais."}
        turns = [
            {"role": "user" if msg.sender == "user" else "assistant", "content": msg.content}
            for msg in history
        ]

        prompt = self._render_prompt([system] + turns)
        if self._count_tokens(prompt) <= self.MAX_PROMPT_TOKENS:
            return prompt

        # Too long: start from the newest turn and add older ones while they fit.
        kept = turns[-1:]
        prompt = self._render_prompt([system] + kept)
        for turn in reversed(turns[:-1]):
            candidate = self._render_prompt([system, turn] + kept)
            if self._count_tokens(candidate) > self.MAX_PROMPT_TOKENS:
                break
            kept.insert(0, turn)
            prompt = candidate
        return prompt

    async def generate_response_stream(self, history: List[Message]):
        """Generate the assistant reply for ``history`` and yield it chunk by chunk.

        Generation runs in a background thread that feeds a
        ``TextIteratorStreamer``. Each blocking read from the streamer is done
        in the event loop's default executor so that the event loop is not
        stalled while waiting for the next chunk.

        Latency and throughput figures are printed once the stream is finished.
        """
        import asyncio
        from transformers import TextIteratorStreamer
        from threading import Thread

        if self.model is None:
            self.load_models()

        text = self._build_prompt(history)

        # truncation stays enabled as a last guard, e.g. when the latest
        # message alone is longer than the limit.
        inputs = self.tokenizer(
            [text],
            return_tensors="pt",
            truncation=True,
            max_length=self.MAX_PROMPT_TOKENS
        ).to(self.model.device)

        streamer = TextIteratorStreamer(self.tokenizer, skip_prompt=True, skip_special_tokens=True)
        generation_kwargs = dict(
            inputs,
            streamer=streamer,
            max_new_tokens=512,
            do_sample=True,
            temperature=0.7,
            top_p=0.9,
            pad_token_id=self.tokenizer.eos_token_id
        )

        start_time = time.time()
        thread = Thread(target=self.model.generate, kwargs=generation_kwargs)
        thread.start()

        token_count = 0
        first_token_received = False

        loop = asyncio.get_running_loop()
        chunks = iter(streamer)
        # Sentinel returned by next() at the end of the stream; StopIteration
        # itself cannot be propagated through an executor future.
        end_of_stream = object()

        while True:
            new_text = await loop.run_in_executor(None, next, chunks, end_of_stream)
            if new_text is end_of_stream:
                break

            if not first_token_received:
                latency = time.time() - start_time
                print(f"\n‚è±Ô∏è LATENCE (Temps avant 1er mot) : {latency:.4f} secondes")
                first_token_received = True

            # Counts streamed chunks, which only approximates the token count.
            token_count += 1
            yield new_text

        # The streamer has ended, so the generation thread is finishing.
        thread.join()

        end_time = time.time()
        total_time = end_time - start_time
        tokens_per_sec = token_count / total_time if total_time > 0 else 0

        print(f"‚ö° VITESSE DE GENERATION : {tokens_per_sec:.2f} tokens/seconde")
        print(f"üìù Total tokens (approx stream) : {token_count}")
        print("-" * 30)

    async def generate_response(self, history: List[Message]) -> Tuple[str, Optional[List[str]]]:
        """Generate the complete reply for ``history``.

        Returns:
            ``(text, suggestions)``; suggestions are not produced yet and are
            always ``None``.
        """
        full_response = ""
        async for token in self.generate_response_stream(history):
            full_response += token
        return full_response, None

# Module-level singleton shared by the router and the startup hook.
chat_service = ChatService()

In [None]:
%%writefile chat_router.py
from fastapi import APIRouter, HTTPException
from fastapi.responses import StreamingResponse
from typing import List
from schemas import ConversationCreate, ConversationResponse, MessageCreate, MessageResponse
from history_service import HistoryService
from chat_service import chat_service

# Router holding every conversation/message endpoint; main.py mounts it on the app.
router = APIRouter()
# Single HistoryService instance shared by all handlers of this module.
history_service = HistoryService()

@router.post("/conversations", response_model=ConversationResponse)
async def create_conversation(conversation: ConversationCreate):
    """Create a conversation; in "ai_initiated" mode it starts with an AI greeting.

    Returns the created conversation including any message stored for it.
    """
    conv = await history_service.create_conversation(conversation)

    if conversation.mode == "ai_initiated":
        greeting = "Bonjour ! Je suis votre assistant IA. Comment puis-je vous aider aujourd'hui ?"
        await history_service.add_message(
            conv.id,
            "ai",
            greeting,
            chat_service.generate_embedding(greeting),
            None,
        )
        # Reload so the response carries the greeting that was just stored.
        conv = await history_service.get_conversation(conv.id)

    message_payloads = []
    for stored in conv.messages:
        message_payloads.append(
            MessageResponse(
                id=stored.id,
                sender=stored.sender,
                content=stored.content,
                timestamp=stored.timestamp,
                suggestions=stored.suggestions,
            )
        )

    return ConversationResponse(
        id=conv.id,
        title=conv.title,
        mode=conv.mode,
        created_at=conv.created_at,
        messages=message_payloads,
    )

@router.get("/conversations", response_model=List[ConversationResponse])
async def list_conversations():
    """Return the stored conversations without their messages."""
    summaries = []
    for stored in await history_service.list_conversations():
        summary = ConversationResponse(
            id=stored.id,
            title=stored.title,
            mode=stored.mode,
            created_at=stored.created_at,
            messages=[],
        )
        summaries.append(summary)
    return summaries

@router.get("/conversations/{conversation_id}", response_model=ConversationResponse)
async def get_conversation(conversation_id: str):
    """Return one conversation with all of its messages, or 404 if unknown."""
    conv = await history_service.get_conversation(conversation_id)
    if not conv:
        raise HTTPException(status_code=404, detail="Conversation not found")

    message_payloads = []
    for stored in conv.messages:
        message_payloads.append(
            MessageResponse(
                id=stored.id,
                sender=stored.sender,
                content=stored.content,
                timestamp=stored.timestamp,
                suggestions=stored.suggestions,
            )
        )

    return ConversationResponse(
        id=conv.id,
        title=conv.title,
        mode=conv.mode,
        created_at=conv.created_at,
        messages=message_payloads,
    )

@router.post("/conversations/{conversation_id}/messages", response_model=MessageResponse)
async def send_message(conversation_id: str, message: MessageCreate):
    """Store a user message, generate the AI reply and store it as well.

    With ``message.stream`` set, the reply is returned as a plain-text stream
    and saved once the stream has been fully produced; otherwise the saved AI
    message is returned as JSON.

    Raises:
        HTTPException: 404 when the conversation does not exist.
    """
    conv = await history_service.get_conversation(conversation_id)
    if not conv:
        raise HTTPException(status_code=404, detail="Conversation not found")

    user_embedding = chat_service.generate_embedding(message.content)
    await history_service.add_message(conversation_id, "user", message.content, user_embedding, None)

    # Fetched after the insert so the history ends with the new user message.
    history = await history_service.get_messages(conversation_id)

    if message.stream:
        async def event_generator():
            full_response = ""
            async for token in chat_service.generate_response_stream(history):
                full_response += token
                yield token

            # Persist the reply only after the whole stream has been produced.
            ai_embedding = chat_service.generate_embedding(full_response)
            await history_service.add_message(conversation_id, "ai", full_response, ai_embedding, None)

        return StreamingResponse(event_generator(), media_type="text/plain")

    ai_response, suggestions = await chat_service.generate_response(history)

    ai_embedding = chat_service.generate_embedding(ai_response)
    # Pass the generated suggestions through instead of discarding them.
    ai_msg = await history_service.add_message(conversation_id, "ai", ai_response, ai_embedding, suggestions)

    return MessageResponse(
        id=ai_msg.id,
        sender=ai_msg.sender,
        content=ai_msg.content,
        timestamp=ai_msg.timestamp,
        suggestions=ai_msg.suggestions
    )

@router.delete("/conversations/{conversation_id}")
async def delete_conversation(conversation_id: str):
    """Delete a conversation and its messages, or answer 404 if it is unknown."""
    existing = await history_service.get_conversation(conversation_id)
    if existing:
        await history_service.delete_conversation(conversation_id)
        confirmation = {"message": "Conversation supprim√©e"}
        return confirmation

    raise HTTPException(status_code=404, detail="Conversation not found")

@router.patch("/conversations/{conversation_id}")
async def rename_conversation(conversation_id: str, title: str):
    """Rename a conversation and return it with its messages.

    Args:
        conversation_id: id of the conversation to rename.
        title: new title (plain ``str`` parameter, not part of the path).

    Raises:
        HTTPException: 404 when the conversation does not exist.
    """
    # rename_conversation() looks the conversation up itself and returns None
    # when it is missing, so a separate existence check would only load the
    # conversation twice and leave a window in which `updated` could be None.
    updated = await history_service.rename_conversation(conversation_id, title)
    if not updated:
        raise HTTPException(status_code=404, detail="Conversation not found")

    return ConversationResponse(
        id=updated.id,
        title=updated.title,
        mode=updated.mode,
        created_at=updated.created_at,
        messages=[
            MessageResponse(
                id=m.id,
                sender=m.sender,
                content=m.content,
                timestamp=m.timestamp,
                suggestions=m.suggestions
            ) for m in updated.messages
        ]
    )


In [None]:
%%writefile main.py
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from chat_router import router

# The FastAPI application served by uvicorn from the launch cell.
app = FastAPI(title="AI Conversation Backend - Colab")

# NOTE(review): recent FastAPI documentation recommends lifespan handlers over
# on_event — consider migrating; confirm against the installed version.
@app.on_event("startup")
async def startup():
    """Load the chat and embedding models when the server starts."""
    from chat_service import chat_service
    # Synchronous call: the server does not finish starting until loading is done.
    chat_service.load_models()
    print("‚úÖ Mod√®les pr√©-charg√©s")

# CORS: every origin, method and header is allowed.
# NOTE(review): a wildcard origin combined with allow_credentials=True is very
# permissive; confirm whether credentials are needed, or list the allowed
# front-end origins explicitly.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Mount the conversation/message endpoints defined in chat_router.py.
app.include_router(router)

@app.get("/")
async def root():
    """Landing endpoint returning a fixed status message."""
    status = {"message": "Backend running on Colab with ChromaDB + Qwen 7B"}
    return status

Overwriting main.py


In [None]:
import nest_asyncio
import uvicorn
from pyngrok import ngrok
import asyncio
import sys
import importlib

# --- FORCER LE RECHARGEMENT DES MODULES ---
# Cela oblige Python √† relire tes fichiers modifi√©s (chat_service.py, etc.)
import chat_service
import chat_router
import history_service
import main

importlib.reload(chat_service)   # Recharge le service avec le chrono
importlib.reload(history_service)
importlib.reload(chat_router)
importlib.reload(main)           # Recharge l'app FastAPI avec les nouveaux liens
# ------------------------------------------

nest_asyncio.apply()

# D√©marrer ngrok
# Note: Si ngrok est d√©j√† lanc√©, il peut donner une erreur, ignore-la ou kill le process avant
try:
    public_url = ngrok.connect(8000)
    print("\n" + "="*60)
    print("üåê URL PUBLIQUE DE TON BACKEND:")
    print(f"   {public_url}")
    print("="*60)
except:
    print("Ngrok d√©j√† actif ou erreur de connexion")

print("\nüöÄ Serveur RELOAD√â et en cours d'ex√©cution... (Logs activ√©s)\n")

# D√©marrer FastAPI
# On pointe directement sur l'objet app recharg√© pour √™tre s√ªr
config = uvicorn.Config(main.app, host="0.0.0.0", port=8000, log_level="info")
server = uvicorn.Server(config)
await server.serve()