# RAG-system prototype

In [1]:
import os
import json
from dotenv import load_dotenv
import pytesseract
from PIL import Image
import docx
import pandas as pd
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain.schema import HumanMessage, SystemMessage
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.docstore.document import Document as LangchainDocument
from langchain.vectorstores import FAISS
import magic
import re
from io import BytesIO
import base64
from tqdm import tqdm
import zipfile
import hashlib
import logging
from typing import List, Dict, Tuple, Optional
from collections import defaultdict
from sklearn.metrics.pairwise import cosine_similarity

print('Libs Done.')

Libs Done.


In [2]:
# Load .env params
load_dotenv()
OPENAI_API_KEY = os.getenv('OPENAI_API_KEY')
print(f"OPENAI_API_KEY: ...{OPENAI_API_KEY[-3:]}")


# --- Settings ---
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
pytesseract.pytesseract.tesseract_cmd = r'/usr/bin/tesseract'  # 4 Linux/macOS


MODEL_NAME = "gpt-4o-mini"
llm = ChatOpenAI(model=MODEL_NAME, temperature=0)
embeddings = OpenAIEmbeddings(model="text-embedding-ada-002")

# Simple cache on the basis of files
INDEX_PATH = "faiss_index"
IMAGE_CACHE_FILE = "image_cache.json"
RAG_CACHE_FILE = "rag_cache.json"
IMAGES_DIR = "images"
CHUNKS_LOG_FILE = "chunks.json"
LINKS_DICT_FILE = "links.json"

FORMULA_AREA_THRESHOLD = 50000
MAX_TOKENS_PER_BATCH = 250000


def is_image_file(file_path: str) -> bool:
    """
    Проверка на изображение
    """
    try:
        mime = magic.Magic(mime=True)
        mime_type = mime.from_file(file_path)
        return mime_type.startswith('image')
    except Exception as e:
        logger.error(f"is_image_file Exception: checking MIME for {file_path}: {e}")
        return False

def convert_to_png(image_data: bytes, output_path: str) -> bool:
    """
    Конвертация байтового изображения в PNG (с поддержкой WMF/EMF).
    """
    try:
        os.makedirs(os.path.dirname(output_path), exist_ok=True)
        try:
            img = Image.open(BytesIO(image_data))
            img.save(output_path, 'PNG')
            return True
        except Exception:
            pass
        try:
            from wand.image import Image as WandImage
            with WandImage(blob=image_data) as img:
                img.format = 'png'
                img.save(filename=output_path)
            return True
        except ImportError:
            logger.warning("Wand is not installed.")
            return False
        except Exception as e:
            logger.error(f"convert_to_png Exception: Wand for {output_path}: {e}")
            return False
    except Exception as e:
        logger.error(f"convert_to_png Exception: convert to PNG {output_path}: {e}")
        return False

def image_to_base64(img_path: str) -> Optional[str]:
    logger.info(f"Попытка конвертации изображения: {img_path}")
    try:
        if not os.path.exists(img_path):
            logger.warning(f"Изображение не найдено: {img_path}")
            return None
        img = Image.open(img_path)
        buffered = BytesIO()
        img.save(buffered, format="PNG")
        logger.info(f"Изображение {img_path} успешно конвертировано в base64")
        return base64.b64encode(buffered.getvalue()).decode('utf-8')
    except Exception as e:
        logger.error(f"image_to_base64 Exception: {img_path}: {e}")
        return None

def ocr_image(img_path: str, lang='eng+rus+equ') -> str:
    try:
        if not os.path.exists(img_path):
            return ""
        img = Image.open(img_path)
        text = pytesseract.image_to_string(img, lang=lang)
        return text.strip()
    except Exception as e:
        logger.error(f"ocr_image Exception: {img_path}: {e}")
        return ""

def extract_images_from_zip(doc_path: str, images_dir: str) -> List[str]:
    """
    Извлечение изображений из .docx like ZIP.
    """
    images = []
    os.makedirs(images_dir, exist_ok=True)
    try:
        with zipfile.ZipFile(doc_path, 'r') as zip_ref:
            for file_name in zip_ref.namelist():
                if file_name.startswith('word/media/') and any(ext in file_name.lower() for ext in ['jpg', 'jpeg', 'png', 'bmp', 'gif', 'wmf']):
                    try:
                        image_data = zip_ref.read(file_name)
                        hash_id = hashlib.md5(image_data).hexdigest()
                        image_path = os.path.join(images_dir, f"img_{hash_id[:8]}.png")
                        if not os.path.exists(image_path):
                            if convert_to_png(image_data, image_path) and is_image_file(image_path):
                                images.append(image_path)
                        else:
                            images.append(image_path)
                    except Exception as e:
                        logger.error(f"extract_images_from_zip Error: processing {file_name}: {e}")
    except Exception as e:
        logger.error(f"extract_images_from_zip Exception: {e}")
    return images

