Installing libraries

In [1]:
pip install -qU langgraph langchain langchain_openai langchain_community qdrant-client PyPDF2 beautifulsoup4 requests

Note: you may need to restart the kernel to use updated packages.


Setting up imports

In [2]:
import os
import getpass
import logging
import json
import uuid
from typing import List, Dict, Any, Union, Annotated, Optional
from pathlib import Path
from tenacity import retry, stop_after_attempt, wait_exponential

from langchain_core.messages import HumanMessage, AIMessage, BaseMessage
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.tools import BaseTool, tool
from langgraph.graph import StateGraph, END
from langchain_community.tools.tavily_search import TavilySearchResults
from langchain_community.vectorstores import Qdrant
from qdrant_client import QdrantClient
from qdrant_client.http import models as rest
from langchain.agents import Tool, create_openai_functions_agent, AgentExecutor, LLMSingleActionAgent, AgentOutputParser
from langchain.prompts import StringPromptTemplate
from langchain.chains import LLMChain
from langchain.schema import AgentAction, AgentFinish, HumanMessage
from langchain.chat_models import ChatOpenAI
import re
from typing import List, Union, Tuple

# Set up API keys
os.environ["OPENAI_API_KEY"] = getpass.getpass("OpenAI API Key:")
os.environ["TAVILY_API_KEY"] = getpass.getpass("Tavily API Key:")

Create a working directory

In [3]:
# Create a working directory
WORKING_DIRECTORY = Path("./content/data")
WORKING_DIRECTORY.mkdir(parents=True, exist_ok=True)

Initialize Qdrant Client and OpenAI Embeddings

In [4]:
# Initialize Qdrant client and OpenAI embeddings
qdrant_client = QdrantClient(":memory:")
embeddings = OpenAIEmbeddings()

ValidationError: 1 validation error for OpenAIEmbeddings
__root__
  Did not find openai_api_key, please add an environment variable `OPENAI_API_KEY` which contains it, or pass `openai_api_key` as a named parameter. (type=value_error)

State and Helper functions

In [93]:
class State(Dict[str, Any]):
    messages: Annotated[List[BaseMessage], operator.add]
    next: str
    documents: Dict[str, str]

def get_last_message(state: State) -> str:
    return state["messages"][-1].content

def join_graph(response: dict):
    return {"messages": [response["messages"][-1]]}

def prelude(state):
    written_files = []
    try:
        written_files = [f.relative_to(WORKING_DIRECTORY) for f in WORKING_DIRECTORY.glob("*")]
    except:
        pass
    if not written_files:
        return {**state, "current_files": "No files written."}
    return {
        **state,
        "current_files": "\nBelow are files your team has written to the directory:\n"
        + "\n".join([f" - {f}" for f in written_files]),
    }

Setting up Vector Store

In [94]:
def initialize_vector_store():
    qdrant_client.recreate_collection(
        collection_name="sales_pitch_data",
        vectors_config=rest.VectorParams(size=1536, distance=rest.Distance.COSINE),
    )

def add_to_vector_store(text: str, metadata: Dict[str, Any]):
    vector = embeddings.embed_query(text)
    qdrant_client.upsert(
        collection_name="sales_pitch_data",
        points=[rest.PointStruct(id=uuid.uuid4().hex, vector=vector, payload=metadata)],
    )

def query_vector_store(query: str, limit: int = 5) -> List[Dict[str, Any]]:
    query_vector = embeddings.embed_query(query)
    search_result = qdrant_client.search(
        collection_name="sales_pitch_data",
        query_vector=query_vector,
        limit=limit,
    )
    return [hit.payload for hit in search_result]

Tools

In [102]:
tavily_tool = TavilySearchResults(max_results=5)

@tool
def linkedin_check(name: str, company: str) -> bool:
    """Check if a person works at a specific company on LinkedIn."""
    # Note: This is a mock implementation. In a real scenario, you'd use LinkedIn's API or web scraping.
    url = f"https://www.linkedin.com/search/results/people/?keywords={name} {company}"
    response = requests.get(url)
    soup = BeautifulSoup(response.content, 'html.parser')
    return company.lower() in soup.text.lower()

