In [None]:
import streamlit as st
import pandas as pd
import numpy as np
from faker import Faker
import random
import json

from snowflake.snowpark.context import get_active_session


session = get_active_session()

In [None]:
#Synthetically create a dataframe of customers, products, contract val, and received revenue
fake = Faker()

def generate_customer_data(num_customers=100):
    """Generates mock customer data."""

    data = []
    product_list =["Cortex Search","Cortex Analyst",
                "DocAI","Feature Store",
                "Model Registry","Cortex Agents"]
    
    for _ in range(num_customers):
        customer_name = fake.company()
        products = random.sample(product_list
,
            random.randint(1, 4),
        )
        total_contract_value = round(random.uniform(1000, 100000), 2)
        revenue_received = round(random.uniform(0, total_contract_value), 2)

        data.append(
            {
                "Customer": customer_name,
                "Total_Contract_Value": total_contract_value,
                "Revenue_Received": revenue_received,
                "Products_List": products,

            }
        )
    df = pd.DataFrame(data)

    for product in product_list:
        df[f"{product.replace(' ', '_')}_User"] = df['Products_List'].apply(lambda x: 'Y' if product in x else 'N')

    # df = df[["Customer","Products_List", "Cortex_Search_user", "Cortex_Analyst_user", 
    # "DocAI_user", "Feature_Store_user", "Model_Registry_user", "Cortex_Agents_user",
    # "Total_Contract_Value", "Revenue_Received"]]

    # df = df[["Customer","Products_List","Model_Registry_User",
    #         "Cortex_Search_User","Feature_Store_User","Cortex_Agents_User",
    #         "Total_Contract_Value","Revenue_Received"]]

    return df

# Generate and display the DataFrame
df = generate_customer_data()
df.head()
df.columns

In [None]:
df.head()

In [None]:
session.write_pandas(df, "CUSTOMER_PRODUCT_DATA", auto_create_table=True, quote_identifiers = False, overwrite=True)

In [None]:
SHOW TABLES;

In [None]:
CREATE OR REPLACE STAGE SEMANTIC DIRECTORY = ( ENABLE = true );

# We will now create the semantic model to query our new data with natural language

* To do so, first go to the AI Studio in snowsight and click the Cortex Analyst Tab

* Choose the appropriate database and schema the select the SEMANTIC stage we just created

* Click Create New

* Fill out the Description - 
    * "Semantic model containing information about customer product data including the customer name, which products they currently use, their total contract value, and the amount of revenue we have received from the customer"

* Select the CUSTOMER_PRODUCT_DATA table and select all columns


In [None]:
data = session.table("CUSTOMER_PRODUCT_DATA")
data.show(3)

In [None]:
## Start testing cortex analyst api call here
from typing import List, Dict, Optional
import _snowflake


