---
**Как работает механизм RAG, его реализации вручную и с ипользованием библиотеки Langchain, обзор библиотеки Langchain, а так же пример написания web-приложения чат-бота с механизмом RAG на фреймворке Gradio + Docker**

---
RAG (Retrieval Augmented Generation) — простое и понятное объяснение статья  
https://habr.com/ru/articles/779526/

Векторные БД статьи  
https://habr.com/ru/articles/807957/  
https://habr.com/ru/articles/817173/
https://habr.com/ru/articles/784158/

RAG статьи  
https://habr.com/ru/companies/wunderfund/articles/779748/  
https://habr.com/ru/companies/bothub/articles/825850/  
https://www.promptingguide.ai/research/rag?utm_source=substack&utm_medium=email  
https://vinija.ai/nlp/RAG/  
https://www.rungalileo.io/blog/mastering-rag-how-to-architect-an-enterprise-rag-system  

Туториал по RAG от HF  
https://huggingface.co/learn/cookbook/advanced_rag

Туториалы по langchain и RAG  
https://blog.davideai.dev/series/langchain  
https://habr.com/ru/articles/729664/  

Курс Building Agentic RAG with LlamaIndex  
https://www.deeplearning.ai/short-courses/building-agentic-rag-with-llamaindex/

Компактная реализация RAG через Ollama  
https://github.com/technovangelist/videoprojects/tree/main/2024-04-04-build-rag-with-python  
Видео про RAG где она используется (RU)  
https://www.youtube.com/watch?v=DyOKAVzMWaQ


---

# Gradio app old all peges

Туториал чат бот с помощью нового интерфейса  
https://www.gradio.app/guides/creating-a-chatbot-fast

Туториал чат бот с помощью блоков  
https://www.gradio.app/guides/creating-a-custom-chatbot-with-blocks

Туториал по темам  
https://www.gradio.app/guides/theming-guide

Конструктор тем  
https://huggingface.co/spaces/gradio/theme_builder

Галерея тем  
https://huggingface.co/spaces/gradio/theme-gallery

Доки по блоками и событиям, декораторы  
https://www.gradio.app/guides/blocks-and-event-listeners

Прогресс бар  
https://www.gradio.app/guides/key-features#progress-bars

Туториал по катомному чат боту через gr.ChatBot  
https://www.gradio.app/guides/creating-a-custom-chatbot-with-blocks

Примеры

Пример приложения Опенчат через Докер и create_chat_completion  
https://huggingface.co/spaces/Tomoniai/Open-Chat/blob/main/app.py  

Пример простого приложения с разными базовыми моделями prefix suffix ctransformers   
https://huggingface.co/spaces/daniellefranca96/Open_LLMs_Playground/blob/main/app.py



## Imports

In [None]:
%%time
%%capture
# faiss-gpu or faiss-gpu
!pip install accelerate langchain chromadb sentence_transformers pdfminer.six llama-cpp-python \
                        youtube-transcript-api rank_bm25 faiss-cpu datasets gradio

CPU times: user 533 ms, sys: 149 ms, total: 682 ms
Wall time: 1min 40s


In [None]:
import gc
import psutil
from pathlib import Path
from shutil import rmtree
from collections import deque
from typing import List, Tuple, Dict, Union, Iterable, Optional, Any
import time
import csv
import os

import requests
import torch
from huggingface_hub import hf_hub_download, list_repo_tree, list_repo_files, repo_info, repo_exists
from llama_cpp import Llama, llama
from youtube_transcript_api import YouTubeTranscriptApi
import gradio as gr

from langchain.text_splitter import RecursiveCharacterTextSplitter, CharacterTextSplitter
from langchain_community.vectorstores import FAISS, Chroma
from langchain_community.embeddings import HuggingFaceInstructEmbeddings, HuggingFaceEmbeddings
from langchain_community.retrievers import BM25Retriever, TFIDFRetriever
from langchain.retrievers import EnsembleRetriever

# Loader classes
from langchain_community.document_loaders import (
    CSVLoader,
    PDFMinerLoader,
    PyPDFLoader,
    TextLoader,
    UnstructuredEmailLoader,
    UnstructuredHTMLLoader,
    UnstructuredMarkdownLoader,
    UnstructuredODTLoader,
    UnstructuredPowerPointLoader,
    UnstructuredWordDocumentLoader,
    WebBaseLoader,
    YoutubeLoader,
    DirectoryLoader,
)

# Annotations
from langchain_core.retrievers import BaseRetriever
from langchain.docstore.document import Document
from langchain_core.vectorstores import VectorStore
from langchain_core.embeddings import Embeddings

`requirements.txt`  
```
llama-cpp-python
youtube-transcript-api
accelerate
langchain
chromadb
sentence_transformers
pdfminer.six
rank_bm25
faiss-cpu
datasets
gradio
```

In [None]:
!pip list | grep -P \
'llama_cpp_python|youtube-transcript-api|accelerate|langchain|chromadb|sentence-transformers|pdfminer.six|rank_bm25|faiss-cpu|datasets|gradio|torch'

# accelerate                               0.27.2
# chromadb                                 0.4.24
# datasets                                 2.18.0
# faiss-cpu                                1.8.0
# gradio                                   4.21.0
# gradio_client                            0.12.0
# langchain                                0.1.11
# langchain-community                      0.0.27
# langchain-core                           0.1.30
# langchain-text-splitters                 0.0.1
# llama_cpp_python                         0.2.56
# pdfminer.six                             20231228
# sentence-transformers                    2.5.1
# tensorflow-datasets                      4.9.4
# torch                                    2.1.0+cu121
# torchaudio                               2.1.0+cu121
# torchdata                                0.7.0
# torchsummary                             1.5.1
# torchtext                                0.16.0
# torchvision                              0.16.0+cu121
# vega-datasets                            0.9.0
# youtube-transcript-api                   0.6.2

accelerate                               0.27.2
chromadb                                 0.4.24
datasets                                 2.18.0
faiss-cpu                                1.8.0
gradio                                   4.21.0
gradio_client                            0.12.0
langchain                                0.1.11
langchain-community                      0.0.27
langchain-core                           0.1.30
langchain-text-splitters                 0.0.1
llama_cpp_python                         0.2.56
pdfminer.six                             20231228
sentence-transformers                    2.5.1
tensorflow-datasets                      4.9.4
torch                                    2.1.0+cu121
torchaudio                               2.1.0+cu121
torchdata                                0.7.0
torchsummary                             1.5.1
torchtext                                0.16.0
torchvision                              0.16.0+cu121
vega-datasets               

Докер - папки для моделей должны быть созданы

In [None]:
'''
FROM python:3.10
WORKDIR /app
COPY ./requirements.txt .
RUN pip install --no-cache-dir --upgrade pip
RUN pip install --no-cache-dir --upgrade -r requirements.txt
COPY . .
CMD ["python", "app.py"]
'''

In [None]:
'''
docker build -t openchat .
docker run --rm -v ./models/:/app/models/ -v ./embed_models/:/app/embed_models/ -p 80:7860 openchat
'''

## Инициализация моделей

Модель LLM

In [None]:
!mkdir ./models
!mkdir ./embed_models

mkdir: cannot create directory ‘./models’: File exists
mkdir: cannot create directory ‘./embed_models’: File exists


In [None]:
!wget -qq --show-progress -P ./models https://huggingface.co/IlyaGusev/saiga_mistral_7b_gguf/resolve/main/model-q2_K.gguf



In [None]:
from llama_cpp import Llama

model_path = './models/model-q2_K.gguf'
n_ctx = 2000
model = Llama(
    model_path=model_path,
    n_ctx=n_ctx,
    # n_gpu_layers=-1,
)

llama_model_loader: loaded meta data with 21 key-value pairs and 291 tensors from ./models/model-q2_K.gguf (version GGUF V2)
llama_model_loader: Dumping metadata keys/values. Note: KV overrides do not apply in this output.
llama_model_loader: - kv   0:                       general.architecture str              = llama
llama_model_loader: - kv   1:                               general.name str              = models
llama_model_loader: - kv   2:                       llama.context_length u32              = 32768
llama_model_loader: - kv   3:                     llama.embedding_length u32              = 4096
llama_model_loader: - kv   4:                          llama.block_count u32              = 32
llama_model_loader: - kv   5:                  llama.feed_forward_length u32              = 14336
llama_model_loader: - kv   6:                 llama.rope.dimension_count u32              = 128
llama_model_loader: - kv   7:                 llama.attention.head_count u32              = 32
l

Модель эмбедингов

In [None]:
%%capture
from langchain_community.embeddings import HuggingFaceInstructEmbeddings, HuggingFaceEmbeddings

# статья рейтинг энкодеров https://habr.com/ru/articles/669674/
# репозиторий рейтинг энкодеров https://github.com/avidale/encodechka (RuElectra тогда еще не появилась)

# # 3 модели RuElectra https://huggingface.co/ai-forever?search_models=ruElectra
# model_name = 'ai-forever/ruElectra-medium'  # small 174 MB medium 356 MB large 1.71 GB

# https://huggingface.co/inkoziev/sbert_pq/tree/main
model_name = 'inkoziev/sbert_pq'  # 117 MB

# # https://huggingface.co/cointegrated/rubert-tiny
# model_name = 'cointegrated/rubert-tiny'  # 50M усреднине эмбедингов по длине предложения

# # https://huggingface.co/cointegrated/rubert-tiny2
# model_name = 'cointegrated/rubert-tiny2'  # 100M усреднине эмбедингов по длине предложения

# # https://huggingface.co/cointegrated/LaBSE-en-ru
# model_name = 'cointegrated/LaBSE-en-ru'  # 516M # выход pooler (норм модель)

# # https://huggingface.co/sentence-transformers/all-mpnet-base-v2
# model_name = "sentence-transformers/all-mpnet-base-v2"  # только английский вроде

# # https://huggingface.co/ai-forever/ruBert-large/tree/main
# # https://huggingface.co/ai-forever/ruBert-base
# model_name = 'ai-forever/ruBert-large'  # 1.71 Gb
# model_name = 'ai-forever/ruBert-base'  # 716 MB

# # https://huggingface.co/ai-forever/sbert_large_nlu_ru
# https://huggingface.co/ai-forever/sbert_large_mt_nlu_ru
# model_name = "ai-forever/sbert_large_nlu_ru"  # 1.71 Gb  говорят что норм
# model_name = "ai-forever/sbert_large_mt_nlu_ru"  # 1.71 Gb  Multitask Learning версия обученная на 3 задачах (NLI, NER, Sentiment)

# # https://huggingface.co/sentence-transformers/paraphrase-multilingual-mpnet-base-v2
# model_name = "sentence-transformers/paraphrase-multilingual-mpnet-base-v2"  # 1 Gb еще вариант от сайги мистраль ру весит

# # https://huggingface.co/intfloat/multilingual-e5-large
# model_name = 'intfloat/multilingual-e5-large'  # 2.24G еще вариант говорят норм мультиязычная

# за технику усреднения отвечают флаги при выводе модели, например pooling_mode_mean_tokens стоит True
# SentenceTransformer обычно на выходе делают нормализацию эмбедингов torch.nn.functional.normalize(embeddings)
model_kwargs = {"device": "cpu"}
embed_model = HuggingFaceEmbeddings(model_name=model_name, model_kwargs=model_kwargs, cache_folder='./embed_models')

## Приложение после загрузки моделей

Вариант через полностью кастомные элементы

In [None]:
# ======================= КОНФИГ ===================================
LOADER_CLASSES = {
    '.csv': CSVLoader,  # ".csv": (CSVLoader, {"csv_args": {"delimiter": ";"}})
    '.doc': UnstructuredWordDocumentLoader,
    '.docx': UnstructuredWordDocumentLoader,
    '.html': UnstructuredHTMLLoader,
    '.md': UnstructuredMarkdownLoader,
    '.pdf': PDFMinerLoader,
    '.ppt': UnstructuredPowerPointLoader,
    '.pptx': UnstructuredPowerPointLoader,
    '.txt': TextLoader,
    'web': WebBaseLoader,
    'directory': DirectoryLoader,
    'youtube': YoutubeLoader,
}

RETRIEVER_CLASSES = [BM25Retriever, FAISS, TFIDFRetriever]
RETRIEVER_NAMES = [cls.__name__ for cls in RETRIEVER_CLASSES]
SUBTITLES_LANGUAGES = ["Russian", "English"]
HISTORY_LEN = 0
DO_SAMPLE = False

generate_kwargs = dict(
    top_k=40,  # 40
    top_p=0.9,  # 0.95
    temp=0.8,  # 0.8 (параметр называется temperature для __call__, temp для generate)
    repeat_penalty=1.0,  # 1.1
    )

INTERFACE_DESCRIPTION = '''Добро пожаловать
Задайте вопрос для получения ответа от бота
Для настроек семплирования настройте параметры в Additional inputs
Для использования контекста перейдите во вкладку Load documents
'''

TEMPLATES = {
    'Saiga mistral': {
        'SYSTEM_TEMPLATE': '''<s> system
Ты — Сайга, русскоязычный автоматический ассистент. Ты разговариваешь с людьми и помогаешь им.
</s> ''',
        'USER_TEMPLATE': '<s> user\n{message}\n</s> ',
        'BOT_TEMPLATE': '<s> bot\n{message}\n</s> ',
        'ADD_BOS': False,
        },

    'Openchat': {
        'SYSTEM_TEMPLATE': '',
        'USER_TEMPLATE': ' GPT4 Correct User: {message}<|end_of_turn|>',
        'BOT_TEMPLATE': ' GPT4 Correct Assistant: {message}<|end_of_turn|>',
        'ADD_BOS': False,
        },

    'Mistral instruct': {
        'SYSTEM_TEMPLATE': '',
        'USER_TEMPLATE': ' [INST] {message} [/INST]',
        'BOT_TEMPLATE': '{message}</s>  ',
        'ADD_BOS': True,
        },
    }

DEFAULT_TEMPLATE = TEMPLATES['Saiga mistral']

CONTEXT_TEMPLATE = '''Дан контекст:
{context}
Дан вопрос:
{message}
Ответ:'''

CSS = '''
.gradio-container {width: 80% !important}
'''

# ================ ОЧИСТКА ТЕКСТА ====================================
def clear_text(text: str) -> str:
    lines = text.split('\n')
    lines = [line for line in lines if len(line.strip()) > 2]
    text = '\n'.join(lines).strip()
    return text

def clear_documents(documents: List[Document]) -> List[Document]:
    output_documents = []
    for document in documents:
        text = clear_text(document.page_content)
        if len(text) > 10:
            document.page_content = text
            output_documents.append(document)
    return output_documents


# ====================== ДОП ФУНКЦИИ ЗАГРУЗКИ ФАЙЛОВ ===================
def get_csv_delimiter(file_path: str) -> str:
    n_bytes = 4096
    with open(file_path) as csvfile:
        delimiter = csv.Sniffer().sniff(csvfile.read(n_bytes)).delimiter
    return delimiter

def load_documents_from_files(upload_files: List[str]) -> Tuple[List[Document], str]:
    load_log = ''
    documents = []
    for upload_file in upload_files:
        file_extension = f'.{upload_file.rsplit(".", 1)[-1]}'
        if file_extension in LOADER_CLASSES:
            loader_kwargs = {}
            loader_class = LOADER_CLASSES[file_extension]
            if file_extension == '.csv':
                delimiter = get_csv_delimiter(upload_file)
                loader_kwargs = {'csv_args': {'delimiter': delimiter}}
            try:
                load_documents = loader_class(upload_file, **loader_kwargs).load()
                documents.extend(load_documents)
            except Exception as ex:
                load_log += f'Ошибка загрузки файла: {upload_file}\n'
                load_log += f'Код ошибки: {ex}\n'
                continue
        else:
            load_log += f'Неподдерживаемый формат файла {upload_file}\n'
            continue
    return documents, load_log


def load_documents_from_links(
        web_links: str,
        subtitles_lang: str,
        ) -> Tuple[List[Document], str]:
    load_log = ''
    documents = []
    loader_class_kwargs = {}

    web_links = [web_link.strip() for web_link in web_links.split('\n') if web_link.strip()]
    for web_link in web_links:
        # ----------------- ссылка на Ютуб ---------------------------------
        if 'youtube.com' in web_link:
            # проверка что субтитры на выбранном языке subtitles_lang доступны в видео web_link
            youtube_id = web_link.split('watch?v=')[-1].split('&')[0]
            available_langs = [t.language for t in list(YouTubeTranscriptApi.list_transcripts(youtube_id))]
            if subtitles_lang not in str(available_langs):
                load_log += f'Язык субтитров {subtitles_lang} недоступен для видео {web_link}\n'
                continue
            if len(available_langs) == 1 and 'auto-generated' in str(available_langs):
                load_log += f'Загружены автоматические субтитры, ручные недоступны для видео {web_link}\n'
            # если субтитры доступны то будем доставать их с помощью именованных аргументов
            loader_class = LOADER_CLASSES['youtube'].from_youtube_url
            language = subtitles_lang[:2].lower()
            loader_class_kwargs = {'language': language}

        # ----------------- ссылка не на Ютуб -------------------------------
        else:
            loader_class = LOADER_CLASSES['web']
        try:
            if requests.get(web_link).status_code != 200:
                load_log += f'Ссылка недоступна для Python: {web_link}\n'
                continue
            load_documents = loader_class(web_link, **loader_class_kwargs).load()
            documents.extend(load_documents)
        except MissingSchema:
            load_log += f'Неверная ссылка: {web_link}\n'
            continue
        except Exception as ex:
            load_log += f'Ошибка загрузки лоадером данных по ссылке: {web_link}\n'
            load_log += f'Код ошибки: {ex}\n'
            continue
    return documents, load_log


# =================== ИНИЦИАЛИЗАЦИЯ РЕТРИВЕРА =========================
def create_retriver(
        documents: List[Document],
        retriever_classes: List[BaseRetriever],
        k: int,
        ) -> BaseRetriever:
    retrievers = []
    for retriever_class in retriever_classes:
        if retriever_class.__name__ in ('FAISS', 'Chroma'):
            db = retriever_class.from_documents(
                documents=documents,
                embedding=embed_model,
                )
            retriever = db.as_retriever(search_kwargs={'k': k})
        else:
            retriever = retriever_class.from_documents(documents=documents)
            retriever.k = k
        retrievers.append(retriever)
    if len(retrievers) == 1:
        final_retriver = retrievers[0]
    else:
        weights = [0.5] * len(retrievers)
        final_retriver = EnsembleRetriever(retrievers=retrievers, weights=weights)
    return final_retriver


# ============= ГЛАВНАЯ ФУНКЦИЯ ОКНА ЗАГРУЗКИ ======================
def load_documents_and_create_retriver(
        upload_files: Optional[List[str]],
        web_links: str,
        subtitles_lang: str,
        chunk_size: int,
        chunk_overlap: int,
        k: Union[int, str],
        retriever_indexes: List[int],
        ) -> Tuple[List[Document], Optional[BaseRetriever], str]:

    documents = []
    retriever = None

    if not retriever_indexes:
        load_log = 'Не выбран ретривер'
        return documents, retriever, load_log

    if upload_files is None and not web_links:
        load_log = 'Не выбраны файлы или ссылки'
        return documents, retriever, load_log

    progress = gr.Progress()
    all_documents = []
    load_log = ''

    if upload_files is not None:
        progress(0.3, desc='Шаг 1/2: Загрузка документов из файлов')
        docs, log = load_documents_from_files(upload_files)
        all_documents.extend(docs)
        load_log += log

    if web_links:
        progress(0.3 if upload_files is None else 0.5, desc='Шаг 1/2: Загрузка документов по ссылкам')
        docs, log = load_documents_from_links(web_links, subtitles_lang)
        all_documents.extend(docs)
        load_log += log

    if len(all_documents) == 0:
        load_log += 'Загрузка прервана так как не было извлечено ни одного документа\n'
        load_log += 'Режим RAG не может быть активирован'
        return documents, retriever, load_log

    load_log += f'Загружено документов: {len(all_documents)}\n'

    if k == 'max':
        documents = clear_documents(all_documents)
        load_log += f'Используются документы без разделения в кол-ве: {len(documents)}\n'
        k = 1
    else:
        text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap,
        )
        documents = text_splitter.split_documents(all_documents)
        documents = clear_documents(documents)
        load_log += f'Документы разделены, кол-во фрагментов текста: {len(documents)}\n'

    progress(0.7, desc='Шаг 2/2: Инициализация ретривера')

    retriever_classes = [RETRIEVER_CLASSES[i] for i in retriever_indexes]
    retriever = create_retriver(documents, retriever_classes, k)

    load_log += 'Режим RAG активирован и может быть дективирован на вкладке Generate'
    return documents, retriever, load_log


# ================== ГЛАВНАЯ ФУНКЦИЯ ОКНА БОТА ==========================

def get_promt_with_context(
        user_message: str,
        chatbot: List[List[Optional[str]]],
        history_len: int,
        rag_mode: bool,
        retriever: BaseRetriever,
        bos_token: str,
        context_template,
        *template_components: List[str],
        ):

    # user_message = user_message.strip()  # ?
    chatbot.append([user_message, None])
    if not user_message:
        return '', chatbot, full_promt

    CONTEXT_TEMPLATE = context_template
    SYSTEM_TEMPLATE, USER_TEMPLATE, BOT_TEMPLATE, ADD_BOS = template_components

    full_promt = ''
    if SYSTEM_TEMPLATE:
        full_promt += SYSTEM_TEMPLATE

    if ADD_BOS:
        full_promt += bos_token

    if history_len != 0:
        for user_msg, bot_msg in chatbot[-history_len:]:
            full_promt += USER_TEMPLATE.format(message=user_msg)
            full_promt += BOT_TEMPLATE.format(message=bot_msg)

    if retriever is not None and rag_mode:
        retriever_docs = retriever.invoke(user_message)
        retriever_context = '\n'.join([doc.page_content for doc in retriever_docs])
        user_message = CONTEXT_TEMPLATE.format(
            message=user_message,
            context=retriever_context,
            )

    full_promt += USER_TEMPLATE.format(message=user_message)
    full_promt += BOT_TEMPLATE.split('{message}')[0]
    return '', chatbot, full_promt

def generate_text(
        chatbot: List[List[Optional[str]]],
        full_promt: str,
        history_len: int,
        do_sample: bool,
        *generate_args: List[Union[int, float]],
        ):
    if chatbot[-1][0] == '':
        yield chatbot[:-1]
    else:
        promt = full_promt
        promt_tokens = model.tokenize(promt.encode('utf-8'), special=True, add_bos=False)
        gen_kwargs = dict(zip(generate_kwargs.keys(), generate_args))
        if not do_sample:
            gen_kwargs['top_k'] = 1
            gen_kwargs['repeat_penalty'] = 1

        # generator = 'И тебе привет'
        chatbot[-1][1] = ''
        generator = model.generate(promt_tokens, **gen_kwargs)
        for token in generator:
            if token == model.token_eos():
                break
            character = model.detokenize([token]).decode('utf-8', errors='ignore')
            # character = token
            chatbot[-1][1] += character
            yield chatbot


