<a href="https://colab.research.google.com/github/vermakiran/4kai/blob/main/RAG_Presentation_Generator.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:

# Install the pandoc system package (apt) and the Python packages (pip) used by this notebook.
!sudo apt install pandoc
!pip install pypandoc
!pip install torch
!pip install langchain
!pip install langchain-community
!pip install langchain-huggingface
!pip install langchain-vectorstores
!pip install chromadb
!pip install sentence-transformers
!pip install transformers
!pip install pypdf

Reading package lists... Done
Building dependency tree... Done
Reading state information... Done
The following additional packages will be installed:
  libcmark-gfm-extensions0.29.0.gfm.3 libcmark-gfm0.29.0.gfm.3 pandoc-data
Suggested packages:
  texlive-latex-recommended texlive-xetex texlive-luatex pandoc-citeproc
  texlive-latex-extra context wkhtmltopdf librsvg2-bin groff ghc nodejs php
  python ruby libjs-mathjax libjs-katex citation-style-language-styles
The following NEW packages will be installed:
  libcmark-gfm-extensions0.29.0.gfm.3 libcmark-gfm0.29.0.gfm.3 pandoc
  pandoc-data
0 upgraded, 4 newly installed, 0 to remove and 35 not upgraded.
Need to get 20.6 MB of archives.
After this operation, 156 MB of additional disk space will be used.
Get:1 http://archive.ubuntu.com/ubuntu jammy/universe amd64 libcmark-gfm0.29.0.gfm.3 amd64 0.29.0.gfm.3-3 [115 kB]
Get:2 http://archive.ubuntu.com/ubuntu jammy/universe amd64 libcmark-gfm-extensions0.29.0.gfm.3 amd64 0.29.0.gfm.3-3 [25.1 kB

In [None]:
!pip install colab-xterm #https://pypi.org/project/colab-xterm/
%load_ext colabxterm

In [None]:
import pypandoc
import os
import logging
import torch
from tqdm import tqdm, trange
from typing import List, Dict, Any
from langchain_community.llms import Ollama
from langchain_huggingface import HuggingFaceEmbeddings
from langchain.vectorstores import Chroma
from langchain.document_loaders import PyPDFLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.prompts import PromptTemplate
from langchain_core.runnables import RunnableSequence, RunnableLambda
from langchain.schema import BaseOutputParser
from sentence_transformers import SentenceTransformer
from transformers import AutoTokenizer, AutoModel

In [None]:
# Logging configuration
# The module logger has to exist before it can be configured; calling
# logger.setLevel() ahead of getLogger() raises NameError on a fresh kernel.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
# Progress messages are silenced by default; set `logger.disabled = False`
# to see them while debugging.
logger.disabled = True

