In [1]:
import os
import pdfplumber
import numpy as np
from groq import Groq
import google.generativeai as genai
from dotenv import load_dotenv
from DB.Datasource import VectorDB
from Embedding.embedding import EmbeddingModel
from utils.text_utils import save_source_in_txt
from utils.scoring_utils import AnswerScoring

scoring_system = AnswerScoring()

load_dotenv("C:\\chatbot\\ENGLISHTEST\\.env")
if not os.getenv('GROQ_API_KEY'):
    print("警告: GROQ_API_KEY 未設置")
groq_client = Groq(api_key=os.getenv('GROQ_API_KEY'))

sys_msg = (
    "You are an experienced English teacher specializing in TOEIC preparation. "
    "Your responsibilities include: "
    "1. Creating TOEIC-style questions when requested "
    "2. Providing detailed explanations for answers "
    "3. Teaching relevant grammar points and vocabulary "
    "4. Giving study tips and strategies "
    "5. Correcting student's English mistakes "
    "Please ensure all responses are in proper English. "
    "When explaining, be thorough but easy to understand. "
    "Always maintain a supportive and encouraging teaching tone."
)

convo = [{'role': 'system', 'content': sys_msg}]

generation_config = {
    'temperature': 0.95,
    'top_p': 1,
    'top_k': 5,
    'max_output_tokens': 2048
}

safety_settings = [
    {'category': 'HARM_CATEGORY_HARASSMENT', 'threshold': 'BLOCK_NONE'},
    {'category': 'HARM_CATEGORY_HATE_SPEECH', 'threshold': 'BLOCK_NONE'},
    {'category': 'HARM_CATEGORY_SEXUALLY_EXPLICIT', 'threshold': 'BLOCK_NONE'},
    {'category': 'HARM_CATEGORY_DANGEROUS_CONTENT', 'threshold': 'BLOCK_NONE'},
]

model = genai.GenerativeModel('gemini-1.5-flash-latest',
                              generation_config=generation_config,
                              safety_settings=safety_settings)
MAX_CONVERSATION_LENGTH = 10

def groq_prompt(prompt):
    if len(convo) > MAX_CONVERSATION_LENGTH:
        # 保留系統消息和最近的對話
        convo[:] = [convo[0]] + convo[-MAX_CONVERSATION_LENGTH+1:]

    convo.append({'role': 'user', 'content': prompt})
    try:
        chat_completion = groq_client.chat.completions.create(messages=convo, model='llama3-70b-8192')
        response = chat_completion.choices[0].message
        convo.append(response)
        return response.content
    except Exception as e:
        print(f"調用Groq API時出錯: {e}")
        # 從對話歷史中移除失敗的提問
        convo.pop()
        return f"系統出現錯誤：{str(e)}"



folder_path = "C:\\chatbot\\ENGLISHTEST\\result"
output_txt_path = "C:\\chatbot\\ENGLISHTEST\\result.txt"


def get_relevant_text_from_db(query, top_k=3):
    try:
        distances, indices = vector_db.search(query, top_k=top_k)
        
        # 確保 indices 非空且是有效格式
        if not indices or isinstance(indices, (list, np.ndarray)) == False:
            print(f"警告: 返回的索引格式不正確或無相關文本，請檢查query: {query}")
            return ""

        relevant_texts = []
        if not isinstance(indices, (list, np.ndarray)):
            indices = [indices]  # 確保 indices 是列表

        for idx in indices:
            if isinstance(idx, (list, np.ndarray)):
                idx = idx[0]  # 如果 idx 是嵌套列表，取第一個元素
            if isinstance(idx, int):  # 確保 idx 是整數
                text = vector_db.get_text_by_index(idx)
                if text and text != "未找到相關文本":
                    relevant_texts.append(text)
            else:
                print(f"警告: 無效的索引類型 {type(idx)}，跳過")
        
        combined_text = " ".join(relevant_texts)
        return combined_text[:2000] if combined_text else ""
    
    except Exception as e:
        print(f"獲取相關文本時出錯: {e}")
        return ""

    
def dynamic_weighting(response):
    
    score = len(response)
    return score

