In [1]:
#pip install langchain langchain_community langchain_chroma langchainhub

Note: you may need to restart the kernel to use updated packages.


In [2]:
#pip install pymupdf

In [3]:
pip install -qU langchain-openai

Note: you may need to restart the kernel to use updated packages.


In [8]:
import getpass
import os
from dotenv import load_dotenv
import csv
import re


load_dotenv()

from langchain_openai import ChatOpenAI

MODELNAME = 'gpt-4o-mini'
llm = ChatOpenAI(model=MODELNAME)

In [4]:
import fitz  
import random
from langchain import hub
from langchain_chroma import Chroma
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough
from langchain_openai import OpenAIEmbeddings, ChatOpenAI
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_core.documents import Document

def extract_text_from_pdf(pdf_path):
    doc = fitz.open(pdf_path)
    pdf_text = []
    for page_num, page in enumerate(doc):
        pdf_text.append({"text": page.get_text(), "page": page_num + 1})
    return pdf_text

pdf_path = "Securities_and_Futures_Bill_2001.pdf"
pdf_text = extract_text_from_pdf(pdf_path)

text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=200)
splits = []
for page_info in pdf_text:
    page_splits = text_splitter.split_text(page_info["text"])
    splits.extend([{"text": split, "page": page_info["page"]} for split in page_splits])


docs = [Document(page_content=split["text"], metadata={"page": split["page"]}) for split in splits]

vectorstore = Chroma.from_documents(documents=docs, embedding=OpenAIEmbeddings())

retriever = vectorstore.as_retriever()
prompt = hub.pull("rlm/rag-prompt")

def format_docs(docs):
    return "\n\n".join(f"[Page {doc.metadata.get('page', 'Unknown')}] {doc.page_content}" for doc in docs)

llm = ChatOpenAI(model_name=MODELNAME)

def process_llm_response(llm_output):
    answer = llm_output
    page_numbers = set()
    relevant_docs = retriever.invoke(llm_output)
    for doc in relevant_docs:
        if isinstance(doc, Document):
            page_numbers.add(doc.metadata.get('page', 'Unknown'))
    return {"answer": answer, "pages": sorted(list(page_numbers))}

rag_chain = (
    {"context": retriever | format_docs, "question": RunnablePassthrough()}
    | prompt
    | llm
    | StrOutputParser()
    | process_llm_response
)

In [5]:
result = rag_chain.invoke("given the phrase, An inspected person can disclose a written inspection report to another person under subsection (2) circumstances, which include disclosing to officers or auditors of the inspected person solely for performance of duties, to the Authority if requested, or to any other person approved in writing by the Authority. The Authority may impose conditions or restrictions on such disclosure, and failure to comply with these regulations may lead to penalties like fines or imprisonment. The obligation to maintain confidentiality extends even after the termination of employment or appointment., find a verbatim quote pertaining to this phrase. exclude the answer from the explanation")
print(f"Answer: {result['answer']}")
print(f"Relevant pages: {result['pages']}")

Answer: "An inspected person can disclose a written inspection report to another person under subsection (2) circumstances, which include disclosing to officers or auditors of the inspected person solely for performance of duties, to the Authority if requested, or to any other person approved in writing by the Authority."
Relevant pages: [123, 147, 149, 154]


