In [1]:
import pandas as pd
from langchain_core.vectorstores import InMemoryVectorStore
from langchain_openai import OpenAIEmbeddings
from langchain_text_splitters import RecursiveCharacterTextSplitter
from openai import OpenAI
import requests
from tqdm import tqdm
import json
from concurrent.futures import ThreadPoolExecutor
from dotenv import load_dotenv
import os
import asyncio
from tqdm.asyncio import tqdm_asyncio
from typing_extensions import Annotated, TypedDict
from typing import List
from langchain_openai import ChatOpenAI
import time

In [29]:
from dataclasses import dataclass
from typing import Any, Dict, Iterable, List, Optional, Set
import asyncio

In [34]:
from langchain_core.documents import Document

In [32]:
from langchain_core.retrievers import BaseRetriever  

In [2]:
class Paragraph(TypedDict):
    title: str
    context: str
    problem: str
    solution: str

class ParagraphedText(TypedDict):
    paragraphs: List[Paragraph]

class LLMChunker:
    def __init__(self, model, corpus, max_concurrent=20):
        self.model = model.with_structured_output(ParagraphedText)
        self.corpus = corpus
        self.max_concurrent = max_concurrent
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.chunks = []
        self.failed_docs = []
        self.raw_paragraphs = []

    async def chunk_text(self, annotation, text, row_id=None):
        async with self.semaphore:
            try:
                query = f"""
                    Разбей этот текст на один или несколько параграфов.
                    Требования:
                    - Используй только русский язык.
                    - Каждый параграф должен быть самостоятельным текстом, отвечающим на конкретный вопрос.
                    - Перед обработкой текста, начни с краткого чек-листа (3-7 пунктов), описывающего основные шаги, которые ты выполнишь.
                    - Для каждого параграфа создай отдельный JSON-объект со следующими полями:
                      title, context, problem, solution.
                    - Используй ТОЛЬКО информацию, предоставленную в исходном тексте.
                    Текст:
                    {annotation} 
                    {text}"""
                result = await asyncio.to_thread(self.model.invoke, query)
                self.raw_paragraphs.append(result)
                return result['paragraphs']
            except Exception as e:
                print(f"Error on doc {row_id}: {e}")
                self.failed_docs.append(row_id)
                return []

    @staticmethod
    def tag_text(text, tag):
        return f'<{tag}>\n{text}\n</{tag}>'

    def dict_to_xml(self, doc_dict):
        return '\n'.join(self.tag_text(v, k) for k, v in doc_dict.items())

    async def chunk_corpus(self):
        tasks = [
            self.chunk_text(row['annotation'], row['text'], row.get('id'))
            for _, row in self.corpus.iterrows()
        ]
        results = await tqdm_asyncio.gather(*tasks, desc="Processing chunks")

        for result in results:
            for x in result:
                self.chunks.append(self.dict_to_xml(x))

In [3]:
def rerank_docs_with_retry(query, documents, api_key, top_k=1, threshold=0.0, truncation_limit=1500, max_attempts=3, delay=2):
    for attempt in range(max_attempts):
        try:
            url = "https://ai-for-finance-hack.up.railway.app/rerank"
            headers = {
                "Content-Type": "application/json",
                "Authorization": f"Bearer {api_key}"
            }
            
            truncated_docs = []
            for doc in documents:
                if len(doc) > truncation_limit:
                    truncated_docs.append(doc[:truncation_limit])
                else:
                    truncated_docs.append(doc)
            
            payload = {
                "model": "deepinfra/Qwen/Qwen3-Reranker-4B",
                "query": query,
                "documents": truncated_docs
            }
            
            start_time = time.time()
            response = requests.post(url, headers=headers, json=payload, timeout=12)
            end_time = time.time()
            
            if response.status_code == 200:
                result = response.json()
                sorted_results = sorted(result['results'], key=lambda x: x['relevance_score'], reverse=True)
                top_documents = []
                for res in sorted_results[:top_k]:
                    if res['relevance_score'] >= threshold:
                        doc_index = res['index']
                        top_documents.append({
                            'content': documents[doc_index],
                            'relevance_score': res['relevance_score']
                        })
                return top_documents, end_time - start_time
            else:
                print(f"Rerank error (attempt {attempt + 1}): {response.status_code}")
                if attempt < max_attempts - 1:
                    print(f"Waiting {delay} seconds before retry...")
                    time.sleep(delay)
                    delay *= 1.5
                else:
                    return None, end_time - start_time
        except Exception as e:
            print(f"Rerank exception (attempt {attempt + 1}): {str(e)}")
            if attempt < max_attempts - 1:
                print(f"Waiting {delay} seconds before retry...")
                time.sleep(delay)
                delay *= 1.5
            else:
                return None, 0