class PresentationGenerator:
    """Generate a slide deck from a PDF with a retrieval-augmented LLM pipeline.

    The PDF is split into chunks and indexed in a persistent Chroma collection
    (one directory/collection per ``topic``). An Ollama model drafts an outline,
    writes each slide in Markdown using the most similar chunks as context,
    self-evaluates the slides, and the surviving slides are written to a
    Markdown file that pandoc converts to every requested output format.
    """

    def __init__(
        self,
        pdf_path: str,
        output_file: str,
        ollama_model: str = "gemma2:9b",
        topic: str = "general",
        output_format: "list[str] | None" = None,
        embedding_model: str = "intfloat/multilingual-e5-large",
    ):
        """Set up the LLM, the embedding model and the prompt chains.

        Args:
            pdf_path: Path of the PDF used as the knowledge source.
            output_file: Output path without extension; ``.md`` is appended.
            ollama_model: Name of the Ollama model to generate text with.
            topic: Label used to name the Chroma directory and collection.
            output_format: pandoc target formats; defaults to ``["pptx"]``.
            embedding_model: Sentence-transformers model name used for the
                embeddings.
        """
        self.pdf_path = pdf_path
        self.output_file = f"{output_file}.md"
        # Copy the caller's list (and avoid a shared mutable default); a bare
        # string is treated as a single format rather than a list of characters.
        if output_format is None:
            self.output_format = ["pptx"]
        elif isinstance(output_format, str):
            self.output_format = [output_format]
        else:
            self.output_format = list(output_format)
        self.persist_directory = f"./chromadb_{topic}"
        self.collection_name = f"slidev_content_{topic}"
        self.topic = topic
        self.llm = Ollama(model=ollama_model)

        # HuggingFaceEmbeddings builds its own SentenceTransformer from
        # `model_name`; a pre-loaded transformers model handed in as `client`
        # is not used, so the model has to be selected by name here.
        device = "cuda" if torch.cuda.is_available() else "cpu"
        self.embeddings = HuggingFaceEmbeddings(
            model_name=embedding_model,
            model_kwargs={"device": device},
            encode_kwargs={'normalize_embeddings': True},
        )

        self.vectordb = None
        self.initialize_chains()

    def load_and_process_pdf(self) -> List:
        """Load the PDF and split it into overlapping text chunks."""
        loader = PyPDFLoader(file_path=self.pdf_path)
        documents = loader.load()
        text_splitter = RecursiveCharacterTextSplitter(chunk_size=1500, chunk_overlap=200)
        return text_splitter.split_documents(documents)

    def create_or_load_vectordb(self):
        """Open the topic's Chroma collection and index the PDF if it is missing.

        The result is stored in ``self.vectordb``.
        """
        # Check if a database already exists for the topic
        if os.path.exists(self.persist_directory):
            logger.info(f"Loading existing database for the topic: {self.topic}")
            self.vectordb = Chroma(
                persist_directory=self.persist_directory,
                embedding_function=self.embeddings,
                collection_name=self.collection_name
            )
            # Check if the PDF has already been indexed. `get` returns a dict
            # (truthy even when nothing matches), so inspect the matched ids.
            existing = self.vectordb.get(where={"source": self.pdf_path}, limit=1)
            if existing.get("ids"):
                logger.info(f"The PDF '{self.pdf_path}' already exists in the database for the topic '{self.topic}'.")
                return

        # If not, or if the PDF is not indexed, create/update the database
        logger.info(f"Creating/updating database for the topic: {self.topic}")
        texts = self.load_and_process_pdf()
        logger.info(f"Processed texts: {len(texts)} fragments")

        # Add the PDF path as metadata for future checks
        for text in texts:
            text.metadata["source"] = self.pdf_path

        self.vectordb = Chroma.from_documents(
            documents=texts,
            embedding=self.embeddings,
            persist_directory=self.persist_directory,
            collection_name=self.collection_name
        )

    def initialize_chains(self):
        """Build the prompt templates and the runnable chains that use them."""
        self.outline_prompt = PromptTemplate(
            input_variables=["topics", "tone"],
            template="""
                      Generate a presentation outline based on the following topics: {topics}
                      The overall tone of the presentation should be: {tone}
                      For each topic, provide a main title and create multiple slides as needed. Each slide should have a subtitle and a brief description that captures the essence of the content. Avoid overcrowding slides with too much text.
                      The outline should have a logical sequence and coherent argumentation, maintaining thematic consistency across slides within each topic.
                      Output format:

                      1 [Title 1]
                      Description: [Brief description of the slide content]
                      2 [Title 2]
                      Description: [Brief description of the slide content]
                      ...

                      Continue this format for all topics, ensuring a sequential and thematically coherent presentation structure.
            """
        )

        self.slide_prompt = PromptTemplate(
            input_variables=["title", "description", "context", "tone"],
            template="""
            Generate a slide in Markdown format for a presentation with the following title and description:
            Title: {title}
            Description: {description}

            Additional context: {context}
            General tone of the presentation: {tone}

            The slide should be consistent with the provided title and description, and maintain the overall tone of the presentation.

            Slide format:
            ## {title}

            [Slide content in Markdown]
            """
        )

        self.evaluation_prompt = PromptTemplate(
            input_variables=["slide_content", "goal"],
            template="""
            Evaluate whether the following slide meets the established goal:
            Slide:
            {slide_content}

            Goal: {goal}

            Does the slide meet the goal? Answer with 'Yes' or 'No' and provide a brief justification.
            """
        )

        self.final_evaluation_prompt = PromptTemplate(
            input_variables=["slide_content", "title", "description"],
            template="""
            Evaluate whether the following slide is significant given the provided title and description, avoiding slides with content like: "I propose a draft" or "Here is a possible version of the slide", "etcetera"
            Title: {title}
            Description: {description}
            Slide:
            {slide_content}

            Is the slide significant? Answer with 'Yes' or 'No' and provide a brief justification.
            """
        )

        self.outline_chain = self.outline_prompt | self.llm | RunnableLambda(lambda x: {"output": x})
        self.slide_chain = self.slide_prompt | self.llm
        self.evaluation_chain = self.evaluation_prompt | self.llm | YesNoOutputParser() | RunnableLambda(lambda x: {"output": x})
        self.final_evaluation_chain = self.final_evaluation_prompt | self.llm | YesNoOutputParser() | RunnableLambda(lambda x: {"output": x})

    def generate_presentation_outline(self, topics: List[str], tone: str) -> List[Dict[str, str]]:
        """Ask the LLM for an outline and return it as title/description dicts."""
        outline_response = self.outline_chain.invoke({"topics": ", ".join(topics), "tone": tone})
        logger.info(f"Outline generated (response): {outline_response}")

        outline_text = outline_response['output']
        parsed_outline = self.parse_outline(outline_text)

        logger.info(f"Outline generated (parsed): {parsed_outline}")
        return parsed_outline

    @staticmethod
    def _extract_title(line: str) -> str:
        """Return the slide title carried by a stripped outline line, or ""."""
        # Markdown-bold title: **Title**
        if line.startswith("**") and line.endswith("**"):
            return line.strip("* ").strip()
        # Numbered title as requested by the outline prompt: "1 [Title]", "2. Title"
        parts = line.split(None, 1)
        if len(parts) == 2 and parts[0].rstrip(".):").isdigit():
            title = parts[1].strip()
            if title.startswith("[") and title.endswith("]"):
                title = title[1:-1].strip()
            return title
        return ""

    def parse_outline(self, text: str) -> List[Dict[str, str]]:
        """Parse the LLM outline text into ``{"title", "description"}`` dicts.

        Titles may be Markdown-bold lines or the numbered lines the outline
        prompt asks for. A ``Description:`` line (optionally bulleted or bold)
        sets the description of the most recent title; other lines are ignored.
        """
        outline = []
        current_item = None

        for raw_line in text.strip().split("\n"):
            line = raw_line.strip()  # also removes the "\r" of CRLF line endings
            if not line:
                continue

            # Drop leading bullets/emphasis so "* **Description:** ..." is recognised.
            plain = line.lstrip("*- ").strip()
            if plain.startswith("Description:"):
                if current_item is not None:
                    current_item["description"] = plain.split(":", 1)[1].strip("* ").strip()
                continue

            title = self._extract_title(line)
            if title:
                if current_item is not None:
                    outline.append(current_item)
                current_item = {"title": title, "description": ""}

        if current_item is not None:
            outline.append(current_item)

        logger.info(f"Outline parsed: {outline}")
        return outline

    def generate_slide(self, title: str, description: str, tone: str, max_attempts: int = 5) -> str:
        """Generate one Markdown slide, retrying until the evaluation accepts it.

        Args:
            title: Slide title; also used as the similarity-search query.
            description: Goal the slide is evaluated against.
            tone: Overall tone passed to the slide prompt.
            max_attempts: Maximum number of generation attempts (at least one
                attempt is always made).

        Returns:
            The first accepted slide, or the last attempt if none was accepted.
        """
        if self.vectordb is None:
            # Allow calling this method directly without a prior explicit load.
            self.create_or_load_vectordb()

        context = self.vectordb.similarity_search(title, k=3)
        context_text = "\n".join([doc.page_content for doc in context])
        logger.info(f"Context for {title}: {context_text}")

        slide_content = ""
        for _ in range(max(1, max_attempts)):
            slide_content = self.slide_chain.invoke({
                "title": title,
                "description": description,
                "context": context_text,
                "tone": tone
            })
            logger.info(f"Slide content generated for {title}: {slide_content}")
            evaluation_result = self.evaluation_chain.invoke({"slide_content": slide_content, "goal": description})

            evaluation = evaluation_result['output']['evaluation']
            justification = evaluation_result['output']['justification']

            if evaluation:
                return slide_content

            # Feed the evaluator's objection back into the next attempt.
            context_text += f"\nSuggested improvement: {justification}"
            logger.info(f"Retrying generation for {title} with improvement: {justification}")

        return slide_content

    def final_evaluation(self, slides: List[Dict[str, str]]) -> List[str]:
        """Return the content of the slides the final evaluation chain accepts."""
        significant_slides = []

        for slide in slides:
            slide_content = slide["slide_content"]
            title = slide["title"]
            description = slide["description"]
            evaluation_result = self.final_evaluation_chain.invoke({"slide_content": slide_content, "title": title, "description": description})

            if evaluation_result['output']['evaluation']:
                significant_slides.append(slide_content)
            else:
                logger.warning(f"Slide removed: {title} - {evaluation_result['output']['justification']}")

        return significant_slides

    def generate_full_presentation(self, topics: List[str], tone: str) -> None:
        """Run the whole pipeline and write the Markdown file plus its exports.

        Raises:
            RuntimeError: If pandoc returns output instead of writing the
                requested export file.
        """
        self.create_or_load_vectordb()
        outline = self.generate_presentation_outline(topics, tone)
        all_slides = []

        for slide in outline:
            if 'title' in slide and 'description' in slide:
                slide_content = self.generate_slide(slide['title'], slide['description'], tone)
                all_slides.append({
                    "title": slide['title'],
                    "description": slide['description'],
                    "slide_content": slide_content
                })
            else:
                logger.warning(f"Incomplete slide found: {slide}")

        if not all_slides:
            logger.error("No slides were generated.")
            return

        significant_slides = self.final_evaluation(all_slides)

        # Make sure the target directory exists, and write UTF-8 explicitly so
        # non-ASCII LLM output does not depend on the platform default encoding.
        os.makedirs(os.path.dirname(self.output_file) or ".", exist_ok=True)
        with open(self.output_file, "w", encoding="utf-8") as f:
            f.write("\n\n---\n\n".join(significant_slides))

        logger.info(f"Presentation generated and saved in {self.output_file}")

        for fmt in self.output_format:
            # The export name keeps the Markdown suffix, e.g. "deck.md.pptx".
            output_document = f"{self.output_file}.{fmt}"
            output = pypandoc.convert_file(
                self.output_file,
                fmt,
                outputfile=output_document,
                extra_args=['-V', 'mainfont="Arial"', '-V', 'fontsize="24"', '-V', 'fontcolor="FF0000"']
            )
            # With `outputfile` set the conversion result goes to disk and an
            # empty string is expected back; check every format, not only the last.
            if output != "":
                raise RuntimeError(f"Unexpected pandoc output while writing {output_document}")
            logger.info(f"The presentation has been successfully created in {fmt.upper()} format: {output_document}")

