In [1]:
!pip install langchain
!pip install langchain_openai
!pip install langchain-google-vertexai
!pip install tqdm


[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m24.2[0m[39;49m -> [0m[32;49m25.0.1[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m


In [1]:
from operator import itemgetter
from typing import Dict, Any, List, Callable, Iterable, Tuple
from functools import partial
from dataclasses import dataclass, field, asdict
import os
import json
import re

from langchain_core.runnables import Runnable, RunnableMap, RunnableLambda
from langchain_core.documents import Document
from langchain.embeddings import HuggingFaceEmbeddings
from sentence_transformers import SentenceTransformer 
from langchain_community.vectorstores import FAISS, VectorStore
from langchain_openai import ChatOpenAI
from langchain_core.output_parsers import StrOutputParser, JsonOutputParser
from langchain.output_parsers import OutputFixingParser
from langchain.prompts import (
    PromptTemplate,
    SystemMessagePromptTemplate,
    HumanMessagePromptTemplate,
    ChatPromptTemplate,
    MessagesPlaceholder,
)
from langchain_google_vertexai import ChatVertexAI
from tqdm import tqdm



PROJECT_ID = 'axial-chemist-425510-p2'
LOCATION = "europe-west4"

# Укажите путь к вашему JSON-файлу с кредами
SERVICE_ACCOUNT_FILE = "../secrets/axial-chemist-425510-p2-b4eb1d622fbe.json"
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = SERVICE_ACCOUNT_FILE


llm_params = {
    'model_name': 'gemini-2.0-flash-001',
    'project': PROJECT_ID,
    'location': LOCATION
}

LS_API_KEY = 'cb4fde840c4404906bbf7848800e03d2b0d5e98f'
LABEL_STUDIO_URL = 'https://custom-servers.t1v.scibox.tech/label-studio/30'
RAG_DATA_PROJECT_ID = 3


@dataclass
class ParamsJsonFixingParser:
    json_input_variables: list = field(default_factory=dict)
    json_fixing_template: str = field(default_factory=str)
    max_retries: int = field(default_factory=int)
    dict = asdict



def convert_attrs_string2dict(str_with_attrs: str) -> str:
    """
    ПЕРВОНАЧАЛЬНЫЙ ВАРИАНТ С РЕГУЛЯРКАМИ
    Преобразует строку с атрибутами в формате 'attr1: val1, attr2: val2, ...' 
    в json строку со словарем.
    
    Args:
        str_with_attrs (str): строка в формате 'attr1: val1, attr2: val2, ...'
        
    Returns:
        str: Словарь, в формате json
    """
    attrs_dict = {}
    if str_with_attrs:
        # Разбиваем по запятым, за которыми следует паттерн "что-то до двоеточия :"
        parts = re.split(r',\s*(?=[^:]+:)', str_with_attrs)
        for part in parts:
            # Теперь на каждой части ищем ключ:значение
            match = re.match(r'([^:]+):\s*(.*)', part)
            if match:
                key = match.group(1).strip().strip("'").strip('"')
                value = match.group(2).strip().strip("'").strip('"')
                attrs_dict[key] = value
    return json.dumps(attrs_dict, ensure_ascii=False, indent=None)


def create_model_prompts(system_prompt: str,
                         user_prompt: str) -> ChatPromptTemplate:
    system_prompt = SystemMessagePromptTemplate.from_template(system_prompt)
    user_prompt = HumanMessagePromptTemplate.from_template(user_prompt)
    chat_prompt = ChatPromptTemplate.from_messages(
        [system_prompt,
         user_prompt]
    )
    return chat_prompt

def prepare_retriever_results_to_prompt(examples: List[Tuple[float, Dict]],
                                        key_name_problem: str = 'page_content',
                                        key_name_information: str = 'metadata',
                                        string_pattern_per_example: str = 'Name:\n{problem}\nExtracted information:\n{information}\n',
                                        key_name_problem_in_pattern: str = 'problem',
                                        key_name_information_in_pattern: str = 'information') -> str:
    """Подготавливаем строчку с few_shot (в текущем пайплайне используется для подготовки
       примеров полученных из vector_store)

    Args:
        examples (List[Tuple[float, Dict]]): найденные в векторной базе примеры с скорами
        key_name_problem (str, optional): ключ в словаре каждого примера, в котором содержится наименование проблемы
                                          (в нашем случае строчка с описанием продукта). Defaults to 'page_content'.
        key_name_information (str, optional): ключ в словаре каждого примера, в котором содержится пример как надо обрабатать проблему
                                              (в нашему случае по ключу содержиться разложение на атрибутный состав). Defaults to 'metadata'.
        string_pattern_per_example (str, optional): шаблон того, как будет выглядить каждый пример. В шаблон должна вставляться информация
                                                    полученная по `key_name_problem` и информация полученная по `key_name_information`. Defaults to 'Name:\n{problem}\nExtracted information:\n{information}\n'.
        key_name_problem_in_pattern (str, optional): ключ в шаблоне `string_pattern_per_example` куда вставить значение полученное
                                                     по `key_name_problem`. Defaults to 'problem'.
        key_name_information_in_pattern (str, optional): ключ в шаблоне `string_pattern_per_example` куда вставить значение полученное
                                                         по `key_name_information`. Defaults to 'information'.

    Returns:
        str: подготовленная строка с примерами (в нашем случае полученными при помощи RAG)
    """
    few_shot_prompt_with_nearest_examples: str = ''
    for i, (search_sim, example) in enumerate(examples, start=1):
        prepared_string_with_example: str = string_pattern_per_example.format(**{
            key_name_problem_in_pattern: example[key_name_problem],
            key_name_information_in_pattern: example[key_name_information]
        })
        few_shot_prompt_with_nearest_examples += f'{str(i)}. {prepared_string_with_example}'
    if not few_shot_prompt_with_nearest_examples:
        few_shot_prompt_with_nearest_examples = 'Without examples'
    
    return few_shot_prompt_with_nearest_examples


def get_vector_store(
        texts: Iterable[str] | Iterable[Document],
        model_name: str,
        model_kwargs: dict = {'device': 'cpu'},
        encode_kwargs: dict = {'normalize_embeddings': True},
        **kwargs: dict,
        ) -> tuple[
            VectorStore, 
            Callable, 
            Callable
            ]:
    """
    Создаем векторное хранилище и возвращаем триплет:
    - векторное хранилище
    - функция для поиска по векторному хранилищу
    - функция для добавления данных в хранилище
    При поиске используется similarity_search_with_relevance_score и числом текстов к = 5
    """
    if not isinstance(texts, Iterable):
        raise TypeError('texts must be a Iterable!')
    elif not all([isinstance(text, str) for text in texts]) \
        and not all([isinstance(doc, Document) for doc in texts]):
        raise TypeError('texts must contain only strings or Documents!')

    emb_model = HuggingFaceEmbeddings(
        model_name=model_name,
        model_kwargs=model_kwargs,
        encode_kwargs=encode_kwargs
    )

    if isinstance(next(iter(texts)), str):
        vectorstore = FAISS.from_texts(
            texts, embedding=emb_model, **kwargs
        )
    else:
        vectorstore = FAISS.from_documents(
            texts, embedding=emb_model, **kwargs
        )

    return vectorstore, partial(vectorstore.similarity_search_with_relevance_scores, k=5), vectorstore.add_texts



def create_json_fixing_parser(client_parser_init_params: Dict[str, Any],
                              json_input_variables: List[str],
                              json_fixing_template: str,
                              max_retries: int) -> Runnable:
    """Создает Цепочку с Fixing Output Parser, который пытается извлечь
    результаты JSON объекта из строки. Если извлечение не будет произведено,
    то вызовется ошибка и мы попросить у другой модели исправить ответ предыдущей таким образом,
    чтобы json стал корректным.

    Args:
        client_parser_init_params (Dict[str, Any]): параметры модели, которой будет отправлен запрос, если результаты парсинга будут отрицательные;
        json_input_variables (List[str]): переменные, которые принимает json_input_variables;
        json_fixing_template (str): промпт для запроса, который исправляет результаты парсинга json;
        max_retries (int): количество попыток исправления;

    Returns:
        Runnable: Цепочка пытающаяся спарсить ответ другой модели в формате JSON.
    """
    client_parser = ChatVertexAI(**client_parser_init_params)
    json_fixing_parser = OutputFixingParser.from_llm(
        parser=JsonOutputParser(),
        llm=client_parser,
        prompt=PromptTemplate(
            input_variables=json_input_variables,
            template=json_fixing_template
        ),
        max_retries=max_retries
    )
    return json_fixing_parser


def create_chain_with_rag(llm_params: Dict[str, Any],
                          system_prompt: str,
                          user_prompt: str,
                          params_json_fixing_parser: ParamsJsonFixingParser,
                          embedding_model: str,
                          vectorstore_data: List[Dict[str, Any]],
                          k: int = 5,
                          score_threshold: float = 0.2) -> Runnable:
    """Создает лангчейн цепочку с RAG, где в качестве
    векторной базы используется FAISS.

    Args:
        llm_params (Dict[str, Any]): параметры для инициализации клиента ChatOpenAI;
        system_prompt (str): системный промпт использующий примеры полученные из vector_store;
        user_prompt (str): пользовательский промпт, куда вставляется описание товара;
        params_json_fixing_parser (ParamsJsonFixingParser): параметры для инициализации json_fixing_parser;
        embedding_model (str): наименование модели эмбеддингов, использующаяся для поиска в RAG;
        vectorstore_data (List[Dict[str, Any]]): данные для инициализации faiss vector_store. Каждый элемент списка должен иметь
                                                 ключи (problem, title, category, attributes).
        k (int, optional): максимальное количество примеров во few-shot в RAG. Defaults to 5.
        score_threshold (float, optional): минимальный cosine similarity score для документа полученного из векторного хранилища для few-shot. Defaults to 0.2.

    Returns:
        Runnable: Цепочка с RAG.
    """
    # Создаем векторную базу
    docs = [
        Document(
            page_content=sku['problem'],
            metadata={
                'title': sku['title'], 
                'category': sku['category'],  
                'attributes': sku.get('attributes', ''),
            }) for sku in vectorstore_data
    ]
    vstore, vsearch, vadd = get_vector_store(docs, model_name=embedding_model)

    # Создаем промпты для модели
    prompt_template = create_model_prompts(system_prompt, user_prompt)

    llm = ChatVertexAI(**llm_params)

    # Создаем JsonFixingParser
    json_fixing_parser = create_json_fixing_parser(
        llm_params,
        json_input_variables=params_json_fixing_parser.json_input_variables,
        json_fixing_template=params_json_fixing_parser.json_fixing_template,
        max_retries=params_json_fixing_parser.max_retries
    )

    embedder = SentenceTransformer('cointegrated/LaBSE-en-ru')
    chain = (
        # Подцепочка, которая делает запрос к векторной базе данных
        RunnableMap({
            'examples': itemgetter('problem') | (RunnableLambda(lambda x: vsearch(x, k=k, score_threshold=score_threshold)) |
                                                RunnableLambda(lambda results: [(score, doc.model_dump()) for (doc,score) in results])), # производим поиск в векторной базе
            'problem': itemgetter('problem'),  # сохраняем описание товара для дальнейшей передачи в промпт
            'service_words': itemgetter('service_words')
        }) |
        RunnableMap({
            'examples': itemgetter('examples'), # производим поиск в векторной базе
            'problem': itemgetter('problem'),  # сохраняем описание товара для дальнейшей передачи в промпт
            'service_words': itemgetter('service_words')
        }) |
        # Подцепочка, которая делает запрос к модели на основе информации из векторной базы данных
        RunnableMap({
            'examples': itemgetter('examples'),
            'problem': itemgetter('problem'),  # сохраняем описание товара для дальнейшей передачи в промпт
            'llm': RunnableMap({
                'examples': itemgetter('examples') | RunnableLambda(prepare_retriever_results_to_prompt),
                'problem': itemgetter('problem'),
            }) | prompt_template | llm | json_fixing_parser
        })
    )


    return chain



  from tqdm.autonotebook import tqdm, trange


In [None]:
llm = ChatVertexAI(**llm_params)

llm.invoke("Привет!")

In [112]:
# загрузим данные из LSs
# чтение размеченных данных из LS
from label_studio_sdk.client import LabelStudio
from ast import literal_eval

ls_client = LabelStudio(
    api_key=LS_API_KEY,
    base_url=LABEL_STUDIO_URL
)
tasks = ls_client.tasks.list(project=RAG_DATA_PROJECT_ID)
rows_for_validations = []
for item in tqdm(tasks):
    single_annotation = dict()
    single_annotation['problem'] = item.data['problem']
    single_annotation['lead_time'] = item.avg_lead_time
    single_annotation['id'] = item.id
    if single_annotation['id'] == 11542:
        print(11542, len(full_annotation))
        # print(item.annotations[0])
    for full_annotation in item.annotations[-1]['result']:
        # атрибуты требуют доп. обработки для устранения escate-символов
        if full_annotation['type'] == 'choices':
            continue
        if full_annotation['from_name'] == 'comment_attrs' and single_annotation['id'] == 11542:
            print(full_annotation)
        single_annotation[full_annotation['from_name']] = \
            json.loads(convert_attrs_string2dict(full_annotation['value']['text'][-1])) \
                if full_annotation['from_name'] == 'comment_attrs' else full_annotation['value']['text'][-1]
    rows_for_validations.append(single_annotation)



<Request('GET', 'https://custom-servers.t1v.scibox.tech/label-studio/30/api/tasks/?project=3&page=1')>
https://custom-servers.t1v.scibox.tech/label-studio/30/api/tasks/?project=3&page=1


JSONDecodeError: Expecting value: line 1 column 1 (char 0)

In [None]:
# Промпты 

system_prompt = '''Question: Extract the following information from the given name. Ensure that the output is a valid JSON and follows the structure strictly:
    1) Service or product? (1 for service, 0 for product)
    2) Basic name (extract the core name of the product or service, excluding any additional descriptive words, characteristics, or attributes. Ensure that the basic name provides sufficient detail to clearly identify the item, but avoid duplicating information such as size, quantity, or material if it is repeated elsewhere in the attributes. **Importantly, exclude any articles or alphanumeric codes, such as SKUs, from the basic name.** If the original string contains adjectives that give a clearer understanding of the product or service, include them in the basic name. The goal is to capture a name that can stand alone and be easily recognized.)
    3) Article (identify any alphanumeric string that may represent an article, if present in the name. Ensure that the article is not just a combination of letters and numbers that represent other attributes, such as size, volume, weight, or quantity (e.g., "10мл", "100г", "1л" should be ignored). An article typically consists of a unique identifier, such as a SKU or product code, that distinguishes the product. If the article cannot be determined or is ambiguous, leave it empty.)
    4) Category. Select category basing on example categories;
    5) Other attributes as key-value pairs (extract all additional descriptive words, characteristics, or attributes from the original name and assign them with meaningful keys. **The values of the keys can represent single values or lists of values**. Ensure that the keys in the attributes are correctly formed, and the values are the attributes taken from the original name, not indicated as True or False.)

- Ensure that the keys in the attributes are correctly formed, following these rules:
    - Write all keys and values in the **nominative case**.
    - Expand abbreviations wherever they appear in the text.
    - Use **capitalized letters** for the names of attributes and their values.
    - Numerical values of attributes should be separated by spaces.

**Knowledge Base Context:**
The following example has been retrieved from the knowledge base to provide additional context:
{examples}

**Important:** Process only the provided name and do not add any other names or identifiers. Treat the name as a single independent entity.

If any field cannot be determined, leave it empty.

The output must strictly adhere to the following format:
{{% raw %}}
{{{{
    "id_1": {{{{
    "услуга": 1, 
    "базовое_наименование": "", 
    "артикул": "", 
    "категория": "", 
    "атрибуты": {{{{
    }}}}
}}}}
}}}}
{{% endraw %}}
Ensure that:
- No fields are missing.
= Expand abbreviations wherever they appear in the text, if their meaning can be determined.
- If a value cannot be determined, leave it as an empty string or an empty object.
- Most importantly, try to determine the most appropriate category for the product or service based on its meaning.
- **Allow attribute values to be single values or lists of values where applicable.**

**Do not generate any text after the JSON output.**
'''
user_prompt = '''Help me with next problem: {problem}'''
