<a href="https://colab.research.google.com/github/navneetkrc/Open_LLM_Apps/blob/main/QA_PAir_Generation_via_CLI_using_Ollama_with_Google_colab.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Cell 1 - Install Required Libraries:
!pip install -q streamlit pymupdf4llm==0.0.17 pandas openpyxl pyngrok requests langchain-community PyPDF2  # Added 'requests' for Ollama API calls

# Install Ollama (using a convenient installation script)
!curl -fsSL https://ollama.com/install.sh | sh

# Install and setup ngrok (as in your original code)
!curl -s https://ngrok-agent.s3.amazonaws.com/ngrok.asc | sudo tee /etc/apt/trusted.gpg.d/ngrok.asc >/dev/null && echo "deb https://ngrok-agent.s3.amazonaws.com buster main" | sudo tee /etc/apt/sources.list.d/ngrok.list && sudo apt update && sudo apt install ngrok

In [None]:
# Cell 2 - Import Libraries and Set Up Environment:
import streamlit as st
# import fitz  # PyMuPDF4LLM
import pymupdf4llm
import pandas as pd
import os
import time
import requests  # For making requests to the Ollama API
from google.colab import files

from google.colab import userdata
# Remove Groq API key setup
# os.environ["GROQ_API_KEY"] = userdata.get('groq_colab_key') #krch
# os.environ["GROQ_API_KEY"] = userdata.get('groq_604779')
os.environ["NGROK_AUTH_TOKEN"] = userdata.get('NGROK_AUTH_TOKEN')

# Get ngrok auth token from environment
ngrok_token = userdata.get('NGROK_AUTH_TOKEN')
!ngrok authtoken {ngrok_token}

# Start Ollama in the background (replace 'model_one' and 'model_two' with your actual model names)
!nohup ollama serve > ollama.log 2>&1 &
!ollama pull gemma2  # Example - replace with your actual model names
!ollama pull llama3.2 # Example - replace with your actual model names
time.sleep(10) # Give Ollama time to start

In [6]:
%%writefile app.py

import streamlit as st
from langchain_community.llms import Ollama

# Configure page settings
st.set_page_config(page_title="Text Processor", page_icon="📝")

# Define constants
OLLAMA_BASE_URL = "http://localhost:11434/"
OLLAMA_MODEL_NAME = "gemma2"

def generate_questions(content):
    """
    Generates questions from the given content using Ollama model
    Args:
        content (str): Text content to generate questions from
    Returns:
        list: List of generated questions
    """
    try:
        # Creating Ollama client
        ollama = Ollama(base_url=OLLAMA_BASE_URL, model=OLLAMA_MODEL_NAME)

        # Improved prompt design for question generation
        prompt = f"""
        Based on the following text, generate exactly 3 meaningful questions that test understanding
        of the key concepts. The questions should be diverse and cover different aspects of the content.

        Text:
        {content.strip()}

        Please provide exactly 3 questions, each on a new line.
        """

        # Making the API call
        response = ollama.invoke(prompt)

        # Split the response into individual questions
        questions = [q.strip() for q in response.strip().split('\n') if q.strip()]

        # Ensure we have exactly 3 questions
        return questions[:3]
    except Exception as e:
        return [f"Error: An error occurred during question generation: {e}"]

def process_text_chunks(content, chunk_size=500):
    """
    Splits text into chunks and generates questions for each chunk
    Args:
        content (str): Text content to process
        chunk_size (int): Size of each chunk in characters
    Returns:
        dict: Dictionary mapping chunk numbers to lists of questions
    """
    # Split text into chunks (simple splitting by characters)
    chunks = [content[i:i + chunk_size] for i in range(0, len(content), chunk_size)]

    # Generate questions for each chunk
    questions_by_chunk = {}
    for i, chunk in enumerate(chunks, 1):
        questions = generate_questions(chunk)
        questions_by_chunk[f"Chunk {i}"] = questions

    return questions_by_chunk