def extract_links(text: str) -> Dict[str, None]:
    matches = \
    re.findall(
        r'(рисунок|таблица|формула|приложение|карта)\s+([\d\wа-яА-Яa-zA-Z\.,\s\-/]+?)(?=\s*[\(\.\,\;]|\s*$)', 
        text, 
        re.IGNORECASE
    )
    return {f"{m[0].lower()} {m[1].strip('.')}": None for m in matches}

def extract_from_docx(
    doc_path: str, 
    images_dir: str = IMAGES_DIR,
    images_cache_file: str = IMAGE_CACHE_FILE,
    chunks_log_file: str = CHUNKS_LOG_FILE,
    links_dict_file: str = LINKS_DICT_FILE) -> Tuple[List[LangchainDocument], Dict[str, str], bool]:
    
    if not os.path.exists(doc_path):
        raise FileNotFoundError(f"Файл {doc_path} не найден")
    documents = []
    links = {}
    current_section = ""
    current_subsection = ""
    image_index = 0
    table_index = 0
    fallback_used = False
    doc_filename = os.path.basename(doc_path)
    os.makedirs(images_dir, exist_ok=True)
    image_cache = {}
    zip_image_index = {}
    zip_images = extract_images_from_zip(doc_path, images_dir)
    
    for img in zip_images:
        try:
            with open(img, 'rb') as f:
                image_data = f.read()
                hash_id = hashlib.md5(image_data).hexdigest()
                zip_image_index[hash_id] = img
        except Exception as e:
            logger.error(f"extract_from_docx Exception: hashing {img}: {e}")
    try:
        doc = docx.Document(doc_path)
        pending_header = None
        section_level = 0  # 0: section, 1: subsection
        for para in doc.paragraphs:
            text = para.text.strip()
            style_name = para.style.name if para.style else ""
            
            if style_name.startswith("Heading") or (text.isupper() and len(text) < 120 and re.match(r'^[А-ЯA-Z0-9\s\-\.\(\)]{5,}$', text)):
                if "БИБЛИОГРАФИЯ" in text.upper():
                    current_section = "БИБЛИОГРАФИЯ"
                    current_subsection = ""
                elif re.match(r'^\d+\.\d+', text):  # Подраздел
                    current_subsection = text.strip()
                    section_level = 1
                else:
                    current_section = text.strip()
                    current_subsection = ""  # Reset
                    section_level = 0
                    
            if any(k in style_name.lower() for k in ["caption", "подпись"]) or \
               re.search(r'(рисунок|таблица|формула|карта|приложение)\s+([\d\w\.,\s\-]+)', text, re.IGNORECASE):
                if len(text) <= 500:
                    pending_header = text.strip()
                    
            if text and not style_name.startswith("Heading") and "caption" not in style_name.lower():
                header_context = f"{current_section} → {current_subsection}".strip(" → ")
                content = f"{header_context}\n{text}" if header_context else text
                local_links = extract_links(content)
                keywords = [current_section.lower(), current_subsection.lower()] + list(local_links.keys())
                keywords += re.findall(r'\b(коэффициент|зона|район|регион|область|край|республика)\b', content.lower(), re.IGNORECASE)
                doc_item = LangchainDocument(
                    page_content=content,
                    metadata={
                        "source": f"para_{len(documents)}",
                        "type": "text",
                        "links": local_links,
                        "filename": doc_filename,
                        "header": header_context,
                        "section": current_section,
                        "subsection": current_subsection,
                        "keywords": list(set(keywords))
                    }
                )
                documents.append(doc_item)
                
            for run in para.runs:
                for blip in run._element.xpath('.//a:blip'):
                    rel_id = blip.embed
                    if rel_id in doc.part.rels:
                        try:
                            rel = doc.part.rels[rel_id]
                            if "image" in rel.reltype:
                                image_data = rel.target_part.blob
                                hash_id = hashlib.md5(image_data).hexdigest()
                                
                                if hash_id in zip_image_index:
                                    image_path = zip_image_index[hash_id]
                                    logger.warning(f"Обнаружено дублирующееся изображение с хэшем {hash_id}, используется: {image_path}")
                                else:
                                    image_index += 1
                                    image_path = os.path.join(images_dir, f"image_{image_index}.png")
                                    if convert_to_png(image_data, image_path) and is_image_file(image_path):
                                        zip_image_index[hash_id] = image_path
                                    else:
                                        continue
                                        
                                img = Image.open(image_path)
                                width, height = img.size
                                area = width * height
                                final_header = pending_header or current_subsection or f"Изображение {image_index}"
                                full_header = f"{current_section}\n{final_header}"
                                ocr_text = ""
                                if area < FORMULA_AREA_THRESHOLD:
                                    ocr_text = ocr_image(image_path)
                                    page_content = f"{final_header}\n{ocr_text}"
                                    doc_type = "formula"
                                    link_key_match = re.search(r'(формула|рисунок)\s+([\d\w\.,\s\-]+)', final_header, re.IGNORECASE)
                                    link_key = f"формула {link_key_match.group(2)}" if link_key_match else f"формула {image_index}"
                                    links[link_key] = ocr_text
                                else:
                                    page_content = final_header  # Без OCR
                                    doc_type = "image"
                                    link_key_match = re.search(r'(рисунок|карта|приложение)\s+([\d\w\.,\s\-]+)', final_header, re.IGNORECASE)
                                    link_key = f"{link_key_match.group(1).lower()} {link_key_match.group(2)}" if link_key_match else f"рисунок {image_index}"
                                    links[link_key] = image_path
                                    
                                if not any(d.metadata["source"] == f"img_{image_index}" for d in documents):
                                    keywords = [current_section.lower(), final_header.lower(), link_key.lower()]
                                    keywords += re.findall(r'\b(коэффициент|зона|район|регион|область|край|республика|карта)\b', final_header.lower(), re.IGNORECASE)  # Из header, не OCR
                                    img_doc = LangchainDocument(
                                        page_content=page_content,
                                        metadata={
                                            "source": f"img_{image_index}",
                                            "type": doc_type,
                                            "links": {link_key: links[link_key]},
                                            "filename": doc_filename,
                                            "header": final_header,
                                            "section": current_section,
                                            "subsection": current_subsection,
                                            "keywords": list(set(keywords))
                                        }
                                    )
                                    documents.append(img_doc)
                                    
                                if image_path not in image_cache:
                                    image_cache[image_path] = {
                                        "ocr": ocr_text,
                                        "size": (width, height),
                                        "header": full_header,
                                        "type": doc_type
                                    }
                                    
                                if pending_header:
                                    pending_header = None
                        except Exception as e:
                            logger.error(f"extract_from_docx Exception: image processing: {e}")
        for table in doc.tables:
            table_index += 1
            header = pending_header or f"Таблица {table_index}"
            # New: Check first row for title
            first_row_text = " ".join(cell.text.strip() for cell in table.rows[0].cells).lower()
            
            if "таблица" in first_row_text or re.search(r'таблица\s+([\d\w\.,\s\-/]+)', first_row_text):
                header = table.rows[0].cells[0].text.strip()  # Assume title in first cell
            if current_subsection and "БИБЛИОГРАФИЯ" not in current_subsection.upper():
                header += f" в {current_subsection}"
            elif current_section and "БИБЛИОГРАФИЯ" not in current_section.upper():
                header += f" в {current_section}"
                
            pending_header = None
            df = pd.DataFrame([[cell.text.strip() for cell in row.cells] for row in table.rows])
            table_md = df.to_markdown(index=False)
            match = re.search(r'(таблица|приложение)\s+([\d\w\.,\s\-/]+)', header, re.IGNORECASE)
            link_key = f"{match.group(1).lower()} {match.group(2).strip('.')}" if match else f"таблица {table_index}"
            links[link_key] = table_md
            table_content = " ".join([cell.text.strip() for row in table.rows for cell in row.cells]).lower()
            table_keywords = re.findall(r'\b(коэффициент|зона|район|регион|область|край|республика)\b', table_content, re.IGNORECASE)
            table_keywords += [link_key.lower(), current_section.lower(), current_subsection.lower()]
            table_doc = LangchainDocument(
                page_content=f"{header}\n{table_md}",
                metadata={
                    "source": f"table_{table_index}",
                    "type": "table",
                    "links": {link_key: table_md},
                    "filename": doc_filename,
                    "header": header,
                    "section": current_section,
                    "subsection": current_subsection,
                    "keywords": list(set(table_keywords))
                }
            )
            documents.append(table_doc)
    except Exception as e:
        logger.warning(f"extract_from_docx Exception: python-docx, transition to fallback: {e}")
        fallback_used = True
        from docx2txt import process
        text = process(doc_path)
        chunks = [c.strip() for c in text.split('\n\n') if c.strip()]
        for i, chunk in enumerate(chunks):
            local_links = extract_links(chunk)
            doc_item = LangchainDocument(
                page_content=chunk,
                metadata={
                    "source": f"fallback_{i}",
                    "type": "text",
                    "links": local_links,
                    "filename": doc_filename,
                    "header": "",
                    "section": "",
                    "subsection": "",
                    "keywords": list(local_links.keys())
                }
            )
            documents.append(doc_item)
            
    with open(images_cache_file, 'w', encoding='utf-8') as f:
        json.dump(image_cache, f, ensure_ascii=False, indent=2)
    logger.info(f"Кэш изображений сохранён: {images_cache_file}")
    
    with open(links_dict_file, 'w', encoding='utf-8') as f:
        json.dump(links, f, ensure_ascii=False, indent=2)
    logger.info(f"Словарь links сохранён: {links_dict_file}")
    
    chunks_log = [
        {
            "source": d.metadata["source"],
            "type": d.metadata["type"],
            "section": d.metadata.get("section", ""),
            "subsection": d.metadata.get("subsection", ""),
            "content": d.page_content[:200]
        } for d in documents
    ]
    
    with open(chunks_log_file, 'w', encoding='utf-8') as f:
        json.dump(chunks_log, f, ensure_ascii=False, indent=2)
    logger.info(f"Лог чанков сохранён: {chunks_log_file}")
    
    return documents, links, fallback_used