class CortexAnalyst():
    def __init__(self, db: str, schema: str, stage: str, semantic_model_file_path: str):
        self.db = db
        self.schema = schema
        self.stage = stage
        self.semantic_model_file_path = semantic_model_file_path


    # @instrument
    def send_message(self, prompt: str) -> dict:

        """Calls the REST API and returns the response."""
        
        messages = []
        messages.append({"role": "user", "content": [{"type": "text", "text": prompt}]})
        request_body = {
            "messages": messages, #need to wrap in a list?
            "semantic_model_file": f"@{self.db}.{self.schema}.{self.stage}/{self.semantic_model_file_path}",
        }

        print(request_body)

        resp = _snowflake.send_snow_api_request(
            "POST",
            f"/api/v2/cortex/analyst/message",
            {},
            {},
            request_body,
            {},
            30000,
        )
        if resp["status"] < 400:
            return json.loads(resp["content"])
        else:
            # messages.pop()
            raise Exception(
                f"Failed request with status {resp['status']}: {resp}"
            )
    # @instrument
    def process_message(self, prompt: str) -> None:
        """Processes a message and adds the response to the chat."""
        messages=[]
        messages.append(
            {"role": "user", "content": [{"type": "text", "text": prompt}]}
        )
        with st.chat_message("user"):
            st.markdown(prompt)
        with st.chat_message("assistant"):
            with st.spinner("Generating response..."):
                # response = "who had the most rec yards week 10"
                response = self.send_message(prompt=prompt)
                request_id = response["request_id"]
                content = response["message"]["content"]
                messages.append(
                    {**response['message'], "request_id": request_id}
                )
        return self.display_content(content=content, request_id=request_id)  # type: ignore[arg-type]
        # return response
    
    # @instrument
    def display_content(self,
        content: List[Dict[str, str]],
        request_id: Optional[str] = None,
        message_index: Optional[int] = None,
    ) -> None:
        """Displays a content item for a message."""
        message_index = message_index or len(prompt)
        if request_id:
            with st.expander("Request ID", expanded=False):
                st.markdown(request_id)
        for item in content:
            if item["type"] == "text":
                st.markdown(item["text"])
            elif item["type"] == "suggestions":
                with st.expander("Suggestions", expanded=True):
                    for suggestion_index, suggestion in enumerate(item["suggestions"]):
                        if st.button(suggestion, key=f"{message_index}_{suggestion_index}"):
                            st.session_state.active_suggestion = suggestion
            elif item["type"] == "sql":
                return self.display_sql(item["statement"])

    # @instrument
    def display_sql(self, sql: str) -> None:
        with st.expander("SQL Query", expanded=False):
            st.code(sql, language="sql")
        with st.expander("Results", expanded=True):
            with st.spinner("Running SQL..."):
                session = get_active_session()
                df = session.sql(sql).to_pandas()
                if len(df.index) > 1:
                    data_tab, line_tab, bar_tab = st.tabs(
                        ["Data", "Line Chart", "Bar Chart"]
                    )
                    data_tab.dataframe(df)
                    if len(df.columns) > 1:
                        df = df.set_index(df.columns[0])
                    with line_tab:
                        st.line_chart(df)
                    with bar_tab:
                        st.bar_chart(df)
                else:
                    st.dataframe(df)

        return df

CA = CortexAnalyst(db='TR_MULTI_AGENT', schema='PUBLIC', stage='SEMANTIC', semantic_model_file_path='customer_product_data_model.yaml')

In [None]:
CA_response = CA.process_message(prompt='Show me all the products for Morgan-Love')

In [None]:
# from googlesearch import search
import requests
from bs4 import BeautifulSoup

def get_top_article_text(company_name):
    """
    Performs a Google search for a company name and returns the raw text of the
    top three articles.

    Args:
        company_name (str): The name of the company to search for.

    Returns:
        list: A list of strings, where each string is the raw text of an article,
              or an empty list if an error occurs.
    """
    try:

        article_texts = []
        for url in search_results:
            try:
                response = requests.get(url, timeout=10) #added timeout
                response.raise_for_status() # Raise HTTPError for bad responses (4xx or 5xx)
                soup = BeautifulSoup(response.content, 'html.parser')
                text = soup.get_text(separator=' ', strip=True) #added separator and strip
                article_texts.append(text)
            except requests.exceptions.RequestException as e:
                print(f"Error fetching URL {url}: {e}")
            except Exception as e:
                print(f"Error processing URL {url}: {e}")

        return article_texts

    except Exception as e:
        print(f"Error during search: {e}")
        return []

# Example usage:
company = "Snowflake"
article_texts = get_top_article_text(company)

if article_texts:
    for i, text in enumerate(article_texts):
        print(f"Article {i+1}:\n{text}\n{'-'*40}\n")
else:
    print("Could not retrieve article text.")