def main():
    # Add title and description
    st.title("📝 Generating Questions from Text Chunks")
    st.markdown("Upload a text file or enter text directly to generate questions from the content.")

    # Create tabs for different input methods
    tab1, tab2 = st.tabs(["Enter Text", "Upload File"])

    with tab1:
        # Text input
        text_input = st.text_area("Enter your text here:", height=200)
        chunk_size = st.number_input("Chunk size (characters):", min_value=100, value=500, step=100)

        if st.button("Generate Questions", key="process_text"):
            if text_input:
                with st.spinner("Generating questions..."):
                    questions_by_chunk = process_text_chunks(text_input, chunk_size)

                    # Display questions for each chunk
                    for chunk_num, questions in questions_by_chunk.items():
                        st.subheader(f"Questions for {chunk_num}:")
                        for i, question in enumerate(questions, 1):
                            st.write(f"{i}. {question}")
                        st.markdown("---")
            else:
                st.error("Please enter some text to process.")

    with tab2:
        # File upload
        uploaded_file = st.file_uploader("Choose a text file", type=['txt'])
        if uploaded_file is not None:
            content = uploaded_file.read().decode()
            st.text_area("File content:", content, height=200)
            chunk_size = st.number_input("Chunk size (characters):", min_value=100, value=500, step=100, key="file_chunk_size")

            if st.button("Generate Questions", key="process_file"):
                with st.spinner("Generating questions..."):
                    questions_by_chunk = process_text_chunks(content, chunk_size)

                    # Display questions for each chunk
                    for chunk_num, questions in questions_by_chunk.items():
                        st.subheader(f"Questions for {chunk_num}:")
                        for i, question in enumerate(questions, 1):
                            st.write(f"{i}. {question}")
                        st.markdown("---")

if __name__ == "__main__":
    main()

Overwriting app.py


In [27]:
from pyngrok import ngrok
import time

# Kill any existing Streamlit processes
!kill -9 $(pgrep streamlit) 2>/dev/null
!nohup ollama serve > ollama.log 2>&1 & #uncomment from 2nd run as ollama closes
time.sleep(5)

# Start Streamlit
!streamlit run app.py &>/content/logs.txt &
time.sleep(5)

# Create ngrok tunnel with correct configuration
ngrok_tunnel = ngrok.connect(addr="8501", proto="http", bind_tls=True)
print(f"Streamlit app URL: {ngrok_tunnel.public_url}")

# Keep the tunnel open
try:
    while True:
        time.sleep(1)
except KeyboardInterrupt:
    print("Closing ngrok tunnel...")
    ngrok.kill()

Streamlit app URL: https://97c5-34-125-243-139.ngrok-free.app
Closing ngrok tunnel...


In [33]:
%%writefile app_with_pdf_text.py
import streamlit as st
from langchain_community.llms import Ollama
import pandas as pd
import os
from datetime import datetime
import pymupdf4llm
import io

# Configure page settings
st.set_page_config(page_title="Generate QA Pairs from text Chunks", page_icon="📝")

# Define constants
OLLAMA_BASE_URL = "http://localhost:11434/"
OLLAMA_MODEL_NAME = "gemma2"

