In [None]:
####################################
########  必要なライブラリ群 ##########
####################################
# 標準ライブラリ
import os
import tempfile

# Oracle Database
import oracledb

# OCI
import oci

# LangChain
from langchain_community.embeddings import OCIGenAIEmbeddings
from langchain_community.document_loaders import PyMuPDFLoader
from langchain_text_splitters import RecursiveCharacterTextSplitter

# 設定読み込み
from config_loader import (
    load_config,
    get_db_connection_params,
    get_oci_config,
    get_genai_config,
    get_object_storage_config,
    get_app_config
)

In [None]:
################
#  定数及び変数  # 
################

# .envファイルから環境変数を読み込み
load_config()

# Oracle Database接続
db_params = get_db_connection_params()
connection = oracledb.connect(**db_params)
print(connection)

# OCI設定
config = get_oci_config()
genai_config = get_genai_config()
os_config = get_object_storage_config()
app_config = get_app_config()

# 設定値を変数に展開
region = config.get('region', 'ap-osaka-1')
compartment_id = genai_config['compartment_id']
service_endpoint = genai_config['endpoint']
embedding_model = genai_config['embed_model']
bucket_name = os_config['bucket_name']

# デフォルトのChunking設定（メイン処理ブロックでチューニング可能）
CHUNK_SIZE = app_config['chunk_size']
CHUNK_OVERLAP = app_config['chunk_overlap']

print(f"\n✓ 設定読み込み完了")
print(f"  - Region: {region}")
print(f"  - Embedding Model: {embedding_model}")
print(f"  - Bucket: {bucket_name}")
print(f"  - Chunk Size: {CHUNK_SIZE}")
print(f"  - Chunk Overlap: {CHUNK_OVERLAP}")

In [None]:
########################################
# Object Storage内のファイル一覧を取得する #
########################################
def get_object_list(config, bucket_name):
    """
    OCI Object Storage内のファイル一覧を取得
    
    Args:
        config (dict): OCI設定情報
        bucket_name (str): Object Storageのバケット名
        
    Returns:
        listfiles: Object Storage内のファイルオブジェクトのリスト
    """
    object_storage = oci.object_storage.ObjectStorageClient(config)
    namespace = object_storage.get_namespace().data #type: ignore

    # object Storageのファイル名一覧
    listfiles = object_storage.list_objects(namespace, bucket_name)
    
    return listfiles

In [None]:
###########################################
# ファイルのコンテンツ(バイナリデータ)を取得する #
###########################################
def get_file_content(full_path):
    """
    Object Storageの指定のファイルのコンテンツをバイナリデータ形式で取得する
    
    Args:
        full_path (str): ファイルのフルパス (例: "manual/camera.pdf"）
        
    Returns:
        tuple: (content, content_type, filename, filtering, file_size)
            - content (bytes): ファイルの内容（バイナリデータ）
            - content_type (str): ファイルタイプ（pdf, txt, json など）
            - filename (str): ファイル名のみ（拡張子含む）
            - filtering (str): フォルダパス（ファイル種別ごとに格納されている)
            - file_size (int): ファイルサイズ（バイト単位）
    """
    
    object_storage = oci.object_storage.ObjectStorageClient(config)
    namespace = object_storage.get_namespace().data #type: ignore

    # ファイルコンテンツを取得（フルパスを使用）
    response = object_storage.get_object(namespace, bucket_name, full_path)

    # バイナリデータ
    content = response.data.content #type: ignore
    
    # ファイルサイズ（バイト単位）
    file_size = len(content)

    # フォルダとファイル名を分離
    filtering = os.path.dirname(full_path)  # フォルダパス（例: "documents/pdf"）
    filename = os.path.basename(full_path)    # ファイル名のみ（例: "report.pdf"）
    
    # フォルダがない場合（ルートレベル）は空文字列
    if not filtering:
        filtering = ""
    
    # ファイルタイプ（拡張子から取得）
    content_type = filename.rsplit('.', 1)[-1].lower() if '.' in filename else 'unknown'

    return content, content_type, filename, filtering, file_size

