In [None]:
import os
import json
import glob
import math

import numpy as np
import faiss
from collections import Counter, defaultdict

import torch
from transformers import DPRQuestionEncoder, DPRQuestionEncoderTokenizer, DPRContextEncoder, DPRContextEncoderTokenizer

In [None]:
# Folder that holds the input JSON files
folder_path = 'text_to_json'

# Parsed content of every JSON file, one entry per file
manual_text = []

# glob.glob returns paths in arbitrary order; sort them so that the position
# of each document in manual_text is reproducible across runs and machines
for file_path in sorted(glob.glob(os.path.join(folder_path, '*.json'))):
    with open(file_path, 'r', encoding='utf-8') as file:
        data = json.load(file)
        manual_text.append(data)

# NOTE(review): later cells call str.lower() on each entry, so every JSON file
# presumably contains a single top-level string -- confirm against the data.

# Show what was loaded
for text in manual_text:
    print(text)

In [None]:
# Example query; the loaded manual documents are ranked against it with BM25 below

query = "Explain about the image including Safety net"

# Tokenization and preprocessing
def tokenize(text):
    """Lower-case ``text`` and split it on runs of whitespace.

    Args:
        text: The string to tokenize.

    Returns:
        A list of lower-cased tokens. Only whitespace separates tokens, so
        punctuation stays attached to the adjacent word.
    """
    lowered = text.lower()
    return lowered.split()

# Calculate IDF for each term
def compute_idf(corpus):
    """Build a smoothed inverse-document-frequency table for a corpus.

    Each document is tokenized with ``tokenize``; a term counts once per
    document no matter how often it repeats inside that document.

    Args:
        corpus: Iterable of document strings (must support ``len``).

    Returns:
        dict mapping each term to ``log((N + 1) / (df + 1)) + 1``, where ``N``
        is the number of documents and ``df`` the number of documents that
        contain the term.
    """
    doc_freq = defaultdict(int)
    for document in corpus:
        # De-duplicate so repeated words inside one document count once
        unique_terms = set(tokenize(document))
        for word in unique_terms:
            doc_freq[word] += 1

    n_docs = len(corpus)
    weights = {}
    for word, count in doc_freq.items():
        weights[word] = math.log((n_docs + 1) / (count + 1)) + 1
    return weights

# Calculate BM25 score for a document given a query
def compute_bm25(doc, query, idf, avgdl, k1=1.5, b=0.75):
    """Score a single document against a query using the BM25 formula.

    Args:
        doc: Document text; tokenized with ``tokenize``.
        query: Query text; tokenized with ``tokenize``. A term repeated in the
            query contributes once per repetition.
        idf: Mapping of term -> IDF weight. Terms absent from it weigh 0.
        avgdl: Average document length, in tokens, over the corpus.
        k1: BM25 ``k1`` parameter applied to the term frequency.
        b: BM25 ``b`` parameter weighting the ``doc_len / avgdl`` ratio.

    Returns:
        The accumulated score as a float; ``0.0`` when no query term occurs
        in the document.
    """
    tokens = tokenize(doc)
    counts = Counter(tokens)
    n_tokens = len(tokens)

    total = 0.0
    for q_term in tokenize(query):
        freq = counts.get(q_term)
        if freq is None:
            # Query term does not occur in this document
            continue
        weight = idf.get(q_term, 0)
        numerator = freq * (k1 + 1)
        # The length ratio is evaluated only for matching terms
        denominator = freq + k1 * (1 - b + b * (n_tokens / avgdl))
        total += weight * (numerator / denominator)
    return total

# Preprocess the manual: tokenize every document once to measure its length
corpus = [tokenize(doc) for doc in manual_text]

# Average document length in tokens. Fall back to 0.0 when no documents were
# loaded, so an empty corpus yields an empty ranking instead of a
# ZeroDivisionError.
avgdl = sum(len(doc) for doc in corpus) / len(corpus) if corpus else 0.0
idf = compute_idf(manual_text)

# Compute a BM25 score for each document, then order best-first.
# sorted() is stable, so documents with equal scores keep their load order.
scores = [(doc, compute_bm25(doc, query, idf, avgdl)) for doc in manual_text]
ranked_docs = sorted(scores, key=lambda x: x[1], reverse=True)

# Output the ranked documents
for doc, score in ranked_docs:
    print(f"Score: {score}\nDocument: {doc}\n")


In [None]:
# Load the DPR question/context encoders and their tokenizers
question_encoder = DPRQuestionEncoder.from_pretrained('facebook/dpr-question_encoder-single-nq-base')
question_tokenizer = DPRQuestionEncoderTokenizer.from_pretrained('facebook/dpr-question_encoder-single-nq-base')
context_encoder = DPRContextEncoder.from_pretrained('facebook/dpr-ctx_encoder-single-nq-base')
context_tokenizer = DPRContextEncoderTokenizer.from_pretrained('facebook/dpr-ctx_encoder-single-nq-base')

# Encode the documents into vectors, one document per forward pass
context_vectors = []
for text in manual_text:
    inputs = context_tokenizer(text, return_tensors='pt', truncation=True, padding=True)
    with torch.no_grad():  # inference only, no gradients needed
        context_vector = context_encoder(**inputs).pooler_output
    context_vectors.append(context_vector.cpu().numpy())

# Stack the per-document arrays into a single 2-D array for FAISS
context_vectors = np.vstack(context_vectors)

# Build the FAISS inner-product index over the document vectors
index = faiss.IndexFlatIP(context_vectors.shape[1])
index.add(context_vectors)

# Search query
query = "Explain about the image including Safety net"

# Encode the query into a vector
query_inputs = question_tokenizer(query, return_tensors='pt', truncation=True, padding=True)
with torch.no_grad():
    query_vector = question_encoder(**query_inputs).pooler_output.cpu().numpy()

# Retrieve the most similar documents from the FAISS index
k = 5  # number of results requested
distances, indices = index.search(query_vector, k)

# Print the search results.
# distances[0] and indices[0] are parallel arrays ordered by rank, so they are
# walked together; indexing distances by the document id would print the wrong
# score or raise IndexError once an id is >= k.
print("Top 5 results:")
for score, idx in zip(distances[0], indices[0]):
    if idx < 0:
        # FAISS pads with -1 when the index holds fewer than k vectors;
        # manual_text[-1] would otherwise silently print the last document.
        continue
    print(f"Score: {score}\nDocument: {manual_text[idx]}\n")