def generate_qa_pairs_new(content):
    """
    Generates question-answer pairs from the given content using Ollama model
    Args:
        content (str): Text content to generate QA pairs from
    Returns:
        list: List of dictionaries containing questions, answers, and context
    """
    try:
        # Creating Ollama client
        ollama = Ollama(base_url=OLLAMA_BASE_URL, model=OLLAMA_MODEL_NAME)

        # Modified prompt design with stricter formatting requirements
        prompt = f"""
        Generate 3 question-answer pairs based on the following text. Make sure each question is specific and answerable from the given context.
        Use this exact format, starting each pair with "Q:" and "A:" on new lines:

        Q: [Write a specific question about the content]
        A: [Write the corresponding answer]

        Q: [Write a specific question about the content]
        A: [Write the corresponding answer]

        Q: [Write a specific question about the content]
        A: [Write the corresponding answer]

        Text to use:
        {content.strip()}
        """

        # Making the API call
        response = ollama.invoke(prompt)

        # Parse the response into QA pairs using a more robust method
        qa_pairs = []
        lines = response.strip().split('\n')
        current_question = None
        current_answer = None

        for line in lines:
            line = line.strip()
            if not line:  # Skip empty lines
                continue

            if line.startswith('Q:'):
                # If we have a complete pair, add it
                if current_question and current_answer:
                    qa_pairs.append({
                        'question': current_question,
                        'answer': current_answer,
                        'context': content.strip()
                    })
                current_question = line[2:].strip()
                current_answer = None
            elif line.startswith('A:'):
                current_answer = line[2:].strip()

        # Add the last pair if complete
        if current_question and current_answer:
            qa_pairs.append({
                'question': current_question,
                'answer': current_answer,
                'context': content.strip()
            })

        # If we didn't get any valid pairs, generate a fallback pair
        if not qa_pairs:
            # Generate at least one fallback question-answer pair
            fallback_prompt = f"""
            Generate one clear question and its answer from this text:
            {content.strip()}
            Format as:
            Q: [question]
            A: [answer]
            """
            fallback_response = ollama.invoke(fallback_prompt)
            lines = fallback_response.strip().split('\n')
            for line in lines:
                if line.startswith('Q:'):
                    current_question = line[2:].strip()
                elif line.startswith('A:'):
                    current_answer = line[2:].strip()

            if current_question and current_answer:
                qa_pairs.append({
                    'question': current_question,
                    'answer': current_answer,
                    'context': content.strip()
                })

        # Validate and clean up the QA pairs
        validated_pairs = []
        for pair in qa_pairs:
            # Ensure both question and answer exist and are not empty
            if pair['question'] and pair['answer'] and \
               len(pair['question'].strip()) > 0 and \
               len(pair['answer'].strip()) > 0:
                validated_pairs.append(pair)

        return validated_pairs if validated_pairs else [{
            'question': 'What is the main topic of this text?',
            'answer': 'The text discusses ' + content[:50] + '...',
            'context': content.strip()
        }]

    except Exception as e:
        st.error(f"Error in QA generation: {str(e)}")
        return [{
            'question': 'What is the main topic of this text?',
            'answer': 'Error occurred during processing. Please try again.',
            'context': content.strip()
        }]

# [Rest of the code remains the same]
def extract_text_from_pdf(pdf_file):
    """
    Extracts text from a PDF file
    Args:
        pdf_file: Uploaded PDF file object
    Returns:
        str: Extracted text content
    """
    try:
        # Create a PDF reader object
        pdf_reader = pymupdf4llm.to_markdown(io.BytesIO(pdf_file.read()))

        # Extract text from all pages
        text = ""
        for page in pdf_reader.pages:
            text += page.extract_text() + "\n"

        return text.strip()
    except Exception as e:
        st.error(f"Error extracting text from PDF: {str(e)}")
        return ""

def process_text_chunks(content, filename, chunk_size=500):
    """
    Splits text into chunks and generates QA pairs for each chunk
    Args:
        content (str): Text content to process
        filename (str): Name of the input file
        chunk_size (int): Size of each chunk in characters
    Returns:
        pd.DataFrame: DataFrame containing all QA pairs with metadata
    """
    # Split text into chunks
    chunks = [content[i:i + chunk_size] for i in range(0, len(content), chunk_size)]

    # Generate QA pairs for each chunk and collect data for DataFrame
    data = []
    for chunk_num, chunk in enumerate(chunks, 1):
        qa_pairs = generate_qa_pairs_new(chunk)
        for qa_pair in qa_pairs:
            data.append({
                'pdf_name': filename,
                'chunk_number': chunk_num,
                'question': qa_pair.get('question', ''),
                'answer': qa_pair.get('answer', ''),
                'context': qa_pair.get('context', '')
            })

    return pd.DataFrame(data)

def save_to_csv(df):
    """
    Saves the DataFrame to a CSV file with timestamp
    Args:
        df (pd.DataFrame): DataFrame to save
    Returns:
        str: Path to the saved file
    """
    # Create 'outputs' directory if it doesn't exist
    if not os.path.exists('outputs'):
        os.makedirs('outputs')

    # Generate filename with timestamp
    timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    filename = f'outputs/qa_pairs_{timestamp}.csv'

    # Save DataFrame to CSV
    df.to_csv(filename, index=False)
    return filename

