Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ondisk dataset to PCQM4M_v2 example #474

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
41 changes: 31 additions & 10 deletions examples/lsc/pcqm4m-v2/main_gnn_multi_gpu.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
import argparse
import time

from torch_geometric.datasets import PCQM4Mv2

### importing OGB-LSC
try:
from ogb.lsc import PygPCQM4Mv2Dataset, PCQM4Mv2Evaluator
Expand Down Expand Up @@ -103,26 +105,41 @@ def run(rank, dataset, args):
os.environ["MASTER_PORT"] = "12355"
dist.init_process_group("nccl", rank=rank, world_size=num_devices)

split_idx = dataset.get_idx_split()
if args.on_disk_dataset:
train_idx = torch.arange(len(dataset.indices()))
else:
split_idx = dataset.get_idx_split()
train_idx = split_idx["train"]

train_idx = split_idx["train"]

if num_devices > 1:
train_idx = train_idx.split(train_idx.size(0) // num_devices)[rank]

if num_devices > 1:
train_idx = train_idx.split(train_idx.size(0) // num_devices)[rank]
if args.train_subset:
subset_ratio = 0.1
subset_idx = torch.randperm(len(train_idx))[:int(subset_ratio*len(split_idx["train"]))]
train_loader = DataLoader(dataset[train_idx[subset_idx]], batch_size=args.batch_size, shuffle=True, num_workers = args.num_workers)
subset_idx = torch.randperm(len(train_idx))[:int(subset_ratio*len(train_idx))]
train_dataset = dataset[train_idx[subset_idx]]
else:
train_loader = DataLoader(dataset[train_idx], batch_size=args.batch_size, shuffle=True, num_workers = args.num_workers)
train_dataset = dataset[train_idx]

if rank == 0:
valid_loader = DataLoader(dataset[split_idx["valid"]], batch_size=args.batch_size, shuffle=False, num_workers = args.num_workers)
train_loader = DataLoader(train_dataset, batch_size=args.batch_size, shuffle=True, num_workers = args.num_workers)

if rank == 0:
if args.on_disk_dataset:
valid_dataset = PCQM4Mv2(root = 'on_disk_dataset/', split="val")
test_dev_dataset = PCQM4Mv2(root = 'on_disk_dataset/', split="test")
test_challenge_dataset = PCQM4Mv2(root = 'on_disk_dataset/', split="holdout")
else:
valid_dataset = dataset[split_idx["valid"]]
test_dev_dataset = dataset[split_idx["test-dev"]]
test_challenge_dataset = dataset[split_idx["test-challenge"]]

valid_loader = DataLoader(valid_dataset, batch_size=args.batch_size, shuffle=False, num_workers = args.num_workers)
if args.save_test_dir != '':
testdev_loader = DataLoader(dataset[split_idx["test-dev"]], batch_size=args.batch_size, shuffle=False, num_workers = args.num_workers)
testchallenge_loader = DataLoader(dataset[split_idx["test-challenge"]], batch_size=args.batch_size, shuffle=False, num_workers = args.num_workers)
testdev_loader = DataLoader(test_dev_dataset, batch_size=args.batch_size, shuffle=False, num_workers = args.num_workers)
testchallenge_loader = DataLoader(test_challenge_dataset, batch_size=args.batch_size, shuffle=False, num_workers = args.num_workers)

if args.checkpoint_dir != '':
os.makedirs(args.checkpoint_dir, exist_ok = True)
Expand Down Expand Up @@ -267,6 +284,7 @@ def run(rank, dataset, args):
parser.add_argument('--checkpoint_dir', type=str, default = '', help='directory to save checkpoint')
parser.add_argument('--save_test_dir', type=str, default = '', help='directory to save test submission file')
parser.add_argument('--num_devices', type=int, default='0', help="Number of GPUs, if 0 runs on the CPU")
parser.add_argument('--on_disk_dataset', action='store_true')
args = parser.parse_args()

print(args)
Expand All @@ -277,7 +295,10 @@ def run(rank, dataset, args):
assert args.num_devices <= available_gpus, f"Cannot train with {args.num_devices} GPUs: available GPUs count {available_gpus}"

### automatic dataloading and splitting
dataset = PygPCQM4Mv2Dataset(root = 'dataset/')
if args.on_disk_dataset:
dataset = PCQM4Mv2(root = 'on_disk_dataset/', split='train')
else:
dataset = PygPCQM4Mv2Dataset(root = 'dataset/')

if args.num_devices > 1:
print(f'Starting multi-GPU training with DDP with {args.num_devices} GPUs')
Expand Down