def generate_response_with_context(query):
    try:
        # 生成回應的過程
        relevant_texts = get_relevant_text_from_db(query)
        if not relevant_texts:
            print("警告: 未找到相關文本，直接使用查詢生成回應")
            response = groq_prompt(query)
        else:
            prompt = (
                f"As an English teacher, please help with the following request while considering "
                f"this reference material: {relevant_texts}\n\n"
                f"Student's request: {query}\n\n"
                f"Please provide a comprehensive response that includes:\n"
                f"1. Direct answer to the student's question\n"
                f"2. Relevant explanations or teaching points\n"
                f"3. Examples if applicable\n"
                f"4. Study tips or suggestions when appropriate"
            )
            response = groq_prompt(prompt)

        if "A)" in response and "B)" in response:  
            question_id = len(scoring_system.correct_answers) + 1
            correct_answer = "A"  
            process_user_answer(question_id, correct_answer)

            scoring_system.add_question(question_id, correct_answer, weight)  # 加入題目和答案
            process_user_answer(question_id, correct_answer)  # 讓用戶回答並進行評分

        score = dynamic_weighting(response)
        print(f"生成的答案: {response}")
        print(f"答案得分: {score}")

        return response
    except Exception as e:
        print(f"生成回應時出錯: {e}")
        return groq_prompt(query)
    
def save_response_to_txt(query, response, output_file="responses.txt"):
    output_dir = os.path.dirname(output_file)
    if output_dir and not os.path.exists(output_dir):
        os.makedirs(output_dir)
    if not os.path.exists(output_file):
        with open(output_file, 'w', encoding='utf-8') as f:
            f.write("# GenAI 回應記錄\n\n")
    with open(output_file, 'a', encoding='utf-8') as f:
        f.write(f"\n{'='*50}\n")
        f.write(f"用戶提問: {query}\n") 
        f.write(f"{'='*50}\n")
        f.write(response)
        f.write("\n\n")

def process_user_answer(question_id, correct_answer):
    """處理用戶答案輸入並累積評分"""
    while True:
        user_answer = input(f"請輸入您的答案 (A/B/C/D) 問題 {question_id}，或輸入 'skip' 跳過: ").strip().upper()
        if user_answer == 'SKIP':
            return False
        if user_answer in ['A', 'B', 'C', 'D']:
            scoring_system.record_user_answer(question_id, user_answer)
            if user_answer == correct_answer:
                print("正確答案！")
            else:
                print(f"錯誤答案！正確答案是: {correct_answer}")
            return True
        print("無效的輸入，請輸入 A、B、C、D 或 'skip'")


def show_results():
    """顯示測驗結果"""
    total_score, percentage_score, wrong_questions = scoring_system.calculate_score()
    stats = scoring_system.get_statistics()
    
    print("\n" + "="*50)
    print("測驗結果:")
    print(f"總分: {percentage_score:.2f}%")
    print(f"答題數: {stats['answered_questions']}/{stats['total_questions']}")
    print(f"正確率: {stats['accuracy']:.2f}%")
    
    if wrong_questions:
        print("\n錯誤題目:")
        for wrong in wrong_questions:
            print(f"題號 {wrong['question_id']}:")
            print(f"您的答案: {wrong['user_answer']}")
            print(f"正確答案: {wrong['correct_answer']}\n")
    print("="*50 + "\n")
    
try:
    vector_db = VectorDB()
    pdf_texts = vector_db.read_pdfs_in_folder(folder_path)
    if pdf_texts:
        save_source_in_txt(pdf_texts, output_txt_path) 
        for text in pdf_texts:
            if vector_db.add_text(text):
                print("添加了新文本")
            else:
                print("文本已存在，跳過處理")
    else:
        print("警告: 沒有找到PDF文件或文件讀取失敗")

    while True:
        try:
            query = input("請輸入您的問題（輸入'quit'退出，'score'查看成績）: ")
            if query.lower() == 'quit':
                break
            elif query.lower() == 'score':
                show_results()
                continue
            elif query.lower() == 'reembedding':
                vector_db.reembedding(folder_path)
                continue
            
            response = generate_response_with_context(query)
            save_response_to_txt(query, response)
            print("\nGenAI 回覆:", response)
            
            if "A)" in response and "B)" in response: 
                question_id = len(scoring_system.correct_answers) + 1
                
                correct_answer = "A"  
                weight = 1.0
                
                scoring_system.add_question(question_id, correct_answer, weight)
                process_user_answer(question_id, correct_answer)
            
            print("\n" + "="*50 + "\n")
        except Exception as e:
            print(f"處理查詢時出錯: {str(e)}")