def main():
    # Add title and description
    st.title("📝 Text Processor with Q&A Generation")
    st.markdown("Upload a text file or enter text directly to generate question-answer pairs and save to CSV.")

    # Create tabs for different input methods
    tab1, tab2 = st.tabs(["Enter Text", "Upload File"])

    with tab1:
        # Text input
        text_input = st.text_area("Enter your text here:", height=200)
        chunk_size = st.number_input("Chunk size (characters):", min_value=100, value=500, step=100)

        if st.button("Generate Q&A Pairs", key="process_text"):
            if text_input:
                with st.spinner("Generating Q&A pairs..."):
                    df = process_text_chunks(text_input, "manual_input.txt", chunk_size)

                    # Display the Q&A pairs
                    st.subheader("Generated Q&A Pairs:")
                    for chunk_num in df['chunk_number'].unique():
                        chunk_data = df[df['chunk_number'] == chunk_num]
                        st.write(f"### Chunk {chunk_num}")
                        for _, row in chunk_data.iterrows():
                            st.write(f"**Q:** {row['question']}")
                            st.write(f"**A:** {row['answer']}")
                            st.write("**Context:**")
                            st.write(row['context'])
                            st.markdown("---")

                    # Save to CSV
                    csv_path = save_to_csv(df)
                    st.success(f"Q&A pairs saved to: {csv_path}")

                    # Add download button
                    with open(csv_path, 'rb') as file:
                        st.download_button(
                            label="Download CSV",
                            data=file,
                            file_name=os.path.basename(csv_path),
                            mime='text/csv'
                        )
            else:
                st.error("Please enter some text to process.")

    with tab2:
        # File upload with support for PDF and TXT
        uploaded_file = st.file_uploader("Choose a file", type=['txt', 'pdf'])

        if uploaded_file is not None:
            # Extract text based on file type
            file_extension = uploaded_file.name.split('.')[-1].lower()

            if file_extension == 'pdf':
                content = extract_text_from_pdf(uploaded_file)
                if not content:
                    st.error("Unable to extract text from PDF. Please check if the PDF is text-based and not scanned.")
                    st.stop()
            else:  # txt file
                content = uploaded_file.read().decode('utf-8', errors='ignore')

            # Display extracted content
            st.text_area("Extracted content:", content, height=200)

            chunk_size = st.number_input("Chunk size (characters):", min_value=100, value=500, step=100, key="file_chunk_size")

            if st.button("Generate Q&A Pairs", key="process_file"):
                with st.spinner("Generating Q&A pairs..."):
                    df = process_text_chunks(content, uploaded_file.name, chunk_size)

                    # Display the Q&A pairs
                    st.subheader("Generated Q&A Pairs:")
                    for chunk_num in df['chunk_number'].unique():
                        chunk_data = df[df['chunk_number'] == chunk_num]
                        st.write(f"### Chunk {chunk_num}")
                        for _, row in chunk_data.iterrows():
                            st.write(f"**Q:** {row['question']}")
                            st.write(f"**A:** {row['answer']}")
                            st.write("**Context:**")
                            st.write(row['context'])
                            st.markdown("---")

                    # Save to CSV
                    csv_path = save_to_csv(df)
                    st.success(f"Q&A pairs saved to: {csv_path}")

                    # Add download button
                    with open(csv_path, 'rb') as file:
                        st.download_button(
                            label="Download CSV",
                            data=file,
                            file_name=os.path.basename(csv_path),
                            mime='text/csv'
                        )

if __name__ == "__main__":
    main()

Overwriting app_with_pdf_text.py


In [34]:
from pyngrok import ngrok
import time

# Kill any existing Streamlit processes
!kill -9 $(pgrep streamlit) 2>/dev/null
time.sleep(10)

# Start Streamlit
!streamlit run app_with_pdf_text.py &>/content/logs.txt &
!nohup ollama serve > ollama.log 2>&1 & #Uncomment this from 2nd run onwards as ollama serve is closed/interrupted
time.sleep(10)
# Create ngrok tunnel with correct configuration
ngrok_tunnel = ngrok.connect(addr="8501", proto="http", bind_tls=True)
print(f"Streamlit app URL: {ngrok_tunnel.public_url}")

# Keep the tunnel open
try:
    while True:
        time.sleep(1)
except KeyboardInterrupt:
    print("Closing ngrok tunnel...")
    ngrok.kill()

Streamlit app URL: https://c0bd-34-125-243-139.ngrok-free.app
Closing ngrok tunnel...