In [24]:
class QuestionBank:
    def __init__(self, save_path: str = '.'):
        self.topics = {"organised markets": [], 
                       "approved clearing house": [], 
                       "recognised market operator": [],
                       "licensed trade repository": [], 
                       "power of Authority to revoke approval and recognition": [], 
                       "regulation of approved exchanges": [],
                       "regulation of licensed trade repositories": [], 
                       "supervisory powers": [], 
                       "investigative powers of Authority": [],
                       "prohibited conduct": [],
                       "insider trading": [], 
                       "civil liability": [], 
                       "voluntary transfer of business": [],
                       "disclosure of interests": [], 
                       "short selling": [], 
                       "take-over offers": [], 
                       "supervision and investigation": [],
                       "market conduct": [], 
                       "offers of investments": []}
        
        self.save_path = save_path
    
    def get_available_topics(self):
        """Return available topics

        Returns:
            _type_: List[str]
        """
        return list(self.topics.keys())

    """def append_to_question_bank(self, question_data, filename='question_bank.csv'):
        with open(filename, 'a', newline='', encoding='utf-8') as file:"""


    def save_to_csv(self, question_data, filename):
        with open(filename, 'w', newline='', encoding='utf-8') as file:
            writer = csv.DictWriter(file, fieldnames=['question', 'answer 1', 'answer 2', 'answer 3', 'answer 4', 'correct letter', 'explanation', 'explanation_pages'])
            writer.writeheader()
            writer.writerow(question_data)

        print(f"Question saved to '{filename}'")
        
    def generate_questions(self, num_questions_per_topic=15):
        count = 0
        for topic in self.topics.keys():
            for _ in range(num_questions_per_topic):
                question = rag_chain.invoke(f"please generate a relevant question about {topic} from the pdf")
                right_answer = rag_chain.invoke(f"please generate the correct answer to the question '{question}' from the pdf")
                wrong_answer_1 = rag_chain.invoke(f"please generate an incorrect misleading answer to '{question}' from the pdf, without stating that it is incorrect or misleading")
                wrong_answer_2 = rag_chain.invoke(f"please generate an incorrect misleading answer to '{question}' from the pdf, without stating that it is incorrect or misleading")
                wrong_answer_3 = rag_chain.invoke(f"please generate an incorrect misleading answer to '{question}' from the pdf, without stating that it is incorrect or misleading")

                answers = [right_answer, wrong_answer_1, wrong_answer_2, wrong_answer_3]
                random.shuffle(answers)

                explanation_result = rag_chain.invoke(f"given the phrase,{right_answer}, find a verbatim quote from the PDF pertaining to this phrase, without repeating the phrase.")
                explanation = explanation_result['answer']
                explanation_pages = explanation_result['pages']
                
                letters = ["A","B","C","D"]

                question_data = {
                    'question': question,
                    'answer 1': answers[0],
                    'answer 2': answers[1],
                    'answer 3': answers[2],
                    'answer 4': answers[3],
                    'correct letter': letters[answers.index(right_answer)],
                    'explanation': explanation,
                    'explanation_pages': explanation_pages
                }
                
                count += 1
                
                QUESTIONS_SAVE_PATH = os.path.join(self.save_path, f'questions_{count}.csv')
                self.save_to_csv(question_data, filename=QUESTIONS_SAVE_PATH)

                # create question_bank.csv same time
                # read and write permissions so we can check if header is in file or not
                with open('question_bank.csv', 'a+', newline='', encoding='utf-8') as f_out:
                    right_answer_index = answers.index(right_answer)
                    wrong_answer_indexes = [i for i in range(len(answers)) if i != right_answer_index]
                    bank_data = {'topic': topic,
                                 'question': question_data['question'],
                                 'correct_answer': answers[right_answer_index],
                                 'wrong_answer_1': answers[wrong_answer_indexes[0]],
                                 'wrong_answer_2': answers[wrong_answer_indexes[1]],
                                 'wrong_answer_3': answers[wrong_answer_indexes[2]],
                                 'explanation': explanation,
                                 'explanation_pages': explanation_pages
                                 }
                    
                    header = f'{','.join(bank_data.keys())}\n'

                    # read first line
                    f_out.seek(0)
                    first_line = f_out.readline()
                    HEADER_EXISTS = header == first_line

                    if not HEADER_EXISTS:
                        f_out.write(header)

                    writer = csv.DictWriter(f_out, fieldnames=bank_data.keys())
                    writer.writerow(bank_data)

            print(f"Generated {num_questions_per_topic} questions about '{topic}'")

    def batch_generate_questions(self, num_questions_per_topic=15):
        count = 0
        for topic in self.topics.keys():
            for _ in range(num_questions_per_topic):
                question = rag_chain.invoke(f"please generate a relevant question about {topic} from the pdf")
                right_answer = rag_chain.invoke(f"please generate the correct answer to the question '{question}' from the pdf")
                wrong_answer_1 = rag_chain.invoke(f"please generate an incorrect misleading answer to '{question}' from the pdf, without stating that it is incorrect or misleading")
                wrong_answer_2 = rag_chain.invoke(f"please generate an incorrect misleading answer to '{question}' from the pdf, without stating that it is incorrect or misleading")
                wrong_answer_3 = rag_chain.invoke(f"please generate an incorrect misleading answer to '{question}' from the pdf, without stating that it is incorrect or misleading")

                answers = [right_answer, wrong_answer_1, wrong_answer_2, wrong_answer_3]
                random.shuffle(answers)

                explanation_result = rag_chain.invoke(f"given the phrase,{right_answer}, find a verbatim quote from the PDF pertaining to this phrase, without repeating the phrase.")
                explanation = explanation_result['answer']
                explanation_pages = explanation_result['pages']
                
                letters = ["A","B","C","D"]

                question_data = {
                    'question': question,
                    'answer 1': answers[0],
                    'answer 2': answers[1],
                    'answer 3': answers[2],
                    'answer 4': answers[3],
                    'correct letter': letters[answers.index(right_answer)],
                    'explanation': explanation,
                    'explanation_pages': explanation_pages
                }
                
                count += 1
                
                QUESTIONS_SAVE_PATH = os.path.join(self.save_path, f'questions_{count}.csv')
                self.save_to_csv(question_data, filename=QUESTIONS_SAVE_PATH)

                # create question_bank.csv same time
                # read and write permissions so we can check if header is in file or not
                with open('question_bank.csv', 'a+', newline='', encoding='utf-8') as f_out:
                    right_answer_index = answers.index(right_answer)
                    wrong_answer_indexes = [i for i in range(len(answers)) if i != right_answer_index]
                    bank_data = {'topic': topic,
                                 'question': question_data['question'],
                                 'correct_answer': answers[right_answer_index],
                                 'wrong_answer_1': answers[wrong_answer_indexes[0]],
                                 'wrong_answer_2': answers[wrong_answer_indexes[1]],
                                 'wrong_answer_3': answers[wrong_answer_indexes[2]],
                                 'explanation': explanation,
                                 'explanation_pages': explanation_pages
                                 }
                    
                    header = f'{','.join(bank_data.keys())}\n'

                    # read first line
                    f_out.seek(0)
                    first_line = f_out.readline()
                    HEADER_EXISTS = header == first_line

                    if not HEADER_EXISTS:
                        f_out.write(header)

                    writer = csv.DictWriter(f_out, fieldnames=bank_data.keys())
                    writer.writerow(bank_data)

            print(f"Generated {num_questions_per_topic} questions about '{topic}'")

    def get_questions_from_topics(self, selected_topics, num_questions):
        all_questions = []
        for topic in selected_topics:
            if topic in self.topics and self.topics[topic]:
                all_questions.extend(self.topics[topic])
        if not all_questions:
            raise ValueError("No questions available for the selected topics.")
        return random.sample(all_questions, min(num_questions, len(all_questions)))

