# Load the MAG240M Dataset into a Property Graph

This notebook is meant to stress-test loading data into a property graph.
It requires some setup before it can be run:

__`num_gpus`: set this to the number of GPUs that you have in the cluster. It determines the number of records loaded.__


In [None]:
# Directory where the data has been downloaded/saved
base_dir = "../../datasets"

__The notebook requires that the data has already been downloaded, for example with:__


    from ogb.lsc import MAG240MDataset
    dataset = MAG240MDataset(root = base_dir)
    dataset.download()
    

In [None]:
# Feature flags controlling what gets loaded.
# Master switches: set to True to skip all edge loading / all vertex-feature loading
skip_edges = False
skip_features = False

# Per-property switches for the paper (vertex) data
load_paper_features = True
load_paper_labels = True
load_paper_year = True

# Per-edge-type switches
load_affiliation_edges = True
load_writes_edges = True
load_cites_edges = True

In [None]:
# Import needed libraries. 
# We recommend using the [cugraph_dev](https://github.com/rapidsai/cugraph/tree/branch-22.10/conda/environments) env through conda

from dask.distributed import Client, wait
from dask_cuda import LocalCUDACluster
from cugraph.dask.comms import comms as Comms
import cugraph.dask as dask_cugraph
import cugraph
import dask_cudf
import cudf
import time
import numpy as np

## Setup variables

In [None]:
# size of dataset
class Dataset:
    """Sizes of the full MAG240M dataset.

    The vertex counts (``num_papers``, ``num_authors``, ``num_institutions``)
    and edge counts (``num_cite_edges``, ``num_write_edges``,
    ``num_affiliated_edges``) are record counts and can be scaled down with
    :meth:`adjust_by`.  ``num_paper_features`` and ``num_classes`` are schema
    sizes (features per paper, number of classes); they do not depend on how
    many records are loaded and are therefore never scaled.
    """

    # Attributes that count records (vertices or edges); only these are
    # affected by adjust_by().
    _RECORD_COUNT_FIELDS = (
        "num_papers",
        "num_authors",
        "num_institutions",
        "num_cite_edges",
        "num_write_edges",
        "num_affiliated_edges",
    )

    def __init__(self):
        self.num_papers = 121751666
        self.num_authors = 122383112
        self.num_institutions = 25721
        self.num_paper_features = 768
        self.num_classes = 153
        self.num_cite_edges = 1297748926
        self.num_write_edges = 386022720
        self.num_affiliated_edges = 44592586

    def adjust_by(self, factor):
        """Scale every record count by ``factor``, truncating to an int.

        The scaling is applied in place to the current values, so calling
        this method repeatedly compounds the factor.

        Parameters
        ----------
        factor : float
            Fraction of the records to keep (e.g. ``1 / 16``).
        """
        for field in self._RECORD_COUNT_FIELDS:
            setattr(self, field, int(getattr(self, field) * factor))


dataset = Dataset()

## Directories and Files

In [None]:
# Root of the processed data -- this NEEDS to point at the downloaded dataset
mag_dir = f"{base_dir}/mag240m_kddcup2021/processed"

# Paper (vertex) property files
paper_feature_file = f"{mag_dir}/paper/node_feat.npy"
paper_label_file = f"{mag_dir}/paper/node_label.npy"
paper_year_file = f"{mag_dir}/paper/node_year.npy"

# Edge index files, one per relation
auth_institute_file = f"{mag_dir}/author___affiliated_with___institution/edge_index.npy"
auth_write_file = f"{mag_dir}/author___writes___paper/edge_index.npy"
# NOTE: despite the "auth_" prefix this is the paper -> paper citation relation
auth_cites_file = f"{mag_dir}/paper___cites___paper/edge_index.npy"

In [None]:
#!nvidia-smi

In [None]:
# Specify the GPUs to use
# NOTE(review): this is set after the RAPIDS libraries were imported above --
# confirm it still takes effect for this process and not only for the workers
import os
os.environ["CUDA_VISIBLE_DEVICES"]="0"

# The number of GPUs in your system - here we are just testing with 1.
# Keep this in sync with the device list in CUDA_VISIBLE_DEVICES above.
num_gpus = 1

In [None]:
# It takes 16 GPUs to fully load the data, so compute the fraction of the
# data to be loaded from the number of GPUs available.

