In [6]:
# !pip install googlenewsdecoder
# !pip install pygooglenews

In [7]:
from pygooglenews import GoogleNews
import pprint
from itertools import islice
from googlenewsdecoder import gnewsdecoder
from datetime import datetime, timedelta
import polars as pl
from tqdm.auto import tqdm
from concurrent.futures import ThreadPoolExecutor, as_completed

  from .autonotebook import tqdm as notebook_tqdm


Here we define the search term, the start and end dates of the query, and the language and country passed to the `GoogleNews` class.

In [8]:
# 1. SET THE SEARCH PARAMETERS
search_term = 'Sudan'
start_date = '2024-01-01'
end_date = '2024-01-31'  # 1 month for now
language = 'en'
country = 'US'  # country must be the ISO code of the country

# 2. INITIALIZE GOOGLE NEWS
gn = GoogleNews(lang=language, country=country)

# 3. SET UP DATE RANGE AND BATCHES
# Parse the ISO-formatted strings into datetime objects so they can be stepped with timedelta.
start = datetime.strptime(start_date, '%Y-%m-%d')
end = datetime.strptime(end_date, '%Y-%m-%d')
# One-day step (original note: "because we want to get 100 articles per day").
timestep = timedelta(days=1)
# Queries are initially issued for 3-day intervals.
days_per_batch = 3

def date_range_batches(start, end, days_per_batch):
    """Yield consecutive ``(batch_start, batch_end)`` windows covering ``[start, end)``.

    Args:
        start: First datetime of the range.
        end: Datetime at which the range stops; the last window is clipped to it.
        days_per_batch: Width of every window in days. Must be a positive number.

    Yields:
        Tuples ``(batch_start, batch_end)`` where ``batch_end`` is
        ``batch_start + days_per_batch`` days, capped at ``end``.
        Nothing is yielded when ``start >= end``.

    Raises:
        ValueError: If ``days_per_batch`` is not positive. Because this is a
            generator, the error surfaces when iteration begins.
    """
    # A zero or negative step would never move `current` towards `end`
    # and the loop below would never terminate.
    if days_per_batch <= 0:
        raise ValueError(f"days_per_batch must be positive, got {days_per_batch}")

    step = timedelta(days=days_per_batch)
    current = start
    while current < end:
        yield current, min(current + step, end)
        current += step

# 4. SET UP THE QUERY
def get_articles(start_date, end_date):
    """Run one search for the module-level ``search_term`` between two dates.

    Args:
        start_date: Value forwarded to ``gn.search`` as ``from_``.
        end_date: Value forwarded to ``gn.search`` as ``to_``.

    Returns:
        The ``"entries"`` item of the search result, or an empty list when
        the search (or the lookup of ``"entries"``) raises.
    """
    try:
        return gn.search(query=search_term, from_=start_date, to_=end_date)["entries"]
    except Exception as e:
        # Best effort: report the failing window and fall through to an empty result.
        print(f"Error on {start_date} to {end_date} query: {e}")
    return []
    

# 5. FETCH BATCHES OF ARTICLES
max_workers = 16
# A query returning this many entries is treated as truncated and is re-fetched
# in smaller windows.
max_results_per_query = 100


def fetch_batch(batch_start, batch_end, days_per_batch):
    """Fetch the articles of one date window, splitting it when it looks truncated.

    Args:
        batch_start: datetime at which the window starts.
        batch_end: datetime at which the window ends.
        days_per_batch: Width (in days) used to build this window; windows that
            are already one day wide are never split further.

    Returns:
        The list of entries for the window. When the query returned
        ``max_results_per_query`` entries or more and the window is wider than
        one day, the window is re-fetched in 1-day batches and those results
        are returned instead.
    """
    start_str = batch_start.strftime("%Y-%m-%d")
    end_str = batch_end.strftime("%Y-%m-%d")
    articles = get_articles(start_str, end_str)

    if len(articles) >= max_results_per_query and days_per_batch > 1:
        tqdm.write(f"⚠️ 100 articles between {start_str} and {end_str}. Splitting batch...")
        # The nested call runs inside a worker thread while the outer progress
        # bar is live; a second bar per split garbles the notebook output, so
        # it is disabled here.
        return collect_articles(batch_start, batch_end, 1, show_progress=False)
    return articles


