In [1]:
import asyncio
import html
import io
import json
import logging
import os
import random
import re
import time
from collections import Counter
from datetime import datetime, timedelta
from email.utils import parsedate_to_datetime
from typing import Dict, List

import aiohttp
import feedparser
import nest_asyncio
import requests
import spacy
import yfinance as yf
from bs4 import BeautifulSoup
from exa_py.api import Exa  # Import Exa from exa_py.api
from forex_python.converter import CurrencyRates
from openai import OpenAI
from PIL import Image
from pycoingecko import CoinGeckoAPI
from tavily import TavilyClient
from textblob import TextBlob
from unstructured.partition.html import partition_html

In [2]:
# Jupyter already runs an asyncio event loop; nest_asyncio lets asyncio.run()
# be called from inside a notebook cell without "loop already running" errors.
nest_asyncio.apply()

# Root logging: INFO and above, each line prefixed with timestamp and level.
logging.basicConfig(
    format="%(asctime)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)

In [8]:
class AIBlogGenerator:
    """Turn recent news about portfolio tickers into AI-written markdown blog posts.

    Per ticker (see ``process_portfolio``): fetch market data, search recent
    news with Tavily, scrape each article, score its sentiment with TextBlob,
    ask a Groq-hosted LLM for a post, generate an illustration with the
    Hugging Face inference API, and append the result to
    ``output/<category>/<ticker>_blog_posts_<YYYY-MM-DD>.md``.
    """

    # Sentinel returned by scrape_article_content when a page cannot be fetched.
    SCRAPE_FAILED = "Unable to scrape content"

    def __init__(
        self,
        feed_file="rss_feeds.json",
        portfolio_file="portfolio.json",
        exa_api_key=None,
        tavily_api_key=None,
    ):
        """Create API clients, load the spaCy pipeline and remember file paths.

        Args:
            feed_file: JSON file read by ``read_feed_urls``.
            portfolio_file: JSON file read by ``read_portfolio``.
            exa_api_key: Exa API key; defaults to the ``EXA_API_KEY`` env var.
            tavily_api_key: Tavily API key; defaults to the ``TAVILY_API_KEY``
                env var.
        """
        # Resolve env-based keys at call time: an os.getenv() default in the
        # signature is evaluated once, when the class is defined.
        exa_api_key = exa_api_key or os.getenv("EXA_API_KEY")
        tavily_api_key = tavily_api_key or os.getenv("TAVILY_API_KEY")

        self.feed_file = feed_file
        self.portfolio_file = portfolio_file
        self.user_agents = [
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
            "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/92.0.4515.107 Safari/537.36",
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:90.0) Gecko/20100101 Firefox/90.0",
            "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.1.2 Safari/605.1.15",
        ]
        # Groq exposes an OpenAI-compatible endpoint, so the OpenAI SDK is reused.
        self.client = OpenAI(
            base_url="https://api.groq.com/openai/v1",
            api_key=os.getenv("GROQ_API_KEY"),
        )
        self.exa_client = Exa(api_key=exa_api_key)
        self.days_back = 7  # Analyze news from the past 7 days
        self.nlp = spacy.load("en_core_web_sm")
        self.currency_rates = CurrencyRates()
        self.cg = CoinGeckoAPI()
        # Never hardcode this key; the literal that used to live here is leaked
        # and should be rotated.
        self.tavily_api_key = tavily_api_key
        self.tavily_client = TavilyClient(self.tavily_api_key)

    def read_feed_urls(self):
        """Read RSS feed URLs from the JSON file at ``self.feed_file``."""
        with open(self.feed_file, "r", encoding="utf-8") as file:
            return json.load(file)

    def read_portfolio(self):
        """Read portfolio data from the JSON file at ``self.portfolio_file``.

        Returns the parsed JSON unchanged; ``process_portfolio`` expects a
        ``{"portfolio": {category: [ticker, ...]}}`` layout.
        """
        with open(self.portfolio_file, "r", encoding="utf-8") as file:
            return json.load(file)

    def clean_html(self, content):
        """Remove HTML tags from ``content`` and decode HTML entities.

        If parsing fails the raw input is kept (and the error logged).
        """
        try:
            # BeautifulSoup takes no request headers; passing them raised
            # TypeError and silently disabled tag stripping.
            soup = BeautifulSoup(content, "html.parser")
            clean_text = " ".join(soup.stripped_strings)
        except Exception as e:
            logging.error(f"Error cleaning HTML: {str(e)}")
            clean_text = content
        return html.unescape(clean_text)

    async def scrape_article_content(self, url):
        """Download ``url`` and return its visible text.

        Returns:
            The cleaned text, or ``self.SCRAPE_FAILED`` if the request or
            parsing fails (errors are logged, never raised).
        """
        headers = {
            "User-Agent": random.choice(self.user_agents),
            "Accept": "text/html",
        }
        try:
            async with aiohttp.ClientSession() as session:
                async with session.get(url, headers=headers) as response:
                    response.raise_for_status()
                    elements = partition_html(text=await response.text())
                    content = " ".join(str(element) for element in elements)
                    clean_content = self.clean_html(content)
                    logging.info(f"Scraped content for {url}: {clean_content[:500]}...")
                    return clean_content
        except Exception as e:
            logging.error(f"Error scraping {url}: {str(e)}")
            return self.SCRAPE_FAILED

    async def generate_blog_post(
        self, article_content, ticker, ticker_data=None, sentiment=None, max_retries=5
    ):
        """Generate a blog post about ``ticker`` with the LLM.

        The prompt variant depends on which optional inputs are given; with both
        ``ticker_data`` and ``sentiment`` the model is also asked to suggest a
        market action. Only the first 4000 characters of the article are sent.

        Args:
            article_content: Source article text.
            ticker: Symbol the post is about.
            ticker_data: Optional market data, rendered into the prompt as-is.
            sentiment: Optional sentiment scores, rendered into the prompt as-is.
            max_retries: Attempts allowed when the API reports a 429 rate limit.

        Returns:
            ``(title, body)``: the first line of the reply and the remaining
            lines. ``(None, None)`` on an empty reply, a non-rate-limit error,
            or when every attempt was rate-limited.
        """

        if ticker_data and sentiment:
            prompt = f"""
            Based on the following article content, ticker data, and sentiment analysis, write a new blog post about {ticker}. 
            The new post should be original, engaging, and informative, while capturing the key 
            points and insights from the source material. It should also incorporate relevant 
            insights and analysis from the ticker data and sentiment analysis, where appropriate.
            Finally, suggest a market action (buy, sell, or hold) for {ticker} based on the analysis.

            Source content: {article_content[:4000]} 

            Ticker Data: {ticker_data}

            Sentiment Analysis: {sentiment}

            New blog post:
            """
        elif ticker_data:
            prompt = f"""
            Based on the following article content and ticker data, write a new blog post about {ticker}. 
            The new post should be original, engaging, and informative, while capturing the key 
            points and insights from the source material. It should also incorporate relevant 
            insights and analysis from the ticker data, where appropriate.

            Source content: {article_content[:4000]} 

            Ticker Data: {ticker_data}

            New blog post:
            """
        else:
            prompt = f"""
            Based on the following article content, write a new blog post about {ticker}. The new post should be 
            original, engaging, and informative, while capturing the key points and insights from 
            the source material.
            Source content: {article_content[:4000]} 
            New blog post:
            """

        for attempt in range(max_retries):
            try:
                response = self.client.chat.completions.create(
                    model="llama-3.2-3b-preview",
                    messages=[
                        {
                            "role": "system",
                            "content": "You are a professional blogger who writes engaging and informative content across various topics, including technology, lifestyle, health, and culture. Adapt your tone and style to suit each subject while ensuring your writing is relatable and thought-provoking. When suggesting market actions, be cautious and avoid making strong recommendations. Use phrases like 'might be a good time to consider', 'could be an opportunity to', or 'investors may want to watch' to express potential actions without giving direct financial advice.",
                        },
                        {"role": "user", "content": prompt},
                    ],
                    max_tokens=4000,
                    temperature=0.5,
                )
                content = (
                    response.choices[0].message.content if response.choices else None
                )
            except Exception as e:
                if "429" not in str(e):
                    logging.error(f"Error generating blog post: {str(e)}")
                    return None, None
                if attempt == max_retries - 1:
                    break  # no point sleeping after the final attempt
                wait_time = 2**attempt  # Exponential backoff
                logging.warning(
                    f"Rate limit exceeded. Waiting for {wait_time} seconds."
                )
                # asyncio.sleep yields to the event loop; time.sleep would block it.
                await asyncio.sleep(wait_time)
                continue

            if not content:
                logging.error("OpenAI response is empty or incomplete.")
                return None, None
            # The model is expected to put the title on the first line.
            title, _, new_blog_post = content.strip().partition("\n")
            return title, new_blog_post

        logging.error(
            f"Giving up on blog post for {ticker} after {max_retries} rate-limited attempts."
        )
        return None, None

    def find_similar_content(self, url, max_results=5):
        """Find articles similar to ``url`` with the Exa API.

        Returns:
            A list of ``{"title", "url", "snippet", "text"}`` dicts, where
            ``snippet`` is the first 250 characters of ``text`` (``None`` when
            there is no text). ``[]`` on any error.
        """
        try:
            response = self.exa_client.find_similar_and_contents(
                url, text=True, num_results=max_results
            )
            similar_articles = []
            for result in response.results:
                text = getattr(result, "text", None)
                similar_articles.append(
                    {
                        "title": result.title,
                        "url": result.url,
                        # Leading 250 characters; the old [250:] returned
                        # everything *after* them and crashed on text=None.
                        "snippet": text[:250] if text else None,
                        "text": text,
                    }
                )
            return similar_articles
        except Exception as e:
            logging.error(f"Error finding similar content for URL '{url}': {str(e)}")
            return []

    @staticmethod
    def _is_model_busy(response):
        """Return True if an HTTP error response should be retried later."""
        if response is None:
            return False
        # str(HTTPError) only carries the status line, so the "Model too busy"
        # text has to be looked for in the response body itself.
        return response.status_code in (429, 503) or "Model too busy" in response.text

    def generate_image(
        self,
        prompt,
        output_dir=".",
        width=1024,
        height=768,
        num_inference_steps=50,
        guidance_scale=9,
        scheduler="heunpp2",
        **kwargs,
    ):
        """Generate an image with the Hugging Face FLUX.1-dev model and save it as PNG.

        Args:
            prompt: Text prompt sent as ``inputs``.
            output_dir: Existing directory the PNG is written to.
            width, height, num_inference_steps, guidance_scale, scheduler:
                Forwarded in the request ``parameters``.
            **kwargs: Extra entries merged into ``parameters``.

        Returns:
            ``(PIL.Image, path)`` on success, otherwise ``(None, None)``.
            Busy/rate-limited responses are retried with exponential backoff;
            all failures are logged, never raised.
        """
        API_URL = (
            "https://api-inference.huggingface.co/models/black-forest-labs/FLUX.1-dev"
        )
        API_TOKEN = os.getenv("HF_API_TOKEN")
        headers = {"Authorization": f"Bearer {API_TOKEN}"}

        payload = {
            "inputs": prompt,
            "parameters": {
                "width": width,
                "height": height,
                "num_inference_steps": num_inference_steps,
                "guidance_scale": guidance_scale,
                "scheduler": scheduler,
                **kwargs,  # Pass additional parameters here
            },
        }
        request_timeout = 300  # seconds; without a timeout a stalled request hangs forever

        max_retries = 5
        for attempt in range(max_retries):
            try:
                response = requests.post(
                    API_URL, headers=headers, json=payload, timeout=request_timeout
                )
                response.raise_for_status()
            except requests.exceptions.HTTPError as e:
                if not self._is_model_busy(e.response):
                    logging.error(f"Error generating image: {str(e)}")
                    return None, None
                if attempt == max_retries - 1:
                    break
                wait_time = 2**attempt  # Exponential backoff
                logging.warning(
                    f"Model too busy. Waiting for {wait_time} seconds before retrying."
                )
                time.sleep(wait_time)
                continue
            except Exception as e:
                logging.error(f"Error generating image: {str(e)}")
                return None, None

            if response.status_code != 200:
                logging.error(f"Image generation API failed: {response.text}")
                return None, None
            try:
                image = Image.open(io.BytesIO(response.content))
                # Nanosecond timestamp so images generated within the same
                # second do not overwrite each other.
                filename = f"image_{time.time_ns()}.png"
                output_path = os.path.join(output_dir, filename)
                image.save(output_path)
                return image, output_path
            except Exception as e:
                logging.error(f"Error processing image data: {str(e)}")
                return None, None

        logging.error("Failed to generate image after multiple retries.")
        return None, None

    def extract_nouns(self, text):
        """Return the distinct nouns in ``text`` (spaCy POS tags), in first-seen order.

        Order is deterministic because callers take a prefix of the list.
        """
        doc = self.nlp(text)
        # dict.fromkeys de-duplicates while keeping first-appearance order.
        return list(dict.fromkeys(token.text for token in doc if token.pos_ == "NOUN"))

    def generate_refined_prompt_for_blog_post(self, post):
        """Ask the LLM to turn ``post["content"]`` into an image-generation prompt.

        Returns:
            The model's prompt, or a generic fallback prompt when the model
            returns nothing or the request fails.
        """
        fallback = "A modern illustration representing key themes from the blog post."
        summary = self.summarize_with_spacy(post["content"])
        key_elements = ", ".join(self.extract_nouns(summary)[:7])

        prompt = f"""
        Create a visually-rich prompt for AI image generation based on the summary of this blog post:

        {summary}

        Key visual themes: {key_elements}

        The prompt should:
        1. Be concise and focused on visual elements.
        2. Seamlessly incorporate key visual concepts without explicitly listing them.
        3. Maintain a professional tone in line with the summary above.
        4. Be optimized for AI image generation models, emphasizing clarity and detail.

        Please provide only the refined prompt without any additional commentary or explanation.
        """
        try:
            response = self.client.chat.completions.create(
                model="llama-3.1-70b-versatile",
                messages=[
                    {
                        "role": "system",
                        "content": "You are an expert at creating prompts for AI image generation.",
                    },
                    {"role": "user", "content": prompt},
                ],
                max_tokens=150,
                temperature=0.7,
            )
            content = response.choices[0].message.content if response.choices else None
            return content.strip() if content else fallback
        except Exception as e:
            logging.error(f"Error generating refined prompt: {str(e)}")
            return fallback

    def summarize_with_spacy(self, text):
        """Return the first 7 sentences of ``text`` that have more than 5 words."""
        doc = self.nlp(text)
        sentences = [sent.text for sent in doc.sents if len(sent.text.split()) > 5]
        return " ".join(sentences[:7])

    def extract_tickers(self, text):
        """Extract candidate tickers (``AAPL``, ``BRK.B``, ``EUR/USD``) from ``text``.

        Returns unique matches in order of first appearance.
        """
        # The forex alternative must come first; otherwise the generic pattern
        # grabs "EUR" out of "EUR/USD" and the pair is never captured.
        ticker_pattern = r"\b([A-Z]{3,}/[A-Z]{3,}|[A-Z]{2,}[A-Z0-9\.\-]*)\b"
        return list(dict.fromkeys(re.findall(ticker_pattern, text)))

    def _fetch_stock_quote(self, ticker):
        """Return yfinance quote fields for ``ticker``, or None if unavailable."""
        try:
            # Read .info once instead of once per field.
            info = yf.Ticker(ticker).info or {}
        except Exception as e:
            logging.debug(f"yfinance lookup failed for '{ticker}': {str(e)}")
            return None
        data = {
            "current_price": info.get("currentPrice"),
            "day_high": info.get("dayHigh"),
            "day_low": info.get("dayLow"),
            "volume": info.get("volume"),
            # Add more fields as needed (e.g., 52-week high/low, market cap, etc.)
        }
        # .get() yields None for missing fields, so a lookup can "succeed" with
        # no data at all; treat that as a miss so the caller can fall back.
        return data if any(value is not None for value in data.values()) else None

    def _fetch_crypto_quote(self, ticker):
        """Return CoinGecko USD quote fields for ``ticker``, or None on failure."""
        # NOTE(review): get_coin_by_id expects CoinGecko ids (e.g. "bitcoin"),
        # not symbols like "BTC" — confirm the portfolio uses ids.
        try:
            market = self.cg.get_coin_by_id(ticker.lower())["market_data"]
            return {
                "current_price": market["current_price"]["usd"],
                "day_high": market["high_24h"]["usd"],
                "day_low": market["low_24h"]["usd"],
                # Add more fields as needed from coin_data
            }
        except Exception as e:
            logging.error(f"Error fetching data for ticker '{ticker}': {str(e)}")
            return None

    def fetch_ticker_data(self, ticker):
        """Fetch market data for ``ticker``.

        Forex pairs (``"EUR/USD"``) use forex_python. Other symbols try yfinance
        first and fall back to CoinGecko if yfinance fails or has no fields.

        Returns:
            ``{ticker: {...}}`` on success, ``{}`` if no source produced data.
        """
        if "/" in ticker:
            try:
                base_currency, quote_currency = ticker.split("/")
                rate = self.currency_rates.get_rate(base_currency, quote_currency)
            except Exception as e:
                logging.error(f"Error fetching data for ticker '{ticker}': {str(e)}")
                return {}
            return {ticker: {"rate": rate}}

        data = self._fetch_stock_quote(ticker) or self._fetch_crypto_quote(ticker)
        return {ticker: data} if data else {}

    def analyze_sentiment(self, text):
        """Return TextBlob polarity and subjectivity scores for ``text``."""
        analysis = TextBlob(text)
        return {
            "polarity": analysis.polarity,
            "subjectivity": analysis.subjectivity,
        }

    def _search_recent_news(self, ticker):
        """Return Tavily news results for ``ticker`` from the last ``days_back`` days.

        Returns ``[]`` (after logging) if the search fails or the response is
        malformed, so one bad ticker does not abort the whole run.
        """
        try:
            # Tavily restricts recency via topic="news" + days.
            client_response = self.tavily_client.search(
                query=f"{ticker} news", topic="news", days=self.days_back
            )
        except Exception as e:
            logging.error(f"Tavily search failed for ticker {ticker}: {str(e)}")
            return []
        if not isinstance(client_response, dict) or "results" not in client_response:
            logging.warning(
                f"Tavily search returned an invalid response for ticker: {ticker}."
            )
            return []
        return client_response.get("results") or []

    @staticmethod
    def _clean_title(raw_title):
        """Strip markdown heading/bold markers the model may wrap its title in."""
        if not raw_title:
            return ""
        return raw_title.strip().lstrip("#").strip().strip("*").strip()

    async def _process_article(
        self, result, ticker, ticker_data, output_file, output_dir, image_dir
    ):
        """Turn one search result into a blog post appended to ``output_file``."""
        link = result.get("url", "")
        title = result.get("title", "Untitled")
        if not link:
            logging.warning("Skipping result with missing URL.")
            return
        logging.info(f"Processing article: {title}")

        article_content = await self.scrape_article_content(link)
        # scrape_article_content signals failure with a sentinel, not None.
        if not article_content or article_content == self.SCRAPE_FAILED:
            logging.warning(f"Failed to scrape content for article: {title}")
            return

        sentiment = self.analyze_sentiment(article_content)
        new_title, new_blog_post = await self.generate_blog_post(
            article_content, ticker, ticker_data, sentiment
        )
        if not new_blog_post:
            logging.warning(f"Failed to generate blog post for {ticker}.")
            return

        refined_prompt = self.generate_refined_prompt_for_blog_post(
            {"content": article_content}
        )
        _image, image_path = self.generate_image(refined_prompt, output_dir=image_dir)

        with open(output_file, "a", encoding="utf-8") as f:
            f.write(f"# {self._clean_title(new_title)}\n\n")
            f.write(new_blog_post + "\n\n")
            f.write(f"Original article: [{title}]({link})\n\n")
            if image_path:
                # Markdown resolves image links relative to the .md file,
                # which lives in output_dir.
                image_link = os.path.relpath(image_path, output_dir).replace(os.sep, "/")
                f.write(f"![Image]({image_link})\n\n")

        logging.info(f"Generated blog post for {ticker}.")

    async def process_portfolio(self):
        """Generate blog posts for every ticker in the portfolio file.

        Expects ``{"portfolio": {category: [ticker, ...]}}``. Posts are appended
        to ``output/<category>/<ticker>_blog_posts_<YYYY-MM-DD>.md`` with images
        under ``output/<category>/images``. Per-article failures are logged and
        skipped.
        """
        portfolio = self.read_portfolio()

        if (
            not isinstance(portfolio, dict)
            or not isinstance(portfolio.get("portfolio"), dict)
        ):
            logging.error("Invalid portfolio data structure")
            return

        today = datetime.now().strftime("%Y-%m-%d")
        for category, tickers in portfolio["portfolio"].items():
            output_dir = f"output/{category}"
            image_dir = f"{output_dir}/images"
            os.makedirs(image_dir, exist_ok=True)  # also creates output_dir
            for ticker in tickers:
                # "/" in forex pairs would otherwise be read as a directory separator.
                safe_ticker = ticker.replace("/", "-")
                output_file = f"{output_dir}/{safe_ticker}_blog_posts_{today}.md"

                # Market data does not depend on the article: fetch once per ticker.
                ticker_data = self.fetch_ticker_data(ticker)
                if not ticker_data:
                    logging.warning(f"Failed to fetch ticker data for ticker: {ticker}")
                    continue

                for result in self._search_recent_news(ticker):
                    try:
                        await self._process_article(
                            result, ticker, ticker_data, output_file, output_dir, image_dir
                        )
                    except Exception as e:
                        logging.error(f"Error processing search result: {str(e)}")