In [36]:
!nohup ollama serve > ollama.log 2>&1 & #Uncomment this from 2nd run onwards as ollama serve is closed/interrupted

!ollama list

NAME               ID              SIZE      MODIFIED      
gemma2:latest      ff02c3702f32    5.4 GB    5 minutes ago    
llama3.2:latest    a80c4f17acd5    2.0 GB    5 minutes ago    


##CLI Version

In [11]:
%%writefile qa_generator_cli.py
import argparse
from langchain_community.llms import Ollama
import pandas as pd
import os
from datetime import datetime
import PyPDF2
import io
import sys

# Define constants
OLLAMA_BASE_URL = "http://localhost:11434/"
OLLAMA_MODEL_NAME = "gemma2"

def generate_qa_pairs(content):
    """
    Generates question-answer pairs from the given content using Ollama model
    Args:
        content (str): Text content to generate QA pairs from
    Returns:
        list: List of dictionaries containing questions, answers, and context
    """
    try:
        # Creating Ollama client
        ollama = Ollama(base_url=OLLAMA_BASE_URL, model=OLLAMA_MODEL_NAME)

        prompt = f"""
        Generate 3 question-answer pairs based on the following text. Make sure each question is specific and answerable from the given context.
        Use this exact format, starting each pair with "Q:" and "A:" on new lines:

        Q: [Write a specific question about the content]
        A: [Write the corresponding answer]

        Q: [Write a specific question about the content]
        A: [Write the corresponding answer]

        Q: [Write a specific question about the content]
        A: [Write the corresponding answer]

        Text to use:
        {content.strip()}
        """

        response = ollama.invoke(prompt)

        qa_pairs = []
        lines = response.strip().split('\n')
        current_question = None
        current_answer = None

        for line in lines:
            line = line.strip()
            if not line:
                continue

            if line.startswith('Q:'):
                if current_question and current_answer:
                    qa_pairs.append({
                        'question': current_question,
                        'answer': current_answer,
                        'context': content.strip()
                    })
                current_question = line[2:].strip()
                current_answer = None
            elif line.startswith('A:'):
                current_answer = line[2:].strip()

        if current_question and current_answer:
            qa_pairs.append({
                'question': current_question,
                'answer': current_answer,
                'context': content.strip()
            })

        return qa_pairs if qa_pairs else [{
            'question': 'What is the main topic of this text?',
            'answer': 'The text discusses ' + content[:50] + '...',
            'context': content.strip()
        }]

    except Exception as e:
        print(f"Error in QA generation: {str(e)}", file=sys.stderr)
        return [{
            'question': 'What is the main topic of this text?',
            'answer': 'Error occurred during processing. Please try again.',
            'context': content.strip()
        }]

def process_text_chunks(content, chunk_size=500, overlap=100):
    """
    Splits text into chunks with overlap and generates QA pairs for each chunk
    Args:
        content (str): Text content to process
        chunk_size (int): Size of text chunks in characters
        overlap (int): Number of characters to overlap between chunks
    Returns:
        pd.DataFrame: DataFrame containing all QA pairs with metadata
    """
    # Split content into words to ensure we don't cut words in middle
    words = content.split()

    # Initialize variables for chunking
    chunks = []
    chunk_words = []
    current_size = 0
    last_chunk_end = 0

    # Create chunks with overlap
    for i, word in enumerate(words):
        chunk_words.append(word)
        current_size += len(word) + 1  # +1 for space

        # Check if we've reached chunk size
        if current_size >= chunk_size:
            # Join words to form chunk
            chunk = ' '.join(chunk_words)
            chunks.append(chunk)

            # Calculate how many words to keep for overlap
            # First, find the last 'overlap' characters worth of words
            overlap_start = max(0, len(chunk) - overlap)
            overlap_text = chunk[overlap_start:]
            overlap_words = overlap_text.split()

            # Reset for next chunk, starting with overlap words
            chunk_words = overlap_words
            current_size = sum(len(word) + 1 for word in overlap_words)
            last_chunk_end = i - len(overlap_words)

    # Add the remaining text as the last chunk if there's any
    if chunk_words:
        chunks.append(' '.join(chunk_words))

    # Process chunks and generate QA pairs
    data = []
    total_chunks = len(chunks)
    print(f"Processing {total_chunks} chunks...")

    for chunk_num, chunk in enumerate(chunks, 1):
        print(f"Processing chunk {chunk_num}/{total_chunks}")
        qa_pairs = generate_qa_pairs(chunk)
        for qa_pair in qa_pairs:
            data.append({
                'chunk_number': chunk_num,
                'question': qa_pair.get('question', ''),
                'answer': qa_pair.get('answer', ''),
                'context': qa_pair.get('context', ''),
                'chunk_start': chunk[:50] + '...',  # First 50 chars of chunk for reference
                'chunk_end': '...' + chunk[-50:]    # Last 50 chars of chunk for reference
            })

    return pd.DataFrame(data)