@tool
def read_document(
    file_name: Annotated[str, "File path to read the document."],
    start: Annotated[Optional[int], "The start line. Default is 0"] = None,
    end: Annotated[Optional[int], "The end line. Default is None"] = None,
) -> str:
    """Read the specified document."""
    with (WORKING_DIRECTORY / file_name).open("r") as file:
        lines = file.readlines()
    if start is None:
        start = 0
    return "".join(lines[start:end])

@tool
def write_document(
    content: Annotated[str, "Text content to be written into the document."],
    state: Annotated[dict, "The current state of the conversation."],
) -> Annotated[str, "Path of the saved document file or error message."]:
    """Create and save a text document."""
    file_name = state.get("output_file", "output.txt")
    file_path = WORKING_DIRECTORY / file_name
    logging.info(f"Attempting to write document: {file_path}")
    try:
        logging.debug(f"Content to write: {content[:100]}...")  # Log first 100 chars
        
        # Ensure the directory exists
        os.makedirs(os.path.dirname(file_path), exist_ok=True)
        
        with file_path.open("w") as file:
            file.write(content)
        logging.info(f"Successfully wrote content to {file_path}")
        return f"Document saved to {file_path}"
    except IOError as e:
        error_msg = f"IOError writing to {file_path}: {e}"
        logging.error(error_msg)
        return error_msg
    except Exception as e:
        error_msg = f"Unexpected error writing to {file_path}: {e}"
        logging.error(error_msg)
        return error_msg

@tool
def upload_document(file_path: str) -> str:
    """Upload and read a document (PDF or text)."""
    file_extension = Path(file_path).suffix.lower()
    if file_extension == '.pdf':
        with open(file_path, 'rb') as file:
            pdf_reader = PyPDF2.PdfReader(file)
            text = ""
            for page in pdf_reader.pages:
                text += page.extract_text()
    elif file_extension in ['.txt', '.md']:
        with open(file_path, 'r') as file:
            text = file.read()
    else:
        raise ValueError("Unsupported file type. Please upload a PDF or text file.")
    
    # Save the extracted text to a file in the working directory
    output_file = WORKING_DIRECTORY / f"uploaded_document_{uuid.uuid4()}.txt"
    with output_file.open("w") as file:
        file.write(text)
    
    return f"Document uploaded and saved as {output_file.name}"

@tool
def query_vector_db(query: str) -> str:
    """Query the vector database for relevant information."""
    results = query_vector_store(query)
    return "\n".join([f"{item['type']}: {item['content']}" for item in results])

In [None]:
def create_agent(name, tools, prompt):
    # Define which tools the agent can use
    agent_tools = tools

    # Set up the base template
    template = prompt + """

    Human: {human_input}

    Assistant: Certainly! I'll use the available tools to gather information and complete the task. Let's get started.

    {agent_scratchpad}"""

    # Set up a prompt template
    class CustomPromptTemplate(StringPromptTemplate):
        template: str
        tools: List[Tool]

        def format(self, **kwargs) -> str:
            intermediate_steps = kwargs.pop("intermediate_steps")
            thoughts = ""
            for action, observation in intermediate_steps:
                thoughts += action.log
                thoughts += f"\nObservation: {observation}\nThought: "
            kwargs["agent_scratchpad"] = thoughts
            kwargs["tools"] = "\n".join([f"{tool.name}: {tool.description}" for tool in self.tools])
            return self.template.format(**kwargs)

    prompt_template = CustomPromptTemplate(
        template=template,
        tools=agent_tools,
        input_variables=["human_input", "intermediate_steps"]
    )

    # Define how the agent should output its actions
    class CustomOutputParser(AgentOutputParser):
        def parse(self, llm_output: str) -> Union[AgentAction, AgentFinish]:
            if "Final Answer:" in llm_output:
                return AgentFinish(
                    return_values={"output": llm_output.split("Final Answer:")[-1].strip()},
                    log=llm_output,
                )
            regex = r"Action\s*\d*\s*:(.*?)\nAction\s*\d*\s*Input\s*\d*\s*:[\s]*(.*)"
            match = re.search(regex, llm_output, re.DOTALL)
            if not match:
                raise ValueError(f"Could not parse LLM output: `{llm_output}`")
            action = match.group(1).strip()
            action_input = match.group(2)
            return AgentAction(tool=action, tool_input=action_input.strip(" ").strip('"'), log=llm_output)

    output_parser = CustomOutputParser()

    # Define the LLM to use
    llm = ChatOpenAI(temperature=0, model="gpt-3.5-turbo-0613")

    # Create the LLM chain
    llm_chain = LLMChain(llm=llm, prompt=prompt_template)

    # Define the agent
    agent = LLMSingleActionAgent(
        llm_chain=llm_chain,
        output_parser=output_parser,
        stop=["\nObservation:"],
        allowed_tools=[tool.name for tool in agent_tools]
    )

    # Create the agent executor
    agent_executor = AgentExecutor.from_agent_and_tools(agent=agent, tools=agent_tools, verbose=True)

    return agent_executor