def group_by_section(docs: List[LangchainDocument]) -> List[LangchainDocument]:
    grouped = defaultdict(list)
    for d in docs:
        key = (d.metadata.get("section", ""), d.metadata.get("subsection", ""), d.metadata.get("type", "text"))
        grouped[key].append(d)
    
    new_docs = []
    for (section, subsection, dtype), group_docs in grouped.items():
        content = "\n".join(d.page_content for d in group_docs)
        first_metadata = group_docs[0].metadata
        all_keywords = set()
        all_links = {}  # Агрегируем links
        all_headers = set()  # Агрегируем headers
        for d in group_docs:
            all_keywords.update(d.metadata.get("keywords", []))
            all_links.update(d.metadata.get("links", {}))  # Объединяем dict links
            if d.metadata.get("header"):
                all_headers.add(d.metadata.get("header"))  # Уникальные headers
        
        new_docs.append(LangchainDocument(
            page_content=content,
            metadata={
                "section": section,
                "subsection": subsection,
                "type": dtype,
                "keywords": list(all_keywords),
                "filename": first_metadata.get("filename", "unknown"),
                "source": "; ".join(set(d.metadata.get("source", "") for d in group_docs)),
                "links": all_links,  # Добавлено
                "headers": list(all_headers)  # Добавлено
            }
        ))
    return new_docs