In [None]:
##################################
# PDFをText化して文字列変数に格納する #
##################################
def pdf_to_text(pdf_content):
    """
    PDFコンテンツをテキストに変換
    
    Args:
        pdf_content (bytes): PDFファイルの内容（バイナリデータ）
        
    Returns:
        text (str): PDFの全テキスト内容
    """
    
    with tempfile.NamedTemporaryFile(suffix='.pdf', delete=False) as temp_file:
        try:
            temp_file.write(pdf_content)
            temp_file.flush()
            
            loader = PyMuPDFLoader(temp_file.name)
            documents = loader.load()
            
            # 全ページのテキストを結合
            text = ""
            for doc in documents:
                text += doc.page_content            
            
            return text
            
        finally:
            os.unlink(temp_file.name)

In [None]:
#########################
# txtを文字列変数に格納する #
#########################
def txt_to_text(txt_content, filename):
    """
    txt, csv形式のファイルのバイナリデータを文字列に変換

    Args:
        txt_content (bytes): テキストファイルの内容（バイナリデータ）
        filename (str): ファイル名（表示用）

    Returns:
        text (str): テキストファイルの内容
    """
    try:
        # UTF-8でデコード
        text = txt_content.decode('utf-8')
        return text
        
    except UnicodeDecodeError:
        # UTF-8で失敗した場合、Shift-JISを試す
        try:
            text = txt_content.decode('shift_jis')
            return text
        except UnicodeDecodeError:
            raise ValueError(f"ファイル {filename} はUTF-8またはShift-JISではありません")

In [None]:
##########################
# テキストをチャンクに分割する #
##########################
def split_text_into_chunks(text, chunk_size=CHUNK_SIZE, chunk_overlap=CHUNK_OVERLAP):
    """
    テキストをチャンクに分割
    
    Args:
        text (str): 分割対象のテキスト
        chunk_size (int): チャンクサイズ(文字数)
        chunk_overlap (int): オーバーラップ(文字数)
        
    Returns:
        list: チャンクのリスト
    """
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
        separators=["\n\n", "\n", "。", "、", " ", ""],
        length_function=len
    )
    
    chunks = text_splitter.split_text(text)
    return chunks

In [None]:
################
# Embeddingする #
################
def embed_text(text):
    """
    単一の文字列をEmbeddingしてベクトルデータを返す
    
    Args:
        text (str): Embedding対象の文字列
        
    Returns:
        embedding (str_vector): Embeddingベクトル文字列（浮動小数点数のリスト文字列）
    """
    generative_ai_inference_client = oci.generative_ai_inference.GenerativeAiInferenceClient(config=config, service_endpoint=service_endpoint, retry_strategy=oci.retry.NoneRetryStrategy(), timeout=(10,240))

    embeddings = OCIGenAIEmbeddings(
    model_id         = embedding_model,
    service_endpoint = service_endpoint,
    truncate         = "NONE",
    compartment_id   = compartment_id,
    auth_type        = "API_KEY",
    client=generative_ai_inference_client
    )

    # Embeddingの実行 (pythonのリスト形式): [0.1, 0.2, 0.3,...]
    embedding_ = embeddings.embed_query(text)

    # データベースへのロード用に型変換: [0.1, 0.2, 0.3,...] → "[0.1, 0.2, 0.3,...]"
    embedding = str(embedding_)
        
    return embedding

In [None]:
#######################################
# source_documentsテーブルにデータを保存 # 
#######################################
def save_source_document(filename, filtering, content_type, file_size, text_length):
    """
    source_documentsテーブルに新しいドキュメント情報を保存
    
    Args:
        filename (str): ファイル名
        filtering (str): ソースの種類（フォルダ名など）
        content_type (str): ファイルタイプ（pdf, txt, csvなど）
        file_size (int): ファイルサイズ（バイト単位）
        text_length (int): テキストの文字数
    
    Returns:
        bytes: 作成されたdocument_id
    """
    sql_insert = """
        INSERT INTO source_documents (
            filename,
            filtering,
            content_type,
            file_size,
            text_length,
            registered_date
        ) VALUES (
            :filename,
            :filtering,
            :content_type,
            :file_size,
            :text_length,
            CAST(systimestamp AT TIME ZONE 'Asia/Tokyo' AS timestamp)
        )
        RETURNING document_id INTO :document_id
    """

    document_id_var = connection.cursor().var(oracledb.DB_TYPE_RAW)
    
    with connection.cursor() as cursor:
        cursor.execute(
            sql_insert,
            filename=filename,
            filtering=filtering,
            content_type=content_type,
            file_size=file_size,
            text_length=text_length,
            document_id=document_id_var
        )
        connection.commit()
        document_id = document_id_var.getvalue()[0]
        print(f"  ✓ source_documentsテーブルに保存完了")
        return document_id

