# Load Form 10k Chunk Nodes

## Imports

In [1]:
from dotenv import load_dotenv
import os

# Common data processing
import json
from pandas import DataFrame
import pandas as pd
from typing import List, Tuple, Union
from numpy.typing import ArrayLike
from progress.bar import Bar

# Langchain
from langchain.graphs import Neo4jGraph
from langchain_community.vectorstores import Neo4jVector
from langchain.embeddings import OpenAIEmbeddings
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.chains import RetrievalQAWithSourcesChain
from langchain_community.chat_models import ChatOpenAI

## Neo4j Utility Functions

In [6]:
def neo4j_version(kg: Neo4jGraph) -> Tuple[int, int, int]:
  """Return the version of Neo4j as a tuple of ints"""
  version = kg.query("CALL dbms.components()")[0]["versions"][0]
  if "aura" in version:
    version_tuple = tuple(map(int, version.split("-")[0].split("."))) + (0,)
  else:
    version_tuple = tuple(map(int, version.split(".")))
  return version_tuple

In [7]:
def neo4j_create_vector_index(kg: Neo4jGraph, index_name: str, 
                              for_label: str, on_property: str, 
                              vector_dimensions: int = 1536,
                              similarity_function: str = 'cosine') -> None:
  """Create a Neo4j index for vector properties"""
  
  if neo4j_version(kg) >= (5, 15, 0): # For Neo4j version 5.15.0 and above
    kg.query(f"""CREATE VECTOR INDEX `{index_name}` IF NOT EXISTS
      FOR (n:{for_label}) ON (n.{on_property}) 
      OPTIONS {{indexConfig: {{
        `vector.dimensions`: {vector_dimensions},
        `vector.similarity_function`: '{similarity_function}'
      }}}}"""
    )
  else: # For Neo4j versions below 5.15.0 (warning: this is deprecated)
    kg.query(
      f"""CALL db.index.vector.createNodeIndex(
        '{index_name}', '{for_label}', '{on_property}', 
        {vector_dimensions}, '{similarity_function}'
      )"""
    )

## Set up Neo4j and Langchain

In [4]:
# Load from environment
load_dotenv('.env', override=True)
NEO4J_URI = os.getenv('NEO4J_URI')
NEO4J_USERNAME = os.getenv('NEO4J_USERNAME')
NEO4J_PASSWORD = os.getenv('NEO4J_PASSWORD')
NEO4J_DATABASE = os.getenv('NEO4J_DATABASE') or 'neo4j'

# Global constants
VECTOR_INDEX_NAME = 'form_10k_chunks'
VECTOR_NODE_LABEL = 'Chunk'
VECTOR_SOURCE_PROPERTY = 'text'
VECTOR_EMBEDDING_PROPERTY = 'textEmbedding'


In [8]:
# Create a knowledge graph using Langchain's Neo4j integration.
# This will be used for direct querying of the knowledge graph. 
kg = Neo4jGraph(
    url=NEO4J_URI, username=NEO4J_USERNAME, password=NEO4J_PASSWORD, database=NEO4J_DATABASE
)

# OpenAI for creating embeddings
embeddings_model = OpenAIEmbeddings()

# Splitting text into chunks using the RecursiveCharacterTextSplitter 
text_splitter = RecursiveCharacterTextSplitter(
    chunk_size = 2000,
    chunk_overlap  = 200,
    length_function = len,
    is_separator_regex = False,
)


In [9]:
# Create a vector index for the textEmbedding property of Text nodes. Call the index "form_10k_text" 
neo4j_create_vector_index(kg, VECTOR_INDEX_NAME, 'Chunk', 'textEmbedding')

# Create a uniqueness constraint on the textId property of Text nodes 
kg.query('CREATE CONSTRAINT unique_chunk IF NOT EXISTS FOR (n:Chunk) REQUIRE n.chunkId IS UNIQUE')


[]

In [10]:

# Create a vector store for the Neo4j knowledge graph
# This will be used for vector similarity queries.
vector_store = Neo4jVector.from_existing_index(
    OpenAIEmbeddings(),
    url=NEO4J_URI,
    username=NEO4J_USERNAME,
    password=NEO4J_PASSWORD,
    index_name=VECTOR_INDEX_NAME,
)
vector_store = Neo4jVector.from_existing_graph(
    embedding=embeddings_model,
    url=NEO4J_URI,
    username=NEO4J_USERNAME,
    password=NEO4J_PASSWORD,
    index_name=VECTOR_INDEX_NAME,
    node_label=VECTOR_NODE_LABEL,
    text_node_properties=[VECTOR_SOURCE_PROPERTY],
    embedding_node_property=VECTOR_EMBEDDING_PROPERTY,
)
retriever = vector_store.as_retriever()
chain = RetrievalQAWithSourcesChain.from_chain_type(
    ChatOpenAI(temperature=0), chain_type="stuff", retriever=retriever
)