except Exception as e:
    print(f"初始化過程出錯: {str(e)}")

  from .autonotebook import tqdm as notebook_tqdm


正在讀取PDF：C:\chatbot\ENGLISHTEST\result\1. 精選單字20題\Chris多益精選單字題.pdf
正在讀取PDF：C:\chatbot\ENGLISHTEST\result\10. 多益分類單字本｜25主題篇章\10 TOEIC VoCAb 多益衝刺單字分類整理 Chris.pdf
正在讀取PDF：C:\chatbot\ENGLISHTEST\result\11 多益文法綜合模擬考50題\50題 文法綜合模擬考題.pdf
正在讀取PDF：C:\chatbot\ENGLISHTEST\result\2. 易混淆詞彙10題\Chris易混淆詞.pdf
正在讀取PDF：C:\chatbot\ENGLISHTEST\result\3. 各種代名詞20題\各類代名詞考題.pdf
正在讀取PDF：C:\chatbot\ENGLISHTEST\result\4. 精選詞性考題20題\精選難的詞性考題20題.pdf
正在讀取PDF：C:\chatbot\ENGLISHTEST\result\5. 時態語態考題大統整20題\Chris 時態語態考題大統整.pdf
正在讀取PDF：C:\chatbot\ENGLISHTEST\result\6. 介係詞考題25題\Chris 介係詞考題.pdf
正在讀取PDF：C:\chatbot\ENGLISHTEST\result\7. 分詞與動名詞混淆考法20題\Chris易混淆詞分詞動名詞考法.pdf
正在讀取PDF：C:\chatbot\ENGLISHTEST\result\8. 連接副詞10題\Chris連接副詞統整.pdf
正在讀取PDF：C:\chatbot\ENGLISHTEST\result\9. 假設語態考題10題\Chris 假設語態考題.pdf
文本已保存至 C:\chatbot\ENGLISHTEST\result.txt
索引已保存到 c:\chatbot\ENGLISHTEST\DB\vector_index.faiss
添加了新文本
索引已保存到 c:\chatbot\ENGLISHTEST\DB\vector_index.faiss
添加了新文本
索引已保存到 c:\chatbot\ENGLISHTEST\DB\vector_index.faiss
添加了新文本
索引已保存到 c:\

In [1]:

import os
import pdfplumber
from sentence_transformers import SentenceTransformer
from groq import Groq
import google.generativeai as genai
import openai
import numpy as np
import os
import sys
import faiss
from dotenv import load_dotenv

load_dotenv("C:\\chatbot\\ENGLISHTEST\\.env")
if not os.getenv('GROQ_API_KEY'):
    print("警告: GROQ_API_KEY 未設置")
groq_client = Groq(api_key=os.getenv('GROQ_API_KEY'))

sys_msg = (
    "You are an experienced English teacher specializing in TOEIC preparation. "
    "Your responsibilities include: "
    "1. Creating TOEIC-style questions when requested "
    "2. Providing detailed explanations for answers "
    "3. Teaching relevant grammar points and vocabulary "
    "4. Giving study tips and strategies "
    "5. Correcting student's English mistakes "
    "6. Providing detailed explanations for answers "
    "7. Teaching relevant grammar points and vocabulary "
    "8. Giving study tips and strategies "
    "9. Correcting student's English mistakes "
    "Please ensure all responses are in proper English. "
    "When explaining, be thorough but easy to understand. "
    "Always maintain a supportive and encouraging teaching tone."
)

convo = [{'role': 'system', 'content': sys_msg}]

generation_config = {
    'temperature': 0.95,
    'top_p': 1,
    'top_k': 5,
    'max_output_tokens': 2048
}

safety_settings = [
    {'category': 'HARM_CATEGORY_HARASSMENT', 'threshold': 'BLOCK_NONE'},
    {'category': 'HARM_CATEGORY_HATE_SPEECH', 'threshold': 'BLOCK_NONE'},
    {'category': 'HARM_CATEGORY_SEXUALLY_EXPLICIT', 'threshold': 'BLOCK_NONE'},
    {'category': 'HARM_CATEGORY_DANGEROUS_CONTENT', 'threshold': 'BLOCK_NONE'},
]

model = genai.GenerativeModel('gemini-1.5-flash-latest',
                              generation_config=generation_config,
                              safety_settings=safety_settings)