# Note: you can adjust the amount of data loaded by increasing or decreasing
# the divisor below; use a larger divisor to load a smaller amount per GPU.

# The factor is capped at 1.0 so that running with more than 16 GPUs never
# asks for more records than the dataset contains.
# This will be used later
percent_data_factor = min(1.0, num_gpus / 16)

In [None]:
# adjust_by() scales the current values in place, so start again from the
# full-size counts; otherwise re-running this cell would compound the factor
# and silently shrink the amount of data loaded.
dataset = Dataset()
dataset.adjust_by(factor=percent_data_factor)

In [None]:
# Number of paper features to load.
# There are 768 features; specify how many are to be loaded.
# The code just loads features 0 to N, sequentially.
#num_features = 768

# using just 10 features in this test
num_features = 10

## Setup the Cluster

In [None]:
# Create the Dask CUDA cluster, connect a client to it,
# and initialize the cuGraph comms (with p2p enabled)
cluster = LocalCUDACluster()
client = Client(cluster)
Comms.initialize(p2p=True)

## Create the Property Graph

In [None]:
from cugraph.experimental import MGPropertyGraph
from cugraph.experimental import PropertyGraph

# The graph that all the loading cells below add data to.
# The loader functions check whether the graph is an MGPropertyGraph to
# decide if the data has to be handed over as a dask_cudf dataframe.
pG = PropertyGraph()

## Load the Paper (Vertex) Features


In [None]:
def data_load_properties(_PG, file_name, num_rec, name=None, chunk=-1, data_range=None, col_names=None):
    """Load vertex properties from a ``.npy`` file into a property graph.

    The file is memory-mapped and read ``chunk`` records at a time.  Each
    batch is turned into a dataframe with an extra ``id`` column holding the
    record's position in the file, and is added to ``_PG`` as vertex data.

    Parameters
    ----------
    _PG : PropertyGraph or MGPropertyGraph
        Graph that receives the data.  For an ``MGPropertyGraph`` each batch
        is converted to a dask_cudf dataframe with ``num_gpus`` partitions.
    file_name : str
        Path of the ``.npy`` file to read.
    num_rec : int
        Number of records to load, counted from the start of the file.
    name : str, optional
        Vertex type name passed to the graph; also shown in the progress
        message.
    chunk : int, optional
        Number of records per batch.  ``-1`` (the default) loads all
        ``num_rec`` records in a single batch.
    data_range : int, optional
        When given, only the first ``data_range`` columns of each record
        are loaded.
    col_names : list of str, optional
        Column names for the loaded data.

    Raises
    ------
    ValueError
        If ``chunk`` is neither ``-1`` nor a positive number.
    """
    # Reject bad chunk sizes up front: a zero or negative chunk would never
    # advance the read position and the loop below would not terminate.
    if chunk != -1 and chunk < 1:
        raise ValueError(f"chunk must be -1 or a positive integer, got {chunk}")

    # are we using SG or MG PG?
    _use_dask = isinstance(_PG, MGPropertyGraph)

    # memory-map the file so only the requested slices are read
    _data = np.lib.format.open_memmap(file_name, mode='r')

    if chunk == -1:
        chunk = num_rec

    _rec_read = 0

    while _rec_read < num_rec:
        _start_id = _rec_read
        # the last batch may be shorter than chunk
        _end_id = min(_start_id + chunk, num_rec)

        print(f"reading {name} data from {_start_id} to {_end_id}")

        if data_range is not None:
            _x = _data[_start_id:_end_id, 0:data_range]
        else:
            _x = _data[_start_id:_end_id]

        gdf = cudf.DataFrame(_x, columns=col_names)
        # the vertex id is the record's position in the file, not in the batch
        gdf['id'] = gdf.index + _start_id
        gdf.columns = gdf.columns.astype(str)

        if _use_dask:
            ddf = dask_cudf.from_cudf(gdf, npartitions=num_gpus)
        else:
            ddf = gdf

        # add to the graph that was passed in, not to the notebook-global one
        _PG.add_vertex_data(ddf, vertex_col_name='id', type_name=name)

        _rec_read = _end_id

