In [117]:
import operator
import os
from typing import Annotated, List, Literal, TypedDict

from dotenv import load_dotenv
from langchain.chains.combine_documents.reduce import (
    acollapse_docs,
    split_list_of_docs,
)
from langchain_community.document_loaders import PyPDFLoader
from langchain_core.documents import Document
from langchain_core.output_parsers import StrOutputParser, JsonOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langgraph.constants import Send
from langgraph.graph import END, START, StateGraph

In [118]:
# Load variables from a local .env file into the process environment.
# NOTE(review): presumably this is where the OpenAI API key used by ChatOpenAI comes from — confirm.
load_dotenv()

True

In [119]:
# Path of the PDF to turn into flashcards.
# Set the FLASHCARD_PDF_PATH environment variable (for example in the .env
# file loaded above) to point at a different file without editing the
# notebook; when it is unset the original local path is used unchanged.
file_path = os.environ.get(
    "FLASHCARD_PDF_PATH",
    '/Users/luiginoto/Documents/Courses and projects/Flashcard system/biology-essay-example-pdf.pdf',
)
loader = PyPDFLoader(file_path)

In [120]:
# Load the PDF and split it into chunks using a tiktoken-based splitter
# (chunk_size=1000, no overlap between consecutive chunks).
token_splitter = RecursiveCharacterTextSplitter.from_tiktoken_encoder(
    chunk_size=1000,
    chunk_overlap=0,
)
documents = loader.load_and_split(text_splitter=token_splitter)

In [121]:
from langchain_openai import ChatOpenAI

# Chat model shared by the map and reduce chains and by length_function
# (which uses it to count tokens).
llm = ChatOpenAI(
    model="gpt-4-turbo",
    temperature=0,  # request the least random sampling
    max_tokens=None,  # no explicit cap on the completion length
    timeout=None,  # no explicit request timeout
    max_retries=2,
)

In [122]:
# Map-step prompt text: the {docs} placeholder receives the text to be
# summarized into its key concepts, terms, definitions and acronyms.
map_template = """Based on the provided documents, please write a summary by picking out the major CONCEPTS, TERMS, DEFINITIONS,
and ACRONYMS that are important in the documents.

Prioritize clarity and brevity while retaining the essential information.

Aim to convey any supporting details that contribute to a comprehensive understanding of each CONCEPT, TERM, DEFINITION and ACRONYM. 

Do not focus on historical context (when something was introduced or implemented). Ignore anything that looks like source code.

DOCUMENTS:
{docs}

Helpful Answer:
"""

# Chat prompt made of a fixed system instruction followed by the map template
# as the user message.
map_prompt = ChatPromptTemplate([
    ("system", "You are a helpful assistant specialized in effectively summarizing any kind of text"),
    ("user", map_template)
])

# Reduce-step prompt text: the {docs} placeholder receives the collected
# summaries, and the model is asked for a JSON object mapping flashcard
# questions to answers (the reduce chain parses the reply as JSON).
# The prompt ends on a bare "Helpful Answer:" line; a stray unbalanced double
# quote that used to follow it has been removed so the model is not nudged
# into starting its JSON reply inside an open string.
reduce_template: str = """The following is set of definitions/concepts:
{docs}
Take these and distill it into a final, consolidated list of at least twenty (20) definitions/concepts.

For each of these, generate a question and an answer. The goal is that these tuples of questions and answers will
be used to create flashcards.

Please provide the result in a JSON format, using questions as keys and answers as values.

Helpful Answer:
"""

# Single human-message chat prompt wrapping the reduce template.
reduce_prompt = ChatPromptTemplate([("human", reduce_template)])

In [123]:
# Map chain: map prompt -> chat model -> plain string output.
map_chain = map_prompt | llm | StrOutputParser()
# Reduce chain: reduce prompt -> chat model -> reply parsed as JSON.
reduce_chain = reduce_prompt | llm | JsonOutputParser()

In [124]:
# Token budget for the summaries: should_collapse compares the total token
# count against it, and collapse_summaries uses it to group documents.
token_max = 100_000

In [125]:

def length_function(documents: List[Document]) -> int:
    """Return the combined token count of ``documents``.

    The ``page_content`` of every document is measured with
    ``llm.get_num_tokens`` and the individual counts are added up; an empty
    list yields 0.
    """
    total_tokens = 0
    for document in documents:
        total_tokens += llm.get_num_tokens(document.page_content)
    return total_tokens


# Overall state of the main graph. It holds the input document contents,
# the per-document summaries, the summary documents that may get collapsed,
# and the final result.
class OverallState(TypedDict):
    # Text of each input chunk; map_summaries sends one "generate_summary"
    # run per entry.
    contents: List[str]
    # Annotated with operator.add because we want to combine the summaries
    # returned by the individual "generate_summary" runs back into one list -
    # this is essentially the "reduce" part.
    summaries: Annotated[list, operator.add]
    # The summaries wrapped as Documents (set by collect_summaries) and
    # replaced by collapse_summaries whenever they are collapsed.
    collapsed_summaries: List[Document]
    # Output of reduce_chain, written by generate_final_summary.
    # NOTE(review): reduce_chain ends in JsonOutputParser, so the stored value
    # is likely a dict of question -> answer rather than a str — confirm and
    # adjust the annotation if so.
    final_summary: str


