Fix submitting parts of a multiple-output operand like svd (#42)
* add ut to serialize part of svd results

* remove root distributor, add SchedulerDistributor and WorkerDistributor

* move the start_service logic in scheduler main to a separate file

* add missing file

* fix ut

* remove ut

* add worker service

* unify worker actor names (with prefix w:)

* add LocalDistributedCluster, fix test_api

* add file

* add local cluster session

* fix dashboard

* add web support

* finish web support

* adjust the way to calculate the number of workers and schedulers, add ut for it

* fix fetching a tensor when the result tensor is a scalar, fix tensor.execute since not every session supports n_parallel, add ut for local cluster execute

* fix session when the result is a scalar and not a numpy type

* add a mechanism to check whether the scheduler is ready, to prevent prepare graph from failing

* use actor pool's sleep instead of time.sleep

* limit shared memory size for testing local cluster

* add tests for svd on local cluster

* print info to check ci temporarily

* try to fix ut

* fix ut
qinxuye committed Dec 18, 2018
1 parent 9637373 commit dff897a
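For context, a minimal sketch of the pattern this commit fixes, adapted from the new test added below (the new_cluster and new_session import paths are assumptions; everything else mirrors the test code):

    import numpy as np
    import mars.tensor as mt
    from mars.deploy.local import new_cluster  # assumed import path
    from mars.session import new_session       # assumed import path

    raw = np.random.rand(20, 5)
    t = mt.array(raw)
    U, s, V = mt.linalg.svd(t)  # one operand, three outputs

    with new_cluster(scheduler_n_process=2, worker_n_process=2) as cluster:
        with new_session(cluster.endpoint) as session:
            # submit only two of the three SVD outputs; before this fix,
            # serializing such a partial graph could fail
            U_result, s_result = session.run(U, s)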
Showing 5 changed files with 54 additions and 3 deletions.
35 changes: 34 additions & 1 deletion mars/deploy/local/tests/test_cluster.py
@@ -90,7 +90,7 @@ def testNSchedulersNWorkers(self):
         self.assertEqual(LocalDistributedCluster._calc_scheduler_worker_n_process(
             5, 3, 2, calc_cpu_count=calc_cpu_cnt), (3, 2))
 
-    def testTensorExecute(self):
+    def testSingleOutputTensorExecute(self):
         with new_cluster(scheduler_n_process=2, worker_n_process=2) as cluster:
             self.assertIs(cluster.session, Session.default_or_local())
 
@@ -106,3 +106,36 @@ def testTensorExecute(self):
 
             res = r.execute()
             self.assertLess(res, 39)
+
+    def testMultipleOutputTensorExecute(self):
+        with new_cluster(scheduler_n_process=2, worker_n_process=2) as cluster:
+            session = cluster.session
+
+            t = mt.random.rand(20, 5, chunks=5)
+            r = mt.linalg.svd(t)
+
+            res = session.run((t,) + r)
+
+            U, s, V = res[1:]
+            np.testing.assert_allclose(res[0], U.dot(np.diag(s).dot(V)))
+
+            raw = np.random.rand(20, 5)
+
+            # the graph should be fused here, to exercise composition
+            t = mt.array(raw)
+            U, s, V = mt.linalg.svd(t)
+            r = U.dot(mt.diag(s).dot(V))
+
+            res = r.execute()
+            np.testing.assert_allclose(raw, res)
+
+            # test submitting only part of the svd outputs
+            t = mt.array(raw)
+            U, s, V = mt.linalg.svd(t)
+
+            with new_session(cluster.endpoint) as session2:
+                U_result, s_result = session2.run(U, s)
+                U_expected, s_expected, _ = np.linalg.svd(raw, full_matrices=False)
+
+                np.testing.assert_allclose(U_result, U_expected)
+                np.testing.assert_allclose(s_result, s_expected)
2 changes: 2 additions & 0 deletions mars/graph.pyx
@@ -458,6 +458,8 @@ cdef class DirectedGraph:
                 continue
             if self.count_successors(v) != 1:
                 continue
+            if len(v.op.outputs) != 1:
+                continue
             selected = [v]
             # add successors
             cur_node = self.successors(v)[0]
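The two added lines tighten the fuse precondition: a node may only start a composed chain if it has a single successor and its operand produces a single output, since fusing one output of a multi-output operand such as svd would leave its sibling outputs unreachable. A rough paraphrase (fusable is a hypothetical helper written for illustration; count_successors and op.outputs are taken from the diff):

    def fusable(graph, v):
        # a node may head a fused chain only if it feeds exactly one
        # successor and its operand yields exactly one output
        return graph.count_successors(v) == 1 and len(v.op.outputs) == 1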
3 changes: 2 additions & 1 deletion mars/operands/random.py
@@ -27,7 +27,8 @@
 
 
 class RandomOperand(Operand):
-    _state = TupleField('state')
+    _state = TupleField('state', on_serialize=lambda x: tuple(x) if x is not None else x,
+                        on_deserialize=lambda x: State(x) if x is not None else x)
 
     @property
     def state(self):
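The _state field holds a random state that does not serialize as-is, so the added hooks flatten it to a plain tuple on the way out and rebuild a State on the way back in. An illustration of that round-trip using numpy's RandomState (assuming mars' State wraps a state tuple of this shape):

    import numpy as np

    rs = np.random.RandomState(1)
    as_tuple = tuple(rs.get_state())  # what on_serialize would emit

    rs2 = np.random.RandomState()
    rs2.set_state(as_tuple)           # what deserialization restores

    assert rs.rand() == rs2.rand()    # both streams continue identically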
2 changes: 1 addition & 1 deletion mars/tensor/core.py
@@ -290,7 +290,7 @@ def build_graph(self, graph=None, cls=DAG, tiled=False, compose=True):
             nodes = list(c.data for c in self.chunks)
             keys = list(c.key for c in self.chunks)
         else:
-            nodes = [self]
+            nodes = list(self.op.outputs)
         visited = set()
         while len(nodes) > 0:
             chunk = nodes.pop()
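Seeding the traversal with self.op.outputs instead of [self] means that building a graph from one output of a multi-output operand also records its sibling outputs, so the serialized graph can reconstruct the shared operand. A sketch of the effect, mirroring the new test_linalg case below:

    import mars.tensor as mt
    from mars.graph import DirectedGraph

    U, s, V = mt.linalg.svd(mt.random.rand(9, 6, chunks=(3, 6)))

    graph = DirectedGraph()
    U.build_graph(tiled=False, graph=graph)
    # expected: 4 nodes -- the random input plus U, s and V --
    # even though only U was requested
    assert len(graph) == 4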
15 changes: 15 additions & 0 deletions mars/tensor/expressions/tests/test_linalg.py
@@ -19,6 +19,7 @@
 import numpy as np
 
 import mars.tensor as mt
+from mars.graph import DirectedGraph
 
 
 class Test(unittest.TestCase):
@@ -62,6 +63,20 @@ def testSVD(self):
         self.assertEqual(len(s.chunks), 1)
         self.assertEqual(len(V.chunks), 1)
 
+        rs = mt.random.RandomState(1)
+        a = rs.rand(9, 6, chunks=(3, 6))
+        U, s, V = mt.linalg.svd(a)
+
+        # test the tensor graph
+        graph = DirectedGraph()
+        U.build_graph(tiled=False, graph=graph)
+        s.build_graph(tiled=False, graph=graph)
+        new_graph = DirectedGraph.from_json(graph.to_json())
+        self.assertEqual(len(new_graph), 4)
+        new_outputs = [n for n in new_graph if new_graph.count_predecessors(n) == 1]
+        self.assertEqual(len(new_outputs), 3)
+        self.assertEqual(len({o.op for o in new_outputs}), 1)
+
     def testLU(self):
         a = mt.random.randint(1, 10, (6, 6), chunks=3)
         p, l, u = mt.linalg.lu(a)