def save_to_csv(df, output_dir="."):
    """
    Saves the DataFrame to a CSV file with timestamp in the specified directory
    Args:
        df (pd.DataFrame): DataFrame to save
        output_dir (str): Directory to save the CSV file
    Returns:
        str: Path to the saved file
    """
    timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    filename = os.path.join(output_dir, f'qa_pairs_{timestamp}.csv')

    df.to_csv(filename, index=False)
    return filename

def extract_text_from_pdf(pdf_path):
    """
    Extracts text from a PDF file
    Args:
        pdf_path (str): Path to the PDF file
    Returns:
        str: Extracted text content
    """
    try:
        with open(pdf_path, 'rb') as file:
            pdf_reader = PyPDF2.PdfReader(file)
            text = ""
            for page in pdf_reader.pages:
                text += page.extract_text() + "\n"
            return text.strip()
    except Exception as e:
        print(f"Error extracting text from PDF: {str(e)}", file=sys.stderr)
        return ""

def main():
    parser = argparse.ArgumentParser(description='Generate Q&A pairs from text content')
    parser.add_argument('input', help='Input file path (PDF or TXT) or text string')
    parser.add_argument('--chunk-size', type=int, default=500, help='Size of text chunks (default: 500)')
    parser.add_argument('--output-dir', default=".", help='Directory to save the output CSV file (default: current directory)')
    parser.add_argument('--is-file', action='store_true', help='Treat input as a file path')

    args = parser.parse_args()

    try:
        # Process input
        if args.is_file:
            if not os.path.exists(args.input):
                print(f"Error: File not found: {args.input}", file=sys.stderr)
                sys.exit(1)

            file_extension = os.path.splitext(args.input)[1].lower()
            if file_extension == '.pdf':
                content = extract_text_from_pdf(args.input)
                if not content:
                    print("Error: Could not extract text from PDF", file=sys.stderr)
                    sys.exit(1)
            else:  # txt file
                with open(args.input, 'r', encoding='utf-8') as file:
                    content = file.read()

            filename = os.path.basename(args.input)
        else:
            content = args.input
            filename = "input_text"

        # Process the content
        df = process_text_chunks(content, filename, args.chunk_size)

        # Save to CSV
        output_path = save_to_csv(df, args.output_dir)
        print(f"\nQ&A pairs saved to: {output_path}")

    except Exception as e:
        print(f"Error: {str(e)}", file=sys.stderr)
        sys.exit(1)

if __name__ == "__main__":
    main()

Writing qa_generator_cli.py


In [19]:
!python qa_generator_cli.py
""" Title: Fixing Heating Issues in Samsung Microwave Ovens
Samsung microwaves are designed for efficient cooking and reheating. If the microwave is not heating properly, follow these steps:
Inspect Power Supply:
Ensure the microwave is properly plugged into a functional power outlet.
Check for visible damage to the power cord.
Check Door Seal:
Ensure the microwave door is fully closed and the seals are clean and intact.
Test Cooking Mode:
Place a cup of water inside and run the microwave on high power for 1-2 minutes. If the water doesn’t heat, the issue may be with the magnetron or other internal components.
Reset Microwave:
Unplug the unit for 30 seconds, then plug it back in and restart.
Contact Samsung Service:
If none of these steps work, contact Samsung Support with the model and serial number of your microwave for further assistance."""

usage: qa_generator_cli.py [-h] [--chunk-size CHUNK_SIZE] [--output-dir OUTPUT_DIR] [--is-file]
                           input