In [25]:
question_bank = QuestionBank()
question_bank.generate_questions(2)

Question saved to './questions_1.csv'
topic,question,correct_answer,wrong_answer_1,wrong_answer_2,wrong_answer_3,explanation,explanation_pages


False
Question saved to './questions_2.csv'
topic,question,correct_answer,wrong_answer_1,wrong_answer_2,wrong_answer_3,explanation,explanation_pages

topic,question,correct_answer,wrong_answer_1,wrong_answer_2,wrong_answer_3,explanation,explanation_pages

True
Generated 2 questions about 'organised markets'


KeyboardInterrupt: 

In [10]:
import csv
import ast

class QuestionBank:
    def __init__(self, csv_file='question_bank.csv'):
        self.topics = {}
        self.load_questions_from_csv(csv_file)

    def load_questions_from_csv(self, csv_file):
        with open(csv_file, 'r', encoding='utf-8') as file:
            reader = csv.DictReader(file)
            for row in reader:
                topic = row['topic']
                if topic not in self.topics:
                    self.topics[topic] = []
                
                for key in ['question', 'correct_answer', 'wrong_answer_1', 'wrong_answer_2', 'wrong_answer_3']:
                    try:
                        row[key] = ast.literal_eval(row[key])
                    except (ValueError, SyntaxError):
                        pass
                
                row['explanation'] = row['explanation']
                
                try:
                    row['explanation_pages'] = [int(page) for page in ast.literal_eval(row['explanation_pages'])]
                except (ValueError, SyntaxError):
                    row['explanation_pages'] = []
                
                self.topics[topic].append(row)

    def get_questions_from_topics(self, selected_topics, num_questions):
        all_questions = []
        for topic in selected_topics:
            if topic in self.topics and self.topics[topic]:
                all_questions.extend(self.topics[topic])
        if not all_questions:
            raise ValueError("No questions available for the selected topics.")
        return random.sample(all_questions, min(num_questions, len(all_questions)))

    def get_available_topics(self):
        return list(self.topics.keys())