In [None]:
if __name__ == "__main__":
    # Build a generator with its default file paths / env-based keys, then run
    # the full portfolio pipeline to completion on the event loop.
    blog_generator = AIBlogGenerator()
    portfolio_job = blog_generator.process_portfolio()
    asyncio.run(portfolio_job)

# NEW VERSION


In [63]:
class AIBlogGenerator:
    def __init__(
        self,
        portfolio_file="portfolio.json",
        exa_api_key=None,
        tavily_api_key=None,
    ):
        """Create API clients, load the spaCy pipeline and remember the portfolio path.

        Args:
            portfolio_file: JSON file read by ``read_portfolio``.
            exa_api_key: Exa API key; defaults to the ``EXA_API_KEY`` env var.
            tavily_api_key: Tavily API key; defaults to the ``TAVILY_API_KEY``
                env var.
        """
        # Resolve env-based keys at call time: an os.getenv() default in the
        # signature is evaluated once, when the class is defined.
        exa_api_key = exa_api_key or os.getenv("EXA_API_KEY")
        tavily_api_key = tavily_api_key or os.getenv("TAVILY_API_KEY")

        self.portfolio_file = portfolio_file
        self.user_agents = [
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
            "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/92.0.4515.107 Safari/537.36",
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:90.0) Gecko/20100101 Firefox/90.0",
            "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.1.2 Safari/605.1.15",
        ]
        # Groq exposes an OpenAI-compatible endpoint, so the OpenAI SDK is reused.
        self.client = OpenAI(
            base_url="https://api.groq.com/openai/v1",
            api_key=os.getenv("GROQ_API_KEY"),
        )
        self.exa_client = Exa(api_key=exa_api_key)
        self.days_back = 7  # Analyze news from the past 7 days
        self.nlp = spacy.load("en_core_web_sm")
        self.currency_rates = CurrencyRates()
        self.cg = CoinGeckoAPI()
        # Never hardcode this key (the argument used to be ignored in favour of
        # a literal); the previously committed key should be rotated.
        self.tavily_api_key = tavily_api_key
        self.tavily_client = TavilyClient(self.tavily_api_key)

    def read_portfolio(self) -> Dict[str, List[str]]:
        try:
            with open("portfolio.json", "r") as f:
                raw_data = json.load(f)
                portfolio = raw_data.get(
                    "portfolio", {}
                )  # Access nested "portfolio" key
                self.logger.log_operation(
                    "portfolio_loaded",
                    tickers_count=sum(len(v) for v in portfolio.values()),
                )
                return portfolio
        except Exception as e:
            self.metrics.error_count.labels(type="portfolio_read").inc()
            self.logger.log_operation("portfolio_read_failed", error=str(e))
            return {}

    def clean_html(self, content):
        """Strip markup from ``content`` and decode HTML entities.

        When parsing fails the raw input is used instead (and the error is
        logged), so ``html.unescape`` is always applied to something.
        """
        try:
            text = " ".join(BeautifulSoup(content, "html.parser").stripped_strings)
        except Exception as e:
            logging.error(f"Error cleaning HTML: {str(e)}")
            text = content
        return html.unescape(text)

    async def scrape_article_content(self, url, timeout=30):
        """Download ``url`` and return its visible text.

        The HTML is partitioned with ``unstructured`` and then cleaned with
        ``clean_html``.

        Args:
            url: Page to fetch.
            timeout: Total request timeout in seconds.

        Returns:
            The cleaned text, or the sentinel ``"Unable to scrape content"`` on
            any failure (HTTP error status, network error, timeout, parse error).
            Errors are logged, never raised.
        """
        headers = {
            "User-Agent": random.choice(self.user_agents),
            "Accept": "text/html",
        }
        try:
            client_timeout = aiohttp.ClientTimeout(total=timeout)
            async with aiohttp.ClientSession(timeout=client_timeout) as session:
                async with session.get(url, headers=headers) as response:
                    # Full header dumps are diagnostic noise at INFO level.
                    logging.debug(f"Response headers for {url}: {response.headers}")
                    response.raise_for_status()
                    elements = partition_html(text=await response.text())
                    content = " ".join(str(element) for element in elements)
                    clean_content = self.clean_html(content)
                    logging.info(f"Scraped content for {url}: {clean_content[:500]}...")
                    return clean_content
        except aiohttp.ClientResponseError as e:
            logging.error(
                f"Error scraping {url}: {e.status}, message='{e.message}', url='{e.request_info.url}'"
            )
            return "Unable to scrape content"
        except Exception as e:
            logging.error(f"Error scraping {url}: {str(e)}")
            return "Unable to scrape content"

    async def generate_blog_post(
        self, article_content, ticker, ticker_data=None, sentiment=None
    ):
        """Generate a blog post using AI."""

        if ticker_data and sentiment:
            prompt = f"""
            Based on the following article content, ticker data, and sentiment analysis, write a new blog post about {ticker}. 
            The new post should be original, engaging, and informative, while capturing the key 
            points and insights from the source material. It should also incorporate relevant 
            insights and analysis from the ticker data and sentiment analysis, where appropriate.
            Finally, suggest a market action (buy, sell, or hold) for {ticker} based on the analysis.

            Source content: {article_content[:4000]} 

            Ticker Data: {ticker_data}

            Sentiment Analysis: {sentiment}

            New blog post:
            """
        elif ticker_data:
            prompt = f"""
            Based on the following article content and ticker data, write a new blog post about {ticker}. 
            The new post should be original, engaging, and informative, while capturing the key 
            points and insights from the source material. It should also incorporate relevant 
            insights and analysis from the ticker data, where appropriate.

            Source content: {article_content[:4000]} 

            Ticker Data: {ticker_data}

            New blog post:
            """
        else:
            prompt = f"""
            Based on the following article content, write a new blog post about {ticker}. The new post should be 
            original, engaging, and informative, while capturing the key points and insights from 
            the source material.
            Source content: {article_content[:4000]} 
            New blog post:
            """

        max_retries = 5
        for attempt in range(max_retries):
            try:
                response = self.client.chat.completions.create(
                    model="llama-3.2-3b-preview",
                    messages=[
                        {
                            "role": "system",
                            "content": "You are a professional blogger who writes engaging and informative content across various topics, including technology, lifestyle, health, and culture. Adapt your tone and style to suit each subject while ensuring your writing is relatable and thought-provoking. When suggesting market actions, be cautious and avoid making strong recommendations. Use phrases like 'might be a good time to consider', 'could be an opportunity to', or 'investors may want to watch' to express potential actions without giving direct financial advice.",
                        },
                        {"role": "user", "content": prompt},
                    ],
                    max_tokens=4000,
                    temperature=0.5,
                )
                if response.choices and response.choices[0].message.content:
                    blog_post_content = response.choices[0].message.content.strip()

                    # Assuming the new title is returned at the beginning of the content
                    title = (
                        blog_post_content.split("\n")[0] if blog_post_content else ""
                    )  # Handle potential None
                    new_blog_post = (
                        "\n".join(blog_post_content.split("\n")[1:])
                        if blog_post_content
                        else ""
                    )  # Handle potential None

                    return (
                        title,
                        new_blog_post,
                    )  # Return both title and content
                else:
                    logging.error("OpenAI response is empty or incomplete.")
                    return None, None
            except Exception as e:
                if "429" in str(e):
                    wait_time = 2**attempt  # Exponential backoff
                    logging.warning(
                        f"Rate limit exceeded. Waiting for {wait_time} seconds."
                    )
                    time.sleep(wait_time)
                else:
                    logging.error(f"Error generating blog post: {str(e)}")
                    return None

    def find_similar_content(self, url, max_results=5):
        """Find similar articles using the Exa API."""
        try:
            response = self.exa_client.find_similar_and_contents(
                url, text=True, num_results=max_results
            )
            similar_articles = []
            for result in response.results:
                similar_articles.append(
                    {
                        "title": result.title,
                        "url": result.url,
                        "text": result.text if hasattr(result, "text") else None,
                    }
                )
            return similar_articles
        except Exception as e:
            logging.error(f"Error finding similar content for URL '{url}': {str(e)}")
            return []

    def generate_image(
        self,
        prompt,
        output_dir=".",
        width=1024,
        height=768,
        num_inference_steps=50,
        guidance_scale=9,
        scheduler="heunpp2",
        **kwargs,
    ):
        """Generate an image with the Hugging Face FLUX.1-dev model and save it as PNG.

        Args:
            prompt: Text prompt sent as ``inputs``.
            output_dir: Existing directory the PNG is written to.
            width, height, num_inference_steps, guidance_scale, scheduler:
                Forwarded in the request ``parameters``.
            **kwargs: Extra entries merged into ``parameters``.

        Returns:
            ``(PIL.Image, path)`` on success, otherwise ``(None, None)``
            (network error, non-200 status, undecodable image). Failures are
            logged, never raised.
        """
        API_URL = (
            "https://api-inference.huggingface.co/models/black-forest-labs/FLUX.1-dev"
        )
        API_TOKEN = os.getenv("HF_API_TOKEN")
        headers = {"Authorization": f"Bearer {API_TOKEN}"}

        payload = {
            "inputs": prompt,
            "parameters": {
                "width": width,
                "height": height,
                "num_inference_steps": num_inference_steps,
                "guidance_scale": guidance_scale,
                "scheduler": scheduler,
                **kwargs,  # Pass additional parameters here
            },
        }

        try:
            # Without a timeout a stalled connection blocks the caller forever;
            # network errors used to propagate out of this method uncaught.
            response = requests.post(API_URL, headers=headers, json=payload, timeout=300)
        except requests.exceptions.RequestException as e:
            logging.error(f"Error generating image: {str(e)}")
            return None, None

        if response.status_code != 200:
            logging.error(f"Image generation API failed: {response.text}")
            return None, None

        try:
            image = Image.open(io.BytesIO(response.content))
            # Nanosecond timestamp so images generated within the same second
            # do not overwrite each other.
            filename = f"image_{time.time_ns()}.png"
            output_path = os.path.join(output_dir, filename)
            image.save(output_path)
            return image, output_path
        except Exception as e:
            logging.error(f"Error processing image data: {str(e)}")
            return None, None

    def extract_nouns(self, text):
        """Extract the unique nouns in ``text`` using spaCy.

        Args:
            text: Text to tag with ``self.nlp``.

        Returns:
            Texts of tokens tagged ``NOUN``, de-duplicated in order of first
            appearance. The order matters because callers slice the result
            (``nouns[:7]``); de-duplicating through a ``set`` would make that
            selection vary from run to run with string hash randomization.
        """
        doc = self.nlp(text)
        return list(dict.fromkeys(token.text for token in doc if token.pos_ == "NOUN"))

    def generate_refined_prompt_for_blog_post(self, post):
        """Turn a blog post into a concise prompt for an image-generation model.

        The post content is condensed with ``summarize_with_spacy``, up to
        seven of the summary's nouns are offered as visual themes, and the
        chat model behind ``self.client`` rewrites that into an image prompt.

        Args:
            post: Mapping with a ``"content"`` key holding the post text.

        Returns:
            The model's prompt text (stripped), or a generic fallback prompt
            when the call fails or yields no usable text.
        """
        fallback = "A modern illustration representing key themes from the blog post."
        summary = self.summarize_with_spacy(post["content"])
        nouns = self.extract_nouns(summary)
        key_elements = ", ".join(nouns[:7])

        # Rule 3 points back at the summary instead of pasting it a second
        # time, and no "example" is mentioned because none is supplied.
        prompt = f"""
        Create a visually-rich prompt for AI image generation based on the summary of this blog post:

        {summary}

        Key visual themes: {key_elements}

        The prompt should:
        1. Be concise and focused on visual elements.
        2. Seamlessly incorporate key visual concepts without explicitly listing them.
        3. Maintain a professional tone in line with the summary above.
        4. Be optimized for AI image generation models, emphasizing clarity and detail.

        Please provide only the refined prompt without any additional commentary or explanation.
        """
        try:
            response = self.client.chat.completions.create(
                model="llama-3.1-70b-versatile",
                messages=[
                    {
                        "role": "system",
                        "content": "You are an expert at creating prompts for AI image generation.",
                    },
                    {"role": "user", "content": prompt},
                ],
                max_tokens=150,
                temperature=0.7,
            )
        except Exception as e:
            logging.error(f"Error generating refined prompt: {str(e)}")
            return fallback

        # Guard against an empty choices list, a None body, or whitespace only.
        if not response.choices:
            return fallback
        content = (response.choices[0].message.content or "").strip()
        return content or fallback

    def summarize_with_spacy(self, text):
        """Build an extractive summary of ``text`` with spaCy's sentence splitter.

        Sentences of five words or fewer are dropped; the first seven of the
        remaining sentences are joined with single spaces.
        """
        kept = []
        for sentence in self.nlp(text).sents:
            sentence_text = sentence.text
            if len(sentence_text.split()) <= 5:
                continue
            kept.append(sentence_text)
        return " ".join(kept[:7])

    def extract_tickers(self, text):
        """Extract potential stock tickers and forex pairs from ``text``.

        Matches runs of two or more capital letters, optionally followed by
        digits, ``.`` or ``-`` (e.g. ``BRK.B``, ``BTC-USD``), and forex pairs
        written as ``EUR/USD``.

        Returns:
            Unique matches in order of first appearance.
        """
        # The forex alternative must be tried first: with the plain-ticker
        # branch first, "EUR/USD" matches as "EUR" (there is a word boundary
        # before "/") and the pair branch can never be reached.
        ticker_pattern = r"\b([A-Z]{3,}/[A-Z]{3,}|[A-Z]{2,}[A-Z0-9\.\-]*)\b"
        tickers = re.findall(ticker_pattern, text)
        return list(dict.fromkeys(tickers))  # unique, stable order

    def fetch_ticker_data(self, ticker):
        """Fetch a market-data snapshot for ``ticker``.

        * ``BASE/QUOTE`` forex pairs (e.g. ``EUR/USD``) are priced with
          ``self.currency_rates``.
        * Anything else is looked up with yfinance. If yfinance raises *or*
          returns no current price, CoinGecko is queried with
          ``ticker.lower()`` as the coin id.

        Args:
            ticker: Stock symbol, forex pair, or CoinGecko coin id.

        Returns:
            ``{ticker: {field: value}}``, or ``{}`` if every source failed.
            Errors are logged, never raised.
        """
        ticker_data = {}
        try:
            if "/" in ticker:
                base_currency, quote_currency = ticker.split("/")
                rate = self.currency_rates.get_rate(base_currency, quote_currency)
                ticker_data[ticker] = {"rate": rate}
                return ticker_data

            stock_data = None
            try:
                # Read `.info` once instead of once per field.
                info = yf.Ticker(ticker).info or {}
                stock_data = {
                    "current_price": info.get("currentPrice"),
                    "day_high": info.get("dayHigh"),
                    "day_low": info.get("dayLow"),
                    "volume": info.get("volume"),
                    "market_cap": info.get("marketCap"),
                }
            except Exception as e:
                logging.warning(
                    f"yfinance lookup failed for ticker '{ticker}': {str(e)}"
                )

            if stock_data is not None and stock_data["current_price"] is not None:
                ticker_data[ticker] = stock_data
                return ticker_data

            # yfinance may hand back an (almost) empty info dict instead of
            # raising for symbols it does not know, so a missing price also
            # triggers the CoinGecko fallback.
            try:
                market = self.cg.get_coin_by_id(ticker.lower())["market_data"]
                ticker_data[ticker] = {
                    "current_price": market["current_price"]["usd"],
                    "day_high": market["high_24h"]["usd"],
                    "day_low": market["low_24h"]["usd"],
                    "volume": market["total_volume"]["usd"],
                    "market_cap": market["market_cap"]["usd"],
                }
            except Exception as e:
                logging.error(
                    f"Error fetching data for ticker '{ticker}' from CoinGeckoAPI: {str(e)}"
                )
                if stock_data is not None:
                    # Keep whatever yfinance did return rather than nothing.
                    ticker_data[ticker] = stock_data
        except Exception as e:
            logging.error(f"Error fetching data for ticker '{ticker}': {str(e)}")
        return ticker_data

    def analyze_sentiment(self, text):
        """Score ``text`` with TextBlob.

        Returns:
            Dict with TextBlob's ``polarity`` and ``subjectivity`` scores for
            the whole text.
        """
        blob = TextBlob(text)
        polarity = blob.polarity
        subjectivity = blob.subjectivity
        return dict(polarity=polarity, subjectivity=subjectivity)

    async def process_portfolio(self):
        """Generate one blog post (plus header image) per portfolio ticker.

        Reads the portfolio via ``read_portfolio``; it must be a mapping with a
        ``"portfolio"`` key of ``{category: [ticker, ...]}``. For each ticker:
        searches Tavily for recent news, scrapes each result and the articles
        returned by ``find_similar_content``, generates a post from the
        combined text, and appends it (with an image link when one was
        generated) to ``output/<category>/<ticker>_blog_posts_<YYYY-MM-DD>.md``.

        A failed search or a failing article is logged and skipped so it does
        not abort the remaining tickers.
        """
        portfolio = self.read_portfolio()

        # Add validation before accessing portfolio data
        if not portfolio or "portfolio" not in portfolio:
            logging.error("Invalid portfolio data structure")
            return

        # One timestamp per run: every ticker shares the same search window
        # and the same dated output file name.
        end_date = datetime.now()
        start_date = end_date - timedelta(days=self.days_back)
        run_date = end_date.strftime("%Y-%m-%d")

        for category, tickers in portfolio["portfolio"].items():
            output_dir = f"output/{category}"
            image_dir = f"{output_dir}/images"
            os.makedirs(image_dir, exist_ok=True)

            for ticker in tickers:
                output_file = f"{output_dir}/{ticker}_blog_posts_{run_date}.md"

                try:
                    client_response = self.tavily_client.search(
                        query=f"Latest {ticker} news",
                        topic="news",
                        include_raw_content=True,
                        days=self.days_back,
                        from_date=start_date.strftime("%Y-%m-%d"),
                        to_date=end_date.strftime("%Y-%m-%d"),
                    )
                except Exception as e:
                    logging.error(f"Tavily search failed for ticker {ticker}: {str(e)}")
                    continue

                # Ensure client_response is a dictionary and contains the 'results' key
                if (
                    not isinstance(client_response, dict)
                    or "results" not in client_response
                ):
                    logging.warning(
                        f"Tavily search returned an invalid response for ticker: {ticker}."
                    )
                    continue

                all_article_content = []
                # Search results and similar-content hits may overlap; scrape each URL once.
                seen_urls = set()

                for result in client_response.get("results", []):
                    try:
                        link = result.get("url", "")
                        title = result.get("title", "Untitled")
                        if not link:
                            logging.warning("Skipping result with missing URL.")
                            continue
                        if link in seen_urls:
                            continue
                        seen_urls.add(link)
                        logging.info(f"Processing article: {title}")

                        article_content = await self.scrape_article_content(link)
                        if article_content is None:
                            logging.warning(
                                f"Failed to scrape content for article: {title}"
                            )
                            continue
                        all_article_content.append(article_content)

                        for similar_article in self.find_similar_content(link):
                            similar_url = similar_article.get("url")
                            if not similar_url or similar_url in seen_urls:
                                continue
                            seen_urls.add(similar_url)
                            similar_content = await self.scrape_article_content(
                                similar_url
                            )
                            if similar_content:
                                all_article_content.append(similar_content)

                    except Exception as e:
                        logging.error(f"Error processing search result: {str(e)}")

                if not all_article_content:
                    logging.warning(f"No article content found for {ticker}.")
                    continue

                combined_content = "\n\n".join(all_article_content)
                ticker_data = self.fetch_ticker_data(ticker)
                sentiment = self.analyze_sentiment(combined_content)

                generated = await self.generate_blog_post(
                    combined_content, ticker, ticker_data, sentiment
                )
                # Tolerate a bare None as well as a (None, None) failure result.
                new_title, new_blog_post = generated if generated else (None, None)

                if not new_blog_post:
                    logging.warning(f"Failed to generate blog post for {ticker}.")
                    continue

                refined_prompt = self.generate_refined_prompt_for_blog_post(
                    {"content": combined_content}
                )
                _image, image_path = self.generate_image(
                    refined_prompt, output_dir=image_dir
                )

                clean_title = new_title.strip("*") if new_title else ""
                # Explicit UTF-8: LLM output routinely contains non-ASCII text.
                with open(output_file, "a", encoding="utf-8") as f:
                    f.write(f"# {clean_title}\n\n")
                    f.write(new_blog_post + "\n\n")
                    if image_path:
                        # Markdown resolves image links relative to the .md
                        # file, which lives in output_dir, not the cwd.
                        rel_path = os.path.relpath(image_path, output_dir)
                        f.write(f"![Image]({rel_path.replace(os.sep, '/')})\n\n")

                logging.info(f"Generated blog post for {ticker}.")

