# 📊 Amazon XBRL to Amazon Neptune GraphDB Pipeline

In [1]:
!pip install rdflib boto3 gremlinpython arelle --quiet


In [None]:
# XBRL to Amazon Neptune Graph Transformation Pipeline with Focused Business View
# ---------------------------------------------------------------------
# ✅ Full working pipeline with improved visualization:
#    - Extracts XBRL financial metrics (Revenue, Assets)
#    - Adds temporal (nextYear) linkbase
#    - Uploads to Amazon Neptune
#    - Visualizes focused graph (10–20 nodes) with clear linkbase edges and descriptions
#    - Exports the graph visualization to PNG

import os
import json
import xml.etree.ElementTree as ET
from rdflib import Graph, Literal, Namespace, RDF, URIRef
from gremlin_python.driver.driver_remote_connection import DriverRemoteConnection
from gremlin_python.structure.graph import Graph as GremlinGraph
from gremlin_python.driver.aiohttp.transport import AiohttpTransport
import ssl
import networkx as nx
import matplotlib.pyplot as plt
import nest_asyncio
import asyncio

nest_asyncio.apply()

# --- Step 1: Parse XBRL instance ---
INSTANCE_FILE = "amzn-20241231_htm.xml"

# --- Step 2: RDF Graph Setup ---
rdf_graph = Graph()
EX = Namespace("http://example.org/xbrl/")
rdf_graph.bind("ex", EX)

# --- Step 3: Extract Facts ---
tree = ET.parse(INSTANCE_FILE)
root = tree.getroot()
contexts = {}

for elem in root.findall("{http://www.xbrl.org/2003/instance}context"):
    context_id = elem.attrib['id']
    period = elem.find("{http://www.xbrl.org/2003/instance}period")
    end = period.find("{http://www.xbrl.org/2003/instance}endDate")
    if end is not None:
        contexts[context_id] = end.text[:4]

for fact in root:
    if fact.tag.startswith("{http://fasb.org/us-gaap/2024}"):
        name = fact.tag.split("}")[1]
        context_id = fact.attrib.get('contextRef')
        value = fact.text
        year = contexts.get(context_id)
        if year and value:
            fact_uri = URIRef(EX[f"{name}_{year}"])
            rdf_graph.add((fact_uri, RDF.type, EX.FinancialMetric))
            rdf_graph.add((fact_uri, EX.concept, Literal(name)))
            rdf_graph.add((fact_uri, EX.year, Literal(year)))
            rdf_graph.add((fact_uri, EX.value, Literal(value)))

# --- Step 4: Add Temporal + Simulated Linkbase Relations ---
for concept in ['Revenue', 'OperatingIncomeLoss', 'NetIncomeLoss', 'Assets']:
    for y1, y2 in [('2022', '2023'), ('2023', '2024')]:
        rdf_graph.add((URIRef(EX[f"{concept}_{y1}"]), EX.nextYear, URIRef(EX[f"{concept}_{y2}"])))

rdf_graph.add((URIRef(EX["Revenue_2024"]), EX.labelLinkbase, URIRef(EX["Label_Revenue"])))
rdf_graph.add((URIRef(EX["Revenue_2024"]), EX.presentationLinkbase, URIRef(EX["Presentation_IncomeStatement"])))
rdf_graph.add((URIRef(EX["Revenue_2024"]), EX.calculationLinkbase, URIRef(EX["Calc_TotalRevenue"])))
rdf_graph.add((URIRef(EX["Revenue_2024"]), EX.definitionLinkbase, URIRef(EX["Def_RevenueComponent"])))

# --- Step 5: Export RDF ---
rdf_graph.serialize("xbrl_neptune_output.ttl", format="turtle")
print("✅ RDF Exported to xbrl_neptune_output.ttl")

# --- Step 6: Neptune Upload ---
NEPTUNE_ENDPOINT = "wss://db-neptune-1.cluster-cuzxvxpbce3r.us-east-1.neptune.amazonaws.com:8182/gremlin"
ssl_ctx = ssl.create_default_context()
ssl_ctx.check_hostname = False
ssl_ctx.verify_mode = ssl.CERT_NONE
transport = AiohttpTransport(ssl_context=ssl_ctx)

connection = DriverRemoteConnection(NEPTUNE_ENDPOINT, "g", transport_factory=lambda: transport)
gsql = GremlinGraph().traversal().withRemote(connection)

# --- Step 7: Clear Graph ---
gsql.V().drop().iterate()
print("🧹 Cleared previous Neptune graph")