class OutlineParser(BaseOutputParser):
    """Output parser that turns outline text into title/description entries."""

    def parse(self, text: str) -> List[Dict[str, str]]:
        """Parse outline text into a list of ``{"title", "description"}`` dicts.

        A list argument is returned unchanged. Lines wrapped in ``**`` start a
        new entry; ``Description:`` lines and ``*`` bullet lines are appended
        to the description of the entry currently being built.
        """
        # An outline that is already a list passes straight through.
        if isinstance(text, list):
            return text

        entries: List[Dict[str, str]] = []
        entry = None

        for raw in text.strip().split("\n"):
            stripped = raw.strip()

            if stripped.startswith("**") and stripped.endswith("**"):
                # A bold line opens a new entry; flush the previous one first.
                if entry is not None:
                    entries.append(entry)
                entry = {"title": stripped.strip("** ").strip(), "description": ""}
                continue

            if entry is None:
                # Nothing to attach description text to yet.
                continue

            if "Description:" in stripped:
                entry["description"] += stripped.split("Description:", 1)[1].strip()
            elif stripped.startswith("*"):
                # Bullet lines extend the description, separated by a space.
                entry["description"] += " " + stripped.strip("*").strip()

        # Flush the entry still being built, if any.
        if entry is not None:
            entries.append(entry)

        return entries

    @property
    def _type(self) -> str:
        """Identifier of this parser type."""
        return "outline"


