# ビジネス関連法案の RAG 検索 

In [1]:
import logging
import cohere
import requests
from bs4 import BeautifulSoup
import numpy as np
import pandas as pd
from typing import List, Dict
import time
import re
from dotenv import load_dotenv
import os
import uuid
import hnswlib

In [3]:
# e-Gov law-list API endpoint; not referenced anywhere else in the visible code.
EGOV_SEARCH_URL = "https://elaws.e-gov.go.jp/api/1/lawlists/1"

# load_dotenv() is called before the lookup below so that COHERE_APIKEY can be
# supplied through a .env file. A missing key raises KeyError right here.
load_dotenv()
co = cohere.Client(os.environ["COHERE_APIKEY"])

  from pydantic.v1.datetime_parse import parse_date as parse_date


# 2. e-Gov から法令情報を取得

In [70]:
"""
documents = [
    {
        'title': '昭和六十三年法律第百八号 消費税法',
        'url': 'https://laws.e-gov.go.jp/api/1/lawdata/363AC0000000108', #https://laws.e-gov.go.jp/law/363AC0000000108'
    },
    #{
    #    'title': '昭和四十年法律第三十三号 所得税法',
    #    'url': 'https://laws.e-gov.go.jp/api/1/lawdata/340AC0000000033'
    #},
    #{
    #    'title': '昭和二十八年法律第六号 酒税法',
    #    'url': 'https://laws.e-gov.go.jp/api/1/lawdata/328AC0000000006'
    #},        
]
"""
# 429: trial token rate limit exceeded, limit is 100000 tokens per minute

# Source pages handed to Vectorstore: one {'title', 'url'} dict per page,
# built from the (title, url) pairs below in the order listed.
documents = [
    {'title': page_title, 'url': page_url}
    for page_title, page_url in (
        # Disabled source (Tax Answer code index page):
        #   https://www.nta.go.jp/taxes/shiraberu/taxanswer/code/index.htm
        (
            '所得税のしくみ',
            'https://www.nta.go.jp/taxes/shiraberu/taxanswer/shotoku/1000.htm',
        ),
        (
            '消費税の基本的なしくみ',
            'https://www.nta.go.jp/taxes/shiraberu/taxanswer/shohi/6101.htm',
        ),
        (
            '相続税がかかる場合',
            'https://www.nta.go.jp/taxes/shiraberu/taxanswer/sozoku/4102.htm',
        ),
        (
            'ふるさと納税(寄附金控除)',
            'https://www.nta.go.jp/taxes/shiraberu/taxanswer/shotoku/1155.htm',
        ),
    )
]


In [71]:
# Adapted from Cohere's LLM University (LLMU) RAG example

from unstructured.partition.html import partition_html
from unstructured.partition.xml import partition_xml
from unstructured.chunking.title import chunk_by_title

