In [1]:
import functools
import os
import shutil
from typing import Any, Dict, List, Optional

from absl import app
from absl import flags
from absl import logging
import clrs
import jax
import numpy as np
import requests
import tensorflow as tf
from tqdm import tqdm
from jax import numpy as jnp
import wandb

In [2]:
for name in list(flags.FLAGS):
  delattr(flags.FLAGS, name)

In [None]:
flags.DEFINE_list('algorithms', ['dfs'], 'Which algorithms to run.')
flags.DEFINE_list('train_lengths', ['4', '7', '11', '13', '16'],
                  'Which training sizes to use. A size of -1 means '
                  'use the benchmark dataset.')
flags.DEFINE_integer('length_needle', -8,
                     'Length of needle for training and validation '
                     '(not testing) in string matching algorithms. '
                     'A negative value randomizes the length for each sample '
                     'between 1 and the opposite of the value. '
                     'A value of 0 means use always 1/4 of the length of '
                     'the haystack (the default sampler behavior).')
flags.DEFINE_integer('seed', 42, 'Random seed to set')

flags.DEFINE_boolean('random_pos', True,
                     'Randomize the pos input common to all algos.')
flags.DEFINE_boolean('enforce_permutations', True,
                     'Whether to enforce permutation-type node pointers.')
flags.DEFINE_boolean('enforce_pred_as_input', True,
                     'Whether to change pred_h hints into pred inputs.')
flags.DEFINE_integer('batch_size', 32, 'Batch size used for training.')
flags.DEFINE_boolean('chunked_training', False,
                     'Whether to use chunking for training.')
flags.DEFINE_integer('chunk_length', 16,
                     'Time chunk length used for training (if '
                     '`chunked_training` is True.')
flags.DEFINE_integer('train_steps', 200, 'Number of training iterations.')
flags.DEFINE_integer('eval_every', 10, 'Evaluation frequency (in steps).')
flags.DEFINE_integer('test_every', 500, 'Evaluation frequency (in steps).')

flags.DEFINE_integer('hidden_size', 128,
                     'Number of hidden units of the model.')
flags.DEFINE_integer('nb_heads', 1, 'Number of heads for GAT processors')
flags.DEFINE_integer('nb_msg_passing_steps', 1,
                     'Number of message passing steps to run per hint.')
flags.DEFINE_float('learning_rate', 0.001, 'Learning rate to use.')
flags.DEFINE_float('grad_clip_max_norm', 1.0,
                   'Gradient clipping by norm. 0.0 disables grad clipping')
flags.DEFINE_float('dropout_prob', 0.0, 'Dropout rate to use.')
flags.DEFINE_float('hint_teacher_forcing', 0.0,
                   'Probability that ground-truth teacher hints are encoded '
                   'during training instead of predicted hints. Only '
                   'pertinent in encoded_decoded modes.')
flags.DEFINE_enum('hint_mode', 'encoded_decoded',
                  ['encoded_decoded', 'decoded_only', 'none'],
                  'How should hints be used? Note, each mode defines a '
                  'separate task, with various difficulties. `encoded_decoded` '
                  'requires the model to explicitly materialise hint sequences '
                  'and therefore is hardest, but also most aligned to the '
                  'underlying algorithmic rule. Hence, `encoded_decoded` '
                  'should be treated as the default mode for our benchmark. '
                  'In `decoded_only`, hints are only used for defining '
                  'reconstruction losses. Often, this will perform well, but '
                  'note that we currently do not make any efforts to '
                  'counterbalance the various hint losses. Hence, for certain '
                  'tasks, the best performance will now be achievable with no '
                  'hint usage at all (`none`).')
flags.DEFINE_enum('hint_repred_mode', 'soft', ['soft', 'hard', 'hard_on_eval'],
                  'How to process predicted hints when fed back as inputs.'
                  'In soft mode, we use softmaxes for categoricals, pointers '
                  'and mask_one, and sigmoids for masks. '
                  'In hard mode, we use argmax instead of softmax, and hard '
                  'thresholding of masks. '
                  'In hard_on_eval mode, soft mode is '
                  'used for training and hard mode is used for evaluation.')
flags.DEFINE_boolean('use_ln', True,
                     'Whether to use layer normalisation in the processor.')
flags.DEFINE_boolean('use_lstm', False,
                     'Whether to insert an LSTM after message passing.')
flags.DEFINE_integer('nb_triplet_fts', 8,
                     'How many triplet features to compute?')

flags.DEFINE_enum('encoder_init', 'xavier_on_scalars',
                  ['default', 'xavier_on_scalars'],
                  'Initialiser to use for the encoders.')
flags.DEFINE_enum('processor_type', 'mpnn',
                  ['deepsets', 'mpnn', 'pgn', 'pgn_mask',
                   'triplet_mpnn', 'triplet_pgn', 'triplet_pgn_mask',
                   'gat', 'gatv2', 'gat_full', 'gatv2_full',
                   'gpgn', 'gpgn_mask', 'gmpnn',
                   'triplet_gpgn', 'triplet_gpgn_mask', 'triplet_gmpnn'],
                  'Processor type to use as the network P.')

flags.DEFINE_string('checkpoint_path', '/tmp/CLRS30',
                    'Path in which checkpoints are saved.')
flags.DEFINE_string('dataset_path', '/tmp/CLRS30',
                    'Path in which dataset is stored.')
flags.DEFINE_boolean('freeze_processor', False,
                     'Whether to freeze the processor of the model.')

FLAGS = flags.FLAGS


PRED_AS_INPUT_ALGOS = [
    'binary_search',
    'minimum',
    'find_maximum_subarray',
    'find_maximum_subarray_kadane',
    'matrix_chain_order',
    'lcs_length',
    'optimal_bst',
    'activity_selector',
    'task_scheduling',
    'naive_string_matcher',
    'kmp_matcher',
    'jarvis_march']

In [5]:
flags.FLAGS.mark_as_parsed()

In [6]:
# helpers
def unpack(v):
  try:
    return v.item()  # DeviceArray  # pytype: disable=attribute-error
  except (AttributeError, ValueError):
    return v


def _iterate_sampler(sampler, batch_size):
  while True:
    yield sampler.next(batch_size)


def _maybe_download_dataset(dataset_path):
  """Download CLRS30 dataset if needed."""
  dataset_folder = os.path.join(dataset_path, clrs.get_clrs_folder())
  if os.path.isdir(dataset_folder):
    logging.info('Dataset found at %s. Skipping download.', dataset_folder)
    return dataset_folder
  logging.info('Dataset not found in %s. Downloading...', dataset_folder)

  clrs_url = clrs.get_dataset_gcp_url()
  request = requests.get(clrs_url, allow_redirects=True)
  clrs_file = os.path.join(dataset_path, os.path.basename(clrs_url))
  os.makedirs(dataset_folder)
  open(clrs_file, 'wb').write(request.content)
  shutil.unpack_archive(clrs_file, extract_dir=dataset_folder)
  os.remove(clrs_file)
  return dataset_folder


def _concat(dps, axis):
  return jax.tree_util.tree_map(lambda *x: np.concatenate(x, axis), *dps)


