<a href="https://colab.research.google.com/github/matheus896/agentic-rag/blob/main/gemini_open_deep_researcher_for_me.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install nest_asyncio gradio aiohttp google-generativeai
import nest_asyncio
nest_asyncio.apply()

In [12]:
import nest_asyncio
nest_asyncio.apply()

import asyncio
import aiohttp
import json
import google.generativeai as genai
from datetime import date

# =======================
# Configuration Constants
# =======================
# The three keys below are empty placeholders; fill them in before running.
GEMINI_API_KEY = "" # Replace with your Gemini API key
SERPAPI_API_KEY = "" # Replace with your SERPAPI API key
JINA_API_KEY = "" # Replace with your JINA API key

# Endpoints
# SERPAPI_URL receives the search query as request parameters.
SERPAPI_URL = "https://serpapi.com/search"
# The target page URL is appended directly to JINA_BASE_URL when fetching page text.
JINA_BASE_URL = "https://r.jina.ai/"

# Default LLM model (can be changed if desired)
DEFAULT_GEMINI_MODEL = "gemini-2.0-flash"

# Register the API key with the SDK, then build the single model client that
# call_gemini_async() uses for every request.
genai.configure(api_key=GEMINI_API_KEY)
gemini_model = genai.GenerativeModel(DEFAULT_GEMINI_MODEL)


# ============================
# Asynchronous Helper Functions
# ============================

async def call_gemini_async(messages, model_name=DEFAULT_GEMINI_MODEL):
    """
    Asynchronously call the Gemini API with the provided messages.

    Args:
        messages: Iterable of dicts carrying a 'content' key. Only the content
            strings are sent, in order; the 'role' values are not forwarded.
        model_name: Gemini model to query. The default reuses the shared
            module-level client; any other name gets its own client.

    Returns:
        The text of the response, or None if the request failed (the error is
        printed rather than raised).
    """
    try:
        contents = [msg['content'] for msg in messages]
        # Honour model_name instead of silently always using the default client.
        if model_name == DEFAULT_GEMINI_MODEL:
            model = gemini_model
        else:
            model = genai.GenerativeModel(model_name)
        # generate_content() is a blocking call. Running it in the default
        # executor keeps the event loop free, so callers that fan out with
        # asyncio.gather() actually overlap their requests.
        loop = asyncio.get_running_loop()
        response = await loop.run_in_executor(None, model.generate_content, contents)
        return response.text
    except Exception as e:
        print(f"Gemini API error: {e}")
        return None


async def generate_search_queries_async(session, interests):
    """
    Ask Gemini for up to four precise search queries covering the user's interests.

    The reply is expected to be a Python list literal, optionally wrapped in a
    markdown code fence. It is parsed with ``ast.literal_eval`` instead of
    ``eval``: the reply is untrusted model output, and ``eval`` would execute
    whatever expression it happened to contain.

    Args:
        session: Unused here; kept so every helper shares the same call shape.
        interests: The user's interests, interpolated into the prompt as-is.

    Returns:
        A list of non-empty query strings. An empty list is returned when the
        LLM call fails or the reply cannot be parsed as a list.
    """
    prompt = (
        "You are an expert research assistant. Given the user's query, generate up to four distinct, "
        "precise search queries that would help gather comprehensive information on the topic. "
        "Return only a Python list of strings, for example: ['query1', 'query2', 'query3']."
    )
    messages = [
        {"role": "system", "content": "You are a helpful and precise research assistant."},
        {"role": "user", "content": f"User Interests: {interests}\n\n{prompt}"}
    ]
    response = await call_gemini_async(messages)
    if not response:
        return []

    try:
        parsed = _parse_llm_list_literal(response)
    except (ValueError, SyntaxError, TypeError, MemoryError, RecursionError) as e:
        # These are the errors ast.literal_eval raises on malformed input.
        print("Error parsing search queries:", e, "\nResponse:", response)
        return []

    if not isinstance(parsed, (list, tuple)):
        print("LLM did not return a list. Response:", response)
        return []

    # Keep only usable queries: non-string or blank entries cannot be searched.
    return [query.strip() for query in parsed if isinstance(query, str) and query.strip()]


def _parse_llm_list_literal(response):
    """Parse the Python literal contained in an LLM reply without executing any code."""
    import ast
    import re

    text = response.strip()
    # Unwrap a markdown fence with any (or no) language tag, e.g. ```python or ```json.
    fenced = re.search(r"```[A-Za-z0-9_+-]*\s*(.*?)```", text, re.DOTALL)
    if fenced:
        text = fenced.group(1).strip()
    try:
        return ast.literal_eval(text)
    except (ValueError, SyntaxError):
        # The model sometimes surrounds the list with prose; retry on the
        # outermost bracketed span before giving up.
        start, end = text.find("["), text.rfind("]")
        if 0 <= start < end:
            return ast.literal_eval(text[start:end + 1])
        raise