# =================== ИНТЕРФЕЙС ПРИЛОЖЕНИЯ ===========================
# gr.themes.Glass() Glass Soft Monochrome Default Base
with gr.Blocks(theme=gr.themes.Monochrome(), css=CSS) as demo:
    documents = gr.State([])
    retriever = gr.State(None)
    full_promt = gr.State('')

    def get_chatbot(chatbot_history: list = [], rag_mode_label: bool = False):
        label = 'RAG' if rag_mode_label else 'Chatbot'
        chatbot = gr.Chatbot(
                value=chatbot_history,
                scale=0,
                min_width=100,  # 160
                height=300,
                show_copy_button=True,
                bubble_full_width=False,
                label=label,
                )
        return chatbot

    # ==================== СТРАНИЦА БОТА =================================

    with gr.Tab(label='Generate'):
        # chatbot_title = gr.Markdown('Чат бот в режиме RAG')
        with gr.Row():
            with gr.Column(scale=3):
                chatbot = get_chatbot()
                user_message = gr.Textbox(label='User')
                with gr.Row():
                    user_message_btn = gr.Button('Отправить')
                    stop_btn = gr.Button('Стоп')
                    clear_btn = gr.Button('Очистить чат')

                rag_mode = gr.Checkbox(value=False, label='Режим RAG', scale=1, visible=False)
                rag_mode.change(fn=get_chatbot, inputs=[chatbot, rag_mode], outputs=chatbot)

            # ------------------ ПАРАМЕТРЫ ГЕНЕРАЦИИ -------------------------
            def get_generate_args(do_sample: bool):
                visible = do_sample
                generate_args = [
                    gr.Slider(5, 50, generate_kwargs['top_k'], 5, 'top_k', visible=visible),
                    gr.Slider(0, 1, generate_kwargs['top_p'], 0.1, 'top_p', visible=visible),
                    gr.Slider(0.1, 4, generate_kwargs['temp'], 0.1, 'temp', visible=visible),
                    gr.Slider(1, 5, generate_kwargs['repeat_penalty'], 0.1, 'repeat_penalty', visible=visible),
                ]
                return generate_args

            with gr.Column(scale=1, min_width=80):
                with gr.Group():
                    gr.Markdown('Длина истории')
                    history_len = gr.Slider(
                        minimum=0,
                        maximum=5,
                        value=HISTORY_LEN,
                        step=1,
                        info='Кол-во предыдущих сообщенией, учитываемых в истории',
                        label='history len',
                        show_label=False,
                        )

                    with gr.Group():
                        gr.Markdown('Параметры генерации')
                        do_sample = gr.Checkbox(value=DO_SAMPLE, label='do_sample')
                        generate_args = get_generate_args(do_sample.value)

                    do_sample.change(
                        fn=get_generate_args,
                        inputs=do_sample,
                        outputs=generate_args,
                        show_progress=False,
                        )

        # ------------------ ТЕКСТ ПРОМТА С КОНТЕКСТОМ --------------------
        full_promt = gr.Textbox(
            label='Полный текст текущего промта с контекстом',
            interactive=False,
            )

        # -------- КОМПОНЕНТЫ РЕДАКТИРОВАНИЯ ПРОМТОВ И СПЕЦ ТОКЕНОВ --------
        def get_bos_token(visible=False, render=True):
            bos_token_component = gr.Textbox(
                value='<s>',
                label='Bos token',
                visible=visible,
                render=render,
                )
            return bos_token_component

        # добавлять токен {BOS} только один раз вначале шаблона юзера
        add_bos = gr.Checkbox(
            value=False,
            label='add bos',
            scale=1,
            render=False,
            info='Добавлять ли токен BOS один раз вначале диалога перед промтом пользователя',
            )
        bos_token = get_bos_token(visible=False, render=False)
        add_bos.change(
            fn=get_bos_token,
            inputs=add_bos,
            outputs=bos_token,
            show_progress=False,
            )

        system_template = gr.Textbox(
            DEFAULT_TEMPLATE['SYSTEM_TEMPLATE'],
            label='System template',
            lines=4,
            render=False,
            )
        user_template = gr.Textbox(
            DEFAULT_TEMPLATE['USER_TEMPLATE'],
            label='User template',
            lines=4,
            render=False,
            )
        bot_template = gr.Textbox(
            DEFAULT_TEMPLATE['BOT_TEMPLATE'],
            label='Bot template',
            lines=4,
            render=False,
            )
        template_components = [system_template, user_template, bot_template, add_bos]

        context_template = gr.Textbox(
            CONTEXT_TEMPLATE,
            label='Context template',
            lines=6,
            render=False,
            )

        # ------------------ КНОПКИ ОТПРАВИТЬ ОЧИСТИТЬ И СТОП ------------
        # нажатие Enter и кнопка отправить
        generate_event = gr.on(
            triggers=[user_message.submit, user_message_btn.click],
            fn=get_promt_with_context,
            inputs=[user_message, chatbot, history_len, rag_mode, retriever,
            bos_token, context_template, *template_components],
            outputs=[user_message, chatbot, full_promt],
            queue=True,
        ).then(
            fn=lambda promt: promt,  # попробовать поставить lambda: full_promt
            inputs=full_promt,
            outputs=full_promt,
            queue=False,
        ).then(
            fn=generate_text,
            inputs=[chatbot, full_promt, history_len, do_sample, *generate_args],
            outputs=chatbot,
            queue=True,
            )
        # кнопка Стоп
        stop_btn.click(
            fn=None,
            inputs=None,
            outputs=None,
            cancels=generate_event,
            queue=False,
        )
        # кнопка Очистить чат
        clear_btn.click(
            fn=lambda: (None, ''),
            inputs=None,
            outputs=[chatbot, full_promt],
            queue=False,
            )

    # ===================== СТРАНИЦА ЗАГРУЗКИ ФАЙЛОВ =========================

    def get_chunk_size_overlap(k: int = 2):
        visible = k != 'max'
        chunk_size = gr.Slider(50, 2000, value=500, step=50, label='Длина фрагментов', visible=visible)
        chunk_overlap = gr.Slider(0, 200, value=20, step=10, label="Длина пересечения фрагментов", visible=visible)
        return chunk_size, chunk_overlap

    with gr.Tab(label='Load documents'):
        with gr.Row():
            upload_files = gr.File(file_count='multiple', label='Загрузка текстовых файлов')
            web_links = gr.Textbox(lines=8, label='Ссылки на Web сайты или Ютуб')
        with gr.Row():
            with gr.Column(scale=2):
                retriever_indexes = gr.CheckboxGroup(
                    choices=RETRIEVER_NAMES,
                    value=RETRIEVER_NAMES[0],
                    type='index',
                    label='Ретривер',
                    )
                with gr.Row():
                    k = gr.Radio(
                        [1, 2, 3, 'max'],
                        value=2,
                        label='Количество релевантных документов для поиска',
                        )
                    subtitles_lang = gr.Radio(
                        SUBTITLES_LANGUAGES,
                        value=SUBTITLES_LANGUAGES[0],
                        label="Язык субтитров YouTube",
                        )

            with gr.Column(scale=1, min_width=160):
                chunk_size, chunk_overlap = get_chunk_size_overlap()
                k.change(
                    get_chunk_size_overlap,
                    inputs=k,
                    outputs=[chunk_size, chunk_overlap],
                    show_progress=False,
                    )

        load_documents_btn = gr.Button(value='Загрузить документы и создать ретривер')
        load_docs_log = gr.Textbox(label='Прогресс загрузки и разделения документов')

        load_event = load_documents_btn.click(
            fn=load_documents_and_create_retriver,
            inputs=[upload_files, web_links, subtitles_lang, chunk_size,
                    chunk_overlap, k, retriever_indexes],
            outputs=[documents, retriever, load_docs_log],
            )

        def load_success(chatbot_history, retriever):
            rag_mode = retriever is not None
            chatbot = get_chatbot(chatbot_history, rag_mode)
            rag_mode_checkbox = gr.Checkbox(value=rag_mode, label='Режим RAG', scale=1, visible=rag_mode)
            return chatbot, rag_mode_checkbox

        load_event.success(load_success, [chatbot, retriever], [chatbot, rag_mode])

    # ================= СТРАНИЦА ПРОСМОТРА ВСЕХ ДОКУМЕНТОВ =================

    with gr.Tab(label='View documents'):
        view_documents_btn = gr.Button(value='Отобразить загруженные фрагменты')
        view_documents_textbox = gr.Textbox(
            lines=1,
            placeholder='Для просмотра фрагментов загрузите документы ан вкладке Load documents',
            label='Загруженные фрагменты',
            )
        sep = '=' * 20
        view_documents_btn.click(
            lambda documents: f'\n{sep}\n\n'.join([doc.page_content for doc in documents]),
            inputs=documents,
            outputs=view_documents_textbox,
        )

    # ================= СТРАНИЦА РЕДАКТИРОВАНИЯ ШАБЛОНА ПРОМТА ===============

    def select_template(template_name):
        template = list(TEMPLATES[template_name].values())
        return template

    def get_example_prompt(bos_token, *template_components):
        user_message = 'Как дела?'
        chatbot_history = [['Привет', 'Здорова']]

        _, _, example_prompt = get_promt_with_context(
            user_message, chatbot_history, 1, False, None,
            bos_token, '', *template_components)

        example_prompt = gr.Textbox(
            value=example_prompt,
            label='Example prompt',
            lines=example_prompt.count('\n'),
            )
        return example_prompt

    with gr.Tab(label='Edit pomt templates'):
        with gr.Group():
            gr.Markdown('Готовые шаблоны по умолчанию')
            default_template = gr.Dropdown(
                choices=list(TEMPLATES.keys()),
                value=None,
                label='Default Templates',
                )
            default_template.change(
                fn=select_template,
                inputs=default_template,
                outputs=[*template_components],
            )

        with gr.Group():
            gr.Markdown('Настройки форматирования промтов перед подачей в модель')
            with gr.Row():
                add_bos.render()
                bos_token.render()

        with gr.Row():
            with gr.Column():
                system_template.render()
                user_template.render()
                bot_template.render()

            with gr.Column():
                example_prompt = get_example_prompt(
                    bos_token.value,
                    *[c.value for c in template_components],
                    )

        gr.on(
            triggers=[c.change for c in [add_bos, bos_token, *template_components]],
            fn=get_example_prompt,
            inputs=[bos_token, *template_components],
            outputs=[example_prompt],
            queue=False,
            )

        with gr.Group():
            gr.Markdown('Настройки форматирования промта при условии контекста (RAG)')
            context_template.render()

demo.queue().launch(debug=True, width='70%', height=700)

Keyboard interruption in main thread... closing server.
Killing tunnel 127.0.0.1:7860 <> https://24d4cad74e6bb66528.gradio.live




## Страница загрузки моделей

Страница загрузки моделей отдельно  
API HF Python  
https://huggingface.co/docs/huggingface_hub/guides/repository

In [None]:
EMBED_MODELS_PATH = Path('embed_models')
MODELS_PATH = Path('models')

EMBEDERS_IDS = [
    # https://huggingface.co/cointegrated/rubert-tiny2
    'cointegrated/rubert-tiny2',
    # ttps://huggingface.co/cointegrated/rubert-tiny
    'cointegrated/rubert-tiny',
    # https://huggingface.co/cointegrated/LaBSE-en-ru
    'cointegrated/LaBSE-en-ru',
    # https://huggingface.co/intfloat/multilingual-e5-large
    'intfloat/multilingual-e5-large',
    # https://huggingface.co/sentence-transformers/all-mpnet-base-v2
    'sentence-transformers/all-mpnet-base-v2',
    # https://huggingface.co/sentence-transformers/paraphrase-multilingual-mpnet-base-v2
    'sentence-transformers/paraphrase-multilingual-mpnet-base-v2',
    # https://huggingface.co/inkoziev/sbert_pq/tree/main
    'inkoziev/sbert_pq',
    # https://huggingface.co/ai-forever?search_models=ruElectra
    'ai-forever/ruElectra-medium',
    # https://huggingface.co/ai-forever/sbert_large_nlu_ru
    'ai-forever/sbert_large_nlu_ru',
]

MODEL_IDS = [
    'IlyaGusev/saiga_mistral_7b_gguf',
    'TheBloke/openchat-3.5-0106-GGUF',
    'TheBloke/Mistral-7B-Instruct-v0.1-GGUF',
]


def get_memory_usage():
    '''Печатает кол-во свободной ОЗУ и свободной видеопамяти (если видеокарта она доступна)'''
    memory_type = 'CPU'
    psutil_stats = psutil.virtual_memory()
    memory_usage = psutil_stats.used / 1024**3
    memory_total = psutil_stats.total / 1024**3
    print_memory = f'{memory_type} Menory Usage: {memory_usage:.2f} / {memory_total:.2f} GB'

    if torch.cuda.is_available():
        memory_type = 'GPU'
        memory_free, memory_total = torch.cuda.mem_get_info()
        memory_usage = total_memory - free_memory
        print_memory += f'{memory_type} Menory Usage: {memory_usage / 1024**3:.2f} / {memory_total:.2f} GB'

    print_memory = f'---------------\n{print_memory}\n---------------\n'
    return print_memory


def clear_memory():
    gc.collect()
    torch.cuda.empty_cache()


def load_model(model_id, model_file, n_ctx):
    model = None
    if isinstance(model_file, list):
        load_log = 'Не выбрана модель'
        return model, load_log

    load_log = ''
    if '(' in model_file:
        model_file = model_file.split('(')[0].rstrip()

    progress = gr.Progress()
    progress(0.3, desc='Шаг 1/2: Загрузка модели GGUF')
    model_path = MODELS_PATH / model_file
    if model_path.is_file():
        load_log += 'Модель уже загружена, повторная инициализация\n'
    else:
        try:
            hf_hub_download(
                repo_id=model_id,
                filename=model_file,
                local_dir=MODELS_PATH,
                )
            load_log += 'Модель успешно загружена\n'
        except Exception as ex:
            model_path = ''
            load_log += f'Ошибка загрузки модели, код ошибки:\n{ex}\n'

    if model_path:
        progress(0.7, desc='Шаг 2/2: Инициализация модели')
        try:
            model = Llama(model_path=str(model_path), n_ctx=n_ctx)
            load_log += f'Модель {model_id}/{model_file} инициализирована\n'
        except Exception as ex:
            load_log += f'Ошибка инициализации модели, код ошибки:\n{ex}\n'

    clear_memory()
    model = {'model': model}
    return model, load_log


def load_embed_model(model_id):
    embed_model = None
    if isinstance(model_id, list):
        load_log = 'Не выбрана модель'
        return embed_model, load_log

    progress = gr.Progress()
    load_log = ''
    folder_name = model_id.replace('/', '_')
    if Path(folder_name).is_dir():
        load_log += f'Модель {model_id} уже загружена, повторная инициализация\n'
        progress(0.5, desc='Шаг 1/1: Инициализация загруженной модели модели')
    else:
        progress(0.5, desc='Шаг 1/1: Загрузка и инициализация модели')

    model_kwargs = {'device': 'cuda' if torch.cuda.is_available() else 'cpu'}
    embed_model = HuggingFaceEmbeddings(
        model_name=model_id,
        model_kwargs=model_kwargs,
        cache_folder=str(EMBED_MODELS_PATH),
        )
    load_log += f'Модель эмбедингов {model_id} успешно загружена и инициализирована\n'

    clear_memory()
    embed_model = {'embed_model': embed_model}
    return embed_model, load_log


# загрузка моделей
start_model = load_model(MODEL_IDS[0], 'model-q2_K.gguf', 2000)[0]['model']
start_embed_model = load_embed_model(EMBEDERS_IDS[0])[0]['embed_model']


def add_new_model_id(new_model_id: str, model_ids):
    load_log = ''
    model_id = new_model_id.strip()
    if model_id:
        model_id = model_id.split('/')[-2:]
        if len(model_id) == 2:
            model_id = '/'.join(model_id).split('?')[0]
            if repo_exists(model_id) and model_id not in model_ids:
                if any([file_name.endswith('.gguf') for file_name in list_repo_files(model_id)]):
                    model_ids.insert(0, model_id)
                    load_log += f'Репозиторий модели {model_id} успешно добавлен\n'
                else:
                    load_log += f'Не найдены модели GGUF в репозитории {model_id}\n'
            else:
                load_log += 'Неверное название репозитория HF\n'
        else:
            load_log += 'Неверная ссылка на репозиторий HF\n'
    else:
        load_log += 'Пустая строка в поле репозитория HF\n'
    model_id_dropdown = gr.Dropdown(choices=model_ids, value=model_ids[0])
    return model_id_dropdown, load_log


def get_model_paths(model_id):
    load_log = ''
    repo_files = list(list_repo_tree(model_id))
    repo_files = [file for file in repo_files if file.path.endswith('.gguf')]
    model_paths = [f'{file.path} ({file.size / 1000 ** 3:.2f}G)' for file in repo_files]

    model_paths_dropdown = gr.Dropdown(
        choices=model_paths,
        value=model_paths[0],
        label='Файл модели GGUF',
        )
    return model_paths_dropdown


def clear_folder(ignore_link):
    folder = EMBED_MODELS_PATH
    if len(ignore_link.split('/')) != 2:
        folder = MODELS_PATH
        if '(' in ignore_link:
            ignore_link = ignore_link.split('(')[0].rstrip()

    for path in folder.iterdir():
        if path.name == ignore_link:
            continue
        if path.is_file():
            path.unlink()
        elif path.is_dir():
            rmtree(path)


with gr.Blocks(theme=gr.themes.Monochrome()) as demo:
    # ================= ЗАГРУЗКА GGUF МОДЕЛЕЙ ============================
    with gr.Tab('Load model'):
        model_ids_state = gr.State(MODEL_IDS)
        model = gr.State({'model': start_model})

        new_model_id = gr.Textbox(
            value='',
            label='Добавить репозиторий',
            placeholder='Ссылка на репозиторий HF моделей в формате GGUF',
            )
        new_model_btn = gr.Button('Добавить репозиторий')

        model_id = gr.Dropdown(
            choices=MODEL_IDS,
            value=None,
            label='Репозиторий модели HF',
            )
        model_path = gr.Dropdown(
            choices=[],
            value=None,
            label='Файл модели GGUF',
            )

        n_ctx = gr.Slider(500, 500 * 8, step=500, label='n_ctx')
        load_model_btn = gr.Button('Загрука и инициализация модели')
        load_model_log = gr.Textbox(
            value='Модель IlyaGusev/saiga_mistral_7b_gguf/model-q2_K.gguf загружена по умолчанию',
            label='Статус загрузки модели',
            )

        with gr.Group():
            gr.Markdown('Освободить место на диске путем удаления всех моделей кроме текущей')
            remove_models_btn = gr.Button('Очистить папку')

        new_model_btn.click(
            fn=add_new_model_id,
            inputs=[new_model_id, model_ids_state],
            outputs=[model_id, load_model_log],
        ).success(
            fn=lambda: '',
            inputs=None,
            outputs=new_model_id,
        )

        model_id.change(
            fn=get_model_paths,
            inputs=[model_id],
            outputs=[model_path],
        )

        load_model_btn.click(
            fn=load_model,
            inputs=[model_id, model_path, n_ctx],
            outputs=[model, load_model_log],
            queue=True,
        ).success(
            fn=lambda log: log + get_memory_usage(),
            inputs=load_model_log,
            outputs=load_model_log,
        )

        remove_models_btn.click(
            fn=clear_folder,
            inputs=[model_path],
            outputs=None,
        ).success(
            fn=lambda model: f'Модели кроме {model} удалены',
            inputs=model_path,
            outputs=None,
        )

    # ================= ЗАГРУЗКА ЭМБЕДИНГ МОДЕЛЕЙ ========================
    with gr.Tab('Load embed model'):
        embed_ids_state = gr.State(EMBEDERS_IDS)
        embed_model = gr.State({'embed_model': start_embed_model})

        new_embed_id = gr.Textbox(
            value='',
            label='Добавить репозиторий',
            placeholder='Ссылка на репозиторий модели HF',
            )
        new_embed_btn = gr.Button('Добавить репозиторий')

        embed_id = gr.Dropdown(
            choices=EMBEDERS_IDS,
            value=None,
            label='Репозиторий модели HF',
            )

        load_embed_btn = gr.Button('Загрука и инициализация модели')
        load_embed_log = gr.Textbox(
            value=f'Модель {EMBEDERS_IDS[0]} загружена по умолчанию',
            label='Статус загрузки модели',
            )
        with gr.Group():
            gr.Markdown('Освободить место на диске путем удаления всех моделей кроме текущей')
            remove_embed_models_btn = gr.Button('Очистить папку')

        new_embed_btn.click(
            fn=add_new_model_id,
            inputs=[new_embed_id, embed_ids_state],
            outputs=[embed_id, load_embed_log],
        ).success(
            fn=lambda: '',
            inputs=None,
            outputs=new_embed_id,
        )

        load_embed_btn.click(
            fn=load_embed_model,
            inputs=[embed_id],
            outputs=[embed_model, load_embed_log],
        ).success(
            fn=lambda log: log + get_memory_usage(),
            inputs=load_embed_log,
            outputs=load_embed_log,
        )

        remove_embed_models_btn.click(
            fn=clear_folder,
            inputs=[embed_id],
            outputs=None,
        ).success(
            fn=lambda model: f'Модели кроме {model} удалены',
            inputs=embed_id,
            outputs=None,
        )

demo.queue().launch(debug=True, width='70%', height=700)

## Страница About

In [None]:
import gradio as gr

markdown = '''

'''

CSS = '''
.gradio-container {width: 80% !important}
'''

# gr.themes.Glass() Glass Soft Monochrome Default Base
with gr.Blocks(theme=gr.themes.Monochrome()) as demo:  # css=CSS
    gr.Markdown(markdown)

demo.queue().launch(debug=True, width='70%', height=700)

## Итоговый вариант

### Импорты

In [None]:
import gc
import psutil
from pathlib import Path
from shutil import rmtree
from collections import deque
from typing import List, Tuple, Dict, Union, Iterable, Optional, Any
import time
import csv
import os

import requests
from requests.exceptions import MissingSchema
import torch
from huggingface_hub import hf_hub_download, list_repo_tree, list_repo_files, repo_info, repo_exists, snapshot_download

from llama_cpp import Llama, llama
from youtube_transcript_api import YouTubeTranscriptApi
import gradio as gr

from langchain.text_splitter import RecursiveCharacterTextSplitter, CharacterTextSplitter
from langchain_community.vectorstores import FAISS, Chroma
from langchain_community.embeddings import HuggingFaceInstructEmbeddings, HuggingFaceEmbeddings
from langchain_community.retrievers import BM25Retriever, TFIDFRetriever
from langchain.retrievers import EnsembleRetriever

# Loader classes
from langchain_community.document_loaders import (
    CSVLoader,
    PDFMinerLoader,
    PyPDFLoader,
    TextLoader,
    UnstructuredEmailLoader,
    UnstructuredHTMLLoader,
    UnstructuredMarkdownLoader,
    UnstructuredODTLoader,
    UnstructuredPowerPointLoader,
    UnstructuredWordDocumentLoader,
    WebBaseLoader,
    YoutubeLoader,
    DirectoryLoader,
)

# Annotations
from langchain_core.retrievers import BaseRetriever
from langchain.docstore.document import Document
from langchain_core.vectorstores import VectorStore
from langchain_core.embeddings import Embeddings

### Конфиг

In [None]:
# from pathlib import Path

# from langchain_community.vectorstores import FAISS, Chroma
# from langchain_community.retrievers import BM25Retriever, TFIDFRetriever

# from langchain_community.document_loaders import (
#     CSVLoader,
#     PDFMinerLoader,
#     PyPDFLoader,
#     TextLoader,
#     UnstructuredEmailLoader,
#     UnstructuredHTMLLoader,
#     UnstructuredMarkdownLoader,
#     UnstructuredODTLoader,
#     UnstructuredPowerPointLoader,
#     UnstructuredWordDocumentLoader,
#     WebBaseLoader,
#     YoutubeLoader,
#     DirectoryLoader,
# )


LOADER_CLASSES = {
    '.csv': CSVLoader,  # ".csv": (CSVLoader, {"csv_args": {"delimiter": ";"}})
    '.doc': UnstructuredWordDocumentLoader,
    '.docx': UnstructuredWordDocumentLoader,
    '.html': UnstructuredHTMLLoader,
    '.md': UnstructuredMarkdownLoader,
    '.pdf': PDFMinerLoader,
    '.ppt': UnstructuredPowerPointLoader,
    '.pptx': UnstructuredPowerPointLoader,
    '.txt': TextLoader,
    'web': WebBaseLoader,
    'directory': DirectoryLoader,
    'youtube': YoutubeLoader,
}

# список классов ретривера либо классов БД с методом as_retriever()
RETRIEVER_CLASSES = [BM25Retriever, FAISS, TFIDFRetriever]
RETRIEVER_NAMES = [cls.__name__ for cls in RETRIEVER_CLASSES]
SUBTITLES_LANGUAGES = ["Russian", "English"]

# начальные настройки бота при первом запуске
HISTORY_LEN = 0

GENERATE_KWARGS = dict(
    top_k=40,  # 40 default
    top_p=0.9,  # 0.95 default
    temp=0.8,  # 0.8 default
    repeat_penalty=1.0,  # 1.1 default
    )

# шаблоны промтов для юзера, бота и системный
# ADD_BOS - добавлять ли токен BOS один раз вначале промта юзера
TEMPLATES = {
    'Openchat': {
        'SYSTEM_TEMPLATE': '',
        'USER_TEMPLATE': ' GPT4 Correct User: {message}<|end_of_turn|>',
        'BOT_TEMPLATE': ' GPT4 Correct Assistant: {message}<|end_of_turn|>',
        'ADD_BOS': False,
        },

    'Saiga mistral': {
        'SYSTEM_TEMPLATE': '''<s> system
Ты — Сайга, русскоязычный автоматический ассистент. Ты разговариваешь с людьми и помогаешь им.
</s> ''',
        'USER_TEMPLATE': '<s> user\n{message}\n</s> ',
        'BOT_TEMPLATE': '<s> bot\n{message}\n</s> ',
        'ADD_BOS': False,
        },

    'Mistral instruct': {
        'SYSTEM_TEMPLATE': '',
        'USER_TEMPLATE': ' [INST] {message} [/INST]',
        'BOT_TEMPLATE': '{message}</s>  ',
        'ADD_BOS': True,
        },
    }