def collect_articles(start_date, end_date, days_per_batch, show_progress=True):
    """Fetch every batch between two dates concurrently and merge the results.

    Args:
        start_date: datetime at which the overall range starts.
        end_date: datetime at which the overall range ends.
        days_per_batch: Width in days of each batch produced by
            ``date_range_batches``.
        show_progress: Whether to render a tqdm progress bar. Defaults to
            ``True`` (the previous behaviour); nested calls made by
            ``fetch_batch`` pass ``False``.

    Returns:
        A flat list with the entries of all batches, in completion order.
    """
    results = []
    batches = date_range_batches(start_date, end_date, days_per_batch)

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_to_range = {
            executor.submit(fetch_batch, batch_start, batch_end, days_per_batch): (batch_start, batch_end)
            for batch_start, batch_end in batches
        }

        progress = tqdm(
            as_completed(future_to_range),
            total=len(future_to_range),
            desc="Fetching batches",
            dynamic_ncols=False,
            leave=True,
            disable=not show_progress,
        )
        for future in progress:
            results.extend(future.result())

    return results

# Run it
articles = collect_articles(start, end, days_per_batch)

# Format into Polars DataFrame
rows = [
    {
        "title": article.get("title"),
        "google_link": article.get("link"),
        "published": article.get("published"),
        # A missing or empty "source" yields None instead of raising.
        "source": (article.get("source") or {}).get("title"),
    }
    for article in articles
]

# An explicit schema keeps the columns present even when no article was fetched;
# without it `unique(subset=["google_link"])` fails on an empty frame.
# maintain_order=True makes the row order (and therefore the positional indices
# used by the later decoding/fetching cells) reproducible between runs.
df = pl.DataFrame(
    rows,
    schema={
        "title": pl.Utf8,
        "google_link": pl.Utf8,
        "published": pl.Utf8,
        "source": pl.Utf8,
    },
).unique(subset=["google_link"], maintain_order=True)

n_articles = df.height
n_days = (end - start).days + 1

print('------------------------------------------------------')
print('TOTAL NUMBER OF ARTICLES:' , n_articles)
print('Average articles per day:', round(n_articles / n_days))
print(df.head())

Fetching batches:  10%|█         | 1/10 [00:01<00:10,  1.17s/it]

⚠️ 100 articles between 2024-01-19 and 2024-01-22. Splitting batch...


                                                                
                                                                

⚠️ 100 articles between 2024-01-10 and 2024-01-13. Splitting batch...



                                                                
                                                                
                                                                
                                                                