In [4]:
def estimate_tokens(text):
    return len(text) // 2.4548

class Query(TypedDict):
    question: str
    problem: str
    answer_structure: str

In [108]:
class MyKAG:
    def __init__(self, model, store, reranker_api_key, reranker_top_k=1, reranker_threshold=0.0, truncation_limit=1500):
        self.model = model
        self.store = store
        self.queries = []
        self.results = []
        self.reranker_api_key = reranker_api_key
        self.reranker_top_k = reranker_top_k
        self.reranker_threshold = reranker_threshold
        self.truncation_limit = truncation_limit

    def formulate_query(self, question):
        prompt = f"""
        Из заданного вам вопроса сформулируйте структурированный запрос на векторную базу данных.
        Отвечайте кратко. В answer_structure также укажите то, что клиент хочет узнать из ответа на вопрос.
        question: вопрос с раскрытыми терминами, таким образом, чтобы не было двусмысленности или недосказанности.
        problem: проблема, которая поставлена в вопросе, а также толкование вопроса.
        answer_structure: структура ответа, которая полностью покроет заданный вопрос.
        Укажи темы, которые обязательно должны быть покрыты.
        Вопрос, на который нужно ответить:
        <question>
        {question}
        </question>
        """
        query = self.model.with_structured_output(Query).invoke(prompt)
        self.queries.append({'question': question, 'query': query})
        return query

    def get_knowledge(self, query):
        docs = self.store.similarity_search(query=str(query), k=20)
        knowledge_candidates = [doc.page_content for doc in docs]
        
        if knowledge_candidates and self.reranker_api_key:
            reranked_docs, reranker_time = rerank_docs_with_retry(
                query=str(query),
                documents=knowledge_candidates,
                api_key=self.reranker_api_key,
                top_k=self.reranker_top_k,
                threshold=self.reranker_threshold,
                truncation_limit=self.truncation_limit
            )
            
            if reranked_docs:
                knowledge = '\n'.join([doc['content'] for doc in reranked_docs])
                print(f"Reranker applied: selected {len(reranked_docs)} documents with scores: {[doc['relevance_score'] for doc in reranked_docs]}")
                return knowledge
        
        knowledge = '\n'.join(knowledge_candidates[:self.reranker_top_k])
        return knowledge

    def answer(self, question):
        query = self.formulate_query(question)
        knowledge = self.get_knowledge(query)
        prompt = f"""
        Ответьте на вопрос клиента, используя только релевантную предоставленную информацию.
        Не используйте не относящиеся к вопросу данные. Приведите ответ на русском языке.
        Если в вопросе есть запрос на какие-то численные данные, и эти данные есть в предоставленных вам знаниях,
        обязательно предоставьте их. Отвечайте на вопрос так, чтобы помочь клиенту.
        Если в релевантных знаниях есть упоминание законодательной базы, обязательно предоставьте их. 
        Не упоминайте вещи, которых нет в ответе. Раскрывайте аббревиатуры. Не упоминай клиента или меня.
        Ответ на вопрос ДОЛЖЕН БЫТЬ СВЯЗАН С проблемой, поставленной в вопросе:
        <Problem>
        {query['problem']}
        </Problem>
        Используйте структуру ответа:
        <Structure>
        {query['answer_structure']}
        </Structure>
        <Question>
        {question}
        </Question>
        <Knowledge>
        {knowledge}
        </Knowledge>
        """
        answer = self.model.invoke(prompt).content
        result = {
            'question': question,
            'query': query,
            'result': answer,
            'knowledge': knowledge,
            'prompt': prompt
        }
        self.results.append(result)
        return result

In [66]:
async def answer_questions_with_executor(kag, questions, max_workers=20):
    loop = asyncio.get_running_loop()
    executor = ThreadPoolExecutor(max_workers=max_workers)

    tasks = []
    for i, q in enumerate(questions, start=1):
        fut = loop.run_in_executor(executor, kag.answer, q)
        tasks.append(fut)
        print(f"[SCHED] Task {i}/{len(questions)} scheduled")

    results = []
    for f in tqdm(asyncio.as_completed(tasks), total=len(tasks), desc="Answering questions"):
        try:
            res = await f
            results.append(res)
            print("[OK] one result collected")
        except Exception as e:
            print("[ERROR] task raised:", repr(e))
            results.append(None)

    executor.shutdown(wait=True)
    return results