In [None]:
class Multi_Agent_App:

    def agent_1_cortex_analyst(self, client: str) -> list:
        """
        takes client and inserts into predefined prompt and passes to cortex analyst to return list of products client uses today
        """
        resp = CA.process_message(f"What products are being used by our client {client}")

        products = resp.iloc[:,0].to_list()

        return products

    def agent_2_client_web_serach(self, client: str) -> list:
        """ 
        calls api to google client name and return top three news articles. 
        Potentially use bs4 or similar library to parse web results
        Return list of web page text (or potentially pass to LLM to generate summary)
        """

        api_key = os.environ.get("GOOGLE_API_KEY")
        cse_id = os.environ.get("GOOGLE_CSE_ID")

        web_query = f"top news for {client}"
        url = "https://www.googleapis.com/customsearch/v1"
        params = {
            "key": api_key,
            "cx": cse_id,
            "q": web_query,
            "num": 3
        }

        top_3_results = requests.get(url, params=params)

        news_summary_list = []

        for result in top_3_result:
            news_summaries_list.append(complete('claude-3-5-sonnet', f'summarize the following web page {result}'))

        return news_summaries_list


    def agent_3_market_web_serach(self, client: str) -> list:
        """ 
        uses an llm to determine client industry, then ues api to google client industry and return top three articles. 
        Potentially use bs4 or similar library to parse web results
        Return list of web page text (or potentially pass to LLM to generate summary)
        """
        api_key = os.environ.get("GOOGLE_API_KEY")
        cse_id = os.environ.get("GOOGLE_CSE_ID")

        client_market = complete('claude-3-5-sonnet', f'what is the most applicable market for the company {client}')

        web_query = f"market news for {client_market}"
        url = "https://www.googleapis.com/customsearch/v1"
        params = {
            "key": api_key,
            "cx": cse_id,
            "q": web_query,
            "num": 3
        }

        top_3_results = requests.get(url, params=params)

        market_news_summary_list = []

        for result in top_3_result:
            market_news_summary_list.append(complete('claude-3-5-sonnet', f'summarize the following web page {result}'))

        return market_news_summary_list

    def agent_4_prep_pitch(self, client: str) -> str:
        """ 
        passes products, client news, and market news to an LLM to come up with a rough prep pitch

        """
        products = self.agent_1_cortex_analyst(client)
        client_news = self.agent_2_client_web_serach(client)
        market_news = self.agent_3_market_web_serach(client)
        prep_pitch = complete('claude-3-5-sonnet', 
                              f"""
                              prepare a product pitch based on the following products being used and the client and market news 
                              products_used: {products}
                              client_news: {client_news}
                              market_news: {market_news}
                              """)
        return prep_pitch
    
    def agent_5_disco_questions(self, client: str) -> str:
        """ 
        passes products, client news, and market news to an LLM to come up with a list of appropriate discovery questions
        """
        products = self.agent_1_cortex_analyst(client)
        client_news = self.agent_2_client_web_serach(client)
        market_news = self.agent_3_market_web_serach(client)
        prep_pitch = complete('claude-3-5-sonnet', 
                              f"""
                              prepare a set of discovery questions for a client the following products being used and the client and market news 
                              products_used: {products}
                              client_news: {client_news}
                              market_news: {market_news}
                              """)
        return prep_pitch


    def orchestrate_all_agents(self, client: str, output_file_name: str):
        """ 
        primary function that 
        """
        products = self.agent_1_cortex_analyst(client)
        client_news = self.agent_2_client_web_serach(client)
        market_news = self.agent_3_market_web_serach(client)
        pitch = self.agent_4_prep_pitch(client)
        disco_questions = self.agent_5_disco_questions(client)

        markdown_doc = "Products \n" + ' '.join(products) + \
                       "\n\n Client News \n " + ' '.join(client_news)+ \
                       "\n\n Market News \n " + ' '.join(market_news) + \
                       "\n\n Pitch \n " + pitch + \
                       "\n\n Disco Questions \n " + disco_questions

        with open(output_file_name, "w", encoding="utf-8") as f:
            f.write(markdown_doc)

        print(f"Client prep file written to {output_file_name}!")

        return products, client_news, market_news, pitch, disco_questions