def build_index(documents: List[LangchainDocument], index_path: str = INDEX_PATH) -> FAISS:
    if os.path.exists(index_path):
        logger.info("Загрузка существующего FAISS индекса...")
        vectorstore = FAISS.load_local(index_path, embeddings, allow_dangerous_deserialization=True)
        return vectorstore
    
    logger.info("Создание нового FAISS индекса...")
    
    text_docs = [d for d in documents if d.metadata["type"] == "text"]
    other_docs = [d for d in documents if d.metadata["type"] != "text"]  # Tables, images, formulas as is
    grouped_text = group_by_section(text_docs)
    splitter = RecursiveCharacterTextSplitter(chunk_size=500, chunk_overlap=100)
    chunked_texts = splitter.split_documents(grouped_text)
    final_docs = chunked_texts + other_docs  # Add grouped text + original others
    
    vectorstore = None
    batch_size = 200
    for i in range(0, len(final_docs), batch_size):
        batch = final_docs[i:i + batch_size]
        logger.info(f"Обработка батча {i//batch_size + 1} из {len(final_docs)//batch_size + 1}...")
        try:
            if vectorstore is None:
                vectorstore = FAISS.from_documents(batch, embeddings)
            else:
                vectorstore.add_documents(batch)
        except Exception as e:
            logger.error(f"build_index Exception: batch processing {i//batch_size + 1}: {e}")
            raise
    
    vectorstore.save_local(index_path)
    logger.info(f"Индекс сохранён: {index_path}")
    return vectorstore