class YesNoOutputParser(BaseOutputParser):
    """Output parser that reads a Yes/No verdict and its justification."""

    def parse(self, text: str) -> Dict:
        """Parse an LLM answer into ``{"evaluation": bool, "justification": str}``.

        The verdict is the first standalone "yes" or "no" word (case-insensitive),
        so substrings such as "yesterday", "not" or "know" are not mistaken for
        a verdict and an early "No" is not overridden by a later "yes". The
        justification is the text after the verdict with leading punctuation
        removed and its original casing kept. Without a verdict the evaluation
        is False and the justification is "no justification".
        """
        import re  # local import: `re` is not imported at file level

        cleaned = text.strip()
        verdict = re.search(r"\b(yes|no)\b", cleaned, flags=re.IGNORECASE)
        if verdict is None:
            return {"evaluation": False, "justification": "no justification"}

        justification = cleaned[verdict.end():].lstrip(" \t\r\n.,:;!*-").strip()
        return {
            "evaluation": verdict.group(1).lower() == "yes",
            "justification": justification,
        }

    @property
    def _type(self) -> str:
        """Identifier of this parser type."""
        return "yes_no"

In [None]:
import os
import requests

# Define the directory paths
base_dir = 'references'
documents_dir = 'documents'

# Create the directories if they don't exist
os.makedirs(base_dir, exist_ok=True)
os.makedirs(documents_dir, exist_ok=True)

