# Goal & Approach

The goal of this file is to build a unified Knowledge Graph of actors and relationships based on the entities detected in the streetpress_extract_entities.ipynb file. 

In that previous extraction steps, entities were extracted from multiple chunks of texts coming from multiple articles. Those entities and relationships were stored in separate files.

The challenge here is to aggregate and resolve all these entities and relationships to build one big graph with as few duplicates as possible.

The resolution relies on 2 components : a full-text search index and a vector search index. And the final source of truth for resolved entities is a Sqlite instance.

<img src="streetpress_resolution.png" width="800" height="480">

# How to run this file

Required: 
- An OpenAI API key

Your secrets (API keys) must be put in a ".env" file in the same folder as this file.
The format should be OPENAI_API_KEY = "sk-..." (replace with your own key)

In [201]:
import os
from dotenv import load_dotenv
load_dotenv()

True

# Retrieve LLM reponses' data

In [202]:
# Load entities from disk

import json
import os
import glob

def load_entities_from_file(file_path):
    # Load data from the file
    actors = []
    rels = []
    with open(file_path, 'r') as file:
        data = json.load(file)
        actors = data['actors']
        rels = data['relationships']

    return actors, rels


def get_llm_files_paths(folder_name="llm_responses"):
    folder_path = os.path.join(".", folder_name)

    # Check if the folder exists
    if not os.path.exists(folder_path):
        print(f"Folder not found: {folder_path}")
        return None

    # Pattern to match files of the form 'article-x_chunk-y.json'
    pattern = os.path.join(folder_path, "article-*_chunk-*.json")

    # Find all matching files
    file_paths = glob.glob(pattern)

    return file_paths


In [172]:
# Test on the first chunk of the first article

file_path = "./llm_responses/article-1_chunk-1.json"
actors, rels = load_entities_from_file(file_path)
print(actors)
print(rels)