Agent Creation

In [103]:
def create_agent_node(name, tools, prompt):
    def wrapper(state):
        try:
            agent = create_agent(name, tools, prompt)
            agent_output = agent.run(state["messages"][-1].content)
            
            next_step_index = node_order.index(name) + 1
            next_step = node_order[next_step_index] if next_step_index < len(node_order) else "END"
            
            # Write the output to a file
            output_file = f"{name}.txt"
            with open(output_file, 'w') as f:
                f.write(agent_output)
            
            return {
                "messages": state["messages"] + [HumanMessage(content=agent_output)],
                "next": next_step,
                "documents": {**state.get("documents", {}), name: output_file},
                "last_output_file": output_file
            }
        except Exception as e:
            logging.error(f"Error in {name}: {str(e)}")
            return {
                "messages": state["messages"] + [HumanMessage(content=f"Error in {name}: {str(e)}")],
                "next": "END",
                "documents": {**state.get("documents", {}), name: f"Error: {str(e)}"},
                "last_output_file": None
            }
    
    return wrapper

ComplanyResearchAlpha Function Definition

In [104]:
# Define the tools that CompanyResearchAlpha will use
company_research_tools = [
    Tool(
        name="Tavily Search",
        func=tavily_tool.run,
        description="Useful for searching the internet for up-to-date information on companies and industries."
    ),
    Tool(
        name="Read Document",
        func=read_document,
        description="Useful for reading the content of a document."
    ),
    Tool(
        name="Write Document",
        func=write_document,
        description="Useful for writing content to a document."
    ),
    Tool(
        name="Upload Document",
        func=upload_document,
        description="Useful for uploading a document to the vector database."
    ),
    Tool(
        name="Query Vector DB",
        func=query_vector_db,
        description="Useful for querying the vector database for relevant information."
    )
]

# Create the CompanyResearchAlpha node
company_research_alpha = create_agent_node(
    "CompanyResearchAlpha",
    company_research_tools,
    """You are a company research specialist. Your task is to gather initial information about the target company.
    Use the available tools to research the company and compile a brief summary including:
    1. Company name and basic description
    2. Industry
    3. Key products or services
    4. Recent news or developments

    After gathering this information, use the write_document tool to save your findings in a file named 'CompanyResearchAlpha.txt'.
    Your response should include confirmation that the file was saved."""
)

# Add this to your node_functions dictionary
node_functions["CompanyResearchAlpha"] = company_research_alpha

Defining Agent Nodes

In [105]:
# Define agent nodes
company_research_alpha = create_agent_node(
    "CompanyResearchAlpha",
    [tavily_tool, read_document, write_document, upload_document, query_vector_db],
    "You are a company research specialist. Your task is to gather initial information about a target company. "
    "Ask the user for the company name or to upload a document with company info. "
    "Create initial documents for company name, description, execs, priorities, and industry priorities."
)

# Company Description Team
company_description_research = create_agent_node(
    "CompanyDescriptionResearch",
    [tavily_tool, read_document, write_document, query_vector_db],
    "You are a company description researcher. Your task is to find and compile a comprehensive description of the company."
)

company_description_writing = create_agent_node(
    "CompanyDescriptionWriting",
    [read_document, write_document],
    "You are a professional writer specializing in company descriptions. Your task is to take the research and craft a well-written company description."
)