In [7]:
# samplers and eval stuff
def make_sampler(length: int,
                 rng: Any,
                 algorithm: str,
                 split: str,
                 batch_size: int,
                 multiplier: int,
                 randomize_pos: bool,
                 enforce_pred_as_input: bool,
                 enforce_permutations: bool,
                 chunked: bool,
                 chunk_length: int,
                 sampler_kwargs: Dict[str, Any]):
  """Create a sampler with given options.

  Args:
    length: Size of samples (i.e., number of nodes in the graph).
      A length of -1 will mean that the benchmark
      dataset (for the given split) is used. Positive sizes will instantiate
      samplers of the corresponding size.
    rng: Numpy random state.
    algorithm: The name of the algorithm to sample from.
    split: 'train', 'val' or 'test'.
    batch_size: Samples per batch.
    multiplier: Integer multiplier for the number of samples in the dataset,
      only used for positive sizes. Negative multiplier means infinite samples.
    randomize_pos: Whether to randomize the `pos` input.
    enforce_pred_as_input: Whether to convert fixed pred_h hints to inputs.
    enforce_permutations: Whether to enforce permutation pointers.
    chunked: Whether to chunk the dataset.
    chunk_length: Unroll length of chunks, if `chunked` is True.
    sampler_kwargs: Extra args passed to the sampler.
  Returns:
    A sampler (iterator), the number of samples in the iterator (negative
    if infinite samples), and the spec.
  """
  if length < 0:  # load from file
    dataset_folder = _maybe_download_dataset(FLAGS.dataset_path)
    sampler, num_samples, spec = clrs.create_dataset(folder=dataset_folder,
                                                     algorithm=algorithm,
                                                     batch_size=batch_size,
                                                     split=split)
    sampler = sampler.as_numpy_iterator()
  else:
    num_samples = clrs.CLRS30[split]['num_samples'] * multiplier
    sampler, spec = clrs.build_sampler(
        algorithm,
        seed=rng.randint(2**32),
        num_samples=num_samples,
        length=length,
        **sampler_kwargs,
        )
    sampler = _iterate_sampler(sampler, batch_size)

  if randomize_pos:
    sampler = clrs.process_random_pos(sampler, rng)
  if enforce_pred_as_input and algorithm in PRED_AS_INPUT_ALGOS:
    spec, sampler = clrs.process_pred_as_input(spec, sampler)
  spec, sampler = clrs.process_permutations(spec, sampler, enforce_permutations)
  if chunked:
    sampler = clrs.chunkify(sampler, chunk_length)
  return sampler, num_samples, spec


def make_multi_sampler(sizes, rng, **kwargs):
  """Create a sampler with cycling sample sizes."""
  ss = []
  tot_samples = 0
  for length in sizes:
    sampler, num_samples, spec = make_sampler(length, rng, **kwargs)
    ss.append(sampler)
    tot_samples += num_samples

  def cycle_samplers():
    while True:
      for s in ss:
        yield next(s)
  return cycle_samplers(), tot_samples, spec


def _concat(dps, axis):
  return jax.tree_util.tree_map(lambda *x: np.concatenate(x, axis), *dps)


def collect_and_eval(sampler, predict_fn, sample_count, rng_key, extras):
  """Collect batches of output and hint preds and evaluate them."""
  processed_samples = 0
  preds = []
  outputs = []
  while processed_samples < sample_count:
    feedback = next(sampler)
    batch_size = feedback.outputs[0].data.shape[0]
    outputs.append(feedback.outputs)
    new_rng_key, rng_key = jax.random.split(rng_key)
    cur_preds, _ = predict_fn(new_rng_key, feedback.features)
    preds.append(cur_preds)
    processed_samples += batch_size
  outputs = _concat(outputs, axis=0)
  preds = _concat(preds, axis=0)
  out = clrs.evaluate(outputs, preds)
  if extras:
    out.update(extras)
  return {k: unpack(v) for k, v in out.items()}


def create_samplers(
    rng,
    train_lengths: List[int],
    *,
    algorithms: Optional[List[str]] = None,
    val_lengths: Optional[List[int]] = None,
    test_lengths: Optional[List[int]] = None,
    train_batch_size: int = 32,
    val_batch_size: int = 32,
    test_batch_size: int = 32,
):
  """Create samplers for training, validation and testing.

  Args:
    rng: Numpy random state.
    train_lengths: list of training lengths to use for each algorithm.
    algorithms: list of algorithms to generate samplers for. Set to
        FLAGS.algorithms if not provided.
    val_lengths: list of lengths for validation samplers for each algorithm. Set
        to maxumim training length if not provided.
    test_lengths: list of lengths for test samplers for each algorithm. Set to
        [-1] to use the benchmark dataset if not provided.
    train_batch_size: batch size for training samplers.
    val_batch_size: batch size for validation samplers.
    test_batch_size: batch size for test samplers.

  Returns:
    Tuple of:
      train_samplers: list of samplers for training.
      val_samplers: list of samplers for validation.
      val_sample_counts: list of sample counts for validation.
      test_samplers: list of samplers for testing.
      test_sample_counts: list of sample counts for testing.
      spec_list: list of specs for each algorithm.

  """

  train_samplers = []
  val_samplers = []
  val_sample_counts = []
  test_samplers = []
  test_sample_counts = []
  spec_list = []

  algorithms = algorithms or FLAGS.algorithms
  for algo_idx, algorithm in enumerate(algorithms):
    # Set the training lengths for the current algorithm.
    current_algo_train_lengths = train_lengths

     # Make full dataset pipeline run on CPU (including prefetching).
    with tf.device('/cpu:0'):
      if algorithm in ['naive_string_matcher', 'kmp_matcher']:
        # Fixed haystack + needle; variability will be in needle
        # Still, for chunked training, we maintain as many samplers
        # as train lengths, since, for each length there is a separate state,
        # and we must keep the 1:1 relationship between states and samplers.
        max_length = max(current_algo_train_lengths)
        if max_length > 0:  # if < 0, we are using the benchmark data
          max_length = (max_length * 5) // 4
        current_algo_train_lengths = [max_length]
        if FLAGS.chunked_training:
          current_algo_train_lengths = current_algo_train_lengths * len(
              current_algo_train_lengths
          )

      logging.info('Creating samplers for algo %s', algorithm)

      p = tuple([0.1 + 0.1 * i for i in range(9)])
      if p and algorithm in ['articulation_points', 'bridges',
                             'mst_kruskal', 'bipartite_matching']:
        # Choose a lower connection probability for the above algorithms,
        # otherwise trajectories are very long
        p = tuple(np.array(p) / 2)
      length_needle = FLAGS.length_needle
      sampler_kwargs = dict(p=p, length_needle=length_needle)
      if length_needle == 0:
        sampler_kwargs.pop('length_needle')

      common_sampler_args = dict(
          algorithm=algorithms[algo_idx],
          rng=rng,
          enforce_pred_as_input=FLAGS.enforce_pred_as_input,
          enforce_permutations=FLAGS.enforce_permutations,
          chunk_length=FLAGS.chunk_length,
          )

      train_args = dict(
          sizes=current_algo_train_lengths,
          split='train',
          batch_size=train_batch_size,
          multiplier=-1,
          randomize_pos=FLAGS.random_pos,
          chunked=FLAGS.chunked_training,
          sampler_kwargs=sampler_kwargs,
          **common_sampler_args,
      )
      train_sampler, _, _ = make_multi_sampler(**train_args)

      mult = clrs.CLRS_30_ALGS_SETTINGS[algorithm]['num_samples_multiplier']
      val_args = dict(
          sizes=val_lengths or [np.amax(current_algo_train_lengths)],
          split='val',
          batch_size=val_batch_size,
          multiplier=2 * mult,
          randomize_pos=FLAGS.random_pos,
          chunked=False,
          sampler_kwargs=sampler_kwargs,
          **common_sampler_args,
      )
      val_sampler, val_samples, _ = make_multi_sampler(**val_args)

      test_args = dict(sizes=test_lengths or [-1],
                       split='test',
                       batch_size=test_batch_size,
                       multiplier=2 * mult,
                       randomize_pos=False,
                       chunked=False,
                       sampler_kwargs={},
                       **common_sampler_args)
      test_sampler, test_samples, spec = make_multi_sampler(**test_args)

    spec_list.append(spec)
    train_samplers.append(train_sampler)
    val_samplers.append(val_sampler)
    val_sample_counts.append(val_samples)
    test_samplers.append(test_sampler)
    test_sample_counts.append(test_samples)

  return (train_samplers,
          val_samplers, val_sample_counts,
          test_samplers, test_sample_counts,
          spec_list)


