#Запуск алгоритма

In [None]:
#@title Установка пакетов
!pip  install langchain==0.0.335 openai==1.2.3 tiktoken==0.5.1 pydantic==1.10.8 faiss-cpu==1.7.4 nltk oauth2client >/dev/null
!pip install gspread

In [2]:
#@title Импорт библиотек

import os
import getpass
import requests

from langchain.embeddings.openai import OpenAIEmbeddings
from langchain.text_splitter import CharacterTextSplitter
from langchain.text_splitter import MarkdownHeaderTextSplitter, RecursiveCharacterTextSplitter
from langchain.vectorstores import FAISS
from langchain.document_loaders import TextLoader
import openai
from openai import OpenAI
import tiktoken
import re
import requests
from langchain.docstore.document import Document

import pandas as pd
from google.oauth2.service_account import Credentials

from oauth2client.service_account import ServiceAccountCredentials
import gspread
import json
import numpy as np


In [3]:
#@title Опредение функций

def load_text_from_github(kdb_link):
## правильная ссылка: https://github.com/terrainternship/GPT_LaserLove/raw/main/instruction_1.txt
## неправильная ссылка:  https://github.com/terrainternship/GPT_LaserLove/blob/main/instruction_1.txt
  response = requests.get(kdb_link)
  txt = response.text
  return txt

def load_googledoc_by_url(doc_url) -> str: # Функция load_googledoc_by_url предназначена для загрузки текста из гуглдока, по ссылке (doc_url)
  # Extract the document ID from the URL
  match_ = re.search('/document/d/([a-zA-Z0-9-_]+)', doc_url)
  if match_ is None:
    raise ValueError('Invalid Google Docs URL')
  doc_id = match_.group(1)

  # Download the document as plain text
  response = requests.get(f'https://docs.google.com/document/d/{doc_id}/export?format=txt')
  response.raise_for_status()
  return response.text

def load_document_text(file_path) -> str:   # Функция load_document_text предназначена для загрузки текста из файла, расположенного по указанному пути (file_path)
#    with open(file_path, 'r', encoding='windows-1251') as file:
    with open(file_path, 'r', encoding='utf-8') as file:
        text = file.read()
    text_encoded = text.encode('utf-8')
    text = text_encoded.decode('utf-8')
    return text

def load_text(any_link):
  if len(any_link)==0:
    text=''
  else:
    if "github.com" in any_link:
      if "blob" in any_link: any_link=any_link.replace("blob", "raw")
      text = load_text_from_github(any_link)
    elif "docs.google.com" in any_link:
      text = load_googledoc_by_url(any_link)
    else:
      text = load_document_text(any_link)
    return text


def num_tokens_from_string(string: str, encoding_name: str) -> int:
      """Возвращает количество токенов в строке"""
      encoding = tiktoken.get_encoding(encoding_name)
      num_tokens = len(encoding.encode(string))
      return num_tokens

def split_text(text, max_count, chunk_overlap, verbose=0, double_split=1):
    # Функция для подсчета количества токенов в фрагменте
    def num_tokens(fragment):
        return num_tokens_from_string(fragment, "cl100k_base")

    headers_to_split_on = [
    ("#", "Header 1"),
    ("##", "Header 2"),
    ("###", "Header 3"),
                          ]
    markdown_splitter = MarkdownHeaderTextSplitter(headers_to_split_on=headers_to_split_on)
    fragments = markdown_splitter.split_text(text)

    # Создание объекта разделителя текста
    splitter = RecursiveCharacterTextSplitter(chunk_size=max_count, chunk_overlap=chunk_overlap, length_function=num_tokens)

    # Список для хранения фрагментов текста
    source_chunks = []

    # Обработка каждого фрагмента текста
    for fragment in fragments:

        if verbose:
            # Вывод количества слов/токенов в фрагменте, если включен режим verbose
            count = num_tokens(fragment.page_content)
            print(f"Tokens in text fragment = {count}\n{'-' * 5}\n{fragment.page_content}\n{'=' * 20}")
        if double_split:
          # Разбиение фрагмента текста на части заданной длины с помощью разделителя
          # и добавление каждой части в список source_chunks  и передача в чанк метадата из маркдауновскго сплиттера
          source_chunks.extend(Document(page_content=chunk, metadata=fragment.metadata) for chunk in splitter.split_text(fragment.page_content))
        else:
          source_chunks = fragments

    # Возвращение списка фрагментов текста
    return source_chunks