def filter_docs_by_relevance(
    docs: List[LangchainDocument], 
    question: str, 
    embeddings, 
    threshold: float = 0.75) -> List[Tuple[LangchainDocument, float]]:
    
    question_embedding = embeddings.embed_query(question)
    filtered = []
    
    logger.info(f"Документы перед фильтрацией для вопроса '{question}':")
    
    # Извлекаем слова из вопроса (без знаков препинания, in lower)
    question_words = set(re.findall(r'\b\w+\b', question.lower()))
    
    for doc in docs:
        doc_type = doc.metadata.get("type", "text")
        header = ", ".join(doc.metadata.get("headers", [doc.metadata.get("header", "")])).lower()
        keywords = set(doc.metadata.get("keywords", []))
        
        # Формируем текст для эмбеддинга
        if doc_type == "image":
            doc_text = f"{header} {' '.join(keywords)}"
        else:
            doc_text = f"{doc.page_content} {' '.join(keywords)} {header}"
        
        try:
            doc_embedding = embeddings.embed_query(doc_text)
            similarity = cosine_similarity([question_embedding], [doc_embedding])[0][0]
            
            # Базовый порог релевантности в зависимости от типа документа
            adj_threshold = 0.7 if doc_type in ["table", "formula"] else threshold
            if doc_type == "image":
                adj_threshold = 0.5  # Изображения менее строгие
            
            # Пересечение слов вопроса с keywords и header
            keyword_overlap = len(question_words.intersection(keywords)) / max(len(question_words), 1)
            header_overlap = len(question_words.intersection(set(re.findall(r'\b\w+\b', header)))) / max(len(question_words), 1)
            
            # Boost пропорционально совпадению
            similarity += (keyword_overlap + header_overlap) * 0.1
            
            # Дополнительный boost для таблиц и формул, если в вопросе есть слова, связанные с данными
            data_related_words = {"таблица", "таблицы", "коэффициент", "значение", "значения", "параметр", "параметры"}
            if doc_type in ["table", "formula"] and question_words.intersection(data_related_words):
                similarity += 0.1  # Усиление для таблиц/формул при вопросах о данных
                adj_threshold = 0.65  # Меньший порог
            
            # Boost для изображений, если вопрос связан с визуальными данными
            visual_words = {"карта", "зона", "районирование", "рисунок", "диаграмма"}
            if doc_type == "image" and question_words.intersection(visual_words):
                similarity += 0.05  # Меньший boost для изображений
                adj_threshold = 0.45  # Меньший порог
            
            # Ограничение минимальной релевантности
            if similarity >= adj_threshold:
                filtered.append((doc, similarity))
        
        except Exception as e:
            logger.error(f"filter_docs_by_relevance Exception: document embedding calculation {doc.metadata.get('source', 'unknown')}: {e}")
            continue
    
    # Сортировка по релевантности
    filtered = sorted(filtered, key=lambda x: x[1], reverse=True)[:15] # ограничение до 15 документов
    
    if not filtered:
        logger.warning("Фильтрация не дала результатов, возвращаю первые 15 документов")
        filtered = [(doc, 0.0) for doc in docs[:15]]
    
    logger.info(f"Найдено {len(filtered)} релевантных документов с порогом {threshold}")
    
    return filtered
    