async def perform_search_async(session, query):
    """
    Run one SERPAPI search with the Google engine and collect the result links.

    Args:
        session: HTTP client session used to issue the GET request.
        query: Search string sent as the ``q`` parameter.

    Returns:
        The ``link`` value of every organic result that has one. An empty list
        is returned on a non-200 status, a response without organic results,
        or any exception (each case is printed).
    """
    request_params = {
        "q": query,
        "api_key": SERPAPI_API_KEY,
        "engine": "google",  # SERPAPI engine selector
    }
    try:
        async with session.get(SERPAPI_URL, params=request_params) as resp:
            # Guard clauses first: bail out on HTTP errors and empty payloads.
            if resp.status != 200:
                text = await resp.text()
                print(f"SERPAPI error: {resp.status} - {text}")
                return []

            payload = await resp.json()
            if "organic_results" not in payload:
                print("No organic results in SERPAPI response.")
                return []

            found_links = []
            for entry in payload["organic_results"]:
                if "link" in entry:
                    found_links.append(entry.get("link"))
            return found_links
    except Exception as e:
        print("Error performing SERPAPI search:", e)
        return []


async def fetch_webpage_text_async(session, url):
    """
    Fetch the text of a webpage through the Jina endpoint.

    The page URL is appended to ``JINA_BASE_URL`` and requested with a Bearer
    authorization header built from ``JINA_API_KEY``.

    Args:
        session: HTTP client session used to issue the GET request.
        url: Address of the page whose text is wanted.

    Returns:
        The response body on HTTP 200; otherwise an empty string (the failure
        is printed).
    """
    reader_url = f"{JINA_BASE_URL}{url}"
    auth_headers = {"Authorization": f"Bearer {JINA_API_KEY}"}
    try:
        async with session.get(reader_url, headers=auth_headers) as resp:
            # The body is needed either as the result or for the error message.
            body = await resp.text()
            if resp.status == 200:
                return body
            print(f"Jina fetch error for {url}: {resp.status} - {body}")
            return ""
    except Exception as e:
        print("Error fetching webpage text with Jina:", e)
        return ""


async def is_page_useful_async(session, user_query, page_text):
    """
    Ask the LLM (Gemini) whether the webpage content is useful for the user's interests.

    The model is told to answer with a single word, but replies such as
    "yes", "Yes." or "**No**" do occur. The verdict is therefore taken from the
    first standalone yes/no word in the reply, ignoring case. Matching whole
    words avoids reading "Yesterday" as "Yes" or "Nothing"/"Note" as "No".

    Args:
        session: Unused here; kept so every helper shares the same call shape.
        user_query: The user's interests, interpolated into the prompt.
        page_text: Page content; only the first 20000 characters are sent.

    Returns:
        "Yes" or "No". Falls back to "No" when the call fails or the reply
        contains no recognisable verdict.
    """
    import re

    prompt = (
        "You are a critical news evaluator. Given the user's interests and the content of a webpage, "
        "determine if the webpage contains relevant news information. "
        "Respond with exactly one word: 'Yes' if the page is useful, or 'No' if it is not. Do not include any extra text."
    )
    messages = [
        {"role": "system", "content": "You are a strict and concise evaluator of news relevance."},
        {"role": "user", "content": f"User Interests: {user_query}\n\nWebpage Content (first 20000 characters):\n{page_text[:20000]}\n\n{prompt}"}
    ]
    response = await call_gemini_async(messages)
    if not response:
        return "No"

    verdict = re.search(r"\b(yes|no)\b", response, re.IGNORECASE)
    if verdict is None:
        return "No"
    return "Yes" if verdict.group(1).lower() == "yes" else "No"