[{'id': 'kata_fight_club', 'name': 'Kata fight club', 'label': 'ORGANIZATION', 'key_information': 'It is the name of the underground fight club.'}, {'id': 'gud', 'name': 'Gud', 'label': 'ORGANIZATION', 'key_information': 'It is a right-wing extremist group.'}, {'id': 'generation_identitaire', 'name': 'Génération identitaire', 'label': 'ORGANIZATION', 'key_information': 'It is a right-wing extremist group.'}, {'id': 'zouaves', 'name': 'Zouaves', 'label': 'ORGANIZATION', 'key_information': 'It is a neo-Nazi group.'}, {'id': 'remi_g', 'name': 'Rémi G.', 'label': 'PERSON', 'key_information': 'He is a main participant of the right-wing extremist fight club./nHe attended a prestigious military school./nHe shares his drawings and other content on Instagram and YouTube./nHe is part of the Catholic identitarian movement Academia Christiana.'}, {'id': 'herve_ryssen', 'name': 'Hervé Ryssen', 'label': 'PERSON', 'key_information': 'He is an antisemitic and conspiracist French person whose documenta

# Create the SQLite DB

In [203]:
# Boilerplate to interact with SQLite

import sqlite3
import os


def create_sqlite_db(db_name='streetpress.db'):
    # Check if the database file exists
    if os.path.exists(db_name):
        # Delete the file
        os.remove(db_name)
    
    # Creates a new db with that name
    conn = sqlite3.connect(db_name)
    cursor = conn.cursor()

    # Create table with an auto-incrementing primary key
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS actors (
            id INTEGER PRIMARY KEY,
            name TEXT NOT NULL,
            label TEXT NOT NULL,
            key_information TEXT
        )
    ''')

    cursor.execute('''
        CREATE TABLE IF NOT EXISTS relationships (
            id INTEGER PRIMARY KEY,
            source_actor INTEGER,
            target_actor INTEGER,
            label TEXT NOT NULL,
            rationale TEXT,
            FOREIGN KEY (source_actor) REFERENCES actors (id),
            FOREIGN KEY (target_actor) REFERENCES actors (id)
        )
    ''')

    # Commit and close
    conn.commit()
    conn.close()


def insert_actor_sqlite(actor, db_name="streetpress.db"):
    # Connect to the db
    conn = sqlite3.connect(db_name)
    cursor = conn.cursor()

    # Insert a new row without specifying the ID
    cursor.execute('''
        INSERT INTO actors (name, label, key_information) VALUES (?, ?, ?)
    ''', (actor['name'], actor['label'], actor['key_information']))

    # Retrieve the ID of the newly inserted row
    inserted_row_id = cursor.lastrowid

    # Commit and close
    conn.commit()
    conn.close()

    return inserted_row_id

def insert_relationship_sqlite(rel, actors_id_map, db_name="streetpress.db"):
    conn = sqlite3.connect(db_name)
    cursor = conn.cursor()

    # Insert a new row without specifying the ID
    cursor.execute('''
        INSERT INTO relationships (source_actor, target_actor, label, rationale) VALUES (?, ?, ?, ?)
    ''', (actors_id_map[rel['source']], actors_id_map[rel['target']], rel['label'], rel['rationale']))

    # Retrieve the ID of the newly inserted row
    inserted_row_id = cursor.lastrowid

    # Commit and close
    conn.commit()
    conn.close()

    return inserted_row_id

def display_table_sqlite(table_name, db_name="streetpress.db"):
    conn = sqlite3.connect(db_name)
    cursor = conn.cursor()    

    query = "SELECT * FROM " + table_name + " LIMIT 10"
    cursor.execute(query)

    # Fetch and print the results
    print(cursor.fetchall())

    # Close the connection
    conn.close()

def get_actor_by_id(actor_id, db_name="streetpress.db"):
    conn = sqlite3.connect(db_name)
    cursor = conn.cursor()

    query = "SELECT * FROM actors WHERE id = ?"
    cursor.execute(query, (actor_id,))

    # Fetch and return the result
    result = cursor.fetchone()

    # Close the connection
    conn.close()

    return result

def get_rel_by_id(rel_id, db_name="streetpress.db"):
    conn = sqlite3.connect(db_name)
    cursor = conn.cursor()

    query = "SELECT * FROM relationships WHERE id = ?"
    cursor.execute(query, (rel_id,))

    # Fetch and return the result
    result = cursor.fetchone()

    # Close the connection
    conn.close()

    return result


In [174]:
# Run an example

create_sqlite_db()
actors_id_map = {}
for actor in actors:
    actor_db_id = insert_actor_sqlite(actor)
    actors_id_map[actor['id']] = actor_db_id
for rel in rels:
    insert_relationship_sqlite(rel,actors_id_map)
display_table_sqlite("actors")

[(1, 'Kata fight club', 'ORGANIZATION', 'It is the name of the underground fight club.'), (2, 'Gud', 'ORGANIZATION', 'It is a right-wing extremist group.'), (3, 'Génération identitaire', 'ORGANIZATION', 'It is a right-wing extremist group.'), (4, 'Zouaves', 'ORGANIZATION', 'It is a neo-Nazi group.'), (5, 'Rémi G.', 'PERSON', 'He is a main participant of the right-wing extremist fight club./nHe attended a prestigious military school./nHe shares his drawings and other content on Instagram and YouTube./nHe is part of the Catholic identitarian movement Academia Christiana.'), (6, 'Hervé Ryssen', 'PERSON', 'He is an antisemitic and conspiracist French person whose documentaries Rémi G. shares.'), (7, 'Marc de Cacqueray', 'PERSON', 'He is the leader of Gud and a known right-wing extremist.'), (8, 'Loutres Gang', 'ORGANIZATION', 'It is a group of fighters from the right-wing extremist groups Auctorum and Luminis.'), (9, 'Auctorum', 'ORGANIZATION', 'It is a right-wing extremist group.'), (10, 

# Create a full-text index with Whoosh

Observations: 
- The main QueryParser on the "name" works with OR allowing for partial match on groups of words
- Fuzziness is applied for words that are 4 letters or more
- Any match (whatever its score) is accepted

In [280]:
from whoosh.fields import Schema, TEXT, ID
from whoosh.analysis import StandardAnalyzer
from whoosh.index import create_in
from whoosh.qparser import QueryParser, FuzzyTermPlugin, OrGroup
import os.path
import shutil

class FullTextIndex:
    def __init__(self, ft_index_name="streetpress_ft_index"):
        self.schema = Schema(name=TEXT(stored=True, analyzer=StandardAnalyzer()), sqlite_id=ID(stored=True))

        if os.path.exists(ft_index_name):
            shutil.rmtree(ft_index_name)
        os.mkdir(ft_index_name)

        self.index = create_in(ft_index_name, self.schema)
        self.setup_query_parsers()

    def setup_query_parsers(self):
        self.name_qp = QueryParser("name", schema=self.index.schema, group=OrGroup.factory(0.9))
        self.name_qp.add_plugin(FuzzyTermPlugin())
        self.sqlite_id_qp = QueryParser("sqlite_id", schema=self.index.schema)

    def insert_doc(self, entity_name, sqlite_id):
        with self.index.writer() as writer:
            writer.add_document(name=entity_name, sqlite_id=str(sqlite_id))

    def name_search(self, text, FT_THREASHOLD=5):
        query_words = text.split()
        combined_query = " OR ".join([f"{word}~" if len(word) > 3 else word for word in query_words])
        q = self.name_qp.parse(combined_query)
        extracted_results = []

        with self.index.searcher() as s:
            results = s.search(q)
            if len(results) > 0:
                extracted_results = [{"name": result['name'], "sqlite_id": result['sqlite_id'], "score": result.score} for result in results if result.score>FT_THREASHOLD]

        return extracted_results


In [281]:
# Run an example 

create_sqlite_db()
ft_index = FullTextIndex()
actors_id_map = {}
for actor in actors:
    actor_db_id = insert_actor_sqlite(actor)
    ft_index.insert_doc(actor['name'], actor_db_id)
    actors_id_map[actor['id']] = actor_db_id

In [282]:
with ft_index.index.searcher() as searcher:
    for doc in searcher.documents():
        print(doc)

{'name': 'Academia Christiana', 'sqlite_id': '11'}
{'name': 'Marc de Cacqueray', 'sqlite_id': '7'}
{'name': 'Loutres Gang', 'sqlite_id': '8'}
{'name': 'Auctorum', 'sqlite_id': '9'}
{'name': 'Luminis', 'sqlite_id': '10'}
{'name': 'Hervé Ryssen', 'sqlite_id': '6'}
{'name': 'Kata fight club', 'sqlite_id': '1'}
{'name': 'Gud', 'sqlite_id': '2'}
{'name': 'Génération identitaire', 'sqlite_id': '3'}
{'name': 'Zouaves', 'sqlite_id': '4'}
{'name': 'Rémi G.', 'sqlite_id': '5'}


In [283]:
ft_index.name_search("La Dissidence Française")

[]

# Create a vector index with faiss

Observations:
- The distance measure used is the Euclidian distance (default with faiss)
- The maximum distance used is 0.2

In [205]:
# Initialize the vector store
import faiss
import numpy as np
from langchain.vectorstores import FAISS
from langchain.docstore.in_memory import InMemoryDocstore
from langchain.vectorstores.utils import DistanceStrategy
from langchain.embeddings.openai import OpenAIEmbeddings

class VectorIndex:
    def __init__(self):
        self.embedding_engine = OpenAIEmbeddings()
        sample_embedding = self.embedding_engine.embed_documents(["Sample text"])[0]

        self.vct_index = faiss.IndexFlatL2(len(sample_embedding))
        self.vct_db = FAISS(
            self.embedding_engine,
            self.vct_index,
            InMemoryDocstore(),
            {},
            normalize_L2=False,
            distance_strategy=DistanceStrategy.EUCLIDEAN_DISTANCE
        )

    def insert_doc(self, text, sqlite_id):
        self.vct_db.add_texts([text], [{"sqlite_id": str(sqlite_id)}])

    def search(self, text, MAX_DIST=0.2):
        results = self.vct_db.similarity_search_with_score(text)
        filtered_results = [result[0].metadata['sqlite_id'] for result in results if result[1]< MAX_DIST]

        return filtered_results

In [181]:
# Run an example

create_sqlite_db()
ft_index = FullTextIndex()
vct_actor_index = VectorIndex()
vct_rel_index = VectorIndex()

actors_id_map = {}
for actor in actors:
    actor_sqlite_id = insert_actor_sqlite(actor)
    ft_index.insert_doc(actor['name'], actor_sqlite_id)
    vct_actor_index.insert_doc(actor['key_information'], actor_sqlite_id)
    actors_id_map[actor['id']] = actor_sqlite_id

for rel in rels:
    rel_sqlite_id = insert_relationship_sqlite(rel, actors_id_map)
    vct_rel_index.insert_doc(rel['rationale'], rel_sqlite_id)


In [182]:
print(get_actor_by_id(vct_actor_index.search("It is the name of the underground fight club.")[0]))
print(get_actor_by_id(vct_actor_index.search("He is a main participant of the right-wing extremist fight club.")[0]))

(1, 'Kata fight club', 'ORGANIZATION', 'It is the name of the underground fight club.')
(5, 'Rémi G.', 'PERSON', 'He is a main participant of the right-wing extremist fight club./nHe attended a prestigious military school./nHe shares his drawings and other content on Instagram and YouTube./nHe is part of the Catholic identitarian movement Academia Christiana.')


In [183]:
print(get_rel_by_id(vct_rel_index.search("Rémi G. is a member of Gud.")[0]))

(1, 5, 2, 'member of', 'Rémi G. is a member of Gud.')


# Entity resolution

Observations:
- Vector search was initially applied on actors' key information if the full-text search came back empty. But it was removed (commented section): the information provided by the LLM was sometimes too succinct, generating false positives. 
- The vector search seams to be letting through quite a few duplicates for the relationships as well. Some tuning or alternative strategies would be needed.

In [264]:
import logging
import datetime
import os

def ensure_log_directory_exists(directory):
    if not os.path.exists(directory):
        os.makedirs(directory)

def setup_logger(directory):
    current_time = datetime.datetime.now()
    log_filename = f"process_log_{current_time.strftime('%Y%m%d_%H%M%S')}.txt"
    log_filepath = os.path.join(directory, log_filename)

    logger = logging.getLogger('MyLogger')
    logger.setLevel(logging.INFO)

    file_handler = logging.FileHandler(log_filepath)
    formatter = logging.Formatter('%(asctime)s - %(message)s')
    file_handler.setFormatter(formatter)

    logger.addHandler(file_handler)
    return logger




In [261]:
def process_actor(actor, ft__actor_index, vct_actor_index, actors_id_map):
    ft_actor_search_results = ft__actor_index.name_search(actor['name'])
    vct_actor_search_results = vct_actor_index.search(actor['key_information'])

    if len(ft_actor_search_results)>0:
        resolved_id = ft_actor_search_results[0]['sqlite_id']
        ft_index.insert_doc(actor['name'], resolved_id)
        vct_actor_index.insert_doc(actor['key_information'], resolved_id)
        actors_id_map[actor['id']] = resolved_id
        message = f"RESOLVED ACTOR: {actor['name']} => {get_actor_by_id(resolved_id)[1]}"
        logger.info(message)
        print(message)
    # elif len(vct_actor_search_results)>0:
    #     resolved_id = vct_actor_search_results[0]
    #     ft_index.insert_doc(actor['name'], resolved_id)
    #     vct_actor_index.insert_doc(actor['key_information'], resolved_id)
    #     actors_id_map[actor['id']] = resolved_id
    else:
        actor_db_id = insert_actor_sqlite(actor)
        ft_index.insert_doc(actor['name'], actor_db_id)
        vct_actor_index.insert_doc(actor['key_information'], actor_db_id)
        actors_id_map[actor['id']] = actor_db_id
        message = f"CREATED ACTOR: {actor['name']} with DB ID {actor_db_id}"
        logger.info(message)
        print(message)
    
    return actors_id_map

def process_rel(rel, vct_rel_index, actors_id_map):
    endpoints = [actors_id_map[rel['source']], actors_id_map[rel['target']]]
    vct_search_raw_results = vct_rel_index.search(rel['rationale'])
    filtered_vct_search_results = [result for result in vct_search_raw_results if (str(get_rel_by_id(result)[1]) in endpoints and str(get_rel_by_id(result)[2]) in endpoints)]
    if len(filtered_vct_search_results)==0:
        rel_db_id = insert_relationship_sqlite(rel, actors_id_map)
        vct_rel_index.insert_doc(rel['rationale'], rel_db_id)
        message = f"CREATED REL: {rel}"
        logger.info(message)
        print(message)
        return rel_db_id
    else:
        message = f"RESOLVED REL: {rel} resolved to {get_rel_by_id(filtered_vct_search_results[0])}"
        logger.info(message)
        print(message)

    


In [185]:
# Run an example

create_sqlite_db()
ft_actor_index = FullTextIndex()
vct_actor_index = VectorIndex()
vct_rel_index = VectorIndex()
log_directory = "log_files"
ensure_log_directory_exists(log_directory)
logger = setup_logger(log_directory)

actors_id_map = {}
for actor in actors:
    process_actor(actor, ft_actor_index, vct_actor_index, actors_id_map)

In [186]:
for rel in rels:
    process_rel(rel, vct_rel_index,actors_id_map)

# Process all the files

In [284]:
# Prepare the DB and the indexes

create_sqlite_db()
ft_actor_index = FullTextIndex()
vct_actor_index = VectorIndex()
vct_rel_index = VectorIndex()
log_directory = "log_files"
ensure_log_directory_exists(log_directory)
logger = setup_logger(log_directory)

In [285]:
import datetime

# Find all matching files
file_paths = get_llm_files_paths()
print(file_paths)

for file_path in file_paths:
    message = f"FILE: starting to process {file_path}"
    logger.info(message)
    print(message)
    start_time = datetime.datetime.now()
    print(f"Iteration started at {start_time}")
    file_actors, file_rels = load_entities_from_file(file_path)
    actors_id_map = {}
    
    for actor in file_actors:
        #print(f"start to process actor: {actor}")
        process_actor(actor, ft_actor_index, vct_actor_index, actors_id_map)

    for rel in file_rels:
        #print(f"start to process rel: {rel}")
        process_rel(rel, vct_rel_index, actors_id_map)

    end_time = datetime.datetime.now()
    duration = end_time - start_time
    print(f"Iteration ended at {end_time}, duration: {duration}")

['./llm_responses/article-7_chunk-2.json', './llm_responses/article-19_chunk-1.json', './llm_responses/article-2_chunk-1.json', './llm_responses/article-5_chunk-1.json', './llm_responses/article-28_chunk-1.json', './llm_responses/article-39_chunk-2.json', './llm_responses/article-18_chunk-1.json', './llm_responses/article-4_chunk-1.json', './llm_responses/article-29_chunk-1.json', './llm_responses/article-3_chunk-1.json', './llm_responses/article-7_chunk-3.json', './llm_responses/article-23_chunk-1.json', './llm_responses/article-10_chunk-2.json', './llm_responses/article-42_chunk-2.json', './llm_responses/article-17_chunk-2.json', './llm_responses/article-36_chunk-1.json', './llm_responses/article-31_chunk-1.json', './llm_responses/article-34_chunk-2.json', './llm_responses/article-15_chunk-1.json', './llm_responses/article-40_chunk-1.json', './llm_responses/article-12_chunk-1.json', './llm_responses/article-16_chunk-2.json', './llm_responses/article-11_chunk-2.json', './llm_responses

In [217]:
conn = sqlite3.connect("streetpress.db")
cursor = conn.cursor()    

query = "SELECT COUNT(*) FROM relationships"
cursor.execute(query)

# Fetch and print the results
print(cursor.fetchall())

# Close the connection
conn.close()

[(328,)]


In [197]:
display_table_sqlite("actors")

[(1, 'RNJ', 'ORGANIZATION', 'The RNJ is a radical youth organization that was relaunched in 2023./nThe RNJ organizes Forums for training and cohesion for its members and sympathizers./nThe RNJ has local sections, each with a responsible person.'), (2, 'Pierre-Romain Thionnet', 'PERSON', 'Pierre-Romain Thionnet is the head of the RNJ and the creator of the RNJ Forums./nHe has also held the position of national secretary at the Cocarde.'), (3, 'Aloys Vojinovic', 'PERSON', 'Aloys Vojinovic is a former member of the Zouaves Paris, a dissolved nazifying group, who attended a talk on the situation in Hungary at an RNJ Forum./nHe is known to some young RN leaders, including parliamentary attaché Luc Lahalle.'), (4, 'Eyquem Pons', 'PERSON', 'Eyquem Pons, also known as Etienne Cormier, is a cadre of Generation Identity who also attended a talk on the situation in Hungary at an RNJ Forum.'), (5, 'Luc Lahalle', 'PERSON', 'Luc Lahalle is a parliamentary attaché who often attends RNJ Forums and has

In [213]:
print(get_rel_by_id(vct_rel_index.search("The Martel division has Mathis D. as a member.")[0]))

(21, 20, 22, 'has member', 'The Martel division has Mathis D. as a member.')