def rag_query(
    question: str, 
    vectorstore: FAISS, 
    links: Dict[str, str], 
    cache_file: str = RAG_CACHE_FILE, 
    fallback_used: bool = False) -> Tuple[str, List]:
    
    logger.info(f"Обработка вопроса: {question}")
        
    if os.path.exists(cache_file):
        with open(cache_file, 'r', encoding='utf-8') as f:
            rag_cache = json.load(f)
    else:
        rag_cache = {}
        
    norm_question = re.sub(r'\s+', ' ', question.strip().lower())
    if norm_question in rag_cache:
        logger.info("Ответ взят из кэша.")
        return rag_cache[norm_question], []
        
    retriever = vectorstore.as_retriever(search_type="similarity", search_kwargs={"k": 30})
    docs = retriever.get_relevant_documents(question)
    filtered_docs = filter_docs_by_relevance(docs, question, embeddings)
    
    logger.info(f"Отфильтровано {len(filtered_docs)} документов:")
    
    context_parts = []
    text_parts = []
    table_parts = []
    image_parts = []
    formula_parts = []
    image_count = 0
    max_images = 5
    analyzed_images = set()
    image_analysis_results = []
    
    for doc, score in filtered_docs:
        doc_type = doc.metadata.get("type", "text")
        source = doc.metadata.get("source", "unknown")
        header = ", ".join(doc.metadata.get("headers", [doc.metadata.get("header", "")]))
        
        if doc_type == "table":
            content = doc.page_content.strip()
            table_parts.append(f"=== ТАБЛИЦА: {header} ===\n(релевантность: {score:.3f})\n{content}")
            
        elif doc_type == "image":
            for key, value in doc.metadata.get("links", {}).items():
                logger.info(f"Обработка изображения -> {value}")
                if isinstance(value, str) and os.path.exists(value) and image_count < max_images and any(k in key.lower() for k in ["рисунок", "карта"]):
                    if value not in analyzed_images:
                        base64_img = image_to_base64(value)
                        if base64_img:
                            instr = "Отвечай на русском языке. Проанализируй изображение и извлеки информацию, относящуюся к вопросу. Укажи конкретные данные (например, зоны, значения, категории)."
                            image_messages = [
                                SystemMessage(content=instr),
                                HumanMessage(content=[
                                    {"type": "text", "text": f"Анализируй изображение (возможно, карту или рисунок), учитывая вопрос: {question}\nКонтекст: {header}"},
                                    {"type": "image_url", "image_url": {"url": f"data:image/png;base64,{base64_img}"}}
                                ])
                            ]
                            
                            try:
                                image_response = llm.invoke(image_messages)
                                logger.info(f"Анализ изображения -> {value}: {image_response.content[:100]}...")
                                
                                image_analysis_results.append(f"{key.upper()} (релевантность: {score:.3f}): {image_response.content.strip()}")
                                analyzed_images.add(value)
                                image_count += 1
                            except Exception as e:
                                logger.error(f"rag_query Exception: image analysis -> {key}: {e}")
                                image_analysis_results.append(f"{key.upper()}: [Ошибка анализа]")
                                
                        else:
                            logger.error(f"rag_query Error: Base64 not received for -> {value}")
                            image_analysis_results.append(f"{key.upper()}: [Ошибка base64]")
            image_parts.append(f"=== ИЗОБРАЖЕНИЕ: {header} ===\n(релевантность: {score:.3f})\n{header}")
            
        elif doc_type == "formula":
            formula_parts.append(f"=== ФОРМУЛА: {header} ===\n(релевантность: {score:.3f})\n{doc.page_content}")
            
        else:
            truncated_content = doc.page_content[:2000] + "..." if len(doc.page_content) > 2000 else doc.page_content
            text_parts.append(f"=== ТЕКСТ: {header} ===\n(релевантность: {score:.3f})\n{truncated_content}")
            
    context_parts = (
        ["=== АНАЛИЗ ИЗОБРАЖЕНИЙ ==="] + image_analysis_results +
        ["=== ОПИСАНИЯ ИЗОБРАЖЕНИЙ ==="] + image_parts +
        ["=== ТАБЛИЦЫ ==="] + table_parts +
        ["=== ФОРМУЛЫ ==="] + formula_parts +
        ["=== ТЕКСТ ==="] + text_parts
    )
    
    total_tokens = 0
    final_context_parts = []
    for part in context_parts:
        part_tokens = len(part) // 4
        
        if total_tokens + part_tokens > MAX_TOKENS_PER_BATCH:
            logger.warning(f"Превышение лимита токенов ({MAX_TOKENS_PER_BATCH}). Часть пропущена: {part[:50]}...")
            break
            
        final_context_parts.append(part)
        total_tokens += part_tokens
        
    context = "\n\n".join(final_context_parts)
    logger.info(f"Финальный контекст: {len(context)} символов, оценочно ~{total_tokens} токенов")
    
    with open("context_log.txt", 'w', encoding='utf-8') as f:
        f.write(context)
    logger.info("Контекст сохранён в context_log.txt")
    
    messages = [
        SystemMessage(content=(
            "Вы — эксперт по техническим и нормативным документам. "
            "Отвечай точно, кратко и только на основе предоставленного контекста. "
            "Приоритет отдавай данным из таблиц и изображений, если они содержат релевантные значения (например, зоны, веса, коэффициенты). "
            "Если в таблице есть числовые данные, явно укажи их в ответе. "
            "Указывай источники: раздел, таблица, рисунок, формула, приложение. "
            "Если запрашиваемая информация отсутствует, но может быть выведена из таблиц или изображений, укажи это. "
            "Если данных нет, напиши 'Данные отсутствуют'. "
            f"{' (Внимание: использован fallback, данные могут быть неполными)' if fallback_used else ''}"
        )),
        HumanMessage(content=(
            f"На основе контекста ответь кратко и точно на вопрос. Укажи источники.\n"
            f"Контекст:\n{context}\n\n"
            f"Вопрос: {question}"
        ))
    ]
    
    messages_log = [{"role": msg.type, "content": str(msg.content)} for msg in messages]
    with open("messages_log.json", 'w', encoding='utf-8') as f:
        json.dump(messages_log, f, ensure_ascii=False, indent=2)
    logger.info("Сообщения для LLM сохранены в messages_log.json")
    
    try:
        logger.info(f"Запрос к {MODEL_NAME}...")
        response = llm.invoke(messages)
        logger.info(f"Ответ от LLM: {response}")
        if response is None:
            logger.error("LLM вернул None")
            raise ValueError("LLM вернул None")
        if not hasattr(response, 'content'):
            logger.error("Ответ LLM не содержит атрибут content")
            raise ValueError("Ответ LLM не содержит атрибут content")
        if response.content is None:
            logger.error("LLM вернул ответ с content=None")
            raise ValueError("LLM вернул ответ с content=None")
        answer = response.content.strip()
    except Exception as e:
        logger.error(f"rag_query Exception LLM: {e}, simplifying the query...")
        simple_msg = [HumanMessage(content=f"Ответь кратко: {question}. Контекст: {context[:2000]}")]
        try:
            response = llm.invoke(simple_msg)
            logger.info(f"Ответ от упрощённого запроса: {response}")
            
            if response is None or not hasattr(response, 'content') or response.content is None:
                logger.error("Упрощённый запрос вернул None или ответ без content")
                answer = "Ошибка: LLM не вернул корректный ответ"
            else:
                answer = response.content.strip()
        except Exception as e:
            logger.error(f"Ошибка упрощённого запроса: {e}")
            answer = "Ошибка: не удалось получить ответ от модели"
            
    # Формирование источников
    sources = []
    seen_files = defaultdict(list)
    for doc, score in filtered_docs[:10]:
        if doc.metadata.get("type") in ["table", "image", "formula"]:
            filename = doc.metadata.get("filename", "Неизвестный файл")
            header = ", ".join(doc.metadata.get("headers", [doc.metadata.get("header", doc.metadata.get("source", "unknown"))]))
            seen_files[filename].append(f"{header} ({doc.metadata.get('type')}, релевантность: {score:.3f})")
    
    for filename, headers in seen_files.items():
        sources.append(f"{filename}: {'; '.join(headers)}")
    
    if not sources:
        sources.append("Нет релевантных таблиц, изображений или формул")
    
    full_answer = f"{answer}\n\nИсточники:\n" + "\n".join(f"- {s}" for s in sources)
    rag_cache[norm_question] = full_answer
    
    with open(cache_file, 'w', encoding='utf-8') as f:
        json.dump(rag_cache, f, ensure_ascii=False, indent=2)
        
    return full_answer, sources