In [None]:
if __name__ == "__main__":
    # Entry point: build the generator, then drive its async pipeline to completion.
    blog_generator = AIBlogGenerator()
    portfolio_run = blog_generator.process_portfolio()
    asyncio.run(portfolio_run)

## Tavily API


In [None]:
class AIBlogGenerator:
    """Generate per-ticker market blog posts from Tavily news.

    For every ticker in the portfolio file this searches Tavily for recent
    news, extracts the articles' raw text with Tavily, adds market data
    (yfinance / CoinGecko / forex rates) and TextBlob sentiment, asks a
    Groq-hosted chat model to write a post, and illustrates it with an image
    from the Hugging Face FLUX.1-dev model.
    """

    def __init__(
        self,
        portfolio_file="portfolio.json",
        tavily_api_key=None,
    ):
        """Set up API clients and NLP resources.

        Args:
            portfolio_file: Path to the portfolio JSON file.
            tavily_api_key: Tavily API key. When omitted, the
                ``TAVILY_API_KEY`` environment variable is read here, at
                construction time (not once at class-definition time).
        """
        self.portfolio_file = portfolio_file
        self.user_agents = [
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
            "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/92.0.4515.107 Safari/537.36",
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:90.0) Gecko/20100101 Firefox/90.0",
            "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.1.2 Safari/605.1.15",
        ]
        self.client = OpenAI(
            base_url="https://api.groq.com/openai/v1",
            api_key=os.getenv("GROQ_API_KEY"),
        )
        self.days_back = 7  # Analyze news from the past 7 days
        self.nlp = spacy.load("en_core_web_sm")
        self.currency_rates = CurrencyRates()
        self.cg = CoinGeckoAPI()
        # Credentials come from the caller or the environment and must never
        # be hard-coded in the notebook (any key committed earlier should be
        # rotated).
        self.tavily_api_key = tavily_api_key or os.getenv("TAVILY_API_KEY")
        self.tavily_client = TavilyClient(self.tavily_api_key)

    def read_portfolio(self):
        """Read portfolio data from the JSON file at ``self.portfolio_file``."""
        with open(self.portfolio_file, "r", encoding="utf-8") as file:
            return json.load(file)

    def clean_html(self, content):
        """Remove HTML tags from ``content`` and unescape HTML entities.

        Falls back to unescaping the raw input if BeautifulSoup fails, and
        returns ``""`` for ``None`` or empty input.
        """
        if not content:
            return ""
        try:
            soup = BeautifulSoup(content, "html.parser")
            clean_text = " ".join(soup.stripped_strings)
        except Exception as e:
            logging.error(f"Error cleaning HTML: {str(e)}")
            clean_text = content
        return html.unescape(clean_text)

    async def generate_blog_post(
        self, article_content, ticker, ticker_data=None, sentiment=None
    ):
        """Generate a blog post about ``ticker`` with the chat model.

        The prompt carries at most the first 4000 characters of
        ``article_content`` and, when provided, ``ticker_data`` and
        ``sentiment``; only the variant with both asks for a
        buy/sell/hold suggestion.

        Errors whose text contains ``429`` (rate limiting) are retried up to
        five attempts with exponential backoff; other errors fail immediately.

        Returns:
            ``(title, body)``: the first line of the model's reply and the
            remaining lines. On failure ``(None, None)`` — always a 2-tuple so
            callers can unpack it safely.
        """
        if ticker_data and sentiment:
            prompt = f"""
            Based on the following article content, ticker data, and sentiment analysis, write a new blog post about {ticker}. 
            The new post should be original, engaging, and informative, while capturing the key 
            points and insights from the source material. It should also incorporate relevant 
            insights and analysis from the ticker data and sentiment analysis, where appropriate.
            Finally, suggest a market action (buy, sell, or hold) for {ticker} based on the analysis.

            Source content: {article_content[:4000]} 

            Ticker Data: {ticker_data}

            Sentiment Analysis: {sentiment}

            New blog post:
            """
        elif ticker_data:
            prompt = f"""
            Based on the following article content and ticker data, write a new blog post about {ticker}. 
            The new post should be original, engaging, and informative, while capturing the key 
            points and insights from the source material. It should also incorporate relevant 
            insights and analysis from the ticker data, where appropriate.

            Source content: {article_content[:4000]} 

            Ticker Data: {ticker_data}

            New blog post:
            """
        else:
            prompt = f"""
            Based on the following article content, write a new blog post about {ticker}. The new post should be 
            original, engaging, and informative, while capturing the key points and insights from 
            the source material.
            Source content: {article_content[:4000]} 
            New blog post:
            """

        max_retries = 5
        for attempt in range(max_retries):
            try:
                response = self.client.chat.completions.create(
                    model="llama-3.2-3b-preview",
                    messages=[
                        {
                            "role": "system",
                            "content": "You are a professional blogger who writes engaging and informative content across various topics, including technology, lifestyle, health, and culture. Adapt your tone and style to suit each subject while ensuring your writing is relatable and thought-provoking. When suggesting market actions, be cautious and avoid making strong recommendations. Use phrases like 'might be a good time to consider', 'could be an opportunity to', or 'investors may want to watch' to express potential actions without giving direct financial advice.",
                        },
                        {"role": "user", "content": prompt},
                    ],
                    max_tokens=4000,
                    temperature=0.5,
                )
            except Exception as e:
                if "429" in str(e) and attempt < max_retries - 1:
                    wait_time = 2**attempt  # Exponential backoff
                    logging.warning(
                        f"Rate limit exceeded. Waiting for {wait_time} seconds."
                    )
                    # Non-blocking: this coroutine runs on the event loop.
                    await asyncio.sleep(wait_time)
                    continue
                logging.error(f"Error generating blog post: {str(e)}")
                return None, None

            if not response.choices or not response.choices[0].message.content:
                logging.error("OpenAI response is empty or incomplete.")
                return None, None

            blog_post_content = response.choices[0].message.content.strip()
            # The first line is treated as the title, the rest as the body.
            title, _, new_blog_post = blog_post_content.partition("\n")
            return title, new_blog_post

        logging.error("Failed to generate blog post after multiple retries.")
        return None, None

    def generate_image(
        self,
        prompt,
        output_dir=".",
        width=1024,
        height=768,
        num_inference_steps=50,
        guidance_scale=9,
        scheduler="heunpp2",
        **kwargs,
    ):
        """Generate an image using the Hugging Face FLUX.1-dev model.

        Posts ``prompt`` and the sampling parameters to the Hugging Face
        Inference API and saves the result as a PNG in ``output_dir``.
        Responses with HTTP 429/503, or whose body mentions "Model too busy",
        are retried up to five attempts with exponential backoff; any other
        failure returns immediately.

        Returns:
            ``(image, path)`` on success, ``(None, None)`` otherwise.
        """
        API_URL = (
            "https://api-inference.huggingface.co/models/black-forest-labs/FLUX.1-dev"
        )
        API_TOKEN = os.getenv("HF_API_TOKEN")
        headers = {"Authorization": f"Bearer {API_TOKEN}"}

        payload = {
            "inputs": prompt,
            "parameters": {
                "width": width,
                "height": height,
                "num_inference_steps": num_inference_steps,
                "guidance_scale": guidance_scale,
                "scheduler": scheduler,
                **kwargs,  # Pass additional parameters here
            },
        }

        max_retries = 5
        for attempt in range(max_retries):
            try:
                # A timeout keeps a stalled connection from hanging the run.
                response = requests.post(
                    API_URL, headers=headers, json=payload, timeout=120
                )
                response.raise_for_status()
            except requests.exceptions.HTTPError as e:
                # str(HTTPError) holds only the status line and URL; the API's
                # explanation is in the response body, so inspect that and the
                # status code rather than str(e).
                status = getattr(e.response, "status_code", None)
                body = getattr(e.response, "text", "") or ""
                if status in (429, 503) or "Model too busy" in body:
                    if attempt == max_retries - 1:
                        break  # no point sleeping before giving up
                    wait_time = 2**attempt  # Exponential backoff
                    logging.warning(
                        f"Model too busy. Waiting for {wait_time} seconds before retrying."
                    )
                    time.sleep(wait_time)
                    continue
                logging.error(f"Error generating image: {str(e)} {body}")
                return None, None
            except Exception as e:
                logging.error(f"Error generating image: {str(e)}")
                return None, None

            if response.status_code != 200:
                logging.error(f"Image generation API failed: {response.text}")
                return None, None

            try:
                image = Image.open(io.BytesIO(response.content))
                os.makedirs(output_dir, exist_ok=True)
                # Nanosecond timestamps keep images created within the same
                # second from overwriting each other.
                output_path = os.path.join(output_dir, f"image_{time.time_ns()}.png")
                image.save(output_path)
                return image, output_path
            except Exception as e:
                logging.error(f"Error processing image data: {str(e)}")
                return None, None

        logging.error("Failed to generate image after multiple retries.")
        return None, None

    def extract_nouns(self, text):
        """Extract unique nouns from ``text`` using spaCy.

        Order of first appearance is preserved so that callers slicing the
        result (``nouns[:7]``) get a deterministic selection; a ``set`` would
        vary between runs with string hash randomization.
        """
        doc = self.nlp(text)
        return list(dict.fromkeys(token.text for token in doc if token.pos_ == "NOUN"))

    def generate_refined_prompt_for_blog_post(self, post):
        """Turn a blog post into a concise prompt for an image-generation model.

        Args:
            post: Mapping with a ``"content"`` key holding the post text.

        Returns:
            The model's prompt text (stripped), or a generic fallback prompt
            when the call fails or yields no usable text.
        """
        fallback = "A modern illustration representing key themes from the blog post."
        summary = self.summarize_with_spacy(post["content"])
        nouns = self.extract_nouns(summary)
        key_elements = ", ".join(nouns[:7])

        # Rule 3 points back at the summary instead of pasting it twice, and no
        # "example" is mentioned because none is supplied.
        prompt = f"""
        Create a visually-rich prompt for AI image generation based on the summary of this blog post:

        {summary}

        Key visual themes: {key_elements}

        The prompt should:
        1. Be concise and focused on visual elements.
        2. Seamlessly incorporate key visual concepts without explicitly listing them.
        3. Maintain a professional tone in line with the summary above.
        4. Be optimized for AI image generation models, emphasizing clarity and detail.

        Please provide only the refined prompt without any additional commentary or explanation.
        """
        try:
            response = self.client.chat.completions.create(
                model="llama-3.1-70b-versatile",
                messages=[
                    {
                        "role": "system",
                        "content": "You are an expert at creating prompts for AI image generation.",
                    },
                    {"role": "user", "content": prompt},
                ],
                max_tokens=100,
                temperature=0.7,
            )
        except Exception as e:
            logging.error(f"Error generating refined prompt: {str(e)}")
            return fallback

        if not response.choices:
            return fallback
        content = (response.choices[0].message.content or "").strip()
        return content or fallback

    def summarize_with_spacy(self, text):
        """Summarize ``text``: first seven sentences longer than five words."""
        doc = self.nlp(text)
        sentences = [sent.text for sent in doc.sents if len(sent.text.split()) > 5]
        return " ".join(sentences[:7])

    def extract_tickers(self, text):
        """Extract potential stock tickers and ``EUR/USD``-style forex pairs.

        Returns unique matches in order of first appearance.
        """
        # Forex alternative first: otherwise "EUR/USD" matches as "EUR"
        # (a word boundary precedes "/") and the pair branch is unreachable.
        ticker_pattern = r"\b([A-Z]{3,}/[A-Z]{3,}|[A-Z]{2,}[A-Z0-9\.\-]*)\b"
        tickers = re.findall(ticker_pattern, text)
        return list(dict.fromkeys(tickers))

    def fetch_ticker_data(self, ticker):
        """Fetch a market-data snapshot for ``ticker``.

        Forex pairs (``BASE/QUOTE``) use ``self.currency_rates``. Other
        tickers use yfinance; if it raises or returns no current price,
        CoinGecko is queried with ``ticker.lower()`` as the coin id.

        Returns:
            ``{ticker: {field: value}}``, or ``{}`` if every source failed.
        """
        ticker_data = {}
        try:
            if "/" in ticker:
                base_currency, quote_currency = ticker.split("/")
                rate = self.currency_rates.get_rate(base_currency, quote_currency)
                ticker_data[ticker] = {"rate": rate}
                return ticker_data

            stock_data = None
            try:
                # Read `.info` once instead of once per field.
                info = yf.Ticker(ticker).info or {}
                stock_data = {
                    "current_price": info.get("currentPrice"),
                    "day_high": info.get("dayHigh"),
                    "day_low": info.get("dayLow"),
                    "volume": info.get("volume"),
                    "market_cap": info.get("marketCap"),
                }
            except Exception as e:
                logging.warning(
                    f"yfinance lookup failed for ticker '{ticker}': {str(e)}"
                )

            if stock_data is not None and stock_data["current_price"] is not None:
                ticker_data[ticker] = stock_data
                return ticker_data

            # yfinance may return an (almost) empty info dict instead of raising
            # for unknown symbols, so a missing price also falls back.
            try:
                market = self.cg.get_coin_by_id(ticker.lower())["market_data"]
                ticker_data[ticker] = {
                    "current_price": market["current_price"]["usd"],
                    "day_high": market["high_24h"]["usd"],
                    "day_low": market["low_24h"]["usd"],
                    "volume": market["total_volume"]["usd"],
                    "market_cap": market["market_cap"]["usd"],
                }
            except Exception as e:
                logging.error(
                    f"Error fetching data for ticker '{ticker}': {str(e)}"
                )
                if stock_data is not None:
                    # Keep whatever yfinance did return rather than nothing.
                    ticker_data[ticker] = stock_data
        except Exception as e:
            logging.error(f"Error fetching data for ticker '{ticker}': {str(e)}")
        return ticker_data

    def analyze_sentiment(self, text):
        """Analyze the sentiment of ``text`` using TextBlob (polarity, subjectivity)."""
        analysis = TextBlob(text)
        return {
            "polarity": analysis.polarity,
            "subjectivity": analysis.subjectivity,
        }

    async def process_portfolio(self):
        """Generate one insight post (plus image) per portfolio ticker.

        Expects ``read_portfolio()`` to return ``{"portfolio": {category:
        [ticker, ...]}}``. For each ticker, searches Tavily, extracts each
        result's raw text with Tavily, generates a post, and appends it to
        ``output/<category>/<ticker>_blog_posts_<YYYY-MM-DD>.md``. A failed
        search or extraction is logged and skipped.
        """
        portfolio = self.read_portfolio()

        # Add validation before accessing portfolio data
        if not portfolio or "portfolio" not in portfolio:
            logging.error("Invalid portfolio data structure")
            return

        # One timestamp per run: shared search window and dated file name.
        end_date = datetime.now()
        start_date = end_date - timedelta(days=self.days_back)
        run_date = end_date.strftime("%Y-%m-%d")

        for category, tickers in portfolio["portfolio"].items():
            output_dir = f"output/{category}"
            image_dir = f"{output_dir}/images"
            os.makedirs(image_dir, exist_ok=True)

            for ticker in tickers:
                output_file = f"{output_dir}/{ticker}_blog_posts_{run_date}.md"

                try:
                    client_response = self.tavily_client.search(
                        query=f"Latest {ticker} news and analysis",
                        from_date=start_date.strftime("%Y-%m-%d"),
                        to_date=end_date.strftime("%Y-%m-%d"),
                    )
                except Exception as e:
                    logging.error(f"Tavily search failed for ticker {ticker}: {str(e)}")
                    continue

                # Ensure client_response is a dictionary and contains the 'results' key
                if (
                    not isinstance(client_response, dict)
                    or "results" not in client_response
                ):
                    logging.warning(
                        f"Tavily search returned an invalid response for ticker: {ticker}."
                    )
                    continue

                all_article_content = []
                seen_urls = set()  # extract each URL once

                for result in client_response.get("results", []):
                    try:
                        link = result.get("url", "")
                        title = result.get("title", "Untitled")
                        if not link:
                            logging.warning("Skipping result with missing URL.")
                            continue
                        if link in seen_urls:
                            continue
                        seen_urls.add(link)
                        logging.info(f"Processing article: {title}")

                        # Extract raw content using Tavily API
                        extract_response = self.tavily_client.extract(urls=[link])
                        if extract_response and "results" in extract_response:
                            for extract_result in extract_response["results"]:
                                # raw_content may be missing or None; a None would
                                # make the later "\n\n".join raise TypeError.
                                raw_content = extract_result.get("raw_content") or ""
                                if raw_content.strip():
                                    all_article_content.append(raw_content)

                    except Exception as e:
                        logging.error(f"Error processing search result: {str(e)}")

                if not all_article_content:
                    logging.warning(f"No article content found for {ticker}.")
                    continue

                combined_content = "\n\n".join(all_article_content)
                ticker_data = self.fetch_ticker_data(ticker)
                sentiment = self.analyze_sentiment(combined_content)

                generated = await self.generate_blog_post(
                    combined_content, ticker, ticker_data, sentiment
                )
                new_title, new_blog_post = generated if generated else (None, None)

                if not new_blog_post:
                    logging.warning(f"Failed to generate insight for {ticker}.")
                    continue

                refined_prompt = self.generate_refined_prompt_for_blog_post(
                    {"content": combined_content}
                )
                _image, image_path = self.generate_image(
                    refined_prompt, output_dir=image_dir
                )

                clean_title = new_title.strip("*") if new_title else ""
                with open(output_file, "a", encoding="utf-8") as f:
                    f.write(f"# {clean_title}\n\n")
                    f.write(new_blog_post + "\n\n")
                    if image_path:
                        # Markdown resolves links relative to the .md file's
                        # directory (output_dir), not the process cwd.
                        rel_path = os.path.relpath(image_path, output_dir)
                        f.write(f"![Image]({rel_path.replace(os.sep, '/')})\n\n")

                logging.info(f"Generated insight for {ticker}.")


