Skip to content

Commit

Permalink
Tests for acyclic flow
Browse files Browse the repository at this point in the history
  • Loading branch information
pablormier committed May 5, 2024
1 parent 0fabc00 commit 20c1baa
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 4 deletions.
12 changes: 8 additions & 4 deletions corneto/backend/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -768,6 +768,10 @@ def AcyclicFlow(
) -> ProblemDef:
if not varname:
varname = VAR_FLOW
if isinstance(lb, list):
lb = np.array(lb)
if isinstance(ub, list):
ub = np.array(ub)
if not isinstance(lb, np.ndarray):
lb = np.array([lb] * g.num_edges)
if not isinstance(ub, np.ndarray):
Expand Down Expand Up @@ -820,8 +824,8 @@ def AcyclicFlow(
e_ix = np.array([i for i, (s, t) in e_pos if len(s) > 0 and len(t) > 0])
edges = [g.get_edge(i) for i in e_ix]
# Get the index of the source / target vertices of the edge
s_idx = np.array([vix[list(s)[0]] for (s, _) in edges])
t_idx = np.array([vix[list(t)[0]] for (_, t) in edges])
s_idx = np.array([vix[next(iter(s))] for (s, _) in edges])
t_idx = np.array([vix[next(iter(t))] for (_, t) in edges])
# The layer position in a DAG of the target vertex of the edge
# has to be greater than the source vertex, otherwise Ip (pos flow) has to be 0
if len(e_ix) > 0:
Expand All @@ -837,8 +841,8 @@ def AcyclicFlow(
e_ix = np.array([i for i, (s, t) in e_neg if len(s) > 0 and len(t) > 0])
edges = [g.get_edge(i) for i in e_ix]
# Get the index of the source / target vertices of the edge
s_idx = np.array([vix[list(s)[0]] for (s, _) in edges])
t_idx = np.array([vix[list(t)[0]] for (_, t) in edges])
s_idx = np.array([vix[next(iter(s))] for (s, _) in edges])
t_idx = np.array([vix[next(iter(t))] for (_, t) in edges])
if len(e_ix) > 0:
P += L[s_idx] - L[t_idx] >= In[e_ix] + (1 - g.num_vertices) * (
1 - In[e_ix]
Expand Down
36 changes: 36 additions & 0 deletions tests/test_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -399,6 +399,42 @@ def test_acyclic_flow_undirected_edge(backend):
assert np.allclose(sol, vsol1) or np.allclose(sol, vsol2)


def test_feasible_loop(backend):
G = Graph()
G.add_edge((), 'A')
G.add_edge('A', ())
G.add_edge('A', 'B')
G.add_edge('B', 'A')
G.add_edge((), 'B')
G.add_edge('B', ())
P = backend.Flow(G)
P += P.expr.flow[2] >= 1
P += P.expr.flow[3] >= 1
if isinstance(backend, PicosBackend):
P.solve(solver="glpk")
else:
P.solve()
assert np.sum(P.expr.flow.value) >= 2


def test_acyclic_unfeasible_loop(backend):
G = Graph()
G.add_edge((), 'A')
G.add_edge('A', ())
G.add_edge('A', 'B')
G.add_edge('B', 'A')
G.add_edge((), 'B')
G.add_edge('B', ())
P = backend.AcyclicFlow(G)
P += P.expr.with_flow[2] == 1
P += P.expr.with_flow[3] == 1
if isinstance(backend, PicosBackend):
P.solve(solver="glpk", primals=False)
else:
P.solve()
assert np.all(P.expr.flow.value == None)


@pytest.mark.skip(reason="Only a small subset of solvers support this")
def test_l2_norm(backend):
x = np.array([1, 2])
Expand Down

0 comments on commit 20c1baa

Please sign in to comment.