def load_or_create_all(doc_path: str) -> Tuple[FAISS, Dict[str, str], bool]:
    logger.info("Проверка наличия кешированных данных...")
    
    required_files = [INDEX_PATH, IMAGE_CACHE_FILE, CHUNKS_LOG_FILE, IMAGES_DIR, LINKS_DICT_FILE]
    all_exist = all(os.path.exists(f) for f in required_files) and len(os.listdir(IMAGES_DIR)) > 0
    if all_exist:
        logger.info("✅ Все кеши найдены. Загружаю без пересоздания...")
        
        with open(IMAGE_CACHE_FILE, 'r', encoding='utf-8') as f:
            image_cache = json.load(f)
            
        with open(LINKS_DICT_FILE, 'r', encoding='utf-8') as f:
            links = json.load(f)
            
        vectorstore = FAISS.load_local(INDEX_PATH, embeddings, allow_dangerous_deserialization=True)
        logger.info("Кеш загружен.")
        return vectorstore, links, False
    else:
        logger.info("⚙️ Кеш не найден. Создаём всё заново...")
        documents, links, fallback_used = extract_from_docx(doc_path)
        vectorstore = build_index(documents)
        return vectorstore, links, fallback_used



if __name__ == "__main__":
    
    DOC_PATH = "СП 20.13330.2016 2024-09-05.docx" # can be scaled to other files
    logger.info("🚀 Запуск RAG-системы...")
    
    try:
        vectorstore, links, fallback_used = load_or_create_all(DOC_PATH)
        questions = [
            "В каких зонах по весу снегового покрова находятся Херсон и Мелитополь?",
            "Какие регионы Российской Федерации имеют высотный коэффициент k_h, превышающий 2?",
            "Какой коэффициент надежности по нагрузке для металлических конструкицй?",
            "Как определяется нормативное значение основной ветровой нагрузки w?",
            "Что такое коэффициент надежности по нагрузке?",
        ]
        for q in questions:
            print("\n" + "="*77)
            print(f"*❓* Вопрос: {q}")
            print("-" * 77)
            full_answer, _ = rag_query(q, vectorstore, links, fallback_used=fallback_used)
            print(full_answer)
            print("="*77)
    except Exception as e:
        logger.critical(f"__main__ Exception -> critical: {e}")