In [67]:
async def main(questions, vectorstore, reranker_api_key):
    model = ChatOpenAI(base_url=BASE_URL, model=GROK, api_key=LLM_API_KEY)
    kag = MyKAG(
        model=model,
        store=vectorstore,
        reranker_api_key=reranker_api_key,
        reranker_top_k=1,  
        reranker_threshold=0.0,  
        truncation_limit=1500  
    )
    
    results = await answer_questions_with_executor(kag, questions, max_workers=20)
    return results

In [68]:
load_dotenv()
LLM_API_KEY = os.getenv("LLM_API_KEY")
EMBEDDER_API_KEY = os.getenv("EMBEDDER_API_KEY")
RERANKER_API_KEY = os.getenv("RERANKER_API_KEY") 

In [69]:
RERANKER_API_KEY

'sk-_C1tbXiFrFI_9OX_0U_ylg'

In [70]:
# BASE_URL = 'https://ai-for-finance-hack.up.railway.app/' 
BASE_URL = 'https://openrouter.ai/api/v1'
client = OpenAI(
    base_url=BASE_URL,
    api_key=LLM_API_KEY,
)

In [55]:
questions = pd.read_csv('questions.csv').iloc[:10]
corpus = pd.read_csv('train_data.csv').iloc[:10]

In [56]:
embeddings = OpenAIEmbeddings(model="text-embedding-3-small", base_url=BASE_URL, api_key=EMBEDDER_API_KEY)
# model = ChatOpenAI(base_url=BASE_URL, model="x-ai/grok-3-mini", api_key=LLM_API_KEY)

In [17]:
GROK = 'x-ai/grok-3-mini'

In [76]:
model = ChatOpenAI(base_url=BASE_URL, model=GROK, api_key=LLM_API_KEY, temperature=0.2)

In [None]:
# chunker = LLMChunker(model, corpus)
# asyncio.run(chunker.chunk_corpus())
# my_chunks = chunker.chunks

In [None]:
# vectorstore = asyncio.run(InMemoryVectorStore.afrom_texts(
#     my_chunks,
#     embedding=embeddings,
# ))

In [57]:
vectorstore = InMemoryVectorStore.load('context_chunking_grok.db', embeddings)

In [22]:
partial_answers = []
unanswered_questions = list(questions['Вопрос'])[:10]
answers = pd.DataFrame([], columns=['question', 'query', 'result', 'knowledge', 'prompt'])
attempts = 0

In [None]:
while len(partial_answers) < len(questions) and attempts < 5:
    partial_answers = asyncio.run(
        main(
            unanswered_questions,
            vectorstore,
            RERANKER_API_KEY
        )
    )
    partial_answers = pd.DataFrame(partial_answers)
    unanswered_questions = list(set(partial_answers['question']) ^ set(unanswered_questions))
    answers = pd.concat([answers, partial_answers], axis=0)
    attempts += 1

result = questions.merge(answers, left_on='Вопрос', right_on='question', how='left').drop_duplicates('result')
final_result = result.drop(columns=['question', 'query', 'knowledge', 'prompt']).rename({'result': 'Ответы на вопрос'})
final_result.to_csv('submission.csv', index=False)

In [47]:
MyKAG?

