In [1]:
import asyncio
import os
from datetime import datetime

import aiohttp
from langchain_text_splitters import TokenTextSplitter
from neo4j import GraphDatabase
# Token-based splitter used to cut article bodies into chunks
# (chunk_size=512, chunk_overlap=56).
text_splitter = TokenTextSplitter(
    chunk_size=512,
    chunk_overlap=56,
)

# Shared Neo4j driver used by every query in this notebook.
# NOTE(review): the password is blank here — presumably scrubbed before publishing; confirm.
driver = GraphDatabase.driver(
    "neo4j+s://diffbot.neo4jlabs.com:7687",
    auth=("neo4j", ""),
)

  from pandas.core import (


In [2]:
def get_org_ids(top_k: int = 1000):
    """Return the ids of the ``top_k`` most important organizations.

    Runs a query through the module-level ``driver`` that keeps only
    ``Organization`` nodes with a non-null ``importance``, orders them by
    ``importance`` descending and limits the result to ``top_k`` rows.

    Args:
        top_k: Maximum number of organization ids to return.

    Returns:
        A list with the ``id`` value of each returned record, in query order.
    """
    rows, _summary, _keys = driver.execute_query(
        "MATCH (o:Organization) WHERE o.importance IS NOT NULL RETURN o.id AS id ORDER BY o.importance DESC LIMIT toInteger($limit)",
        limit=top_k,
    )
    org_ids = []
    for row in rows:
        org_ids.append(row['id'])
    return org_ids

In [3]:
# Diffbot API token. Read from the DIFFBOT_TOKEN environment variable so the secret does
# not have to be written into the notebook; falls back to the previous empty default.
DIFFBOT_TOKEN = os.environ.get("DIFFBOT_TOKEN", "")

async def get_articles(entity_id: str, date_after: str):
    """Fetch articles tagged with a Diffbot entity from the Diffbot DQL endpoint.

    The DQL query selects English ``Article`` records whose tags reference
    ``http://diffbot.com/entity/{entity_id}`` and whose date is later than
    ``date_after``, sorted by date, requesting up to 100 results.

    Args:
        entity_id: Diffbot entity id appended to the entity URI in the query.
        date_after: Lower date bound, interpolated verbatim into the query.

    Returns:
        The decoded JSON body when the response status is 200, otherwise ``None``.
    """
    base_url = "https://kg.diffbot.com/kg/v3/dql"

    query = (
        f'type:Article tags.uri:"http://diffbot.com/entity/{entity_id}" '
        f'strict:language:"en" date>"{date_after}" sortBy:date'
    )

    # Hand the pieces to aiohttp as params instead of interpolating them into the URL,
    # so characters such as '&', '#', '+' or '=' in the token or query are percent-encoded
    # rather than being parsed as URL syntax.
    params = {
        "type": "query",
        "token": DIFFBOT_TOKEN,
        "query": query,
        "size": 100,
    }

    async with aiohttp.ClientSession() as session:
        async with session.get(base_url, params=params) as response:
            if response.status == 200:
                return await response.json()
            # Non-200 responses are reported as None; callers must handle that case.
            return None


In [4]:
# Cypher statement that upserts one batch of articles supplied as the $data parameter.
# For each row it MERGEs an Article keyed on row.diffbotUri, sets its scalar properties,
# and then runs one CALL subquery per related collection:
#   - row.categories                 -> (:Category {name}) via HAS_CATEGORY (score on the relationship)
#   - row.tags with a non-null uri   -> (:Tag {id}) via HAS_TAG (score, sentiment on the relationship;
#                                       the tag label is only set when the Tag is first created)
#   - row.texts                      -> (:Chunk {index}) via HAS_CHUNK, merged relative to the article,
#                                       with the chunk text stored on the node
#   - row.author, when not null      -> (:Author {name}) via HAS_AUTHOR
#   - row.videos with a non-null url -> (:Video {uri}) via HAS_VIDEO (summary and name on the node)
# NOTE(review): the aggregating `RETURN count(*)` in the UNWIND subqueries presumably exists so
# that a row with an empty collection still yields one row and reaches the later subqueries — confirm.
import_query = """
UNWIND $data AS row
MERGE (a:Article {id: row.diffbotUri})
SET a.pageUrl = row.pageUrl,
    a.title = row.title,
    a.language = row.language,
    a.sentiment = toFloat(row.sentiment),
    a.date = row.date_obj,
    a.summary = row.summary,
    a.publisherCountry = row.publisherCountry,
    a.publisherRegion = row.publisherRegion
WITH a, row
CALL (a, row) {
UNWIND row.categories AS category
MERGE (c:Category {name: category.name})
MERGE (a)-[hc:HAS_CATEGORY]->(c)
SET hc.score = toFloat(category.score)
RETURN count(*) AS count
}
WITH a, row
CALL (a, row) {
UNWIND [el in row.tags WHERE el.uri IS NOT NULL | el ] AS tag
MERGE (t:Tag {id: tag.uri})
ON CREATE SET t.label = tag.label
MERGE (a)-[ht:HAS_TAG]->(t)
SET ht.score = toFloat(tag.score),
    ht.sentiment = toFloat(tag.sentiment)
RETURN count(*) AS count
}
WITH a, row
CALL (a, row) {
UNWIND row.texts as text
MERGE (a)-[:HAS_CHUNK]->(c:Chunk {index: text.index})
SET c.text = text.text
RETURN count(*) AS count
}
WITH a, row
CALL (a, row) {
WITH a, row
WHERE row.author IS NOT NULL
MERGE (au:Author {name: row.author})
MERGE (a)-[:HAS_AUTHOR]->(au)
}
WITH a, row
CALL (a, row) {
UNWIND [el in row.videos WHERE el.url IS NOT NULL] as video
MERGE (v:Video {uri: video.url})
SET v.summary = video.summary,
    v.name = video.name
MERGE (a)-[:HAS_VIDEO]->(v)
RETURN count(*) AS count
}
RETURN count(*)
"""

In [5]:
# Uniqueness constraints for the keys used when loading articles; each statement uses
# IF NOT EXISTS, so the cell can be re-run safely.
constraint_statements = (
    "CREATE CONSTRAINT article_unique_id IF NOT EXISTS FOR (a:Article) REQUIRE a.id IS UNIQUE;",
    "CREATE CONSTRAINT category_unique_name IF NOT EXISTS FOR (c:Category) REQUIRE c.name IS UNIQUE;",
    "CREATE CONSTRAINT tag_unique_id IF NOT EXISTS FOR (t:Tag) REQUIRE t.id IS UNIQUE;",
    "CREATE CONSTRAINT author_unique_name IF NOT EXISTS FOR (au:Author) REQUIRE au.name IS UNIQUE;",
    "CREATE CONSTRAINT video_unique_uri IF NOT EXISTS FOR (v:Video) REQUIRE v.uri IS UNIQUE;",
)

# Apply them one by one against the 'articles' database.
for con in constraint_statements:
    driver.execute_query(con, database_='articles')


In [8]:
# Number of articles sent to Neo4j per import query.
step = 1_000

# Define a helper function to process the articles for a single organization
async def process_organization(organization, date_after):
    """Fetch one organization's articles and prepare them for import.

    Each returned entity is extended in place with:
      * ``texts``: a list of ``{'text', 'index'}`` dicts, one per chunk produced by
        ``text_splitter`` from the entity's ``text``;
      * ``date_obj``: a ``datetime`` built from ``date.timestamp`` (milliseconds).

    Args:
        organization: Organization id passed through to ``get_articles``.
        date_after: Lower date bound passed through to ``get_articles``.

    Returns:
        A list of entity dicts; empty when the fetch failed or returned no data.
    """
    data = await get_articles(organization, date_after)
    # get_articles returns None for any non-200 response; treat that as "no articles"
    # so a single failed request does not abort the whole gather.
    if not data:
        return []

    entities = []
    for ent in data.get('data') or []:
        entity = ent['entity']

        # An entity without a body gets no chunks instead of raising KeyError.
        body = entity.get('text')
        chunks = text_splitter.split_text(body) if body else []
        entity['texts'] = [{'text': chunk, 'index': i} for i, chunk in enumerate(chunks)]

        # Timestamps are in milliseconds. fromtimestamp() yields a naive datetime in the
        # machine's local timezone — NOTE(review): confirm that local time is intended.
        timestamp = (entity.get('date') or {}).get('timestamp')
        entity['date_obj'] = (
            datetime.fromtimestamp(timestamp / 1000) if timestamp is not None else None
        )
        entities.append(entity)
    return entities

async def update_articles(top_k: int = 1000, date_after: str = '2024-01-01', max_concurrency: int = 20):
    """Fetch recent articles for the top organizations and load them into Neo4j.

    Args:
        top_k: Number of organization ids requested from ``get_org_ids``.
        date_after: Lower date bound forwarded to ``process_organization``.
        max_concurrency: Upper bound on organizations fetched at the same time.
            Values below 1 are treated as 1.

    Returns:
        None. Articles are written to the 'articles' database in batches of ``step``.
    """
    organization_ids = get_org_ids(top_k)

    # Bound the number of in-flight fetches: launching one request per organization at
    # once (1000 by default) opens that many HTTP sessions simultaneously.
    semaphore = asyncio.Semaphore(max(1, max_concurrency))

    async def _fetch(org):
        async with semaphore:
            return await process_organization(org, date_after)

    # gather() keeps results in the same order as organization_ids.
    org_results = await asyncio.gather(*(_fetch(org) for org in organization_ids))

    # Flatten the per-organization lists, keeping only the first copy of each article:
    # the same article can be returned for several organizations, and the import query
    # MERGEs on diffbotUri anyway, so repeats are redundant work.
    articles = []
    seen_uris = set()
    for result in org_results:
        for article in result:
            uri = article.get('diffbotUri')
            if uri is not None:
                if uri in seen_uris:
                    continue
                seen_uris.add(uri)
            articles.append(article)

    # Write in batches of `step` articles per query.
    for index in range(0, len(articles), step):
        batch = articles[index: index + step]
        driver.execute_query(import_query, data=batch, database_='articles')

In [9]:
# Run the import for the 10 highest-importance organizations, using the default date_after.
await update_articles(10)