# --- Step 8: Push RDF Nodes ---
for s, p, o in rdf_graph:
    s_id = str(s).split("/")[-1]
    v = gsql.addV("Fact").property("id", s_id)
    if p == RDF.type:
        v = v.property("type", str(o).split("/")[-1])
    elif p == EX.concept:
        v = v.property("concept", str(o))
    elif p == EX.year:
        v = v.property("year", str(o))
    elif p == EX.value:
        v = v.property("value", str(o))
    v.next()

# --- Step 9: Push RDF Edges ---
for s, p, o in rdf_graph:
    if isinstance(o, URIRef) and p in [EX.nextYear, EX.labelLinkbase, EX.presentationLinkbase, EX.calculationLinkbase, EX.definitionLinkbase]:
        from_id = str(s).split("/")[-1]
        to_id = str(o).split("/")[-1]
        label = p.split("/")[-1]
        try:
            gsql.V().has("id", from_id).as_('a')\
                .V().has("id", to_id)\
                .addE(label).from_('a').next()
        except StopIteration:
            print(f"⚠️ Skipped missing edge: {from_id} -> {to_id} ({label})")

# --- Step 10: Close Neptune Connection ---
if hasattr(gsql, "remote_connection") and gsql.remote_connection:
    session = gsql.remote_connection._transport._client_session
    gsql.remote_connection.close()
    await session.close()
print("✅ Graph pushed to Amazon Neptune")

# --- Step 11: Visualize and Export Subgraph ---
G = nx.DiGraph()
for s, p, o in rdf_graph:
    s_label = str(s).split("/")[-1]
    o_label = str(o).split("/")[-1] if isinstance(o, URIRef) else str(o)
    if p == RDF.type:
        G.add_node(s_label, label='Metric', color='lightblue', desc='Financial metric node')
    elif p == EX.concept:
        G.add_node(s_label, label='Metric', color='lightblue')
        G.nodes[s_label]['concept'] = o_label
        G.nodes[s_label]['desc'] = f"Concept: {o_label}"
    elif p == EX.year:
        if s_label in G.nodes:
            G.nodes[s_label]['year'] = o_label
            G.nodes[s_label]['desc'] = G.nodes[s_label].get('desc', '') + f", Year: {o_label}"
    elif p == EX.value:
        if s_label in G.nodes:
            G.nodes[s_label]['value'] = o_label
            G.nodes[s_label]['desc'] = G.nodes[s_label].get('desc', '') + f", Value: {o_label}"
    elif p == EX.nextYear:
        G.add_edge(s_label, o_label, label='nextYear', color='green')
    elif p == EX.labelLinkbase:
        G.add_edge(s_label, o_label, label='label', color='red')
    elif p == EX.presentationLinkbase:
        G.add_edge(s_label, o_label, label='presentation', color='purple')
    elif p == EX.calculationLinkbase:
        G.add_edge(s_label, o_label, label='calculation', color='blue')
    elif p == EX.definitionLinkbase:
        G.add_edge(s_label, o_label, label='definition', color='orange')

focus_concepts = ["Revenue", "Assets"]
extra_nodes = ["Label_Revenue", "Presentation_IncomeStatement", "Calc_TotalRevenue", "Def_RevenueComponent"]
focus_nodes = [n for n in G.nodes if any(n.startswith(f"{c}_") for c in focus_concepts)] + extra_nodes
subG = G.subgraph(focus_nodes).copy()

pos = nx.spring_layout(subG, seed=42)
node_colors = [subG.nodes[n].get('color', 'gray') for n in subG.nodes]
edge_colors = [d['color'] for _, _, d in subG.edges(data=True)]
edge_labels = {(u, v): d['label'] for u, v, d in subG.edges(data=True)}

plt.figure(figsize=(14, 9))
nx.draw(subG, pos, with_labels=False, node_color=node_colors, edge_color=edge_colors, font_size=9, node_size=1800)
nx.draw_networkx_edge_labels(subG, pos, edge_labels=edge_labels, font_color='black')
node_labels = {n: f"{n}\n{subG.nodes[n].get('desc','')}" for n in subG.nodes}
nx.draw_networkx_labels(subG, pos, labels=node_labels, font_size=8)
plt.title("📊 Focused Graph: Revenue & Assets 2022–2024 + Linkbases with Descriptions")
plt.axis("off")
plt.tight_layout()
plt.savefig("xbrl_graph_visualization.png", dpi=300)
plt.show()
print("📸 Saved image as xbrl_graph_visualization.png")


✅ RDF Exported to xbrl_neptune_output.ttl


Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x0000026374E7D7C0>
Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x00000263707C2CF0>
Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x00000263707C2C30>
Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x00000263707BAE40>


🧹 Cleared previous Neptune graph


Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x00000263707BB080>
Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x00000263707BAD80>
Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x00000263707BAD50>