def create_search_index(data, chunk_size, chunk_overlap, verbo, double_split):
    source_chunks = []
    source_chunks = split_text(text=data, max_count=chunk_size, chunk_overlap=chunk_overlap, verbose=verbo, double_split=double_split)
    return FAISS.from_documents(source_chunks, OpenAIEmbeddings())

def num_tokens_from_messages(messages, model="gpt-3.5-turbo-0301"):
    """Returns the number of tokens used by a list of messages."""
    try:
        encoding = tiktoken.encoding_for_model(model)
    except KeyError:
        encoding = tiktoken.get_encoding("cl100k_base")
    if model == "gpt-3.5-turbo-0301":  # note: future models may deviate from this
        num_tokens = 0
        for message in messages:
            num_tokens += 4  # every message follows <im_start>{role/name}\n{content}<im_end>\n
            for key, value in message.items():
                num_tokens += len(encoding.encode(value))
                if key == "name":  # if there's a name, the role is omitted
                    num_tokens += -1  # role is always required and always 1 token
        num_tokens += 2  # every reply is primed with <im_start> assistant
        return num_tokens
    else:
        raise NotImplementedError(f"""num_tokens_from_messages() is not presently implemented for model {model}.""")

def answer_user_question(system_doc_text, knowledge_base_url, topic, instructions, temperature, verbose, k, chunk_size, chunk_overlap, model, double_split):
    if "github.com" in knowledge_base_url:
      knowledge_base_text = load_text_from_github(knowledge_base_url)
    elif "docs.google.com" in knowledge_base_url:
      knowledge_base_text = load_googledoc_by_url(knowledge_base_url)
    else:
      knowledge_base_text = load_document_text(knowledge_base_url)

    knowledge_base_index = create_search_index(knowledge_base_text, chunk_size, chunk_overlap, verbose, double_split)
    return answer_index(system_doc_text, topic, instructions, knowledge_base_index, temperature, verbose, k, model)


def answer_index(system, topic, instructions, search_index, temp, verbose, k, model):
    docs = search_index.similarity_search_with_score(topic, k=k)
    message_content = '\n '.join([f'Отрывок текста №{i+1}\n{doc[0].page_content}' for i, doc in enumerate(docs)])
    messages = [{"role": "system", "content": system}, {"role": "user", "content": f"{instructions}\n\nТексты для анализа:\n{message_content}\n\nОбращение клиента: {topic}"}]

    completion = openai.chat.completions.create(model=model, messages=messages, temperature=temp)
    return message_content, completion.choices[0].message.content


#Получение ключа API

In [4]:
#@title Получение ключа API от пользователя и установка его как переменной окружения
openai_key = getpass.getpass("OpenAI API Key:")
os.environ["OPENAI_API_KEY"] = openai_key
openai.api_key = openai_key

OpenAI API Key:··········


# Тест моделей

In [None]:
#@title 1. Тест по вопросам к БЗ - вопрос задаем вручную
model = "gpt-3.5-turbo-1106" #@param ["gpt-3.5-turbo", "gpt-3.5-turbo-16k", "gpt-3.5-turbo-1106"]
knowledge_base_link = "https://github.com/terrainternship/GPT_LaserLove/blob/main/knowledge.md?raw=true" #@param {type:"string"}
#chunk_size = 1000 #@param {type: "slider", min: 200, max: 1024, step:8}
#chunk_overlap = 0 #@param {type: "slider", min: 0, max: 256, step:8}
temperature = 0.4 #@param {type: "slider", min: 0, max: 1, step:0.1}
num_fragment = 5 #@param {type:"integer"}
verbose = "0" #@param [0,1]
system_prompt_link= "https://github.com/terrainternship/GPT_LaserLove/raw/main/%D0%9F%D0%A0%D0%9E%D0%9C%D0%A2%20LaserLove%20Svetl.txt" #@param {type:"string"}
instructions_link = "" #@param {type:"string"}
Вопрос_клиента = "\u041A\u0430\u043A\u043E\u0439 \u0441\u043E\u0441\u0442\u0430\u0432 \u0443 \u043C\u043E\u043B\u043E\u0447\u043A\u0430 \u0441 \u044D\u0444\u0444\u0435\u043A\u0442\u043E\u043C \u043B\u0438\u0444\u0442\u0438\u043D\u0433\u0430?" #@param {type:"string"}
#content_base = load_googledoc_by_url(kb_url)

