![image.png](./mag_schema.png)

In [1]:
%%capture
%pip install google-cloud-bigquery-storage pyarrow

In [2]:
from itertools import chain

import neo4j_arrow as na
import neo4j_bq as bq

## Setting up our BigQuery Integration

BigQuery already natively supports reading tables as streams of PyArrow Tables.
We just need to use the _Storage API_!
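For readers who have not used the Storage API directly, the following is a minimal sketch of reading one table as Arrow record batches with the `google-cloud-bigquery-storage` client. The helper names `table_path` and `arrow_batches` and the `max_streams` default are illustrative assumptions, not part of `neo4j_bq`, and the sketch assumes the billing project is the same as the project that owns the table.

```python
def table_path(project: str, dataset: str, table: str) -> str:
    """Fully qualified table name in the form the Storage API expects."""
    return f"projects/{project}/datasets/{dataset}/tables/{table}"


def arrow_batches(project: str, dataset: str, table: str, max_streams: int = 8):
    """Yield a pyarrow.RecordBatch for every page of every read stream.

    Hypothetical helper: requires credentials and network access when called.
    """
    # Imported lazily so this module loads without the client library installed.
    from google.cloud.bigquery_storage import BigQueryReadClient, types

    client = BigQueryReadClient()
    session = client.create_read_session(
        parent=f"projects/{project}",  # assumption: bill the owning project
        read_session=types.ReadSession(
            table=table_path(project, dataset, table),
            data_format=types.DataFormat.ARROW,
        ),
        max_stream_count=max_streams,
    )
    # Each stream can be read independently of the others.
    for stream in session.streams:
        reader = client.read_rows(stream.name)
        for page in reader.rows(session).pages:
            yield page.to_arrow()
```

Because each stream is read on its own, the streams of a session can be handed to separate workers, which is what the rest of this notebook relies on.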

Our BigQuery dataset includes tables for the nodes and tables for the edges of
our graph. We'll define the schema using Python dictionaries so our loader
knows things like which table field is the node key and which fields define
the source/target node ids for the edges.

In [3]:
mag240m = bq.BigQuerySource("neo4jgraphconnectdemo2022", "ogb_mag240m")

nodes = [
    { "key": "paper", "streams": mag240m.table("papers") },
    { "key": "author", "streams": mag240m.table("authors_heterogenous") },
    { "key": "institution", "streams": mag240m.table("institution_heterogenous") },
]

edges = [
    {
        "src": "source", "dst": "target", "type": "type",
        "streams": mag240m.table("citations"),
    },
    {
        "src": "author", "dst": "paper", "type": "type",
        "streams": mag240m.table("authorship"),
    },
    {
        "src": "author", "dst": "institution", "type": "type",
        "streams": mag240m.table("affiliation"),
    },
]

# Count the BigQuery streams so we know what we're up against
stream_cnt = sum(len(list(x["streams"])) for x in chain(nodes, edges))
print(f"Prepared {stream_cnt:,} BigQuery streams for ingestion")

Prepared 1,549 BigQuery streams for ingestion
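To make the role of the schema dictionaries concrete, here is a small sketch of how a loader could use the `src`, `dst`, and `type` entries to pick the right fields out of each row. The function `edge_triples`, the sample rows, and the `WROTE` relationship type are invented for illustration; the actual loader works on Arrow data and may be organised differently.

```python
from typing import Dict, Iterable, Iterator, Tuple


def edge_triples(
    schema: Dict[str, str], rows: Iterable[dict]
) -> Iterator[Tuple[int, int, str]]:
    """Yield (source id, target id, relationship type) for each row.

    Hypothetical helper: the schema names the fields, the rows hold the values.
    """
    src, dst, rel = schema["src"], schema["dst"], schema["type"]
    for row in rows:
        yield row[src], row[dst], row[rel]


# Same shape as the authorship entry in `edges`, minus the streams.
authorship = {"src": "author", "dst": "paper", "type": "type"}

# Made-up rows standing in for records read from the authorship table.
rows = [
    {"author": 7, "paper": 42, "type": "WROTE"},
    {"author": 9, "paper": 42, "type": "WROTE"},
]

triples = list(edge_triples(authorship, rows))
```

Keeping the field names in data rather than in code lets one loader handle every table.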


## Ingestion

We build little work tasks for our worker pool.

Each task contains the BigQuery stream URL as well as the schema details.

In [4]:
node_work = bq.flatten(nodes, bq.streams_from)
edge_work = bq.flatten(edges, bq.streams_from)

len(node_work), len(edge_work)

(201, 1348)
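As a rough picture of what this flattening step does, the sketch below turns each schema entry into one task per stream. These `flatten` and `streams_from` functions are stand-ins written for this example under the assumption that a task is simply the schema plus a single stream; they are not the `neo4j_bq` implementations, and the stream names are placeholders.

```python
from typing import Callable, Iterable, List


def flatten(items: Iterable[dict], fn: Callable[[dict], Iterable[dict]]) -> List[dict]:
    """Apply fn to every item and concatenate the resulting tasks."""
    return [task for item in items for task in fn(item)]


def streams_from(item: dict) -> Iterable[dict]:
    """Emit one task per stream, each carrying a copy of the schema details."""
    schema = {k: v for k, v in item.items() if k != "streams"}
    for stream in item["streams"]:
        yield {"stream": stream, **schema}


# Placeholder stream names; real ones come from the BigQuery read session.
nodes = [
    {"key": "paper", "streams": ["streams/a", "streams/b"]},
    {"key": "author", "streams": ["streams/c"]},
]

node_work = flatten(nodes, streams_from)
```

Each resulting task is self-describing, so any worker can pick up any task without shared state.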

We'll load our super secret password from a local file and configure our Neo4j
Arrow Connector. Here we just point at our Neo4j server and give our (soon to
be created) graph a name.

In [5]:
with open('pass.txt', mode='r') as f:
    password = f.readline().strip()

client = na.Neo4jArrowClient('demo2.graphconnect.app', password=password, concurrency=224)

print(f"Created {client}")

Created Neo4jArrowClient{neo4j@demo2.graphconnect.app:8491/gcdemo}


### Let's Build the Graph!

Now that we have our `Neo4jArrowClient` and our `BigQuerySource`, we can
orchestrate our graph load.

![image.png](./arrow_flow.png)

#### 1. Start!
We send our `CREATE_GRAPH` signal, letting Neo4j know we are beginning our load.

In [6]:
msg = client.start()
print(f"✅ Ready to load nodes for {msg['name']}")

✅ Ready to load nodes for gcdemo


#### 2a. Send our Nodes!
We can send data in parallel to speed up ingestion, so we'll fan out the node
streams to numerous worker processes.

Each worker will receive a copy of the `Neo4jArrowClient` config and use it to
stream data to the Neo4j graph.

In [7]:
node_results, timing = bq.fan_out(client, node_work)

total_nodes = sum([x["rows"] for x in node_results])
total_bytes = sum([x["bytes"] for x in node_results])

node_rate = int(total_nodes / timing)
data_rate = int(total_bytes / timing) >> 20
print(f"Sent: {total_nodes:,} nodes in {round(timing, 2)}s (~{node_rate:,} nodes/s, ~{data_rate} MiB/s)")

Using: 🚀 Neo4jArrowClient{neo4j@demo2.graphconnect.app:8491/gcdemo}
Spawning 83 workers 🧑‍🏭 to process 201 tasks 📋
⚙️ Loading: [➶➶➶➶➶➶➶➶➶➶➶➶➶➶➶➶➶➶➶➶➶➶➶➶➶➶➶➶➶➶➶➶➶➶➶]


Sent: 244,160,499 nodes in 12.12s (~20,138,399 nodes/s, ~489 MiB/s)


🏁 Completed in 12.12s
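The fan-out pattern itself can be sketched with the standard library. This is not how `bq.fan_out` is implemented: threads stand in for the worker processes to keep the example self-contained, the signature differs from `bq.fan_out`, and `fake_worker` only pretends to stream data. It does return the same shape of result, a list of per-task dictionaries with `rows` and `bytes` plus the elapsed time.

```python
import time
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, List, Tuple


def fan_out(
    worker: Callable[[dict], dict], tasks: List[dict], max_workers: int = 4
) -> Tuple[List[dict], float]:
    """Run worker over every task in a pool and time the whole batch."""
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # pool.map preserves task order in the returned results.
        results = list(pool.map(worker, tasks))
    return results, time.perf_counter() - start


def fake_worker(task: dict) -> dict:
    """Stand-in worker: reports the rows it was given at 16 bytes per row."""
    return {"rows": task["rows"], "bytes": task["rows"] * 16}


tasks = [{"rows": n} for n in (100, 200, 300)]
results, timing = fan_out(fake_worker, tasks)

total_rows = sum(r["rows"] for r in results)
total_bytes = sum(r["bytes"] for r in results)
```

With more tasks than workers, as in the cell above, the pool keeps every worker busy until the task list is drained.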


#### 2b. Signal Nodes are Done!
We can now let Neo4j know we're done streaming nodes before moving on to edges.

In [8]:
print("Signalling we're done loading nodes...\n")
msg = client.nodes_done()
print(f"✅ Loaded {msg['node_count']:,} nodes. Ready for edges!")

Signalling we're done loading nodes...

✅ Loaded 244,160,499 nodes. Ready for edges!


#### 3a. Feed Edges
Time to stream the edges! Just like before, fan out the workload across a pool
of worker processes.

In [9]:
edge_results, timing = bq.fan_out(client, edge_work)

total_edges = sum([x["rows"] for x in edge_results])
total_bytes = sum([x["bytes"] for x in edge_results])

edge_rate = int(total_edges / timing)
data_rate = int(total_bytes / timing) >> 20
print(f"Sent: {total_edges:,} edges in {round(timing, 2)}s (~{edge_rate:,} edges/s, ~{data_rate} MiB/s)")

Using: 🚀 Neo4jArrowClient{neo4j@demo2.graphconnect.app:8491/gcdemo}
Spawning 83 workers 🧑‍🏭 to process 1,348 tasks 📋
⚙️ Loading: [➶➶➶➶➶➶➶➶➶➶➶➶➶➶➶➶➶➶➶➶➶➶➶➶➶➶➶➶➶➶➶➶➶➶➶]


Sent: 1,728,364,232 edges in 55.97s (~30,878,751 edges/s, ~763 MiB/s)


🏁 Completed in 55.97s
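The rate arithmetic used in both loading cells is worth spelling out: shifting a byte count right by 20 bits is an integer division by 2**20, which converts bytes to whole MiB. The helper name `throughput` and the input figures below are chosen for illustration only.

```python
from typing import Tuple


def throughput(total_rows: int, total_bytes: int, seconds: float) -> Tuple[int, int]:
    """Return (rows per second, MiB per second), both truncated to integers."""
    rows_per_s = int(total_rows / seconds)
    # 1 MiB = 2**20 bytes, so >> 20 divides by 1,048,576 and drops the remainder.
    mib_per_s = int(total_bytes / seconds) >> 20
    return rows_per_s, mib_per_s


# 1,000,000 rows and 512 MiB sent in 2 seconds.
rates = throughput(total_rows=1_000_000, total_bytes=512 * 2**20, seconds=2.0)
print(rates)  # (500000, 256)
```

Both values are truncated rather than rounded, which is why the printed rates are prefixed with `~`.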


#### 3b. And We're Done!
Signalling we're finished loading edges lets Neo4j know it's safe to expose the
graph in the graph catalog.

In [10]:
print("Awaiting graph creation...\n")
msg = client.edges_done()

print(f"✅ Loaded {msg['relationship_count']:,} edges.")
print("😁 Happy Graphing! 🥳")

Awaiting graph creation...

✅ Loaded 1,728,364,232 edges.
😁 Happy Graphing! 🥳
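The steps above always run in the same order, so they can be condensed into one function. The sketch below assumes only what the cells show: a client with `start()`, `nodes_done()`, and `edges_done()`, and a `fan_out(client, work)` callable. `load_graph`, `FakeClient`, and `fake_fan_out` are hypothetical names; the fakes record the call order instead of talking to Neo4j or BigQuery.

```python
def load_graph(client, node_work, edge_work, fan_out):
    """Run the load in protocol order and return (node count, edge count)."""
    client.start()                      # 1.  begin the load
    fan_out(client, node_work)          # 2a. stream nodes
    nodes = client.nodes_done()["node_count"]          # 2b. close the node phase
    fan_out(client, edge_work)          # 3a. stream edges
    edges = client.edges_done()["relationship_count"]  # 3b. finish the graph
    return nodes, edges


class FakeClient:
    """Stand-in client that records calls and counts what was 'sent'."""

    def __init__(self):
        self.calls = []
        self.sent = 0

    def start(self):
        self.calls.append("start")
        return {"name": "demo"}

    def nodes_done(self):
        self.calls.append("nodes_done")
        count, self.sent = self.sent, 0  # reset the counter for the edge phase
        return {"node_count": count}

    def edges_done(self):
        self.calls.append("edges_done")
        return {"relationship_count": self.sent}


def fake_fan_out(client, work):
    """Stand-in fan-out: counts one row per task instead of streaming."""
    client.calls.append(f"send:{len(work)}")
    client.sent += len(work)
    return [{"rows": 1, "bytes": 0} for _ in work], 0.0


fake = FakeClient()
counts = load_graph(fake, ["n1", "n2"], ["e1", "e2", "e3"], fake_fan_out)
```

Nodes are fully sent and acknowledged before any edge is streamed, matching the order of the cells in this notebook.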