# Define the URL for the PDF and the local file path
pdf_url = "https://assets.openstax.org/oscms-prodcms/media/documents/Principles_Marketing-WEB.pdf"
pdf_path = os.path.join(base_dir, "Principles_Marketing-WEB.pdf")

# Download the PDF if it doesn't already exist
if not os.path.exists(pdf_path):
    print(f"Downloading {pdf_path}...")
    # Stream to a temporary name and rename at the end, so an interrupted
    # download never leaves a truncated file that later runs would accept as
    # "already exists". The timeout keeps a stalled connection from hanging
    # the cell indefinitely.
    partial_path = f"{pdf_path}.part"
    with requests.get(pdf_url, headers={"User-Agent": "Mozilla/5.0"}, stream=True, timeout=60) as response:
        if response.status_code == 200:
            with open(partial_path, 'wb') as f:
                for chunk in response.iter_content(chunk_size=1 << 20):
                    f.write(chunk)
            os.replace(partial_path, pdf_path)
            print(f"Downloaded {pdf_path} successfully.")
        else:
            print(f"Failed to download {pdf_path}. Status code: {response.status_code}")
else:
    print(f"{pdf_path} already exists.")


In [None]:
%xterm

In [None]:
#curl -fsSL https://ollama.com/install.sh | sh
#ollama serve &
#ollama pull gemma2:9b

In [None]:
import time

# Start the timer (perf_counter is monotonic, so the measured duration is not
# affected by system clock adjustments)
start_time = time.perf_counter()

# Use the presentation generator
pdf_path = "references/Principles_Marketing-WEB.pdf"
output_file = "documents/Introduction_MKT_007"
ollama_model = "gemma2:9b"
topics = ["What is Marketing?",
          "The Marketing Process",
          "The Marketing Mix and the 4 Ps",
          "Strategic Planning in Marketing",
          "Developing a Strategic Marketing Plan", "Conclusions"]
tone = "Educational and clear, focused on explaining concepts in a way that is accessible to beginners."
topic = "marketing"
output_format = ["pptx", "docx"]

generator = PresentationGenerator(pdf_path, output_file, ollama_model, topic, output_format)
# generate_full_presentation() creates or loads the vector database as its
# first step, so no separate create_or_load_vectordb() call is needed here.
generator.generate_full_presentation(topics, tone)

# Stop the timer
end_time = time.perf_counter()

# Calculate the elapsed time
elapsed_time = end_time - start_time

print(f"The process took {elapsed_time:.4f} seconds to execute.")