class Vectorstore:
    """
    A collection of documents that is chunked, embedded and indexed for retrieval.

    Constructing an instance immediately runs load_and_chunk(), embed() and
    index() in that order, so it performs network requests.

    Parameters:
    raw_documents (list): A list of dictionaries representing the sources of the raw documents. Each dictionary should have 'title' and 'url' keys.
    retrieve_top_k (int): Number of nearest chunks fetched from the index per query (default 10).
    rerank_top_k (int): Number of chunks kept after reranking (default 3).

    Attributes:
    raw_documents (list): A list of dictionaries representing the raw documents.
    docs (list): A list of dictionaries representing the chunked documents, with 'title', 'text', and 'url' keys.
    docs_embs (list): A list of the associated embeddings for the document chunks.
    docs_len (int): The number of document chunks in the collection.
    idx (hnswlib.Index | None): The index used for document retrieval, or None when there is nothing to index.
    retrieve_top_k (int): See Parameters.
    rerank_top_k (int): See Parameters.

    Methods:
    load_and_chunk(): Loads the data from the sources and partitions the HTML/XML content into chunks.
    embed(): Embeds the document chunks using the Cohere API.
    index(): Indexes the document chunks for efficient retrieval.
    retrieve(): Retrieves document chunks based on the given query.
    """

    def __init__(
        self,
        raw_documents: List[Dict[str, str]],
        *,
        retrieve_top_k: int = 10,
        rerank_top_k: int = 3,
    ):
        self.raw_documents = raw_documents
        self.docs = []
        self.docs_embs = []
        self.retrieve_top_k = retrieve_top_k
        self.rerank_top_k = rerank_top_k
        self.load_and_chunk()
        self.embed()
        self.index()

    def load_and_chunk(self) -> None:
        """
        Loads the text from the sources and chunks it by title.

        URLs ending in '.htm' or '.html' are partitioned directly as HTML.
        Any other URL is downloaded, saved as '<title>.xml' in the current
        working directory and partitioned as XML.

        Raises:
        requests.RequestException: If downloading an XML source fails, times
            out, or returns an HTTP error status.
        """
        print("Loading documents...")

        for raw_document in self.raw_documents:
            url = raw_document["url"]
            if url.endswith((".htm", ".html")):
                elements = partition_html(url=url)
            else:
                filename = f"{raw_document['title']}.xml"
                # Fetch before opening the file so that a failed request does
                # not leave an empty file behind.
                res = requests.get(url, timeout=30)
                res.raise_for_status()
                # Write the raw bytes: this keeps the payload consistent with
                # the encoding declared inside the XML and avoids depending on
                # the platform default text encoding.
                with open(filename, "wb") as f:
                    f.write(res.content)

                elements = partition_xml(filename=filename)

            for chunk in chunk_by_title(elements):
                self.docs.append(
                    {
                        "title": raw_document["title"],
                        "text": str(chunk),
                        "url": url,
                    }
                )

    def embed(self) -> None:
        """
        Embeds the document chunks using the Cohere API.

        Chunks are sent in batches of 90 texts; the resulting embeddings are
        appended to docs_embs in the same order as docs.
        """
        print("Embedding document chunks...")

        batch_size = 90
        self.docs_len = len(self.docs)
        for start in range(0, self.docs_len, batch_size):
            # Slicing past the end of the list is safe, so no explicit clamp.
            texts = [doc["text"] for doc in self.docs[start : start + batch_size]]
            docs_embs_batch = co.embed(
                texts=texts, model="embed-v4.0", input_type="search_document"
            ).embeddings
            self.docs_embs.extend(docs_embs_batch)

    def index(self) -> None:
        """
        Indexes the document chunks for efficient retrieval.

        Builds an hnswlib inner-product index whose item ids are the positions
        of the chunks in docs. When there are no embeddings, idx is set to
        None instead of failing on the missing first embedding.
        """
        print("Indexing document chunks...")

        if not self.docs_embs:
            self.idx = None
            print("No document chunks to index.")
            return

        num_embs = len(self.docs_embs)
        self.idx = hnswlib.Index(space="ip", dim=len(self.docs_embs[0]))
        self.idx.init_index(max_elements=num_embs, ef_construction=512, M=64)
        self.idx.add_items(self.docs_embs, list(range(num_embs)))

        print(f"Indexing complete with {self.idx.get_current_count()} document chunks.")

    def retrieve(self, query: str) -> List[Dict[str, str]]:
        """
        Retrieves document chunks based on the given query.

        Parameters:
        query (str): The query to retrieve document chunks for.

        Returns:
        List[Dict[str, str]]: A list of dictionaries representing the retrieved document chunks, with 'title', 'text', and 'url' keys.
            The list is in reranked order and is empty when nothing is indexed.
        """
        if self.idx is None:
            return []

        # Dense retrieval
        query_emb = co.embed(
            texts=[query], model="embed-v4.0", input_type="search_query"
        ).embeddings

        # hnswlib cannot return more neighbours than the index holds, so clamp k.
        k = min(self.retrieve_top_k, self.idx.get_current_count())
        doc_ids = [int(doc_id) for doc_id in self.idx.knn_query(query_emb, k=k)[0][0]]

        # Reranking
        rank_fields = ["title", "text"]  # We'll use the title and text fields for reranking

        docs_to_rerank = [self.docs[doc_id] for doc_id in doc_ids]
        rerank_results = co.rerank(
            query=query,
            documents=docs_to_rerank,
            top_n=self.rerank_top_k,
            # The indexed pages and the queries are Japanese, so use the
            # multilingual reranker rather than the English-only one.
            model="rerank-multilingual-v3.0",
            rank_fields=rank_fields,
        )

        # result.index points into docs_to_rerank; return copies so callers
        # cannot mutate the stored chunks.
        docs_retrieved = []
        for result in rerank_results.results:
            doc = docs_to_rerank[result.index]
            docs_retrieved.append(
                {
                    "title": doc["title"],
                    "text": doc["text"],
                    "url": doc["url"],
                }
            )

        return docs_retrieved

In [72]:
# Building the store loads, chunks, embeds and indexes every page in `documents`.
vectorstore = Vectorstore(raw_documents=documents)

Loading documents...
Embedding document chunks...
Indexing document chunks...
Indexing complete with 107 document chunks.


In [73]:
message = '税について何種類知っている？'

# Ground the answer in the indexed page content: pass the chunks retrieved for
# this message (each with 'title', 'text' and 'url') instead of the raw source
# list, whose entries carry only a title and a URL and no page text.
response = co.chat_stream(
    message=message,
    model="command-a-03-2025",
    documents=vectorstore.retrieve(message))

In [74]:
# Print the streamed answer as it arrives. This uses the event shape read by
# the original loop (marked "V1"): an `event_type` attribute, with the text
# fragment in `text`.
for event in response:
    if event.event_type != "text-generation":
        continue  # ignore every event that is not generated text
    print(event.text, end="")

私は、以下の税について知っています。
- 所得税
- 消費税
- 相続税

(まだ途中)