In [8]:
def vanilla_clrs(seed, unused_argv):
  wandb.init(project="clrs-project", name=f"baseline-training-{FLAGS.algorithms}-mpnn")
  if FLAGS.hint_mode == 'encoded_decoded':
    encode_hints = True
    decode_hints = True
  elif FLAGS.hint_mode == 'decoded_only':
    encode_hints = False
    decode_hints = True
  elif FLAGS.hint_mode == 'none':
    encode_hints = False
    decode_hints = False
  else:
    raise ValueError('Hint mode not in {encoded_decoded, decoded_only, none}.')

  train_lengths = [int(x) for x in FLAGS.train_lengths]

  rng = np.random.RandomState(seed)
  rng_key = jax.random.PRNGKey(rng.randint(2**32))

  # Create samplers
  (
      train_samplers,
      val_samplers,
      val_sample_counts,
      test_samplers,
      test_sample_counts,
      spec_list,
  ) = create_samplers(
      rng=rng,
      train_lengths=train_lengths,
      algorithms=FLAGS.algorithms,
      val_lengths=[np.amax(train_lengths)],
      test_lengths=[-1],
      train_batch_size=FLAGS.batch_size,
  )

  processor_factory = clrs.get_processor_factory(
      FLAGS.processor_type,
      use_ln=FLAGS.use_ln,
      nb_triplet_fts=FLAGS.nb_triplet_fts,
      nb_heads=FLAGS.nb_heads,
  )
  model_params = dict(
      processor_factory=processor_factory,
      hidden_dim=FLAGS.hidden_size,
      encode_hints=encode_hints,
      decode_hints=decode_hints,
      encoder_init=FLAGS.encoder_init,
      use_lstm=FLAGS.use_lstm,
      learning_rate=FLAGS.learning_rate,
      grad_clip_max_norm=FLAGS.grad_clip_max_norm,
      checkpoint_path=FLAGS.checkpoint_path,
      freeze_processor=FLAGS.freeze_processor,
      dropout_prob=FLAGS.dropout_prob,
      hint_teacher_forcing=FLAGS.hint_teacher_forcing,
      hint_repred_mode=FLAGS.hint_repred_mode,
      nb_msg_passing_steps=FLAGS.nb_msg_passing_steps,
      )

  eval_model = clrs.models.BaselineModel(
      spec=spec_list,
      dummy_trajectory=[next(t) for t in val_samplers],
      **model_params
  )
  if FLAGS.chunked_training:
    train_model = clrs.models.BaselineModelChunked(
        spec=spec_list,
        dummy_trajectory=[next(t) for t in train_samplers],
        **model_params
        )
  else:
    train_model = eval_model

  # Training loop.
  best_score = -1.0
  current_train_items = [0] * len(FLAGS.algorithms)
  step = 0
  next_eval = 0
  # Make sure scores improve on first step, but not overcome best score
  # until all algos have had at least one evaluation.
  val_scores = [-99999.9] * len(FLAGS.algorithms)
  length_idx = 0

  while step < FLAGS.train_steps:
    feedback_list = [next(t) for t in train_samplers]

    # Initialize model.
    if step == 0:
      all_features = [f.features for f in feedback_list]
      if FLAGS.chunked_training:
        # We need to initialize the model with samples of all lengths for
        # all algorithms. Also, we need to make sure that the order of these
        # sample sizes is the same as the order of the actual training sizes.
        all_length_features = [all_features] + [
            [next(t).features for t in train_samplers]
            for _ in range(len(train_lengths))]
        train_model.init(all_length_features[:-1], FLAGS.seed + 1)
      else:
        train_model.init(all_features, FLAGS.seed + 1)

    # Training step.
    print(f"{train_samplers=}")
    for algo_idx in range(len(train_samplers)):
      feedback = feedback_list[algo_idx]
      rng_key, new_rng_key = jax.random.split(rng_key)
      if FLAGS.chunked_training:
        # In chunked training, we must indicate which training length we are
        # using, so the model uses the correct state.
        length_and_algo_idx = (length_idx, algo_idx)
      else:
        # In non-chunked training, all training lengths can be treated equally,
        # since there is no state to maintain between batches.
        length_and_algo_idx = algo_idx
      cur_loss = train_model.feedback(rng_key, feedback, length_and_algo_idx)
      rng_key = new_rng_key

      if FLAGS.chunked_training:
        examples_in_chunk = np.sum(feedback.features.is_last).item()
      else:
        examples_in_chunk = len(feedback.features.lengths)
      current_train_items[algo_idx] += examples_in_chunk
      logging.info('Algo %s step %i current loss %f, current_train_items %i.',
                   FLAGS.algorithms[algo_idx], step,
                   cur_loss, current_train_items[algo_idx])
      wandb.log({
          "loss": float(cur_loss),
          "step": step
      })

    # Periodically evaluate model
    if step >= next_eval:
      eval_model.params = train_model.params
      for algo_idx in range(len(train_samplers)):
        common_extras = {'examples_seen': current_train_items[algo_idx],
                         'step': step,
                         'algorithm': FLAGS.algorithms[algo_idx]}

        # Validation info.
        new_rng_key, rng_key = jax.random.split(rng_key)
        val_stats = collect_and_eval(
            val_samplers[algo_idx],
            functools.partial(eval_model.predict, algorithm_index=algo_idx),
            val_sample_counts[algo_idx],
            new_rng_key,
            extras=common_extras)
        logging.info('(val) algo %s step %d: %s',
                     FLAGS.algorithms[algo_idx], step, val_stats)
        val_scores[algo_idx] = val_stats['score']

      next_eval += FLAGS.eval_every

      # If best total score, update best checkpoint.
      # Also save a best checkpoint on the first step.
      msg = (f'best avg val score was '
             f'{best_score/len(FLAGS.algorithms):.3f}, '
             f'current avg val score is {np.mean(val_scores):.3f}, '
             f'val scores are: ')
      msg += ', '.join(
          ['%s: %.3f' % (x, y) for (x, y) in zip(FLAGS.algorithms, val_scores)])
      if (sum(val_scores) > best_score) or step == 0:
        best_score = sum(val_scores)
        logging.info('Checkpointing best model, %s', msg)
        train_model.save_model('best.pkl')
      else:
        logging.info('Not saving new best model, %s', msg)
    
      wandb.log({
          "score": float(np.mean(val_scores)),
          "step": step
      })

    step += 1
    length_idx = (length_idx + 1) % len(train_lengths)

  logging.info('Restoring best model from checkpoint...')
  eval_model.restore_model('best.pkl', only_load_processor=False)

  for algo_idx in range(len(train_samplers)):
    common_extras = {'examples_seen': current_train_items[algo_idx],
                     'step': step,
                     'algorithm': FLAGS.algorithms[algo_idx]}

    new_rng_key, rng_key = jax.random.split(rng_key)
    test_stats = collect_and_eval(
        test_samplers[algo_idx],
        functools.partial(eval_model.predict, algorithm_index=algo_idx),
        test_sample_counts[algo_idx],
        new_rng_key,
        extras=common_extras)
    logging.info('(test) algo %s : %s', FLAGS.algorithms[algo_idx], test_stats)

  logging.info('Done!')


In [9]:
vanilla_clrs(42,'')