[A                                                    

⚠️ 100 articles between 2024-01-16 and 2024-01-19. Splitting batch...
⚠️ 100 articles between 2024-01-01 and 2024-01-04. Splitting batch...
⚠️ 100 articles between 2024-01-04 and 2024-01-07. Splitting batch...
⚠️ 100 articles between 2024-01-13 and 2024-01-16. Splitting batch...


                                                                
                                                                
Fetching batches:  10%|█         | 1/10 [00:01<00:10,  1.17s/it]

⚠️ 100 articles between 2024-01-07 and 2024-01-10. Splitting batch...
⚠️ 100 articles between 2024-01-28 and 2024-01-31. Splitting batch...
⚠️ 100 articles between 2024-01-22 and 2024-01-25. Splitting batch...



[A

[A[A



[A[A[A[A


[A[A[A






[A[A[A[A[A[A[A





[A[A[A[A[A[A




[A[A[A[A[A







Fetching batches: 100%|██████████| 3/3 [00:01<00:00,  2.01it/s]
Fetching batches:  20%|██        | 2/10 [00:02<00:11,  1.41s/it]
[A



[A[A[A[A




[A[A[A[A[A
[A






Fetching batches: 100%|██████████| 3/3 [00:01<00:00,  2.11it/s]








Fetching batches:  30%|███       | 3/10 [00:03<00:07,  1.09s/it]



[A[A[A[A






[A[A[A[A[A[A[A


[A[A[A





[A[A[A[A[A[A






Fetching batches: 100%|██████████| 3/3 [00:01<00:00,  1.89it/s]
Fetching batches:  40%|████      | 4/10 [00:03<00:04,  1.34it/s]



[A[A[A[A







[A[A[A[A[A[A[A[A


Fetching batches: 100%|██████████| 3/3 [00:01<00:00,  1.70it/s]


[A[A




Fetching batches:  50%|█████     | 5/10 [00:03<00:02,  1.85it/s]


[A[A[A







[A[A[A[A[A[A[A[A

[A[A




[A[A[A[A[A





Fetching batches: 100%|██████████| 3/3 [00:02<00:00,  1.45it/s]
Fetching batches

------------------------------------------------------
TOTAL NUMBER OF ARTICLES: 851
Average articles per day: 27
shape: (5, 4)
┌──────────────────────────┬──────────────────────────┬──────────────────┬─────────────────────────┐
│ title                    ┆ google_link              ┆ published        ┆ source                  │
│ ---                      ┆ ---                      ┆ ---              ┆ ---                     │
│ str                      ┆ str                      ┆ str              ┆ str                     │
╞══════════════════════════╪══════════════════════════╪══════════════════╪═════════════════════════╡
│ UN peacekeeper among 52  ┆ https://news.google.com/ ┆ Mon, 29 Jan 2024 ┆ Anadolu Ajansı          │
│ killed…                  ┆ rss/ar…                  ┆ 08:00:00 GMT     ┆                         │
│ World Report 2024:       ┆ https://news.google.com/ ┆ Thu, 11 Jan 2024 ┆ Human Rights Watch      │
│ Rights Tren…             ┆ rss/ar…                  ┆ 15:12:41

This method seems to work really well: we extract articles in 3-day batches, and for the batches that return 100 articles (the maximum number per query) we scale the batch size down to one day. We could repeat this step and use batches of a few hours for days with a lot of activity, but that might just introduce a lot of noise and multiple articles about the same event. Consider.
The run took 5-6 seconds for a 31-day interval, from which we obtained about 850 articles.

Next, we decode the URLs from Google News format to "regular" format, so we can obtain the text in the next step.

In [9]:
# Pair every Google News link with its row position: a list of (index, url) tuples.
indexed_urls = [
    (position, link)
    for position, link in enumerate(df['google_link'].to_list())
]

def decode_one_url(idx_url_tuple):
    """Decode a single Google News link with ``gnewsdecoder``.

    Args:
        idx_url_tuple: ``(idx, url)`` pair; ``idx`` is passed through unchanged.

    Returns:
        ``(idx, decoded_url)`` when the decoder reports a truthy ``"status"``,
        otherwise ``(idx, None)`` after writing the error through ``tqdm.write``.
    """
    idx, url = idx_url_tuple
    interval_time = 1  # interval is optional, default is None
    # A proxy can optionally be passed to gnewsdecoder as well, e.g.
    # proxy="http://user:pass@localhost:8080" (default is None).

    try:
        outcome = gnewsdecoder(url, interval=interval_time)
        if not outcome.get("status"):
            tqdm.write(f"Error: {outcome['message']}")
            return (idx, None)
        return (idx, outcome["decoded_url"])
    except Exception as e:
        tqdm.write(f"Error occurred: {e}")
    return (idx, None)

def decode_urls_concurrently(indexed_url_list, max_workers=16):
    """Decode many Google News links in parallel threads.

    Args:
        indexed_url_list: Iterable of ``(index, url)`` tuples.
        max_workers: Size of the thread pool.

    Returns:
        A list of ``(index, decoded_url)`` tuples, in completion order,
        containing only the links that were decoded successfully. Callers can
        map the indices back and treat missing indices as failures.
    """
    results = []
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {executor.submit(decode_one_url, (idx, url)): idx for idx, url in indexed_url_list}
        for future in tqdm(as_completed(futures), total=len(futures), desc='decoding URLs', dynamic_ncols=False, leave=True):
            idx, decoded_url = future.result()
            # decode_one_url signals failure as (idx, None). The previous
            # `if result:` check was always true for a non-empty tuple, so
            # failures were returned (and counted) as if they were decoded.
            if decoded_url is not None:
                results.append((idx, decoded_url))
    return results

decoded_urls = decode_urls_concurrently(indexed_urls)

# Map every row position to its decoded URL; positions without a result stay None.
decoded_dict = dict(decoded_urls)
final_decoded_list = [decoded_dict.get(i, None) for i in range(len(df))]

df = df.with_columns(pl.Series(name="decoded_url", values=final_decoded_list, dtype=pl.Utf8))

# Count only the rows that actually received a URL: len(decoded_urls) may also
# include (index, None) entries for links that failed to decode.
n_decoded = sum(url is not None for url in final_decoded_list)
print('TOTAL NUMBER OF URLs DECODED:', n_decoded)
#print('average urls per day:', round(n_decoded / ((end - start).days + 1)))

decoding URLs: 100%|██████████| 851/851 [02:37<00:00,  5.39it/s]

TOTAL NUMBER OF URLs DECODED: 851





This process is rather slow: it took exactly 3 min for 100 articles (31 min per 850 articles).

We updated it to be done with ThreadPoolExecutor and then it took **4min30 for 850 articles** (see [info](https://www.digitalocean.com/community/tutorials/how-to-use-threadpoolexecutor-in-python-3-es) and [documentation](https://docs.python.org/3/library/concurrent.futures.html)).

In [10]:
# Remove exact duplicate (index, url) tuples. Every index is unique, so this
# alone cannot detect two rows that decode to the same URL.
decoded_urls = list(set(decoded_urls))

# Count distinct URLs (ignoring failed decodes) so the reported number is
# really the number of unique decoded URLs.
unique_decoded_urls = {url for _, url in decoded_urls if url is not None}
print(f"Number of unique decoded URLs: {len(unique_decoded_urls)}")

Number of unique decoded URLs: 851


# 2. Extracting the articles' content

In [11]:
# !pip install trafilatura

In [12]:
import httpx
import trafilatura
import polars as pl
from concurrent.futures import ThreadPoolExecutor, as_completed
from tqdm import tqdm
import polars as pl

In [13]:
# Fetch function
def fetch_one_article(idx_url_tuple, timeout=15):
    """Download one article and extract its main text with trafilatura.

    Args:
        idx_url_tuple: ``(idx, url)`` pair; ``idx`` is passed through unchanged.
        timeout: Timeout handed to ``httpx.Client``.

    Returns:
        ``(idx, text)`` where ``text`` is the result of ``trafilatura.extract``
        for a 200 response, or ``None`` for any other status code or when an
        exception is raised (the exception is reported via ``tqdm.write``).
    """
    idx, url = idx_url_tuple
    browser_headers = {
        "User-Agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 Chrome/114.0.0.0 Safari/537.36"
    }
    article_text = None
    try:
        with httpx.Client(headers=browser_headers, follow_redirects=True, timeout=timeout) as client:
            response = client.get(url)
            if response.status_code == 200:
                article_text = trafilatura.extract(response.text)
    except Exception as e:
        tqdm.write(f"[{idx}] Error fetching article: {e}")
    return (idx, article_text)
    

# run concurrently and add results to a list
# NOTE: the indices in `decoded_urls` are row positions of `df` as it was when
# the URLs were decoded. This cell ends by de-duplicating `df`, so re-running it
# without re-running the decoding cell can misalign texts and rows.
with ThreadPoolExecutor(max_workers=16) as executor:
    futures = {
        executor.submit(fetch_one_article, (idx, url)): idx
        for idx, url in decoded_urls
        if url is not None  # links that failed to decode cannot be fetched
    }
    full_text_tuples = [future.result() for future in tqdm(as_completed(futures), total=len(futures), desc="Fetching", ncols=80)]

# add to our dataframe
full_texts_dict = dict(full_text_tuples)
final_full_texts = [full_texts_dict.get(i, None) for i in range(len(df))]
# An explicit dtype keeps the column a string column even if every fetch failed.
df = df.with_columns(pl.Series(name="full_text", values=final_full_texts, dtype=pl.Utf8))
# keep="first" + maintain_order=True make the de-duplication deterministic.
df = df.unique(subset=["decoded_url"], keep="first", maintain_order=True)


Fetching:  11%|███▍                            | 93/851 [00:07<00:55, 13.65it/s]

[453] Error fetching article: [SSL: CERTIFICATE_VERIFY_FAILED] certificate verify failed: unable to get local issuer certificate (_ssl.c:1010)


Fetching:  50%|███████████████▋               | 429/851 [00:27<00:19, 22.00it/s]

[383] Error fetching article: The read operation timed out


Fetching:  69%|█████████████████████▎         | 586/851 [00:36<00:15, 17.63it/s]

[6] Error fetching article: The read operation timed out


Fetching:  93%|████████████████████████████▋  | 789/851 [00:47<00:02, 21.77it/s]

[232] Error fetching article: [SSL: CERTIFICATE_VERIFY_FAILED] certificate verify failed: unable to get local issuer certificate (_ssl.c:1010)


Fetching: 100%|███████████████████████████████| 851/851 [00:57<00:00, 14.85it/s]


In [14]:
# Share of rows for which an article text was extracted.
nulls = df.null_count()["full_text"].item()
total = len(df)
fetched = total - nulls
# Guard against an empty DataFrame, which would otherwise divide by zero.
percentage = (fetched / total) * 100 if total else 0.0
print(f"CORRECTLY FETCHED ARTICLES: {fetched}/{total} ({percentage:.2f}%)")

CORRECTLY FETCHED ARTICLES: 650/851 (76.38%)


In [15]:
# Preview the first rows of the DataFrame, including the decoded_url and full_text columns.
df.head()

title,google_link,published,source,decoded_url,full_text
str,str,str,str,str,str
"""Algeria expresses solidarity w…","""https://news.google.com/rss/ar…","""Mon, 29 Jan 2024 08:00:00 GMT""","""Xinhua""","""https://english.news.cn/202401…","""ALGIERS, Jan. 28 (Xinhua) -- A…"
"""War in Sudan displaces over 50…","""https://news.google.com/rss/ar…","""Mon, 29 Jan 2024 08:00:00 GMT""","""African Business""","""https://african.business/2024/…","""Download logo “More than 500,0…"
"""Sudan Regional Crisis: Emergen…","""https://news.google.com/rss/ar…","""Tue, 30 Jan 2024 08:00:00 GMT""","""ReliefWeb""","""https://reliefweb.int/report/s…","""OVERVIEW Throughout December 2…"
"""Sudan conflict has displaced m…","""https://news.google.com/rss/ar…","""Sun, 07 Jan 2024 08:00:00 GMT""","""Sudan Tribune""","""https://sudantribune.com/artic…",
"""54 killed in clashes in area c…","""https://news.google.com/rss/ar…","""Tue, 30 Jan 2024 08:00:00 GMT""","""Arab News""","""https://www.arabnews.com/node/…",


# 3. Knowledge Graph

In [None]:
# !pip install "openai" "langchain" "neo4j" "pydantic" "tiktoken"
# !pip install fsspec langchain-text-splitters tiktoken python-dotenv numpy torch neo4j-graphrag google-genai langchain-google-genai "neo4j-graphrag[sentence-transformers]" polars

^C


In [13]:
import os
import neo4j
from langchain_google_genai import GoogleGenerativeAI # Import LangChain's GoogleGenerativeAI
from neo4j_graphrag.generation import GraphRAG
from neo4j_graphrag.llm import LLMInterface, LLMResponse
from langchain_google_genai import GoogleGenerativeAI
from typing import Dict, Any, Optional
import asyncio
from typing import Any, List, Optional, Union
from neo4j_graphrag.message_history import MessageHistory
from neo4j_graphrag.types import LLMMessage


In [2]:
# Read the Neo4j and Gemini credentials from the environment (None when a variable is unset).
NEO4J_URI, NEO4J_USERNAME, NEO4J_PASSWORD, GEMINI_API_KEY = map(
    os.getenv,
    ('NEO4J_URI', 'NEO4J_USERNAME', 'NEO4J_PASSWORD', 'GEMINI_API_KEY'),
)

# Report whether every credential was found; nothing is raised when one is missing.
if None in (NEO4J_URI, NEO4J_USERNAME, NEO4J_PASSWORD, GEMINI_API_KEY):
    print('There is an error with one of the keys!')
else:
    print('ALL KEYS OBTAINED CORRECTLY')

ALL KEYS OBTAINED CORRECTLY


In [None]:
# Name of the Gemini model used by the LLM wrapper defined in this cell.
MODEL = "gemini-2.5-flash-preview-04-17"

# Neo4j driver authenticated with the credentials read from the environment.
driver = neo4j.GraphDatabase.driver(
    NEO4J_URI,
    auth=(NEO4J_USERNAME, NEO4J_PASSWORD),
)

class GeminiLLM(LLMInterface):
    """
    A custom LLM class for Google Gemini models that implements the Neo4j GraphRAG LLMInterface.

    The model is accessed through LangChain's ``GoogleGenerativeAI`` wrapper,
    which is called with a single prompt string. The system instruction and any
    message history are therefore folded into that prompt (see ``_build_prompt``).
    """

    def __init__(
        self,
        model_name: str,
        google_api_key: str,
        model_params: Optional[Dict[str, Any]] = None,
        default_system_instruction: Optional[str] = None
    ):
        """
        Initialize the Gemini LLM.

        Args:
            model_name: The name of the Gemini model to use (e.g., "gemini-2.5-flash-preview-04-17")
            google_api_key: The Google API key to authenticate with Gemini
            model_params: Optional parameters to pass to the model (e.g., temperature)
            default_system_instruction: Default system prompt to use when none is provided
        """
        # Initialize the parent class (an empty dict replaces a missing model_params)
        super().__init__(model_name=model_name, model_params=model_params or {})

        # Store the API key
        self.google_api_key = google_api_key

        # Store the default system instruction
        self.default_system_instruction = default_system_instruction or "You are a helpful AI assistant."

        # Initialize the LangChain Gemini model
        self.llm = GoogleGenerativeAI(
            model=self.model_name,
            google_api_key=self.google_api_key,
            **self.model_params
        )

    def _build_prompt(
        self,
        input: str,
        message_history: Optional[Union[List[LLMMessage], MessageHistory]] = None,
        system_instruction: Optional[str] = None,
    ) -> str:
        """Combine system instruction, prior messages and the new input into one prompt string."""
        effective_system_instruction = system_instruction or self.default_system_instruction

        # Accept either a MessageHistory object or a plain list of messages.
        if isinstance(message_history, MessageHistory):
            history = message_history.messages
        else:
            history = message_history or []

        sections = [effective_system_instruction]
        # assumes each message is a mapping with "role" and "content" keys
        # (the LLMMessage shape) — TODO confirm against the installed neo4j_graphrag version
        sections.extend(f"{message['role']}: {message['content']}" for message in history)
        sections.append(input)
        return "\n\n".join(sections)

    def invoke(
        self,
        input: str,
        message_history: Optional[Union[List[LLMMessage], MessageHistory]] = None,
        system_instruction: Optional[str] = None,
    ) -> LLMResponse:
        """
        Invoke the Gemini model synchronously.

        Args:
            input: The text prompt to send to the model
            message_history: Optional earlier messages, prepended to the prompt
            system_instruction: Optional system prompt; falls back to
                ``default_system_instruction`` when not given

        Returns:
            LLMResponse: An object containing the model's response. When the
            call fails, the response content is an error message instead.
        """
        try:
            # Previously the system instruction and history were accepted but
            # never sent to the model; they are now part of the prompt.
            prompt = self._build_prompt(input, message_history, system_instruction)

            # Get the response from the model
            response = self.llm.invoke(prompt)

            # Return as LLMResponse object (tokens_used is not provided by the Gemini API through LangChain)
            return LLMResponse(content=response)
        except Exception as e:
            # NOTE(review): errors are reported as response content rather than
            # raised, so callers cannot distinguish them from a model answer.
            # Kept as-is to preserve the existing best-effort behaviour.
            error_message = f"Error invoking Gemini model: {str(e)}"
            return LLMResponse(content=error_message)

    async def ainvoke(
        self,
        input: str,
        message_history: Optional[Union[List[LLMMessage], MessageHistory]] = None,
        system_instruction: Optional[str] = None,
    ) -> LLMResponse:
        """
        Invoke the Gemini model asynchronously.

        Args:
            input: The text prompt to send to the model
            message_history: Optional earlier messages, forwarded to ``invoke``
            system_instruction: Optional system prompt, forwarded to ``invoke``

        Returns:
            LLMResponse: An object containing the model's response
        """
        # Run the synchronous invoke in the default executor so the event loop
        # is not blocked. get_running_loop() is the supported way to obtain the
        # loop from inside a coroutine.
        loop = asyncio.get_running_loop()
        # Forward every argument: earlier only `input` was passed on, so the
        # history and system instruction were lost on the async path.
        return await loop.run_in_executor(
            None, self.invoke, input, message_history, system_instruction
        )
    

# Deterministic Gemini wrapper (temperature 0.0) used for the knowledge-graph steps.
llm = GeminiLLM(MODEL, GEMINI_API_KEY, model_params={"temperature": 0.0})