if __name__ == "__main__":
    # Entry point: build the Tavily-based generator and run its async pipeline.
    blog_generator = AIBlogGenerator()
    portfolio_run = blog_generator.process_portfolio()
    asyncio.run(portfolio_run)

In [None]:
import os
import json
import logging
import asyncio
import aiohttp
import spacy
import time
import io
import re
import html
from datetime import datetime, timedelta
from bs4 import BeautifulSoup
from openai import OpenAI
from PIL import Image
from forex_python.converter import CurrencyRates
from pycoingecko import CoinGeckoAPI
from textblob import TextBlob
import yfinance as yf
from tavily import TavilyClient
from redis import Redis
from prometheus_client import Counter, Histogram, start_http_server
import structlog
from pydantic_settings import BaseSettings
from dataclasses import dataclass
from typing import Optional, Dict, List, Tuple, Any
import numpy as np
from fastapi import FastAPI, BackgroundTasks
import uvicorn

class Settings(BaseSettings):
    """Application settings for the project.

    Inherits from ``pydantic_settings.BaseSettings``, which populates each
    field from environment variables (and from a ``.env`` file, as configured
    in the nested ``Config`` class).

    Fields:
    - ``GROQ_API_KEY``: API key for the Groq API.
    - ``TAVILY_API_KEY``: API key for the Tavily API.
    - ``HF_API_KEY``: API key for the Hugging Face API.
    - ``REDIS_URL``: URL of the Redis server.
    - ``PROMETHEUS_PORT``: Port for the Prometheus metrics server (default 8000).
    - ``LOG_LEVEL``: Logging level for the application (default ``"INFO"``).
    - ``BATCH_SIZE``: Batch size for processing data (default 10).
    - ``CACHE_TTL``: Time-to-live for cached data in seconds (default 3600, i.e. 1 hour).
    """

    GROQ_API_KEY: str
    TAVILY_API_KEY: str
    HF_API_KEY: str
    REDIS_URL: str
    PROMETHEUS_PORT: int = 8000
    LOG_LEVEL: str = "INFO"
    BATCH_SIZE: int = 10
    CACHE_TTL: int = 3600

    class Config:
        # Also load variables from a local .env file.
        env_file = ".env"