async def extract_relevant_context_async(session, user_query, search_query, page_text):
    """
    Have Gemini pull out the parts of a page that matter for a news summary.

    Args:
        session: Unused here; kept so every helper shares the same call shape.
        user_query: The user's interests, interpolated into the prompt.
        search_query: The search query that led to this page.
        page_text: Page content; only the first 20000 characters are sent.

    Returns:
        The model's reply with surrounding whitespace removed, or an empty
        string when the call produced nothing.
    """
    instructions = (
        "You are an expert news information extractor. Given the user's interests, the search query that led to this page, "
        "and the webpage content, extract all pieces of information that are relevant for a news summary. "
        "Return only the relevant context as plain text, ready to be included in a news summary, without commentary."
    )
    system_message = {"role": "system", "content": "You are an expert in extracting and summarizing relevant news information."}
    user_message = {"role": "user", "content": f"User Interests: {user_query}\nSearch Query: {search_query}\n\nWebpage Content (first 20000 characters):\n{page_text[:20000]}\n\n{instructions}"}

    reply = await call_gemini_async([system_message, user_message])
    return reply.strip() if reply else ""


async def get_new_search_queries_async(session, user_interests, previous_search_queries, all_contexts):
    """
    Decide whether another round of searches is needed.

    This is currently a stub: none of the arguments are inspected and the
    sentinel "<done>" is always returned, so the research loop stops after its
    first round.
    """
    # For news summarization a single round of searches is considered enough.
    done_marker = "<done>"
    return done_marker


async def generate_final_report_async(session, user_name, interests, date_str, all_contexts_dict):
    """
    Generate the final news summary report from the gathered contexts, organized by interest category.

    A markdown draft is assembled locally (title, one section per interest,
    one bullet per context) and then handed to Gemini for refinement.

    Args:
        session: Unused here; kept so every helper shares the same call shape.
        user_name: Name shown in the report title.
        interests: The user's interests, interpolated into the refinement prompt.
        date_str: Report date in ISO format (YYYY-MM-DD).
        all_contexts_dict: Mapping of interest -> list of extracted context strings.

    Returns:
        The refined report, or the local draft if the refinement call fails.

    Raises:
        ValueError: If ``date_str`` is not a valid ISO date.
    """
    # Month names are spelled out here because the date was previously
    # formatted with a literal "Fevereiro", which mislabelled every report
    # dated outside February. Capitalisation matches the previous output.
    month_names_pt = (
        "Janeiro", "Fevereiro", "Março", "Abril", "Maio", "Junho",
        "Julho", "Agosto", "Setembro", "Outubro", "Novembro", "Dezembro",
    )
    report_date = date.fromisoformat(date_str)
    month_name = month_names_pt[report_date.month - 1]
    formatted_date = f"{report_date:%d} de {month_name} de {report_date:%Y}"

    report_title = f"Resumo Diário Personalizado - {user_name} - {formatted_date}"
    sections = [
        f"**{report_title}**\n\n**Instruções:** Este é um resumo de notícias diário personalizado...\n\n",
        "## Resumo de Notícias\n\n",
    ]

    for interest, contexts in all_contexts_dict.items():
        # Upper-case only the first character: str.capitalize() would also
        # lower-case the rest and mangle acronyms ("IA" -> "Ia", "EUA" -> "eua").
        heading = interest[:1].upper() + interest[1:]
        sections.append(f"### {heading}\n\n")
        if contexts:
            sections.extend(f"* {context}\n\n" for context in contexts)
        else:
            sections.append("Nenhuma notícia relevante encontrada para esta categoria.\n\n")

    report_content = "".join(sections)

    messages = [
        {"role": "system", "content": "You are a skilled news report writer."},
        {"role": "user", "content": f"User Interests: {interests}\n\nGathered Relevant Contexts:\n{report_content}\n\nPrompt: Refine the above draft news summary into a well-structured and detailed report, ready for final presentation. Ensure it is comprehensive, objective, and adheres to high journalistic standards."}
    ]
    refined_report = await call_gemini_async(messages)
    # Fall back to the unrefined draft if the LLM call failed.
    return refined_report if refined_report else report_content


async def process_link(session, link, user_interests, search_query):
    """
    Run the per-link pipeline: fetch the page, judge it, extract its context.

    Args:
        session: HTTP client session passed on to the helpers.
        link: URL of the page to process.
        user_interests: The user's interests, used for both the usefulness
            check and the extraction prompt.
        search_query: The search query that produced this link.

    Returns:
        The extracted context string, or None when the page could not be
        fetched, was judged not useful, or yielded no context.
    """
    print(f"Fetching content from: {link}")
    body = await fetch_webpage_text_async(session, link)
    if not body:
        return None

    verdict = await is_page_useful_async(session, user_interests, body)
    print(f"Page usefulness for {link}: {verdict}")
    if verdict != "Yes":
        return None

    extracted = await extract_relevant_context_async(session, user_interests, search_query, body)
    if not extracted:
        return None

    print(f"Extracted context from {link} (first 200 chars): {extracted[:200]}")
    return extracted