qa_generator_cli.py: error: the following arguments are required: input


' Title: Fixing Heating Issues in Samsung Microwave Ovens\nSamsung microwaves are designed for efficient cooking and reheating. If the microwave is not heating properly, follow these steps:\nInspect Power Supply:\nEnsure the microwave is properly plugged into a functional power outlet.\nCheck for visible damage to the power cord.\nCheck Door Seal:\nEnsure the microwave door is fully closed and the seals are clean and intact.\nTest Cooking Mode:\nPlace a cup of water inside and run the microwave on high power for 1-2 minutes. If the water doesn’t heat, the issue may be with the magnetron or other internal components.\nReset Microwave:\nUnplug the unit for 30 seconds, then plug it back in and restart.\nContact Samsung Service:\nIf none of these steps work, contact Samsung Support with the model and serial number of your microwave for further assistance.'

In [37]:
%%writefile qa_generator.py
from langchain_community.llms import Ollama
import pandas as pd
from datetime import datetime

# Define constants
OLLAMA_BASE_URL = "http://localhost:11434/"
OLLAMA_MODEL_NAME = "gemma2"

def generate_qa_pairs(content):
    """
    Generates question-answer pairs from the given content using Ollama model
    Args:
        content (str): Text content to generate QA pairs from
    Returns:
        list: List of dictionaries containing questions, answers, and context
    """
    try:
        # Creating Ollama client
        ollama = Ollama(base_url=OLLAMA_BASE_URL, model=OLLAMA_MODEL_NAME)

        prompt = f"""
        Generate 3 question-answer pairs based on the following text. Make sure each question is specific and answerable from the given context.
        Use this exact format, starting each pair with "Q:" and "A:" on new lines:

        Q: [Write a specific question about the content]
        A: [Write the corresponding answer]

        Q: [Write a specific question about the content]
        A: [Write the corresponding answer]

        Q: [Write a specific question about the content]
        A: [Write the corresponding answer]

        Text to use:
        {content.strip()}
        """

        response = ollama.invoke(prompt)

        qa_pairs = []
        lines = response.strip().split('\n')
        current_question = None
        current_answer = None

        for line in lines:
            line = line.strip()
            if not line:
                continue

            if line.startswith('Q:'):
                if current_question and current_answer:
                    qa_pairs.append({
                        'question': current_question,
                        'answer': current_answer,
                        'context': content.strip()
                    })
                current_question = line[2:].strip()
                current_answer = None
            elif line.startswith('A:'):
                current_answer = line[2:].strip()

        if current_question and current_answer:
            qa_pairs.append({
                'question': current_question,
                'answer': current_answer,
                'context': content.strip()
            })

        return qa_pairs if qa_pairs else [{
            'question': 'What is the main topic of this text?',
            'answer': 'The text discusses ' + content[:50] + '...',
            'context': content.strip()
        }]

    except Exception as e:
        print(f"Error in QA generation: {str(e)}")
        return [{
            'question': 'What is the main topic of this text?',
            'answer': 'Error occurred during processing. Please try again.',
            'context': content.strip()
        }]

