# Parse Data

Original notebook: https://github.com/neo4j-partners/hands-on-lab-neo4j-and-vertex-ai/blob/main/Lab%205%20-%20Parsing%20Data/parsing-data.ipynb

Note:
- Library 'graphdatascience' requires pyarrow, which is compatible with up to Python 3.11.

## Prompt Definition

In [None]:
# Prompt template asking the model to return the filing manager's details as
# JSON. `$ctext` is a string.Template placeholder; the ingestion cell fills it
# with the raw manager XML section via Template(...).substitute(ctext=...).
# NOTE(review): the "stateOrCounty" key is paired with the <com:stateOrCountry>
# tag, and reportCalendarOrQuarter is described as sitting under <address>.
# Both look like possible typos in the prompt -- confirm against the filing
# schema before changing, because the prompt text drives the model output.
mgr_info_tpl = """From the text below, extract the following as json. Do not miss any of these information.
* The tags mentioned below may or may not namespaced. So extract accordingly. Eg: <ns1:tag> is equal to <tag>
* "managerName" - The name from the <name> tag under <filingManager> tag
* "street1" - The manager's street1 address from the <com:street1> tag under <address> tag
* "street2" - The manager's street2 address from the <com:street2> tag under <address> tag
* "city" - The manager's city address from the <com:city> tag under <address> tag
* "stateOrCounty" - The manager's stateOrCounty address from the <com:stateOrCountry> tag under <address> tag
* "zipCode" - The manager's zipCode from the <com:zipCode> tag under <address> tag
* "reportCalendarOrQuarter" - The reportCalendarOrQuarter from the <reportCalendarOrQuarter> tag under <address> tag
* Just return me the JSON enclosed by 3 backticks. No other text in the response

Text:
$ctext
"""

In [None]:
# Prompt template asking the model to turn every <infoTable> element into a
# JSON object and return all of them as one JSON list. `$ctext` is a
# string.Template placeholder; the ingestion cell fills it with one chunk of
# the filing XML at a time and parses the reply with json.loads.
filing_info_tpl = """The text below contains a list of investments. Each instance of <infoTable> tag represents a unique investment. 
For each investment, please extract the below variables into json then combine into a list enclosed by 3 backticks. Please use the quoted names below while doing this
* "cusip" - The cusip from the <cusip> tag under <infoTable> tag
* "companyName" - The name under the <nameOfIssuer> tag.
* "value" - The value from the <value> tag under <infoTable> tag. Return as a number. 
* "shares" - The sshPrnamt from the <sshPrnamt> tag under <infoTable> tag. Return as a number. 
* "sshPrnamtType" - The sshPrnamtType from the <sshPrnamtType> tag under <infoTable> tag
* "investmentDiscretion" - The investmentDiscretion from the <investmentDiscretion> tag under <infoTable> tag
* "votingSole" - The votingSole from the <votingSole> tag under <infoTable> tag
* "votingShared" - The votingShared from the <votingShared> tag under <infoTable> tag
* "votingNone" - The votingNone from the <votingNone> tag under <infoTable> tag

Output format:
* DO NOT output XML tags in the response. The output should be a valid JSON list enclosed by 3 backticks

Text:
$ctext
"""

## Functions for using LLMs

In [None]:
from vertexai.language_models import TextGenerationModel

def run_text_model(
    model_name: str,
    temperature: float,
    max_decode_steps: int,
    top_p: float,
    top_k: int,
    prompt: str,
    tuned_model_name: str = None,
):
    """Generate a text completion for ``prompt`` with a Vertex AI text model.

    Args:
        model_name: Name of the pretrained model to load when no tuned model
            is requested.
        temperature: Sampling temperature forwarded to ``predict``.
        max_decode_steps: Forwarded to ``predict`` as ``max_output_tokens``.
        top_p: Nucleus-sampling parameter forwarded to ``predict``.
        top_k: Top-k sampling parameter forwarded to ``predict``.
        prompt: The full prompt text.
        tuned_model_name: When truthy, this tuned model is loaded instead of
            ``model_name``.

    Returns:
        The ``text`` attribute of the prediction response.
    """
    if tuned_model_name:
        # Fix: the original called `model.get_tuned_model(...)` before `model`
        # had been assigned, so this branch always raised UnboundLocalError.
        # The loader has to be invoked on the class itself.
        model = TextGenerationModel.get_tuned_model(tuned_model_name)
    else:
        model = TextGenerationModel.from_pretrained(model_name)

    response = model.predict(
        prompt,
        temperature=temperature,
        max_output_tokens=max_decode_steps,
        top_k=top_k,
        top_p=top_p,
    )

    return response.text