company_description_fact_checker = create_agent_node(
    "CompanyDescriptionFactChecker",
    [tavily_tool, read_document, write_document, query_vector_db],
    "You are a fact-checker. Your task is to verify the accuracy of the company description."
)

company_description_copy_editor = create_agent_node(
    "CompanyDescriptionCopyEditor",
    [read_document, write_document],
    """You are a copy editor. Your task is to ensure the company description has correct grammar, spelling, and tone.
    Read the existing company description, make necessary edits, and save the improved version.
    If you encounter any issues with the input, please provide a clear explanation and save it in the output file."""
)

# Company Execs Team
company_execs_research = create_agent_node(
    "CompanyExecsResearch",
    [tavily_tool, read_document, write_document, query_vector_db],
    "You are an executive research specialist. Your task is to find the top 5 C-level executives of the company."
)

company_execs_linkedin_check = create_agent_node(
    "CompanyExecsLinkedInCheck",
    [linkedin_check, read_document, write_document],
    "You are a LinkedIn verification specialist. Your task is to verify if the executives are currently working at the company."
)

company_execs_editor = create_agent_node(
    "CompanyExecsEditor",
    [read_document, write_document],
    "You are an editor. Your task is to finalize the list of company executives, excluding any that are not currently with the company."
)

# Company Priorities Team
company_priorities_research = create_agent_node(
    "CompanyPrioritiesResearch",
    [tavily_tool, read_document, write_document, query_vector_db],
    "You are a company priorities researcher. Your task is to identify the top 3 priorities of the company."
)

company_priorities_fact_checker = create_agent_node(
    "CompanyPrioritiesFactChecker",
    [tavily_tool, read_document, write_document, query_vector_db],
    "You are a fact-checker. Your task is to verify the accuracy of the identified company priorities."
)

company_priorities_copy_editor = create_agent_node(
    "CompanyPrioritiesCopyEditor",
    [read_document, write_document],
    "You are a copy editor. Your task is to ensure the company priorities are clearly and correctly stated."
)

# Industry Trends Team
industry_trends_research = create_agent_node(
    "IndustryTrendsResearch",
    [tavily_tool, read_document, write_document, query_vector_db],
    "You are an industry trends researcher. Your task is to identify the industry of the company and its top trends and challenges."
)

industry_trends_fact_checker = create_agent_node(
    "IndustryTrendsFactChecker",
    [tavily_tool, read_document, write_document, query_vector_db],
    "You are a fact-checker. Your task is to verify the accuracy of the identified industry trends and challenges."
)

industry_trends_copy_editor = create_agent_node(
    "IndustryTrendsCopyEditor",
    [read_document, write_document],
    "You are a copy editor. Your task is to ensure the industry trends and challenges are clearly and correctly stated."
)

# Sales Pitch Team
value_proposition_research = create_agent_node(
    "ValuePropositionResearch",
    [tavily_tool, read_document, write_document, upload_document, query_vector_db],
    "You are a value proposition researcher. Your task is to research and outline the value proposition of the user's company."
)

value_proposition_fact_checker = create_agent_node(
    "ValuePropositionFactChecker",
    [tavily_tool, read_document, write_document, query_vector_db],
    "You are a fact-checker. Your task is to verify the accuracy of the value proposition."
)

value_proposition_copy_editor = create_agent_node(
    "ValuePropositionCopyEditor",
    [read_document, write_document],
    "You are a copy editor. Your task is to ensure the value proposition is clearly and correctly stated."
)

sales_pitch_creator = create_agent_node(
    "SalesPitchCreator",
    [read_document, write_document, query_vector_db],
    """You are a sales pitch creator. Your task is to create a compelling sales pitch based on the target company's information and the user's company value proposition.
    After creating the pitch, you MUST use the write_document tool to save it as 'final_sales_pitch.txt'.
    Your response should include the content of the sales pitch and confirmation that it was saved."""
)

sales_pitch_fact_checker = create_agent_node(
    "SalesPitchFactChecker",
    [read_document, write_document, query_vector_db],
    "You are a fact-checker. Your task is to verify the accuracy of the sales pitch."
)

sales_pitch_copy_editor = create_agent_node(
    "SalesPitchCopyEditor",
    [read_document, write_document],
    "You are a copy editor. Your task is to ensure the sales pitch has correct grammar, spelling, and tone."
)