In [11]:
import datetime
import ipywidgets as widgets

class Quiz:
    def __init__(self, name, question_bank):
        self.name = name
        self.question_bank = question_bank
        self.score = []
        self.current_question = 0
        self.questions = []
        self.selected_topics = []
        self.start_time = None
        self.end_time = None

    def run(self):
        try:
            self.start_time = datetime.now()
            self.questions = self.question_bank.get_questions_from_topics(self.selected_topics, 10)
            self.current_question = 0
            self.score = []
            self.display_question()
        except ValueError as e:
            print(f"Error: {str(e)}")
            print("Please make sure you've selected topics with available questions.")

    def display_question(self):
        if self.current_question < len(self.questions):
            q = self.questions[self.current_question]

            question_text = widgets.HTML(value=f"<b>{self.current_question + 1}. {q['question']['answer'] if isinstance(q['question'], dict) else q['question']}</b>")

            answers = [
                q['correct_answer']['answer'] if isinstance(q['correct_answer'], dict) else q['correct_answer'],
                q['wrong_answer_1']['answer'] if isinstance(q['wrong_answer_1'], dict) else q['wrong_answer_1'],
                q['wrong_answer_2']['answer'] if isinstance(q['wrong_answer_2'], dict) else q['wrong_answer_2'],
                q['wrong_answer_3']['answer'] if isinstance(q['wrong_answer_3'], dict) else q['wrong_answer_3']
            ]
            random.shuffle(answers)

            options = ['A', 'B', 'C', 'D']
            radio_options = [f"{opt}. {ans}" for opt, ans in zip(options, answers)]

            radio = widgets.RadioButtons(options=radio_options, layout={'width': 'max-content'})

            submit_button = widgets.Button(description="Submit")
            output = widgets.Output()

            def on_submit(b):
                with output:
                    clear_output()
                    if not radio.value:
                        print("Please select an answer.")
                        return

                    selected_option = radio.value[0]  
                    correct_answer = q['correct_answer']['answer'] if isinstance(q['correct_answer'], dict) else q['correct_answer']
                    is_correct = answers[options.index(selected_option)] == correct_answer

                    if is_correct:
                        print("Correct!")
                        self.score.append(True)
                    else:
                        print(f"Wrong! The correct answer was: {correct_answer}")
                        self.score.append(False)

                    print("\nQuote from text:")
                    print(q['explanation'])

                    if q['explanation_pages']:
                        print("\nRelevant Pages:", ", ".join(map(str, q['explanation_pages'])))

                    self.current_question += 1
                    if self.current_question < len(self.questions):
                        print("\nMoving to next question...")
                        self.display_question()
                    else:
                        self.show_results()

            submit_button.on_click(on_submit)

            display(question_text, radio, submit_button, output)


    def show_results(self):
        self.end_time = datetime.now()
        correct_answers = sum(self.score)
        if correct_answers>=6:
            pass_or_fail = "passed"
        else:
            pass_or_fail = "failed"
        result_text = f"{self.name}, you got {correct_answers} out of 10 correct!"
        display(widgets.HTML(f"<h3>{result_text}</h3>"))
        
        self.save_results_to_csv(correct_answers, pass_or_fail)

    def save_results_to_csv(self, correct_answers, pass_or_fail):
        results_file = 'quiz_results.csv'
        file_exists = os.path.isfile(results_file)
        
        with open(results_file, 'a', newline='') as csvfile:
            fieldnames = ['Name', 'Date', 'Time', 'Score', 'Status']
            writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
            
            if not file_exists:
                writer.writeheader()
            
            writer.writerow({
                'Name': self.name,
                'Date': self.end_time.strftime('%Y-%m-%d'),
                'Time': self.end_time.strftime('%H:%M:%S'),
                'Score': f"{correct_answers}/10",
                'Status': pass_or_fail
            })

