Enable python3 (#31)
* enable python3
Soojeong Kim authored and KyungGeun Lee committed Jun 29, 2019
1 parent 3e74a7f commit a3ed41e
Showing 10 changed files with 63 additions and 60 deletions.
2 changes: 1 addition & 1 deletion doc/installation.md
@@ -1,5 +1,5 @@
# Installation
Parallax runs under Linux with Python 2.7; we haven't yet tested Parallax on other platforms and 3.3+.
Parallax runs under Linux with Python 2.7 and 3.6; we haven't yet tested Parallax on other platforms or on Python 3 versions other than 3.6.
Parallax depends on modified versions of TensorFlow 1.6/1.11 and horovod 0.11.2, which are included in the parallax repository as submodules. *Each of these frameworks needs to be built and installed from source, which is explained in further detail below*. Parallax itself also needs to be installed from source; the installation process is explained step by step below. We plan to provide binary files in the near future.

First, clone the parallax repository on your linux machine:
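A note on the supported-version line above: the following is an illustrative sketch (not part of installation.md) of how a setup step could surface the same constraint at install time; the version set simply mirrors the sentence in the doc.

    import sys

    # installation.md states that only Python 2.7 and 3.6 are tested.
    SUPPORTED = {(2, 7), (3, 6)}

    if sys.version_info[:2] not in SUPPORTED:
        sys.stderr.write('Warning: Python %d.%d is untested with Parallax; '
                         'only 2.7 and 3.6 are covered.\n' % sys.version_info[:2])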
11 changes: 6 additions & 5 deletions parallax/parallax/core/python/common/graph_transform_lib.py
@@ -16,6 +16,7 @@
import re
import sys
import time
from functools import reduce

import tensorflow as tf
from tensorflow.core.framework import attr_value_pb2
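The added `from functools import reduce` is required because Python 3 dropped the `reduce` builtin; `functools.reduce` exists on both interpreters. A minimal sketch of the portable pattern (illustrative only, not taken from graph_transform_lib.py):

    from functools import reduce   # builtin in Python 2, functools-only in Python 3
    from operator import mul

    def num_elements(shape):
        # e.g. multiply tensor dimensions to get an element count
        return reduce(mul, shape, 1)

    assert num_elements([2, 3, 4]) == 24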
@@ -483,8 +484,8 @@ def _replace_update_op_with_read_op(var_op, var_update_op, finish_op):
var_op_to_finish_op = {}
trainable_var_op_to_update_op = {}
non_trainable_var_op_to_update_op = {}
all_var_update_op_types = dense_var_update_op_types.keys() \
+ sparse_var_update_op_types.keys()
all_var_update_op_types = list(dense_var_update_op_types.keys()) \
+ list(sparse_var_update_op_types.keys())
for op in tf.get_default_graph().get_operations():
# Find variable update ops
if not op.type in all_var_update_op_types:
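The `list(...)` wrappers above are needed because `dict.keys()` returns a list in Python 2 but a view object in Python 3, and two views cannot be concatenated with `+`. A small sketch with made-up op-type dicts:

    dense_var_update_op_types = {'ApplyGradientDescent': None, 'ApplyAdam': None}   # hypothetical contents
    sparse_var_update_op_types = {'ScatterSub': None}                               # hypothetical contents

    # Python 2: keys() + keys() concatenates two lists.
    # Python 3: keys() + keys() raises TypeError, so each view is materialized first.
    all_var_update_op_types = list(dense_var_update_op_types.keys()) \
                              + list(sparse_var_update_op_types.keys())

    assert 'ScatterSub' in all_var_update_op_types

Since the combined value is only used for membership tests, a set union of the two key views would work equally well; the list form keeps the Python 2 behaviour unchanged.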
@@ -869,7 +870,7 @@ def _update_colocation(node, replica_id=None):
for i in range(len(class_list.s)):
s = class_list.s[i]
if s.startswith(BINARY_ENCODED_COLOCATION_PREFIX):
op_name_to_bind_to = s[len(BINARY_ENCODED_COLOCATION_PREFIX):]
op_name_to_bind_to = s[len(BINARY_ENCODED_COLOCATION_PREFIX):].decode("utf-8")
if op_name_to_bind_to in op_names_to_replicate:
# delete colocation constraint if shared op needs to be
# colocated with replica op
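The new `.decode("utf-8")` reflects a bytes-versus-str difference: in Python 3 the serialized colocation entries are `bytes`, while the op names they are checked against are `str`, so the membership test silently fails without decoding. A hedged sketch (the prefix and names are made-up stand-ins):

    BINARY_ENCODED_COLOCATION_PREFIX = b'loc:@'        # assumed bytes prefix, for illustration
    op_names_to_replicate = {'model/dense/kernel'}     # graph op names are str

    s = b'loc:@model/dense/kernel'                     # hypothetical colocation attr entry
    op_name_to_bind_to = s[len(BINARY_ENCODED_COLOCATION_PREFIX):]

    # Python 2: bytes and str are the same type, so the lookup already worked.
    # Python 3: op_name_to_bind_to is bytes and the set holds str, so it must be decoded.
    assert op_name_to_bind_to.decode('utf-8') in op_names_to_replicate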
@@ -1807,8 +1808,8 @@ def _replace_update_op_with_read_op(var_op, var_update_op, finish_op):
var_op_to_finish_op = {}
trainable_var_op_to_update_op = {}
non_trainable_var_op_to_update_op = {}
all_var_update_op_types = sparse_var_update_op_types.keys() \
+ dense_var_update_op_types.keys()
all_var_update_op_types = list(sparse_var_update_op_types.keys()) \
+ list(dense_var_update_op_types.keys())

for op in tf.get_default_graph().get_operations():
# Find variable update ops
6 changes: 3 additions & 3 deletions parallax/parallax/core/python/common/lib.py
@@ -84,7 +84,7 @@ def remote_exec(bash_script,
python_venv=None,
port=22):
full_cmd = ' '.join(
map(lambda (k, v): 'export %s=%s;' % (k, v), env.iteritems()))
map(lambda k: 'export %s=%s;' % (k[0], k[1]), env.items()))
if python_venv is not None:
full_cmd += ' source %s/bin/activate; ' % python_venv
full_cmd += bash_script
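Two Python 3 changes are folded into this one line: PEP 3113 removed tuple parameter unpacking, so `lambda (k, v): ...` is now a syntax error, and `dict.iteritems()` no longer exists. A sketch with a made-up environment dict:

    env = {'CUDA_VISIBLE_DEVICES': '0', 'PYTHONPATH': '/tmp/parallax'}   # hypothetical values

    # Python 2 allowed: map(lambda (k, v): 'export %s=%s;' % (k, v), env.iteritems())
    # Python 3: the lambda takes one argument and indexes into the (key, value) tuple.
    full_cmd = ' '.join(
        map(lambda kv: 'export %s=%s;' % (kv[0], kv[1]), env.items()))

    print(full_cmd)

Note that `'export %s=%s;' % kv` with the tuple itself, or a generator expression, would be slightly more idiomatic; the map/lambda form keeps the diff minimal.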
@@ -99,7 +99,7 @@ def remote_exec(bash_script,


def _get_available_gpus(hostname):
result = subprocess.check_output('ssh %s ls /proc/driver/nvidia/gpus' % hostname, shell=True)
result = subprocess.check_output('ssh %s ls /proc/driver/nvidia/gpus' % hostname, shell=True).decode()
return list(range(len(result.strip().split('\n'))))
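The trailing `.decode()` is needed because `subprocess.check_output` returns `bytes` on Python 3 and the result is then parsed with str operations. A self-contained sketch that substitutes a harmless local command for the ssh call:

    import subprocess

    # Python 3: check_output returns bytes; decode() restores str so that
    # strip()/split('\n') behave as they did under Python 2.
    result = subprocess.check_output('echo gpu0; echo gpu1', shell=True).decode()
    gpus = list(range(len(result.strip().split('\n'))))   # mirrors _get_available_gpus
    assert gpus == [0, 1]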


@@ -155,7 +155,7 @@ def serialize_machine(m):
return '%s:%s:%s' % (m['hostname'], ','.join([str(port) for port in m['port']]), ','.join([str(gpu) for gpu in m['gpus']]))
def serialize_machines(machines):
return '+'.join([serialize_machine(m) for m in machines])
return '^'.join(['%s_%s' % (type, serialize_machines(machines)) for type, machines in resource_info.iteritems()])
return '^'.join(['%s_%s' % (type, serialize_machines(machines)) for type, machines in resource_info.items()])


def deserialize_resource_info(resource_info_serialized):
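`dict.iteritems()` was removed in Python 3; `items()` is the portable replacement (a list on Python 2, a view on Python 3, iterated identically). A toy resource_info dict, purely for illustration:

    resource_info = {                          # hypothetical resource description
        'worker': [{'hostname': '10.0.0.1', 'port': [2222], 'gpus': [0, 1]}],
        'ps':     [{'hostname': '10.0.0.2', 'port': [2223], 'gpus': []}],
    }

    # items() iterates (key, value) pairs on both interpreters.
    for resource_type, machines in resource_info.items():
        print(resource_type, len(machines))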
45 changes: 24 additions & 21 deletions parallax/parallax/core/python/common/partitions.py
@@ -14,7 +14,10 @@
# ==============================================================================
import os
from multiprocessing.managers import BaseManager
import Queue
try:
import Queue as queue
except ImportError:
import queue
import numpy as np
from scipy import optimize
import time
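The standard-library module was renamed from `Queue` (Python 2) to `queue` (Python 3); the try/except above binds whichever exists to the name `queue`. An equivalent option, sketched here only for comparison and assuming `six` is installed, is the `six.moves` shim:

    # six.moves hides the Queue/queue rename behind a single import.
    from six.moves import queue

    q = queue.Queue()
    q.put(1.5)
    assert q.get() == 1.5

The try/except form used in the commit avoids adding a hard dependency on six to the core package.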
@@ -63,7 +66,7 @@ def setup_manager(self):
if self.start is None:
self.start = time.time()
self.m = BaseManager(address=self.address, authkey='parallax_auth')
queue = Queue.Queue()
stat_queue = queue.Queue()
BaseManager.register('queue', callable=lambda: stat_queue)
self.m.start()
return self.m
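For context, the registered 'queue' is presumably how worker processes report execution times back to the partition search (see recv_exec_time below). The following is a self-contained sketch of that producer/consumer pattern under assumptions: the StatManager name, address, and payload are made up, a bytes authkey is used because Python 3 requires one, and a fork-based start (Linux, as Parallax targets) is assumed.

    from multiprocessing.managers import BaseManager
    try:
        import Queue as queue          # Python 2
    except ImportError:
        import queue                   # Python 3

    shared_queue = queue.Queue()

    class StatManager(BaseManager):
        pass

    # Every proxy request for 'queue' returns the same underlying Queue object.
    StatManager.register('queue', callable=lambda: shared_queue)

    server = StatManager(address=('127.0.0.1', 0), authkey=b'parallax_auth')
    server.start()                                  # OS picks a free port

    # Worker side: connect with the same address/authkey and push a step time.
    worker = StatManager(address=server.address, authkey=b'parallax_auth')
    worker.connect()
    worker.queue().put(0.42)

    assert server.queue().get() == 0.42             # master side receives it
    server.shutdown()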
@@ -94,28 +97,28 @@ def recv_exec_time(self, processes, cleanup, num_required):

if self.prev_p:
if self.prev_exec_time < curr_exec_time:
# decrease or stop
# decrease or stop
if self.prev_p > curr_p:
stop = True
else:
# search the opposite partitions
self.p_to_test = min(self.p_list) / 2
else:
else:
# search the opposite partitions
self.p_to_test = min(self.p_list) / 2
else:
assert (self.prev_exec_time / curr_exec_time) > 1
# keep increase or keep decrease
if self.prev_p < curr_p:
self.p_to_test *= 2
else:
self.p_to_test /= 2

if self.p_to_test < self.min_partitions:
stop = True
else:
# increase first
self.p_to_test *= 2

self.prev_p = curr_p
self.prev_exec_time = curr_exec_time
# keep increase or keep decrease
if self.prev_p < curr_p:
self.p_to_test *= 2
else:
self.p_to_test /= 2

if self.p_to_test < self.min_partitions:
stop = True
else:
# increase first
self.p_to_test *= 2

self.prev_p = curr_p
self.prev_exec_time = curr_exec_time
else:
# communication error when num partitions is small
if self.prev_p:
50 changes: 25 additions & 25 deletions parallax/parallax/core/python/common/runner.py
@@ -90,42 +90,42 @@ def _parallax_run_master(single_gpu_meta_graph_def,
if search_p:
m = stat_collector.setup_manager()

if config.run_option == 'MPI' or \
(config.run_option == 'HYBRID' and len(sparse_grads) == 0):
if config.run_option == 'MPI' or \
(config.run_option == 'HYBRID' and len(sparse_grads) == 0):
num_workers = sum([max(1, len(w['gpus'])) for w in \
config.resource_info['worker']])
processes, cleanup = \
launch_mpi_driver(driver_path,
args,
config,
processes, cleanup = \
launch_mpi_driver(driver_path,
args,
config,
p_to_test,
m)
elif config.run_option == 'PS' or \
(config.run_option == 'HYBRID' and len(dense_grads) == 0):
num_workers = len(config.resource_info['worker'])
processes, logfiles, cleanup = \
launch_ps_driver(driver_path,
args,
config,
elif config.run_option == 'PS' or \
(config.run_option == 'HYBRID' and len(dense_grads) == 0):
num_workers = len(config.resource_info['worker'])
processes, logfiles, cleanup = \
launch_ps_driver(driver_path,
args,
config,
p_to_test,
m)
elif config.run_option == 'HYBRID':
num_workers = sum([max(1, len(w['gpus'])) for w in config.resource_info['worker']])
processes, cleanup = \
launch_hybrid_driver(driver_path,
args,
config,
elif config.run_option == 'HYBRID':
num_workers = sum([max(1, len(w['gpus'])) for w in config.resource_info['worker']])
processes, cleanup = \
launch_hybrid_driver(driver_path,
args,
config,
p_to_test,
m)
else:
raise ValueError("Run option must be one of MPI, PS or HYBRID")

if not search_p:
processes[0].wait()
break
else:
search_p, p_to_test = \
stat_collector.recv_exec_time(processes, cleanup, num_workers)
if not search_p:
processes[0].wait()
break
else:
search_p, p_to_test = \
stat_collector.recv_exec_time(processes, cleanup, num_workers)
except:
traceback.print_exc()
finally:
1 change: 0 additions & 1 deletion parallax/parallax/core/python/common/session_context.py
@@ -16,7 +16,6 @@
import contextlib
from multiprocessing.managers import BaseManager
import os
import Queue
import threading
import time

2 changes: 1 addition & 1 deletion parallax/parallax/core/python/hybrid/runner.py
@@ -56,7 +56,7 @@ def create_mpi_script(driver_path, args, hostname, gpus, resource_info,
if partitions:
env[PARALLAX_PARTITIONS] = partitions
cmd_env = ' '.join(
map(lambda (k, v): 'export %s=%s;' % (k, v), env.iteritems()))
map(lambda k: 'export %s=%s;' % (k[0], k[1]), env.items()))
try:
cmd_venv = ' source %s/bin/activate; '\
% os.environ['VIRTUAL_ENV']
2 changes: 1 addition & 1 deletion parallax/parallax/core/python/mpi/runner.py
@@ -57,7 +57,7 @@ def create_mpi_script(driver_path, args, hostname, gpus, partitions, search,
env[PARALLAX_PARTITIONS] = partitions

cmd_env = ' '.join(
map(lambda (k, v): 'export %s=%s;' % (k, v), env.iteritems()))
map(lambda k: 'export %s=%s;' % (k[0], k[1]), env.items()))
try:
cmd_venv = ' source %s/bin/activate; '\
% os.environ['VIRTUAL_ENV']
2 changes: 1 addition & 1 deletion parallax/parallax/examples/lm1b/resource_info
@@ -1 +1 @@
123.456.78.90:1,2,4,5
123.456.78.90:1,2
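The example resource_info entry appears to follow a `hostname:gpu,gpu,...` layout with a placeholder address; a tiny parsing sketch under that assumption (not code from the repository):

    def parse_resource_line(line):
        # Assumes 'hostname:gpu,gpu,...' as in the example file above.
        hostname, _, gpu_part = line.partition(':')
        gpus = [int(g) for g in gpu_part.split(',')] if gpu_part else []
        return hostname, gpus

    assert parse_resource_line('123.456.78.90:1,2') == ('123.456.78.90', [1, 2])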
2 changes: 1 addition & 1 deletion parallax/parallax/examples/nmt/model_helper.py
@@ -546,7 +546,7 @@ def avg_checkpoints(model_dir, num_last_checkpoints, global_step,
with tf.Session() as sess:
sess.run(tf.initialize_all_variables())
for p, assign_op, (name, value) in zip(placeholders, assign_ops,
six.iteritems(var_values)):
var_values.items()):
sess.run(assign_op, {p: value})

# Use the built saver to save the averaged checkpoint. Only keep 1
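On the change above: `six` provides `six.iteritems(d)` but has no `items` helper, while plain `dict.items()` already works on both interpreters, so the loop can iterate `var_values.items()` directly. A compact sketch with a made-up variable dict:

    import six

    var_values = {'embedding/kernel': 0.1, 'rnn/bias': -0.2}   # hypothetical averaged values

    # Both spellings yield the same (name, value) pairs on Python 2 and 3.
    assert sorted(six.iteritems(var_values)) == sorted(var_values.items())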