In [None]:
def extract_entities_relationships(prompt, tuned_model_name=None):
    """Send ``prompt`` to the text model and return its raw completion.

    The model is called with fixed decoding settings. Any exception raised
    by the call is printed instead of propagated, in which case the function
    returns ``None``.
    """
    try:
        completion = run_text_model(
            model_name='text-bison@001',
            temperature=0,
            max_decode_steps=1024,
            top_p=0.8,
            top_k=1,
            prompt=prompt,
            tuned_model_name=tuned_model_name,
        )
    except Exception as err:
        # Best-effort wrapper: report the failure and fall through to None.
        print(err)
        return None
    return completion

In [None]:
import re
import numpy as np 

def split_filing_info(s, chunk_size=5):
    """Split raw filing XML into chunks to stay under LLM token limits.

    The text is cut immediately after closing ``infoTable`` tags (with or
    without a namespace prefix) so that every chunk holds at most
    ``chunk_size`` investments. Text following the last closing tag stays
    attached to the final chunk, so concatenating the returned chunks
    reproduces ``s`` exactly.

    Args:
        s: Raw filing XML text.
        chunk_size: Maximum number of investments per chunk; values below 1
            are treated as 1.

    Returns:
        A list of strings. ``[s]`` is returned unchanged when ``s`` contains
        no closing ``infoTable`` tag or no more than ``chunk_size`` of them.
    """
    chunk_size = max(1, chunk_size)

    # End offset of every investment record. Matching each closing tag
    # individually also copes with mixed namespace prefixes and avoids
    # building a regex out of the matched tag text.
    record_ends = [m.end() for m in re.finditer(r'</(?:\w+:)?infoTable>', s)]
    if len(record_ends) <= chunk_size:
        return [s]

    chunks = []
    start = 0
    # Cut after every `chunk_size`-th record; the last group is handled
    # after the loop so it can carry the trailing text as well.
    for idx in range(chunk_size, len(record_ends), chunk_size):
        cut = record_ends[idx - 1]
        chunks.append(s[start:cut])
        start = cut
    chunks.append(s[start:])
    return chunks

## Establish Neo4j Connection

In [None]:
# Neo4j connection settings -- fill these in before running the cells that
# open the database connection.
# NOTE(review): avoid committing real credentials to source control.

NEO4J_USERNAME = ''
NEO4J_PASSWORD = '' 
NEO4J_URI = ''

In [None]:
from graphdatascience import GraphDataScience

# Open a Graph Data Science client using the connection settings above.
# NOTE(review): aura_ds=True presumably declares the target to be an AuraDS
# instance -- confirm against the graphdatascience client documentation.
gds = GraphDataScience(
    NEO4J_URI,
    auth=(NEO4J_USERNAME, NEO4J_PASSWORD),
    aura_ds=True
)
# Run all subsequent queries against the 'neo4j' database.
gds.set_database('neo4j')

In [None]:
# Create node-key constraints (skipped when they already exist) so that
# Manager nodes are keyed by managerName and Company nodes by cusip.
gds.run_cypher('CREATE CONSTRAINT unique_manager IF NOT EXISTS FOR (n:Manager) REQUIRE (n.managerName) IS NODE KEY')
gds.run_cypher('CREATE CONSTRAINT unique_company_id IF NOT EXISTS FOR (n:Company) REQUIRE (n.cusip) IS NODE KEY')

## Ingest Form 13 Files

In [None]:
# Blob names of the sample Form 13 filings to parse. If you have time to
# parse more files, add further entries to this list.
sample_file_names = [
   'hands-on-lab/form13-raw/raw_2022-01-03_archives_edgar_data_1844571_0001844571-22-000001.txt',
   'hands-on-lab/form13-raw/raw_2022-01-03_archives_edgar_data_1875995_0001875995-22-000004.txt',
   'hands-on-lab/form13-raw/raw_2022-01-06_archives_edgar_data_1495703_0001495703-22-000002.txt'
]

In [None]:
def get_manager_and_filing_info(raw_txt):
    """Pull the manager and filing XML sections out of a raw filing.

    The text is cut at every ``<XML>`` marker. The second and third pieces,
    each truncated at its first ``</XML>`` marker and stripped of surrounding
    whitespace, are returned as ``(manager_info, filing_info)``.

    Raises:
        IndexError: If ``raw_txt`` contains fewer than two ``<XML>`` markers.
    """
    sections = raw_txt.split('<XML>')
    manager_section, filing_section = sections[1], sections[2]

    # Keep only what precedes the closing marker of each section.
    manager_info, _, _ = manager_section.partition('</XML>')
    filing_info, _, _ = filing_section.partition('</XML>')

    return manager_info.strip(), filing_info.strip()

In [None]:
from google.cloud import storage

# Google Cloud Storage client and a handle to the 'neo4j-datasets' bucket
# from which the raw filings are downloaded.
storage_client = storage.Client()
bucket = storage_client.bucket('neo4j-datasets')

In [None]:
%%time

from string import Template
import json