def groq_prompt(prompt):
    convo.append({'role': 'user', 'content': prompt})
    try:
        chat_completion = groq_client.chat.completions.create(messages=convo, model='llama3-70b-8192')
        response = chat_completion.choices[0].message
        convo.append(response)
        return response.content
    except Exception as e:
        print(f"Error calling Groq API: {e}")
        return "系統出現錯誤"



folder_path = "C:\\chatbot\\ENGLISHTEST\\result"
output_txt_path = "C:\\chatbot\\ENGLISHTEST\\result.txt"

def read_pdf(pdf_path):
    try:
        with pdfplumber.open(pdf_path) as pdf:
            text = ""
            for page in pdf.pages:
                text += page.extract_text() + "\n" 
        return text
    except Exception as e:
        print(f"讀取PDF時出錯 {pdf_path}: {str(e)}")
        return ""

def read_pdfs_in_folder(folder_path):
    if not os.path.exists(folder_path):
        print(f"錯誤: 文件夾不存在 {folder_path}")
        return []
    all_texts = []
    
    try:
        for root, dirs, files in os.walk(folder_path):
            for file in files:
                if file.lower().endswith('.pdf'):
                    pdf_path = os.path.join(root, file)
                    print(f"正在讀取PDF：{pdf_path}")
                    text = read_pdf(pdf_path)
                    if text:
                        all_texts.append(text)
    except Exception as e:
        print(f"讀取文件夾時出錯: {str(e)}")
    
    return all_texts

def save_sourse_in_txt(texts, output_txt_path):
    with open(output_txt_path, 'w', encoding='utf-8') as f:
        for i, text in enumerate(texts, 1):
            f.write(f"--- PDF {i} ---\n")
            f.write(text)
            f.write("\n\n") 
    # print(f"所有文字已儲存至 {output_txt_path}")


embed_model = SentenceTransformer('paraphrase-MiniLM-L6-v2')
def generate_embedding(text):
    return embed_model.encode(text)

def reembedding():
    try:
        print("開始重新embedding...")
        vector_db.clear_all()
        
        pdf_texts = read_pdfs_in_folder(folder_path)
        if not pdf_texts:
            print("警告: 沒有讀取到任何PDF文本")
            return
        save_sourse_in_txt(pdf_texts, output_txt_path)
        
        for text in pdf_texts:
            if vector_db.add_text(text):
                print("添加了新文本")
        print("重新embedding完成")
    except Exception as e:
        print(f"重新embedding時出錯: {str(e)}")


class VectorDB:
    def __init__(self, dimension=384, index_file="vector_index.faiss"):
        self.dimension = dimension
        self.index_file = os.path.join(os.path.dirname(__file__), index_file)
        self.text_hash_file = os.path.join(os.path.dirname(__file__), "text_hash_file.txt")
        self.index = faiss.IndexFlatL2(self.dimension)
        self.texts = []
        self.text_hashes = set() 
        self._load_index()
        self._load_text_hashes()

    def _load_text_hashes(self):
        if os.path.exists(self.text_hash_file):
            with open(self.text_hash_file, 'r') as f:
                self.text_hashes = set(f.read().splitlines())

    def _save_text_hashes(self):
        with open(self.text_hash_file, 'w') as f:
            f.write('\n'.join(self.text_hashes))

    def _load_index(self):
        if os.path.exists(self.index_file):
            try:
                self.index = faiss.read_index(self.index_file) 
                # print(f"Loaded existing index from {self.index_file}")
            except Exception as e:
                print(f"Error loading index: {e}")
        else:
            print(f"No existing index found. Creating a new one.")
            self.index = faiss.IndexFlatL2(self.dimension)
            self.texts = [] 

    def _save_index(self):
        faiss.write_index(self.index, self.index_file)
        # print(f"Index saved to {self.index_file}")

    def add_text(self, text, metadata=None):
        text_hash = str(hash(text))
        if text_hash in self.text_hashes:
            return False
        embedding = generate_embedding(text)
        embedding = np.array(embedding, dtype=np.float32)
        self.index.add(np.array([embedding], dtype=np.float32))
        self.texts.append(text)
        self.text_hashes.add(text_hash)
        self._save_text_hashes()
        self._save_index()
        return True

    def search(self, query, top_k=5):
        query_embedding = generate_embedding(query)
        distances, indices = self.index.search(np.array([query_embedding], dtype=np.float32), top_k)
        return distances, indices
    
    def get_text_by_index(self, index):
        try:
            return self.texts[index]
        except IndexError:
            return "未找到相關文本"
        
    def _save_index(self):
        try:
            faiss.write_index(self.index, self.index_file)
            print(f"索引已保存到 {self.index_file}")
        except Exception as e:
            print(f"保存索引時出錯: {e}")    
        
    def clear_all(self):
        self.index = faiss.IndexFlatL2(self.dimension)
        self.texts = []
        self.text_hashes = set()
        if os.path.exists(self.index_file):
            os.remove(self.index_file)
        if os.path.exists(self.text_hash_file):
            os.remove(self.text_hash_file)
        print("已清除所有向量数据")

    def close(self):
        self._save_index()
        self._save_text_hashes()