# State of the "generate_summary" node: map_summaries sends one of these
# per input document so that each document is summarized separately.
class SummaryState(TypedDict):
    # Text of the single document to summarize.
    content: str


# Map step: summarize a single document
async def generate_summary(state: SummaryState):
    """Summarize ``state["content"]`` with ``map_chain``.

    Returns a partial state update whose "summaries" entry is a one-element
    list holding the summary produced for this document.
    """
    chunk_text = state["content"]
    summary = await map_chain.ainvoke(chunk_text)
    return {"summaries": [summary]}


# Fan-out logic over the documents.
# This function is used as a conditional edge in the graph.
def map_summaries(state: OverallState):
    """Build one ``Send`` per entry of ``state["contents"]``.

    Every ``Send`` names the target node ("generate_summary") together with
    the state passed to that run, ``{"content": <document text>}``.
    """
    dispatches = []
    for chunk_text in state["contents"]:
        dispatches.append(Send("generate_summary", {"content": chunk_text}))
    return dispatches


def collect_summaries(state: OverallState):
    """Wrap each accumulated summary in a ``Document``.

    Returns a partial state update that stores the resulting list under
    "collapsed_summaries".
    """
    summary_docs = []
    for summary_text in state["summaries"]:
        summary_docs.append(Document(summary_text))
    return {"collapsed_summaries": summary_docs}


# Node that collapses the summary documents into fewer documents
async def collapse_summaries(state: OverallState):
    """Collapse the current summary documents group by group.

    The documents are partitioned with ``split_list_of_docs`` (using
    ``length_function`` and ``token_max``); each group is then combined with
    ``acollapse_docs`` via ``map_chain``, one group after the other. The
    combined documents replace "collapsed_summaries".
    """
    groups = split_list_of_docs(
        state["collapsed_summaries"], length_function, token_max
    )
    # The awaits inside the comprehension run sequentially, group by group.
    collapsed = [await acollapse_docs(group, map_chain.ainvoke) for group in groups]
    return {"collapsed_summaries": collapsed}


# Routing function for the conditional edges: decides whether the summaries
# need another collapse pass or can go to the final step
def should_collapse(
    state: OverallState,
) -> Literal["collapse_summaries", "generate_final_summary"]:
    """Return the name of the node to run next.

    "collapse_summaries" is chosen when the token count of the current
    summary documents is strictly greater than ``token_max``; otherwise
    "generate_final_summary" is returned.
    """
    over_budget = length_function(state["collapsed_summaries"]) > token_max
    return "collapse_summaries" if over_budget else "generate_final_summary"


# Final reduce step
async def generate_final_summary(state: OverallState):
    """Run ``reduce_chain`` on the collapsed summaries.

    Returns a partial state update that stores the chain's output under
    "final_summary".
    """
    summary_docs = state["collapsed_summaries"]
    reduced = await reduce_chain.ainvoke(summary_docs)
    return {"final_summary": reduced}


# Assemble the workflow on top of OverallState
graph = StateGraph(OverallState)

# Nodes: each step is registered under the name that the Send objects and
# the routing function refer to.
for node_name, node_fn in (
    ("generate_summary", generate_summary),
    ("collect_summaries", collect_summaries),
    ("collapse_summaries", collapse_summaries),
    ("generate_final_summary", generate_final_summary),
):
    graph.add_node(node_name, node_fn)

# Edges: fan out from START, gather the summaries, then let should_collapse
# route after both the collect and the collapse steps.
graph.add_conditional_edges(START, map_summaries, ["generate_summary"])
graph.add_edge("generate_summary", "collect_summaries")
for routed_node in ("collect_summaries", "collapse_summaries"):
    graph.add_conditional_edges(routed_node, should_collapse)
graph.add_edge("generate_final_summary", END)

app = graph.compile()

In [126]:
async for step in app.astream(
    {"contents": [doc.page_content for doc in documents]},
    {"recursion_limit": 10},
):
    print(step)

{'generate_summary': {'summaries': ['**Major Concepts and Terms:**\n\n1. **Biofilm**: A complex aggregation of microorganisms growing on a solid substrate. Biofilms are characterized by cells that are embedded in a self-produced matrix of extracellular polymeric substances (EPS).\n\n2. **3-D Imaging Technique**: A method used to visualize and analyze the structure of biofilms in three dimensions, revealing intricate details such as connections between cells.\n\n3. **Bacterial Social Network**: Refers to the interconnected system within a biofilm where cells communicate and interact through various means, including physical bridges and chemical signals.\n\n4. **Fluid Bridges**: Structures identified between cells in a biofilm that facilitate the exchange of substances like proteins and hydrophobic molecules, functioning similarly to communication channels.\n\n5. **Selective Permeability**: A property of cell membranes (or structures mimicking them) that allows only specific substances t