# =========================
# Main Asynchronous Routine
# =========================

async def async_main():
    """
    Run the whole news-research pipeline and print the final report.

    Steps: generate search queries for each interest, run the searches,
    fetch/judge/extract every unique link, collect the extracted contexts per
    interest, and finally ask the LLM to write the report.

    Search queries are generated separately for each interest and carried
    around as (interest, query) pairs. Queries were previously generated once
    for all interests together and then matched to interests by list position,
    which filed contexts under unrelated categories, left later interests
    without any search, and raised IndexError whenever more queries than
    interests came back.
    """
    # --- User Profile from promptcustom.txt ---
    user_name = "Matheus almeida"
    report_date_str = "2025-02-09"  # Date set to 09.02.2025
    user_interests = [
        "IA",
        "Notícias de tecnologia (geral)",
        "Notícias de negócios (geral)",
        "Mercados financeiros",  # subcategories removed for initial query generation
        "Notícias políticas dos EUA e Brasil (relevantes)",  # subcategories removed
        "Notícias globais importantes",
        "Notícias do Futebol Brasileiro, Time Corinthias do brasil, NBA, e grandes histórias em outros esportes",
        "Outras notícias importantes (somente principais)"
    ]

    iter_limit_input = "1"  # Limit to 1 iteration for news summary
    iteration_limit = int(iter_limit_input) if iter_limit_input.isdigit() else 1

    # The positional matching being replaced paired exactly one query with
    # each interest; keep that budget so search/LLM usage stays bounded.
    # Raise this to widen coverage.
    max_queries_per_interest = 1

    aggregated_contexts = {interest: [] for interest in user_interests}  # contexts by interest
    all_search_queries = []  # every search query used across iterations
    iteration = 0

    async with aiohttp.ClientSession() as session:
        # ----- INITIAL SEARCH QUERIES -----
        # One generation call per interest, so every query has a known owner.
        queries_per_interest = await asyncio.gather(
            *(generate_search_queries_async(session, interest) for interest in user_interests)
        )
        pending_queries = [
            (interest, query)
            for interest, queries in zip(user_interests, queries_per_interest)
            for query in queries[:max_queries_per_interest]
        ]
        if not pending_queries:
            print("No search queries were generated by the LLM. Exiting.")
            return
        all_search_queries.extend(query for _, query in pending_queries)

        # ----- ITERATIVE RESEARCH LOOP -----
        while iteration < iteration_limit:
            print(f"\n=== Iteration {iteration + 1} ===")

            # Perform the SERPAPI searches concurrently.
            search_results = await asyncio.gather(
                *(perform_search_async(session, query) for _, query in pending_queries)
            )

            # Group unique links under the interest whose query found them:
            # {interest: {link: search_query}}. The first query to surface a
            # link within an interest is the one recorded for it.
            unique_links_by_interest = {interest: {} for interest in user_interests}
            for (interest, query), links in zip(pending_queries, search_results):
                for link in links:
                    unique_links_by_interest[interest].setdefault(link, query)

            total_unique_links_iteration = sum(
                len(links_map) for links_map in unique_links_by_interest.values()
            )
            print(f"Aggregated {total_unique_links_iteration} unique links from this iteration.")

            # Process each link concurrently: fetch, judge, and extract context.
            link_jobs = [
                (interest, link, search_query)
                for interest, links_map in unique_links_by_interest.items()
                for link, search_query in links_map.items()
            ]
            link_results = await asyncio.gather(
                *(
                    process_link(session, link, user_interests, search_query)
                    for _, link, search_query in link_jobs
                )
            )

            # gather() preserves order, so results line up with link_jobs.
            iteration_contexts_for_interest = {interest: [] for interest in user_interests}
            for (interest, _, _), result_context in zip(link_jobs, link_results):
                if result_context:
                    iteration_contexts_for_interest[interest].append(result_context)

            for interest in user_interests:
                if iteration_contexts_for_interest[interest]:
                    aggregated_contexts[interest].extend(iteration_contexts_for_interest[interest])
                else:
                    print(f"No useful contexts were found for {interest} in this iteration.")

            # ----- ASK THE LLM IF MORE SEARCHES ARE NEEDED -----
            follow_up = await get_new_search_queries_async(
                session, user_interests, all_search_queries, aggregated_contexts
            )
            if follow_up == "<done>":
                print("LLM indicated that no further research is needed.")
                break
            if not follow_up:
                print("LLM did not provide any new search queries. Ending the loop.")
                break
            if not isinstance(follow_up, dict):
                # A flat list carries no interest attribution, so its results
                # could not be filed under the right category.
                print("LLM provided new search queries without an interest mapping; cannot attribute them. Ending the loop.")
                break

            # Follow-up queries are accepted as {interest: [queries]}.
            pending_queries = [
                (interest, query)
                for interest, queries in follow_up.items()
                if interest in aggregated_contexts
                for query in queries
            ]
            if not pending_queries:
                print("LLM did not provide any new search queries. Ending the loop.")
                break
            new_queries = [query for _, query in pending_queries]
            print("LLM provided new search queries:", new_queries)
            all_search_queries.extend(new_queries)

            iteration += 1

        # ----- FINAL REPORT -----
        print("\nGenerating final report...")
        final_report = await generate_final_report_async(
            session, user_name, user_interests, report_date_str, aggregated_contexts
        )
        print("\n==== FINAL REPORT ====\n")
        print(final_report)


