# Parsing Data
In this notebook, let's explore how to leverage generative AI to build and consume a knowledge graph in Neo4j.

This notebook parses Form 13 data from SEC EDGAR. This is partially structured data, a mix of text and XML. Instead of spending our time writing a bespoke parser to extract data from these files, we can prompt a Large Language Model (LLM) to do the extraction for us. We will then load the extracted data into a Neo4j graph using parameterized Cypher statements.

## Setup
First, let's install the libraries we're going to need for this lab and the notebooks that follow. We'll also want to restart the kernel once done. To do that, go to the "Kernel" menu and click "Restart Kernel and Clear All Outputs." That will get rid of everything the install statements printed, leaving us with a cleaner notebook to work with.

In [None]:
%pip install --upgrade pip
%pip install --user graphdatascience
%pip install --user "pydantic==2.9.1"
%pip install --user "langchain==0.3.0"
%pip install --user gradio
%pip install --user IProgress
%pip install --user tqdm
%pip install --user "langchain-community==0.3.0"
%pip install --user "langchain-google-vertexai==2.0.0"

Now restart the kernel. That will allow the Python environment to import the new packages.

## Prompt Definition
We will extract knowledge adhering to the same schema we used previously.  To teach the LLM about the schema, we will use a series of prompts.  Each prompt is focused on only one task, extracting a specific entity:

1. Manager Information
2. Filing Information

In [None]:
mgr_info_tpl = """From the text below, extract the following as json. Do not miss any of this information.
* The tags mentioned below may or may not be namespaced, so extract accordingly. E.g. <ns1:tag> is equal to <tag>
* "managerName" - The name from the <name> tag under <filingManager> tag
* "street1" - The manager's street1 address from the <com:street1> tag under <address> tag
* "street2" - The manager's street2 address from the <com:street2> tag under <address> tag
* "city" - The manager's city address from the <com:city> tag under <address> tag
* "stateOrCountry" - The manager's stateOrCountry address from the <com:stateOrCountry> tag under <address> tag
* "zipCode" - The manager's zipCode from the <com:zipCode> tag under <address> tag
* "reportCalendarOrQuarter" - The reportCalendarOrQuarter from the <reportCalendarOrQuarter> tag
* Just return me the JSON enclosed by 3 backticks. No other text in the response

Text:
$ctext
"""

In [None]:
filing_info_tpl = """The text below contains a list of investments. Each instance of <infoTable> tag represents a unique investment. 
For each investment, please extract the below variables into json then combine into a list enclosed by 3 backticks. Please use the quoted names below while doing this
* "cusip" - The cusip from the <cusip> tag under <infoTable> tag
* "companyName" - The name under the <nameOfIssuer> tag.
* "value" - The value from the <value> tag under <infoTable> tag. Return as a number. 
* "shares" - The sshPrnamt from the <sshPrnamt> tag under <infoTable> tag. Return as a number. 
* "sshPrnamtType" - The sshPrnamtType from the <sshPrnamtType> tag under <infoTable> tag
* "investmentDiscretion" - The investmentDiscretion from the <investmentDiscretion> tag under <infoTable> tag
* "votingSole" - The votingSole from the <votingSole> tag under <infoTable> tag
* "votingShared" - The votingShared from the <votingShared> tag under <infoTable> tag
* "votingNone" - The votingNone from the <votingNone> tag under <infoTable> tag

Output format:
* DO NOT output XML tags in the response. The output should be a valid JSON list enclosed by 3 backticks

Text:
$ctext
"""
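Both templates end with a `$ctext` placeholder that is later filled in with `string.Template`. As a minimal sketch of that mechanism, the cell below uses a shortened, hypothetical template (`demo_tpl`) and a made-up input string (`demo_text`); neither is part of the lab data. It also illustrates that a `$` inside the substituted text is left alone, because only the template itself is scanned for placeholders.

```python
from string import Template

# Hypothetical, shortened template; the real ones are mgr_info_tpl and filing_info_tpl
demo_tpl = """Extract "managerName" as json.

Text:
$ctext
"""

# Made-up input that happens to contain a dollar sign
demo_text = "<name>Example Capital</name> <!-- fee: $5 -->"

# substitute() replaces $ctext; the inserted value is not re-scanned for placeholders
demo_prompt = Template(demo_tpl).substitute(ctext=demo_text)
print(demo_prompt)
```

If a template ever needs a literal dollar sign of its own, it has to be written as `$$`.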

## Functions for Using LLMs
Let's create some helper functions to talk to the LLM with our prompt and text input.

The [Vertex AI documentation](https://cloud.google.com/vertex-ai/docs/generative-ai/learn/models) describes the available foundation models. We will use the gemini-2.5-flash model. In some cases, there may be a need to fine-tune LLMs for domain-specific use cases. [Vertex AI provides an elegant way to fine-tune](https://cloud.google.com/vertex-ai/docs/generative-ai/models/tune-models) where the updated weights/model stay within your tenant and the base model is frozen.

In [None]:
from vertexai.generative_models import GenerativeModel

# Wrapper for calling language model
def run_text_model(
    model_name: str,
    temperature: float,
    max_output_tokens: int,
    top_p: float,
    top_k: int,
    prompt: str,
) -> str:
    """Generate text from a prompt using a Large Language Model."""
    model = GenerativeModel(model_name)
    response = model.generate_content(
        prompt,
        generation_config={
            "temperature": temperature,
            "max_output_tokens": max_output_tokens,
            "top_p": top_p,
            "top_k": top_k,
        },
    )
    return response.text

In [None]:
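# Wrapper for entity extraction and parsing
def extract_entities_relationships(prompt):
    try:
        res = run_text_model("gemini-2.5-flash", 0, 8192, 0.8, 1, prompt)
        # keep the first ``` fenced block, drop the leading "json" tag, flatten newlines
        res = res.split('```')[1].strip('json').replace('\n', ' ')
        return res
    except Exception as e:
        # log the error, then re-raise so that callers never receive None
        print(e)
        raise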
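The parsing step above relies on the model wrapping its answer in three backticks. Note that `str.strip('json')` removes any of the characters `j`, `s`, `o`, `n` from both ends rather than the literal prefix, which happens to work because JSON starts and ends with brackets or braces. As a hedged alternative, here is a small sketch of a regex-based helper; `extract_json_block` and `sample_response` are hypothetical names used only for illustration and are not part of the lab code.

```python
import json
import re

def extract_json_block(text: str) -> str:
    """Return the contents of the first ``` fenced block, or the whole text if there is none."""
    # Optional "json" language tag after the opening fence; DOTALL lets . span newlines
    match = re.search(r"```(?:json)?\s*(.*?)```", text, flags=re.DOTALL)
    return (match.group(1) if match else text).strip()

# Made-up model response for demonstration
sample_response = 'Here you go:\n```json\n{"managerName": "Example Capital"}\n```'
parsed = json.loads(extract_json_block(sample_response))
print(parsed)
```

Falling back to the whole text means a response without any fence still has a chance to parse, instead of failing with an `IndexError`.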

In [None]:
import math
import re
import numpy as np

# splitting function for chunking up filing information to avoid hitting LLM token limits
def split_filing_info(s, chunk_size=5):
    # raw string so that \w is a regex class rather than a string escape
    pattern = r'(</(\w+:)?infoTable>)'
    # closing tag exactly as it appears in the text, including any namespace prefix
    splitter = re.findall(pattern, s)[0][0]
    _parts = s.split(splitter)
    if len(_parts) > chunk_size:
        # round up so that no chunk holds more than chunk_size parts
        chunks_of_list = np.array_split(_parts, math.ceil(len(_parts) / chunk_size))
        chunks_of_str = map(lambda x: splitter.join(x)+splitter, chunks_of_list)
        l = list(chunks_of_str)
        if len(l) > 0:
            # the original text has no closing tag after its final part
            l[len(l)-1] = re.sub(f'{splitter}$', '', l[len(l)-1])
        return l
    else:
        return [s]
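To make the chunking idea easier to see in isolation, here is a simplified sketch that does not use NumPy. It is a different technique from the function above: it collects complete `<infoTable>` elements with a regex and groups them, dropping any wrapper text around them. The name `chunk_info_tables` and the synthetic input are assumptions for illustration, and the regex assumes opening tags carry no attributes.

```python
import re

def chunk_info_tables(xml_text: str, chunk_size: int = 5) -> list[str]:
    """Group complete <infoTable> elements into strings of at most chunk_size each."""
    # Match each element, with or without a namespace prefix such as ns1:
    tables = re.findall(
        r"<(?:\w+:)?infoTable>.*?</(?:\w+:)?infoTable>", xml_text, flags=re.DOTALL
    )
    return ["\n".join(tables[i:i + chunk_size]) for i in range(0, len(tables), chunk_size)]

# Synthetic input with 12 namespaced entries inside a wrapper element
sample = "<informationTable>" + "".join(
    f"<ns1:infoTable><ns1:cusip>{i:09d}</ns1:cusip></ns1:infoTable>" for i in range(12)
) + "</informationTable>"

chunk_sizes = [c.count("</ns1:infoTable>") for c in chunk_info_tables(sample)]
print(chunk_sizes)  # [5, 5, 2]
```

Every chunk here holds only whole entries, so the model never sees an investment cut in half.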

## Test Example for Parsing
Let's start with one Form 13 file to see how we can parse it with Generative AI.

In [None]:
from google.cloud import storage

storage_client = storage.Client()
bucket = storage_client.bucket('neo4j-datasets')
blob = bucket.blob('hands-on-lab/form13-raw/raw_2022-01-03_archives_edgar_data_1026200_0001567619-22-000057.txt')

inp_text = blob.download_as_bytes().decode()  # download_as_string is a deprecated alias of download_as_bytes

We can take a look at the file. Note that it is an oddball mix of XML, delimited and fixed-width formatting that no standard parser could make sense of.

In [None]:
print(inp_text[:1500])

We can split the data into manager and filing info pieces using the `<XML>` tags.

In [None]:
contents = inp_text.split('<XML>')
manager_info = contents[1].split('</XML>')[0].strip()
filing_info = contents[2].split('</XML>')[0].strip()
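The split above assumes that the file contains at least two `<XML>` blocks, the first with manager info and the second with filing info. A hedged variant that makes this assumption explicit might look like the sketch below. The `raw` string is a synthetic stand-in, not a real filing, and the variable names are hypothetical.

```python
import re

# Synthetic stand-in for a raw filing; the real text comes from the storage bucket
raw = """<SEC-HEADER>header text</SEC-HEADER>
<DOCUMENT>
<XML>
<edgarSubmission>manager part</edgarSubmission>
</XML>
</DOCUMENT>
<DOCUMENT>
<XML>
<informationTable>filing part</informationTable>
</XML>
</DOCUMENT>"""

# Capture everything between each <XML> ... </XML> pair; DOTALL lets . span newlines
xml_blocks = [b.strip() for b in re.findall(r"<XML>(.*?)</XML>", raw, flags=re.DOTALL)]
if len(xml_blocks) < 2:
    raise ValueError(f"expected 2 <XML> blocks, found {len(xml_blocks)}")

manager_part, filing_part = xml_blocks[0], xml_blocks[1]
print(manager_part)
print(filing_part)
```

Checking the block count up front gives a clearer error than an `IndexError` when a file does not follow the expected layout.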

## Parsing Manager Information

In [None]:
import vertexai

vertexai.init()

In [None]:
from string import Template

prompt = Template(mgr_info_tpl).substitute(ctext=manager_info)
print(prompt)

In [None]:
import json

# Use LLM to parse out manager information
manager_data = json.loads(extract_entities_relationships(prompt))
manager_data

## Parsing Filing Information
We will parse filing info in a similar manner to manager information. However, because the filings include a list of many entries, we will want to split the input into chunks so as not to exceed input or output token limits.

In [None]:
filing_info_chunks = split_filing_info(filing_info)
len(filing_info_chunks)

In [None]:
prompt = Template(filing_info_tpl).substitute(ctext=filing_info_chunks[0])
response = json.loads(extract_entities_relationships(prompt))
print(response)

## Test Example

Let's first walk through loading with just the one form above. Then we can move on to parsing and ingesting multiple Form 13 files.

To start, we can run the LLM parsing over all the filing info from the form and then combine the resulting JSON into a list suitable for loading into Neo4j.

In [None]:
import time

filings_list = []
for filing_info_chunk in filing_info_chunks:
    prompt = Template(filing_info_tpl).substitute(ctext=filing_info_chunk)
    response = extract_entities_relationships(prompt)
    # time.sleep(2) #uncomment this line if you face any rate limit error
    if '```' in response:
        response = response.split('```')[1].strip('json')
    filings_list.extend(json.loads(response))

for item in filings_list:
    item['managerName'] = manager_data['managerName']
    item['reportCalendarOrQuarter'] = manager_data['reportCalendarOrQuarter']
filings_list[:5]

In [None]:
len(filings_list)
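Since the records come from an LLM, it can be worth checking them before loading. The following is only a sketch under assumptions: `REQUIRED_KEYS`, `clean_filing` and `sample_record` are hypothetical names, and the choice of which keys to require is ours rather than something the lab prescribes.

```python
# Keys that the Cypher statements in this notebook read from each record
REQUIRED_KEYS = {"cusip", "companyName", "value", "shares"}

def clean_filing(record: dict) -> dict:
    """Check required keys and coerce numeric fields; raises ValueError on bad input."""
    missing = REQUIRED_KEYS - record.keys()
    if missing:
        raise ValueError(f"missing keys: {sorted(missing)}")
    cleaned = dict(record)
    for key in ("value", "shares"):
        # The prompt asks for numbers, but a model may still return strings like "1,234"
        cleaned[key] = float(str(record[key]).replace(",", ""))
    return cleaned

# Made-up record for demonstration
sample_record = {"cusip": "000000000", "companyName": "EXAMPLE CORP", "value": "1,234", "shares": 56}
cleaned_record = clean_filing(sample_record)
print(cleaned_record)
```

Applied to the parsed list, this would be something like `[clean_filing(r) for r in filings_list]`.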

## Establish Neo4j Connection

In [None]:
# Username is neo4j by default
NEO4J_USERNAME = 'neo4j'

# YOU WILL NEED TO CHANGE THESE to match the credentials in the file you downloaded when you deployed your Aura instance
NEO4J_URI = 'neo4j+s://your_instance_host_name'
NEO4J_PASSWORD = 'your_neo4j_password'


# Save instance authentication details to a file for later use
connection_filename = "../aura_connection.txt"
export_text = ""
export_text += "NEO4J_URI = " + NEO4J_URI + "\n"
export_text += "NEO4J_USERNAME = " + NEO4J_USERNAME + "\n"
export_text += "NEO4J_PASSWORD = " + NEO4J_PASSWORD + "\n"

# The with block closes the file even if the write fails
with open(connection_filename, "w") as f:
    f.write(export_text)
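The saved file uses simple `KEY = value` lines. A later notebook could read it back with something like the sketch below. `read_connection_file` is a hypothetical helper, and the demo writes made-up credentials to a temporary directory instead of touching the real file.

```python
import os
import tempfile

def read_connection_file(path: str) -> dict:
    """Parse lines of the form KEY = value into a dict."""
    settings = {}
    with open(path) as f:
        for line in f:
            if "=" not in line:
                continue
            # Split only once so that an "=" inside a password survives
            key, value = line.split("=", 1)
            settings[key.strip()] = value.strip()
    return settings

# Demo with made-up values in a temporary directory
with tempfile.TemporaryDirectory() as tmp:
    demo_path = os.path.join(tmp, "aura_connection.txt")
    with open(demo_path, "w") as f:
        f.write("NEO4J_URI = neo4j+s://example.invalid\n")
        f.write("NEO4J_USERNAME = neo4j\n")
        f.write("NEO4J_PASSWORD = pa=ss\n")
    settings = read_connection_file(demo_path)

print(settings["NEO4J_URI"])
```

One caveat of this sketch is that `strip()` would also remove leading or trailing whitespace that is genuinely part of a password.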

In [None]:
from graphdatascience import GraphDataScience

gds = GraphDataScience(
    NEO4J_URI,
    auth=(NEO4J_USERNAME, NEO4J_PASSWORD),
    aura_ds=True
)
gds.set_database('neo4j')

Before loading, we should create node key constraints for the nodes. A node key acts as both a unique ID and an index, which is necessary for fast, efficient queries. In general, if you notice that ingestion into Neo4j is slow (and getting slower), double-check that you created indexes. For this small sample it won't matter, but the impact grows as we ingest more data.

In [None]:
gds.run_cypher('CREATE CONSTRAINT unique_manager IF NOT EXISTS FOR (n:Manager) REQUIRE (n.managerName) IS NODE KEY')
gds.run_cypher('CREATE CONSTRAINT unique_company_id IF NOT EXISTS FOR (n:Company) REQUIRE (n.cusip) IS NODE KEY')

To merge the data, we can use parameterized Cypher queries.  Basically, we will send filings in batches (in this sample case, just one batch) for each node and relationship type and insert them as parameters in the query.

In [None]:
# Merge company nodes
gds.run_cypher('''
UNWIND $records AS record
MERGE (c:Company {cusip: record.cusip})
SET c.companyName = record.companyName
RETURN count(c) AS company_node_merge_count
''', params={'records':filings_list})

In [None]:
# Merge manager node
gds.run_cypher('''
MERGE (m:Manager {managerName: $name})
RETURN count(m) AS manager_node_merge_count
''', params={'name':manager_data['managerName']})

In [None]:
# Merge owns Relationship
gds.run_cypher('''
UNWIND $records AS record
MATCH (m:Manager {managerName: record.managerName})
MATCH (c:Company {cusip: record.cusip})
MERGE(m)-[r:OWNS]->(c)
SET r.reportCalendarOrQuarter = date(datetime({epochmillis: apoc.date.parse(record.reportCalendarOrQuarter, "ms", "MM-dd-yyyy")})),
    r.value = record.value,
    r.shares = record.shares
RETURN count(r) AS owns_relationship_merge_count
''', params={'records':filings_list})
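The query above converts `reportCalendarOrQuarter` inside the database with APOC, using the pattern `MM-dd-yyyy`. As an alternative sketch, not what this notebook does, the same conversion could happen in Python before loading. `to_iso_date` is a hypothetical helper, and the sample value assumes dates shaped like the format string in the query.

```python
from datetime import datetime

def to_iso_date(report_date: str) -> str:
    """Convert an MM-dd-yyyy string to an ISO 8601 date string (yyyy-MM-dd)."""
    return datetime.strptime(report_date, "%m-%d-%Y").date().isoformat()

iso = to_iso_date("12-31-2021")
print(iso)  # 2021-12-31
```

An ISO string like this can be passed straight to Cypher's `date()` function, and `strptime` raises a `ValueError` early if the model returned a date in an unexpected shape.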

## Ingest Multiple Form 13 Files
We will build a pipeline from the methods above. It takes a two-step approach: first parse all the data, then chunk that data and ingest it into Neo4j.

For the purposes of this lab, we will use just a few Form 13 files.

In [None]:
# A small sample of files. If you have time to parse more, add further paths to this list.
sample_file_names = [
   'hands-on-lab/form13-raw/raw_2022-01-03_archives_edgar_data_1844571_0001844571-22-000001.txt',
   'hands-on-lab/form13-raw/raw_2022-01-03_archives_edgar_data_1875995_0001875995-22-000004.txt',
   'hands-on-lab/form13-raw/raw_2022-01-06_archives_edgar_data_1495703_0001495703-22-000002.txt'
]

In [None]:
# Helper function for getting filing info
def get_manager_and_filing_info(raw_txt):
    contents = raw_txt.split('<XML>')
    manager_info = contents[1].split('</XML>')[0].strip()
    filing_info = contents[2].split('</XML>')[0].strip()
    
    return manager_info, filing_info

In [None]:
%%time

print(f'=== Parsing {len(sample_file_names)} Form 13 Files ===')

filings_list = []
manager_list = []

for file_name in sample_file_names:
    
    print(f'--- parsing {file_name} ---')
    try:
        # Get raw form13 file
        print('getting file text from gcloud....')
        blob = bucket.blob(file_name)
        raw_text = blob.download_as_bytes().decode()  # download_as_string is a deprecated alias of download_as_bytes

        # Get raw manager and filing info from file
        print('getting file contents...')
        manager_info, filing_info = get_manager_and_filing_info(raw_text)

        # Parse manager info into dict using LLM
        print('Parsing submission and manager info...')
        mng_prompt = Template(mgr_info_tpl).substitute(ctext=manager_info)
        mng_response = extract_entities_relationships(mng_prompt)
        manager_data = json.loads(mng_response.replace('```', ''))
        manager_list.append({'managerName': manager_data['managerName']})

        # Parse filing info into list of dicts using LLM
        print('Parsing filing info...')
        tmp_filing_list = []
        for filing_info_chunk in split_filing_info(filing_info):
            filing_prompt = Template(filing_info_tpl).substitute(ctext=filing_info_chunk)
            filing_response = extract_entities_relationships(filing_prompt)
            #time.sleep(3) #uncomment this line if you face any rate limit error
            if '```' in filing_response:
                filing_response = filing_response.split('```')[1].strip('json')
            tmp_filing_list.extend(json.loads(filing_response))
        for item in tmp_filing_list: #Add information from manager_info to enable OWNS relationship loading
            item['managerName'] = manager_data['managerName']
            item['reportCalendarOrQuarter'] = manager_data['reportCalendarOrQuarter']
        filings_list.extend(tmp_filing_list)
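    except Exception as e:
        # filing_response is undefined if the failure happened before the first filing chunk
        print(f'failed while parsing {file_name}: {e}')
        raise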
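The commented-out `time.sleep` calls are a manual fix for rate limit errors. A slightly more automatic approach is to retry with an increasing delay. The sketch below is an assumption-laden illustration: `call_with_retry` is a hypothetical helper, it expects a callable that raises on failure (such as `run_text_model`), and the demo uses a fake function instead of a real model call.

```python
import time

def call_with_retry(fn, *args, attempts=3, base_delay=2.0, sleep=time.sleep):
    """Call fn(*args), retrying on any exception with exponentially growing delays."""
    for attempt in range(attempts):
        try:
            return fn(*args)
        except Exception:
            if attempt == attempts - 1:
                raise  # out of attempts, surface the last error
            sleep(base_delay * 2 ** attempt)

# Fake model call that fails twice before succeeding (simulation only)
calls = {"n": 0}

def flaky(prompt):
    calls["n"] += 1
    if calls["n"] < 3:
        raise RuntimeError("rate limit (simulated)")
    return f"ok: {prompt}"

# Record the delays instead of actually sleeping, so the demo runs instantly
delays = []
result = call_with_retry(flaky, "demo prompt", sleep=delays.append)
print(result, delays)
```

Passing `sleep` as a parameter keeps the helper easy to test. In the notebook you would leave it at the default so that the delays are real.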


Now we can merge the manager nodes.

In [None]:
# Merge manager nodes
gds.run_cypher('''
UNWIND $records AS record
MERGE (m:Manager {managerName: record.managerName})
RETURN count(m) AS manager_node_merge_count
''', params={'records':manager_list})

For the filings, let's check the length of the list.

In [None]:
len(filings_list)

While we should not need chunking for this example, the function below shows how to split the records into batches for parameterized loading in case you need to scale up.

In [None]:
# As the dataset gets bigger we will want to chunk up the filings we send to Neo4j
def chunks(xs, n=10_000):
    n = max(1, n)
    return [xs[i:i + n] for i in range(0, len(xs), n)]
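To see what `chunks` returns, here is a quick self-contained check with a small batch size. The function is repeated so the cell runs on its own, and the `records` list is made-up demo data.

```python
def chunks(xs, n=10_000):
    n = max(1, n)  # guard against a zero or negative batch size
    return [xs[i:i + n] for i in range(0, len(xs), n)]

# Seven made-up records split into batches of at most three
records = [{"cusip": f"{i:09d}"} for i in range(7)]
batch_sizes = [len(batch) for batch in chunks(records, n=3)]
print(batch_sizes)  # [3, 3, 1]
```

The last batch simply holds whatever is left over, and an empty input yields an empty list of batches.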

In [None]:
# Merge company nodes
for d in chunks(filings_list):
    res = gds.run_cypher('''
    UNWIND $records AS record
    MERGE (c:Company {cusip: record.cusip})
    SET c.companyName = record.companyName
    RETURN count(c) AS company_node_merge_count
    ''', params={'records':d})
    print(res)

In [None]:
# Merge owns Relationships
for d in chunks(filings_list):
    res = gds.run_cypher('''
    UNWIND $records AS record
    MATCH (m:Manager {managerName: record.managerName})
    MATCH (c:Company {cusip: record.cusip})
    MERGE(m)-[r:OWNS]->(c)
    SET r.reportCalendarOrQuarter = date(datetime({epochmillis: apoc.date.parse(record.reportCalendarOrQuarter, "ms", "MM-dd-yyyy")})),
        r.value = record.value,
        r.shares = record.shares
    RETURN count(r) AS owns_relationship_merge_count
    ''', params={'records':d})
    print(res)

This type of workflow can be applied to other unstructured data to parse entities and relationships with language models and load them into a Neo4j knowledge graph. 