# Data Loaders

This notebook demonstrates the use of **data loaders** in `pyTigerGraph`. The job of a data loader is to pull data from the TigerGraph database to the machine that runs the data loaders. Currently, the following data loaders are provided:
* EdgeLoader, which returns batches of edges.
* VertexLoader, which returns batches of vertices.
* GraphLoader, which returns randomly sampled (probably disconnected) subgraphs in pandas `dataframe`, `PyG` or `DGL` format.
* NeighborLoader, which returns subgraphs as sampled in [GraphSAGE](https://arxiv.org/abs/1706.02216) in `dataframe`, `PyG` or `DGL` format.
* EdgeNeighborLoader, which returns subgraphs using neighbor sampling from edges in `dataframe`, `PyG` or `DGL` format.
* HGTLoader, which works similarly to NeighborLoader but performs stratified neighbor sampling as in [Heterogeneous Graph Transformer](https://arxiv.org/abs/2003.01332).

Every data loader above can either get all the batches as an HTTP response (the default) or stream every batch through Kafka. The HTTP mechanism is fast and works well for testing with small graphs, but it is subject to a data size limit of 2 GB. For large graphs, the HTTP channel will likely fail due to the size limit or network connectivity issues. Streaming via Kafka is offered for robustness and scalability. Kafka also excels at multi-consumer use cases and is required by distributed loader groups.

The data loaders support both homogeneous and heterogeneous graphs. By default, they load from all vertex and edge types and treat the graph as homogeneous. They also let you specify which vertex and edge types to load from and which attributes to load from each type. In that case, the output is a heterogeneous graph.

**NOTE**: Currently, your database needs to be activated (only once) to work with the data loaders. If you are using the ML Workbench on tgCloud, the bundled database is already activated. Otherwise, you can download the activator at https://act.tigergraphlabs.com. Detailed instructions are also included on that website.

Below we will use `NeighborLoader` for illustration. The other data loaders work similarly but use slightly different parameters. Please refer to the [reference doc](https://docs.tigergraph.com/pytigergraph/current/gds/gds) for details on each data loader.

## Table of Contents
* [Connection to Database](#connection) 
* [Neighbor Loader](#neighborloader)
  * [Homogeneous Graph](#homogeneous)
  * [Heterogeneous Graph](#heterogeneous)
* [Streaming through Kafka](#kafka)  

## Connection to Database <a name="connection"></a>

The `TigerGraphConnection` class represents a connection to the TigerGraph database. Under the hood, it stores the necessary information to communicate with the database. It is able to perform quite a few database tasks. Please see its [documentation](https://docs.tigergraph.com/pytigergraph/current/intro/) for details.

To connect to your database, modify the `config.json` file accompanying this notebook. Set the value of `getToken` based on whether token auth is enabled for your database. Token auth is always enabled for tgCloud databases.
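For reference, the cells below read four keys from the config file: `host`, `username`, `password`, and `getToken`. The following is a minimal sketch of such a config and a check that nothing is missing. All values are placeholders chosen for illustration, and `missing_keys` is a hypothetical helper, not part of `pyTigerGraph`.

```python
import json

# Placeholder values for illustration only; replace them with your own.
example_config = {
    "host": "http://127.0.0.1",   # assumed database address
    "username": "tigergraph",     # assumed user name
    "password": "tigergraph",     # assumed password
    "getToken": False,            # set to True if token auth is enabled
}

# Keys that the cells in this notebook read from the config.
REQUIRED_KEYS = ("host", "username", "password", "getToken")


def missing_keys(config):
    """Return the required keys that are absent from the config dict."""
    return [key for key in REQUIRED_KEYS if key not in config]


# Round-trip through JSON to mimic reading the file from disk.
loaded = json.loads(json.dumps(example_config))
print(missing_keys(loaded))
```

An empty list means every key used later in the notebook is present.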

In [1]:
from pyTigerGraph import TigerGraphConnection
import json

# Read in DB configs
with open('../config.json', "r") as config_file:
    config = json.load(config_file)
    
conn = TigerGraphConnection(
    host=config["host"],
    username=config["username"],
    password=config["password"]
)

### Ingest Data

In [2]:
from pyTigerGraph.datasets import Datasets

dataset = Datasets("Cora")

A folder with name Cora already exists in ./tmp. Skip downloading.


In [3]:
conn.ingestDataset(dataset, getToken=config["getToken"])

---- Checking database ----
A graph with name Cora already exists in the database. Skip ingestion.
Graph name is set to Cora for this connection.


### Visualize Schema

In [4]:
from pyTigerGraph.visualization import drawSchema

drawSchema(conn.getSchema(force=True))

CytoscapeWidget(cytoscape_layout={'name': 'circle', 'animate': True, 'padding': 1}, cytoscape_style=[{'selecto…

### Basic Statistics

In [5]:
# Number of vertices for every vertex type
conn.getVertexCount()

{'Paper': 2708}

In [6]:
# Number of edges for every type
conn.getEdgeCount()

{'Cite': 10556}

## Neighbor Loader <a name="neighborloader"></a>

### Homogeneous Graph <a name="homogeneous"></a>

`NeighborLoader` performs neighbor sampling as introduced in [Inductive Representation Learning on Large Graphs](https://arxiv.org/abs/1706.02216) and returns neighborhood subgraphs. Hence, the subgraphs from this loader are connected. 

Specifically, the loader first chooses `batch_size` vertices as seeds, then picks `num_neighbors` neighbors of each seed at random, then `num_neighbors` neighbors of each of those neighbors, and repeats this for `num_hops` hops. This generates one subgraph. As you loop through the data loader, every vertex will at some point be chosen as a seed, and you will get the subgraph expanded from that seed. If you want to limit seeds to certain vertices, pass the name of a boolean vertex attribute to `filter_by`; only vertices for which that attribute is true can be chosen as seeds.
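The sampling procedure described above can be sketched in plain Python on a toy graph. This is only an illustration of the idea, not the loader's actual implementation, which runs as a query inside the database. The graph, the mask values, and the function `sample_neighborhood` are all made up for this example, and the sketch makes one simplifying assumption: a vertex that was already visited is not expanded again.

```python
import random


def sample_neighborhood(adjacency, seeds, num_neighbors, num_hops, rng):
    """Expand seeds hop by hop, keeping at most num_neighbors per vertex."""
    vertices = set(seeds)
    edges = set()
    frontier = list(seeds)
    for _ in range(num_hops):
        next_frontier = []
        for vertex in frontier:
            neighbors = adjacency.get(vertex, [])
            k = min(num_neighbors, len(neighbors))
            for neighbor in rng.sample(neighbors, k):
                edges.add((vertex, neighbor))
                # Simplification: only newly discovered vertices are expanded.
                if neighbor not in vertices:
                    vertices.add(neighbor)
                    next_frontier.append(neighbor)
        frontier = next_frontier
    return vertices, edges


# Toy graph (hypothetical data): vertex -> list of neighbors.
toy_graph = {0: [1, 2, 3], 1: [0, 4], 2: [0], 3: [0, 5], 4: [1], 5: [3]}
# Boolean mask playing the role of the attribute given to `filter_by`.
train_mask = {0: True, 1: False, 2: True, 3: False, 4: True, 5: False}

rng = random.Random(0)
candidates = [v for v in toy_graph if train_mask[v]]
rng.shuffle(candidates)          # corresponds to shuffle=True
batch_size = 2
seeds = candidates[:batch_size]  # one batch of seeds

vertices, edges = sample_neighborhood(
    toy_graph, seeds, num_neighbors=2, num_hops=2, rng=rng
)
print(seeds, sorted(vertices), sorted(edges))
```

Under this scheme, a subgraph sampled with `batch_size=16`, `num_neighbors=10`, and `num_hops=2` contains at most 16 × (1 + 10 + 100) = 1,776 vertices. Real batches are usually much smaller because many vertices have fewer than 10 neighbors and sampled neighborhoods overlap.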

The loader can be created by calling the `.gds.neighborLoader()` function on the DB connection object. Key parameters of the data loader are:
* `batch_size`, `num_neighbors`, `num_hops`: control the neighbor sampling process described above.
* `v_in_feats`, `v_out_labels`, `v_extra_feats`: dictate which vertex attributes are pulled from the database. They can be omitted if no vertex attribute is needed.
* `e_in_feats`, `e_out_labels`, `e_extra_feats`: same as above, but for edge attributes.
* `output_format`: format of the output graph. "PyG", "DGL", "spektral", and "dataframe" are supported.
* `shuffle`: whether to shuffle the vertices before loading data.
* `filter_by`: a boolean attribute indicating which vertices can be included as seeds.

For details on those parameters and the complete parameter list, please refer to the [doc](https://docs.tigergraph.com/pytigergraph/current/gds/gds#_neighborloader).

**Note**: The first time you initialize the loader on a graph in TigerGraph,
initialization might take a minute because it installs the corresponding
query in the database and optimizes it. Query installation only needs to
happen once, so initializing the loader on the same graph again takes
almost no time.

In [7]:
%%time
neighbor_loader = conn.gds.neighborLoader(
    # Neighbor sampling settings
    batch_size=16,
    num_neighbors=10,
    num_hops=2,
    # Vertex attributes to pull from the database
    v_in_feats=["x"],
    v_out_labels=["y"],
    v_extra_feats=["train_mask", "val_mask", "test_mask"],
    # Edge attributes to pull from the database
    e_in_feats=["time"],
    e_out_labels=[],
    e_extra_feats=["is_train", "is_val"],
    output_format="PyG",
    shuffle=True,
    # Only vertices with train_mask set can be seeds
    filter_by="train_mask"
)

CPU times: user 1.81 s, sys: 154 ms, total: 1.97 s
Wall time: 2 s


There are two ways to use this data loader. We will try the first one below as it is more common.
* First, it can be used as an iterable, which means you can loop through
  it to get every batch of data. If you load all data in one batch (`num_batches=1`),
  there will be only one batch (of all the data) in the iterator.
* Second, you can access the `data` property of the class directly. If there is
  only one batch of data to load, it will give you the batch directly instead
  of an iterator, which might make more sense in that case. If there are
  multiple batches of data to load, it will return the loader itself.
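The difference between the two access patterns can be shown with a small stand-in class. `ToyLoader` is hypothetical and only mimics the behavior described in the list above; it does not talk to a database and is not how the real loader is implemented.

```python
class ToyLoader:
    """Hypothetical stand-in that mimics the two access patterns."""

    def __init__(self, batches):
        self._batches = list(batches)

    def __iter__(self):
        # Pattern 1: loop through the loader to get every batch.
        return iter(self._batches)

    @property
    def data(self):
        # Pattern 2: a single batch is returned directly;
        # with multiple batches the loader itself is returned.
        if len(self._batches) == 1:
            return self._batches[0]
        return self


single = ToyLoader(["all_data"])
multi = ToyLoader(["batch_0", "batch_1"])

print(single.data)          # the batch itself
print(multi.data is multi)  # the loader, which can then be iterated
print(list(multi.data))
```

In practice this means that code reading `data` should know in advance whether it expects a single batch or several.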

In [8]:
%%time
for i, batch in enumerate(neighbor_loader):
    print("----Batch {}----".format(i))
    print(batch)

----Batch 0----
Data(edge_index=[2, 407], edge_feat=[407], is_train=[407], is_val=[407], x=[271, 1433], y=[271], train_mask=[271], val_mask=[271], test_mask=[271], is_seed=[271])
----Batch 1----
Data(edge_index=[2, 362], edge_feat=[362], is_train=[362], is_val=[362], x=[222, 1433], y=[222], train_mask=[222], val_mask=[222], test_mask=[222], is_seed=[222])
----Batch 2----
Data(edge_index=[2, 316], edge_feat=[316], is_train=[316], is_val=[316], x=[220, 1433], y=[220], train_mask=[220], val_mask=[220], test_mask=[220], is_seed=[220])
----Batch 3----
Data(edge_index=[2, 395], edge_feat=[395], is_train=[395], is_val=[395], x=[257, 1433], y=[257], train_mask=[257], val_mask=[257], test_mask=[257], is_seed=[257])
----Batch 4----
Data(edge_index=[2, 445], edge_feat=[445], is_train=[445], is_val=[445], x=[282, 1433], y=[282], train_mask=[282], val_mask=[282], test_mask=[282], is_seed=[282])
----Batch 5----
Data(edge_index=[2, 381], edge_feat=[381], is_train=[381], is_val=[381], x=[254, 1433], y

### Heterogeneous Graph <a name="heterogeneous"></a>

If the code above were run on a heterogeneous graph, it would still work, but it would ignore vertex and edge types and output homogeneous subgraphs. It would also require the requested attributes to exist on all vertex/edge types; otherwise an error is thrown. If you need heterogeneous subgraphs as output, pass a dict for `v_in_feats`, `v_out_labels`, `v_extra_feats`, `e_in_feats`, `e_out_labels`, or `e_extra_feats`. The keys of the dict are the vertex/edge types to select, and the values are lists of attributes to select for those types. This also gives you fine control over which vertex and edge types are included in the sampling process and in the output.
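The difference between the list form and the dict form can be made concrete with a small sketch. The schema below is hypothetical (the `Author` type and its `name` attribute are invented for illustration), and both helper functions are assumptions of this example rather than part of `pyTigerGraph`.

```python
def to_typed_spec(attributes, type_names):
    """Map every type name to its own copy of the same attribute list."""
    return {type_name: list(attributes) for type_name in type_names}


def missing_attributes(spec, schema):
    """List (type, attribute) pairs in spec that the schema does not define."""
    return [
        (type_name, attr)
        for type_name, attrs in spec.items()
        for attr in attrs
        if attr not in schema.get(type_name, ())
    ]


# Hypothetical schema: vertex type -> attributes defined on that type.
schema = {"Paper": {"x", "y", "train_mask"}, "Author": {"name"}}

# A plain list asks for the same attribute on every type.
same_everywhere = to_typed_spec(["x"], schema)
print(missing_attributes(same_everywhere, schema))

# A dict names the attributes per type, so each type gets only what it has.
per_type = {"Paper": ["x"], "Author": ["name"]}
print(missing_attributes(per_type, schema))
```

The first call reports that `x` is not defined on `Author`, which is the situation where the list form fails. The dict form avoids it by selecting attributes per type.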

Although `Cora` is a homogeneous graph, we will use it to illustrate how to specify the input to get output as heterogeneous subgraphs. It is straightforward to replace this Cora graph with your heterogeneous graph.

In [9]:
%%time
neighbor_loader = conn.gds.neighborLoader(
    batch_size=16,
    num_neighbors=10,
    num_hops=2,
    # Dict inputs: keys are vertex/edge types, values are attribute lists
    v_in_feats={"Paper": ["x"]},
    v_out_labels={"Paper": ["y"]},
    v_extra_feats={"Paper": ["train_mask", "val_mask", "test_mask"]},
    e_in_feats={"Cite": ["time"]},
    e_extra_feats={"Cite": ["is_train", "is_val"]},
    output_format="PyG",
    shuffle=True,
    filter_by={"Paper": "train_mask"}
)

Installing and optimizing queries. It might take a minute or two.
Query installation finished.
CPU times: user 55.8 ms, sys: 4.95 ms, total: 60.7 ms
Wall time: 1min 11s


In [10]:
%%time
for i, batch in enumerate(neighbor_loader):
    print("----Batch {}----".format(i))
    print(batch)

----Batch 0----
HeteroData(
  Paper={
    x=[278, 1433],
    y=[278],
    train_mask=[278],
    val_mask=[278],
    test_mask=[278],
    is_seed=[278]
  },
  (Paper, Cite, Paper)={
    edge_index=[2, 410],
    edge_feat=[410],
    is_train=[410],
    is_val=[410]
  }
)
----Batch 1----
HeteroData(
  Paper={
    x=[267, 1433],
    y=[267],
    train_mask=[267],
    val_mask=[267],
    test_mask=[267],
    is_seed=[267]
  },
  (Paper, Cite, Paper)={
    edge_index=[2, 451],
    edge_feat=[451],
    is_train=[451],
    is_val=[451]
  }
)
----Batch 2----
HeteroData(
  Paper={
    x=[252, 1433],
    y=[252],
    train_mask=[252],
    val_mask=[252],
    test_mask=[252],
    is_seed=[252]
  },
  (Paper, Cite, Paper)={
    edge_index=[2, 382],
    edge_feat=[382],
    is_train=[382],
    is_val=[382]
  }
)
----Batch 3----
HeteroData(
  Paper={
    x=[249, 1433],
    y=[249],
    train_mask=[249],
    val_mask=[249],
    test_mask=[249],
 

## Streaming through Kafka <a name="kafka"></a>

**Note**: Kafka streaming is only available when the database is activated with the [Enterprise edition](https://docs.tigergraph.com/ml-workbench/current/editions/).

To stream data from your DB through Kafka to the machine that runs the data loader, the only extra step is to provide the required information about your Kafka cluster. Everything else works as above.

Note that TigerGraph doesn't provide the Kafka cluster; it only uses Kafka as a channel to stream data. You can use most Kafka clusters of your choice. Each batch (subgraph) is sent as one message. Hence, if you need relatively large batches and your Kafka cluster has replicas, the `replica.fetch.max.bytes` setting on the Kafka cluster has to be large enough to accommodate the batches. Also, if you create a topic manually for the data loader to use, the `max.message.bytes` setting of the topic has to be large enough as well. If you let the data loader manage the topic (the default), its max message size is set to 100 MB by default.
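To get a feel for whether a batch fits into one Kafka message, a back-of-the-envelope estimate can help. The sketch below assumes 4 bytes per value and counts only the feature matrix and the edge index; the loader's actual wire format may be larger or smaller, so treat the result as an order of magnitude. The function name and the reading of the default limit as 100 MiB are assumptions of this example.

```python
def estimate_batch_bytes(num_vertices, num_features, num_edges, bytes_per_value=4):
    """Rough size of one subgraph: feature matrix plus the 2 x E edge index."""
    feature_bytes = num_vertices * num_features * bytes_per_value
    edge_bytes = 2 * num_edges * bytes_per_value
    return feature_bytes + edge_bytes


# Assumption: the default message size limit is read as 100 MiB.
MAX_MESSAGE_BYTES = 100 * 1024 * 1024

# Shapes taken from a batch printed earlier: x=[271, 1433], edge_index=[2, 407].
size = estimate_batch_bytes(num_vertices=271, num_features=1433, num_edges=407)
print(size, size <= MAX_MESSAGE_BYTES)
```

For the Cora batches in this notebook the estimate is roughly 1.5 MB, far below the limit. Larger `batch_size`, `num_neighbors`, or feature dimensions increase it quickly.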

### Configure Kafka
The Kafka cluster information is provided to the `.gds.configureKafka` function. Once configured, the settings are shared with all newly created data loaders, so there is no need to set up Kafka for each loader. A few important parameters of this function are:
* `kafka_address`: address of the Kafka cluster. This is the only required parameter.
* `kafka_topic`: name of the topic to use. If it doesn't exist, the data loader will create it for you, provided it has the permission. If it is not given, a topic with a name like `tg_randomString` will be generated.
* `kafka_security_protocol`: if authentication is required, `SSL`, `SASL_PLAINTEXT`, and `SASL_SSL` are supported as security protocols.
* `kafka_sasl_mechanism`: for the `SASL` protocols, the mechanisms `PLAIN` and `GSSAPI` are supported.

Please see the [doc](https://docs.tigergraph.com/pytigergraph/current/gds/gds#_configurekafka) for detailed settings.
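Before passing settings to `.gds.configureKafka`, it can be useful to check them against the supported values listed above. The following sketch does this for a plain dict. The helper `check_kafka_settings`, the broker address, and the topic name are hypothetical, and credential parameters are left out on purpose; see the doc for those.

```python
# Values listed in the parameter descriptions above.
SUPPORTED_PROTOCOLS = {"SSL", "SASL_PLAINTEXT", "SASL_SSL"}
SUPPORTED_SASL_MECHANISMS = {"PLAIN", "GSSAPI"}


def check_kafka_settings(settings):
    """Return a list of problems found in a dict of Kafka settings."""
    problems = []
    if not settings.get("kafka_address"):
        problems.append("kafka_address is required")
    protocol = settings.get("kafka_security_protocol")
    if protocol is not None and protocol not in SUPPORTED_PROTOCOLS:
        problems.append(f"unsupported security protocol: {protocol}")
    mechanism = settings.get("kafka_sasl_mechanism")
    if mechanism is not None:
        if protocol not in ("SASL_PLAINTEXT", "SASL_SSL"):
            problems.append("kafka_sasl_mechanism needs a SASL security protocol")
        if mechanism not in SUPPORTED_SASL_MECHANISMS:
            problems.append(f"unsupported SASL mechanism: {mechanism}")
    return problems


# Hypothetical broker address and topic name.
settings = {
    "kafka_address": "broker.example.com:9092",
    "kafka_topic": "cora_batches",
    "kafka_security_protocol": "SASL_SSL",
    "kafka_sasl_mechanism": "PLAIN",
}
print(check_kafka_settings(settings))
# If the list is empty, the settings could be passed on with
# conn.gds.configureKafka(**settings), assuming `conn` from the cells above.
```

Checking locally gives a clearer message than a failed broker connection, at the cost of keeping the allowed values in sync with the library.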

In [11]:
conn.gds.configureKafka(
    kafka_address="127.0.0.1:9092",
)

### Load subgraphs

In [14]:
%%time
# Same call as before; batches now stream through the configured Kafka cluster
neighbor_loader = conn.gds.neighborLoader(
    batch_size=16,
    num_neighbors=10,
    num_hops=2,
    v_in_feats=["x"],
    v_out_labels=["y"],
    v_extra_feats=["train_mask", "val_mask", "test_mask"],
    e_in_feats=["time"],
    e_out_labels=[],
    e_extra_feats=["is_train", "is_val"],
    output_format="PyG",
    shuffle=True,
    filter_by="train_mask"
)

ERROR:kafka.conn:<BrokerConnection node_id=0 host=34.168.46.139:9092 <connected> [IPv4 ('34.168.46.139', 9092)]>: socket disconnected


CPU times: user 60.1 ms, sys: 3.93 ms, total: 64 ms
Wall time: 1 s


In [15]:
%%time
for i, batch in enumerate(neighbor_loader):
    print("----Batch {}----".format(i))
    print(batch)

----Batch 0----
Data(edge_index=[2, 380], edge_feat=[380], is_train=[380], is_val=[380], x=[257, 1433], y=[257], train_mask=[257], val_mask=[257], test_mask=[257], is_seed=[257])
----Batch 1----
Data(edge_index=[2, 413], edge_feat=[413], is_train=[413], is_val=[413], x=[243, 1433], y=[243], train_mask=[243], val_mask=[243], test_mask=[243], is_seed=[243])
----Batch 2----
Data(edge_index=[2, 358], edge_feat=[358], is_train=[358], is_val=[358], x=[212, 1433], y=[212], train_mask=[212], val_mask=[212], test_mask=[212], is_seed=[212])
----Batch 3----
Data(edge_index=[2, 468], edge_feat=[468], is_train=[468], is_val=[468], x=[298, 1433], y=[298], train_mask=[298], val_mask=[298], test_mask=[298], is_seed=[298])
----Batch 4----
Data(edge_index=[2, 406], edge_feat=[406], is_train=[406], is_val=[406], x=[266, 1433], y=[266], train_mask=[266], val_mask=[266], test_mask=[266], is_seed=[266])
----Batch 5----
Data(edge_index=[2, 353], edge_feat=[353], is_train=[353], is_val=[353], x=[242, 1433], y

### Clean up

When this notebook is shut down or the `neighbor_loader` object is garbage collected, the Kafka topic created by this loader should be deleted automatically. However, you can also call the `.stop()` method to delete the topic manually. It also resets the loader and closes the background threads.

In [17]:
neighbor_loader.stop(remove_topics=True)
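If you prefer not to rely on garbage collection, one option is to wrap the loader in a context manager so that `.stop()` runs even when the loop raises an error. This is a sketch of that pattern, not a feature of `pyTigerGraph`: the `stopping` helper is hypothetical, and `FakeLoader` is a stand-in that lets the example run without a database.

```python
from contextlib import contextmanager


@contextmanager
def stopping(loader, remove_topics=True):
    """Yield the loader and call its stop() method on exit, even after an error."""
    try:
        yield loader
    finally:
        loader.stop(remove_topics=remove_topics)


class FakeLoader:
    """Hypothetical stand-in so the sketch runs without a database."""

    def __init__(self):
        self.stopped_with = None

    def __iter__(self):
        return iter(["batch_0", "batch_1"])

    def stop(self, remove_topics=False):
        # Record the argument instead of deleting a real Kafka topic.
        self.stopped_with = remove_topics


fake = FakeLoader()
with stopping(fake) as loader:
    batches = list(loader)
print(batches, fake.stopped_with)
```

With a real loader, the same `with stopping(neighbor_loader) as loader:` block would replace the explicit call to `.stop()` above.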