def chunk_text(text, max_length=512):
    sentences = text.split('.') 
    chunks = []
    current_chunk = []
    current_length = 0
    for sentence in sentences:
        sentence = sentence.strip()
        sentence_length = len(sentence.split())
        if current_length + sentence_length <= max_length:
            current_chunk.append(sentence)
            current_length += sentence_length
        else:
            if current_chunk:
                chunks.append(' '.join(current_chunk))
            current_chunk = [sentence]
            current_length = sentence_length
    if current_chunk:
        chunks.append(' '.join(current_chunk))
    return chunks

def process_text_and_generate_embeddings(text):
    chunks = chunk_text(text) 
    embeddings = []
    for chunk in chunks:
        embedding = generate_embedding(chunk)
        embeddings.append(embedding)
        # print(f"生成的嵌入: {embedding}")
    return embeddings



def get_relevant_text_from_db(query, top_k=3):
    distances, indices = vector_db.search(query, top_k=top_k)
    # 限制返回文本的長度
    relevant_text = vector_db.get_text_by_index(0)
    # 限制文本長度為 2000 字符
    return relevant_text[:2000] if relevant_text else ""

def generate_response_with_context(query):
    relevant_texts = get_relevant_text_from_db(query)
    prompt = (
        f"As an English teacher, please help with the following request while considering "
        f"this reference material: {relevant_texts}\n\n"
        f"Student's request: {query}\n\n"
        f"Please provide a comprehensive response that includes:\n"
        f"1. Direct answer to the student's question\n"
        f"2. Relevant explanations or teaching points\n"
        f"3. Examples if applicable\n"
        f"4. Study tips or suggestions when appropriate"
    )
    try:
        response = groq_prompt(prompt)
        return response
    except Exception as e:
        print(f"API 錯誤: {e}")
        return groq_prompt(query)
    
def save_response_to_txt(query, response, output_file="responses.txt"):
    output_dir = os.path.dirname(output_file)
    if output_dir and not os.path.exists(output_dir):
        os.makedirs(output_dir)
    if not os.path.exists(output_file):
        with open(output_file, 'w', encoding='utf-8') as f:
            f.write("# GenAI 回應記錄\n\n")
    with open(output_file, 'a', encoding='utf-8') as f:
        f.write(f"\n{'='*50}\n")
        f.write(f"用戶提問: {query}\n") 
        f.write(f"{'='*50}\n")
        f.write(response)
        f.write("\n\n")

try:
    vector_db = VectorDB()
    pdf_texts = read_pdfs_in_folder(folder_path)
    if pdf_texts:
        save_sourse_in_txt(pdf_texts, output_txt_path)
        for text in pdf_texts:
            if vector_db.add_text(text):
                print("添加了新文本")
            else:
                print("文本已存在，跳過處理")
    else:
        print("警告: 沒有找到PDF文件或文件讀取失敗")
except Exception as e:
    print(f"初始化過程出錯: {str(e)}")

    while True:
        try:
            query = input("請輸入您的問題（輸入'quit'退出）: ")
            if query.lower() == 'quit':
                break
            elif query.lower() == 'reembedding':
                reembedding()
                continue
            
            response = generate_response_with_context(query)
            save_response_to_txt(query, response)
            print("\nGenAI 回覆:", response)
            print("\n" + "="*50 + "\n")
        except Exception as e:
            print(f"處理查詢時出錯: {str(e)}")

finally:
    vector_db.close()
    print("程序結束，索引已保存")


  from tqdm.autonotebook import tqdm, trange


初始化過程出錯: name '__file__' is not defined
處理查詢時出錯: name 'vector_db' is not defined
處理查詢時出錯: name 'vector_db' is not defined
處理查詢時出錯: name 'vector_db' is not defined


NameError: name 'vector_db' is not defined