## Basic Cypher Queries

In [5]:
kg.refresh_schema()
print(kg.schema)

Node properties are the following:

Relationship properties are the following:

The relationships are the following:



In [105]:
# "Hello world" query in cypher. Returns a concatenated string using a parameter.
kg.query("RETURN 'Hello ' + $name as message", params={'name':"Andreas!"})

[{'message': 'Hello Andreas!'}]

In [106]:
# Create a node in the knowledge graph for a person named "Andreas"
kg.query("CREATE (n:Person {name: $name})", params={'name':"Andreas"})

[]

In [108]:
# Find the person node we just created by looking for a single node pattern
# that matches any node labeled "Person" and return the name property of the first one
kg.query("MATCH (n:Person) RETURN n.name as name LIMIT 1")

[{'name': 'Andreas'}]

In [109]:
# Clean up by removing that example node
kg.query("MATCH (n:Person) WHERE n.name=$name DETACH DELETE n", params={'name':"Andreas"})

[]

In [6]:
# Check how many nodes exist in the database
kg.query("MATCH (n) RETURN count(n) as count")

[{'count': 0}]

In [14]:
def neo4j_vector_search(kg: Neo4jGraph, embeddings_model: OpenAIEmbeddings,
                        index_name: str, query: str, top_k: int = 10) -> List:
  """Search for similar nodes using the Neo4j vector index"""
  embedded_query = embeddings_model.embed_query(query)
  vector_search = f"""
    CALL db.index.vector.queryNodes($index_name, $top_k, $embedding) yield node, score
    RETURN node.text AS result
  """
  similar = kg.query(vector_search, params={'embedding': embedded_query, 'index_name':index_name, 'top_k': top_k})
  return similar

## Prepare Neo4j indexes

## Data loading utility functions 

In [7]:
def make_map(x):
    if type(x) == str:
        return x, x
    elif type(x) == tuple:
        return x
    else:
        raise Exception("Entry must of type string or tuple")


def make_set_clause(prop_names: ArrayLike, element_name='n', item_name='rec'):
    clause_list = []
    for prop_name in prop_names:
        clause_list.append(f'{element_name}.{prop_name} = {item_name}.{prop_name}')
    return 'SET ' + ', '.join(clause_list)


def make_node_merge_query(node_key_name: str, node_label: str, cols: ArrayLike):
    template = f'''UNWIND $recs AS rec\nMERGE(n:{node_label} {{{node_key_name}: rec.{node_key_name}}})'''
    prop_names = [x for x in cols if x != node_key_name]
    if len(prop_names) > 0:
        template = template + '\n' + make_set_clause(prop_names)
    return template + '\nRETURN count(n) AS nodeLoadedCount'


def make_rel_merge_query(source_target_labels: Union[Tuple[str, str], str],
                         source_node_key: Union[Tuple[str, str], str],
                         target_node_key: Union[Tuple[str, str], str],
                         rel_type: str,
                         cols: ArrayLike,
                         rel_key: str = None):
    source_target_label_map = make_map(source_target_labels)
    source_node_key_map = make_map(source_node_key)
    target_node_key_map = make_map(target_node_key)

    merge_statement = f'MERGE(s)-[r:{rel_type}]->(t)'
    if rel_key is not None:
        merge_statement = f'MERGE(s)-[r:{rel_type} {{{rel_key}: rec.{rel_key}}}]->(t)'

    template = f'''\tUNWIND $recs AS rec
    MATCH(s:{source_target_label_map[0]} {{{source_node_key_map[0]}: rec.{source_node_key_map[1]}}})
    MATCH(t:{source_target_label_map[1]} {{{target_node_key_map[0]}: rec.{target_node_key_map[1]}}})\n\t''' + merge_statement
    prop_names = [x for x in cols if x not in [rel_key, source_node_key_map[1], target_node_key_map[1]]]
    if len(prop_names) > 0:
        template = template + '\n\t' + make_set_clause(prop_names, 'r')
    return template + '\n\tRETURN count(r) AS relLoadedCount'


def batches(xs, n=100):
    n = max(1, n)
    return [xs[i:i + n] for i in range(0, len(xs), n)]