def main():
    """Synchronous entry point: drive ``async_main`` to completion with ``asyncio.run``."""
    pipeline = async_main()
    asyncio.run(pipeline)


# Run the pipeline only when executed as a script (or notebook cell), not on import.
if __name__ == "__main__":
    main()


=== Iteration 1 ===
Aggregated 37 unique links from this iteration.
Fetching content from: https://www.forbes.com/sites/bernardmarr/2024/12/16/6-game-changing-ai-breakthroughs-that-defined-2024/
Fetching content from: https://www.pbs.org/newshour/show/how-artificial-intelligence-impacted-our-lives-in-2024-and-whats-next
Fetching content from: https://www.trendmicro.com/en_us/research/25/a/top-ai-trends-from-2024-review.html
Fetching content from: https://www.channelinsider.com/managed-services/generative-ai-developments-trends-year-in-review/
Fetching content from: https://www.inc.com/kit-eaton/what-happened-in-ai-in-2024-and-what-will-the-new-year-bring/91070073
Fetching content from: https://blog.google/technology/ai/google-ai-big-scientific-breakthroughs-2024/
Fetching content from: https://finance.yahoo.com/news/top-13-artificial-intelligence-ai-222907215.html
Fetching content from: https://www.politico.com/news/brazil
Fetching content from: https://www.nytimes.com/2025/01/16/worl

In [2]:
# Test: isolated LLM call

import google.generativeai as genai

# Replace with your actual Gemini API key
GEMINI_API_KEY = ""
# Model used by this isolated test cell.
# NOTE(review): this differs from the "gemini-2.0-flash" default in the main cell;
# if both cells run in the same kernel these assignments presumably rebind the
# shared globals - confirm that is intended.
DEFAULT_GEMINI_MODEL = "gemini-1.5-flash"

# Register the API key with the SDK and build the model client that
# call_gemini_async() below sends its requests through.
genai.configure(api_key=GEMINI_API_KEY)
gemini_model = genai.GenerativeModel(DEFAULT_GEMINI_MODEL)

async def call_gemini_async(messages, model_name=DEFAULT_GEMINI_MODEL):
    """
    Asynchronously call the Gemini API with the provided messages.

    Args:
        messages: Iterable of dicts carrying a 'content' key. Only the content
            strings are sent, in order; the 'role' values are not forwarded.
        model_name: Gemini model to query. The default reuses the shared
            module-level client; any other name gets its own client.

    Returns:
        The text of the response, or None if the request failed (the error is
        printed rather than raised).
    """
    try:
        contents = [msg['content'] for msg in messages]
        # Honour model_name instead of silently always using the default client.
        if model_name == DEFAULT_GEMINI_MODEL:
            model = gemini_model
        else:
            model = genai.GenerativeModel(model_name)
        # generate_content() is a blocking call. Running it in the default
        # executor keeps the event loop free while the request is in flight.
        # asyncio is imported further down this cell, before the first call.
        loop = asyncio.get_running_loop()
        response = await loop.run_in_executor(None, model.generate_content, contents)
        return response.text
    except Exception as e:
        print(f"Gemini API error: {e}")
        return None

# --- Test the function ---
import asyncio

async def test_gemini():
    """Smoke test: send one fixed question through call_gemini_async and print the reply."""
    question = {"role": "user", "content": "What is the capital of France?"}
    gemini_response = await call_gemini_async([question])
    print("Gemini API Response (Isolated Test):")
    print(gemini_response)


# Execute the smoke test immediately when the cell runs.
asyncio.run(test_gemini())

Gemini API Response (Isolated Test):
Paris

