### Multi-agent Marketing Content Creation using Semantic Kernel

- Make sure you have populated the `.env` file.
- You need to create a Google API key and search engine ID for Google Search. Follow this link: [Google Custom Search API Introduction](https://developers.google.com/custom-search/v1/introduction).
- Feel free to change the agent instructions under `utils/agent_instructions.py` if you want to create content for different topics.



In [31]:
import os
import asyncio
import json
import re
import textwrap
from dotenv import load_dotenv
from semantic_kernel import Kernel
from semantic_kernel.agents import ChatCompletionAgent, AgentGroupChat
from semantic_kernel.agents.strategies import KernelFunctionTerminationStrategy, KernelFunctionSelectionStrategy
from semantic_kernel.connectors.ai.open_ai import AzureChatCompletion, AzureChatPromptExecutionSettings
from semantic_kernel.functions import KernelFunctionFromPrompt, KernelArguments
from pydantic import BaseModel, Field
from typing import List
from IPython.display import display, Markdown



load_dotenv()

True

In [32]:

# import logging
# logging.basicConfig(level=logging.INFO)

### Observability

The following cell demonstrates observability for Semantic Kernel. There are different choices available, and you can follow the link [Semantic Kernel Observability](https://learn.microsoft.com/en-us/semantic-kernel/concepts/enterprise-readiness/observability/?pivots=programming-language-python) to learn about various options. 

In this notebook, we are showcasing observability using the Aspire Dashboard. To set up the Aspire Dashboard, follow this link: [Start the Aspire Dashboard](https://learn.microsoft.com/en-us/dotnet/aspire/fundamentals/dashboard/standalone?tabs=bash#start-the-dashboard).

In [33]:
observability = False
if observability :
    import asyncio
import logging

from opentelemetry._logs import set_logger_provider
from opentelemetry.exporter.otlp.proto.grpc._log_exporter import OTLPLogExporter
from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import OTLPMetricExporter
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.metrics import set_meter_provider
from opentelemetry.sdk._logs import LoggerProvider, LoggingHandler
from opentelemetry.sdk._logs.export import BatchLogRecordProcessor
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
from opentelemetry.sdk.metrics.view import DropAggregation, View
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.semconv.resource import ResourceAttributes
from opentelemetry.trace import set_tracer_provider




# Endpoint to the Aspire Dashboard
endpoint = "http://localhost:4317"

# Create a resource to represent the service/sample
resource = Resource.create({ResourceAttributes.SERVICE_NAME: "telemetry-aspire-dashboard-quickstart"})


def set_up_logging():
    exporter = OTLPLogExporter(endpoint=endpoint)

    # Create and set a global logger provider for the application.
    logger_provider = LoggerProvider(resource=resource)
    # Log processors are initialized with an exporter which is responsible
    # for sending the telemetry data to a particular backend.
    logger_provider.add_log_record_processor(BatchLogRecordProcessor(exporter))
    # Sets the global default logger provider
    set_logger_provider(logger_provider)

    # Create a logging handler to write logging records, in OTLP format, to the exporter.
    handler = LoggingHandler()
    # Add filters to the handler to only process records from semantic_kernel.
    handler.addFilter(logging.Filter("semantic_kernel"))
    # Attach the handler to the root logger. `getLogger()` with no arguments returns the root logger.
    # Events from all child loggers will be processed by this handler.
    logger = logging.getLogger()
    logger.addHandler(handler)
    logger.setLevel(logging.INFO)


def set_up_tracing():
    exporter = OTLPSpanExporter(endpoint=endpoint)

    # Initialize a trace provider for the application. This is a factory for creating tracers.
    tracer_provider = TracerProvider(resource=resource)
    # Span processors are initialized with an exporter which is responsible
    # for sending the telemetry data to a particular backend.
    tracer_provider.add_span_processor(BatchSpanProcessor(exporter))
    # Sets the global default tracer provider
    set_tracer_provider(tracer_provider)


def set_up_metrics():
    exporter = OTLPMetricExporter(endpoint=endpoint)

    # Initialize a metric provider for the application. This is a factory for creating meters.
    meter_provider = MeterProvider(
        metric_readers=[PeriodicExportingMetricReader(exporter, export_interval_millis=5000)],
        resource=resource,
        views=[
            # Dropping all instrument names except for those starting with "semantic_kernel"
            View(instrument_name="*", aggregation=DropAggregation()),
            View(instrument_name="semantic_kernel*"),
        ],
    )
    # Sets the global default meter provider
    set_meter_provider(meter_provider)


# This must be done before any other telemetry calls
set_up_logging()
set_up_tracing()
set_up_metrics()



  

In [34]:
class SocialMediaPost(BaseModel):
    platform: str = Field(..., description="The social media platform where the post will be published (e.g., Twitter, LinkedIn).")
    content: str = Field(..., description="The content of the social media post, including any hashtags or mentions.")

class Article(BaseModel):
    title: str
    content: str
    call_to_action: str

class ContentOutput(BaseModel):
    article: Article
    social_media_posts: List[SocialMediaPost]

In [35]:
service_id: str = "default_service_id"
def create_kernel(service_id= service_id) -> Kernel:
    kernel = Kernel()
    kernel.add_service(AzureChatCompletion(service_id=service_id))


    return kernel

kernel = create_kernel()

In [36]:
from utils.agent_instructions import (
    MARKET_NEWS_MONITOR_AGENT_INSTRUCTIONS,
    DATA_ANALYST_AGENT_INSTRUCTIONS,
    CONTENT_CREATOR_AGENT_INSTRUCTIONS,
    QUALITY_ASSURANCE_AGENT_INSTRUCTIONS,
)

def format_instruction(instruction_template: str, subject: str) -> str:
    return instruction_template.format(subject=subject)

subject = "Inflation in the US and the impact on the stock market in 2024"

market_news_instr = format_instruction(MARKET_NEWS_MONITOR_AGENT_INSTRUCTIONS, subject)
data_analyst_instr = format_instruction(DATA_ANALYST_AGENT_INSTRUCTIONS, subject)
content_creator_instr = format_instruction(CONTENT_CREATOR_AGENT_INSTRUCTIONS, subject)
quality_assurance_instr = format_instruction(QUALITY_ASSURANCE_AGENT_INSTRUCTIONS, subject)


In [37]:
# google_scraper_plugin.py
from semantic_kernel.functions import kernel_function
from googleapiclient.discovery import build
import requests
from bs4 import BeautifulSoup
from typing import List
import os

class WebScraperPlugin:
    def __init__(self, api_key: str, cse_id: str):
        self.api_key = api_key
        self.cse_id = cse_id

    @kernel_function(
        name="google_search",
        description="Performs a Google search and returns a list of URLs related to the query."
    )
    async def google_search(self, query: str, num_results: int = 5) -> List[str]:
        service = build("customsearch", "v1", developerKey=self.api_key)
        res = service.cse().list(q=query, cx=self.cse_id, num=num_results).execute()
        urls = [item['link'] for item in res.get("items", [])]
        return urls

    @kernel_function(
        name="scrape_url",
        description="Scrapes a given URL and returns the title and the first few paragraphs of text."
    )
    async def scrape_url(self, url: str) -> dict:
        try:
            headers = {
                "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64)"
            }
            response = requests.get(url, headers=headers, timeout=10)
            response.raise_for_status()
            soup = BeautifulSoup(response.text, "html.parser")

            title = soup.title.string.strip() if soup.title else "No Title"
            paragraphs = soup.find_all("p")
            text_content = "\n".join(p.get_text(strip=True) for p in paragraphs[:3])  # First 3 paragraphs

            return {
                "url": url,
                "title": title,
                "text": text_content
            }
        except Exception as e:
            return {
                "url": url,
                "error": str(e)
            }

web_scraper_tool = WebScraperPlugin(
    api_key=os.getenv("GOOGLE_API_KEY"),
    cse_id=os.getenv("GOOGLE_SEARCH_ENGINE_ID")
)


In [38]:
selection_function = KernelFunctionFromPrompt(
    function_name="selection",
    prompt="""
    Based on the conversation history, determine the next agent to take a turn. 
    Only return the name of the agent from the following list:
    - MarketNewsMonitor
    - DataAnalyst
    - ContentCreator
    - ContentOfficer

    Rules:
    - MarketNewsMonitor starts first.
    - After MarketNewsMonitor, DataAnalyst speaks.
    - After DataAnalyst, ContentCreator speaks.
    - After ContentCreator, ContentOfficer speaks.
    - If ContentOfficer requests changes, restart the cycle with MarketNewsMonitor.

    History:
    {{$history}}
    """,
)

In [39]:
termination_function = KernelFunctionFromPrompt(
    function_name="termination",
    prompt="""
    Determine if the structured content output (article and social_media_posts) is complete and valid JSON. 
    If the output is complete and valid, respond with "yes". Otherwise, respond with "no".

    History:
    {{$history}}
    """
)

In [40]:
output_settings = AzureChatPromptExecutionSettings(response_format=ContentOutput)


In [41]:
market_news_agent = ChatCompletionAgent(kernel=kernel,
                                         name="MarketNewsMonitor",
                                           instructions=market_news_instr,
                                            plugins=[web_scraper_tool]
                                           )
data_analyst_agent = ChatCompletionAgent(kernel=kernel, 
                                         name="DataAnalyst",
                                          instructions=data_analyst_instr,
                                          plugins=[web_scraper_tool])
content_creator_agent = ChatCompletionAgent(kernel=kernel, name="ContentCreator", instructions=content_creator_instr)
quality_assurance_agent = ChatCompletionAgent(kernel=kernel,
                                                 name="ContentOfficer",
                                                 instructions=quality_assurance_instr,
                                                 arguments=KernelArguments(settings=output_settings))

agent_group = AgentGroupChat(
    agents=[market_news_agent, data_analyst_agent, content_creator_agent, quality_assurance_agent],
    termination_strategy = KernelFunctionTerminationStrategy(
        agents=[quality_assurance_agent],
        function=termination_function,
        kernel=kernel,
        result_parser=lambda result: any("yes" in item.content.lower() for item in result.value) if isinstance(result.value, list) else "yes" in result.value.content.lower(),
        history_variable_name="history",
        maximum_iterations=10
    ),
    selection_strategy=KernelFunctionSelectionStrategy(
        function=selection_function,
        kernel=kernel,
        result_parser=lambda result: result.value[0].content.strip() if isinstance(result.value, list) and result.value else result.value.content.strip(),
        agent_variable_name="agents",
        history_variable_name="history"
    )
)

In [42]:
async def main(subject: str):

    initial_message = f"Subject: {subject}\n\nCollaborate to produce structured content output matching the provided schema."
    await agent_group.add_chat_message(message=initial_message)

    final_response = ""
    async for content in agent_group.invoke():
        print(f"# {content.name}: {content.content}\n")
        final_response = content.content

    return final_response

In [None]:
final_output = await main("Inflation in the US and the impact on the stock market in 2024")

# MarketNewsMonitor: ## Summary Report: US Inflation and Impact on Stock Market (2024)

### 1. Introduction
This report provides an overview of the latest developments related to inflation in the US and its potential effects on the stock market heading into 2024. As inflation continues to be a pivotal factor in economic policymaking and market behavior, understanding its trajectory is crucial for investors and stakeholders.

### 2. Current Status of US Inflation
**Consumer Price Index (CPI)**:
- Recent updates indicate discussions surrounding the Consumer Price Index, which serves as a key indicator of inflationary trends. The CPI is foundational for monitoring price changes in a basket of goods and services, making it significant for gauging inflation rates.
- The [Bureau of Labor Statistics](https://www.bls.gov/news.release/cpi.htm) remains the primary source for detailed CPI reports, providing timely updates on inflation statistics.

**Economic Indicators**:
- The [U.S. Bureau of Ec

In [None]:
final_output

"### Summary Report: Inflation in the US and the Impact on the Stock Market in 2024\n\n---\n\n#### Overview\nThis report examines the current state of inflation in the United States and its anticipated effects on the stock market as we approach 2024. It incorporates relevant findings from leading financial institutions and recent economic data.\n\n---\n\n#### Key Findings\n\n1. **Inflation Trends**:\n   - Current inflation rates are a pressing concern for U.S. consumers and investors. Recent projections suggest that inflation will maintain upward pressure on prices, which is crucial for understanding market dynamics. Resources like the U.S. Bank analysis emphasize the fluid nature of market conditions due to inflationary pressures.\n\n2. **Investor Sentiment**:\n   - Investors are becoming increasingly cautious as inflation rises. The need for strategic reassessment of portfolios is critical. Financial experts suggest that incorporating inflation-resistant assets—such as real estate, g

In [None]:
def extract_json(text: str) -> dict:
    pattern = re.compile(r"```json\s*(\{.*\})\s*```", re.DOTALL)
    match = pattern.search(text)
    if match:
        return json.loads(match.group(1))
    return json.loads(text.strip())

output_data = extract_json(final_output)
article = output_data.get("article", {})
posts = output_data.get("social_media_posts", [])

article_md = f"# {article.get('title')}\n\n{article.get('content')}\n\n**{article.get('call_to_action')}**"
display(Markdown(article_md))

for post in posts:
    print(f"Platform: {post['platform']}")
    print(textwrap.fill(post['content'], width=60))
    print('-' * 60)

JSONDecodeError: Expecting value: line 1 column 1 (char 0)