def process_text_chunks(content, chunk_size=500, overlap=100):
    """
    Splits text into chunks with overlap and generates QA pairs for each chunk
    Args:
        content (str): Text content to process
        chunk_size (int): Size of text chunks in characters
        overlap (int): Number of characters to overlap between chunks
    Returns:
        pd.DataFrame: DataFrame containing all QA pairs with metadata
    """
    # Split content into words to ensure we don't cut words in middle
    words = content.split()

    # Initialize variables for chunking
    chunks = []
    chunk_words = []
    current_size = 0
    last_chunk_end = 0

    # Create chunks with overlap
    for i, word in enumerate(words):
        chunk_words.append(word)
        current_size += len(word) + 1  # +1 for space

        # Check if we've reached chunk size
        if current_size >= chunk_size:
            # Join words to form chunk
            chunk = ' '.join(chunk_words)
            chunks.append(chunk)

            # Calculate how many words to keep for overlap
            # First, find the last 'overlap' characters worth of words
            overlap_start = max(0, len(chunk) - overlap)
            overlap_text = chunk[overlap_start:]
            overlap_words = overlap_text.split()

            # Reset for next chunk, starting with overlap words
            chunk_words = overlap_words
            current_size = sum(len(word) + 1 for word in overlap_words)
            last_chunk_end = i - len(overlap_words)

    # Add the remaining text as the last chunk if there's any
    if chunk_words:
        chunks.append(' '.join(chunk_words))

    # Process chunks and generate QA pairs
    data = []
    total_chunks = len(chunks)
    print(f"Processing {total_chunks} chunks...")

    for chunk_num, chunk in enumerate(chunks, 1):
        print(f"Processing chunk {chunk_num}/{total_chunks}")
        qa_pairs = generate_qa_pairs(chunk)
        for qa_pair in qa_pairs:
            data.append({
                'chunk_number': chunk_num,
                'question': qa_pair.get('question', ''),
                'answer': qa_pair.get('answer', ''),
                'context': qa_pair.get('context', ''),
                'chunk_start': chunk[:50] + '...',  # First 50 chars of chunk for reference
                'chunk_end': '...' + chunk[-50:]    # Last 50 chars of chunk for reference
            })

    return pd.DataFrame(data)

def generate_qa(text, chunk_size=500):
    """
    Main function to generate QA pairs from text
    Args:
        text (str): Input text content
        chunk_size (int): Size of text chunks
    Returns:
        str: Path to the saved CSV file
    """
    try:
        # Process the content
        df = process_text_chunks(text, chunk_size)

        # Save to CSV
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        output_path = f'qa_pairs_{timestamp}.csv'
        df.to_csv(output_path, index=False)
        print(f"\nQ&A pairs saved to: {output_path}")
        return output_path
    except Exception as e:
        print(f"Error: {str(e)}")
        return None

# Example usage in Colab:
if __name__ == "__main__":
    # This code won't run when imported as a module
    text = """Your sample text here. This will be processed into Q&A pairs."""
    output_file = generate_qa(text)

Overwriting qa_generator.py


In [40]:
from qa_generator import generate_qa

# Your text content
text = """Title: Fixing Heating Issues in Samsung Microwave Ovens
Samsung microwaves are designed for efficient cooking and reheating. If the microwave is not heating properly, follow these steps:
Inspect Power Supply:
Ensure the microwave is properly plugged into a functional power outlet.
Check for visible damage to the power cord.
Check Door Seal:
Ensure the microwave door is fully closed and the seals are clean and intact.
Test Cooking Mode:
Place a cup of water inside and run the microwave on high power for 1-2 minutes. If the water doesn’t heat, the issue may be with the magnetron or other internal components.
Reset Microwave:
Unplug the unit for 30 seconds, then plug it back in and restart.
Contact Samsung Service:
If none of these steps work, contact Samsung Support with the model and serial number of your microwave for further assistance."""

# Generate Q&A pairs and save to CSV
output_file = generate_qa(text)  # Uses default chunk size of 500
## OR
# output_file = generate_qa(text, chunk_size=1000)  # Specify custom chunk size

Processing 2 chunks...
Processing chunk 1/2
Processing chunk 2/2

Q&A pairs saved to: qa_pairs_20250117_200649.csv


In [41]:
import pandas as pd
qa_pairs = pd.read_csv(output_file)
qa_pairs.head(10)

Unnamed: 0,chunk_number,question,answer,context
0,1,What should you check first when troubleshooti...,Ensure the microwave is plugged into a functio...,Title: Fixing Heating Issues in Samsung Microw...
1,1,What should you inspect for potential issues r...,Make sure the microwave door is fully closed a...,Title: Fixing Heating Issues in Samsung Microw...
2,1,What is a suggested test to perform when tryin...,Place a cup of water inside and run the microw...,Title: Fixing Heating Issues in Samsung Microw...
3,2,What is one possible reason why water might no...,The issue may be with the magnetron or other i...,wer for 1-2 minutes. If the water doesn’t heat...
4,2,How long should you unplug the microwave befor...,For 30 seconds.,wer for 1-2 minutes. If the water doesn’t heat...
5,2,"Besides unplugging the microwave, what other s...",Contact Samsung Support with the model and ser...,wer for 1-2 minutes. If the water doesn’t heat...