# шаблон промта при условии контекста
CONTEXT_TEMPLATE = '''Дан контекст:
{context}
Дан вопрос:
{message}
Ответ:'''

CSS = '''
.gradio-container {width: 80% !important}
'''

# пути куда будут скачиваться модели
MODELS_PATH = Path('models')
EMBED_MODELS_PATH = Path('embed_models')

MODELS_PATH.mkdir(exist_ok=True)
EMBED_MODELS_PATH.mkdir(exist_ok=True)

# список готовых моделей эмбедингов
EMBEDERS_IDS = [
    # https://huggingface.co/cointegrated/rubert-tiny2  # 118 MB
    'cointegrated/rubert-tiny2',
    # https://huggingface.co/cointegrated/LaBSE-en-ru  # 438 MB
    'cointegrated/LaBSE-en-ru',
    # https://huggingface.co/intfloat/multilingual-e5-large  # 2.24 GB
    'intfloat/multilingual-e5-large',
    # https://huggingface.co/intfloat/multilingual-e5-base  # 1.11 GB
    'intfloat/multilingual-e5-base',
    # https://huggingface.co/intfloat/multilingual-e5-small  # 471 MB
    'intfloat/multilingual-e5-small',
    # https://huggingface.co/sentence-transformers/all-mpnet-base-v2  # 438 MB
    'sentence-transformers/all-mpnet-base-v2',
    # https://huggingface.co/sentence-transformers/paraphrase-multilingual-mpnet-base-v2  # 1.11 GB
    'sentence-transformers/paraphrase-multilingual-mpnet-base-v2',
    # https://huggingface.co/inkoziev/sbert_pq/tree/main  # 438 MB
    'inkoziev/sbert_pq',
    # https://huggingface.co/ai-forever?search_models=ruElectra  # 117 MB
    'ai-forever/ruElectra-medium',
    # https://huggingface.co/ai-forever/sbert_large_nlu_ru  # 1.71 GB
    'ai-forever/sbert_large_nlu_ru',
]

# список готовых моделей GGUF
MODEL_IDS = [
    # https://huggingface.co/TheBloke/openchat-3.5-0106-GGUF
    'TheBloke/openchat-3.5-0106-GGUF',
    # https://huggingface.co/IlyaGusev/saiga_mistral_7b_gguf
    'IlyaGusev/saiga_mistral_7b_gguf',
    # https://huggingface.co/TheBloke/Mistral-7B-Instruct-v0.1-GGUF
    'TheBloke/Mistral-7B-Instruct-v0.1-GGUF',
]

### Функции

In [None]:
# import gc
# import psutil
# from pathlib import Path
# from shutil import rmtree
# from typing import List, Tuple, Dict, Union, Iterable, Optional, Any
# import csv

# import requests
# from requests.exceptions import MissingSchema
# import torch
# from huggingface_hub import hf_hub_download, list_repo_tree, list_repo_files, repo_info, repo_exists, snapshot_download
# from llama_cpp import Llama
# from youtube_transcript_api import YouTubeTranscriptApi
# import gradio as gr

# from langchain.text_splitter import RecursiveCharacterTextSplitter, CharacterTextSplitter
# from langchain_community.embeddings import HuggingFaceInstructEmbeddings, HuggingFaceEmbeddings
# from langchain.retrievers import EnsembleRetriever

# # Annotations
# from langchain_core.retrievers import BaseRetriever
# from langchain.docstore.document import Document
# from langchain_core.vectorstores import VectorStore
# from langchain_core.embeddings import Embeddings

# from config import (
#     MODELS_PATH,
#     EMBED_MODELS_PATH,
#     LOADER_CLASSES,
#     RETRIEVER_CLASSES,
#     GENERATE_KWARGS,
#     )


# получение количества свободной памяти на диске, CPU и GPU
def get_memory_usage() -> str:
    print_memory = ''

    memory_type = 'Disk'
    psutil_stats = psutil.disk_usage('.')
    memory_total = psutil_stats.total / 1024**3
    memory_usage = psutil_stats.used / 1024**3
    print_memory += f'{memory_type} Menory Usage: {memory_usage:.2f} / {memory_total:.2f} GB\n'

    memory_type = 'CPU'
    psutil_stats = psutil.virtual_memory()
    memory_total = psutil_stats.total / 1024**3
    memory_usage =  memory_total - (psutil_stats.available / 1024**3)
    print_memory += f'{memory_type} Menory Usage: {memory_usage:.2f} / {memory_total:.2f} GB\n'

    if torch.cuda.is_available():
        memory_type = 'GPU'
        memory_free, memory_total = torch.cuda.mem_get_info()
        memory_usage = memory_total - memory_free
        print_memory += f'{memory_type} Menory Usage: {memory_usage / 1024**3:.2f} / {memory_total:.2f} GB\n'

    print_memory = f'---------------\n{print_memory}---------------\n'
    return print_memory


def clear_memory() -> None:
    gc.collect()
    torch.cuda.empty_cache()


# загрузка и инициализация модели GGUF
def load_model(model_id: str, model_file: str, n_ctx: int) -> Tuple[Dict[str, Llama], str]:
    model = None
    if isinstance(model_file, list):
        load_log = 'Не выбрана модель'
        return model, load_log

    load_log = ''
    if '(' in model_file:
        model_file = model_file.split('(')[0].rstrip()

    progress = gr.Progress()
    progress(0.3, desc='Шаг 1/2: Загрузка модели GGUF')
    model_path = MODELS_PATH / model_file
    if model_path.is_file():
        load_log += 'Модель уже загружена, повторная инициализация\n'
    else:
        try:
            hf_hub_download(
                repo_id=model_id,
                filename=model_file,
                local_dir=MODELS_PATH,
                local_dir_use_symlinks=False,
                )
            load_log += 'Модель успешно загружена\n'
        except Exception as ex:
            model_path = ''
            load_log += f'Ошибка загрузки модели, код ошибки:\n{ex}\n'

    if model_path:
        progress(0.7, desc='Шаг 2/2: Инициализация модели')
        try:
            model = Llama(model_path=str(model_path), n_ctx=n_ctx, n_gpu_layers=-1)
            load_log += f'Модель {model_id}/{model_file} инициализирована\n'
        except Exception as ex:
            load_log += f'Ошибка инициализации модели, код ошибки:\n{ex}\n'

    model = {'model': model}
    clear_memory()
    return model, load_log


# загрузка и инициализация модели эмбедингов
def load_embed_model(model_id: str) -> Tuple[Dict[str, HuggingFaceEmbeddings], str]:
    embed_model = None
    if isinstance(model_id, list):
        load_log = 'Не выбрана модель'
        return embed_model, load_log

    load_log = ''
    progress = gr.Progress()

    folder_name = model_id.replace('/', '_')
    folder_path = EMBED_MODELS_PATH / folder_name

    if Path(folder_path).is_dir():
        load_log += f'Повторная инициализация модели {model_id} \n'
    else:
        progress(0.5, desc='Шаг 1/2: Загрузка модели')
        snapshot_download(
            repo_id=model_id,
            local_dir=folder_path,
            ignore_patterns='*.h5',
            local_dir_use_symlinks=False,
        )

    progress(0.7, desc='Шаг 2/2: Инициализация модели')
    model_kwargs = {'device': 'cuda' if torch.cuda.is_available() else 'cpu'}
    embed_model = HuggingFaceEmbeddings(
        model_name=str(folder_path),
        model_kwargs=model_kwargs,
        )
    load_log += f'Модель эмбедингов {model_id} инициализирована\n'
    load_log += f'Выберите ретривер и загрузите документы повторно\n'

    embed_model = {'embed_model': embed_model}
    clear_memory()
    return embed_model, load_log


# добавление ноового репозитория HF new_model_id к текущему списку model_ids
def add_new_model_id(new_model_id: str, model_ids: List[str]) -> Tuple[gr.Dropdown, str]:
    load_log = ''
    model_id = new_model_id.strip()

    if model_id:
        model_id = model_id.split('/')[-2:]
        if len(model_id) == 2:
            model_id = '/'.join(model_id).split('?')[0]
            if repo_exists(model_id) and model_id not in model_ids:
                if any([file_name.endswith('.gguf') for file_name in list_repo_files(model_id)]):
                    model_ids.insert(0, model_id)
                    load_log += f'Репозиторий модели {model_id} успешно добавлен\n'
                else:
                    load_log += f'Не найдены модели GGUF в репозитории {model_id}\n'
            else:
                load_log += 'Неверное название репозитория HF или модель уже есть в списке\n'
        else:
            load_log += 'Неверная ссылка на репозиторий HF\n'
    else:
        load_log += 'Пустая строка в поле репозитория HF\n'

    model_id_dropdown = gr.Dropdown(choices=model_ids, value=model_ids[0])
    return model_id_dropdown, load_log


# получить список моделей GGUF из репозитория HF
def get_gguf_model_names(model_id: str) -> gr.Dropdown:
    repo_files = list(list_repo_tree(model_id))
    repo_files = [file for file in repo_files if file.path.endswith('.gguf')]
    model_paths = [f'{file.path} ({file.size / 1000 ** 3:.2f}G)' for file in repo_files]

    model_paths_dropdown = gr.Dropdown(
        choices=model_paths,
        value=model_paths[0],
        label='Файл модели GGUF',
        )
    return model_paths_dropdown


# удаление файлов и папок моделей для очистки места кроме текущей модели ignore_link
def clear_folder(ignore_link: str) -> None:
    folder = EMBED_MODELS_PATH
    if len(ignore_link.split('/')) != 2:
        folder = MODELS_PATH
        if '(' in ignore_link:
            ignore_link = ignore_link.split('(')[0].rstrip()

    for path in folder.iterdir():
        if path.name == ignore_link:
            continue
        if path.is_file():
            path.unlink()
        elif path.is_dir():
            rmtree(path)
    clear_memory()


# очистка текста
def clear_text(text: str) -> str:
    lines = text.split('\n')
    lines = [line for line in lines if len(line.strip()) > 2]
    text = '\n'.join(lines).strip()
    return text


def clear_documents(documents: List[Document]) -> List[Document]:
    output_documents = []
    for document in documents:
        text = clear_text(document.page_content)
        if len(text) > 10:
            document.page_content = text
            output_documents.append(document)
    return output_documents


# получение разделителя для csv файла на слуяай если он отличется от ','
def get_csv_delimiter(file_path: str) -> str:
    n_bytes = 4096
    with open(file_path) as csvfile:
        delimiter = csv.Sniffer().sniff(csvfile.read(n_bytes)).delimiter
    return delimiter


# извлечение документов (в формате langchain Documents) из загруженных файлов
def load_documents_from_files(upload_files: List[str]) -> Tuple[List[Document], str]:
    load_log = ''
    documents = []

    for upload_file in upload_files:
        file_extension = f'.{upload_file.split(".")[-1]}'
        if file_extension in LOADER_CLASSES:
            loader_class = LOADER_CLASSES[file_extension]
            loader_kwargs = {}
            if file_extension == '.csv':
                delimiter = get_csv_delimiter(upload_file)
                loader_kwargs = {'csv_args': {'delimiter': delimiter}}
            try:
                load_documents = loader_class(upload_file, **loader_kwargs).load()
                documents.extend(load_documents)
            except Exception as ex:
                load_log += f'Ошибка загрузки файла {upload_file}\n'
                load_log += f'Код ошибки: {ex}\n'
                continue
        else:
            load_log += f'Неподдерживаемый формат файла {upload_file}\n'
            continue
    return documents, load_log


# извлечение документов (в формате langchain Documents) из WEB ссылок
def load_documents_from_links(
        web_links: str,
        subtitles_lang: str,
        ) -> Tuple[List[Document], str]:
    load_log = ''
    documents = []
    loader_class_kwargs = {}
    # фильтрация пустых строк
    web_links = [web_link.strip() for web_link in web_links.split('\n') if web_link.strip()]

    for web_link in web_links:
        # ----------------- ссылка на YouTube ---------------------------------
        if 'youtube.com' in web_link:
            # проверка что субтитры на выбранном языке subtitles_lang доступны в видео web_link
            youtube_id = web_link.split('watch?v=')[-1].split('&')[0]
            available_langs = [t.language for t in list(YouTubeTranscriptApi.list_transcripts(youtube_id))]
            if subtitles_lang not in str(available_langs):
                load_log += f'Язык субтитров {subtitles_lang} недоступен для видео {web_link}\n'
                continue
            # если доступны только автоматические субтитры - запись в логи
            if len(available_langs) == 1 and 'auto-generated' in str(available_langs):
                load_log += f'Загружены автоматические субтитры, ручные недоступны для видео {web_link}\n'
            # инициализация YouTubeLoader с параметром языка субтитров
            loader_class = LOADER_CLASSES['youtube'].from_youtube_url
            language = subtitles_lang[:2].lower()
            loader_class_kwargs = {'language': language}

        # ----------------- ссылка не на YouTube ------------------------------
        else:
            loader_class = LOADER_CLASSES['web']
        try:
            if requests.get(web_link).status_code != 200:
                load_log += f'Ссылка недоступна для Python: {web_link}\n'
                continue
            load_documents = loader_class(web_link, **loader_class_kwargs).load()
            documents.extend(load_documents)
        except MissingSchema:
            load_log += f'Неверная ссылка: {web_link}\n'
            continue
        except Exception as ex:
            load_log += f'Ошибка загрузки лоадером данных по ссылке: {web_link}\n'
            load_log += f'Код ошибки: {ex}\n'
            continue
    return documents, load_log


# инициализация ретривера или ансамбля
def create_retriver(
        embed_model: Dict[str, HuggingFaceEmbeddings],
        documents: List[Document],
        retriever_classes: List[BaseRetriever],
        k: int,
        score_threshold: float,
        ) -> BaseRetriever:

    retrievers = []
    for retriever_class in retriever_classes:
        # инициализация ретривера в зависимости от класса
        if retriever_class.__name__ in ('FAISS', 'Chroma'):
            db = retriever_class.from_documents(
                documents=documents,
                embedding=embed_model,
                )
            retriever = db.as_retriever(
                search_type='similarity_score_threshold',
                search_kwargs={'k': k, 'score_threshold': score_threshold},
                )
        else:
            retriever = retriever_class.from_documents(documents=documents)
            retriever.k = k
        retrievers.append(retriever)

    if len(retrievers) == 1:
        final_retriver = retrievers[0]

    # инициализация ансамбля если выбрано несколько ретриверов
    else:
        weights = [0.5] * len(retrievers)
        final_retriver = EnsembleRetriever(retrievers=retrievers, weights=weights)

    return final_retriver


# загрузка файлов и формирование документов с ретривером
# если параметр k выбран 'max' то в качестве контекста к промту
# будут использованы все найденные фрагменты текста
def load_documents_and_create_retriver(
        upload_files: Optional[List[str]],
        web_links: str,
        subtitles_lang: str,
        chunk_size: int,
        chunk_overlap: int,
        k: Union[int, str],
        score_threshold: float,
        retriever_indexes: List[int],
        embed_model: Dict[str, HuggingFaceEmbeddings],
        ) -> Tuple[List[Document], Optional[BaseRetriever], str]:

    documents = []
    retriever = None

    embed_model = embed_model.get('embed_model')
    if embed_model is None:
        load_log = 'Не инициализирована модель эмбедингов'
        return documents, retriever, load_log

    if not retriever_indexes:
        load_log = 'Не выбран ретривер'
        return documents, retriever, load_log

    if upload_files is None and not web_links:
        load_log = 'Не выбраны файлы или ссылки'
        return documents, retriever, load_log


    progress = gr.Progress()
    all_documents = []
    load_log = ''

    # загрузка документов из файлов
    if upload_files is not None:
        progress(0.3, desc='Шаг 1/2: Загрузка документов из файлов')
        docs, log = load_documents_from_files(upload_files)
        all_documents.extend(docs)
        load_log += log

    # загрузка документов по ссылкам
    if web_links:
        progress(0.3 if upload_files is None else 0.5, desc='Шаг 1/2: Загрузка документов по ссылкам')
        docs, log = load_documents_from_links(web_links, subtitles_lang)
        all_documents.extend(docs)
        load_log += log

    if len(all_documents) == 0:
        load_log += 'Загрузка прервана так как не было извлечено ни одного документа\n'
        load_log += 'Режим RAG не может быть активирован'
        return documents, retriever, load_log

    load_log += f'Загружено документов: {len(all_documents)}\n'

    # использовать документы все документы без разделения на фрагменты
    if k == 'max':
        documents = clear_documents(all_documents)
        load_log += f'Используются документы без разделения в кол-ве: {len(documents)}\n'
        k = 1
    # разделить документы на фрагменты
    else:
        text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap,
        )
        documents = text_splitter.split_documents(all_documents)
        documents = clear_documents(documents)
        load_log += f'Документы разделены, кол-во фрагментов текста: {len(documents)}\n'

    # создание ретривера или ансамбля
    progress(0.7, desc='Шаг 2/2: Инициализация ретривера')

    retriever_classes = [RETRIEVER_CLASSES[i] for i in retriever_indexes]
    retriever = create_retriver(embed_model, documents, retriever_classes, k, score_threshold)

    load_log += 'Режим RAG активирован и может быть дективирован на вкладке Generate'
    return documents, retriever, load_log


# загрузка документов, разбитие на фрагменты и инициализация ретривера
def get_promt_with_context(
        user_message: str,
        chatbot: List[List[Optional[str]]],
        history_len: int,
        rag_mode: bool,
        retriever: BaseRetriever,
        bos_token: str,
        context_template,
        *template_components: List[str],
        ) -> Tuple[str, List[List[Optional[str]]], str]:

    chatbot.append([user_message, None])
    # если сообщение пустое - обработаем это в следующей функции generate_text
    if not user_message.strip():
        return '', chatbot, full_promt

    # извлечение шаблонов
    CONTEXT_TEMPLATE = context_template
    SYSTEM_TEMPLATE, USER_TEMPLATE, BOT_TEMPLATE, ADD_BOS = template_components

    # формирование промта из шаблонов
    full_promt = ''
    if SYSTEM_TEMPLATE:
        full_promt += SYSTEM_TEMPLATE

    # добавлять ли токен BOS один раз вначале промта эзера
    if ADD_BOS:
        full_promt += bos_token

    # формирование промта с историей если она есть и параметр history_len != 0
    if history_len != 0:
        for user_msg, bot_msg in chatbot[:-1][-history_len:]:
            full_promt += USER_TEMPLATE.format(message=user_msg)
            full_promt += BOT_TEMPLATE.format(message=bot_msg)

    # если ретривер готов и включен режим RAG то ищем релевантные доки и добавляем в промт
    if retriever is not None and rag_mode:
        retriever_docs = retriever.invoke(user_message)  #--------------------------------------------------------------------------------
        retriever_context = '\n'.join([doc.page_content for doc in retriever_docs])
        user_message = CONTEXT_TEMPLATE.format(
            message=user_message,
            context=retriever_context,
            )

    # формирование последнего сообщения промта от юзера
    full_promt += USER_TEMPLATE.format(message=user_message)
    full_promt += BOT_TEMPLATE.split('{message}')[0].rstrip()
    return '', chatbot, full_promt


# генерация текста моделью
def generate_text(
        chatbot: List[List[Optional[str]]],
        full_promt: str,
        history_len: int,
        do_sample: bool,
        model: Dict[str, Llama],
        *generate_args: List[Union[int, float]],
        ) -> List[List[Optional[str]]]:

    model = model.get('model')
    if model is None:
        gr.Info('Не выбрана модель GGUF')
        yield chatbot[:-1]

    # если сообщение эзера пустое - ничего не делать
    if chatbot[-1][0].strip() == '':
        yield chatbot[:-1]

    # генерация ответа моделью
    else:
        promt_tokens = model.tokenize(full_promt.encode('utf-8'), special=True, add_bos=False)
        gen_kwargs = dict(zip(GENERATE_KWARGS.keys(), generate_args))
        if not do_sample:
            gen_kwargs['top_k'] = 1
            gen_kwargs['repeat_penalty'] = 1

        # generator = 'И тебе привет'
        chatbot[-1][1] = ''
        generator = model.generate(promt_tokens, **gen_kwargs)
        for token in generator:
            if token == model.token_eos():
                break
            character = model.detokenize([token]).decode('utf-8', errors='ignore')
            # character = token
            chatbot[-1][1] += character
            yield chatbot

### Приложение

In [None]:
# import gradio as gr

# from utils import (
#     get_memory_usage,
#     load_model,
#     load_embed_model,
#     add_new_model_id,
#     get_gguf_model_names,
#     clear_folder,
#     load_documents_and_create_retriver,
#     get_promt_with_context,
#     generate_text,
# )

# from config import (
#     MODEL_IDS,
#     EMBEDERS_IDS,
#     TEMPLATES,
#     CONTEXT_TEMPLATE,
#     RETRIEVER_NAMES,
#     SUBTITLES_LANGUAGES,
#     HISTORY_LEN,
#     GENERATE_KWARGS,
# )


# загрузка и инициализация моделей
start_model = load_model(MODEL_IDS[0], 'openchat-3.5-0106.Q2_K.gguf', 2000)[0]['model']
start_embed_model = load_embed_model(EMBEDERS_IDS[0])[0]['embed_model']

# шаблон промта при старте модели
DEFAULT_TEMPLATE = TEMPLATES['Openchat']


# =================== ИНТЕРФЕЙС ПРИЛОЖЕНИЯ ===========================