class MetricsCollector:
    """Prometheus metrics for monitoring and observability.

    Exposes three metric objects: ``api_requests`` (counter labelled by
    ``endpoint``), ``processing_time`` (histogram of processing duration in
    seconds) and ``error_count`` (counter labelled by error ``type``).
    """

    def __init__(self, registry=None):
        """Create the metrics.

        Args:
            registry: Optional Prometheus ``CollectorRegistry`` to register
                the metrics in. When omitted, prometheus_client's default
                registry is used. Passing a fresh registry avoids the
                duplicate-registration error raised when the class is
                instantiated twice in one process (e.g. a re-run notebook cell).
        """
        # Only forward `registry` when given: prometheus_client treats an
        # explicit registry=None as "do not register at all".
        registry_kwargs = {} if registry is None else {"registry": registry}
        self.api_requests = Counter(
            'api_requests_total', 'Total API requests', ['endpoint'], **registry_kwargs
        )
        self.processing_time = Histogram(
            'processing_duration_seconds', 'Time spent processing', **registry_kwargs
        )
        self.error_count = Counter(
            'errors_total', 'Total errors', ['type'], **registry_kwargs
        )

class StructuredLogger:
    """Structured-logging interface built on a ``structlog`` logger."""

    def __init__(self):
        self.logger = structlog.get_logger()

    def log_operation(self, operation: str, **kwargs):
        """Log ``operation`` at INFO level with ``kwargs`` as structured context.

        Returns whatever the underlying logger's ``info`` call returns.
        """
        return self.logger.info(operation, **kwargs)