In [16]:
def run_quiz_application():
    question_bank = QuestionBank('question_bank.csv')

    print("Welcome to the Securities and Futures Act 2001 Quiz. This quiz contains 10 questions. You may change your answer before entering 'Submit'. all questions are pulled from the document: https://datahub.ucsd.edu/user/a7garg/files/private/Securities%20and%20Futures%20Act%202001.pdf.You may use it as reference. Good luck!")
    output = widgets.Output()

    name_input = widgets.Text(description="Your Name:")
    start_quiz_button = widgets.Button(description="Start Quiz")
    start_quiz_button.disabled = True  

    def on_selection_confirmed(b):
        with output:
            clear_output()
            print("Topics selected. Please enter your name and click 'Start Quiz'.")
        name_input.layout.visibility = 'visible'
        start_quiz_button.disabled = False

    def start_quiz(b):
        name = name_input.value
        if name:
            with output:
                clear_output()
                print(f"Starting quiz for {name}")
                quiz = Quiz(name, question_bank)
                quiz.selected_topics = [widget.description for widget in topic_widgets if widget.value]
                quiz.run()
        else:
            with output:
                print("Please enter your name before starting the quiz.")

    def confirm_selection(b):
        selected_topics = [widget.description for widget in topic_widgets if widget.value]
        with output:
            clear_output()
            print(f"Selected topics: {', '.join(selected_topics)}")
        on_selection_confirmed(b)

    available_topics = question_bank.get_available_topics()
    topic_widgets = [widgets.Checkbox(value=False, description=topic) for topic in available_topics]
    select_all_button = widgets.Button(description="Select All")
    deselect_all_button = widgets.Button(description="Deselect All")
    confirm_button = widgets.Button(description="Confirm Selection")

    def select_all(b):
        for widget in topic_widgets:
            widget.value = True

    def deselect_all(b):
        for widget in topic_widgets:
            widget.value = False

    select_all_button.on_click(select_all)
    deselect_all_button.on_click(deselect_all)
    confirm_button.on_click(confirm_selection)
    start_quiz_button.on_click(start_quiz)

    name_input.layout.visibility = 'hidden'

    display(widgets.VBox([
        widgets.HBox([select_all_button, deselect_all_button, confirm_button]),
        widgets.VBox(topic_widgets),
        name_input,
        start_quiz_button,
        output
    ]))

In [17]:
if __name__ == "__main__":
    run_quiz_application()

KeyError: 'topic'