## Basic utilities to go from RVine matrix to edge list and back

In [6]:
import copy
import time

import numpy as np

import pyvinecopulib as pv


def matrix_to_trees(M: np.ndarray):
  """
  Converts a vine array (lower triangular R-vine matrix) into a list of tree graphs.

  Parameters
  ----------
  M : np.ndarray
      A (d, d) lower-triangular R-vine matrix encoding the vine structure.

  Returns
  -------
  trees : list[dict]
      A list of trees, where each tree is a dictionary with:
          - 'nodes': node ids and their associated information
          - 'edges': list of (node1, node2, (a, b, C)) triples
            where (a, b) are conditioned variables and C is the conditioning set.
  """
  d = M.shape[0]
  trees = []

  for t in range(d - 1):
    edges = []

    # In tree T0, nodes are simply the variable indices
    if t == 0:
      prev_edges = {idx: idx for idx in range(d)}
    else:
      # In higher trees Tt, nodes correspond to edges of T(t-1)
      prev_edges = {idx: x[2] for idx, x in enumerate(trees[t - 1]["edges"])}

      # Build a lookup table:
      # (variable, conditioning set) -> node index
      lookup = {}
      for node_id, (a, b, cond_C) in prev_edges.items():
        lookup[(a, frozenset(cond_C.union({b})))] = node_id
        lookup[(b, frozenset(cond_C.union({a})))] = node_id

    for e in range(d - 1 - t):
      # Extract pair (a, b) and conditioning set C for each edge
      a = int(M[d - 1 - e, e])
      b = int(M[t, e])
      C = frozenset(map(int, M[:t, e]))

      edge_info = (a, b, C)

      if t == 0:
        # At tree 0, node ids are variables themselves
        node1, node2 = a, b
      else:
        # In higher trees, find corresponding node ids
        node1 = lookup.get((a, C), None)
        node2 = lookup.get((b, C), None)

      edges.append((node1, node2, edge_info))

    # Store the constructed tree
    trees.append({"nodes": prev_edges, "edges": edges})

  return trees


def trees_to_matrix(trees):
  """
  Converts a list of trees back into a vine array (lower triangular R-vine matrix).

  Parameters
  ----------
  trees : list[dict]
      A list of trees, as produced by `matrix_to_trees`, where each tree is a
      dictionary with 'nodes' and 'edges'.

  Returns
  -------
  M : np.ndarray
      A (d, d) lower-triangular R-vine matrix encoding the vine structure.
  """
  if len(trees) < 1:
    raise ValueError("The input trees list must contain at least one tree.")
  d = len(trees[0]["nodes"])  # Infer d from first tree nodes
  trunc_lvl = len(trees)
  M = np.zeros((d, d), dtype=np.int64)
  order = [0] * d

  # Work on a deep copy to modify edges without changing original trees
  trees = copy.deepcopy(trees)

  for col in range(d - 1):
    # At each column, we reconstruct one diagonal element and its upper entries
    t = max(min(trunc_lvl, d - 1 - col), 1)  # matrix above trunc_lvl is empty
    # t = d - 1 - col  # Corresponding tree level (from top down)

    tree_edges = trees[t - 1]["edges"]
    degrees = {}

    # Compute degree (number of connections) for each node
    for n1, n2, _ in tree_edges:
      degrees[n1] = degrees.get(n1, 0) + 1
      degrees[n2] = degrees.get(n2, 0) + 1

    # Find an edge attached to a leaf node (degree = 1)
    for idx, (n1, n2, (a, b, C)) in enumerate(tree_edges):
      if degrees[n1] == 1 or degrees[n2] == 1:
        # Choose the leaf node and the "other" variable
        if degrees[n1] == 1:
          leaf_var, other_var = a, b
        else:
          leaf_var, other_var = b, a

        # Set the diagonal element (leaf variable)
        order[col] = leaf_var

        # Set the first off-diagonal element
        M[t - 1, col] = other_var

        # Save the conditioning set associated with the edge
        ning_set = set(C)

        # Remove the used edge to prevent reuse
        del tree_edges[idx]
        break
    else:
      raise RuntimeError("No leaf edge found in tree.")

    # Fill the rest of the column by traversing to lower trees
    for k in range(1, t):
      check_set = set(ning_set)
      check_set.add(order[col])  # Update with current diagonal variable

      tree_edges = trees[t - 1 - k]["edges"]

      # Find an edge in the lower tree matching the expanded conditioning set
      for idx, (n1, n2, (a, b, C)) in enumerate(tree_edges):
        all_indices = {a, b}.union(C)

        if all_indices == check_set:
          # Found the matching edge
          next_var = b if a == order[col] else a

          # Fill the corresponding matrix entry
          M[t - 1 - k, col] = next_var

          # Update conditioning set
          ning_set = set(C)

          # Remove the used edge
          del tree_edges[idx]
          break
      else:
        raise RuntimeError("No matching edge found in lower tree.")

  # The last diagonal element: copy from the second-to-last column
  M[0, d - 1] = M[0, d - 2]

  # Fill the remaining diagonal elements
  for col in range(d - 1):
    M[d - 1 - col, col] = order[col]

  return M


### Example usage