def load_nodes(graph: Neo4jGraph, node_df: pd.DataFrame, node_key_col: str, node_label: str, batch_size=1000):
    records = node_df.to_dict('records')
    print(f'======  loading {node_label} nodes  ======')
    total = len(records)
    print(f'staging {total:,} records')
    query = make_node_merge_query(node_key_col, node_label, node_df.columns.copy())
    print(f'\nUsing This Cypher Query:\n```\n{query}\n```\n')
    cumulative_count = 0
    for recs in batches(records, batch_size):
        res = graph.query(query, params={'recs': recs})
        cumulative_count += res[0].get('nodeLoadedCount')
        print(f'Loaded {cumulative_count:,} of {total:,} nodes')


def load_rels(graph: Neo4jGraph,
              rel_df: pd.DataFrame,
              source_target_labels: Union[Tuple[str, str], str],
              source_node_key: Union[Tuple[str, str], str],
              target_node_key: Union[Tuple[str, str], str],
              rel_type: str,
              rel_key: str = None,
              batch_size=10_000):
    records = rel_df.to_dict('records')
    print(f'======  loading {rel_type} relationships  ======')
    total = len(records)
    print(f'staging {total:,} records')
    query = make_rel_merge_query(source_target_labels, source_node_key,
                                 target_node_key, rel_type, rel_df.columns.copy(), rel_key)
    print(f'\nUsing This Cypher Query:\n```\n{query}\n```\n')
    cumulative_count = 0
    for recs in batches(records, batch_size):
        res = graph.query(query, params={'recs': recs})
        cumulative_count += res[0].get('relLoadedCount')
        print(f'Loaded {cumulative_count:,} of {total:,} relationships')

In [8]:
def get_and_split_txt_data(file_names: List[str]) -> DataFrame:
    doc_data_list = []
    for file_name in file_names:
        with open(file_name) as f:
            f10_k = json.load(f)
            for item in ['item1', 'item1a', 'item7', 'item7a']:
                #split text data
                txt = f10_k[item]
                split_txts = text_splitter.split_text(txt)
                chunk_seq_id = 0
                for split_txt in split_txts:
                    form_id = file_name[file_name.rindex('/') + 1:file_name.rindex('.')]
                    doc_data_list.append({ 'formId': f'{form_id}',
                                           'chunkId': f'{form_id}-{item}-chunk{chunk_seq_id:04d}',
                                           'cik': f10_k['cik'],
                                           'cusip6': f10_k['cusip6'],
                                           'source': f10_k['source'],
                                           'f10kItem': item,
                                           'chunkSeqId': chunk_seq_id,
                                           'text': split_txt})
                    chunk_seq_id += 1
    return pd.DataFrame(doc_data_list)

def add_text_embeddings(df):
    count = 0
    embeddings = []
    for docs in batches(df.text, n=100):
        count += len(docs)
        print(f'Embedded {count} of {df.shape[0]}')
        embeddings.extend(embeddings_model.embed_documents(docs))
    df['textEmbedding'] = embeddings



## Load Form 10k documents

1. iterate through all the files in the directory
2. batch load sets of the files
3. for each file, load the content and split the text into chunks
4. for each chunk, create a graph Node that includes metadata and the chunk text

In [10]:
%%time

all_file_names = ['../source-data-pull/form10k/data/form10k-clean/' + x for x in os.listdir('../source-data-pull/form10k/data/form10k-clean/')]
counter = 0
for file_names in batches(all_file_names, 20):
    counter += len(file_names)
    print(f'=== Processing {counter-len(file_names)}:{counter} of {len(all_file_names)} ===')
    # get and split text data
    print('Loading and splitting Text Files...')
    doc_df = get_and_split_txt_data(file_names)
    # perform text embedding
    print('Performing Text Embedding...')
    add_text_embeddings(doc_df)
    #load nodes
    print('Loading Nodes...')
    load_nodes(kg, doc_df.drop(columns='textEmbedding'), 'chunkId', 'Chunk')
    print(f'Done Processing {counter-len(file_names)}:{counter}')

    # Merge text embeddings using set vector property
    records = doc_df[['chunkId', 'textEmbedding']].to_dict('records')
    print(f'======  loading Document text embeddings ======')
    total = len(records)
    print(f'staging {total:,} records')
    cumulative_count = 0
    for recs in batches(records, n=100):
        res = kg.query('''
        UNWIND $recs AS rec
        MATCH(n:Chunk {chunkId: rec.chunkId})
        CALL db.create.setNodeVectorProperty(n, "textEmbedding", rec.textEmbedding)
        RETURN count(n) AS propertySetCount
        ''', params={'recs': recs})
        cumulative_count += res[0].get('propertySetCount')
        print(f'Set {cumulative_count:,} of {total:,} text embeddings')