Graph Creation

In [106]:
from typing import Dict, List, TypedDict
from langgraph.graph import StateGraph, END
from langchain_core.messages import HumanMessage

# Define the state
class State(TypedDict):
    messages: List[HumanMessage]
    next: str

# Create the graph
graph = StateGraph(State)

# Define a dictionary mapping node names to their corresponding functions
node_functions = {
    "CompanyResearchAlpha": company_research_alpha,
    "CompanyDescriptionResearch": company_description_research,
    "CompanyDescriptionWriting": company_description_writing,
    "CompanyDescriptionFactChecker": company_description_fact_checker,
    "CompanyDescriptionCopyEditor": company_description_copy_editor,
    "CompanyExecsResearch": company_execs_research,
    "CompanyExecsLinkedInCheck": company_execs_linkedin_check,
    "CompanyExecsEditor": company_execs_editor,
    "CompanyPrioritiesResearch": company_priorities_research,
    "CompanyPrioritiesFactChecker": company_priorities_fact_checker,
    "CompanyPrioritiesCopyEditor": company_priorities_copy_editor,
    "IndustryTrendsResearch": industry_trends_research,
    "IndustryTrendsFactChecker": industry_trends_fact_checker,
    "IndustryTrendsCopyEditor": industry_trends_copy_editor,
    "ValuePropositionResearch": value_proposition_research,
    "ValuePropositionFactChecker": value_proposition_fact_checker,
    "ValuePropositionCopyEditor": value_proposition_copy_editor,
    "SalesPitchCreator": sales_pitch_creator,
    "SalesPitchFactChecker": sales_pitch_fact_checker,
    "SalesPitchCopyEditor": sales_pitch_copy_editor
}

# Define the node order
node_order = [
    "CompanyResearchAlpha",
    "CompanyDescriptionResearch",
    "CompanyDescriptionWriting",
    "CompanyDescriptionFactChecker",
    "CompanyDescriptionCopyEditor",
    "CompanyExecsResearch",
    "CompanyExecsLinkedInCheck",
    "CompanyExecsEditor",
    "CompanyPrioritiesResearch",
    "CompanyPrioritiesFactChecker",
    "CompanyPrioritiesCopyEditor",
    "IndustryTrendsResearch",
    "IndustryTrendsFactChecker",
    "IndustryTrendsCopyEditor",
    "ValuePropositionResearch",
    "ValuePropositionFactChecker",
    "ValuePropositionCopyEditor",
    "SalesPitchCreator",
    "SalesPitchFactChecker",
    "SalesPitchCopyEditor"
]

# Add nodes to the graph
for node in node_order:
    graph.add_node(node, node_functions[node])

# Add edges between nodes
for i in range(len(node_order) - 1):
    current_node = node_order[i]
    next_node = node_order[i + 1]
    graph.add_edge(current_node, next_node)

# Add conditional edges for each node
for i, node in enumerate(node_order):
    next_nodes = {next_node: next_node for next_node in node_order[i+1:]}
    next_nodes["END"] = END
    
    graph.add_conditional_edges(
        node,
        lambda x: x["next"],
        next_nodes
    )

# Set the entry point
graph.set_entry_point("CompanyResearchAlpha")

# Compile the graph
workflow = graph.compile()

Sales Pitch Generator Function