In [7]:
d = 6
t = time.time()
rvs = pv.RVineStructure.simulate(d)
print("Simulation time:", time.time() - t)
t = time.time()
mat = rvs.matrix
print("Matrix time:", time.time() - t)
t = time.time()
trees = matrix_to_trees(mat)
print("Trees extraction time:", time.time() - t)
t = time.time()
reconstructed_mat = trees_to_matrix(trees)
print("Matrix reconstruction time:", time.time() - t)
assert np.array_equal(mat, reconstructed_mat)

Simulation time: 0.0001251697540283203
Matrix time: 4.506111145019531e-05
Trees extraction time: 8.225440979003906e-05
Matrix reconstruction time: 0.00020623207092285156


### Works for any truncation level

In [None]:
trunc_level = 2
reconstructed_mat = trees_to_matrix(trees[:trunc_level])
print(mat)
print(reconstructed_mat)

## Merging two RVine matrices

In [22]:
d = 4
rvs1 = pv.RVineStructure.simulate(d)
rvs2 = pv.RVineStructure.simulate(d)
mat1 = rvs1.matrix
mat2 = rvs2.matrix + d * np.fliplr(np.triu(np.ones((d, d), dtype=np.uint64)))
print(mat1)
print(mat2)

[[2 2 2 2]
 [4 4 4 0]
 [1 1 0 0]
 [3 0 0 0]]
[[5 7 7 7]
 [7 5 5 0]
 [8 8 0 0]
 [6 0 0 0]]


In [56]:
# Split edges into sublists according to their tree level
def split_edges_by_tree_level(
  edges: list[tuple[int, int, tuple[int, ...]]],
) -> list[list[tuple[int, int, tuple[int, ...]]]]:
  """
  Splits edges into sublists according to their tree level.
  """
  levels = {}
  for a, b, C in edges:
    t = len(C)
    if t not in levels:
      levels[t] = []
    levels[t].append((a, b, C))
  return [levels[t] for t in sorted(levels.keys(), reverse=True)]


def edges_to_matrix_partial(
  triplets: list[tuple[int, int, tuple[int, ...]]],
) -> np.ndarray:
  """
  Safely reconstruct a (possibly partial) vine matrix from unordered or merged triplets.
  Assigns each triplet to the first available column compatible with its tree level.

                                   Parameters:
  -----------
  triplets : list of (a, b, C), where C is the conditioning set

  Returns:
  --------
  M : np.ndarray of shape (d, d)
  """
  d = max(max(a, b, *C) for a, b, C in edges)

  M = np.zeros((d, d), dtype=np.uint64)
  order = [0] * d  # anti-diagonal

  # Step 1: organize edges by tree level
  trees = split_edges_by_tree_level(edges)

  # Copy trees since we will mutate them
  trees = [list(level) for level in trees]
  trunc_lvl = len(trees)

  ning_set = []
  used_edges = [set() for _ in range(trunc_lvl)]

  for col in range(d - 1):
    t = max(min(trunc_lvl, d - 1 - col), 1) - 1
    # if t == 0:
    #   raise ValueError("Cannot build R-vine with insufficient edges.")

    # import pdb

    # pdb.set_trace()

    # Step 1: pick a leaf edge from tree t
    for idx, (a, b, C) in enumerate(trees[t]):
      if idx in used_edges[t]:
        continue

      # determine degrees in this tree
      deg = {}
      for aa, bb, _ in trees[t]:
        deg[aa] = deg.get(aa, 0) + 1
        deg[bb] = deg.get(bb, 0) + 1

      # pick an edge with a leaf node
      if deg[a] == 1 or deg[b] == 1:
        leaf, other = (a, b) if deg[a] == 1 else (b, a)
        order[col] = leaf
        M[t - 1, col] = other
        ning_set = list(C)
        used_edges[t].add(idx)
        break
    else:
      raise ValueError(f"No leaf found in tree {t} at col {col}")

    # Step 2: fill rest of the column from tree t-1 down to 1
    for k in range(1, t):
      target_set = set([order[col]] + ning_set)

      for idx, (a, b, C) in enumerate(trees[t - k]):
        if idx in used_edges[t - k]:
          continue
        all_vars = {a, b, *C}
        if all_vars != target_set:
          continue

        conditioned = (a, b)
        if order[col] == b:
          conditioned = (b, a)

        M[t - k - 1, col] = conditioned[1]
        ning_set = list(C)
        used_edges[t - k].add(idx)
        break

  # Last anti-diagonal entry
  order[d - 1] = M[0, d - 2]

  # Write anti-diagonal to matrix
  for j in range(d):
    M[d - 1 - j, j] = order[j]

  return M, order


edges1 = matrix_to_edges(mat1)
edges2 = matrix_to_edges(mat2)
edges = edges1 + edges2
mat = edges_to_matrix_partial(edges)
print(mat)

(array([[0, 0, 0, 0, 0, 4, 0, 0],
       [2, 2, 2, 5, 7, 0, 3, 0],
       [0, 0, 0, 0, 0, 3, 0, 0],
       [0, 0, 0, 0, 8, 0, 0, 0],
       [0, 0, 0, 6, 0, 0, 0, 0],
       [0, 0, 4, 0, 0, 0, 0, 0],
       [0, 1, 0, 0, 0, 0, 0, 0],
       [3, 0, 0, 0, 0, 0, 1, 0]], dtype=uint64), [3, 1, 4, 6, 8, 3, 3, np.uint64(0)])