=== Processing 0:20 of 79 ===
Loading and splitting Text Files...
Performing Text Embedding...
Embedded 100 of 2485
Embedded 200 of 2485
Embedded 300 of 2485
Embedded 400 of 2485
Embedded 500 of 2485
Embedded 600 of 2485
Embedded 700 of 2485
Embedded 800 of 2485
Embedded 900 of 2485
Embedded 1000 of 2485
Embedded 1100 of 2485
Embedded 1200 of 2485
Embedded 1300 of 2485
Embedded 1400 of 2485
Embedded 1500 of 2485
Embedded 1600 of 2485
Embedded 1700 of 2485
Embedded 1800 of 2485
Embedded 1900 of 2485
Embedded 2000 of 2485
Embedded 2100 of 2485
Embedded 2200 of 2485
Embedded 2300 of 2485
Embedded 2400 of 2485
Embedded 2485 of 2485
Loading Nodes...
staging 2,485 records

Using This Cypher Query:
```
UNWIND $recs AS rec
MERGE(n:Chunk {chunkId: rec.chunkId})
SET n.formId = rec.formId, n.cik = rec.cik, n.cusip6 = rec.cusip6, n.source = rec.source, n.f10kItem = rec.f10kItem, n.chunkSeqId = rec.chunkSeqId, n.text = rec.text
RETURN count(n) AS nodeLoadedCount
```

Loaded 1,000 of 2,485 nodes
Loa

## Example queries

In [11]:
question = 'Who makes hydraulic and mechanical tools?'

In [15]:
# Vector search using our utility function
neo4j_vector_search(kg, embeddings_model, VECTOR_INDEX_NAME, question, top_k=3)

[{'result': "The Company's critical raw material is steel. Out of Brazil, the Company sources three basic types of steel which are carbon steel, high speed steel and carbide cylinders. The Company has a number of long-term suppliers in Europe, Asia and Brazil and its sourcing mix is distributed according to the pricing including exchange rates. The U.S. sources steel, and small amounts of aluminum and brass through distributors. None of these suppliers accounts for more than 5% of the Company's purchases\nFor over 140 years, the Company has been a recognized leader in providing measurement and cutting solutions to industry. Measurement tools consist of precision instruments such as micrometers, vernier calipers, height distributors, depth gages, electronic gages, dial indicators, steel rules, combination squares, custom, non-contact gaging such as vision, optical and laser measurement systems. The Company believes advanced, non-contact systems with easy-to use software will be attracti

In [16]:
# Vector search using the langchain vector store
docs_with_score = vector_store.similarity_search_with_score(question, k=3)

for doc, score in docs_with_score:
    print("-" * 80)
    print("Score: ", score)
    print(doc.page_content)
    print("-" * 80)

--------------------------------------------------------------------------------
Score:  0.8979572653770447

text: The Company's critical raw material is steel. Out of Brazil, the Company sources three basic types of steel which are carbon steel, high speed steel and carbide cylinders. The Company has a number of long-term suppliers in Europe, Asia and Brazil and its sourcing mix is distributed according to the pricing including exchange rates. The U.S. sources steel, and small amounts of aluminum and brass through distributors. None of these suppliers accounts for more than 5% of the Company's purchases
For over 140 years, the Company has been a recognized leader in providing measurement and cutting solutions to industry. Measurement tools consist of precision instruments such as micrometers, vernier calipers, height distributors, depth gages, electronic gages, dial indicators, steel rules, combination squares, custom, non-contact gaging such as vision, optical and laser measurement s

In [17]:
# Vector search using the langchain retriever over the Neo4j vector store
retriever.get_relevant_documents(question)[0]

Document(page_content="\ntext: The Company's critical raw material is steel. Out of Brazil, the Company sources three basic types of steel which are carbon steel, high speed steel and carbide cylinders. The Company has a number of long-term suppliers in Europe, Asia and Brazil and its sourcing mix is distributed according to the pricing including exchange rates. The U.S. sources steel, and small amounts of aluminum and brass through distributors. None of these suppliers accounts for more than 5% of the Company's purchases\nFor over 140 years, the Company has been a recognized leader in providing measurement and cutting solutions to industry. Measurement tools consist of precision instruments such as micrometers, vernier calipers, height distributors, depth gages, electronic gages, dial indicators, steel rules, combination squares, custom, non-contact gaging such as vision, optical and laser measurement systems. The Company believes advanced, non-contact systems with easy-to use softwar

In [18]:
chain(
    {"question": question},
    return_only_outputs=True,
)

{'answer': 'Enerpac Tool Group Corp. is the company that makes hydraulic and mechanical tools.\n',
 'sources': 'https://www.sec.gov/Archives/edgar/data/6955/000000695523000034/0000006955-23-000034-index.htm'}