-
Notifications
You must be signed in to change notification settings - Fork 128
/
edge_swaps.py
215 lines (165 loc) · 6.97 KB
/
edge_swaps.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
from typing import Optional
import numba as nb
import numpy as np
from beartype import beartype
from scipy.sparse import csr_array, lil_matrix
from sklearn.utils import check_scalar
from graspologic.preconditions import check_argument
from graspologic.types import AdjacencyMatrix, Tuple
from graspologic.utils import import_graph, is_loopless, is_symmetric, is_unweighted
# Code based on: https://github.com/joelnish/double-edge-swap-mcmc/blob/master/dbl_edge_mcmc.py
class EdgeSwapper:
"""
Degree Preserving Edge Swaps
This class allows for performing degree preserving edge swaps to
generate new networks with the same degree sequence as the input network.
Attributes
----------
adjacency : np.ndarray OR csr_array, shape (n_verts, n_verts)
The initial adjacency matrix to perform edge swaps on. Must be unweighted and undirected.
edge_list : np.ndarray, shape (n_verts, 2)
The corresponding edgelist for the input network
seed: int, optional
Random seed to make outputs reproducible, must be positive
References
----------
.. [1] Fosdick, B. K., Larremore, D. B., Nishimura, J., & Ugander, J. (2018).
Configuring random graph models with fixed degree sequences.
Siam Review, 60(2), 315-355.
.. [2] Carstens, C. J., & Horadam, K. J. (2017).
Switching edges to randomize networks: what goes wrong and how to fix it.
Journal of Complex Networks, 5(3), 337-351.
.. [3] https://github.com/joelnish/double-edge-swap-mcmc/blob/master/dbl_edge_mcmc.py
"""
@beartype
def __init__(self, adjacency: AdjacencyMatrix, seed: Optional[int] = None):
weight_check = is_unweighted(adjacency)
check_argument(weight_check, "adjacency must be unweighted")
loop_check = is_loopless(adjacency)
check_argument(loop_check, "adjacency cannot have loops")
direct_check = is_symmetric(adjacency)
check_argument(direct_check, "adjacency must be undirected")
max_seed = np.iinfo(np.uint32).max
if seed is None:
seed = np.random.randint(max_seed, dtype=np.int64)
seed = check_scalar(
seed, "seed", (int, np.integer), min_val=0, max_val=max_seed
)
self._rng = np.random.default_rng(seed)
adjacency = import_graph(adjacency, copy=True)
if isinstance(adjacency, csr_array):
# more efficient for manipulations which change sparsity structure
adjacency = lil_matrix(adjacency)
self._edge_swap_function = _edge_swap
else:
# for numpy input, use numba for JIT compilation
# NOTE: not convinced numba is helping much here, look into optimizing
self._edge_swap_function = _edge_swap_numba
self.adjacency = adjacency
edge_list = self._do_setup()
check_argument(len(edge_list) >= 2, "there must be at least 2 edges")
self.edge_list = edge_list
def _do_setup(self) -> np.ndarray:
"""
Computes the edge_list from the adjancency matrix
Returns
-------
edge_list : np.ndarray, shape (n_verts, 2)
The corresponding edge_list of adjacency
"""
# get edges for upper triangle of undirected graph
row_inds, col_inds = np.nonzero(self.adjacency)
upper = row_inds < col_inds
row_inds = row_inds[upper]
col_inds = col_inds[upper]
edge_list = np.stack((row_inds, col_inds)).T
return edge_list
def swap_edges(self, n_swaps: int = 1) -> Tuple[AdjacencyMatrix, np.ndarray]:
"""
Performs a number of edge swaps on the graph
Parameters
----------
n_swaps : int (default 1), optional
The number of edge swaps to be performed
Returns
-------
adjacency : np.ndarray OR csr.matrix, shape (n_verts, n_verts)
The adjancency matrix after a number of edge swaps are performed on the graph
edge_list : np.ndarray (n_verts, 2)
The edge_list after a number of edge swaps are perfomed on the graph
"""
# Note: for some reason could not get reproducibility w/o setting seed
# inside of the _edge_swap_function itself
max_seed = np.iinfo(np.int32).max
for _ in range(n_swaps):
self.adjacency, self.edge_list = self._edge_swap_function(
self.adjacency,
self.edge_list,
seed=self._rng.integers(max_seed),
)
adjacency = self.adjacency
if isinstance(adjacency, lil_matrix):
adjacency = csr_array(adjacency)
else:
adjacency = adjacency.copy()
return adjacency, self.edge_list.copy()
def _edge_swap(
adjacency: AdjacencyMatrix, edge_list: np.ndarray, seed: Optional[int] = None
) -> Tuple[AdjacencyMatrix, np.ndarray]:
"""
Performs the edge swap on the adjacency matrix. If adjacency is
np.ndarray, then nopython=True is used in numba, but if adjacency
is csr_array, then forceobj=True is used in numba
Parameters
----------
adjacency : np.ndarray OR csr_array, shape (n_verts, n_verts)
The initial adjacency matrix in which edge swaps are performed on it
edge_list : np.ndarray, shape (n_verts, 2)
The corresponding edge_list of adjacency
seed: int, optional
Random seed to make outputs reproducible, must be positive
Returns
-------
adjacency : np.ndarray OR csr_array, shape (n_verts, n_verts)
The adjancency matrix after an edge swap is performed on the graph
edge_list : np.ndarray (n_verts, 2)
The edge_list after an edge swap is perfomed on the graph
"""
# need to use np.random here instead of the generator for numba compatibility
if seed is not None:
np.random.seed(seed)
# choose two indices at random
# NOTE: using np.random here for current numba compatibility
orig_inds = np.random.choice(len(edge_list), size=2, replace=False)
u, v = edge_list[orig_inds[0]]
# two types of swap orientations for undirected graph
if np.random.rand() < 0.5:
x, y = edge_list[orig_inds[1]]
else:
y, x = edge_list[orig_inds[1]]
# ensures no initial loops
if u == v or x == y:
return adjacency, edge_list
# ensures no loops after swap (must be swap on 4 distinct nodes)
if u == x or v == y:
return adjacency, edge_list
# save edge values
w_ux = adjacency[u, x]
w_vy = adjacency[v, y]
# ensures no multigraphs after swap
if w_ux >= 1 or w_vy >= 1:
return adjacency, edge_list
# perform the swap
adjacency[u, v] = 0
adjacency[v, u] = 0
adjacency[x, y] = 0
adjacency[y, x] = 0
adjacency[u, x] = 1
adjacency[x, u] = 1
adjacency[v, y] = 1
adjacency[y, v] = 1
# update edge list
edge_list[orig_inds[0]] = [u, x]
edge_list[orig_inds[1]] = [v, y]
return adjacency, edge_list
_edge_swap_numba = nb.jit(_edge_swap, nopython=False)