The `CacheManager` class is responsible for managing a cache using a Redis server. It provides methods for fetching data from the cache or setting new data in the cache with a configurable time-to-live (TTL). The class takes a Redis URL as input and uses the `redis-py` library to interact with the Redis server.
class CacheManager:
    """Redis-backed read-through cache that stores values as JSON.

    Note: uses the synchronous redis client, so each get/set briefly blocks the event loop.
    """

    def __init__(self, redis_url: str, cache_ttl: int = 3600):
        self.redis = Redis.from_url(redis_url)
        # Default expiry (seconds) for entries written without an explicit ttl.
        self.cache_ttl = cache_ttl

    async def get_or_set(self, key: str, fetch_func, ttl: Optional[int] = None):
        """Return the cached value for `key`, or await `fetch_func()` and cache its result.

        Values are JSON-encoded before storing: redis-py only accepts str/bytes/numbers,
        and decoding on a hit gives callers the same kind of value on hit and miss
        (tuples come back as lists).

        Args:
            key: Redis key.
            fetch_func: zero-argument coroutine function returning a JSON-serialisable value.
            ttl: expiry in seconds; falsy values (None/0) fall back to `self.cache_ttl`.
        """
        if cached := self.redis.get(key):
            return json.loads(cached)
        value = await fetch_func()
        self.redis.set(key, json.dumps(value), ex=ttl or self.cache_ttl)
        return value

The `APIClient` class is responsible for making API requests with retry logic. It takes a `ClientSession` and `Settings` object as input, and provides a `fetch_with_retry` method that will make up to 3 attempts to fetch data from a given URL, with exponential backoff between attempts.
class APIClient:
    """Thin aiohttp wrapper that fetches JSON with retries and exponential backoff."""

    def __init__(self, session: "aiohttp.ClientSession", config: "Settings"):
        self.session = session
        self.config = config
        self._rate_limit_delay = 0.1  # currently unused by fetch_with_retry

    async def fetch_with_retry(
        self, url: str, *, attempts: int = 3, backoff: float = 1.0, **kwargs
    ):
        """Request `url` and return the decoded JSON body, retrying on failure.

        Args:
            url: target URL (previously dropped before the request was made).
            attempts: total number of tries (at least one is always made).
            backoff: base delay in seconds; waits backoff * 2**attempt between tries.
            **kwargs: forwarded to `session.request`; `method` defaults to "GET".

        Raises:
            The exception from the final attempt (including HTTP error statuses,
            surfaced via raise_for_status so they are retried rather than parsed).
        """
        method = kwargs.pop("method", "GET")
        attempts = max(1, attempts)
        for attempt in range(attempts):
            try:
                async with self.session.request(method, url, **kwargs) as response:
                    response.raise_for_status()
                    return await response.json()
            except Exception:
                if attempt == attempts - 1:
                    raise
                await asyncio.sleep(backoff * 2 ** attempt)

The `ImageService` class is responsible for managing the generation of images using an external API. It takes an `APIClient` instance and a `MetricsCollector` instance as dependencies, which are used to make API requests and collect metrics about the image generation process, respectively.
class ImageService:
    """Generate an image through the Hugging Face Inference API and save it as a PNG file."""

    def __init__(self, api_client: "APIClient", metrics: "MetricsCollector"):
        self.api_client = api_client
        self.metrics = metrics

    async def generate_image(
        self,
        prompt: str,
        output_dir: str,
        width: int = 1024,
        height: int = 768,
    ) -> Tuple[Optional["Image.Image"], Optional[str]]:
        """Render `prompt` and save it under `output_dir`.

        Returns:
            (image, path) on success; (None, None) on any failure, which is counted in
            `metrics.error_count` (type="image_generation") and logged.
        """
        import uuid

        with self.metrics.processing_time.time():
            try:
                os.makedirs(output_dir, exist_ok=True)
                self.metrics.api_requests.labels(endpoint="image_generation").inc()
                # fetch_with_retry() decodes the body as JSON, but this endpoint returns raw
                # image bytes, so post through the shared session and read the bytes directly.
                async with self.api_client.session.post(
                    "https://api-inference.huggingface.co/models/black-forest-labs/FLUX.1-dev",
                    headers={
                        "Authorization": f"Bearer {self.api_client.config.HF_API_KEY}"
                    },
                    json={
                        "inputs": prompt,
                        "parameters": {"width": width, "height": height},
                    },
                ) as response:
                    response.raise_for_status()
                    image_bytes = await response.read()

                image = Image.open(io.BytesIO(image_bytes))
                image.load()  # decode now so a corrupt payload fails inside this try
                # Random suffix: concurrent calls within the same second must not collide.
                filename = f"image_{int(time.time())}_{uuid.uuid4().hex[:8]}.png"
                output_path = os.path.join(output_dir, filename)
                image.save(output_path)
                return image, output_path
            except Exception as e:
                self.metrics.error_count.labels(type='image_generation').inc()
                logging.error(f"Image generation failed: {e}")
                return None, None

The `ContentGenerator` class is responsible for generating content, such as blog posts, using an AI-powered content generation service. It takes an `OpenAI` client, a `MetricsCollector`, and a `CacheManager` as dependencies, and provides a `generate_content` method that generates content for a given article, ticker, ticker data, and sentiment.
class ContentGenerator:
    """Generate blog content for a ticker through the AI client, memoised in the cache."""

    def __init__(self, client: "OpenAI", metrics: "MetricsCollector", cache: "CacheManager"):
        self.client = client
        self.metrics = metrics
        self.cache = cache

    async def generate_content(
        self,
        article_content: str,
        ticker: str,
        ticker_data: Dict,
        sentiment: Any
    ) -> Tuple[str, str]:
        """Return generated content for `ticker`, reusing a cached result when available.

        Args:
            article_content: article text, or the list of article dicts the pipeline yields.
            ticker: symbol the content is about; part of the cache key.
            ticker_data: market data forwarded to the AI request.
            sentiment: sentiment summary forwarded to the AI request.

        NOTE(review): relies on `_get_ai_response` and `_process_response`, which are not
        defined in this class — FIXME before use.
        """
        import hashlib
        import json

        # hash() is salted per interpreter process and raises TypeError for lists, so use a
        # stable digest of a canonical JSON form. Including the ticker keeps two tickers that
        # share the same articles from sharing cached content.
        serialized = json.dumps(article_content, sort_keys=True, default=str)
        digest = hashlib.sha256(serialized.encode("utf-8")).hexdigest()
        cache_key = f"content_{ticker}_{digest}"

        async def generate():
            with self.metrics.processing_time.time():
                response = await self._get_ai_response(
                    article_content, ticker, ticker_data, sentiment
                )
                return self._process_response(response)

        return await self.cache.get_or_set(cache_key, generate)

The `CompleteBlogGenerator` class wires the services above together and produces a complete blog post for each ticker in the portfolio: text content plus a generated image.
class CompleteBlogGenerator:
    """Draft orchestrator: builds the services and produces one blog post per portfolio ticker.

    NOTE(review): this draft cannot run as written — see the FIXME comments below.
    """

    def __init__(self):
        # Settings must exist before setup_components(), which reads self.config.REDIS_URL.
        self.config = Settings()
        self.setup_components()
        
    def setup_components(self):
        """Create metrics, logger and cache, then build the service registry."""
        self.metrics = MetricsCollector()
        self.logger = StructuredLogger()
        self.cache = CacheManager(self.config.REDIS_URL)
        self.setup_services()
        
    def setup_services(self):
        """Build `self.services`, the name -> service mapping used by process_ticker()."""
        # FIXME: `self.services` is read (for DataPipeline and PortfolioProcessor) while the
        # dict literal that assigns it is still being evaluated, so the first construction
        # raises AttributeError. `self.api_client` and `self.client` are never assigned in
        # this class either.
        # NOTE(review): the EnhancedSentimentAnalyzer / PortfolioProcessor argument lists
        # differ from the later definitions of those classes in this notebook — confirm.
        self.services = {
            'data_pipeline': DataPipeline(self.services, self.config),
            'sentiment': EnhancedSentimentAnalyzer(self.metrics),
            'image': ImageService(self.api_client, self.metrics),
            'content': ContentGenerator(self.client, self.metrics, self.cache),
            'portfolio': PortfolioProcessor(self.services, self.metrics, self.logger)
        }
        
    async def process_ticker(self, category: str, ticker: str):
        """Fetch data, generate content and an image, and save the post for one ticker.

        Failures are counted in metrics.error_count (type='process_ticker') and logged;
        they are not re-raised.
        """
        try:
            data = await self.services['data_pipeline'].process_data(ticker)
            # NOTE(review): ContentGenerator.generate_content is annotated Tuple[str, str],
            # but the result is indexed like a dict below (content['title']) — confirm.
            content = await self.services['content'].generate_content(
                data['articles'],
                ticker,
                data['ticker_data'],
                data['sentiment_data']
            )
            
            # The post title doubles as the image prompt.
            image_result = await self.services['image'].generate_image(
                content['title'],
                f"output/{category}/images"
            )
            
            # FIXME: `self.output_manager` is never assigned in this class.
            await self.output_manager.save_blog_post(
                category,
                ticker,
                {**content, 'image_path': image_result[1]}
            )
            
        except Exception as e:
            self.metrics.error_count.labels(type='process_ticker').inc()
            self.logger.log_operation('ticker_processing_failed', error=str(e))
            
    async def run(self):
        """Process every ticker in the portfolio concurrently."""
        # FIXME: `read_portfolio` is not defined on this class.
        portfolio = self.read_portfolio()
        # NOTE(review): `session` is opened but never used by the tasks below.
        async with aiohttp.ClientSession() as session:
            # Expects the mapping nested under a top-level "portfolio" key.
            tasks = [
                self.process_ticker(category, ticker)
                for category, tickers in portfolio['portfolio'].items()
                for ticker in tickers
            ]
            await asyncio.gather(*tasks, return_exceptions=True)

if __name__ == "__main__":
    # Build all services up front, then drive the async pipeline to completion.
    generator = CompleteBlogGenerator()
    asyncio.run(main=generator.run())


# FINAL VERSION

In [6]:
import os
import json
import logging
import asyncio
import aiohttp
import spacy
import time
import io
import re
import html
import yfinance as yf
import structlog
import numpy as np
import uvicorn
import nest_asyncio
import prometheus_client
from datetime import datetime, timedelta
from bs4 import BeautifulSoup
from openai import OpenAI
from PIL import Image
from forex_python.converter import CurrencyRates
from pycoingecko import CoinGeckoAPI
from textblob import TextBlob
from tavily import TavilyClient
from redis import Redis
from prometheus_client import Counter, Histogram, start_http_server, core
from pydantic_settings import BaseSettings
from dataclasses import dataclass
from typing import Optional, Dict, List, Tuple, Any
from fastapi import FastAPI, BackgroundTasks

# Patch asyncio so asyncio.run() also works inside Jupyter's already-running event loop.
nest_asyncio.apply()

# Verbose root logging for this cell (basicConfig has no effect if the root logger
# already has handlers, e.g. from an earlier cell in the same kernel).
logging.basicConfig(level=logging.DEBUG)


class Settings(BaseSettings):
    """Application settings read from environment variables and a local `.env` file."""

    GROQ_API_KEY: str  # key for Groq's OpenAI-compatible chat endpoint
    TAVILY_API_KEY: str  # key for the Tavily news search client
    HF_API_TOKEN: str  # bearer token for the Hugging Face Inference API
    DEBUG_LEVEL: str = "DEBUG"
    REDIS_URL: str  # connection URL for the Redis cache
    PROMETHEUS_PORT: int = 8000
    BATCH_SIZE: int = 10
    CACHE_TTL: int = 3600  # default cache lifetime in seconds

    # pydantic-settings v2 configuration; the inner `class Config` form is deprecated.
    # extra="ignore": BaseSettings forbids unknown keys by default, so a shared .env holding
    # other variables (e.g. EXA_API_KEY) would otherwise fail validation.
    model_config = {"env_file": ".env", "extra": "ignore"}


class StructuredLogger:
    """Wraps a structlog logger so services can record named operations with context."""

    def __init__(self):
        self.logger = structlog.get_logger()

    def log_operation(self, operation: str, **kwargs):
        """Emit `operation` as an info event carrying `kwargs`; returns whatever the logger returns."""
        result = self.logger.info(operation, **kwargs)
        return result