with gr.Blocks(theme=gr.themes.Monochrome()) as demo:  # css=CSS

    # ==================== СОСТОЯНИЯ ===============================

    # загруженные фрагменты текста (список объектов langchain Document)
    documents = gr.State([])
    # ретривер
    retriever = gr.State(None)
    # полный текст текущего промта в сыром виде вместе с контекстом и служебными токенами
    full_promt = gr.State('')

    # списки готовых репозиториев моделей HF
    model_ids_state = gr.State(MODEL_IDS)
    embed_ids_state = gr.State(EMBEDERS_IDS)

    # модели GGUF и эмбедингов
    model = gr.State({'model': start_model})
    embed_model = gr.State({'embed_model': start_embed_model})

    # ==================== СТРАНИЦА БОТА =================================

    # рендер окна чат бота с названием текущего режима - RAG или Chatbot
    def get_chatbot(chatbot_history: list = [], rag_mode_label: bool = False):
        label = 'RAG' if rag_mode_label else 'Chatbot'
        chatbot = gr.Chatbot(
                value=chatbot_history,
                # height=300,
                show_copy_button=True,
                bubble_full_width=False,
                label=label,
                )
        return chatbot

    with gr.Tab(label='Generate'):
        with gr.Row():
            with gr.Column(scale=3):
                # чат бот и кнопки отправить сообщение, стоп генерации и удалить историю чата
                chatbot = get_chatbot()
                user_message = gr.Textbox(label='User')
                with gr.Row():
                    user_message_btn = gr.Button('Отправить')
                    stop_btn = gr.Button('Стоп')
                    clear_btn = gr.Button('Очистить чат')

                # включить или выключить режим RAG даже если ретривер готов к работе
                rag_mode = gr.Checkbox(value=False, label='Режим RAG', scale=1, visible=False)
                rag_mode.change(fn=get_chatbot, inputs=[chatbot, rag_mode], outputs=chatbot)

            # ------------------ ПАРАМЕТРЫ ГЕНЕРАЦИИ -------------------------
            def get_generate_args(do_sample: bool):
                visible = do_sample
                generate_args = [
                    gr.Slider(5, 50, GENERATE_KWARGS['top_k'], step=5, label='top_k', visible=visible),
                    gr.Slider(0, 1, GENERATE_KWARGS['top_p'], step=0.1, label='top_p', visible=visible),
                    gr.Slider(0.1, 4, GENERATE_KWARGS['temp'], step=0.1, label='temperature', visible=visible),
                    gr.Slider(1, 5, GENERATE_KWARGS['repeat_penalty'], step=0.1, label='repeat penalty', visible=visible),
                ]
                return generate_args

            with gr.Column(scale=1, min_width=80):
                with gr.Group():
                    # длина истории которую будет учитывать модель
                    gr.Markdown('Размер истории')
                    history_len = gr.Slider(
                        minimum=0,
                        maximum=5,
                        value=HISTORY_LEN,
                        step=1,
                        info='Кол-во предыдущих сообщенией, учитываемых в истории',
                        label='history len',
                        show_label=False,
                        )

                    # with gr.Group():
                    #     gr.Markdown('Настройка отбора фрагментов для ретриривера Faiss или Chroma')
                    #     score_threshold = gr.Slider(
                    #         label='searh_score_threshold',
                    #         value=0.5,
                    #         minimum=0.1,
                    #         maximum=1,
                    #         step=0.1,
                    #         )

                    with gr.Group():
                        gr.Markdown('Параметры генерации')
                        # переключатель активации случайного семплирования при генерации текста моделью
                        do_sample = gr.Checkbox(
                            value=False,
                            label='do_sample',
                            info='Активация случайного семплирования',
                            )
                        # настройки семплирования
                        generate_args = get_generate_args(do_sample.value)

                        do_sample.change(
                            fn=get_generate_args,
                            inputs=do_sample,
                            outputs=generate_args,
                            show_progress=False,
                            )

        # ------------------ ТЕКСТ ПРОМТА С КОНТЕКСТОМ --------------------
        full_promt = gr.Textbox(
            label='Полный текст текущего промта с контекстом',
            interactive=False,
            )

        # -------- КОМПОНЕНТЫ РЕДАКТИРОВАНИЯ ПРОМТОВ И СПЕЦ ТОКЕНОВ --------
        def get_bos_token(visible=False, render=True):
            bos_token_component = gr.Textbox(
                value='<s>',
                label='Bos token',
                visible=visible,
                render=render,
                )
            return bos_token_component

        # добавлять токен {BOS} только один раз вначале шаблона юзера
        add_bos = gr.Checkbox(
            value=False,
            label='add bos',
            scale=1,
            render=False,
            info='Добавлять ли токен BOS один раз вначале диалога перед промтом пользователя',
            )
        bos_token = get_bos_token(visible=False, render=False)
        add_bos.change(
            fn=get_bos_token,
            inputs=add_bos,
            outputs=bos_token,
            show_progress=False,
            )

        # -------------------- ШАБЛОНЫ ПРОМТОВ ---------------------------
        # окно редактирования шаблона системного промта
        system_template = gr.Textbox(
            DEFAULT_TEMPLATE['SYSTEM_TEMPLATE'],
            label='System template',
            lines=4,
            render=False,
            )
        # окно редактирования шаблона промта пользователя
        user_template = gr.Textbox(
            DEFAULT_TEMPLATE['USER_TEMPLATE'],
            label='User template',
            lines=4,
            render=False,
            )
        # окно редактирования шаблона промта бота
        bot_template = gr.Textbox(
            DEFAULT_TEMPLATE['BOT_TEMPLATE'],
            label='Bot template',
            lines=4,
            render=False,
            )

        template_components = [system_template, user_template, bot_template, add_bos]

        # окно редактирования шаблона промта при условии контекста
        context_template = gr.Textbox(
            CONTEXT_TEMPLATE,
            label='Context template',
            lines=6,
            render=False,
            )

        # ------------------ КНОПКИ ОТПРАВИТЬ ОЧИСТИТЬ И СТОП ------------
        # нажатие Enter и кнопка отправить
        generate_event = gr.on(
            triggers=[user_message.submit, user_message_btn.click],
            fn=get_promt_with_context,
            inputs=[user_message, chatbot, history_len, rag_mode, retriever,
            bos_token, context_template, *template_components],
            outputs=[user_message, chatbot, full_promt],
            queue=True,
        ).then(
            fn=lambda promt: promt,
            inputs=full_promt,
            outputs=full_promt,
            queue=False,
        ).then(
            fn=generate_text,
            inputs=[chatbot, full_promt, history_len, do_sample, model, *generate_args],
            outputs=chatbot,
            queue=True,
            )
        # кнопка Стоп
        stop_btn.click(
            fn=None,
            inputs=None,
            outputs=None,
            cancels=generate_event,
            queue=False,
        )
        # кнопка Очистить чат
        clear_btn.click(
            fn=lambda: (None, ''),
            inputs=None,
            outputs=[chatbot, full_promt],
            queue=False,
            )

    # ===================== СТРАНИЦА ЗАГРУЗКИ ФАЙЛОВ =========================

    # настройки параметров для нарезки текстов
    def get_chunk_size_overlap(k: int = 2):
        visible = k != 'max'
        chunk_size = gr.Slider(50, 2000, value=500, step=50, label='Длина фрагментов', visible=visible)
        chunk_overlap = gr.Slider(0, 200, value=20, step=10, label="Длина пересечения фрагментов", visible=visible)
        return chunk_size, chunk_overlap

    with gr.Tab(label='Load documents'):
        with gr.Row(variant='compact'):
            # загрузка файлов и ссылок
            upload_files = gr.File(file_count='multiple', label='Загрузка текстовых файлов')
            web_links = gr.Textbox(lines=8, label='Ссылки на Web сайты или Ютуб')

        with gr.Row(variant='compact'):
            with gr.Column(scale=2):
                # выбор ретриверов
                retriever_indexes = gr.CheckboxGroup(
                    choices=RETRIEVER_NAMES,
                    value=RETRIEVER_NAMES[0],
                    type='index',
                    label='Ретривер',
                    )

                with gr.Row(variant='compact'):
                    score_threshold = gr.Slider(
                        label='searh_score_threshold',
                        value=0.5,
                        minimum=0.1,
                        maximum=1,
                        step=0.1,
                        )

                    # кол-во релевантных кусков текста для поиска ретривером
                    k = gr.Radio(
                        choices=[1, 2, 3, 'max'],
                        value=2,
                        label='Количество релевантных документов для поиска',
                        )

                    # язык субтитров для YouTube
                    subtitles_lang = gr.Radio(
                        SUBTITLES_LANGUAGES,
                        value=SUBTITLES_LANGUAGES[0],
                        label="Язык субтитров YouTube",
                        )

            with gr.Column(scale=1, min_width=160):
                # параметры нарезки текста
                chunk_size, chunk_overlap = get_chunk_size_overlap()
                k.change(
                    get_chunk_size_overlap,
                    inputs=k,
                    outputs=[chunk_size, chunk_overlap],
                    show_progress=False,
                    )

        # кнопка загрузки документов и инициализации ретривера
        load_documents_btn = gr.Button(value='Загрузить документы и создать ретривер')
        # статус прогресса
        load_docs_log = gr.Textbox(label='Прогресс загрузки и разделения документов')

        # главный цикл загрузки доков и инициализации ретривера
        load_event = load_documents_btn.click(
            fn=load_documents_and_create_retriver,
            inputs=[upload_files, web_links, subtitles_lang, chunk_size,
                    chunk_overlap, k, score_threshold, retriever_indexes, embed_model],
            outputs=[documents, retriever, load_docs_log],
            )

        # сменить название бота на RAG или Chatbot в зависимости от готовности ретривера
        def load_success(chatbot_history, retriever):
            rag_mode = retriever is not None

            chatbot = get_chatbot(chatbot_history, rag_mode)
            rag_mode_checkbox = gr.Checkbox(value=rag_mode, label='Режим RAG', scale=1, visible=rag_mode)
            return chatbot, rag_mode_checkbox

        load_event.success(
            fn=load_success,
            inputs=[chatbot, retriever],
            outputs=[chatbot, rag_mode],
            )

    # ================= СТРАНИЦА ПРОСМОТРА ВСЕХ ДОКУМЕНТОВ =================

    with gr.Tab(label='View documents'):
        view_documents_btn = gr.Button(value='Отобразить загруженные фрагменты')
        view_documents_textbox = gr.Textbox(
            lines=1,
            placeholder='Для просмотра фрагментов загрузите документы на вкладке Load documents',
            label='Загруженные фрагменты',
            )
        sep = '=' * 20
        # отображение загруженных фрагментов текста если они готовы
        view_documents_btn.click(
            lambda documents: f'\n{sep}\n\n'.join([doc.page_content for doc in documents]),
            inputs=documents,
            outputs=view_documents_textbox,
        )

    # ================= СТРАНИЦА РЕДАКТИРОВАНИЯ ШАБЛОНА ПРОМТА ===============

    # выбор заготовок шаблонов для протмтов
    def select_template(template_name):
        template = list(TEMPLATES[template_name].values())
        return template

    # отображение результата ручного редактирования промтов на примере
    def get_example_prompt(bos_token, *template_components):
        user_message = 'Как дела?'
        chatbot_history = [['Привет', 'Здорова']]

        _, _, example_prompt = get_promt_with_context(
            user_message, chatbot_history, 1, False, None,
            bos_token, '', *template_components)

        example_prompt = gr.Textbox(
            value=example_prompt,
            label='Example prompt',
            lines=example_prompt.count('\n'),
            )
        return example_prompt

    with gr.Tab(label='Edit pomt templates'):
        with gr.Group():
            # выбор шаблонов промтов из предварительных
            gr.Markdown('Готовые шаблоны по умолчанию')
            default_template = gr.Dropdown(
                choices=list(TEMPLATES.keys()),
                value=None,
                label='Default Templates',
                )
            default_template.change(
                fn=select_template,
                inputs=default_template,
                outputs=[*template_components],
            )

        with gr.Group():
            gr.Markdown('Настройки форматирования промтов перед подачей в модель')
            with gr.Row(variant='compact'):
                # добавлять ли токен BOS один раз перед шаблоном юзера
                add_bos.render()
                bos_token.render()

        with gr.Row(variant='compact'):
            with gr.Column():
                # окна редактирования промтов (системный, юзер, бот)
                system_template.render()
                user_template.render()
                bot_template.render()

            with gr.Column():
                # окно отображения результата ручного редактирования промтов на примере
                example_prompt = get_example_prompt(
                    bos_token.value,
                    *[c.value for c in template_components],
                    )

        gr.on(
            triggers=[c.change for c in [add_bos, bos_token, *template_components]],
            fn=get_example_prompt,
            inputs=[bos_token, *template_components],
            outputs=[example_prompt],
            queue=False,
            )

        with gr.Group():
            # окно редактирования промта при условии контекста
            gr.Markdown('Настройки форматирования промта при условии контекста (RAG)')
            context_template.render()

    # ================= ЗАГРУЗКА GGUF МОДЕЛЕЙ ============================

    with gr.Tab('Load model'):
        # окно добавления нового репозитория с HF
        new_model_id = gr.Textbox(
            value='',
            label='Добавить репозиторий',
            placeholder='Ссылка на репозиторий HF моделей в формате GGUF',
            )
        new_model_btn = gr.Button('Добавить репозиторий')

        # выбор репозитория HF из доступных
        model_id = gr.Dropdown(
            choices=MODEL_IDS,
            value=None,
            label='Репозиторий модели HF',
            )
        # выбор модели GGUF из выбранного репозитория
        model_path = gr.Dropdown(
            choices=[],
            value=None,
            label='Файл модели GGUF',
            )
        # размер контекста для модели llama-cpp
        n_ctx = gr.Slider(500, 500 * 8, step=500, label='n_ctx')
        load_model_btn = gr.Button('Загрука и инициализация модели')

        # статус загрузки модели
        load_model_log = gr.Textbox(
            value=f'Модель {MODEL_IDS[0]} загружена по умолчанию',
            label='Статус загрузки модели',
            )

        with gr.Group():
            gr.Markdown('Освободить место на диске путем удаления всех моделей кроме текущей')
            remove_models_btn = gr.Button('Очистить папку')

        # добавление нового репозитория и отображение статуса
        new_model_btn.click(
            fn=add_new_model_id,
            inputs=[new_model_id, model_ids_state],
            outputs=[model_id, load_model_log],
        ).success(
            fn=lambda: '',
            inputs=None,
            outputs=new_model_id,
        )

        # получение списка моделей GGUF из выбранного репозитория
        model_id.change(
            fn=get_gguf_model_names,
            inputs=[model_id],
            outputs=[model_path],
        )

        # загрузка модели GGUF
        load_model_btn.click(
            fn=load_model,
            inputs=[model_id, model_path, n_ctx],
            outputs=[model, load_model_log],
            queue=True,
        ).success(
            fn=lambda log: log + get_memory_usage(),
            inputs=load_model_log,
            outputs=load_model_log,
        )

        # очистка папки с моделями кроме текущей загруженной
        remove_models_btn.click(
            fn=clear_folder,
            inputs=[model_path],
            outputs=None,
        ).success(
            fn=lambda model: f'Модели кроме {model} удалены',
            inputs=model_path,
            outputs=None,
        )

    # ================= ЗАГРУЗКА ЭМБЕДИНГ МОДЕЛЕЙ ========================
    with gr.Tab('Load embed model'):
        # окно добавления нового репозитория с HF
        new_embed_id = gr.Textbox(
            value='',
            label='Добавить репозиторий',
            placeholder='Ссылка на репозиторий модели HF',
            )
        new_embed_btn = gr.Button('Добавить репозиторий')

        # выбор репозитория HF из доступных
        embed_id = gr.Dropdown(
            choices=EMBEDERS_IDS,
            value=None,
            label='Репозиторий модели HF',
            )

        # статус загрузки модели
        load_embed_btn = gr.Button('Загрука и инициализация модели')
        load_embed_log = gr.Textbox(
            value=f'Модель {EMBEDERS_IDS[0]} загружена по умолчанию',
            label='Статус загрузки модели',
            )
        with gr.Group():
            gr.Markdown('Освободить место на диске путем удаления всех моделей кроме текущей')
            remove_embed_models_btn = gr.Button('Очистить папку')

        # добавление нового репозитория и отображение статуса
        new_embed_btn.click(
            fn=add_new_model_id,
            inputs=[new_embed_id, embed_ids_state],
            outputs=[embed_id, load_embed_log],
        ).success(
            fn=lambda: '',
            inputs=None,
            outputs=new_embed_id,
        )

        # загрузка модели эмбедингов
        load_embed_btn.click(
            fn=load_embed_model,
            inputs=[embed_id],
            outputs=[embed_model, load_embed_log],
        ).success(
            fn=lambda log: log + get_memory_usage(),
            inputs=load_embed_log,
            outputs=load_embed_log,
        )

        # очистка папки с моделями кроме текущей загруженной
        remove_embed_models_btn.click(
            fn=clear_folder,
            inputs=[embed_id],
            outputs=None,
        ).success(
            fn=lambda model: f'Модели кроме {model} удалены',
            inputs=embed_id,
            outputs=None,
        )

demo.queue().launch(debug=True)

llama_model_loader: loaded meta data with 23 key-value pairs and 291 tensors from models/openchat-3.5-0106.Q2_K.gguf (version GGUF V3 (latest))
llama_model_loader: Dumping metadata keys/values. Note: KV overrides do not apply in this output.
llama_model_loader: - kv   0:                       general.architecture str              = llama
llama_model_loader: - kv   1:                               general.name str              = openchat_openchat-3.5-0106
llama_model_loader: - kv   2:                       llama.context_length u32              = 8192
llama_model_loader: - kv   3:                     llama.embedding_length u32              = 4096
llama_model_loader: - kv   4:                          llama.block_count u32              = 32
llama_model_loader: - kv   5:                  llama.feed_forward_length u32              = 14336
llama_model_loader: - kv   6:                 llama.rope.dimension_count u32              = 128
llama_model_loader: - kv   7:                 llama.attent

Здорова
Setting queue=True in a Colab notebook requires sharing enabled. Setting `share=True` (you can turn this off by setting `share=False` in `launch()` explicitly).

Colab notebook detected. This cell will run indefinitely so that you can see errors and logs. To turn off, set debug=False in launch().
Running on public URL: https://36a268dccb5d25b581.gradio.live