def _extract_json_payload(response):
    """Return the JSON text of an LLM response, without its code fence.

    Handles responses with no fence, a bare ``` fence, or a ```json fence.
    Raises ValueError when the LLM wrapper returned no response at all.
    """
    if response is None:
        raise ValueError('extract_entities_relationships returned no response')
    payload = response
    if '```' in payload:
        # Keep only the text between the first pair of fences.
        payload = payload.split('```')[1]
    payload = payload.strip()
    # Drop an optional language tag. A prefix check is used on purpose:
    # str.strip('json') would strip a *set of characters* from both ends.
    if payload.startswith('json'):
        payload = payload[len('json'):]
    return payload.strip()


print(f'=== Parsing {len(sample_file_names)} Form 13 Files ===')

filings_list = []  # one dict per investment, across all files
manager_list = []  # one {'managerName': ...} dict per file

for file_name in sample_file_names:

    print(f'--- parsing {file_name} ---')
    # Most recent raw LLM response for *this* file. Kept so the error handler
    # never touches an unbound name or a stale response from a previous file.
    last_response = None
    try:
        # Get raw form13 file
        print('getting file text from gcloud....')
        blob = bucket.blob(file_name)
        raw_text = blob.download_as_string().decode()

        # Get raw manager and filing info from file
        print('getting file contents...')
        manager_info, filing_info = get_manager_and_filing_info(raw_text)

        # Parse manager info into dict using LLM
        print('Parsing submission and manager info...')
        mng_prompt = Template(mgr_info_tpl).substitute(ctext=manager_info)
        mng_response = last_response = extract_entities_relationships(mng_prompt)
        manager_data = json.loads(_extract_json_payload(mng_response))
        manager_list.append({'managerName': manager_data['managerName']})

        # Parse filing info into list of dicts using LLM, one chunk at a time
        print('Parsing filing info...')
        tmp_filing_list = []
        for filing_info_chunk in split_filing_info(filing_info):
            filing_prompt = Template(filing_info_tpl).substitute(ctext=filing_info_chunk)
            filing_response = last_response = extract_entities_relationships(filing_prompt)
            tmp_filing_list.extend(json.loads(_extract_json_payload(filing_response)))

        # Add information from manager_info to enable OWNS relationship loading
        for item in tmp_filing_list:
            item['managerName'] = manager_data['managerName']
            item['reportCalendarOrQuarter'] = manager_data['reportCalendarOrQuarter']
        filings_list.extend(tmp_filing_list)
    except Exception:
        # Show the response being processed (if any) to help debug parsing
        # failures, then re-raise with the original traceback intact.
        if last_response is not None:
            print(last_response)
        raise

Merge the manager node

In [None]:
# Merge one Manager node per entry of manager_list (passed as the $records
# parameter) and return the number of rows processed.
gds.run_cypher('''
UNWIND $records AS record
MERGE (m:Manager {managerName: record.managerName})
RETURN count(m) AS manager_node_merge_count
''', params={'records':manager_list})

For the filings, let's check the length of the list

In [None]:
# Number of investment records collected across all parsed files
len(filings_list)

While we should not need chunking for this example, the helper below shows how to split the records into batches for a parameterized load, in case you need to scale up

In [None]:
# Larger datasets should be sent to Neo4j in batches rather than in one call
def chunks(xs, n=10_000):
    """Split ``xs`` into consecutive slices of at most ``n`` items.

    ``n`` is clamped to a minimum of 1. An empty ``xs`` yields an empty list.
    """
    step = max(1, n)
    batches = []
    for offset in range(0, len(xs), step):
        batches.append(xs[offset:offset + step])
    return batches

Merge company nodes

In [None]:
# Load the filings batch by batch: merge one Company node per cusip and
# set (or overwrite) its companyName from the record.
for d in chunks(filings_list):
    res = gds.run_cypher('''
    UNWIND $records AS record
    MERGE (c:Company {cusip: record.cusip})
    SET c.companyName = record.companyName
    RETURN count(c) AS company_node_merge_count
    ''', params={'records':d})
    # Show the per-batch count returned by the query.
    print(res)

Merge 'owns' relationship

In [None]:
# Load the filings batch by batch: match the existing Manager and Company
# nodes for each record and merge an OWNS relationship between them.
# NOTE(review): the MERGE pattern carries no properties, so a manager/company
# pair has a single OWNS relationship whose reportCalendarOrQuarter, value and
# shares are overwritten by later records -- confirm this is intended if
# several quarters are ever loaded.
for d in chunks(filings_list):
    res = gds.run_cypher('''
    UNWIND $records AS record
    MATCH (m:Manager {managerName: record.managerName})
    MATCH (c:Company {cusip: record.cusip})
    MERGE(m)-[r:OWNS]->(c)
    SET r.reportCalendarOrQuarter = record.reportCalendarOrQuarter,
        r.value = record.value,
        r.shares = record.shares
    RETURN count(r) AS owns_relationship_merge_count
    ''', params={'records':d})
    # Show the per-batch count returned by the query.
    print(res)

This type of workflow can be applied to other unstructured data to parse entities and relationships with language models and load them into a Neo4j knowledge graph.