In [None]:
##############################
# chunksテーブルにデータを保存 # 
##############################
def save_chunks(document_id, chunks_list, embeddings_list):
    """
    チャンクとそのベクトルをchunksテーブルに一括保存
    
    Args:
        document_id (bytes): ドキュメントID
        chunks_list (list): チャンクテキストのリスト
        embeddings_list (list): ベクトルのリスト
    """
    sql_insert = """
        INSERT INTO chunks (
            document_id,
            chunk_text,
            embedding,
            registered_date
        ) VALUES (
            :document_id,
            :chunk_text,
            TO_VECTOR(:embedding),
            CAST(systimestamp AT TIME ZONE 'Asia/Tokyo' AS timestamp)
        )
    """
    
    with connection.cursor() as cursor:
        for i, (chunk, embedding) in enumerate(zip(chunks_list, embeddings_list)):
            cursor.execute(
                sql_insert,
                document_id=document_id,
                chunk_text=chunk,
                embedding=embedding
            )
            print(f"    - チャンク {i+1}/{len(chunks_list)} を保存")
        connection.commit()
        print(f"  ✓ chunksテーブルに {len(chunks_list)} 件のチャンクを保存完了")

In [None]:
############
# メイン処理 #
############

# チューニング・パラメータ
chunk_size = CHUNK_SIZE          # チャンクサイズ（文字数）。デフォルトから変更する場合は、数値に変更(例: chunk_size = 1000)
chunk_overlap = CHUNK_OVERLAP    # オーバーラップ（文字数）。デフォルトから変更する場合は、数値に変更(例: chunk_overlap = 100)

# Object Storage Clientを用意
object_storage = oci.object_storage.ObjectStorageClient(config)
namespace = object_storage.get_namespace().data #type: ignore

# ファイルリストを取得
file_list = get_object_list(config, bucket_name)

# 各ファイルを処理
for file_obj in file_list.data.objects: #type: ignore
    full_path = file_obj.name
    print(f"\n対象ファイル: {full_path}")
    
    try:
        # ファイルコンテンツを取得
        content, content_type, filename, filtering, file_size = get_file_content(full_path)
        print(f"  - ファイルタイプ: {content_type}")
        print(f"  - ファイル種別: {filtering}")
        print(f"  - サイズ: {file_size} bytes")
        
        # ファイルタイプに応じてテキスト抽出
        text = None
        
        if content_type == 'pdf':
            text = pdf_to_text(content)
            print(f"  - PDFテキスト抽出完了: {len(text)} 文字")
            
        elif content_type in ['txt', 'csv']:
            text = txt_to_text(content, filename)
            
        else:
            print(f"  - 未対応のファイルタイプ: {content_type}")
            print("  このファイルのデータロードをスキップします")
            continue
        
        # テキストが取得できた場合、処理を続行
        if text:
            # 1. source_documentsテーブルに保存
            text_length = len(text)
            document_id = save_source_document(filename, filtering, content_type, file_size, text_length)
            
            # 2. テキストをチャンクに分割
            chunks = split_text_into_chunks(text, chunk_size, chunk_overlap)
            print(f"  - チャンク分割完了: {len(chunks)} チャンク")
            
            # 3. 各チャンクをEmbedding
            print(f"  - Embedding処理中...")
            embeddings_list = []
            for i, chunk in enumerate(chunks):
                embedding = embed_text(chunk)
                embeddings_list.append(embedding)
                if (i + 1) % 10 == 0 or (i + 1) == len(chunks):
                    print(f"    - {i+1}/{len(chunks)} チャンクのEmbedding完了")
            
            # 4. chunksテーブルに保存
            save_chunks(document_id, chunks, embeddings_list)
            
    except Exception as e:
        print(f"  ✗ エラー: {e}")
        continue

# 処理完了
print(f"\n処理完了")