In [107]:
def run_sales_pitch_generator(target_company: str, user_company_url: str, debug: bool = False):
    if debug:
        logging.getLogger().setLevel(logging.DEBUG)
    
    initial_message = f"Generate a sales pitch for {target_company}. Our company website is {user_company_url}."
    
    state = {"messages": [HumanMessage(content=initial_message)], "next": "CompanyResearchAlpha", "documents": {}}
    
    for step, node_name in enumerate(node_order):
        logging.info(f"Executing step {step + 1}/{len(node_order)}: {node_name}")
        
        if node_name not in node_functions:
            logging.error(f"Node '{node_name}' not found in node_functions dictionary")
            continue
        
        node_function = node_functions[node_name]
        try:
            new_state = node_function(state)
            if new_state is None or not isinstance(new_state, dict) or 'next' not in new_state:
                raise ValueError(f"Invalid state returned by {node_name}")
            logging.debug(f"State after {node_name}:")
            logging.debug(json.dumps(new_state, default=str, indent=2))
            state = new_state
        except Exception as e:
            logging.error(f"Error in {node_name}: {str(e)}")
            state = {
                "messages": state["messages"] + [AIMessage(content=f"Error in {node_name}: {str(e)}")],
                "next": "END",
                "documents": {**state.get("documents", {}), node_name: f"Error: {str(e)}"},
                "last_output_file": None
            }
        
        if state['next'] == "END":
            logging.warning(f"Process ended early at step {step + 1} ({node_name})")
            break

    # After the chain completes, compile all the information into a final sales pitch
    final_pitch = compile_final_sales_pitch(state['documents'], target_company)
    
    # Save the final pitch
    final_pitch_path = WORKING_DIRECTORY / "final_sales_pitch.txt"
    try:
        with final_pitch_path.open("w") as file:
            file.write(final_pitch)
        logging.info("Final sales pitch saved successfully.")
    except IOError as e:
        logging.error(f"Error saving final sales pitch: {e}")
    
    return final_pitch

Main Chain and Execution

In [108]:
if __name__ == "__main__":
    try:
        logging.info("Initializing vector store...")
        initialize_vector_store()
        
        target_company = input("Enter the name of the company you want to prospect: ").strip()
        user_company_url = input("Enter your company's URL so we can understand your value proposition: ").strip()
        
        logging.info(f"Generating sales pitch for {target_company}...")
        final_pitch = run_sales_pitch_generator(target_company, user_company_url)
        
        logging.info("\nFinal Sales Pitch:")
        print(final_pitch)
        
        # Print contents of all generated files
        for node_name in node_order:
            file_name = f"{node_name}.txt"
            file_path = WORKING_DIRECTORY / file_name
            if file_path.exists():
                try:
                    with file_path.open("r") as file:
                        content = file.read().strip()
                        logging.info(f"\nContents of {file_name}:\n{content[:500]}...\n---")
                except IOError as e:
                    logging.error(f"Error reading {file_name}: {e}")
            else:
                logging.warning(f"File {file_name} was not created during the process")
        
    except Exception as e:
        logging.error(f"An unexpected error occurred: {e}")
        logging.exception("Detailed traceback:")
    
    finally:
        logging.info("Cleaning up temporary files...")
        for node_name in node_order:
            file_name = f"{node_name}.txt"
            file_path = WORKING_DIRECTORY / file_name
            try:
                file_path.unlink()
                logging.info(f"Deleted {file_name}")
            except FileNotFoundError:
                logging.warning(f"File {file_name} not found during cleanup")
        
        logging.info("Process completed.")

2024-09-03 16:12:17,704 - INFO - Initializing vector store...
  qdrant_client.recreate_collection(
2024-09-03 16:12:23,600 - INFO - Generating sales pitch for Amazon...
2024-09-03 16:12:23,601 - INFO - Executing step 1/20: CompanyResearchAlpha
2024-09-03 16:12:23,601 - ERROR - Error in CompanyResearchAlpha: name 'create_agent' is not defined
2024-09-03 16:12:23,601 - DEBUG - State after CompanyResearchAlpha:
2024-09-03 16:12:23,602 - DEBUG - {
  "messages": [
    "content='Generate a sales pitch for Amazon. Our company website is www.snowflake.com.'",
    "content=\"Error in CompanyResearchAlpha: name 'create_agent' is not defined\""
  ],
  "next": "END",
  "documents": {
    "CompanyResearchAlpha": "Error: name 'create_agent' is not defined"
  },
  "last_output_file": null
}
2024-09-03 16:12:23,603 - INFO - Final sales pitch saved successfully.
2024-09-03 16:12:23,603 - INFO - 
Final Sales Pitch:
2024-09-03 16:12:23,611 - INFO - Cleaning up temporary files...
2024-09-03 16:12:23,618 -

Final Sales Pitch for Amazon

Company Overview:
Error: name 'create_agent' is not defined

Key Executives: Information not available.

Company Priorities: Information not available.

Value Proposition: Information not available.

Final Pitch: Information not available.