class CacheManager:
    """Redis-backed read-through cache that stores values as JSON.

    Note: uses the synchronous redis client, so each get/set briefly blocks the event loop.
    """

    def __init__(self, redis_url: str, cache_ttl: int = 3600):
        self.redis = Redis.from_url(redis_url)
        # Default expiry (seconds) for entries written without an explicit ttl;
        # configurable so callers can pass Settings.CACHE_TTL.
        self.cache_ttl = cache_ttl

    async def get_or_set(self, key: str, fetch_func, ttl: Optional[int] = None):
        """Return the cached value for `key`, or await `fetch_func()` and cache its result.

        Args:
            key: Redis key.
            fetch_func: zero-argument coroutine function returning a JSON-serialisable value.
            ttl: expiry in seconds; falsy values (None/0) fall back to `self.cache_ttl`.

        Returns:
            The decoded cached value or the freshly fetched one (tuples come back as lists
            after a cache round-trip).
        """
        if cached := self.redis.get(key):
            return json.loads(cached)
        value = await fetch_func()
        self.redis.set(key, json.dumps(value), ex=ttl or self.cache_ttl)
        return value


class APIClient:
    """Holds the shared aiohttp session and settings; fetches JSON with retries."""

    def __init__(self, session: "aiohttp.ClientSession", config: "Settings"):
        self.session = session
        self.config = config
        self._rate_limit_delay = 0.1  # currently unused by fetch_with_retry

    async def fetch_with_retry(
        self, url: str, *, attempts: int = 3, backoff: float = 1.0, **kwargs
    ):
        """GET `url` and return the decoded JSON body, retrying on failure.

        Args:
            url: target URL.
            attempts: total number of tries (at least one is always made).
            backoff: base delay in seconds; waits backoff * 2**attempt between tries.
            **kwargs: forwarded to `session.get` (params, headers, ...).

        Raises:
            The exception from the final attempt. HTTP error statuses are raised via
            raise_for_status so they are retried instead of being parsed as JSON.
        """
        attempts = max(1, attempts)
        for attempt in range(attempts):
            try:
                async with self.session.get(url, **kwargs) as response:
                    response.raise_for_status()
                    return await response.json()
            except Exception:
                if attempt == attempts - 1:
                    raise
                await asyncio.sleep(backoff * 2**attempt)


class ImageService:
    """Render images with the Hugging Face Inference API (FLUX.1-dev) and save them as PNG files."""

    def __init__(self, api_client: "APIClient"):
        self.api_client = api_client
        # Seconds awaited before every request; multiplied by 1.5 each time the API answers 429.
        self.base_delay = 10

    async def generate_image(
        self,
        prompt: str,
        output_dir: str,
        width: int = 1024,
        height: int = 768,
        inference_steps: int = 50,
        guidance_scale: float = 7.5,
        scheduler: str = "DDIM",
    ) -> Tuple[Optional["Image.Image"], Optional[str]]:
        """Generate an image for `prompt` and save it under `output_dir`.

        Uses `self.api_client.session` for the request and
        `self.api_client.config.HF_API_TOKEN` as the bearer token.

        Returns:
            (image, path) on success; (None, None) on any failure — empty prompt,
            rate limit (429), non-200 status, or exception. Failures are logged, not raised.
        """
        import uuid

        # An empty/None prompt can only produce a 400 from the API; skip the request.
        if not prompt or not str(prompt).strip():
            logging.error("Image generation skipped: empty prompt")
            return None, None

        try:
            os.makedirs(output_dir, exist_ok=True)

            # Fixed pre-request delay (no retry loop); it is raised below on HTTP 429.
            await asyncio.sleep(self.base_delay)

            async with self.api_client.session.post(
                "https://api-inference.huggingface.co/models/black-forest-labs/FLUX.1-dev",
                headers={
                    "Authorization": f"Bearer {self.api_client.config.HF_API_TOKEN}"
                },
                json={
                    "inputs": prompt,
                    "parameters": {
                        "width": width,
                        "height": height,
                        # The text-to-image parameter name is `num_inference_steps`.
                        "num_inference_steps": inference_steps,
                        "guidance_scale": guidance_scale,
                        "scheduler": scheduler,
                    },
                },
            ) as response:
                if response.status == 429:
                    self.base_delay *= 1.5
                    logging.info(
                        f"Rate limit hit, increasing delay to {self.base_delay} seconds"
                    )
                    return None, None

                if response.status != 200:
                    # Log the error body: a bare status code (e.g. 400) does not say why.
                    detail = (await response.text())[:500]
                    logging.error(
                        f"API request failed with status {response.status}: {detail}"
                    )
                    return None, None

                image_data = await response.read()

            # Timestamp plus random suffix: concurrent tasks saving into the same directory
            # within one second would otherwise overwrite each other's files.
            filename = f"image_{int(time.time())}_{uuid.uuid4().hex[:8]}.png"
            output_path = os.path.join(output_dir, filename)

            with open(output_path, "wb") as f:
                f.write(image_data)

            image = Image.open(output_path)
            # Decode now so a corrupt payload fails inside this try (Pillow also releases the
            # file handle for single-frame images once loaded).
            image.load()
            logging.info(f"Successfully generated and saved image to {output_path}")
            return image, output_path

        except Exception as e:
            logging.error(f"Image generation failed: {str(e)}")
            return None, None

class DataPipeline:
    """Collect market data, news articles and a headline sentiment score for one ticker.

    Ticker data and articles go through the shared cache with `config.CACHE_TTL` as expiry.
    """

    def __init__(self, services: Dict, config: "Settings"):
        self.tavily_client = TavilyClient(api_key=config.TAVILY_API_KEY)
        self.config = config
        # Either may be None if the caller did not register it in `services`.
        self.cache = services.get("cache")
        self.logger = services.get("logger")

    async def process_data(self, ticker: str) -> Dict:
        """Return {"ticker_data", "articles", "sentiment_data"} for `ticker` (e.g. "BTC-USD").

        Raises:
            Whatever the underlying fetches raise, after logging it.
        """
        try:
            ticker_data = await self._get_ticker_data(ticker)
            articles = await self._fetch_articles(ticker)
            sentiment_data = await self._analyze_sentiment(articles)

            return {
                "ticker_data": ticker_data,
                "articles": articles,
                "sentiment_data": sentiment_data,
            }
        except Exception as e:
            # Guard so a missing logger cannot mask the original error with AttributeError.
            if self.logger is not None:
                self.logger.log_operation("data_pipeline_failed", error=str(e))
            raise

    async def _get_ticker_data(self, ticker: str) -> Dict:
        """Return yfinance info, last close, last volume and 1-month fractional change.

        Raises:
            ValueError: when yfinance returns no price history (e.g. unknown symbol).
        """
        cache_key = f"ticker_data_{ticker}"

        def convert_numpy_types(obj):
            # Recursively turn numpy scalars into Python builtins so the result is JSON-serialisable.
            if isinstance(obj, np.generic):
                return obj.item()
            elif isinstance(obj, dict):
                return {k: convert_numpy_types(v) for k, v in obj.items()}
            elif isinstance(obj, list):
                return [convert_numpy_types(v) for v in obj]
            elif isinstance(obj, tuple):
                return tuple(convert_numpy_types(v) for v in obj)
            return obj

        async def fetch():
            ticker_obj = yf.Ticker(ticker)
            history = ticker_obj.history(period="1mo")
            if history.empty:
                # .iloc[-1] on an empty frame would raise an opaque IndexError.
                raise ValueError(f"No price history returned for {ticker}")
            info = ticker_obj.info
            closes = history["Close"]
            first_close, last_close = closes.iloc[0], closes.iloc[-1]
            return convert_numpy_types(
                {
                    "info": info,
                    "price": last_close,
                    "volume": history["Volume"].iloc[-1],
                    "change": (last_close - first_close) / first_close,
                }
            )

        return await self.cache.get_or_set(cache_key, fetch, ttl=self.config.CACHE_TTL)

    async def _fetch_articles(self, ticker: str) -> List[Dict]:
        """Return up to 10 Tavily search results (title, content, url, published_date)."""
        cache_key = f"articles_{ticker}"

        async def fetch():
            search_results = self.tavily_client.search(
                query=f"Latest {ticker} news and analysis",
                search_depth="advanced",
                max_results=10,
            )
            return [
                {
                    "title": result["title"],
                    "content": result["content"],
                    "url": result["url"],
                    "published_date": result.get("published_date"),
                }
                for result in search_results["results"]
            ]

        return await self.cache.get_or_set(cache_key, fetch, ttl=self.config.CACHE_TTL)

    async def _analyze_sentiment(self, articles: List[Dict]) -> Dict:
        """TextBlob polarity/subjectivity over all article contents, plus the article count."""
        combined_text = " ".join([article["content"] for article in articles])
        blob = TextBlob(combined_text)

        return {
            "polarity": blob.sentiment.polarity,
            "subjectivity": blob.sentiment.subjectivity,
            "article_count": len(articles),
        }


class EnhancedSentimentAnalyzer:
    """TextBlob sentiment for a batch of texts, with per-entity and per-period breakdowns."""

    def __init__(self):
        self.nlp = spacy.load("en_core_web_sm")

    async def analyze(self, texts: List[str]) -> Dict[str, Any]:
        """Analyse `texts` as one corpus.

        Returns:
            {"overall": {"polarity", "subjectivity"}, "entities": {name: polarity},
             "temporal": [polarity per period], "confidence_score": 1 - subjectivity}
        """
        blob = TextBlob(" ".join(texts))
        polarity = blob.sentiment.polarity
        subjectivity = blob.sentiment.subjectivity

        return {
            "overall": {
                "polarity": polarity,
                "subjectivity": subjectivity,
            },
            "entities": self._get_entity_sentiment(texts),
            "temporal": self._get_temporal_sentiment(texts),
            "confidence_score": self._calculate_confidence(polarity, subjectivity),
        }

    def _get_textblob_sentiment(self, texts: List[str]) -> float:
        """TextBlob polarity of all texts joined with spaces."""
        combined_text = " ".join(texts)
        blob = TextBlob(combined_text)
        return blob.sentiment.polarity

    def _get_entity_sentiment(self, texts: List[str]) -> Dict[str, float]:
        """Mean polarity of the sentences mentioning each named entity.

        Scoring the entity text alone ("Apple", "Fed") is almost always neutral, so the
        surrounding sentence (spaCy `ent.sent`) is scored instead and averaged per entity.
        """
        doc = self.nlp(" ".join(texts))
        scores: Dict[str, List[float]] = {}
        for entity in doc.ents:
            polarity = TextBlob(entity.sent.text).sentiment.polarity
            scores.setdefault(entity.text, []).append(polarity)
        return {name: sum(values) / len(values) for name, values in scores.items()}

    def _get_temporal_sentiment(
        self, texts: List[str], num_periods: int = 3
    ) -> List[float]:
        """Split `texts` into consecutive periods and return each period's polarity.

        The last period absorbs any remainder. At most one period per text is produced:
        extra periods would be empty slices scored as a fabricated neutral 0.0.
        """
        total_texts = len(texts)
        periods = min(num_periods, total_texts)
        if periods <= 0:  # also avoids dividing by zero when num_periods is 0
            return []
        period_size = total_texts // periods
        sentiments = []
        for i in range(periods):
            start = i * period_size
            end = total_texts if i == periods - 1 else start + period_size
            sentiments.append(TextBlob(" ".join(texts[start:end])).sentiment.polarity)
        return sentiments

    def _calculate_confidence(self, polarity: float, subjectivity: float) -> float:
        # Higher subjectivity means lower confidence: confidence = 1 - subjectivity.
        return 1 - subjectivity


class PortfolioProcessor:
    """Load the category -> tickers mapping (e.g. {"crypto": ["BTC-USD"]}) from a JSON file."""

    def __init__(
        self,
        services: Dict,
        logger: "StructuredLogger",
        portfolio_file: str = "portfolio.json",
    ):
        self.services = services
        self.logger = logger
        self.portfolio_file = portfolio_file

    def read_portfolio(self) -> Dict[str, List[str]]:
        """Return the category -> tickers mapping from `self.portfolio_file`.

        Accepts either a flat mapping or one wrapped as {"portfolio": {...}}. On any
        error, logs "portfolio_read_failed" and returns empty crypto/forex/stocks lists.
        """
        try:
            with open(self.portfolio_file, "r", encoding="utf-8") as f:
                portfolio = json.load(f)
            # The earlier draft of this notebook read the mapping from a "portfolio" key.
            if len(portfolio) == 1 and isinstance(portfolio.get("portfolio"), dict):
                portfolio = portfolio["portfolio"]
            # Count the tickers themselves, not the categories.
            self.logger.log_operation(
                "portfolio_loaded",
                tickers_count=sum(len(v) for v in portfolio.values()),
            )
            return portfolio
        except Exception as e:
            self.logger.log_operation("portfolio_read_failed", error=str(e))
            return {"crypto": [], "forex": [], "stocks": []}