[34m[1mwandb[0m: [wandb.login()] Loaded credentials for https://api.wandb.ai from /Users/juli/.netrc.
[34m[1mwandb[0m: Currently logged in as: [33mtgoab[0m ([33mtgoabike[0m) to [32mhttps://api.wandb.ai[0m. Use [1m`wandb login --relogin`[0m to force relogin




train_samplers=[<generator object make_multi_sampler.<locals>.cycle_samplers at 0x12bf11560>]
train_samplers=[<generator object make_multi_sampler.<locals>.cycle_samplers at 0x12bf11560>]
train_samplers=[<generator object make_multi_sampler.<locals>.cycle_samplers at 0x12bf11560>]
train_samplers=[<generator object make_multi_sampler.<locals>.cycle_samplers at 0x12bf11560>]
train_samplers=[<generator object make_multi_sampler.<locals>.cycle_samplers at 0x12bf11560>]
train_samplers=[<generator object make_multi_sampler.<locals>.cycle_samplers at 0x12bf11560>]
train_samplers=[<generator object make_multi_sampler.<locals>.cycle_samplers at 0x12bf11560>]
train_samplers=[<generator object make_multi_sampler.<locals>.cycle_samplers at 0x12bf11560>]
train_samplers=[<generator object make_multi_sampler.<locals>.cycle_samplers at 0x12bf11560>]
train_samplers=[<generator object make_multi_sampler.<locals>.cycle_samplers at 0x12bf11560>]
train_samplers=[<generator object make_multi_sampler.<locals

In [10]:
from collections import deque
from typing import Dict
import numpy as np
import jax.numpy as jnp
from clrs._src import processors


# ============================================================================
# Cayley Graph Utilities
# ============================================================================

def get_cayley_graph(n: int) -> np.ndarray:
    """Generate edge_index (2, E) of the Cayley graph Cay(SL(2, Z_n); S_n).

    Uses 4 generators of SL(2, Z_n) and BFS from the identity matrix.
    The generator set is closed under inversion, so the resulting graph
    is effectively undirected.
    """
    generators = np.array([
        [[1, 1], [0, 1]],
        [[1, n - 1], [0, 1]],
        [[1, 0], [1, 1]],
        [[1, 0], [n - 1, 1]],
    ])
    ind = 1
    queue = deque([np.array([[1, 0], [0, 1]])])
    nodes = {(1, 0, 0, 1): 0}
    senders, receivers = [], []

    while queue:
        x = queue.pop()
        x_flat = (x[0][0], x[0][1], x[1][0], x[1][1])
        ind_x = nodes[x_flat]
        for i in range(4):
            tx = np.mod(np.matmul(x, generators[i]), n)
            tx_flat = (tx[0][0], tx[0][1], tx[1][0], tx[1][1])
            if tx_flat not in nodes:
                nodes[tx_flat] = ind
                ind += 1
                queue.append(tx)
            senders.append(ind_x)
            receivers.append(nodes[tx_flat])

    return np.asarray([senders, receivers], dtype=np.int64)


def cayley_graph_size(n: int) -> int:
    """|SL(2, Z_n)| = n^3 * prod(1 - 1/p^2) for each distinct prime factor p."""
    from primefac import primefac
    n = int(n)
    if n <= 1:
        return 1
    primes = list(set(primefac(n)))
    return round(n ** 3 * np.prod([1.0 - 1.0 / (p * p) for p in primes]))


def get_cayley_n(num_nodes: int) -> int:
    """Smallest n such that |SL(2, Z_n)| >= num_nodes."""
    n = 1
    while cayley_graph_size(n) < num_nodes:
        n += 1
    return n


def build_truncated_cayley_adj(num_nodes: int) -> np.ndarray:
    """Build the EGP-style truncated Cayley expander adjacency matrix.

    Steps:
      1. Find smallest Cayley graph with >= num_nodes nodes
      2. Generate full Cayley edge_index via BFS
      3. Truncate: keep only edges where both endpoints < num_nodes
      4. Convert to (num_nodes, num_nodes) adjacency matrix with self-loops

    Returns:
      np.ndarray of shape (num_nodes, num_nodes), dtype float32.
    """
    n = get_cayley_n(num_nodes)
    edge_index = get_cayley_graph(n)  # (2, E)

    # Truncate to the input graph's number of nodes
    mask = (edge_index[0] < num_nodes) & (edge_index[1] < num_nodes)
    truncated = edge_index[:, mask]

    # Build adjacency matrix
    adj = np.zeros((num_nodes, num_nodes), dtype=np.float32)
    adj[truncated[0], truncated[1]] = 1.0

    # Add self-loops (consistent with PGN's initial adj_mat = eye(N))
    np.fill_diagonal(adj, 1.0)

    return adj


# ============================================================================
# EGP Processor
# ============================================================================

class EGP_Processor(processors.PGN):
    """Expander Graph Propagation processor using Cayley graphs.

    Interweaves message passing over two graph topologies:
      - Even time steps → original adj_mat  (the algorithm's actual graph)
      - Odd  time steps → truncated Cayley expander adjacency

    The Cayley adjacency is precomputed once per num_nodes and cached.
    It is materialized as a JAX constant during tracing, so there is no
    runtime overhead beyond the jnp.where selection.

    Interweaving is always active when time_step is provided (which is
    the case for all phases: training, validation, and inference via
    CustomNetWithTimeStep).  When time_step is None the processor falls
    back to the plain original adjacency as a safety net.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._cayley_adj_cache: Dict[int, np.ndarray] = {}

    def _get_cayley_adj(self, num_nodes: int) -> np.ndarray:
        """Get (or compute + cache) the Cayley adjacency for num_nodes."""
        if num_nodes not in self._cayley_adj_cache:
            self._cayley_adj_cache[num_nodes] = build_truncated_cayley_adj(
                num_nodes
            )
        return self._cayley_adj_cache[num_nodes]

    def __call__(
        self,
        node_fts: jnp.ndarray,   # (B, N, H)
        edge_fts: jnp.ndarray,   # (B, N, N, H)
        graph_fts: jnp.ndarray,  # (B, H)
        adj_mat: jnp.ndarray,    # (B, N, N) — original graph
        hidden: jnp.ndarray,     # (B, N, H)
        **kwargs,
    ) -> jnp.ndarray:
        """One message-passing step with adjacency interweaving.

        Kwargs consumed:
          time_step (int, traced): current algorithm time step
          nb_nodes  (int, concrete): number of nodes in the graph
        """
        time_step = kwargs.pop('time_step', None)
        nb_nodes = kwargs.pop('nb_nodes', node_fts.shape[1])

        if time_step is not None:
            # --- Precompute Cayley adj (runs once at trace time) ---
            cayley_adj_np = self._get_cayley_adj(nb_nodes)
            cayley_adj = jnp.array(cayley_adj_np)                 # (N, N)
            cayley_adj_batched = jnp.broadcast_to(
                cayley_adj[None, :, :], adj_mat.shape              # (B, N, N)
            )

            # --- Select based on parity (evaluated at runtime via jnp.where) ---
            is_even = (time_step % 2 == 0)
            adj_mat_to_use = jnp.where(
                is_even,          # condition (traced bool)
                adj_mat,          # even → original graph
                cayley_adj_batched  # odd  → Cayley expander
            )
        else:
            # Safety fallback: if time_step is not provided, use original adj.
            # In practice this should not happen because CustomNetWithTimeStep
            # always passes time_step for every phase (train / val / test).
            adj_mat_to_use = adj_mat

        return super().__call__(
            node_fts, edge_fts, graph_fts, adj_mat_to_use, hidden, **kwargs
        )


# ============================================================================
# Factory
# ============================================================================

def get_egp_factory(use_ln: bool, nb_triplet_fts: int, nb_heads=None):
    """Factory function that creates EGP_Processor instances."""
    def _factory(out_size: int):
        return EGP_Processor(
            out_size=out_size,
            msgs_mlp_sizes=[out_size, out_size],
            use_ln=use_ln,
            use_triplets=False,
            nb_triplet_fts=0,
        )
    return _factory

In [11]:
"""Custom Net that passes time_step to processor for parity-based matrix selection"""

from typing import Dict, List, Optional, Tuple
import functools
import jax
import jax.numpy as jnp
from clrs._src import nets
from clrs._src import decoders
from clrs._src import encoders
from clrs._src import probing
from clrs._src import specs
import haiku as hk

_Array = jnp.ndarray
_DataPoint = probing.DataPoint
_Trajectory = List[_DataPoint]
_Location = specs.Location
_Spec = specs.Spec
_Stage = specs.Stage
_Type = specs.Type


class CustomNetWithTimeStep(nets.Net):
  """Extended Net that passes time_step to processor for dynamic matrix selection
  
  This class extends the parent Net to pass the current time step (i) to the processor,
  allowing it to make decisions based on time step parity or other time-dependent logic.
  """

  def _msg_passing_step(self,
                        mp_state: nets._MessagePassingScanState,
                        i: int,  # ← TIME STEP
                        hints: List[_DataPoint],
                        repred: bool,
                        lengths: jnp.ndarray,
                        batch_size: int,
                        nb_nodes: int,
                        inputs: _Trajectory,
                        first_step: bool,
                        spec: _Spec,
                        encs: Dict[str, List[hk.Module]],
                        decs: Dict[str, Tuple[hk.Module]],
                        return_hints: bool,
                        return_all_outputs: bool
                        ):
    """Process one time step, passing the time step to the processor"""
    
    # Handle hint decoding (same as parent)
    if self.decode_hints and not first_step:
      assert self._hint_repred_mode in ['soft', 'hard', 'hard_on_eval']
      hard_postprocess = (self._hint_repred_mode == 'hard' or
                          (self._hint_repred_mode == 'hard_on_eval' and repred))
      decoded_hint = decoders.postprocess(spec,
                                          mp_state.hint_preds,
                                          sinkhorn_temperature=0.1,
                                          sinkhorn_steps=25,
                                          hard=hard_postprocess)
    
    # Handle current hints (same as parent)
    if repred and self.decode_hints and not first_step:
      cur_hint = []
      for hint in decoded_hint:
        cur_hint.append(decoded_hint[hint])
    else:
      cur_hint = []
      needs_noise = (self.decode_hints and not first_step and
                     self._hint_teacher_forcing < 1.0)
      if needs_noise:
        force_mask = jax.random.bernoulli(
            hk.next_rng_key(), self._hint_teacher_forcing,
            (batch_size,))
      else:
        force_mask = None
      for hint in hints:
        hint_data = jnp.asarray(hint.data)[i]
        _, loc, typ = spec[hint.name]
        if needs_noise:
          if (typ == _Type.POINTER and
              decoded_hint[hint.name].type_ == _Type.SOFT_POINTER):
            hint_data = hk.one_hot(hint_data, nb_nodes)
            typ = _Type.SOFT_POINTER
          hint_data = jnp.where(nets._expand_to(force_mask, hint_data),
                                hint_data,
                                decoded_hint[hint.name].data)
        cur_hint.append(
            probing.DataPoint(
                name=hint.name, location=loc, type_=typ, data=hint_data))

    # Call _one_step_pred with time_step (MODIFIED LINE)
    hiddens, output_preds_cand, hint_preds, lstm_state = self._one_step_pred(
        inputs, cur_hint, mp_state.hiddens,
        batch_size, nb_nodes, mp_state.lstm_state,
        spec, encs, decs, repred,
        time_step=i)  # ← PASS TIME STEP

    # Handle output predictions (same as parent)
    if first_step:
      output_preds = output_preds_cand
    else:
      output_preds = {}
      for outp in mp_state.output_preds:
        is_not_done = nets._is_not_done_broadcast(lengths, i,
                                                    output_preds_cand[outp])
        output_preds[outp] = is_not_done * output_preds_cand[outp] + (
            1.0 - is_not_done) * mp_state.output_preds[outp]

    new_mp_state = nets._MessagePassingScanState(
        hint_preds=hint_preds,
        output_preds=output_preds,
        hiddens=hiddens,
        lstm_state=lstm_state)
    
    return new_mp_state, new_mp_state

  def _one_step_pred(
      self,
      inputs: _Trajectory,
      hints: _Trajectory,
      hidden: _Array,
      batch_size: int,
      nb_nodes: int,
      lstm_state: Optional[hk.LSTMState],
      spec: _Spec,
      encs: Dict[str, List[hk.Module]],
      decs: Dict[str, Tuple[hk.Module]],
      repred: bool,
      time_step: Optional[int] = None,  # ← ADD PARAMETER
  ):
    """Generates one-step predictions with optional time_step awareness"""

    # Initialize features (same as parent)
    node_fts = jnp.zeros((batch_size, nb_nodes, self.hidden_dim))
    edge_fts = jnp.zeros((batch_size, nb_nodes, nb_nodes, self.hidden_dim))
    graph_fts = jnp.zeros((batch_size, self.hidden_dim))
    adj_mat = jnp.repeat(
        jnp.expand_dims(jnp.eye(nb_nodes), 0), batch_size, axis=0)

    # ENCODE (same as parent)
    trajectories = [inputs]
    if self.encode_hints:
      trajectories.append(hints)

    for trajectory in trajectories:
      for dp in trajectory:
        try:
          dp = encoders.preprocess(dp, nb_nodes)
          assert dp.type_ != _Type.SOFT_POINTER
          adj_mat = encoders.accum_adj_mat(dp, adj_mat)
          encoder = encs[dp.name]
          edge_fts = encoders.accum_edge_fts(encoder, dp, edge_fts)
          node_fts = encoders.accum_node_fts(encoder, dp, node_fts)
          graph_fts = encoders.accum_graph_fts(encoder, dp, graph_fts)
        except Exception as e:
          raise Exception(f'Failed to process {dp}') from e

    # PROCESS (MODIFIED - pass time_step to processor)
    nxt_hidden = hidden
    for _ in range(self.nb_msg_passing_steps):
      nxt_hidden, nxt_edge = self.processor(
          node_fts,
          edge_fts,
          graph_fts,
          adj_mat,
          nxt_hidden,
          batch_size=batch_size,
          nb_nodes=nb_nodes,
          time_step=time_step,  # ← PASS TIME STEP HERE
      )

    # Rest is same as parent
    if not repred:
      nxt_hidden = hk.dropout(hk.next_rng_key(), self._dropout_prob, nxt_hidden)

    if self.use_lstm:
      nxt_hidden, nxt_lstm_state = jax.vmap(self.lstm)(nxt_hidden, lstm_state)
    else:
      nxt_lstm_state = None

    h_t = jnp.concatenate([node_fts, hidden, nxt_hidden], axis=-1)
    if nxt_edge is not None:
      e_t = jnp.concatenate([edge_fts, nxt_edge], axis=-1)
    else:
      e_t = edge_fts

    # DECODE
    hint_preds, output_preds = decoders.decode_fts(
        decoders=decs,
        spec=spec,
        h_t=h_t,
        adj_mat=adj_mat,
        edge_fts=e_t,
        graph_fts=graph_fts,
        inf_bias=self.processor.inf_bias,
        inf_bias_edge=self.processor.inf_bias_edge,
        repred=repred,
    )

    return nxt_hidden, output_preds, hint_preds, nxt_lstm_state

In [12]:
import clrs
import haiku as hk
import functools
import numpy as np
import jax
import logging

def egp_clrs(seed):
  wandb.init(project="clrs-project", name=f"egp-training-{FLAGS.algorithms}")
  
  if FLAGS.hint_mode == 'encoded_decoded':
    encode_hints = True
    decode_hints = True
  elif FLAGS.hint_mode == 'decoded_only':
    encode_hints = False
    decode_hints = True
  elif FLAGS.hint_mode == 'none':
    encode_hints = False
    decode_hints = False
  else:
    raise ValueError('Hint mode not in {encoded_decoded, decoded_only, none}.')

  train_lengths = [int(x) for x in FLAGS.train_lengths]
  rng = np.random.RandomState(seed)
  rng_key = jax.random.PRNGKey(rng.randint(2**32))

  print("------> Creating samplers")
  (
      train_samplers,
      val_samplers,
      val_sample_counts,
      test_samplers,
      test_sample_counts,
      spec_list,
  ) = create_samplers(
      rng=rng,
      train_lengths=train_lengths,
      algorithms=FLAGS.algorithms,
      val_lengths=[np.amax(train_lengths)],
      test_lengths=[-1],
      train_batch_size=FLAGS.batch_size,
  )

  # ============================================================================
  # EGP processor factory — interweaving is always active.
  # CustomNetWithTimeStep passes time_step to the processor in every phase
  # (training, validation, inference), so the Cayley adjacency interweaving
  # is consistently used throughout.
  # ============================================================================
  processor_factory = get_egp_factory(
      use_ln=FLAGS.use_ln,
      nb_triplet_fts=FLAGS.nb_triplet_fts,
      nb_heads=FLAGS.nb_heads,
  )

  model_params = dict(
      processor_factory=processor_factory,
      hidden_dim=FLAGS.hidden_size,
      encode_hints=encode_hints,
      decode_hints=decode_hints,
      encoder_init=FLAGS.encoder_init,
      use_lstm=FLAGS.use_lstm,
      learning_rate=FLAGS.learning_rate,
      grad_clip_max_norm=FLAGS.grad_clip_max_norm,
      checkpoint_path=FLAGS.checkpoint_path,
      freeze_processor=FLAGS.freeze_processor,
      dropout_prob=FLAGS.dropout_prob,
      hint_teacher_forcing=FLAGS.hint_teacher_forcing,
      hint_repred_mode=FLAGS.hint_repred_mode,
      nb_msg_passing_steps=FLAGS.nb_msg_passing_steps,
  )

  print("------> Creating model")
  # Custom BaselineModel that uses CustomNetWithTimeStep so the processor
  # always receives time_step for Cayley adjacency interweaving.
  class CustomBaselineModel(clrs.models.BaselineModel):
    def _create_net_fns(self, hidden_dim, encode_hints, processor_factory,
                        use_lstm, encoder_init, dropout_prob,
                        hint_teacher_forcing, hint_repred_mode):
      """Override to use CustomNetWithTimeStep instead of Net."""
      def _use_net(*args, **kwargs):
        return CustomNetWithTimeStep(
            self._spec, hidden_dim, encode_hints, self.decode_hints,
            processor_factory, use_lstm, encoder_init,
            dropout_prob, hint_teacher_forcing,
            hint_repred_mode,
            self.nb_dims, self.nb_msg_passing_steps,
            self.debug)(*args, **kwargs)

      self.net_fn = hk.transform(_use_net)

      pmap_args = dict(axis_name='batch', devices=jax.local_devices())
      n_devices = jax.local_device_count()
      func, static_arg, extra_args = (
          (jax.jit, 'static_argnums', {}) if n_devices == 1 else
          (jax.pmap, 'static_broadcasted_argnums', pmap_args))
      pmean = functools.partial(jax.lax.pmean, axis_name='batch')
      self._maybe_pmean = pmean if n_devices > 1 else lambda x: x
      extra_args[static_arg] = 3
      self.jitted_grad = func(self._compute_grad, **extra_args)
      extra_args[static_arg] = 4
      self.jitted_feedback = func(self._feedback, donate_argnums=[0, 3],
                                  **extra_args)
      extra_args[static_arg] = [3, 4, 5]
      self.jitted_predict = func(self._predict, **extra_args)
      extra_args[static_arg] = [3, 4]

  # Single model for training + evaluation (like vanilla CLRS).
  # Interweaving is always active via CustomNetWithTimeStep → EGP_Processor.
  eval_model = CustomBaselineModel(
      spec=spec_list,
      dummy_trajectory=[next(t) for t in val_samplers],
      **model_params
  )
  if FLAGS.chunked_training:
    train_model = CustomBaselineModel(
        spec=spec_list,
        dummy_trajectory=[next(t) for t in train_samplers],
        **model_params
    )
  else:
    train_model = eval_model

  # ========================================================================
  # Training loop
  # ========================================================================
  print("------> Training loop initialization")
  best_score = -1.0
  current_train_items = [0] * len(FLAGS.algorithms)
  step = 0
  next_eval = 0
  val_scores = [-99999.9] * len(FLAGS.algorithms)
  length_idx = 0

  while step < FLAGS.train_steps:
    feedback_list = [next(t) for t in train_samplers]

    # Initialize model on the first step.
    if step == 0:
      all_features = [f.features for f in feedback_list]
      if FLAGS.chunked_training:
        all_length_features = [all_features] + [
            [next(t).features for t in train_samplers]
            for _ in range(len(train_lengths))]
        train_model.init(all_length_features[:-1], FLAGS.seed + 1)
      else:
        train_model.init(all_features, FLAGS.seed + 1)

    # Training step.
    for algo_idx in range(len(train_samplers)):
      feedback = feedback_list[algo_idx]
      rng_key, new_rng_key = jax.random.split(rng_key)
      if FLAGS.chunked_training:
        length_and_algo_idx = (length_idx, algo_idx)
      else:
        length_and_algo_idx = algo_idx

      cur_loss = train_model.feedback(rng_key, feedback, length_and_algo_idx)
      rng_key = new_rng_key

      if FLAGS.chunked_training:
        examples_in_chunk = np.sum(feedback.features.is_last).item()
      else:
        examples_in_chunk = len(feedback.features.lengths)
      current_train_items[algo_idx] += examples_in_chunk
      logging.info('Algo %s step %i current loss %f, current_train_items %i.',
                   FLAGS.algorithms[algo_idx], step,
                   cur_loss, current_train_items[algo_idx])
      wandb.log({"loss": float(cur_loss), "step": step})

    # Periodically evaluate model.
    if step >= next_eval:
      eval_model.params = train_model.params
      for algo_idx in range(len(train_samplers)):
        common_extras = {'examples_seen': current_train_items[algo_idx],
                         'step': step,
                         'algorithm': FLAGS.algorithms[algo_idx]}

        new_rng_key, rng_key = jax.random.split(rng_key)
        val_stats = collect_and_eval(
            val_samplers[algo_idx],
            functools.partial(eval_model.predict, algorithm_index=algo_idx),
            val_sample_counts[algo_idx],
            new_rng_key,
            extras=common_extras)
        logging.info('(val) algo %s step %d: %s',
                     FLAGS.algorithms[algo_idx], step, val_stats)
        val_scores[algo_idx] = val_stats['score']

      next_eval += FLAGS.eval_every

      msg = (f'best avg val score was '
             f'{best_score/len(FLAGS.algorithms):.3f}, '
             f'current avg val score is {np.mean(val_scores):.3f}, '
             f'val scores are: ')
      msg += ', '.join(
          ['%s: %.3f' % (x, y) for (x, y) in zip(FLAGS.algorithms, val_scores)])
      if (sum(val_scores) > best_score) or step == 0:
        best_score = sum(val_scores)
        logging.info('Checkpointing best model, %s', msg)
        train_model.save_model('best.pkl')
      else:
        logging.info('Not saving new best model, %s', msg)

      wandb.log({"score": float(np.mean(val_scores)), "step": step})

    step += 1
    length_idx = (length_idx + 1) % len(train_lengths)

  # ========================================================================
  # Testing
  # ========================================================================
  logging.info('Restoring best model from checkpoint...')
  eval_model.restore_model('best.pkl', only_load_processor=False)

  for algo_idx in range(len(train_samplers)):
    common_extras = {'examples_seen': current_train_items[algo_idx],
                     'step': step,
                     'algorithm': FLAGS.algorithms[algo_idx]}

    new_rng_key, rng_key = jax.random.split(rng_key)
    test_stats = collect_and_eval(
        test_samplers[algo_idx],
        functools.partial(eval_model.predict, algorithm_index=algo_idx),
        test_sample_counts[algo_idx],
        new_rng_key,
        extras=common_extras)
    logging.info('(test) algo %s : %s', FLAGS.algorithms[algo_idx], test_stats)

  logging.info('Done!')

In [13]:
egp_clrs(42)

[34m[1mwandb[0m: [32m[41mERROR[0m The nbformat package was not found. It is required to save notebook history.


0,1
loss,█▇▇▆▅▅▄▄▃▅▄▃▄▆▄▄▄▄▃▄▃▅▄▅▄▄▁▇▄▄▄▄▄▂▄█▄▁▄▄
score,▆▆▇▇▅▆▆▆▆▅▇▇▅▅▅▆▁█▆▅
step,▁▁▁▂▂▂▂▂▂▂▃▃▃▃▃▃▃▃▃▃▄▄▄▅▅▅▅▅▅▅▆▇▇▇▇█████

0,1
loss,11.03319
score,0.08228
step,199.0




------> Creating samplers




------> Creating model
------> Training loop initialization


In [14]:
# ============================================================================
# CGP (Cayley Graph Propagation) Processor
# ============================================================================
#
# Key difference from EGP:
#   EGP truncates the Cayley graph to match the input graph's node count,
#   which can damage the desirable spectral expansion properties.
#
#   CGP uses the COMPLETE Cayley graph by introducing virtual nodes,
#   preserving the full expander structure. Virtual nodes are padded
#   with zero features and removed after message passing.
#
# Interweaving strategy (same parity rule as EGP):
#   Even time steps → original adj_mat (padded, virtual nodes have self-loops)
#   Odd  time steps → complete Cayley adjacency (all N_cayley nodes connected)
# ============================================================================

class CGP_Processor(processors.PGN):
    """Cayley Graph Propagation processor with virtual nodes.

    Unlike EGP, which truncates the Cayley graph to match the number of
    input nodes (potentially breaking expansion properties), CGP pads
    the input graph to the Cayley graph size with virtual nodes and
    uses the *complete* Cayley structure.

    For each message-passing call:
      1. Pad node_fts, edge_fts, adj_mat, hidden → N_cayley with virtual nodes
      2. Select adjacency:
         - even time_step → original adj (padded, virtual nodes self-loop only)
         - odd  time_step → complete Cayley graph adjacency
      3. Run PGN message passing on the expanded graph
      4. Truncate output back to N (discard virtual node representations)

    Virtual node features are zero-initialized each step, acting as
    communication conduits through the Cayley expander topology.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._cayley_cache: Dict[int, tuple] = {}

    def _build_cayley(self, num_nodes: int):
        """Build and cache the complete Cayley graph adjacency + metadata.

        Returns:
            (cayley_adj_np, cayley_n_nodes, n_virtual, virtual_self_loop_np)
        """
        if num_nodes not in self._cayley_cache:
            n = get_cayley_n(num_nodes)
            cayley_n_nodes = cayley_graph_size(n)
            edge_index = get_cayley_graph(n)

            # Complete Cayley adjacency with self-loops
            cayley_adj = np.zeros((cayley_n_nodes, cayley_n_nodes), dtype=np.float32)
            cayley_adj[edge_index[0], edge_index[1]] = 1.0
            np.fill_diagonal(cayley_adj, 1.0)

            # Self-loop matrix for virtual nodes only (used with padded original adj)
            n_virtual = cayley_n_nodes - num_nodes
            virtual_self_loop = np.zeros((cayley_n_nodes, cayley_n_nodes), dtype=np.float32)
            if n_virtual > 0:
                np.fill_diagonal(virtual_self_loop[num_nodes:, num_nodes:], 1.0)

            self._cayley_cache[num_nodes] = (
                cayley_adj, cayley_n_nodes, n_virtual, virtual_self_loop
            )
        return self._cayley_cache[num_nodes]

    def __call__(
        self,
        node_fts: jnp.ndarray,   # (B, N, H)
        edge_fts: jnp.ndarray,   # (B, N, N, H)
        graph_fts: jnp.ndarray,  # (B, H)
        adj_mat: jnp.ndarray,    # (B, N, N)
        hidden: jnp.ndarray,     # (B, N, H)
        **kwargs,
    ) -> jnp.ndarray:
        """Message-passing step with virtual node expansion and Cayley interweaving."""
        time_step = kwargs.pop('time_step', None)
        nb_nodes = kwargs.pop('nb_nodes', node_fts.shape[1])

        cayley_adj_np, cayley_n_nodes, n_virtual, virtual_self_loop_np = \
            self._build_cayley(nb_nodes)

        # ---- Pad spatial dimensions from N → N_cayley ----
        if n_virtual > 0:
            # Pad node features:  (B, N, H) → (B, N_cayley, H)
            node_fts_padded = jnp.pad(
                node_fts, ((0, 0), (0, n_virtual), (0, 0)))
            # Pad hidden state:   (B, N, H) → (B, N_cayley, H)
            hidden_padded = jnp.pad(
                hidden, ((0, 0), (0, n_virtual), (0, 0)))
            # Pad edge features:  (B, N, N, H) → (B, N_cayley, N_cayley, H)
            edge_fts_padded = jnp.pad(
                edge_fts, ((0, 0), (0, n_virtual), (0, n_virtual), (0, 0)))
            # Pad adj_mat + add self-loops for virtual nodes
            adj_padded = jnp.pad(
                adj_mat, ((0, 0), (0, n_virtual), (0, n_virtual)))
            adj_padded = adj_padded + jnp.array(virtual_self_loop_np)[None]
        else:
            node_fts_padded = node_fts
            hidden_padded = hidden
            edge_fts_padded = edge_fts
            adj_padded = adj_mat

        # ---- Select adjacency based on time-step parity ----
        if time_step is not None:
            cayley_adj = jnp.array(cayley_adj_np)
            cayley_adj_batched = jnp.broadcast_to(
                cayley_adj[None, :, :], adj_padded.shape
            )
            is_even = (time_step % 2 == 0)
            adj_to_use = jnp.where(is_even, adj_padded, cayley_adj_batched)
        else:
            adj_to_use = adj_padded

        # ---- Run PGN on expanded graph ----
        result, tri_msgs = super().__call__(
            node_fts_padded, edge_fts_padded, graph_fts,
            adj_to_use, hidden_padded, **kwargs
        )

        # ---- Truncate back to original N nodes ----
        if n_virtual > 0:
            result = result[:, :nb_nodes, :]
            if tri_msgs is not None:
                tri_msgs = tri_msgs[:, :nb_nodes, :nb_nodes, :]

        return result, tri_msgs


# ============================================================================
# CGP Factory
# ============================================================================

def get_cgp_factory(use_ln: bool, nb_triplet_fts: int, nb_heads=None):
    """Factory function that creates CGP_Processor instances."""
    def _factory(out_size: int):
        return CGP_Processor(
            out_size=out_size,
            msgs_mlp_sizes=[out_size, out_size],
            use_ln=use_ln,
            use_triplets=False,
            nb_triplet_fts=0,
        )
    return _factory

In [15]:
import clrs
import haiku as hk
import functools
import numpy as np
import jax
import logging

def cgp_clrs(seed):
  """Train a CLRS model using Cayley Graph Propagation (CGP).

  CGP differs from EGP by using the complete Cayley graph with virtual
  nodes rather than truncating the Cayley graph. This preserves the
  full spectral expansion properties of the Cayley structure.

  Uses CustomNetWithTimeStep (defined in Cell 9) so the processor
  always receives time_step for adjacency interweaving.
  """
  wandb.init(project="clrs-project", name=f"cgp-training-{FLAGS.algorithms}")

  if FLAGS.hint_mode == 'encoded_decoded':
    encode_hints = True
    decode_hints = True
  elif FLAGS.hint_mode == 'decoded_only':
    encode_hints = False
    decode_hints = True
  elif FLAGS.hint_mode == 'none':
    encode_hints = False
    decode_hints = False
  else:
    raise ValueError('Hint mode not in {encoded_decoded, decoded_only, none}.')

  train_lengths = [int(x) for x in FLAGS.train_lengths]
  rng = np.random.RandomState(seed)
  rng_key = jax.random.PRNGKey(rng.randint(2**32))

  print("------> [CGP] Creating samplers")
  (
      train_samplers,
      val_samplers,
      val_sample_counts,
      test_samplers,
      test_sample_counts,
      spec_list,
  ) = create_samplers(
      rng=rng,
      train_lengths=train_lengths,
      algorithms=FLAGS.algorithms,
      val_lengths=[np.amax(train_lengths)],
      test_lengths=[-1],
      train_batch_size=FLAGS.batch_size,
  )

  # ============================================================================
  # CGP processor factory — uses complete Cayley graph with virtual nodes.
  # CustomNetWithTimeStep passes time_step so the processor can interweave.
  # ============================================================================
  processor_factory = get_cgp_factory(
      use_ln=FLAGS.use_ln,
      nb_triplet_fts=FLAGS.nb_triplet_fts,
      nb_heads=FLAGS.nb_heads,
  )

  model_params = dict(
      processor_factory=processor_factory,
      hidden_dim=FLAGS.hidden_size,
      encode_hints=encode_hints,
      decode_hints=decode_hints,
      encoder_init=FLAGS.encoder_init,
      use_lstm=FLAGS.use_lstm,
      learning_rate=FLAGS.learning_rate,
      grad_clip_max_norm=FLAGS.grad_clip_max_norm,
      checkpoint_path=FLAGS.checkpoint_path,
      freeze_processor=FLAGS.freeze_processor,
      dropout_prob=FLAGS.dropout_prob,
      hint_teacher_forcing=FLAGS.hint_teacher_forcing,
      hint_repred_mode=FLAGS.hint_repred_mode,
      nb_msg_passing_steps=FLAGS.nb_msg_passing_steps,
  )

  print("------> [CGP] Creating model")

  # Custom BaselineModel that uses CustomNetWithTimeStep so the processor
  # always receives time_step for Cayley adjacency interweaving.
  class CGPBaselineModel(clrs.models.BaselineModel):
    def _create_net_fns(self, hidden_dim, encode_hints, processor_factory,
                        use_lstm, encoder_init, dropout_prob,
                        hint_teacher_forcing, hint_repred_mode):
      """Override to use CustomNetWithTimeStep instead of Net."""
      def _use_net(*args, **kwargs):
        return CustomNetWithTimeStep(
            self._spec, hidden_dim, encode_hints, self.decode_hints,
            processor_factory, use_lstm, encoder_init,
            dropout_prob, hint_teacher_forcing,
            hint_repred_mode,
            self.nb_dims, self.nb_msg_passing_steps,
            self.debug)(*args, **kwargs)

      self.net_fn = hk.transform(_use_net)

      pmap_args = dict(axis_name='batch', devices=jax.local_devices())
      n_devices = jax.local_device_count()
      func, static_arg, extra_args = (
          (jax.jit, 'static_argnums', {}) if n_devices == 1 else
          (jax.pmap, 'static_broadcasted_argnums', pmap_args))
      pmean = functools.partial(jax.lax.pmean, axis_name='batch')
      self._maybe_pmean = pmean if n_devices > 1 else lambda x: x
      extra_args[static_arg] = 3
      self.jitted_grad = func(self._compute_grad, **extra_args)
      extra_args[static_arg] = 4
      self.jitted_feedback = func(self._feedback, donate_argnums=[0, 3],
                                  **extra_args)
      extra_args[static_arg] = [3, 4, 5]
      self.jitted_predict = func(self._predict, **extra_args)
      extra_args[static_arg] = [3, 4]

  eval_model = CGPBaselineModel(
      spec=spec_list,
      dummy_trajectory=[next(t) for t in val_samplers],
      **model_params
  )
  if FLAGS.chunked_training:
    train_model = CGPBaselineModel(
        spec=spec_list,
        dummy_trajectory=[next(t) for t in train_samplers],
        **model_params
    )
  else:
    train_model = eval_model

  # ========================================================================
  # Training loop
  # ========================================================================
  print("------> [CGP] Training loop initialization")
  best_score = -1.0
  current_train_items = [0] * len(FLAGS.algorithms)
  step = 0
  next_eval = 0
  val_scores = [-99999.9] * len(FLAGS.algorithms)
  length_idx = 0

  while step < FLAGS.train_steps:
    feedback_list = [next(t) for t in train_samplers]

    # Initialize model on the first step.
    if step == 0:
      all_features = [f.features for f in feedback_list]
      if FLAGS.chunked_training:
        all_length_features = [all_features] + [
            [next(t).features for t in train_samplers]
            for _ in range(len(train_lengths))]
        train_model.init(all_length_features[:-1], FLAGS.seed + 1)
      else:
        train_model.init(all_features, FLAGS.seed + 1)

    # Training step.
    for algo_idx in range(len(train_samplers)):
      feedback = feedback_list[algo_idx]
      rng_key, new_rng_key = jax.random.split(rng_key)
      if FLAGS.chunked_training:
        length_and_algo_idx = (length_idx, algo_idx)
      else:
        length_and_algo_idx = algo_idx

      cur_loss = train_model.feedback(rng_key, feedback, length_and_algo_idx)
      rng_key = new_rng_key

      if FLAGS.chunked_training:
        examples_in_chunk = np.sum(feedback.features.is_last).item()
      else:
        examples_in_chunk = len(feedback.features.lengths)
      current_train_items[algo_idx] += examples_in_chunk
      logging.info('[CGP] Algo %s step %i current loss %f, current_train_items %i.',
                   FLAGS.algorithms[algo_idx], step,
                   cur_loss, current_train_items[algo_idx])
      wandb.log({"loss": float(cur_loss), "step": step})

    # Periodically evaluate model.
    if step >= next_eval:
      eval_model.params = train_model.params
      for algo_idx in range(len(train_samplers)):
        common_extras = {'examples_seen': current_train_items[algo_idx],
                         'step': step,
                         'algorithm': FLAGS.algorithms[algo_idx]}

        new_rng_key, rng_key = jax.random.split(rng_key)
        val_stats = collect_and_eval(
            val_samplers[algo_idx],
            functools.partial(eval_model.predict, algorithm_index=algo_idx),
            val_sample_counts[algo_idx],
            new_rng_key,
            extras=common_extras)
        logging.info('[CGP] (val) algo %s step %d: %s',
                     FLAGS.algorithms[algo_idx], step, val_stats)
        val_scores[algo_idx] = val_stats['score']

      next_eval += FLAGS.eval_every

      msg = (f'best avg val score was '
             f'{best_score/len(FLAGS.algorithms):.3f}, '
             f'current avg val score is {np.mean(val_scores):.3f}, '
             f'val scores are: ')
      msg += ', '.join(
          ['%s: %.3f' % (x, y) for (x, y) in zip(FLAGS.algorithms, val_scores)])
      if (sum(val_scores) > best_score) or step == 0:
        best_score = sum(val_scores)
        logging.info('[CGP] Checkpointing best model, %s', msg)
        train_model.save_model('best.pkl')
      else:
        logging.info('[CGP] Not saving new best model, %s', msg)

      wandb.log({"score": float(np.mean(val_scores)), "step": step})

    step += 1
    length_idx = (length_idx + 1) % len(train_lengths)

  # ========================================================================
  # Testing
  # ========================================================================
  logging.info('[CGP] Restoring best model from checkpoint...')
  eval_model.restore_model('best.pkl', only_load_processor=False)

  for algo_idx in range(len(train_samplers)):
    common_extras = {'examples_seen': current_train_items[algo_idx],
                     'step': step,
                     'algorithm': FLAGS.algorithms[algo_idx]}

    new_rng_key, rng_key = jax.random.split(rng_key)
    test_stats = collect_and_eval(
        test_samplers[algo_idx],
        functools.partial(eval_model.predict, algorithm_index=algo_idx),
        test_sample_counts[algo_idx],
        new_rng_key,
        extras=common_extras)
    logging.info('[CGP] (test) algo %s : %s', FLAGS.algorithms[algo_idx], test_stats)

  logging.info('[CGP] Done!')

In [None]:
cgp_clrs(42)

0,1
loss,█▇▆▆▆▄▄▄▁▄▅▄▆▄▃▄▄▄▄▅▃▄▁▃▆▄▄▆▂▄▆▄▃▂▅▃▄▆▄▃
score,▁▁▂▃▂▁▂▁▂▂▁▂▁▄▄▆▃█▇▆
step,▁▁▁▁▁▁▁▁▂▂▂▂▂▂▂▃▃▃▃▃▄▄▄▄▅▅▅▅▅▆▆▆▆▇▇▇▇▇██

0,1
loss,16.31502
score,0.13013
step,199.0




------> [CGP] Creating samplers




------> [CGP] Creating model
------> [CGP] Training loop initialization