In [None]:
if not skip_features:

    # One row per paper property:
    # (enabled flag, source file, vertex type name, extra loader arguments).
    # The rows are processed in order: labels, year, features.
    _paper_property_loads = (
        (load_paper_labels, paper_label_file, 'paper_label', {"col_names": ["label"]}),
        (load_paper_year, paper_year_file, 'paper_year', {"col_names": ["year"]}),
        (load_paper_features, paper_feature_file, 'paper_feature', {"data_range": num_features}),
    )

    for _enabled, _source_file, _type_name, _extra_args in _paper_property_loads:
        if not _enabled:
            continue

        data_load_properties(pG, _source_file, num_rec=dataset.num_papers, name=_type_name, **_extra_args)
        # report the running vertex count after each property has been added
        print(f"PG now contains {pG.get_num_vertices()} ")


## Load the Edges

In [None]:
def data_load_edges(_PG, file_name, num_edges, name=None, chunk=-1, col_names=None):
    """Load edges from an ``edge_index`` ``.npy`` file into a property graph.

    The file is memory-mapped and read ``chunk`` edges at a time.  Row 0 of
    the array supplies the ``src`` column and row 1 the ``dst`` column; each
    batch is added to ``_PG`` as edge data.

    Parameters
    ----------
    _PG : PropertyGraph or MGPropertyGraph
        Graph that receives the data.  For an ``MGPropertyGraph`` each batch
        is converted to a dask_cudf dataframe with ``num_gpus`` partitions.
    file_name : str
        Path of the ``.npy`` file to read.
    num_edges : int
        Number of edges to load, counted from the start of the file.
    name : str, optional
        Edge type name passed to the graph; also shown in the progress
        message.
    chunk : int, optional
        Number of edges per batch.  ``-1`` (the default) loads all
        ``num_edges`` edges in a single batch.
    col_names : list of str, optional
        Currently unused; the columns are always named ``src`` and ``dst``.

    Raises
    ------
    ValueError
        If ``chunk`` is neither ``-1`` nor a positive number.
    """
    # Reject bad chunk sizes up front: a zero or negative chunk would never
    # advance the read position and the loop below would not terminate.
    if chunk != -1 and chunk < 1:
        raise ValueError(f"chunk must be -1 or a positive integer, got {chunk}")

    # are we using SG or MG PG?
    _use_dask = isinstance(_PG, MGPropertyGraph)

    # memory-map the file so only the requested slices are read
    _data = np.lib.format.open_memmap(file_name, mode='r')

    if chunk == -1:
        chunk = num_edges

    _rec_read = 0

    while _rec_read < num_edges:
        _start_id = _rec_read
        # the last batch may be shorter than chunk
        _end_id = min(_start_id + chunk, num_edges)

        print(f"reading {name} data from {_start_id} to {_end_id}")

        # The first axis holds the two endpoint rows (0 = source,
        # 1 = destination), so the edge range must be selected along the
        # second axis.  Slicing the first axis would return every edge for
        # the first batch and nothing for any later one.
        _x = _data[:, _start_id:_end_id]

        gdf = cudf.DataFrame()
        gdf['src'] = _x[0]
        gdf['dst'] = _x[1]
        gdf.columns = gdf.columns.astype(str)

        if _use_dask:
            ddf = dask_cudf.from_cudf(gdf, npartitions=num_gpus)
        else:
            ddf = gdf

        # add to the graph that was passed in, not to the notebook-global one
        _PG.add_edge_data(ddf, vertex_col_names=['src', 'dst'], type_name=name)

        _rec_read = _end_id

In [None]:
if not skip_edges:
    # One row per edge type:
    # (enabled flag, source file, number of edges to load, edge type name).
    # The rows are processed in order: affiliated, writes, cites.
    _edge_loads = (
        (load_affiliation_edges, auth_institute_file, dataset.num_affiliated_edges, "affiliated"),
        (load_writes_edges, auth_write_file, dataset.num_write_edges, "writes"),
        (load_cites_edges, auth_cites_file, dataset.num_cite_edges, "cites"),
    )

    for _enabled, _source_file, _edge_count, _type_name in _edge_loads:
        if _enabled:
            data_load_edges(pG, _source_file, num_edges=_edge_count, name=_type_name)
            # report the running edge count after each edge type has been added
            print(f"PG now contains {pG.get_num_edges()} ")

In [None]:
# Report the number of edges currently in the property graph
print(f"PG now contains {pG.get_num_edges()} ")