This share link expires in 72 hours. For free permanent hosting and GPU upgrades, run `gradio deploy` from Terminal to deploy to Spaces (https://huggingface.co/spaces)


Keyboard interruption in main thread... closing server.
Killing tunnel 127.0.0.1:7860 <> https://36a268dccb5d25b581.gradio.live




## Вариант с классами

Конфиг

In [None]:
from dataclasses import dataclass
from pathlib import Path

# from langchain_community.vectorstores import FAISS, Chroma
# from langchain_community.retrievers import BM25Retriever, TFIDFRetriever

# from langchain_community.document_loaders import (
#     CSVLoader,
#     PDFMinerLoader,
#     PyPDFLoader,
#     TextLoader,
#     UnstructuredEmailLoader,
#     UnstructuredHTMLLoader,
#     UnstructuredMarkdownLoader,
#     UnstructuredODTLoader,
#     UnstructuredPowerPointLoader,
#     UnstructuredWordDocumentLoader,
#     WebBaseLoader,
#     YoutubeLoader,
#     DirectoryLoader,
# )

@dataclass
class Config:

    LOADER_CLASSES = {
        '.csv': CSVLoader,
        '.doc': UnstructuredWordDocumentLoader,
        '.docx': UnstructuredWordDocumentLoader,
        '.html': UnstructuredHTMLLoader,
        '.md': UnstructuredMarkdownLoader,
        '.pdf': PDFMinerLoader,
        '.ppt': UnstructuredPowerPointLoader,
        '.pptx': UnstructuredPowerPointLoader,
        '.txt': TextLoader,
        'web': WebBaseLoader,
        'directory': DirectoryLoader,
        'youtube': YoutubeLoader,
    }

    TEMPLATES = {
        'Openchat': {
            'SYSTEM_TEMPLATE': '',
            'USER_TEMPLATE': ' GPT4 Correct User: {message}<|end_of_turn|>',
            'BOT_TEMPLATE': ' GPT4 Correct Assistant: {message}<|end_of_turn|>',
            'ADD_BOS': False,
            },

        'Saiga mistral': {
            'SYSTEM_TEMPLATE': '''<s> system
Ты — Сайга, русскоязычный автоматический ассистент. Ты разговариваешь с людьми и помогаешь им.
</s> ''',
            'USER_TEMPLATE': '<s> user\n{message}\n</s> ',
            'BOT_TEMPLATE': '<s> bot\n{message}\n</s> ',
            'ADD_BOS': False,
            },

        'Mistral instruct': {
            'SYSTEM_TEMPLATE': '',
            'USER_TEMPLATE': ' [INST] {message} [/INST]',
            'BOT_TEMPLATE': '{message}</s>  ',
            'ADD_BOS': True,
            },
        }

    CONTEXT_TEMPLATE = '''Дан контекст:
    {context}
    Дан вопрос:
    {message}
    Ответ:'''

    CSS = '''
    .gradio-container {width: 80% !important}
    '''

    MODELS_PATH = Path('models')
    EMBED_MODELS_PATH = Path('embed_models')

    EMBEDERS_IDS = [
        'cointegrated/rubert-tiny2',
        'cointegrated/LaBSE-en-ru',
        'intfloat/multilingual-e5-large',
        'intfloat/multilingual-e5-base',
        'intfloat/multilingual-e5-small',
        'sentence-transformers/all-mpnet-base-v2',
        'sentence-transformers/paraphrase-multilingual-mpnet-base-v2',
        'inkoziev/sbert_pq',
        'ai-forever/ruElectra-medium',
        'ai-forever/sbert_large_nlu_ru',
    ]

    MODEL_IDS = [
        'TheBloke/openchat-3.5-0106-GGUF',
        'IlyaGusev/saiga_mistral_7b_gguf',
        'TheBloke/Mistral-7B-Instruct-v0.1-GGUF',
    ]

    RETRIEVER_CLASSES = [BM25Retriever, FAISS, TFIDFRetriever]

    SUBTITLES_LANGUAGES = ['Russian', 'English']

    HISTORY_LEN = 0
    GENERATE_KWARGS = dict(
        top_k=40,
        top_p=0.9,
        temp=0.8,
        repeat_penalty=1.0,
        )

    def __post_init__(self):
        self.RETRIEVER_NAMES = [cls.__name__ for cls in self.RETRIEVER_CLASSES]


CONFIG = Config()

CONFIG.MODELS_PATH.mkdir(exist_ok=True)
CONFIG.EMBED_MODELS_PATH.mkdir(exist_ok=True)

In [None]:
CONFIG.TEMPLATES

{'Openchat': {'SYSTEM_TEMPLATE': '',
  'USER_TEMPLATE': ' GPT4 Correct User: {message}<|end_of_turn|>',
  'BOT_TEMPLATE': ' GPT4 Correct Assistant: {message}<|end_of_turn|>',
  'ADD_BOS': False},
 'Saiga mistral': {'SYSTEM_TEMPLATE': '<s> system\nТы — Сайга, русскоязычный автоматический ассистент. Ты разговариваешь с людьми и помогаешь им.\n</s> ',
  'USER_TEMPLATE': '<s> user\n{message}\n</s> ',
  'BOT_TEMPLATE': '<s> bot\n{message}\n</s> ',
  'ADD_BOS': False},
 'Mistral instruct': {'SYSTEM_TEMPLATE': '',
  'USER_TEMPLATE': ' [INST] {message} [/INST]',
  'BOT_TEMPLATE': '{message}</s>  ',
  'ADD_BOS': True}}

In [None]:
CONFIG.RETRIEVER_NAMES

['BM25Retriever', 'FAISS', 'TFIDFRetriever']

Классы

Не делал только начал

In [None]:
# # from utils import (
# #     get_memory_usage,
# #     load_model,
# #     load_embed_model,
# #     add_new_model_id,
# #     get_gguf_model_names,
# #     clear_folder,
# #     load_documents_and_create_retriver,
# #     get_promt_with_context,
# #     generate_text,
# # )

# # from config import (
# #     MODEL_IDS,
# #     EMBEDERS_IDS,
# #     TEMPLATES,
# #     CONTEXT_TEMPLATE,
# #     RETRIEVER_NAMES,
# #     SUBTITLES_LANGUAGES,
# #     HISTORY_LEN,
# #     GENERATE_KWARGS,
# # )


# class RagUtils:

#     # очистка текста
#     def clear_text(text: str) -> str:
#         lines = text.split('\n')
#         lines = [line for line in lines if len(line.strip()) > 2]
#         text = '\n'.join(lines).strip()
#         return text

#     def clear_documents(documents: List[Document]) -> List[Document]:
#         output_documents = []
#         for document in documents:
#             text = clear_text(document.page_content)
#             if len(text) > 10:
#                 document.page_content = text
#                 output_documents.append(document)
#         return output_documents


#     # получение разделителя для csv файла на слуяай если он отличется от ','
#     def get_csv_delimiter(file_path: str) -> str:
#         n_bytes = 4096
#         with open(file_path) as csvfile:
#             delimiter = csv.Sniffer().sniff(csvfile.read(n_bytes)).delimiter
#         return delimiter


#     # извлечение документов (в формате langchain Documents) из загруженных файлов
#     def load_documents_from_files(upload_files: List[str]) -> Tuple[List[Document], str]:
#         load_log = ''
#         documents = []

#         for upload_file in upload_files:
#             file_extension = f'.{upload_file.split(".")[-1]}'
#             if file_extension in LOADER_CLASSES:
#                 loader_class = LOADER_CLASSES[file_extension]
#                 loader_kwargs = {}
#                 if file_extension == '.csv':
#                     delimiter = get_csv_delimiter(upload_file)
#                     loader_kwargs = {'csv_args': {'delimiter': delimiter}}
#                 try:
#                     load_documents = loader_class(upload_file, **loader_kwargs).load()
#                     documents.extend(load_documents)
#                 except Exception as ex:
#                     load_log += f'Ошибка загрузки файла {upload_file}\n'
#                     load_log += f'Код ошибки: {ex}\n'
#                     continue
#             else:
#                 load_log += f'Неподдерживаемый формат файла {upload_file}\n'
#                 continue
#         return documents, load_log


#     # извлечение документов (в формате langchain Documents) из WEB ссылок
#     def load_documents_from_links(
#             web_links: str,
#             subtitles_lang: str,
#             ) -> Tuple[List[Document], str]:
#         load_log = ''
#         documents = []
#         loader_class_kwargs = {}
#         # фильтрация пустых строк
#         web_links = [web_link.strip() for web_link in web_links.split('\n') if web_link.strip()]

#         for web_link in web_links:
#             # ----------------- ссылка на YouTube ---------------------------------
#             if 'youtube.com' in web_link:
#                 # проверка что субтитры на выбранном языке subtitles_lang доступны в видео web_link
#                 youtube_id = web_link.split('watch?v=')[-1].split('&')[0]
#                 available_langs = [t.language for t in list(YouTubeTranscriptApi.list_transcripts(youtube_id))]
#                 if subtitles_lang not in str(available_langs):
#                     load_log += f'Язык субтитров {subtitles_lang} недоступен для видео {web_link}\n'
#                     continue
#                 # если доступны только автоматические субтитры - запись в логи
#                 if len(available_langs) == 1 and 'auto-generated' in str(available_langs):
#                     load_log += f'Загружены автоматические субтитры, ручные недоступны для видео {web_link}\n'
#                 # инициализация YouTubeLoader с параметром языка субтитров
#                 loader_class = LOADER_CLASSES['youtube'].from_youtube_url
#                 language = subtitles_lang[:2].lower()
#                 loader_class_kwargs = {'language': language}

#             # ----------------- ссылка не на YouTube ------------------------------
#             else:
#                 loader_class = LOADER_CLASSES['web']
#             try:
#                 if requests.get(web_link).status_code != 200:
#                     load_log += f'Ссылка недоступна для Python: {web_link}\n'
#                     continue
#                 load_documents = loader_class(web_link, **loader_class_kwargs).load()
#                 documents.extend(load_documents)
#             except MissingSchema:
#                 load_log += f'Неверная ссылка: {web_link}\n'
#                 continue
#             except Exception as ex:
#                 load_log += f'Ошибка загрузки лоадером данных по ссылке: {web_link}\n'
#                 load_log += f'Код ошибки: {ex}\n'
#                 continue
#         return documents, load_log


#     # инициализация ретривера или ансамбля
#     def create_retriver(
#             embed_model: Dict[str, HuggingFaceEmbeddings],
#             documents: List[Document],
#             retriever_classes: List[BaseRetriever],
#             k: int,
#             ) -> BaseRetriever:

#         retrievers = []
#         for retriever_class in retriever_classes:
#             # инициализация ретривера в зависимости от класса
#             if retriever_class.__name__ in ('FAISS', 'Chroma'):
#                 db = retriever_class.from_documents(
#                     documents=documents,
#                     embedding=embed_model,
#                     )
#                 retriever = db.as_retriever(search_kwargs={'k': k})
#             else:
#                 retriever = retriever_class.from_documents(documents=documents)
#                 retriever.k = k
#             retrievers.append(retriever)

#         if len(retrievers) == 1:
#             final_retriver = retrievers[0]

#         # инициализация ансамбля если выбрано несколько ретриверов
#         else:
#             weights = [0.5] * len(retrievers)
#             final_retriver = EnsembleRetriever(retrievers=retrievers, weights=weights)

#         return final_retriver


#     # загрузка файлов и формирование документов с ретривером
#     # если параметр k выбран 'max' то в качестве контекста к промту
#     # будут использованы все найденные фрагменты текста
#     def load_documents_and_create_retriver(
#             upload_files: Optional[List[str]],
#             web_links: str,
#             subtitles_lang: str,
#             chunk_size: int,
#             chunk_overlap: int,
#             k: Union[int, str],
#             retriever_indexes: List[int],
#             embed_model: Dict[str, HuggingFaceEmbeddings],
#             ) -> Tuple[List[Document], Optional[BaseRetriever], str]:

#         documents = []
#         retriever = None

#         embed_model = embed_model.get('embed_model')
#         if embed_model is None:
#             load_log = 'Не инициализирована модель эмбедингов'
#             return documents, retriever, load_log

#         if not retriever_indexes:
#             load_log = 'Не выбран ретривер'
#             return documents, retriever, load_log

#         if upload_files is None and not web_links:
#             load_log = 'Не выбраны файлы или ссылки'
#             return documents, retriever, load_log


#         progress = gr.Progress()
#         all_documents = []
#         load_log = ''

#         # загрузка документов из файлов
#         if upload_files is not None:
#             progress(0.3, desc='Шаг 1/2: Загрузка документов из файлов')
#             docs, log = load_documents_from_files(upload_files)
#             all_documents.extend(docs)
#             load_log += log

#         # загрузка документов по ссылкам
#         if web_links:
#             progress(0.3 if upload_files is None else 0.5, desc='Шаг 1/2: Загрузка документов по ссылкам')
#             docs, log = load_documents_from_links(web_links, subtitles_lang)
#             all_documents.extend(docs)
#             load_log += log

#         if len(all_documents) == 0:
#             load_log += 'Загрузка прервана так как не было извлечено ни одного документа\n'
#             load_log += 'Режим RAG не может быть активирован'
#             return documents, retriever, load_log

#         load_log += f'Загружено документов: {len(all_documents)}\n'

#         # использовать документы все документы без разделения на фрагменты
#         if k == 'max':
#             documents = clear_documents(all_documents)
#             load_log += f'Используются документы без разделения в кол-ве: {len(documents)}\n'
#             k = 1
#         # разделить документы на фрагменты
#         else:
#             text_splitter = RecursiveCharacterTextSplitter(
#                 chunk_size=chunk_size,
#                 chunk_overlap=chunk_overlap,
#             )
#             documents = text_splitter.split_documents(all_documents)
#             documents = clear_documents(documents)
#             load_log += f'Документы разделены, кол-во фрагментов текста: {len(documents)}\n'

#         # создание ретривера или ансамбля
#         progress(0.7, desc='Шаг 2/2: Инициализация ретривера')

#         retriever_classes = [RETRIEVER_CLASSES[i] for i in retriever_indexes]
#         retriever = create_retriver(embed_model, documents, retriever_classes, k)

#         load_log += 'Режим RAG активирован и может быть дективирован на вкладке Generate'
#         return documents, retriever, load_log


#     # загрузка документов, разбитие на фрагменты и инициализация ретривера
#     def get_promt_with_context(
#             user_message: str,
#             chatbot: List[List[Optional[str]]],
#             history_len: int,
#             rag_mode: bool,
#             retriever: BaseRetriever,
#             bos_token: str,
#             context_template,
#             *template_components: List[str],
#             ) -> Tuple[str, List[List[Optional[str]]], str]:

#         chatbot.append([user_message, None])
#         # если сообщение пустое - обработаем это в следующей функции generate_text
#         if not user_message.strip():
#             return '', chatbot, full_promt

#         # извлечение шаблонов
#         CONTEXT_TEMPLATE = context_template
#         SYSTEM_TEMPLATE, USER_TEMPLATE, BOT_TEMPLATE, ADD_BOS = template_components

#         # формирование промта из шаблонов
#         full_promt = ''
#         if SYSTEM_TEMPLATE:
#             full_promt += SYSTEM_TEMPLATE

#         # добавлять ли токен BOS один раз вначале промта эзера
#         if ADD_BOS:
#             full_promt += bos_token

#         # формирование промта с историей если она есть и параметр history_len != 0
#         if history_len != 0:
#             for user_msg, bot_msg in chatbot[:-1][-history_len:]:
#                 full_promt += USER_TEMPLATE.format(message=user_msg)
#                 full_promt += BOT_TEMPLATE.format(message=bot_msg)
#                 print(bot_msg)

#         # если ретривер готов и включен режим RAG то ищем релевантные доки и добавляем в промт
#         if retriever is not None and rag_mode:
#             retriever_docs = retriever.invoke(user_message)
#             retriever_context = '\n'.join([doc.page_content for doc in retriever_docs])
#             user_message = CONTEXT_TEMPLATE.format(
#                 message=user_message,
#                 context=retriever_context,
#                 )

#         # формирование последнего сообщения промта от юзера
#         full_promt += USER_TEMPLATE.format(message=user_message)
#         full_promt += BOT_TEMPLATE.split('{message}')[0].rstrip()
#         return '', chatbot, full_promt


#     # генерация текста моделью
#     def generate_text(
#             chatbot: List[List[Optional[str]]],
#             full_promt: str,
#             history_len: int,
#             do_sample: bool,
#             model: Dict[str, Llama],
#             *generate_args: List[Union[int, float]],
#             ) -> List[List[Optional[str]]]:

#         model = model.get('model')
#         if model is None:
#             gr.Info('Не выбрана модель GGUF')
#             yield chatbot[:-1]

#         # если сообщение эзера пустое - ничего не делать
#         if chatbot[-1][0].strip() == '':
#             yield chatbot[:-1]

#         # генерация ответа моделью
#         else:
#             promt_tokens = model.tokenize(full_promt.encode('utf-8'), special=True, add_bos=False)
#             gen_kwargs = dict(zip(GENERATE_KWARGS.keys(), generate_args))
#             if not do_sample:
#                 gen_kwargs['top_k'] = 1
#                 gen_kwargs['repeat_penalty'] = 1

#             # generator = 'И тебе привет'
#             chatbot[-1][1] = ''
#             generator = model.generate(promt_tokens, **gen_kwargs)
#             for token in generator:
#                 if token == model.token_eos():
#                     break
#                 character = model.detokenize([token]).decode('utf-8', errors='ignore')
#                 # character = token
#                 chatbot[-1][1] += character
#                 yield chatbot


# class ModelUtils:

#     # получение количества свободной памяти на диске, CPU и GPU
#     def get_memory_usage() -> str:
#         print_memory = ''

#         memory_type = 'Disk'
#         psutil_stats = psutil.disk_usage('.')
#         memory_total = psutil_stats.total / 1024**3
#         memory_usage = psutil_stats.used / 1024**3
#         print_memory += f'{memory_type} Menory Usage: {memory_usage:.2f} / {memory_total:.2f} GB\n'

#         memory_type = 'CPU'
#         psutil_stats = psutil.virtual_memory()
#         memory_total = psutil_stats.total / 1024**3
#         memory_usage =  memory_total - (psutil_stats.available / 1024**3)
#         print_memory += f'{memory_type} Menory Usage: {memory_usage:.2f} / {memory_total:.2f} GB\n'

#         if torch.cuda.is_available():
#             memory_type = 'GPU'
#             memory_free, memory_total = torch.cuda.mem_get_info()
#             memory_usage = memory_total - memory_free
#             print_memory += f'{memory_type} Menory Usage: {memory_usage / 1024**3:.2f} / {memory_total:.2f} GB\n'

#         print_memory = f'---------------\n{print_memory}---------------\n'
#         return print_memory


#     def clear_memory() -> None:
#         gc.collect()
#         torch.cuda.empty_cache()


#     # загрузка и инициализация модели GGUF
#     def load_model(model_id: str, model_file: str, n_ctx: int) -> Tuple[Dict[str, Llama], str]:
#         model = None
#         if isinstance(model_file, list):
#             load_log = 'Не выбрана модель'
#             return model, load_log

#         load_log = ''
#         if '(' in model_file:
#             model_file = model_file.split('(')[0].rstrip()

#         progress = gr.Progress()
#         progress(0.3, desc='Шаг 1/2: Загрузка модели GGUF')
#         model_path = MODELS_PATH / model_file
#         if model_path.is_file():
#             load_log += 'Модель уже загружена, повторная инициализация\n'
#         else:
#             try:
#                 hf_hub_download(
#                     repo_id=model_id,
#                     filename=model_file,
#                     local_dir=MODELS_PATH,
#                     local_dir_use_symlinks=False,
#                     )
#                 load_log += 'Модель успешно загружена\n'
#             except Exception as ex:
#                 model_path = ''
#                 load_log += f'Ошибка загрузки модели, код ошибки:\n{ex}\n'

#         if model_path:
#             progress(0.7, desc='Шаг 2/2: Инициализация модели')
#             try:
#                 model = Llama(model_path=str(model_path), n_ctx=n_ctx)
#                 load_log += f'Модель {model_id}/{model_file} инициализирована\n'
#             except Exception as ex:
#                 load_log += f'Ошибка инициализации модели, код ошибки:\n{ex}\n'

#         model = {'model': model}
#         clear_memory()
#         return model, load_log


#     # загрузка и инициализация модели эмбедингов
#     def load_embed_model(model_id: str) -> Tuple[Dict[str, HuggingFaceEmbeddings], str]:
#         embed_model = None
#         if isinstance(model_id, list):
#             load_log = 'Не выбрана модель'
#             return embed_model, load_log

#         load_log = ''
#         progress = gr.Progress()

#         folder_name = model_id.replace('/', '_')
#         folder_path = EMBED_MODELS_PATH / folder_name

#         if Path(folder_path).is_dir():
#             load_log += f'Повторная инициализация модели {model_id} \n'
#         else:
#             progress(0.5, desc='Шаг 1/2: Загрузка модели')
#             snapshot_download(
#                 repo_id=model_id,
#                 local_dir=folder_path,
#                 ignore_patterns='*.h5',
#                 local_dir_use_symlinks=False,
#             )

#         progress(0.7, desc='Шаг 2/2: Инициализация модели')
#         model_kwargs = {'device': 'cuda' if torch.cuda.is_available() else 'cpu'}
#         embed_model = HuggingFaceEmbeddings(
#             model_name=str(folder_path),
#             model_kwargs=model_kwargs,
#             )
#         load_log += f'Модель эмбедингов {model_id} инициализирована\n'
#         load_log += f'Выберите ретривер и загрузите документы повторно\n'

#         embed_model = {'embed_model': embed_model}
#         clear_memory()
#         return embed_model, load_log


#     # добавление ноового репозитория HF new_model_id к текущему списку model_ids
#     def add_new_model_id(new_model_id: str, model_ids: List[str]) -> Tuple[gr.Dropdown, str]:
#         load_log = ''
#         model_id = new_model_id.strip()

#         if model_id:
#             model_id = model_id.split('/')[-2:]
#             if len(model_id) == 2:
#                 model_id = '/'.join(model_id).split('?')[0]
#                 if repo_exists(model_id) and model_id not in model_ids:
#                     if any([file_name.endswith('.gguf') for file_name in list_repo_files(model_id)]):
#                         model_ids.insert(0, model_id)
#                         load_log += f'Репозиторий модели {model_id} успешно добавлен\n'
#                     else:
#                         load_log += f'Не найдены модели GGUF в репозитории {model_id}\n'
#                 else:
#                     load_log += 'Неверное название репозитория HF или модель уже есть в списке\n'
#             else:
#                 load_log += 'Неверная ссылка на репозиторий HF\n'
#         else:
#             load_log += 'Пустая строка в поле репозитория HF\n'

#         model_id_dropdown = gr.Dropdown(choices=model_ids, value=model_ids[0])
#         return model_id_dropdown, load_log


#     # получить список моделей GGUF из репозитория HF
#     def get_gguf_model_names(model_id: str) -> gr.Dropdown:
#         repo_files = list(list_repo_tree(model_id))
#         repo_files = [file for file in repo_files if file.path.endswith('.gguf')]
#         model_paths = [f'{file.path} ({file.size / 1000 ** 3:.2f}G)' for file in repo_files]

#         model_paths_dropdown = gr.Dropdown(
#             choices=model_paths,
#             value=model_paths[0],
#             label='Файл модели GGUF',
#             )
#         return model_paths_dropdown


#     # удаление файлов и папок моделей для очистки места кроме текущей модели ignore_link
#     def clear_folder(ignore_link: str) -> None:
#         folder = EMBED_MODELS_PATH
#         if len(ignore_link.split('/')) != 2:
#             folder = MODELS_PATH
#             if '(' in ignore_link:
#                 ignore_link = ignore_link.split('(')[0].rstrip()

#         for path in folder.iterdir():
#             if path.name == ignore_link:
#                 continue
#             if path.is_file():
#                 path.unlink()
#             elif path.is_dir():
#                 rmtree(path)
#         clear_memory()


# class Interface:
#     pass



In [None]:
# def page1():
#     with gr.Tab(label='Page1'):
#         textbox = gr.Text(value='hello')
#         gr.Markdown('page1')

# def page2():
#     with gr.Tab(label='Page2'):
#         gr.Markdown('page2')

# with gr.Blocks(theme=gr.themes.Monochrome()) as demo:
#     page1()
#     page2()
#     gr.Markdown(textbox.text)

# demo.queue().launch(debug=True)

In [None]:
# def change():
#     return 2

# with gr.Blocks(theme=gr.themes.Monochrome()) as demo:
#     dr = gr.Dropdown(choices=[1, 2, 3], value=1)
#     gr.Button().click(change, None, dr)

# demo.queue().launch(debug=True)

Приложение

## Other

Посмотреть список файлов репозитория  
https://huggingface.co/docs/huggingface_hub/v0.20.3/en/package_reference/hf_api#huggingface_hub.HfApi.list_repo_files

In [None]:
from huggingface_hub import HfApi

repo_id = 'IlyaGusev/saiga_mistral_7b_gguf'
api = HfApi()
repo_files = api.list_repo_files(repo_id, repo_type='model')
repo_files = [repo_file for repo_file in repo_files if repo_file.endswith('.gguf')]
repo_files

['model-q2_K.gguf', 'model-q4_K.gguf', 'model-q8_0.gguf']

In [None]:
MODELS_PATH = './models'
model_path = hf_hub_download(
    repo_id=repo_id,
    filename=repo_files[0],
    local_dir=MODELS_PATH,
    )

In [None]:
model_path

'./models/model-q2_K.gguf'

Доп методы

In [None]:
# методы можно вызывать либо через api. либо через функции
from huggingface_hub import list_files_info, get_paths_info, list_repo_tree, list_repo_files

# либо посмотреть дерево (возвращает генератор)
repo_files = api.list_repo_tree(repo_id)
repo_files = list(repo_files)
repo_files

# # get_paths_info вместо list_files_info который устарел - неудобно нужно передавать файлы
# files_info = api.get_paths_info(model_name, ['README.md'])
# files_info

[RepoFile(path='.gitattributes', size=1675, blob_id='09adc185a26b5a2b5a67d1e336302448b2643437', lfs=None, last_commit=None, security=None),
 RepoFile(path='README.md', size=978, blob_id='f2718d0a4626ddbd8253ea2567c3fd6f346e9570', lfs=None, last_commit=None, security=None),
 RepoFile(path='model-q2_K.gguf', size=3083107232, blob_id='001ce60f387e43cfb13386796a4c49e35e0a8654', lfs={'size': 3083107232, 'sha256': '178e59c10b8bd6fc31a6596f4a4250aff5b66c370018910fb2ead50a0a57594a', 'pointer_size': 135}, last_commit=None, security=None),
 RepoFile(path='model-q4_K.gguf', size=4368450336, blob_id='321e4c61d4cbb20a250c0dd0bdeed7afb4c1be76', lfs={'size': 4368450336, 'sha256': '2798f33ff63c791a21f05c1ee9a10bc95630b17225c140c197188a3d5cf32644', 'pointer_size': 135}, last_commit=None, security=None),
 RepoFile(path='model-q8_0.gguf', size=7695874784, blob_id='4bb8e4b2e335d8fef53ba9355d6f392319fbd2c1', lfs={'size': 7695874784, 'sha256': 'a39fdd999a61231b274ea7ed14aaca0e77e1bd8754699542328a84ceaeba4ab

In [None]:
repo_files_sizes = [f'{file.path} ({file.size / 1024 ** 3:.2f}G)' for file in repo_files]
repo_files_sizes

['.gitattributes (0.00G)',
 'README.md (0.00G)',
 'model-q2_K.gguf (2.87G)',
 'model-q4_K.gguf (4.07G)',
 'model-q8_0.gguf (7.17G)']

In [None]:
import os
os.path.getsize('/content/models/mistral-7b-instruct-v0.1.Q2_K.gguf')

3083097760

In [None]:
os.path.getsize('/content/models/mistral-7b-instruct-v0.1.Q2_K.gguf') / 1000 ** 3

3.08309776

In [None]:
os.path.getsize('/content/models/mistral-7b-instruct-v0.1.Q2_K.gguf') / 1024 ** 3

2.8713585436344147

In [None]:
os.path.getsize('/content/models/mistral-7b-instruct-v0.1.Q2_K.gguf') / 1000 / 1024 / 1024

2.9402711486816404

In [None]:
from huggingface_hub import repo_exists
repo_exists(repo_id)

True

In [None]:
from huggingface_hub import repo_info
info = repo_info(repo_id)
info

ModelInfo(id='IlyaGusev/saiga_mistral_7b_gguf', author='IlyaGusev', sha='a184baabd9d99676f8051d00da1a09ebe59c5aa2', created_at=datetime.datetime(2023, 10, 9, 17, 44, 21, tzinfo=datetime.timezone.utc), last_modified=datetime.datetime(2023, 10, 9, 17, 58, 4, tzinfo=datetime.timezone.utc), private=False, gated=False, disabled=False, downloads=0, likes=48, library_name=None, tags=['gguf', 'conversational', 'ru', 'dataset:IlyaGusev/ru_turbo_saiga', 'dataset:IlyaGusev/ru_sharegpt_cleaned', 'dataset:IlyaGusev/oasst1_ru_main_branch', 'dataset:IlyaGusev/ru_turbo_alpaca_evol_instruct', 'dataset:lksy/ru_instruct_gpt4', 'license:apache-2.0', 'has_space', 'region:us'], pipeline_tag='conversational', mask_token=None, card_data={'language': ['ru'], 'license': 'apache-2.0', 'library_name': None, 'tags': None, 'datasets': ['IlyaGusev/ru_turbo_saiga', 'IlyaGusev/ru_sharegpt_cleaned', 'IlyaGusev/oasst1_ru_main_branch', 'IlyaGusev/ru_turbo_alpaca_evol_instruct', 'lksy/ru_instruct_gpt4'], 'metrics': None, 

# Gradio App RAG

https://colab.research.google.com/drive/1tClehCQILXLSXYqIZII569S1eTWnf8hZ#forceEdit=true&sandboxMode=true

## Screenshots

**Скриншоты интерфейса приложения**

Скриншоты сделаны с использованием LLM модели `openchat/openchat-3.6-8b-20240522` и ембединг модели `sentence-transformers/all-mpnet-base-v2`, в качестве текстов для БД использовались ссылки на YouTube

Страница интерфейса 1  

<img src='https://drive.google.com/uc?export=view&id=1O08_gmClVTZnhcrvCG9g6_n0jrna2s5n' width=100%>

Страница интерфейса 2  

<img src='https://drive.google.com/uc?export=view&id=166q7QiCA2SEw1SUq16bK5NLiy3UA3w_L' width=100%>

Страница интерфейса 3  

<img src='https://drive.google.com/uc?export=view&id=1F5CMhWQ8iSGY5KR4mV9zouHB9Ot6iuYx' width=100%>

Страница интерфейса 1 демонстрация RAG  

<img src='https://drive.google.com/uc?export=view&id=1OxPNQOp7pNAJM-fWpZUKv_Hjl5fD-60b' width=100%>

Страница интерфейса 1 демонстрация RAG  

<img src='https://drive.google.com/uc?export=view&id=1fpfT8iccu2Cacb90aHWNHKfQNAwuxL4n' width=100%>

Скриншоты сделаны с использованием LLM модели `openchat/openchat-3.6-8b-20240522` (GGUF версия `openchat-3.5-0106.Q2_K.gguf` 3Gb) и ембединг модели `cointegrated/rubert-tiny2`, в качестве текстов использовалась документация мотокультиватора

Страница интерфейса 1 демонстрация RAG  

<img src='https://drive.google.com/uc?export=view&id=1VkAm8E4CFoWKucnImBMgy93DLMuWj7kw' width=100%>

Страница интерфейса 1 демонстрация RAG  

<img src='https://drive.google.com/uc?export=view&id=17EX6EQ6GqWdHxK8qfTh18F_YfGNXGwx6' width=100%>

## Запуск приложения в Colab

### Установка библиотек и импорты

Установка библиотек

In [1]:
%%time
%%capture

# faiss-gpu or faiss-gpu
!pip install accelerate langchain langchain_community langchain_huggingface \
            pdfminer.six faiss-cpu gradio youtube-transcript-api  # sentence_transformers

CPU times: user 325 ms, sys: 89.4 ms, total: 414 ms
Wall time: 41.9 s


Доки llama-cpp-python  
https://llama-cpp-python.readthedocs.io/en/latest/?badge=latest  
Страница llama-cpp-python   
https://github.com/abetlen/llama-cpp-python


Установка библиотеки llama-cpp-python

In [2]:
# установка с поддержкой CPU
!pip install -q llama-cpp-python==0.2.88

# установка с поддержкой GPU CUDA
# !CMAKE_ARGS='-DGGML_CUDA=on' FORCE_CMAKE=1 pip install llama-cpp-python

[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m63.7/63.7 MB[0m [31m15.5 MB/s[0m eta [36m0:00:00[0m
[?25h  Installing build dependencies ... [?25l[?25hdone
  Getting requirements to build wheel ... [?25l[?25hdone
  Installing backend dependencies ... [?25l[?25hdone
  Preparing metadata (pyproject.toml) ... [?25l[?25hdone
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m45.5/45.5 kB[0m [31m3.8 MB/s[0m eta [36m0:00:00[0m
[?25h  Building wheel for llama-cpp-python (pyproject.toml) ... [?25l[?25hdone


In [3]:
import gc
import psutil
import csv
from pathlib import Path
from shutil import rmtree
from typing import List, Tuple, Dict, Union, Optional, Any
from tqdm import tqdm

import requests
from requests.exceptions import MissingSchema

import torch
import numpy as np
import gradio as gr

from llama_cpp import Llama
from youtube_transcript_api import YouTubeTranscriptApi, NoTranscriptFound, TranscriptsDisabled

from langchain.text_splitter import RecursiveCharacterTextSplitter, CharacterTextSplitter
from langchain_community.vectorstores import FAISS
from langchain_huggingface import HuggingFaceEmbeddings

from langchain_community.document_loaders import (
    CSVLoader,
    PDFMinerLoader,
    PyPDFLoader,
    TextLoader,
    UnstructuredHTMLLoader,
    UnstructuredMarkdownLoader,
    UnstructuredPowerPointLoader,
    UnstructuredWordDocumentLoader,
    WebBaseLoader,
    YoutubeLoader,
    DirectoryLoader,
)

# annotations
from langchain_core.retrievers import BaseRetriever
from langchain.docstore.document import Document
from langchain_core.vectorstores import VectorStore
from langchain_core.embeddings import Embeddings



In [None]:
!pip list | grep -P 'accelerate|torch|langchain|transformers|llama_cpp|gradio|youtube|huggingface-hub|pdfminer.six|faiss'

accelerate                       0.34.2
faiss-cpu                        1.8.0.post1
gradio                           4.44.0
gradio_client                    1.3.0
huggingface-hub                  0.24.7
langchain                        0.3.1
langchain-community              0.3.1
langchain-core                   0.3.6
langchain-huggingface            0.1.0
langchain-text-splitters         0.3.0
llama_cpp_python                 0.2.88
pdfminer.six                     20240706
sentence-transformers            3.1.1
torch                            2.4.1+cu121
torchaudio                       2.4.1+cu121
torchsummary                     1.5.1
torchvision                      0.19.1+cu121
transformers                     4.44.2
youtube-transcript-api           0.6.2


In [None]:
# сохранить зависимости в requirements.txt (при необходимости вырезать лишнее)
!pip freeze | grep -P 'accelerate|torch|langchain|transformers|llama_cpp|gradio|youtube|huggingface-hub|pdfminer.six|faiss' > requirements.txt

In [None]:
# просмотр requirements.txt
!cat requirements.txt

accelerate==0.34.2
faiss-cpu==1.8.0.post1
gradio==4.44.0
gradio_client==1.3.0
huggingface-hub==0.24.7
langchain==0.3.1
langchain-community==0.3.1
langchain-core==0.3.6
langchain-huggingface==0.1.0
langchain-text-splitters==0.3.0
llama_cpp_python==0.2.88
pdfminer.six==20240706
sentence-transformers==3.1.1
torch @ https://download.pytorch.org/whl/cu121_full/torch-2.4.1%2Bcu121-cp310-cp310-linux_x86_64.whl#sha256=f3ed9a2b7f8671b2b32a2f036d1b81055eb3ad9b18ba43b705aa34bae4289e1a
torchaudio @ https://download.pytorch.org/whl/cu121_full/torchaudio-2.4.1%2Bcu121-cp310-cp310-linux_x86_64.whl#sha256=da8c87c80a1c1376a48dc33eef30b03bbdf1df25a05bd2b1c620b8811c7b19be
torchsummary==1.5.1
torchvision @ https://download.pytorch.org/whl/cu121_full/torchvision-0.19.1%2Bcu121-cp310-cp310-linux_x86_64.whl#sha256=b8cc4bf381b75522995b601e07a1b433b5fd925dc3e34a7fa6cd22f449d65379
transformers==4.44.2
youtube-transcript-api==0.6.2


In [None]:
# версия Python
!python -V

Python 3.10.12


### Загрузка и инициализация моделей

Функция для загрузки моделей через библиотеку `requests` с прогресс баром

In [4]:
def download_file(file_url: str, file_path: Union[str, Path]) -> None:
    response = requests.get(file_url, stream=True)
    total_size = int(response.headers.get('content-length', 0))
    chunk_size = 4096  # 4 Kb
    progress_bar = tqdm(desc='Загрузка файла GGUF', total=total_size, unit='iB', unit_scale=True)

    with open(file_path, 'wb') as file:
        for data in response.iter_content(chunk_size):
            file.write(data)
            progress_bar.update(len(data))
    # progress_bar.close()

Инийциализация модели ембедингов и LLM модели

In [21]:
# инициализация модели эмбедингов
print('Инициализация модели эмбедингов ...')
device = 'cuda' if torch.cuda.is_available() else 'cpu'
embed_model_name = 'sentence-transformers/all-mpnet-base-v2'
embed_model_path = Path('embed_model')
embed_model = HuggingFaceEmbeddings(
    model_name=embed_model_name,
    model_kwargs={'device': device},  # cpu cuda
    cache_folder=str(embed_model_path),
    )

# прямая ссылка на модель в формате GGUF
# llm_model_url = 'https://huggingface.co/bartowski/openchat-3.6-8b-20240522-GGUF/resolve/main/openchat-3.6-8b-20240522-IQ4_XS.gguf'
llm_model_url = 'https://huggingface.co/bartowski/gemma-2-2b-it-GGUF/resolve/main/gemma-2-2b-it-Q8_0.gguf'

#
# полный путь до модели в папке ./models
llm_model_path = Path('models') / gguf_url.rsplit('/')[-1]
llm_model_path.parent.mkdir(exist_ok=True)

# если модель отсутствует в папке models то нужно ее загрузить - например через библиотеку wget
if not llm_model_path.is_file():
    if not str(llm_model_url).endswith('.gguf'):
        raise Exception('Ссылка на модель LLM должна быть прямой ссылкой на файл GGUF')
    else:
        print('Загрузка файла LLM модели ...')
        download_file(llm_model_url, llm_model_path)
        # model_path = wget.download(llm_model_url, out=str(llm_model_path))

# инициализация модели для генерации текста (установить нужный или поддерживаемый размер контекста n_ctx)
print('Инициализация LLM модели ...')
llm_model = Llama(model_path=str(llm_model_path), n_gpu_layers=-1, verbose=True)

# поддерживает ли модель системный промт
support_system_role = 'System role not supported' not in llm_model.metadata['tokenizer.chat_template']

# параметры генерации
# чтобы модель отвечала одинаково достаточно поставить top_k=1, top_p=0 и repeat_penalty=1 не зависимо от остальных параметров
GENERATE_KWARGS = dict(
    temperature=1,  # температура для софтмакса
    top_p=0.0,  # сумма вероятностей токенов из которых нужно выбирать следующий токен
    top_k=1,  # из скольки максимально вероятных токенов выбирать следующий токен
    repeat_penalty=1.0,  # штраф модели за повторения
    )

Инициализация модели эмбедингов ...


llama_model_loader: loaded meta data with 39 key-value pairs and 288 tensors from model/gemma-2-2b-it-Q8_0.gguf (version GGUF V3 (latest))
llama_model_loader: Dumping metadata keys/values. Note: KV overrides do not apply in this output.
llama_model_loader: - kv   0:                       general.architecture str              = gemma2
llama_model_loader: - kv   1:                               general.type str              = model
llama_model_loader: - kv   2:                               general.name str              = Gemma 2 2b It
llama_model_loader: - kv   3:                           general.finetune str              = it
llama_model_loader: - kv   4:                           general.basename str              = gemma-2
llama_model_loader: - kv   5:                         general.size_label str              = 2B
llama_model_loader: - kv   6:                            general.license str              = gemma
llama_model_loader: - kv   7:                               general.tags

Инициализация LLM модели ...


llama_model_loader: - kv  23:                      tokenizer.ggml.tokens arr[str,256000]  = ["<pad>", "<eos>", "<bos>", "<unk>", ...
llama_model_loader: - kv  24:                      tokenizer.ggml.scores arr[f32,256000]  = [-1000.000000, -1000.000000, -1000.00...
llama_model_loader: - kv  25:                  tokenizer.ggml.token_type arr[i32,256000]  = [3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, ...
llama_model_loader: - kv  26:                tokenizer.ggml.bos_token_id u32              = 2
llama_model_loader: - kv  27:                tokenizer.ggml.eos_token_id u32              = 1
llama_model_loader: - kv  28:            tokenizer.ggml.unknown_token_id u32              = 3
llama_model_loader: - kv  29:            tokenizer.ggml.padding_token_id u32              = 0
llama_model_loader: - kv  30:               tokenizer.ggml.add_bos_token bool             = true
llama_model_loader: - kv  31:               tokenizer.ggml.add_eos_token bool             = false
llama_model_loader: - kv  32: 

In [7]:
support_system_role = 'System role not supported' not in start_model.metadata['tokenizer.chat_template']
support_system_role

False

In [11]:
texts = ['зима', 'весна']
db = FAISS.from_texts(texts=texts, embedding=embed_model)

In [12]:
len(db)

TypeError: object of type 'FAISS' has no len()

In [20]:
len(db.docstore._dict)

2

Проверка генерации текста LLM моделью

Генерация текста моделью

Документация по методу `create_chat_completion`  
https://llama-cpp-python.readthedocs.io/en/latest/api-reference/#llama_cpp.Llama.create_chat_completion

In [None]:
%%time

# системный промт
system_prompt = ''
# входной запрос пользователя
user_message = 'Почему трава зеленая?'
# список с репликами юзера и бота (в данном примере одна)
messages = []

# формирование стандартного промта с запросом от пользователя
messages.append({'role': 'user', 'content': user_message})

# параметры генерации
# чтобы модель отвечала одинаково достаточно поставить top_k=1, top_p=0 и repeat_penalty=1 не зависимо от остальных параметров
GENERATE_KWARGS = dict(
    temperature=1,  # температура для софтмакса
    top_p=0.0,  # сумма вероятностей токенов из которых нужно выбирать следующий токен
    top_k=1,  # из скольки максимально вероятных токенов выбирать следующий токен
    repeat_penalty=1.0,  # штраф модели за повторения
    )

# создание объекта итератора для генерации текста
# при итерации по этому объекту в цикле она будет возвращать сгенерированный текст частями текста (токенами)
stream_response = llm_model.create_chat_completion(
    messages=messages,  # входной промт на который надо сгенерировать ответ
    stream=True,  # вернуть генератор
    **GENERATE_KWARGS,  # передача параметров генерации
    )

# пустую строку будем конкатенировать с токенами ответа модели
response_text = ''
# итерация и последовательная генерация текста моделью в цикле
for chunk in stream_response:
    # извлечение текущего сгенерированного токена
    token = chunk['choices'][0]['delta'].get('content')
    if token is not None:
        response_text += token
        print(token, end='')

Трава зеленая из-за пигмента **хлорофилла**. 

Вот как это работает:

* **Хлорофилл** - это пигмент, который содержится в хлоропластах, органеллах, расположенных в клетках растений. 
* **Хлорофилл** поглощает солнечный свет, особенно в красном и синем диапазоне, и отражает зеленый свет. 
* **Зеленый свет** - это то, что мы видим, когда трава отражает свет. 

Таким образом, трава кажется зеленой, потому что она отражает зеленый свет, который поглощается хлорофиллом. 



llama_print_timings:        load time =   14636.85 ms
llama_print_timings:      sample time =      45.09 ms /   149 runs   (    0.30 ms per token,  3304.50 tokens per second)
llama_print_timings: prompt eval time =   14631.99 ms /    15 tokens (  975.47 ms per token,     1.03 tokens per second)
llama_print_timings:        eval time =   76900.53 ms /   148 runs   (  519.60 ms per token,     1.92 tokens per second)
llama_print_timings:       total time =   92487.25 ms /   163 tokens


CPU times: user 1min 12s, sys: 465 ms, total: 1min 12s
Wall time: 1min 32s


### Функции

In [27]:
# аннотации
CHAT_HISTORY = List[Tuple[Optional[str], Optional[str]]]
MODEL_DICT = Dict[str, Llama]

# получение количества свободной памяти на диске, CPU и GPU
def get_memory_usage() -> str:
    print_memory = ''

    memory_type = 'Disk'
    psutil_stats = psutil.disk_usage('.')
    memory_total = psutil_stats.total / 1024**3
    memory_usage = psutil_stats.used / 1024**3
    print_memory += f'{memory_type} Menory Usage: {memory_usage:.2f} / {memory_total:.2f} GB\n'

    memory_type = 'CPU'
    psutil_stats = psutil.virtual_memory()
    memory_total = psutil_stats.total / 1024**3
    memory_usage =  memory_total - (psutil_stats.available / 1024**3)
    print_memory += f'{memory_type} Menory Usage: {memory_usage:.2f} / {memory_total:.2f} GB\n'

    if torch.cuda.is_available():
        memory_type = 'GPU'
        memory_free, memory_total = torch.cuda.mem_get_info()
        memory_usage = memory_total - memory_free
        print_memory += f'{memory_type} Menory Usage: {memory_usage / 1024**3:.2f} / {memory_total:.2f} GB\n'

    print_memory = f'---------------\n{print_memory}---------------\n'
    return print_memory


def clear_memory() -> None:
    gc.collect()
    torch.cuda.empty_cache()


# загрузка и инициализация модели GGUF
def load_model(model_id: str, model_file: str, n_ctx: int) -> Tuple[Dict[str, Llama], str]:
    model = None
    if isinstance(model_file, list):
        load_log = 'Не выбрана модель'
        return model, load_log

    load_log = ''
    if '(' in model_file:
        model_file = model_file.split('(')[0].rstrip()

    progress = gr.Progress()
    progress(0.3, desc='Шаг 1/2: Загрузка модели GGUF')
    model_path = MODELS_PATH / model_file
    if model_path.is_file():
        load_log += 'Модель уже загружена, повторная инициализация\n'
    else:
        try:
            hf_hub_download(
                repo_id=model_id,
                filename=model_file,
                local_dir=MODELS_PATH,
                local_dir_use_symlinks=False,
                )
            load_log += 'Модель успешно загружена\n'
        except Exception as ex:
            model_path = ''
            load_log += f'Ошибка загрузки модели, код ошибки:\n{ex}\n'

    if model_path:
        progress(0.7, desc='Шаг 2/2: Инициализация модели')
        try:
            model = Llama(model_path=str(model_path), n_ctx=n_ctx, n_gpu_layers=-1)
            load_log += f'Модель {model_id}/{model_file} инициализирована\n'
        except Exception as ex:
            load_log += f'Ошибка инициализации модели, код ошибки:\n{ex}\n'

    model = {'model': model}
    clear_memory()
    return model, load_log


# загрузка и инициализация модели эмбедингов
def load_embed_model(model_id: str) -> Tuple[Dict[str, HuggingFaceEmbeddings], str]:
    embed_model = None
    if isinstance(model_id, list):
        load_log = 'Не выбрана модель'
        return embed_model, load_log

    load_log = ''
    progress = gr.Progress()

    folder_name = model_id.replace('/', '_')
    folder_path = EMBED_MODELS_PATH / folder_name

    if Path(folder_path).is_dir():
        load_log += f'Повторная инициализация модели {model_id} \n'
    else:
        progress(0.5, desc='Шаг 1/2: Загрузка модели')
        snapshot_download(
            repo_id=model_id,
            local_dir=folder_path,
            ignore_patterns='*.h5',
            local_dir_use_symlinks=False,
        )

    progress(0.7, desc='Шаг 2/2: Инициализация модели')
    model_kwargs = {'device': 'cuda' if torch.cuda.is_available() else 'cpu'}
    embed_model = HuggingFaceEmbeddings(
        model_name=str(folder_path),
        model_kwargs=model_kwargs,
        )
    load_log += f'Модель эмбедингов {model_id} инициализирована\n'
    load_log += f'Выберите ретривер и загрузите документы повторно\n'

    embed_model = {'embed_model': embed_model}
    clear_memory()
    return embed_model, load_log


# добавление ноового репозитория HF new_model_id к текущему списку model_ids
def add_new_model_id(new_model_id: str, model_ids: List[str]) -> Tuple[gr.Dropdown, str]:
    load_log = ''
    model_id = new_model_id.strip()

    if model_id:
        model_id = model_id.split('/')[-2:]
        if len(model_id) == 2:
            model_id = '/'.join(model_id).split('?')[0]
            if repo_exists(model_id) and model_id not in model_ids:
                if any([file_name.endswith('.gguf') for file_name in list_repo_files(model_id)]):
                    model_ids.insert(0, model_id)
                    load_log += f'Репозиторий модели {model_id} успешно добавлен\n'
                else:
                    load_log += f'Не найдены модели GGUF в репозитории {model_id}\n'
            else:
                load_log += 'Неверное название репозитория HF или модель уже есть в списке\n'
        else:
            load_log += 'Неверная ссылка на репозиторий HF\n'
    else:
        load_log += 'Пустая строка в поле репозитория HF\n'

    model_id_dropdown = gr.Dropdown(choices=model_ids, value=model_ids[0])
    return model_id_dropdown, load_log


# получить список моделей GGUF из репозитория HF
def get_gguf_model_names(model_id: str) -> gr.Dropdown:
    repo_files = list(list_repo_tree(model_id))
    repo_files = [file for file in repo_files if file.path.endswith('.gguf')]
    model_paths = [f'{file.path} ({file.size / 1000 ** 3:.2f}G)' for file in repo_files]

    model_paths_dropdown = gr.Dropdown(
        choices=model_paths,
        value=model_paths[0],
        label='Файл модели GGUF',
        )
    return model_paths_dropdown


# удаление файлов и папок моделей для очистки места кроме текущей модели ignore_link
def clear_folder(ignore_link: str) -> None:
    folder = EMBED_MODELS_PATH
    if len(ignore_link.split('/')) != 2:
        folder = MODELS_PATH
        if '(' in ignore_link:
            ignore_link = ignore_link.split('(')[0].rstrip()

    for path in folder.iterdir():
        if path.name == ignore_link:
            continue
        if path.is_file():
            path.unlink()
        elif path.is_dir():
            rmtree(path)
    clear_memory()

# очистка текста
def clear_text(text: str) -> str:
    lines = text.split('\n')
    # брать строки, кол-во символов в которых больше 2
    lines = [line for line in lines if len(line.strip()) > 2]
    text = '\n'.join(lines).strip()
    return text


# очистка списка документов
def clear_documents(documents: List[Document]) -> List[Document]:
    output_documents = []
    # итерация по документам
    for document in documents:
        # очистка текста текущего документа
        text = clear_text(document.page_content)
        # берем документы с длиной текста более 1
        if len(text) > 10:
            # записываем очищенный текст обратно в документ и добавляем в список
            document.page_content = text
            output_documents.append(document)
    return output_documents


# получение разделителя для csv файла на слуяай если он отличется от ','
def get_csv_delimiter(file_path: str) -> str:
    n_bytes = 4096
    with open(file_path) as csvfile:
        delimiter = csv.Sniffer().sniff(csvfile.read(n_bytes)).delimiter
    return delimiter


# функция проверки доступности субтитров, если доступны ручные или автоматические - возвращает True и логи
# если субтитры недоступны - возвращает False и логи
def check_subtitles_available(yt_video_link: str, target_lang: str) -> Tuple[bool, str]:
    # извлечение ID видео из полной ссылки видео на YouTube
    video_id = yt_video_link.split('watch?v=')[-1].split('&')[0]
    # строка с логами
    load_log = ''
    # статус доступности субтитров
    available = True
    try:
        # доступные языки субтитров для текущего видео
        transcript_list = YouTubeTranscriptApi.list_transcripts(video_id)
        try:
            # поиск языка target_lang в доступных языках
            transcript = transcript_list.find_transcript([target_lang])
            # проверка что субтитры автоматические или ручные
            if transcript.is_generated:
                load_log += f'Будут загружены автоматические субтитры, ручные недоступны для видео {yt_video_link}\n'
            else:
                load_log += f'Будут загружены ручные субтитры для видео {yt_video_link}\n'

        # если субтитры для видео есть но нет субтитров на желаемом языке
        except NoTranscriptFound:
            load_log += f'Язык субтитров {target_lang} недоступен для видео {yt_video_link}\n'
            available = False

    # если никаких субтитров для видео нет
    except TranscriptsDisabled:
        load_log += f'Нет субтитров для видео {yt_video_link}\n'
        available = False

    return available, load_log


# извлечение документов (в формате langchain Documents) из загруженных файлов
def load_documents_from_files(upload_files: List[str]) -> Tuple[List[Document], str]:
    # логи загрузки и список для загруженных документов
    load_log = ''
    documents = []
    # итерация по путям до файлов
    for upload_file in upload_files:
        # получение расширения файла
        file_extension = f".{upload_file.split('.')[-1]}"
        # если файл расширения есть в словаре с лоадерами LOADER_CLASSES
        if file_extension in LOADER_CLASSES:
            # извлекаем соотвествующий раширению лоадер
            loader_class = LOADER_CLASSES[file_extension]
            loader_kwargs = {}
            # если это csv то дополнительно находим разделитель
            if file_extension == '.csv':
                delimiter = get_csv_delimiter(upload_file)
                loader_kwargs = {'csv_args': {'delimiter': delimiter}}
            try:
                # получаем документы из файлов
                load_documents = loader_class(upload_file, **loader_kwargs).load()
                documents.extend(load_documents)
            except Exception as ex:
                load_log += f'Ошибка загрузки файла {upload_file}\n'
                load_log += f'Код ошибки: {ex}\n'
                continue
        else:
            load_log += f'Неподдерживаемый формат файла {upload_file}\n'
            continue
    return documents, load_log


# извлечение документов (в формате langchain Documents) из WEB ссылок
def load_documents_from_links(
        web_links: str,  # ссылки на web-сайты или на видео YouTube
        subtitles_lang: str,  # желаемый язык субтитров на YouTube
        ) -> Tuple[List[Document], str]:

    # логи загрузки, список для документов и параметры для лоадера
    load_log = ''
    documents = []
    loader_class_kwargs = {}

    # фильтрация пустых строк
    web_links = [web_link.strip() for web_link in web_links.split('\n') if web_link.strip()]

    # итераци по переданным ссылкам
    for web_link in web_links:
        # ----------------- ссылка на YouTube ---------------------------------
        if 'youtube.com' in web_link:
            # проверка что субтитры на выбранном языке subtitles_lang доступны в видео web_link
            available, log = check_subtitles_available(web_link, subtitles_lang)
            load_log += log

            # если субтитры недоступны - пропускаем ссылку
            if not available:
                continue

            # инициализация YouTubeLoader с параметром языка субтитров
            loader_class = LOADER_CLASSES['youtube'].from_youtube_url
            loader_class_kwargs = {'language': subtitles_lang}

        # ----------------- ссылка не на YouTube ------------------------------
        else:
            # инициализация WebBaseLoader
            loader_class = LOADER_CLASSES['web']
        try:
            # проверка что сайт доступен для парсинга с помощью python requests
            if requests.get(web_link).status_code != 200:
                load_log += f'Ссылка недоступна для Python requests: {web_link}\n'
                continue
            # если доступен то загружаем документ с текстом из web ссылки
            load_documents = loader_class(web_link, **loader_class_kwargs).load()
            if len(load_documents) == 0:
                load_log += f'Фрагменты текста не были найдены по ссылке: {web_link}\n'
                continue
            documents.extend(load_documents)
        except MissingSchema:
            # если ссылка неверная
            load_log += f'Неверная ссылка: {web_link}\n'
            continue
        except Exception as ex:
            # если какая то другая ошибка
            load_log += f'Ошибка загрузки web лоадером данных по ссылке: {web_link}\n'
            load_log += f'Код ошибки: {ex}\n'
            continue
    return documents, load_log


# загрузка файлов и формирование документов и БД
def load_documents_and_create_db(
        upload_files: Optional[List[str]],  # пути до файлов, загружаемые через gr.File()
        web_links: str,  # ссылки на web-сайты или на видео YouTube
        subtitles_lang: str,  # желаемый язык субтитров на YouTube
        chunk_size: int,  # кол-во символов
        chunk_overlap: int,
        embed_model,
        ) -> Tuple[List[Document], Optional[BaseRetriever], str]:

    # логи загрузки, список для документов из всех источников, БД и прогресс бар Gradio
    load_log = ''
    all_documents = []
    db = None
    progress = gr.Progress()

    # если не переданы ни пути до файлов ни ссылки
    if upload_files is None and not web_links:
        load_log = 'Не выбраны файлы или ссылки'
        return all_documents, db, load_log

    # загрузка документов из файлов
    if upload_files is not None:
        progress(0.3, desc='Шаг 1/2: Загрузка документов из файлов')
        docs, log = load_documents_from_files(upload_files)
        all_documents.extend(docs)
        load_log += log

    # загрузка документов по ссылкам
    if web_links:
        progress(0.3 if upload_files is None else 0.5, desc='Шаг 1/2: Загрузка документов по ссылкам')
        docs, log = load_documents_from_links(web_links, subtitles_lang)
        all_documents.extend(docs)
        load_log += log

    # если не загружено ни одного документа
    if len(all_documents) == 0:
        load_log += 'Загрузка прервана так как не было извлечено ни одного документа\n'
        load_log += 'Режим RAG не может быть активирован'
        return all_documents, db, load_log

    load_log += f'Загружено документов: {len(all_documents)}\n'

    # разделить документы на фрагменты и очистка фрагментов
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
    )
    documents = text_splitter.split_documents(all_documents)
    documents = clear_documents(documents)
    load_log += f'Документы разделены, кол-во фрагментов текста: {len(documents)}\n'

    # инициализация БД
    progress(0.7, desc='Шаг 2/2: Инициализация БД')
    db = FAISS.from_documents(documents=documents, embedding=embed_model)
    load_log += 'Режим RAG активирован и может быть дективирован на вкладке Generate'
    return documents, db, load_log


# добавление сообщение пользователя в окошко чат бота
def user_message_to_chatbot(user_message: str, chatbot: CHAT_HISTORY) -> Tuple[str, CHAT_HISTORY]:
    if user_message:
        chatbot.append((user_message, None))
    return '', chatbot


# форматирование промта с добавлением контекста, если БД доступна и режим RAG активирован
# иначе формирование промта без контекста - для простого общения с ботом
def update_user_message_with_context(
        chatbot: List[List[Optional[str]]],  # объект чатбота - список списков реплик юзера и бота
        rag_mode: bool,  # режим RAG - включен или отключен
        db: VectorStore,  # БД
        k: Union[int, str],  # кол-во релевантных запросу пользователя фрагментов текста для контекста
        score_threshold: float,  # порого по которому ищутся релевантные фрагменты текста для контекста
        ) -> Tuple[str, List[List[Optional[str]]], str]:

    # текущее сообщение пользователя
    user_message = chatbot[-1][0]

    # если сообщение пользователя пустое - обработаем это в следующей функции generate_text
    if not user_message.strip():
        return '', chatbot

    # если БД готова и включен режим RAG то ищем релевантные доки и добавляем в промт
    if db is not None and rag_mode:
        # использовать все документы, скор которых больше score_threshold
        if k == 'max':
            k = len(db.docstore._dict)
        # поиск релевантных фрагментов текста
        docs_and_distances = db.similarity_search_with_relevance_scores(
            user_message,
            k=k,
            score_threshold=score_threshold,
            )
        # если релевантные документы не найдены
        if len(docs_and_distances) == 0:
            gr.Info((
                f'Релевантные запросу документы не найдены, '
                f'используется промт без контекста (попробуйте уменьшить searh_score_threshold)'
                ))
        else:
            # формирование контекста из релевантных фрагментов текста
            retriever_context = '\n\n'.join([doc[0].page_content for doc in docs_and_distances])
            # обогащение сообщения юзера контекстом
            user_message = CONTEXT_TEMPLATE.format(
                user_message=user_message,
                context=retriever_context,
                )

    # добавление в чат бот текущего сообщения юзера чтобы оно было видно на экране
    chatbot.append([user_message, None])
    return '', chatbot


# получить текст текущего промта с контекстом
def get_user_message_with_context(chatbot, rag_mode) -> gr.component:
    user_message = chatbot[-1][0] if len(chatbot) > 0 else ''
    return gr.Texbox(user_message, visible=rag_mode, interactive=False)


# генерация текста моделью
def get_llm_response(
        chatbot: CHAT_HISTORY,  # список переписок пользователя и бота, который отображается в окошке чат-бота
        model_dict: MODEL_DICT,  # словарь с модель llama-cpp-python
        system_prompt: str,  # системный промт
        support_system_role: bool,  # поддержиывает ли модель системный промт
        history_len: int,  # кол-во предыдущих сообщений которые учитываются в истории
        do_sample: bool,  # использовать ли случайное семплирование при генерации ответа моделью
        *generate_args,  # параметры генерации ответа модели
        ) -> List[List[Optional[str]]]:


    # если сообщение пользователя пустое - обработаем это в следующей функции generate_text
    user_message = chatbot[-1][0]
    if not user_message.strip():
        return '', chatbot

    # получение модели
    model = model_dict.get('model')
    # список с переписками юзера и бота
    messages = []

    # обновление словаря параметров генерации текста
    gen_kwargs = dict(zip(GENERATE_KWARGS.keys(), generate_args))
    gen_kwargs['top_k'] = int(gen_kwargs['top_k'])

    # отключение случайного семплирования если do_sample=False
    if not do_sample:
        gen_kwargs['top_p'] = 0.0
        gen_kwargs['top_k'] = 1
        gen_kwargs['repeat_penalty'] = 1.0

    # добавление системного промта если он есть и поддерживается моделью
    if support_system_role and system_prompt:
        messages.append({'role': 'system', 'content': system_prompt})

    # добавление истории переписки в промт
    if history_len != 0:
        for user_msg, bot_msg in chatbot[:-1][-history_len:]:
            print(user_msg, bot_msg)
            messages.append({'role': 'user', 'content': user_msg})
            messages.append({'role': 'assistant', 'content': bot_msg})

    # формирование генератора для генерации ответа моделью с форматированием промта
    stream_response = model.create_chat_completion(
        messages=messages,
        stream=True,
        **gen_kwargs,
        )

    # пустую строку будем конкатенировать с токенами ответа модели
    chatbot[-1][1] = ''
    # итерация и генерация токенов ответа модели в цикле
    for chunk in stream_response:
        token = chunk['choices'][0]['delta'].get('content')
        if token is not None:
            chatbot[-1][1] += token
            yield chatbot


# получение окошка для системного промта. если interactive=True то можно вводить системный промт
# примеры системных промтов:
# Отвечай на вопросы максимально кратко, если не знаешь ответ - говори "не знаю".
# Если в вопросе есть хоть малейший намек на политику, насилие, ругательства, ответь так: "На эту тему я не могу говорить".
def get_system_prompt_component(interactive: bool) -> gr.Textbox:
    value = '' if interactive else 'System prompt is not supported by this model'
    return gr.Textbox(value=value, label='System prompt', interactive=interactive)


# компоненты настройки конфига генерации текста
def get_generate_args(do_sample: bool) -> List[gr.component]:
    # если do_sample включен (элемент gr.Checkbox() активен) то отображать слайдера с параметрами генерации
    generate_args = [
        gr.Slider(label='temperature', value=GENERATE_KWARGS['temperature'], minimum=0.1, maximum=3, step=0.1, visible=do_sample),
        gr.Slider(label='top_p', value=GENERATE_KWARGS['top_p'], minimum=0.1, maximum=1, step=0.1, visible=do_sample),
        gr.Slider(label='top_k', value=GENERATE_KWARGS['top_k'], minimum=1, maximum=50, step=5, visible=do_sample),
        gr.Slider(label='repeat_penalty', value=GENERATE_KWARGS['repeat_penalty'], minimum=1, maximum=5, step=0.1, visible=do_sample),
    ]
    return generate_args

### Приложение

Запуск приложения

In [None]:
# классы langchain для извлечения текста из различных источников
LOADER_CLASSES = {
    '.csv': CSVLoader,
    '.doc': UnstructuredWordDocumentLoader,
    '.docx': UnstructuredWordDocumentLoader,
    '.html': UnstructuredHTMLLoader,
    '.md': UnstructuredMarkdownLoader,
    '.pdf': PDFMinerLoader,
    '.ppt': UnstructuredPowerPointLoader,
    '.pptx': UnstructuredPowerPointLoader,
    '.txt': TextLoader,
    'web': WebBaseLoader,
    'directory': DirectoryLoader,
    'youtube': YoutubeLoader,
}

# языки для субтитров YouTube
SUBTITLES_LANGUAGES = ['ru', 'en']

# пример видео с автоматически сгенерированными субтитрами
# https://www.youtube.com/watch?v=CFVABT8wtl4
# пример видео с обычными субтитрами
# https://www.youtube.com/watch?v=EEGk7gHoKfY

# начальные настройки бота при первом запуске
HISTORY_LEN = 0

# шаблон промта при условии контекста
CONTEXT_TEMPLATE = '''Ответь на вопрос при условии контекста.

Контекст:
{context}

Вопрос:
{user_message}

Ответ:'''

# параметры генерации
# чтобы модель отвечала одинаково достаточно поставить top_k=1, top_p=0 и repeat_penalty=1 не зависимо от остальных параметров
GENERATE_KWARGS = dict(
    temperature=1,  # температура для софтмакса
    top_p=0.0,  # сумма вероятностей токенов из которых нужно выбирать следующий токен
    top_k=1,  # из скольки максимально вероятных токенов выбирать следующий токен
    repeat_penalty=1.0,  # штраф модели за повторения
    )


# ====================== ИНТЕРФЕЙС ПРИЛОЖЕНИЯ ============================

# тема оформления и CSS
# theme = gr.themes.Monochrome()
theme = gr.themes.Base(primary_hue='green', secondary_hue='yellow', neutral_hue='zinc').set(
    loader_color='rgb(0, 255, 0)',
    slider_color='rgb(0, 200, 0)',
    body_text_color_dark='rgb(0, 200, 0)',
    button_secondary_background_fill_dark='green',
)
css = '''.gradio-container {width: 60% !important}'''

# начало описания интерфейса приложения
with gr.Blocks(theme=theme, css=css) as interface:

    # ==================== СОСТОЯНИЯ ===============================

    # загруженные фрагменты текста (список объектов langchain Document)
    documents = gr.State([])
    # БД
    db = gr.State(None)
    # обогащенный контекстом промт пользователя
    user_message_with_context = gr.State('')
    # флаг поддержки системного промта моделью
    support_system_role = gr.State(start_support_system_role)

    model = gr.State({'model': start_model})
    embed_model = gr.State({'embed_model': start_embed_model})

    # ==================== СТРАНИЦА БОТА =================================

    # функция получения окна чат бота с названием текущего режима - RAG или Chatbot
    # а так же параметров RAG -
    def get_rag_settings(rag_mode: bool):
        # кол-во релевантных фрагментов текста для поиска в БД
        k = gr.Radio(
            choices=[1, 2, 3, 'max'],
            value=2,
            label='Количество релевантных документов для поиска',
            visible=rag_mode,
            render=False,
            )

        # порог для поиска релевантных фрагментов текста от 0 до 1 (чем ниже тем больше документов будет найдено)
        score_threshold = gr.Slider(
            label='searh_score_threshold',
            value=0.5,
            minimum=0,
            maximum=1,
            step=0.1,
            visible=rag_mode,
            render=False,
            )
        return k, score_threshold


    with gr.Tab(label='Generate'):
        with gr.Row():
            with gr.Column(scale=3):
                # окошко чат бота
                chatbot = gr.Chatbot(
                    show_copy_button=True,
                    bubble_full_width=False,
                    height=480,
                )
                # сообщение пользователя
                user_message = gr.Textbox(label='User')

                with gr.Row():
                    # кнопки отправить сообщение, стоп генерации и удалить историю чата
                    user_message_btn = gr.Button('Отправить')
                    stop_btn = gr.Button('Стоп')
                    clear_btn = gr.Button('Очистить чат')


            # ------------------ ПАРАМЕТРЫ ГЕНЕРАЦИИ -------------------------

            with gr.Column(scale=1, min_width=80):
                with gr.Group():
                    # длина истории которую будет учитывать модель
                    gr.Markdown('Размер истории')
                    history_len = gr.Slider(
                        minimum=0,
                        maximum=5,
                        value=HISTORY_LEN,
                        step=1,
                        info='Кол-во предыдущих сообщенией, учитываемых в истории',
                        label='history len',
                        show_label=False,
                        )

                    with gr.Group():
                        gr.Markdown('Параметры генерации')
                        # переключатель активации случайного семплирования при генерации текста моделью
                        do_sample = gr.Checkbox(
                            value=False,
                            label='do_sample',
                            info='Активация случайного семплирования',
                            )
                        # настройки семплирования
                        generate_args = get_generate_args(do_sample.value)
                        do_sample.change(
                            fn=get_generate_args,
                            inputs=do_sample,
                            outputs=generate_args,
                            show_progress=False,
                            )

        # переключатель включить или выключить режим RAG (даже если БД готова к работе RAG можно отключить)
        rag_mode = gr.Checkbox(value=False, label='Режим RAG', scale=1, visible=False)
        rag_mode.change(
            fn=get_rag_settings,
            inputs=[rag_mode],
            outputs=[k, score_threshold],
            )

        # отобразить в этом месте экрана параметры k и score_threshold
        # число релевантных фргаментов текста для контекста и параметр score_threshold
        k, score_threshold = get_rag_settings(rag_mode)
        with gr.Row()
            k.render()
            score_threshold.render()

        # -------------------- СИСТЕМНЫЙ ПРОМТ И ИТОГОВЫЙ ПРОМТ ---------------------------

        with gr.Accordion('Промт', open=True):
            # окошко для системного промта
            system_prompt = get_system_prompt_component(interactive=support_system_role.value)
            # итоговыый промт который подается в модель
            user_message_with_context = get_user_message_with_context(gr.Chatbot().value, rag_mode.value)

        # ------------------ КНОПКИ ОТПРАВИТЬ ОЧИСТИТЬ И СТОП ------------

        # нажатие Enter и кнопка отправить
        generate_event = gr.on(
            triggers=[user_message.submit, user_message_btn.click],
            fn=get_promt_with_context,
            inputs=[user_message, chatbot, history_len, rag_mode, db, system_prompt, k, score_threshold],
            outputs=[user_message, chatbot, full_promt],
            queue=True,
        ).then(
            fn=lambda promt: promt,
            inputs=full_promt,
            outputs=full_promt,
            queue=False,
        ).then(
            fn=generate_text,
            inputs=[chatbot, full_promt, do_sample, *generate_args],
            outputs=[chatbot],
            queue=True,
            )

        # кнопка Стоп
        stop_btn.click(
            fn=None,
            inputs=None,
            outputs=None,
            cancels=generate_event,
            queue=False,
        )

        # кнопка Очистить чат
        clear_btn.click(
            fn=lambda: (None, ''),
            inputs=None,
            outputs=[chatbot, full_promt],
            queue=False,
            )


    # ===================== СТРАНИЦА ЗАГРУЗКИ ФАЙЛОВ =========================

    with gr.Tab(label='Load documents'):
        with gr.Row(variant='compact'):
            # загрузка файлов и ссылок
            upload_files = gr.File(file_count='multiple', label='Загрузка текстовых файлов')
            web_links = gr.Textbox(lines=6, label='Ссылки на Web сайты или Ютуб')

        with gr.Row(variant='compact'):
            # параметры нарезки текста на фрагменты
            chunk_size = gr.Slider(50, 2000, value=500, step=50, label='Длина фрагментов')
            chunk_overlap = gr.Slider(0, 200, value=20, step=10, label='Длина пересечения фрагментов')

            # язык субтитров
            subtitles_lang = gr.Radio(
                SUBTITLES_LANGUAGES,
                value=SUBTITLES_LANGUAGES[0],
                label='Язык субтитров YouTube',
                )

        # кнопка загрузки документов и инициализации БД
        load_documents_btn = gr.Button(value='Загрузить документы и инициализировать БД')
        # статус прогресса загрузки файлов и инициализации БД
        load_docs_log = gr.Textbox(label='Прогресс загрузки и разделения документов', interactive=False)

        # главный цикл загрузки доков и инициализации ретривера
        load_event = load_documents_btn.click(
            fn=load_documents_and_create_db,
            inputs=[upload_files, web_links, subtitles_lang, chunk_size, chunk_overlap],
            outputs=[documents, db, load_docs_log],
            )

        # сменить название бота на RAG или Chatbot в зависимости от готовности ретривера
        def documents_load_success(chatbot_history, db):
            rag_mode = db is not None
            chatbot, k, score_threshold = get_chatbot_and_rag_settings(chatbot_history, rag_mode)
            rag_mode_checkbox = gr.Checkbox(value=rag_mode, label='Режим RAG', scale=1, visible=rag_mode)
            return rag_mode_checkbox

        load_event.success(
            fn=documents_load_success,
            inputs=[chatbot, db],
            outputs=[rag_mode],
            )


    # ================= СТРАНИЦА ПРОСМОТРА ВСЕХ ДОКУМЕНТОВ =================

    with gr.Tab(label='View documents'):
        # кнопка и текстовое поле для отображения загруженных фрагментов текста
        view_documents_btn = gr.Button(value='Отобразить загруженные фрагменты')
        view_documents_textbox = gr.Textbox(
            lines=1,
            placeholder='Для просмотра фрагментов загрузите документы на вкладке Load documents',
            label='Загруженные фрагменты',
            )
        sep = '=' * 20
        # отображение загруженных фрагментов текста если они готовы
        view_documents_btn.click(
            lambda documents: f'\n{sep}\n\n'.join([doc.page_content for doc in documents]),
            inputs=documents,
            outputs=view_documents_textbox,
        )

    # ============== СТРАНИЦА ЗАГРУЗКИ GGUF МОДЕЛЕЙ =====================

    with gr.Tab('Load model'):
        new_model_id = gr.Textbox(
            value='',
            label='Добавить репозиторий',
            placeholder='Ссылка на репозиторий HF моделей в формате GGUF',
            )
        new_model_btn = gr.Button('Добавить репозиторий')

        model_id = gr.Dropdown(
            choices=MODELS_IDS,
            value=None,
            label='Репозиторий модели HF',
            )

        model_path = gr.Dropdown(
            choices=[],
            value=None,
            label='Файл модели GGUF',
            )

        n_ctx = gr.Slider(500, 500 * 8, step=500, label='n_ctx')
        load_model_btn = gr.Button('Загрука и инициализация модели')

        load_model_log = gr.Textbox(
            value=f'Модель {MODELS_IDS[0]} загружена по умолчанию',
            label='Статус загрузки модели',
            )

        with gr.Group():
            gr.Markdown('Освободить место на диске путем удаления всех моделей кроме текущей')
            remove_models_btn = gr.Button('Очистить папку')

        new_model_btn.click(
            fn=add_new_model_id,
            inputs=[new_model_id, model_ids_state],
            outputs=[model_id, load_model_log],
        ).success(
            fn=lambda: '',
            inputs=None,
            outputs=new_model_id,
        )

        model_id.change(
            fn=get_gguf_model_names,
            inputs=[model_id],
            outputs=[model_path],
        )

        load_model_btn.click(
            fn=load_model,
            inputs=[model_id, model_path, n_ctx],
            outputs=[model, load_model_log],
            queue=True,
        ).success(
            fn=lambda log: log + get_memory_usage(),
            inputs=load_model_log,
            outputs=load_model_log,
        )

        remove_models_btn.click(
            fn=clear_folder,
            inputs=[model_path],
            outputs=None,
        ).success(
            fn=lambda model: f'Модели кроме {model} удалены',
            inputs=model_path,
            outputs=None,
        )

    # ============== СТРАНИЦА ЗАГРУЗКИ ЭМБЕДИНГ МОДЕЛЕЙ =================
    with gr.Tab('Load embed model'):
        new_embed_id = gr.Textbox(
            value='',
            label='Добавить репозиторий',
            placeholder='Ссылка на репозиторий модели HF',
            )
        new_embed_btn = gr.Button('Добавить репозиторий')

        embed_id = gr.Dropdown(
            choices=EMBEDERS_IDS,
            value=None,
            label='Репозиторий модели HF',
            )

        load_embed_btn = gr.Button('Загрука и инициализация модели')
        load_embed_log = gr.Textbox(
            value=f'Модель {EMBEDERS_IDS[0]} загружена по умолчанию',
            label='Статус загрузки модели',
            )
        with gr.Group():
            gr.Markdown('Освободить место на диске путем удаления всех моделей кроме текущей')
            remove_embed_models_btn = gr.Button('Очистить папку')

        new_embed_btn.click(
            fn=add_new_model_id,
            inputs=[new_embed_id, embed_ids_state],
            outputs=[embed_id, load_embed_log],
        ).success(
            fn=lambda: '',
            inputs=None,
            outputs=new_embed_id,
        )

        load_embed_btn.click(
            fn=load_embed_model,
            inputs=[embed_id],
            outputs=[embed_model, load_embed_log],
        ).success(
            fn=lambda log: log + get_memory_usage(),
            inputs=load_embed_log,
            outputs=load_embed_log,
        )

        remove_embed_models_btn.click(
            fn=clear_folder,
            inputs=[embed_id],
            outputs=None,
        ).success(
            fn=lambda model: f'Модели кроме {model} удалены',
            inputs=embed_id,
            outputs=None,
        )


demo.launch(debug=True)  # server_name='0.0.0.0'

## Структура проекта и итоговый код по модулям

Код итогового приложения (не для для запуска в Colab)

**Структура проекта:**

 - 📁 `embed_model`
 - 📁 `model`
 - 📁 `tokenizer`
 - `requirements-base.txt`
 - `requirements-cpu.txt`
 - `requirements-cuda.txt`
 - `app.py`
 - `models.py`
 - `utils.py`



**Установка и запуск:**

1. Создание и активация виртуального окружения (опционально)

 Создание окружения:
 ```
python3 -m venv env
 ```
 Активация окружения:
 - Linux
```
source env/bin/activate
```

 - Windows
```
env\Scripts\activate.bat
```

2. Установка библиотек
 - с поддержкой CPU
 ```
 pip install -r requirements-cpu.txt
 ```

 - с поддержкой CUDA

     - Linux
     ```
     CMAKE_ARGS='-DLLAMA_CUBLAS=on' FORCE_CMAKE=1 pip install -r requirements-cuda.txt
     ```
     - Windows - потребуются дополнительно инструменты для сборки, [инструкции](https://github.com/abetlen/llama-cpp-python?tab=readme-ov-file#installation)
     ```
     set CMAKE_ARGS=-DLLAMA_CUBLAS=ON && set CMAKE_ARGS=-DLLAMA_CUBLAS=ON
     pip install llama-cpp-python
     ```

3. Запуск приложения
```
python3 app.py
```
 Перейти в браузере http://localhost:7860/ после того как появится надпись `Running on local URL:  http://127.0.0.1:7860`

Чтобы заменить модели и токенайзер на другие, изменить переменные в модуле `models.py`:

 - `tokenizer_name` - название репозитория на HF
 - `embed_model_name` - название репозитория на HF
 - `llm_model_url` - прямая ссылка на модель в формате GGUF

**Содержимое файлов `requirements:`**

`requirements-base.txt`
```
accelerate==0.31.0
gradio==4.36.1
langchain==0.2.3
langchain-community==0.2.4
langchain-core==0.2.5
langchain-huggingface==0.0.3
langchain-text-splitters==0.2.1
llama_cpp_python==0.2.77
pdfminer.six==20231228
sentence-transformers==3.0.1
transformers==4.41.2
youtube-transcript-api==0.6.2
```

`requirements-cpu.txt`
```
--extra-index-url https://download.pytorch.org/whl/cpu
torch==2.3.0
faiss-cpu==1.8.0
-r ./requirements-base.txt
```

`requirements-cuda.txt`
```
--extra-index-url https://download.pytorch.org/whl/cu121
torch==2.3.0
faiss-cpu==1.8.0
-r ./requirements-base.txt
```

Если в системе установлена CUDA не 12 а 11 версии - заменить в `requirements-cuda.txt`  
`https://download.pytorch.org/whl/cu121`  
на  
`https://download.pytorch.org/whl/cu118`  
для корректной устаноки Pytorch

Версия `Python:` `3.8+` (проверено на `3.10`)

Папка `env` с зависимостями для CPU занимает 1.5Gb, для CUDA 6Gb

### Модуль `models.py`

Модуль `models.py` для загрузки и инициализации моделей и токенайзера

In [None]:
from pathlib import Path
from typing import List, Tuple, Dict, Union, Iterable, Optional, Any
from tqdm import tqdm

import requests
import torch

from transformers import AutoTokenizer, GenerationConfig
from langchain_huggingface import HuggingFaceEmbeddings
from llama_cpp import Llama


def download_file(file_url: str, file_path: Union[str, Path]) -> None:
    response = requests.get(file_url, stream=True)
    total_size = int(response.headers.get('content-length', 0))
    chunk_size = 4096  # 4 Kb
    progress_bar = tqdm(desc='Загрузка файла GGUF', total=total_size, unit='iB', unit_scale=True)

    with open(file_path, 'wb') as file:
        for data in response.iter_content(chunk_size):
            file.write(data)
            progress_bar.update(len(data))


print('Инициализация токенайзера ...')

# не забыть установить токенайзер, соотвествующий модели GGUF
# tokenizer_name = 'openchat/openchat-3.6-8b-20240522'
tokenizer_name = 'unsloth/gemma-2-2b-it'  # (оригинальный репозиторий google/gemma-2-2b требует HF токен)

tokenizer_path = Path('tokenizer')
tokenizer = AutoTokenizer.from_pretrained(tokenizer_name, cache_dir=tokenizer_path)


print('Инициализация модели эмбедингов ...')
device = 'cuda' if torch.cuda.is_available() else 'cpu'

# embed_model_name = 'sentence-transformers/all-mpnet-base-v2'
embed_model_name = 'cointegrated/rubert-tiny2'

embed_model_path = Path('embed_model')
embed_model = HuggingFaceEmbeddings(
    model_name=embed_model_name,
    model_kwargs={'device': device},  # cpu cuda
    cache_folder=str(embed_model_path),
    )

# llm_model_url = 'https://huggingface.co/bartowski/openchat-3.6-8b-20240522-GGUF/resolve/main/openchat-3.6-8b-20240522-IQ4_XS.gguf'
llm_model_url = 'https://huggingface.co/bartowski/gemma-2-2b-it-GGUF/resolve/main/gemma-2-2b-it-Q8_0.gguf'

llm_model_path = Path('model') / llm_model_url.rsplit('/')[-1]
llm_model_path.parent.mkdir(exist_ok=True)

# если модель отсутствует в папке models то нужно ее загрузить - например через библиотеку wget
if not llm_model_path.is_file():
    if not str(llm_model_url).endswith('.gguf'):
        raise Exception('Ссылка на модель LLM должна быть прямой ссылкой на файл GGUF')
    else:
        print('Загрузка файла LLM модели ...')
        download_file(llm_model_url, llm_model_path)

print('Инициализация LLM модели ...')
llm_model = Llama(model_path=str(llm_model_path), n_gpu_layers=-1, n_ctx=4096)

Инициализация токенайзера ...


Special tokens have been added in the vocabulary, make sure the associated word embeddings are fine-tuned or trained.


Инициализация модели эмбедингов ...
Загрузка файла LLM модели ...


Загрузка файла GGUF: 100%|██████████| 3.08G/3.08G [00:51<00:00, 59.4MiB/s]
llama_model_loader: loaded meta data with 23 key-value pairs and 291 tensors from model/openchat-3.5-0106.Q2_K.gguf (version GGUF V3 (latest))
llama_model_loader: Dumping metadata keys/values. Note: KV overrides do not apply in this output.
llama_model_loader: - kv   0:                       general.architecture str              = llama
llama_model_loader: - kv   1:                               general.name str              = openchat_openchat-3.5-0106
llama_model_loader: - kv   2:                       llama.context_length u32              = 8192
llama_model_loader: - kv   3:                     llama.embedding_length u32              = 4096
llama_model_loader: - kv   4:                          llama.block_count u32              = 32
llama_model_loader: - kv   5:                  llama.feed_forward_length u32              = 14336
llama_model_loader: - kv   6:                 llama.rope.dimension_count u32    

Инициализация LLM модели ...


llm_load_print_meta: ssm_d_state      = 0
llm_load_print_meta: ssm_dt_rank      = 0
llm_load_print_meta: model type       = 7B
llm_load_print_meta: model ftype      = Q2_K - Medium
llm_load_print_meta: model params     = 7.24 B
llm_load_print_meta: model size       = 2.87 GiB (3.41 BPW) 
llm_load_print_meta: general.name     = openchat_openchat-3.5-0106
llm_load_print_meta: BOS token        = 1 '<s>'
llm_load_print_meta: EOS token        = 32000 '<|end_of_turn|>'
llm_load_print_meta: UNK token        = 0 '<unk>'
llm_load_print_meta: LF token         = 13 '<0x0A>'
llm_load_tensors: ggml ctx size =    0.15 MiB
llm_load_tensors:        CPU buffer size =  2939.58 MiB
..................................................................................................
llama_new_context_with_model: n_ctx      = 512
llama_new_context_with_model: n_batch    = 512
llama_new_context_with_model: n_ubatch   = 512
llama_new_context_with_model: flash_attn = 0
llama_new_context_with_model: freq_base  = 

Другие варианты загрузки репозиториев с HF

In [None]:
from huggingface_hub import hf_hub_download, snapshot_download

snapshot_download(
    repo_id=model_id,
    local_dir=folder_path,
    ignore_patterns='*.h5',
)

hf_hub_download(
    repo_id=model_id,
    filename=model_file,
    local_dir=MODELS_PATH,
    )

### Модуль `utils.py`

Модуль с функциями `utils.py`

In [None]:
import csv
from pathlib import Path
from typing import List, Tuple, Dict, Union, Optional, Any
from tqdm import tqdm

import requests
from requests.exceptions import MissingSchema

import gradio as gr
from youtube_transcript_api import YouTubeTranscriptApi, NoTranscriptFound, TranscriptsDisabled

from langchain.text_splitter import RecursiveCharacterTextSplitter, CharacterTextSplitter
from langchain_community.vectorstores import FAISS

from langchain_community.document_loaders import (
    CSVLoader,
    PDFMinerLoader,
    PyPDFLoader,
    TextLoader,
    UnstructuredHTMLLoader,
    UnstructuredMarkdownLoader,
    UnstructuredPowerPointLoader,
    UnstructuredWordDocumentLoader,
    WebBaseLoader,
    YoutubeLoader,
    DirectoryLoader,
)

# annotations
from transformers.tokenization_utils_base import PreTrainedTokenizerBase
from langchain_core.retrievers import BaseRetriever
from langchain.docstore.document import Document
from langchain_core.vectorstores import VectorStore
from langchain_core.embeddings import Embeddings

from models import tokenizer, embed_model, llm_model


# ========================= ПАРАМЕТРЫ ===============================

LOADER_CLASSES = {
    '.csv': CSVLoader,
    '.doc': UnstructuredWordDocumentLoader,
    '.docx': UnstructuredWordDocumentLoader,
    '.html': UnstructuredHTMLLoader,
    '.md': UnstructuredMarkdownLoader,
    '.pdf': PDFMinerLoader,
    '.ppt': UnstructuredPowerPointLoader,
    '.pptx': UnstructuredPowerPointLoader,
    '.txt': TextLoader,
    'web': WebBaseLoader,
    'directory': DirectoryLoader,
    'youtube': YoutubeLoader,
}

GENERATE_KWARGS = dict(
    temp=0.5,
    top_k=40,
    top_p=1,
    repeat_penalty=1,
)
CONTEXT_TEMPLATE = '''Ответь на вопрос при условии контекста

Контекст:
{context}

Дан вопрос:
{user_message}

Ответ:'''


# ============================= ФУНКЦИИ =============================


def clear_text(text: str) -> str:
    lines = text.split('\n')
    lines = [line for line in lines if len(line.strip()) > 2]
    text = '\n'.join(lines).strip()
    return text


def clear_documents(documents: List[Document]) -> List[Document]:
    output_documents = []
    for document in documents:
        text = clear_text(document.page_content)
        if len(text) > 10:
            document.page_content = text
            output_documents.append(document)
    return output_documents


def get_csv_delimiter(file_path: str) -> str:
    n_bytes = 4096
    with open(file_path) as csvfile:
        delimiter = csv.Sniffer().sniff(csvfile.read(n_bytes)).delimiter
    return delimiter


def check_subtitles_available(yt_video_link: str, target_lang: str) -> Tuple[bool, str]:
    video_id = yt_video_link.split('watch?v=')[-1].split('&')[0]
    load_log = ''
    available = True
    try:
        transcript_list = YouTubeTranscriptApi.list_transcripts(video_id)
        try:
            transcript = transcript_list.find_transcript([target_lang])
            if transcript.is_generated:
                load_log += f'Будут загружены автоматические субтитры, ручные недоступны для видео {yt_video_link}\n'
            else:
                load_log += f'Будут загружены ручные субтитры для видео {yt_video_link}\n'
        except NoTranscriptFound:
            load_log += f'Язык субтитров {target_lang} недоступен для видео {yt_video_link}\n'
            available = False
    except TranscriptsDisabled:
        load_log += f'Нет субтитров для видео {yt_video_link}\n'
        available = False
    return available, load_log


def load_documents_from_files(upload_files: List[str]) -> Tuple[List[Document], str]:
    load_log = ''
    documents = []
    for upload_file in upload_files:
        file_extension = f".{upload_file.split('.')[-1]}"
        if file_extension in LOADER_CLASSES:
            loader_class = LOADER_CLASSES[file_extension]
            loader_kwargs = {}
            if file_extension == '.csv':
                delimiter = get_csv_delimiter(upload_file)
                loader_kwargs = {'csv_args': {'delimiter': delimiter}}
            try:
                load_documents = loader_class(upload_file, **loader_kwargs).load()
                documents.extend(load_documents)
            except Exception as ex:
                load_log += f'Ошибка загрузки файла {upload_file}\n'
                load_log += f'Код ошибки: {ex}\n'
                continue
        else:
            load_log += f'Неподдерживаемый формат файла {upload_file}\n'
            continue
    return documents, load_log


def load_documents_from_links(web_links: str, subtitles_lang: str) -> Tuple[List[Document], str]:
    load_log = ''
    documents = []
    loader_class_kwargs = {}
    web_links = [web_link.strip() for web_link in web_links.split('\n') if web_link.strip()]

    for web_link in web_links:
        if 'youtube.com' in web_link:
            available, log = check_subtitles_available(web_link, subtitles_lang)
            load_log += log
            if not available:
                continue
            loader_class = LOADER_CLASSES['youtube'].from_youtube_url
            loader_class_kwargs = {'language': subtitles_lang}

        else:
            loader_class = LOADER_CLASSES['web']
        try:
            if requests.get(web_link).status_code != 200:
                load_log += f'Ссылка недоступна для Python requests: {web_link}\n'
                continue
            load_documents = loader_class(web_link, **loader_class_kwargs).load()
            if len(load_documents) == 0:
                load_log += f'Фрагменты текста не были найдены по ссылке: {web_link}\n'
                continue
            documents.extend(load_documents)
        except MissingSchema:
            load_log += f'Неверная ссылка: {web_link}\n'
            continue
        except Exception as ex:
            load_log += f'Ошибка загрузки web лоадером данных по ссылке: {web_link}\n'
            load_log += f'Код ошибки: {ex}\n'
            continue
    return documents, load_log


def load_documents_and_create_db(
        upload_files: Optional[List[str]],
        web_links: str,
        subtitles_lang: str,
        chunk_size: int,
        chunk_overlap: int,
        ) -> Tuple[List[Document], Optional[BaseRetriever], str]:

    load_log = ''
    all_documents = []
    db = None
    progress = gr.Progress()

    if upload_files is None and not web_links:
        load_log = 'Не выбраны файлы или ссылки'
        return all_documents, db, load_log

    if upload_files is not None:
        progress(0.3, desc='Шаг 1/2: Загрузка документов из файлов')
        docs, log = load_documents_from_files(upload_files)
        all_documents.extend(docs)
        load_log += log

    if web_links:
        progress(0.3 if upload_files is None else 0.5, desc='Шаг 1/2: Загрузка документов по ссылкам')
        docs, log = load_documents_from_links(web_links, subtitles_lang)
        all_documents.extend(docs)
        load_log += log

    if len(all_documents) == 0:
        load_log += 'Загрузка прервана так как не было извлечено ни одного документа\n'
        load_log += 'Режим RAG не может быть активирован'
        return all_documents, db, load_log

    load_log += f'Загружено документов: {len(all_documents)}\n'

    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
    )
    documents = text_splitter.split_documents(all_documents)
    documents = clear_documents(documents)
    load_log += f'Документы разделены, кол-во фрагментов текста: {len(documents)}\n'

    progress(0.7, desc='Шаг 2/2: Инициализация БД')
    db = FAISS.from_documents(documents=documents, embedding=embed_model)
    load_log += 'Режим RAG активирован и может быть дективирован на вкладке Generate'
    return documents, db, load_log


def get_promt_with_context(
        user_message: str,
        chatbot: List[List[Optional[str]]],
        history_len: int,
        rag_mode: bool,
        db: VectorStore,
        system_prompt: str,
        k: Union[int, str],
        score_threshold: float,
        ) -> Tuple[str, List[List[Optional[str]]], str]:

    chatbot.append([user_message, None])
    if not user_message.strip():
        return '', chatbot, full_promt

    messages = []
    if history_len == 0 and system_prompt:
        messages.append({'role': 'system', 'content': system_prompt})

    if history_len != 0:
        for user_msg, bot_msg in chatbot[:-1][-history_len:]:
            messages.append({'role': 'user', 'content': user_msg})
            messages.append({'role': 'assistant', 'content': bot_msg})

    if db is not None and rag_mode:
        if k == 'max':
            k = len(documents)

        docs_and_distances = db.similarity_search_with_relevance_scores(
            user_message,
            k=k,
            score_threshold=score_threshold,
            )

        if len(docs_and_distances) == 0:
            gr.Info((
                f'Релевантные запросу документы не найдены, '
                f'используется промт без контекста (попробуйте уменьшить searh_score_threshold)'
                ))
        else:
            retriever_context = '\n\n'.join([doc[0].page_content for doc in docs_and_distances])
            user_message = CONTEXT_TEMPLATE.format(
                user_message=user_message,
                context=retriever_context,
                )

    messages.append({'role': 'user', 'content': user_message})
    full_promt = tokenizer.apply_chat_template(
        messages,
        tokenize=False,
        add_generation_prompt=True,
        )
    return '', chatbot, full_promt


def generate_text(
        chatbot: List[List[Optional[str]]],
        full_promt: str,
        do_sample: bool,
        *generate_args: List[Union[int, float]],
        ) -> List[List[Optional[str]]]:

    if chatbot[-1][0].strip() == '':
        yield chatbot[:-1]

    else:
        tokens_indxs = llm_model.tokenize(full_promt.encode('utf-8'), special=True, add_bos=False)
        gen_kwargs = dict(zip(GENERATE_KWARGS.keys(), generate_args))
        if not do_sample:
            gen_kwargs['top_k'] = 1
            gen_kwargs['repeat_penalty'] = 1

        chatbot[-1][1] = ''
        generator = llm_model.generate(tokens_indxs, **gen_kwargs)
        for token_indx in generator:
            if token_indx == llm_model.token_eos():
                break
            token = llm_model.detokenize([token_indx]).decode('utf-8', errors='ignore')
            chatbot[-1][1] += token
            yield chatbot

### Файл приложения `app.py`

Главный файл приложения `app.py`

In [None]:
from pathlib import Path
from typing import List, Tuple, Dict, Union, Iterable, Optional, Any

import requests
import gradio as gr
import torch

from utils import (
    load_documents_and_create_db,
    get_promt_with_context,
    generate_text,
)


# ========================= ПАРАМЕТРЫ ===============================

SUBTITLES_LANGUAGES = ['ru', 'en']
HISTORY_LEN = 0
GENERATE_KWARGS = dict(
    temp=0.5,
    top_k=40,
    top_p=1,
    repeat_penalty=1,
)


# ============================ ПРИЛОЖЕНИЕ ==============================

CSS = '''
.gradio-container {width: 70% !important}
'''

with gr.Blocks(theme=gr.themes.Monochrome(), css=CSS) as demo:

    # ==================== СОСТОЯНИЯ ===============================

    documents = gr.State([])
    db = gr.State(None)
    full_promt = gr.State('')


    # ==================== СТРАНИЦА БОТА =================================

    def get_chatbot_and_rag_settings(chatbot_history: list = [], rag_mode: bool = False):
        label = 'RAG' if rag_mode else 'Chatbot'
        chatbot = gr.Chatbot(
                value=chatbot_history,
                show_copy_button=True,
                bubble_full_width=False,
                label=label,
                # height=300,
                )
        k = gr.Radio(
            choices=[1, 2, 3, 'max'],
            value=2,
            label='Количество релевантных документов для поиска',
            visible=rag_mode,
            render=False,
            )
        score_threshold = gr.Slider(
            label='searh_score_threshold',
            value=0.5,
            minimum=0,
            maximum=1,
            step=0.1,
            visible=rag_mode,
            render=False,
            )
        return chatbot, k, score_threshold


    with gr.Tab(label='Generate'):
        with gr.Row():
            with gr.Column(scale=3):
                chatbot, k, score_threshold = get_chatbot_and_rag_settings()
                user_message = gr.Textbox(label='User')
                with gr.Row():
                    user_message_btn = gr.Button('Отправить')
                    stop_btn = gr.Button('Стоп')
                    clear_btn = gr.Button('Очистить чат')

            # ------------------ ПАРАМЕТРЫ ГЕНЕРАЦИИ -------------------------
            def get_generate_args(do_sample: bool):
                visible = do_sample
                generate_args = [
                    gr.Slider(0.1, 4, GENERATE_KWARGS['temp'], step=0.1, label='temperature', visible=visible),
                    gr.Slider(5, 50, GENERATE_KWARGS['top_k'], step=5, label='top_k', visible=visible),
                    gr.Slider(0.1, 1, GENERATE_KWARGS['top_p'], step=0.1, label='top_p', visible=visible),
                    gr.Slider(1, 3, GENERATE_KWARGS['repeat_penalty'], step=0.1, label='repeat penalty', visible=visible),
                ]
                return generate_args

            with gr.Column(scale=1, min_width=80):
                with gr.Group():
                    gr.Markdown('Размер истории')
                    history_len = gr.Slider(
                        minimum=0,
                        maximum=5,
                        value=HISTORY_LEN,
                        step=1,
                        info='Кол-во предыдущих сообщенией, учитываемых в истории',
                        label='history len',
                        show_label=False,
                        )

                    with gr.Group():
                        gr.Markdown('Параметры генерации')
                        do_sample = gr.Checkbox(
                            value=False,
                            label='do_sample',
                            info='Активация случайного семплирования',
                            )
                        generate_args = get_generate_args(do_sample.value)
                        do_sample.change(
                            fn=get_generate_args,
                            inputs=do_sample,
                            outputs=generate_args,
                            show_progress=False,
                            )

        rag_mode = gr.Checkbox(value=False, label='Режим RAG', scale=1, visible=False)
        rag_mode.change(
            fn=get_chatbot_and_rag_settings,
            inputs=[chatbot, rag_mode],
            outputs=[chatbot, k, score_threshold],
            )

        k.render()
        score_threshold.render()

        # -------------------- СИСТЕМНЫЙ ПРОМТ И ИТОГОВЫЙ ПРОМТ ---------------------------

        with gr.Accordion('Промт', open=True):
            system_prompt = gr.Textbox('', label='Задать системный промт')
            full_promt = gr.Textbox(label='Полный текст текущего промта с контекстом', interactive=False)

        # ------------------ КНОПКИ ОТПРАВИТЬ ОЧИСТИТЬ И СТОП ------------

        generate_event = gr.on(
            triggers=[user_message.submit, user_message_btn.click],
            fn=get_promt_with_context,
            inputs=[user_message, chatbot, history_len, rag_mode, db, system_prompt, k, score_threshold],
            outputs=[user_message, chatbot, full_promt],
            queue=True,
        ).then(
            fn=lambda promt: promt,
            inputs=full_promt,
            outputs=full_promt,
            queue=False,
        ).then(
            fn=generate_text,
            inputs=[chatbot, full_promt, do_sample, *generate_args],
            outputs=chatbot,
            queue=True,
            )

        stop_btn.click(
            fn=None,
            inputs=None,
            outputs=None,
            cancels=generate_event,
            queue=False,
        )

        clear_btn.click(
            fn=lambda: (None, ''),
            inputs=None,
            outputs=[chatbot, full_promt],
            queue=False,
            )


    # ===================== СТРАНИЦА ЗАГРУЗКИ ФАЙЛОВ =========================

    with gr.Tab(label='Load documents'):
        with gr.Row(variant='compact'):
            upload_files = gr.File(file_count='multiple', label='Загрузка текстовых файлов')
            web_links = gr.Textbox(lines=6, label='Ссылки на Web сайты или Ютуб')

        with gr.Row(variant='compact'):
            chunk_size = gr.Slider(50, 2000, value=500, step=50, label='Длина фрагментов')
            chunk_overlap = gr.Slider(0, 200, value=20, step=10, label='Длина пересечения фрагментов')

            subtitles_lang = gr.Radio(
                SUBTITLES_LANGUAGES,
                value=SUBTITLES_LANGUAGES[0],
                label='Язык субтитров YouTube',
                )

        load_documents_btn = gr.Button(value='Загрузить документы и инициализировать БД')
        load_docs_log = gr.Textbox(label='Прогресс загрузки и разделения документов', interactive=False)

        load_event = load_documents_btn.click(
            fn=load_documents_and_create_db,
            inputs=[upload_files, web_links, subtitles_lang, chunk_size, chunk_overlap],
            outputs=[documents, db, load_docs_log],
            )

        def documents_load_success(chatbot_history, db):
            rag_mode = db is not None
            chatbot, k, score_threshold = get_chatbot_and_rag_settings(chatbot_history, rag_mode)
            rag_mode_checkbox = gr.Checkbox(value=rag_mode, label='Режим RAG', scale=1, visible=rag_mode)
            return rag_mode_checkbox

        load_event.success(
            fn=documents_load_success,
            inputs=[chatbot, db],
            outputs=[rag_mode],
            )


    # ================= СТРАНИЦА ПРОСМОТРА ВСЕХ ДОКУМЕНТОВ =================

    with gr.Tab(label='View documents'):
        view_documents_btn = gr.Button(value='Отобразить загруженные фрагменты')
        view_documents_textbox = gr.Textbox(
            lines=1,
            placeholder='Для просмотра фрагментов загрузите документы на вкладке Load documents',
            label='Загруженные фрагменты',
            )
        sep = '=' * 20
        view_documents_btn.click(
            lambda documents: f'\n{sep}\n\n'.join([doc.page_content for doc in documents]),
            inputs=documents,
            outputs=view_documents_textbox,
        )

# для докера устанвоить server_name='0.0.0.0' либо в Dockerfile установить ENV GRADIO_SERVER_NAME='0.0.0.0'
demo.launch()

## Docker

Содержимое всех файлов кроме `*docker*` такое же как в разделе `Итоговое приложение` выше

Инструкции по образам Docker для `llama-cpp-python`  
https://github.com/abetlen/llama-cpp-python/tree/main/docker

**Структура проекта:**

 - 📁 `embed_model`
 - 📁 `model`
 - 📁 `tokenizer`
 - `.dockerignore`
 - `Dockerfile-cpu`
 - `Dockerfile-cuda`
 - `requirements-base.txt`
 - `requirements-cpu.txt`
 - `requirements-cuda.txt`
 - `app.py`
 - `models.py`
 - `utils.py`



**Сборка образа и запуск контейнера:**

1. Сборка образа  

 - с поддержкой CPU
```
docker build -t chatbot-rag-cpu -f Dockerfile-cpu .
```
 - с поддержкой CUDA
```
docker build -t chatbot-rag-cuda -f Dockerfile-cuda .
```

2. Запуск контейнера на 7860 порту с пробросом папок через `docker volumes`
 - с поддержкой CPU
```
docker run -it -p 7860:7860 \
	-v $(pwd)/tokenizer:/app/tokenizer \
	-v $(pwd)/embed_model:/app/embed_model \
	-v $(pwd)/model:/app/model \
	chatbot-rag-cpu
```

 - с поддержкой CUDA
```
docker run -it --gpus all --ipc=host -p 7860:7860 \
	-v $(pwd)/tokenizer:/app/tokenizer \
	-v $(pwd)/embed_model:/app/embed_model \
	-v $(pwd)/model:/app/model \
	chatbot-rag-cuda
```

Перейти в браузере http://localhost:7860/ после того как появится надпись `Running on local URL:  http://0.0.0.0:7860`

Образ CPU занимает 2.65GB  
Образ CUDA занимает 13.9GB

**Содержимое `.dockerignore`:**  
```
__pycache__
model/*
embed_model/*
tokenizer/*
env*
*.ipynb
```

**Содержимое `Dockerfile-cpu`**
```
FROM python:3.10
WORKDIR /app
COPY . .
RUN pip install --no-cache-dir -r requirements-cpu.txt
EXPOSE 7860
ENV GRADIO_SERVER_NAME='0.0.0.0'
CMD ['python', 'app.py']
```

**Содержимое `Dockerfile-cuda`**
```
FROM nvidia/cuda:12.1.1-devel-ubuntu22.04
WORKDIR /app
COPY . .
RUN apt-get update && apt-get upgrade -y \
    && apt-get install -y git build-essential \
    python3 python3-pip gcc wget \
    ocl-icd-opencl-dev opencl-headers clinfo \
    libclblast-dev libopenblas-dev \
    && mkdir -p /etc/OpenCL/vendors && echo 'libnvidia-opencl.so.1' > /etc/OpenCL/vendors/nvidia.icd

ENV CUDA_DOCKER_ARCH=all
ENV LLAMA_CUDA=1
ENV CMAKE_ARGS='-DLLAMA_CUBLAS=ON'

RUN pip install --no-cache-dir -r requirements-cuda.txt
EXPOSE 7860
ENV GRADIO_SERVER_NAME='0.0.0.0'
CMD ['python3', 'app.py']
```

Для других версий CUDA заменить в `Dockerfile-cuda` соответствующий базовый образ вместо  
```
FROM nvidia/cuda:12.1.1-devel-ubuntu22.04
```