[31mInit signature:[39m
MyKAG(
    model,
    store,
    reranker_api_key,
    reranker_top_k=[32m1[39m,
    reranker_threshold=[32m0.0[39m,
    truncation_limit=[32m1500[39m,
)
[31mDocstring:[39m      <no docstring>
[31mType:[39m           type
[31mSubclasses:[39m     

In [120]:
kag = MyKAG(model, vectorstore, RERANKER_API_KEY, reranker_top_k=10, reranker_threshold=0.2)

In [121]:
kag.answer('Зачем проверять членство МФО в СРО перед оформлением займа?')

Reranker applied: selected 10 documents with scores: [0.9986329674720764, 0.9886682629585266, 0.9755768775939941, 0.9609517455101013, 0.949669361114502, 0.8807970285415649, 0.851952850818634, 0.43206343054771423, 0.42441198229789734, 0.36116471886634827]


{'question': 'Зачем проверять членство МФО в СРО перед оформлением займа?',
 'query': {'question': 'Why is it important to verify if a microfinance organization (MFO) is a member of a self-regulatory organization (SRO) before obtaining a loan?',
  'problem': 'The problem is that borrowers may face risks such as fraud, lack of legal protection, or non-compliance with regulations when dealing with unregistered MFOs. This question seeks to understand the necessity of this verification to ensure safe and reliable lending practices.',
  'answer_structure': 'The response should cover: 1. Definitions of MFO and SRO. 2. Benefits of membership (e.g., regulatory compliance, borrower rights protection). 3. Risks of not verifying (e.g., potential scams, financial losses). 4. Steps to verify membership. The client wants to learn the reasons and implications for personal financial safety.'},
 'result': '### Зачем проверять членство микрофинансовой организации (МФО) в саморегулируемой организации (СР

In [122]:
print(kag.results[-1]['result'])

### Зачем проверять членство микрофинансовой организации (МФО) в саморегулируемой организации (СРО) перед оформлением займа?

Микрофинансовые организации (МФО) — это компании, такие как микрофинансовые компании (МФК) и микрокредитные компании (МКК), которые выдают небольшие займы физическим лицам и предприятиям. Саморегулируемые организации (СРО) — это объединения, такие как Союз "Микрофинансовый альянс" или СРО "МиР", которые контролируют деятельность МФО и обеспечивают соблюдение законов.

**Преимущества членства МФО в СРО:**  
Членство в СРО гарантирует, что МФО соблюдает регуляторные требования, включая предоставление финансовой отчетности в Банк России. Это защищает права заемщиков, так как СРО осуществляет контроль за деятельностью организаций. Например, для МФК действует более строгий надзор, включая проверки аудиторов и Банка России, что снижает риски для заемщиков и обеспечивает прозрачность условий займа.

**Риски, если не проверить членство:**  
Если МФО не входит в СРО, это

In [123]:
print(kag.results[-1]['knowledge'])

<title>
Проверка саморегулируемой организации
</title>
<context>
МФО должны быть членами саморегулируемых организаций (СРО), таких как Союз 'Микрофинансовый альянс' или СРО 'МиР'.
</context>
<problem>
Отсутствие в СРО может указывать на нарушения или нелегальный статус, что увеличивает риск обмана.
</problem>
<solution>
Выясните, в какую СРО входит МФО, проверив списки на сайтах СРО, и избегайте организаций, не входящих в них.
</solution>
<title>
Проверка в государственном реестре
</title>
<context>
Перед заключением договора с МФО важно убедиться, что организация легальна.
</context>
<problem>
Мошенники могут маскироваться под МФО, что приводит к финансовым потерям и рискам для личных данных.
</problem>
<solution>
Проверьте, входит ли МФО в государственный реестр, и убедитесь в наличии синего кружочка с галочкой на сайте в поисковых системах.
</solution>
<title>
Как убедиться в legitimate нового кредитора
</title>
<context>
МФО может уступить долг другой организации, и заемщик получае

In [124]:
print(kag.results[-1]['query'])

{'question': 'Why is it important to verify if a microfinance organization (MFO) is a member of a self-regulatory organization (SRO) before obtaining a loan?', 'problem': 'The problem is that borrowers may face risks such as fraud, lack of legal protection, or non-compliance with regulations when dealing with unregistered MFOs. This question seeks to understand the necessity of this verification to ensure safe and reliable lending practices.', 'answer_structure': 'The response should cover: 1. Definitions of MFO and SRO. 2. Benefits of membership (e.g., regulatory compliance, borrower rights protection). 3. Risks of not verifying (e.g., potential scams, financial losses). 4. Steps to verify membership. The client wants to learn the reasons and implications for personal financial safety.'}


In [38]:
fa = pd.read_csv('full_answers.parquet')

In [40]:
fa['query'].iloc[0]

"{'question': 'Как просрочка по «беспроцентному» займу скажется на переплате/ПСК?', 'problem': 'Проблема заключается в понимании влияния просрочки платежа по беспроцентному займу на переплату (дополнительные расходы) и на ПСК (полную стоимость кредита), включая возможные штрафы и изменение условий.', 'answer_structure': 'Структура ответа должна включать: 1. Определение беспроцентного займа и его ключевых условий. 2. Объяснение последствий просрочки (штрафы, начисление процентов). 3. Влияние на переплату (расчет дополнительных платежей). 4. Влияние на ПСК (изменение эффективной ставки). 5. Примеры или расчеты для иллюстрации. Это обеспечит полный охват того, что клиент хочет узнать: как просрочка влияет на финансовые аспекты займа.'}"