OPENAI_API_KEY: ...kIA


2025-09-19 09:28:56,969 - INFO - 🚀 Запуск RAG-системы...
2025-09-19 09:28:56,970 - INFO - Проверка наличия кешированных данных...
2025-09-19 09:28:56,971 - INFO - ✅ Все кеши найдены. Загружаю без пересоздания...
2025-09-19 09:28:56,977 - INFO - Loading faiss with AVX2 support.
2025-09-19 09:28:57,010 - INFO - Successfully loaded faiss with AVX2 support.
2025-09-19 09:28:57,029 - INFO - Кеш загружен.
2025-09-19 09:28:57,030 - INFO - Обработка вопроса: В каких зонах по весу снегового покрова находятся Херсон и Мелитополь?
2025-09-19 09:28:57,031 - INFO - Ответ взят из кэша.
2025-09-19 09:28:57,031 - INFO - Обработка вопроса: Какие регионы Российской Федерации имеют высотный коэффициент k_h, превышающий 2?
2025-09-19 09:28:57,032 - INFO - Ответ взят из кэша.
2025-09-19 09:28:57,033 - INFO - Обработка вопроса: Какой коэффициент надежности по нагрузке для металлических конструкицй?
2025-09-19 09:28:57,034 - INFO - Ответ взят из кэша.
2025-09-19 09:28:57,035 - INFO - Обработка вопроса: Как о


*❓* Вопрос: В каких зонах по весу снегового покрова находятся Херсон и Мелитополь?
-----------------------------------------------------------------------------
Херсон находится в зоне I, а Мелитополь в зоне II по весу снегового покрова. Источник: КАРТА 1, В.

Источники:
- СП 20.13330.2016 2024-09-05.docx: КАРТА 1, в. РАЙОНИРОВАНИЕ ТЕРРИТОРИИ ДОНЕЦКОЙ НАРОДНОЙ РЕСПУБЛИКИ, ЛУГАНСКОЙ НАРОДНОЙ РЕСПУБЛИКИ, ЗАПОРОЖСКОЙ ОБЛАСТИ, ХЕРСОНСКОЙ ОБЛАСТИ ПО ВЕСУ СНЕГОВОГО ПОКРОВА (ДОПОЛНЕНИЕ К КАРТЕ 1. РАЙОНИРОВАНИЕ ТЕРРИТОРИИ РОССИЙСКОЙ ФЕДЕРАЦИИ ПО ВЕСУ СНЕГОВОГО ПОКРОВА) (image, релевантность: 0.873); КАРТА 1, б. РАЙОНИРОВАНИЕ ТЕРРИТОРИИ РЕСПУБЛИКИ КРЫМ ПО ВЕСУ СНЕГОВОГО ПОКРОВА (image, релевантность: 0.868)

*❓* Вопрос: Какие регионы Российской Федерации имеют высотный коэффициент k_h, превышающий 2?
-----------------------------------------------------------------------------
Регионы Российской Федерации с высотным коэффициентом \( k_h \), превышающим 2, включают:

1. Краснодарский край (Адле

In [6]:
_

[]