class CompleteBlogGenerator:
    """End-to-end generator: fetch data, analyse sentiment, write a post, render an image, save it.

    Owns the shared services (structured logger, Redis cache, HTTP client, data pipeline,
    sentiment analyser, portfolio reader, image service). Text generation goes through
    an OpenAI client pointed at Groq's OpenAI-compatible endpoint.
    """

    # Returned whenever the LLM does not produce a usable image prompt.
    DEFAULT_IMAGE_PROMPT = (
        "A modern illustration representing key themes from the blog post."
    )

    def __init__(self):
        self.config = Settings()
        self.setup_components()
        self.client = OpenAI(
            base_url="https://api.groq.com/openai/v1", api_key=self.config.GROQ_API_KEY
        )
        self.nlp = spacy.load("en_core_web_sm")

    def setup_components(self):
        """Create the logger and cache, then build the service registry."""
        self.logger = StructuredLogger()
        self.cache = CacheManager(self.config.REDIS_URL)
        self.setup_services()

    def setup_services(self):
        """Populate `self.services`.

        The core entries (logger, cache, api_client) are registered first because
        DataPipeline reads "cache" and "logger" from the dict in its constructor.
        """
        # Initial session so APIClient always holds one; run() swaps in a session created
        # inside the running event loop.
        self.session = aiohttp.ClientSession()
        self.api_client = APIClient(self.session, self.config)

        self.services = {
            "logger": self.logger,
            "cache": self.cache,
            "api_client": self.api_client,
        }

        self.services.update(
            {
                "data_pipeline": DataPipeline(self.services, self.config),
                "sentiment": EnhancedSentimentAnalyzer(),
                "portfolio": PortfolioProcessor(self.services, self.logger),
                "image": ImageService(self.api_client),
            }
        )

    async def save_blog_post(self, category: str, ticker: str, content: Dict):
        """Write `content` as Markdown to output/<category>/posts/<ticker>_<YYYYmmddHHMMSS>.md.

        `content` must contain "title", "timestamp" and "content"; "image_path",
        "sentiment" and "image_prompt" are optional. Failures are logged, not raised.
        """
        try:
            output_dir = f"output/{category}/posts"
            os.makedirs(output_dir, exist_ok=True)

            filename = f"{ticker}_{datetime.now().strftime('%Y%m%d%H%M%S')}.md"
            output_path = os.path.join(output_dir, filename)

            # Explicit UTF-8 so LLM output with non-ASCII characters saves on every platform.
            with open(output_path, "w", encoding="utf-8") as f:
                f.write(f"# {content['title']}\n\n")
                f.write(f"**Timestamp:** {content['timestamp']}\n\n")

                image_path = content.get("image_path")
                if image_path:
                    # Markdown resolves links relative to the post file, so point from
                    # posts/ to the image instead of reusing the CWD-relative path.
                    try:
                        image_link = os.path.relpath(image_path, output_dir)
                    except ValueError:  # e.g. different drives on Windows
                        image_link = image_path
                    image_link = image_link.replace(os.sep, "/")
                    f.write("## Market Analysis Visualization\n\n")
                    f.write(f"![Market Analysis for {ticker}]({image_link})\n\n")
                    f.write(
                        f"*Generated visualization based on market analysis for {ticker}*\n\n"
                    )

                f.write(f"**Content:**\n\n{content['content']}\n\n")

                f.write("**Sentiment Analysis:**\n\n")
                sentiment = content.get("sentiment") or {}
                f.write(f"- Polarity: {sentiment.get('polarity', 'N/A')}\n")
                f.write(f"- Subjectivity: {sentiment.get('subjectivity', 'N/A')}\n")
                f.write(f"- Article Count: {sentiment.get('article_count', 'N/A')}\n\n")

                f.write(f"**Image Prompt:**\n\n{content.get('image_prompt', '')}\n\n")

            logging.info(f"Saved blog post for {ticker} to {output_path}")
        except Exception as e:
            logging.error(f"Failed to save blog post for {ticker}: {e}")

    def clean_html(self, html_text: str) -> str:
        """Strip tags, collapse whitespace and unescape HTML entities."""
        soup = BeautifulSoup(html_text, "html.parser")
        text = soup.get_text()
        text = re.sub(r"\s+", " ", text).strip()
        text = html.unescape(text)
        return text

    def extract_nouns(self, text: str) -> List[str]:
        """Return the distinct nouns and proper nouns in `text`, in order of first appearance.

        Dicts are stringified first. The order is deterministic (unlike a set), so callers
        that slice the result get the same elements on every run.
        """
        if isinstance(text, dict):
            text = str(text)
        doc = self.nlp(text)
        nouns = [token.text for token in doc if token.pos_ in ("NOUN", "PROPN")]
        return list(dict.fromkeys(nouns))

    def summarize_with_spacy(self, text: str, max_sentences: int = 3) -> str:
        """Extractive summary: the `max_sentences` highest-scoring sentences, joined by spaces.

        A sentence scores the summed frequency of its non-stop-word, non-punctuation
        tokens across the whole text. Dicts are stringified first.
        """
        if isinstance(text, dict):
            text = str(text)
        doc = self.nlp(text)

        word_freq: Dict[str, int] = {}
        for token in doc:
            if not token.is_stop and not token.is_punct:
                word_freq[token.text] = word_freq.get(token.text, 0) + 1

        # Score with spaCy's tokens: whitespace splitting keeps punctuation attached
        # ("market," vs "market"), so those words would never match the frequency table.
        sentence_scores: Dict[str, int] = {}
        for sent in doc.sents:
            sentence_scores[sent.text.strip()] = sum(
                word_freq.get(token.text, 0) for token in sent
            )

        top_sentences = sorted(
            sentence_scores.items(), key=lambda item: item[1], reverse=True
        )[:max_sentences]
        return " ".join(sentence for sentence, _ in top_sentences)

    def generate_refined_prompt_for_blog_post(self, post_content: str) -> str:
        """Ask the LLM for a concise image-generation prompt describing `post_content`.

        Always returns a non-empty string: DEFAULT_IMAGE_PROMPT when the model returns
        empty content or the request fails (failures are logged).
        """
        summary = self.summarize_with_spacy(post_content)
        nouns = self.extract_nouns(summary)
        key_elements = ", ".join(nouns[:7])

        prompt = f"""
        Create a visually-rich prompt for AI image generation based on the summary of this blog post:

        {summary}

        Key visual themes: {key_elements}

        The prompt should:
        1. Be concise and focused on visual elements.
        2. Seamlessly incorporate key visual concepts without explicitly listing them.
        3. Maintain a professional tone in line with the summary above.
        4. Be optimized for AI image generation models, emphasizing clarity and detail.

        Please provide only the refined prompt without any additional commentary or explanation.
        """

        try:
            response = self.client.chat.completions.create(
                model="llama-3.2-3b-preview",
                messages=[
                    {
                        "role": "system",
                        "content": "You are an expert at creating prompts for AI image generation.",
                    },
                    {"role": "user", "content": prompt},
                ],
                max_tokens=100,
                temperature=0.7,
            )

            refined = response.choices[0].message.content
            if refined and refined.strip():
                return refined.strip()
            # Empty completion: return the fallback explicitly. Returning None would send an
            # empty prompt to the image API.
            return self.DEFAULT_IMAGE_PROMPT

        except Exception as e:
            self.logger.log_operation("prompt_generation_failed", error=str(e))
            return self.DEFAULT_IMAGE_PROMPT

    async def generate_content(self, data: Dict, sentiment_data: Dict) -> Dict:
        """Generate blog post content using the OpenAI API.

        Args:
            data: DataPipeline.process_data output ("ticker_data", "articles", "sentiment_data").
            sentiment_data: optional EnhancedSentimentAnalyzer.analyze result; its
                "confidence_score" is used when present.

        Returns:
            {"content", "title", "timestamp", "sentiment", "image_prompt"}.

        Raises:
            Any failure, after logging it.
        """
        try:
            pipeline_sentiment = data["sentiment_data"]
            sentiment_value = pipeline_sentiment["polarity"]
            # Confidence is 1 - subjectivity (the rule in
            # EnhancedSentimentAnalyzer._calculate_confidence); raw subjectivity is its inverse.
            confidence_value = 1 - pipeline_sentiment["subjectivity"]
            if isinstance(sentiment_data, dict):
                confidence_value = sentiment_data.get("confidence_score", confidence_value)

            ticker_data = data["ticker_data"]
            info = ticker_data.get("info") or {}
            # yfinance does not provide shortName for every instrument.
            ticker_name = (
                info.get("shortName")
                or info.get("longName")
                or info.get("symbol")
                or "Unknown asset"
            )
            ticker_price = float(ticker_data["price"])
            ticker_change = float(ticker_data["change"])

            # One headline per line so the model sees separate titles.
            article_titles = "\n".join(
                f"- {article['title']}" for article in data["articles"][:3]
            )

            prompt = f"""
            Write a detailed blog post analyzing {ticker_name} based on:
            
            Market Data:
            - Current Price: ${ticker_price:.2f}
            - Price Change: {ticker_change:.2%}
            
            Sentiment Analysis:
            - Overall Sentiment: {sentiment_value:.2f}
            - Confidence Score: {confidence_value:.2f}
            
            Recent News:
            {article_titles}
            
            Include technical analysis, market sentiment discussion, and future outlook.
            Format in Markdown.
            """

            response = self.client.chat.completions.create(
                model="llama-3.2-3b-preview",
                messages=[
                    {
                        "role": "system",
                        "content": "You are a professional financial analyst and writer.",
                    },
                    {"role": "user", "content": prompt},
                ],
                max_tokens=2000,
                temperature=0.0,
            )

            post_text = response.choices[0].message.content or ""
            return {
                "content": post_text,
                "title": f"Analysis: {ticker_name}",
                "timestamp": datetime.now().isoformat(),
                "sentiment": pipeline_sentiment,
                "image_prompt": self.generate_refined_prompt_for_blog_post(post_text),
            }

        except Exception as e:
            self.logger.log_operation("content_generation_failed", error=str(e))
            raise

    async def process_ticker(self, category: str, ticker: str):
        """Run the full pipeline for one ticker; any failure is logged and swallowed."""
        try:
            logging.info(f"Processing ticker: {ticker}")

            logging.debug(f"Fetching data for ticker: {ticker}")
            data = await self.services["data_pipeline"].process_data(ticker)

            logging.debug(f"Analyzing sentiment for ticker: {ticker}")
            sentiment = await self.services["sentiment"].analyze(
                [article["content"] for article in data["articles"]]
            )

            logging.debug(f"Generating content for ticker: {ticker}")
            content = await self.generate_content(data, sentiment)

            # Delay between API requests to avoid hitting rate limits.
            await asyncio.sleep(3)

            # generate_content() already derived a prompt from the post text; only ask the
            # LLM again (from the post text, not the stringified dict) if that is empty.
            logging.debug(f"Generating image prompt for ticker: {ticker}")
            image_prompt = content.get(
                "image_prompt"
            ) or self.generate_refined_prompt_for_blog_post(content["content"])
            content["image_prompt"] = image_prompt

            await asyncio.sleep(2)

            logging.debug(f"Generating image for ticker: {ticker}")
            image_result = await self.services["image"].generate_image(
                image_prompt, f"output/{category}/images"
            )

            content["image_path"] = image_result[1] or None

            logging.debug(f"Saving blog post for ticker: {ticker}")
            await self.save_blog_post(category, ticker, content)
        except Exception as e:
            logging.error(f"Ticker processing failed for {ticker}: {e}")

    async def run(self):
        """Process every ticker in the portfolio concurrently and log any task that raised."""
        portfolio = self.services["portfolio"].read_portfolio()
        async with aiohttp.ClientSession() as session:
            # The session from setup_services() was created outside this coroutine (and
            # possibly outside this event loop). Route the shared APIClient through the
            # session owned by the running loop, and close the initial one instead of
            # leaking it.
            stale_session = self.session
            self.session = session
            self.api_client.session = session
            if stale_session is not None and not stale_session.closed:
                try:
                    await stale_session.close()
                except Exception as e:
                    logging.debug(f"Could not close initial HTTP session: {e}")

            tasks = [
                self.process_ticker(category, ticker)
                for category, tickers in portfolio.items()
                for ticker in tickers
            ]
            results = await asyncio.gather(*tasks, return_exceptions=True)
            for result in results:
                if isinstance(result, Exception):
                    logging.error(f"Task failed with exception: {result}")


if __name__ == "__main__":
    # Construct every service, then block until the async pipeline has processed all tickers.
    generator = CompleteBlogGenerator()
    asyncio.run(main=generator.run())

[2m2024-11-26 11:22:43[0m [[32m[1minfo     [0m] [1mportfolio_loaded              [0m [36mtickers_count[0m=[35m16[0m


ERROR:root:API request failed with status 400
ERROR:root:API request failed with status 400
ERROR:root:API request failed with status 400