system_prompt = load_text(system_prompt_link)
instructions = load_text(instructions_link)

chunks, output1 = answer_user_question(system_prompt, knowledge_base_link, Вопрос_клиента,
                               instructions, temperature, int(verbose), num_fragment,
                               chunk_size, chunk_overlap, model, double_split=0) #ОБЩИЙ

print("\nОтвет:\n", output1)


Ответ:
 Извините, я не могу предоставить информацию о составе продукции. Однако, если вас интересует косметика с эффектом лифтинга, я могу предложить вам ознакомиться с нашим ассортиментом косметических средств. Вы можете узнать больше о наших продуктах и их ценах, нажав на кнопку "Хочу узнать цены на косметику" в меню.


In [None]:
#@title 2. Тест по вопросам к БЗ групповой по вопросам из Гугл таблицы
model = "gpt-3.5-turbo-1106" #@param ["gpt-3.5-turbo", "gpt-3.5-turbo-16k", "gpt-3.5-turbo-1106"]
knowledge_base_link = "https://github.com/terrainternship/GPT_LaserLove/blob/main/knowledge.md?raw=true" #@param {type:"string"}
#chunk_size = 200 #@param {type: "slider", min: 200, max: 1024, step:8}
#chunk_overlap = 0 #@param {type: "slider", min: 0, max: 256, step:8}
temperature = 0.4 #@param {type: "slider", min: 0, max: 1, step:0.1}
num_fragment = 3 #@param {type:"integer"}
#verbose = "0" #@param [0,1]
system_prompt_link= "" #@param {type:"string"}
instructions_link = "" #@param {type:"string"}
GoogleAPI_path = "" #@param {type:"string"}
googlesheet_url = "https://docs.google.com/spreadsheets/d/1ec4f2uwDPs84jzwnVsBDiOus2-UGTO5gnqcc-UCAMGM" #@param {type:"string"}
sheet_name = "" #@param {type:"string"}
первая_строка = 2 #@param {type:"integer"}
последняя_строка = 96 #@param {type:"integer"}

input_column_letter = "B" #@param {type:"string"}
output_column_letter = "J" #@param {type:"string"}
chunk_column_letter = "N" #@param {type:"string"}

system_prompt = load_text(system_prompt_link)
instructions = load_text(instructions_link)

# авторизуемся по токену гугла
credentials = Credentials.from_service_account_file(GoogleAPI_path, scopes=['https://spreadsheets.google.com/feeds'])
client = gspread.authorize(credentials)

# открываем таблицу по URL
spreadsheet = client.open_by_url(googlesheet_url)
# Выбираем лист по имени
worksheet = spreadsheet.worksheet(sheet_name)

# Получаем все вопросы из столбца input_column_letter
data_values = worksheet.get_all_values()[первая_строка-1:]
questions = [row[gspread.utils.a1_to_rowcol(input_column_letter + "1")[1] - 1] for row in data_values]

for i, question in enumerate(questions, start=2):
    if i > последняя_строка:
      break
    if not isinstance(question, str):
      # Пропустить, если question не является строкой
      continue
    message_chunk, output1 = answer_user_question(system_prompt, knowledge_base_link, question,
                                   instructions, temperature, int(verbose), num_fragment,
                                   chunk_size, chunk_overlap, model, double_split=0)

    # Записываем результат
    worksheet.update_cell(i, gspread.utils.a1_to_rowcol(output_column_letter + str(i))[1], output1)
    worksheet.update_cell(i, gspread.utils.a1_to_rowcol(chunk_column_letter + str(i))[1], message_chunk)

print("Данные успешно обработаны и записаны на лист.")
