In [1]:
import tensorflow as tf
import numpy as np

In [2]:
def reset_graph(seed=None):
    """Replace the default graph with a fresh one and re-seed TF and NumPy.

    Args:
        seed: Value passed unchanged to ``tf.set_random_seed`` and then to
            ``np.random.seed``. Defaults to ``None``.
    """
    # Reset first so the TensorFlow seed is set after the new default graph exists.
    tf.reset_default_graph()
    for apply_seed in (tf.set_random_seed, np.random.seed):
        apply_seed(seed)
    
    


In [3]:
# Open a session with the default configuration.
sess = tf.Session()

In [5]:
# List the devices this session can use; the recorded output shows one CPU and one GPU.
sess.list_devices()

[_DeviceAttributes(/job:localhost/replica:0/task:0/device:CPU:0, CPU, 268435456, 15917525426429477155),
 _DeviceAttributes(/job:localhost/replica:0/task:0/device:GPU:0, GPU, 3168180633, 14077114956586335782)]

In [6]:
# Close the session that was used to list the devices.
sess.close()

#### Managing the GPU RAM

In [7]:
# Start from an empty session configuration; the GPU options are set on it in the next cells.
config = tf.ConfigProto()

In [9]:
# Let this process claim at most a 0.4 fraction of GPU memory.
config.gpu_options.per_process_gpu_memory_fraction = .4

In [10]:
# Open a session that uses `config`. Note that it is bound to `session`, not `sess`.
session = tf.Session(config=config)

In [11]:
# Another option: let the GPU allocation grow on demand.
# NOTE(review): `session` was already created from `config` in the previous cell,
# so presumably this flag only affects sessions created afterwards — confirm.
config.gpu_options.allow_growth = True

In [12]:
# Close the session opened with the GPU-memory config. That session is bound
# to `session`; `sess` refers to the earlier device-listing session, which was
# already closed, so `sess.close()` here would leave this one open.
session.close()

#### Placing Operations on Devices

In [39]:
reset_graph()


# Explicitly request the CPU for both variables.
with tf.device('/cpu:0'):
    a = tf.Variable(3.0)
    b = tf.Variable(4.0)

# Built outside the device block, so no device is requested for the product.
c = a * b


In [31]:
# Requested devices: empty for `c`, CPU for both variables (see the output below).
c.device, a.device, b.device

('', '/device:CPU:0', '/device:CPU:0')

#### Logging placements

In [32]:
# Fresh configuration that asks TensorFlow to log where each op is placed.
config = tf.ConfigProto()
config.log_device_placement = True

In [40]:
# Open a session that uses `config`.
sess = tf.Session(config=config)

In [45]:
# Print every op in the session's graph with its requested device and op type.
for op in sess.graph.get_operations():
    print(op.name, op.device, op.type)

Variable/initial_value /device:CPU:0 Const
Variable /device:CPU:0 VariableV2
Variable/Assign /device:CPU:0 Assign
Variable/read /device:CPU:0 Identity
Variable_1/initial_value /device:CPU:0 Const
Variable_1 /device:CPU:0 VariableV2
Variable_1/Assign /device:CPU:0 Assign
Variable_1/read /device:CPU:0 Identity
mul  Mul


In [41]:
# Initialize both variables in this session before reading them.
a.initializer.run(session=sess)
b.initializer.run(session=sess)

# Evaluate the product; as the last expression it becomes the cell output.
sess.run(c)

12.0

In [46]:
# Close the placement-logging session.
sess.close()

#### Dynamic placement function

In [49]:
# Start from an empty default graph so the auto-generated op names restart.
reset_graph()


def variables_on_cpu(op):
    """Device function for ``tf.device``: variable ops on CPU, the rest on GPU 0.

    An op is treated as variable-related when its op type is a variable type
    or when its name starts with the auto-generated ``Variable`` prefix. The
    name check alone misses variables created with an explicit ``name=``,
    which would otherwise be sent to the GPU.

    Args:
        op: The operation being placed; its ``type`` and ``name`` are read.

    Returns:
        ``'/cpu:0'`` for variable-related ops, ``'/gpu:0'`` for everything else.
    """
    # 'VariableV2' is the type this notebook's graph dump shows for tf.Variable;
    # the other two cover the legacy and resource variable op types.
    variable_op_types = ('Variable', 'VariableV2', 'VarHandleOp')
    if op.type in variable_op_types or op.name.startswith('Variable'):
        return '/cpu:0'
    return '/gpu:0'

# Passing a function to tf.device lets it pick a device for each op created in the block.
with tf.device(variables_on_cpu):
    a = tf.Variable(4.0)
    b = tf.Variable(3.0)
    
    # The product is not a variable op, so the function sends it to the GPU.
    c = a * b

In [50]:
# Devices chosen by the placement function: CPU for the variables, GPU for the product.
a.device, b.device, c.device

('/cpu:0', '/cpu:0', '/gpu:0')

#### Soft placement

In [51]:
reset_graph()

# Request GPU placement for an integer variable.
with tf.device('/gpu:0'):
    i = tf.Variable(3)

# With soft placement enabled, TensorFlow may fall back to another device
# when the requested one cannot run an op, instead of raising an error.
config = tf.ConfigProto()
config.allow_soft_placement = True

# Use the session as a context manager so it is closed once the variable has
# been initialized; previously this session was opened and never closed.
with tf.Session(config=config) as sess:
    sess.run(i.initializer)

In [54]:
# The variable still reports the requested GPU device; its dtype is an int32 reference type.
i.device, i.dtype

('/device:GPU:0', tf.int32_ref)

In [55]:
# Thread setting for running independent ops in parallel; it was never set on this config (output: 0).
config.inter_op_parallelism_threads

0

In [56]:
# Thread setting for parallelism inside a single op; it was never set on this config (output: 0).
config.intra_op_parallelism_threads

0

#### Local Server

In [3]:
reset_graph()

c = tf.constant('Hello distributed tensorflow!')
# Start a local server; sessions connect to it through `server.target`.
server = tf.train.Server.create_local_server()

This method is a convenience wrapper for creating a
`tf.train.Server` with a `tf.train.ServerDef` that specifies a
single-process cluster containing a single task in a job called
`"local"`.

In [4]:
# server.target -> returns the target a `tf.Session` uses to connect to this server.
# Evaluate the constant through the local server; the `with` block closes the session.
with tf.Session(target=server.target) as sess:
    print(sess.run(c))

b'Hello distributed tensorflow!'


In [45]:
# The gRPC address of the local server (see the output below).
server.target

b'grpc://localhost:59232'

#### Cluster

In [5]:
# tf.train.ClusterSpec() -> represents a cluster as a set of "tasks", organized into "jobs".
# Two jobs on localhost: 'ps' with two tasks and 'worker' with three.
# A task's index is its position in the job's address list.
cluster_spec = tf.train.ClusterSpec({
    'ps': [
        '127.0.0.1:2221',      # /job:ps/task:0
        '127.0.0.1:2222',      # /job:ps/task:1
    ],
    'worker': [
        '127.0.0.1:2223',       # /job:worker/task:0
        '127.0.0.1:2224',       # /job:worker/task:1
        '127.0.0.1:2225',       # /job:worker/task:2
    ],
})

In [6]:
# Start one server per task declared in `cluster_spec`, all inside this notebook process.
task_ps0 = tf.train.Server(cluster_spec, job_name='ps', task_index=0)
task_ps1 = tf.train.Server(cluster_spec, job_name='ps', task_index=1)
task_worker0 = tf.train.Server(cluster_spec, job_name='worker', task_index=0)
task_worker1 = tf.train.Server(cluster_spec, job_name='worker', task_index=1)
task_worker2 = tf.train.Server(cluster_spec, job_name='worker', task_index=2)

#### Pinning operations across devices and servers


In [7]:
reset_graph()

# The variable is pinned to the 'ps' job; no task index is given.
with tf.device("/job:ps"):
    a = tf.Variable(3.0, name='a')
    
# First sum is pinned to the 'worker' job, again without a task index.
with tf.device('/job:worker'):
    b = a + 3
    
# Second sum is pinned to a specific worker task.
with tf.device('/job:worker/task:1'):
    c = a + b

In [67]:
# Address that sessions use to reach the first 'ps' task.
task_ps0.target

b'grpc://localhost:2221'

In [10]:
# Connect through worker 0, initialize `a`, then evaluate c = a + (a + 3) = 9.0.
with tf.Session(task_worker0.target) as sess:
    sess.run(a.initializer)
    print(sess.run(c))

9.0


In [18]:
reset_graph()

# replica_device_setter returns a device function. Per the recorded outputs,
# the variables alternate between the two 'ps' tasks and the other ops
# go to '/job:worker'.
with tf.device(tf.train.replica_device_setter(cluster=cluster_spec)):
    v1 = tf.Variable(1.0)
    v2 = tf.Variable(2.0)
    v3 = tf.Variable(3.0)
    v4 = tf.Variable(4.0)
    v5 = tf.Variable(5.0)
    
    s = v1 + v2
    # Nested device blocks refine the placement: a task index here...
    with tf.device('/task:1'):
        p1 = 2 * s
    # ...and a device type here.
    with tf.device('/cpu:0'):
        p2 = 3 * s
        
config = tf.ConfigProto()
config.log_device_placement = True

with tf.Session(target=task_ps0.target, config=config) as sess:
    # Only v1 and v2 feed s, p1 and p2, so only they are initialized.
    sess.run([v1.initializer, v2.initializer])
    print(s.eval(), p1.eval(), p2.eval())

3.0 6.0 9.0


In [19]:
# Devices assigned to the five variables; the output alternates between ps tasks 0 and 1.
v1.device, v2.device, v3.device, v4.device, v5.device

('/job:ps/task:0',
 '/job:ps/task:1',
 '/job:ps/task:0',
 '/job:ps/task:1',
 '/job:ps/task:0')

In [20]:
# Devices assigned to the non-variable ops, including the nested overrides.
s.device, p1.device, p2.device

('/job:worker', '/job:worker/task:1', '/job:worker/device:CPU:0')

#### Reader - the old way

In [21]:
# Sample CSV for the reader cells: a header row and three records; the last
# line has no trailing newline.
# NOTE(review): the recorded reader outputs show x2 == 0. for the last record,
# not the -1. given in record_defaults; presumably this is because the field
# holds a space rather than being empty. Confirm before relying on the default.
csv_rows = [
    'x1, x2, target\n',
    '1., 2., 0\n',
    '4., 5, 1\n',
    '7., , 0',
]
with open('my_test.csv', 'w') as f:
    f.writelines(csv_rows)

In [33]:
reset_graph()

# --- filename queue: fed by hand through a placeholder ---
filename_queue = tf.FIFOQueue(capacity=10, dtypes=[tf.string], shapes=[()])
filename = tf.placeholder(dtype=tf.string, shape=())
enqueue_filename = filename_queue.enqueue([filename])
close_filename_queue = filename_queue.close()

# --- reader: one CSV line per run, header skipped, first two fields stacked ---
reader = tf.TextLineReader(skip_header_lines=1)
key, val = reader.read(filename_queue)
x1, x2, target = tf.decode_csv(val, record_defaults=[[-1.], [-1.], [-1]])
features = tf.stack([x1, x2])

# --- instance queue: shuffles the parsed (features, target) pairs ---
instance_queue = tf.RandomShuffleQueue(
    capacity=10,
    min_after_dequeue=2,
    dtypes=[tf.float32, tf.int32],
    shapes=[[2], []],
    name='instance_q',
    shared_name='shared_instance_q',
)
enqueue_instance = instance_queue.enqueue([features, target])
close_instance_queue = instance_queue.close()

# --- mini-batches of at most two instances ---
minibatch_features, minibatch_target = instance_queue.dequeue_up_to(n=2)


with tf.Session() as sess:
    # Push the single file name, then close the queue so the reader can run dry.
    sess.run(enqueue_filename, feed_dict={filename: 'my_test.csv'})
    sess.run(close_filename_queue)

    # Phase 1: read and enqueue instances until the reader raises OutOfRangeError.
    try:
        while True:
            sess.run(enqueue_instance)
    except tf.errors.OutOfRangeError:
        print('no more files to read')
    sess.run(close_instance_queue)

    # Phase 2: drain the closed instance queue batch by batch.
    try:
        while True:
            print(sess.run([minibatch_features, minibatch_target]))
    except tf.errors.OutOfRangeError:
        print('no more instances to read')
            

no more files to read
[array([[4., 5.],
       [7., 0.]], dtype=float32), array([1, 0])]
[array([[1., 2.]], dtype=float32), array([0])]
no more instances to read


#### Queue runners and coordinators


In [39]:
reset_graph()


# filename queue: file names are fed through the placeholder and enqueued by hand
filename_queue = tf.FIFOQueue(capacity=10, dtypes=[tf.string], shapes=[()])
filename = tf.placeholder(tf.string, [])
enqueue_filename = filename_queue.enqueue([filename])
close_filename_queue = filename_queue.close()

# file reader: one CSV line per run, header skipped; the first two fields become the features
line_reader = tf.TextLineReader(skip_header_lines=1)
key, value = line_reader.read(filename_queue)
x1, x2, target = tf.decode_csv(value, record_defaults=[[-1.], [-1.], [-1]])
features = tf.stack([x1, x2])

# instance queue: shuffles the parsed (features, target) pairs
instance_queue = tf.RandomShuffleQueue(capacity=10, dtypes=[tf.float32, tf.int32], shapes=[[2],[]], 
                                       min_after_dequeue=2, name='instance_q', shared_name='shared_instance_q')
enqueue_instance = instance_queue.enqueue([features, target])
# Not run by this cell's session code below.
close_instance_queue = instance_queue.close()

# instance dequeue: mini-batches of at most two instances
minibatch_features, minibatch_target = instance_queue.dequeue_up_to(2)

# queue runner & coordinator: the same enqueue op is listed num_threads times,
# so the runner is given num_threads enqueue ops that share one reader
num_threads = 4
coord = tf.train.Coordinator()
queue_runner = tf.train.QueueRunner(instance_queue, [enqueue_instance]*num_threads)


with tf.Session() as sess:
    # Push the single file name, then close the filename queue so the reader
    # raises OutOfRangeError once the file has been consumed.
    sess.run(enqueue_filename, feed_dict={filename: 'my_test.csv'})
    sess.run(close_filename_queue)
    enqueue_threads = queue_runner.create_threads(sess, coord=coord, start=True)
    try:
        # Drain mini-batches until the instance queue reports it is exhausted.
        while True:
            print(sess.run([minibatch_features, minibatch_target]))
    except tf.errors.OutOfRangeError:
        print('no more file to read')
    finally:
        # The coordinator was previously created but never used. Ask the
        # runner threads to stop and wait for them while the session is still
        # open, so none is left running against a closed session.
        coord.request_stop()
        coord.join(enqueue_threads)

    

[array([[1., 2.],
       [4., 5.]], dtype=float32), array([0, 1])]
[array([[7., 0.]], dtype=float32), array([0])]
no more file to read


In [44]:
# Read multiple files simultaneously: several readers share one filename queue.
reset_graph()

# filename queue: file names are fed through the placeholder and enqueued by hand
filename_queue = tf.FIFOQueue(capacity=10, dtypes=[tf.string], shapes=[()])
filename = tf.placeholder(tf.string, [])
enqueue_filename = filename_queue.enqueue([filename])
close_filename_queue = filename_queue.close()

# instance queue: filled by the reader pipelines built further down in this cell
instance_queue = tf.RandomShuffleQueue(capacity=10, dtypes=[tf.float32, tf.int32], shapes=[[2], []], 
                                       min_after_dequeue=2, name='instance_q', shared_name='shared_instance_q')
# Not run by this cell's session code below.
close_instance_queue = instance_queue.close()
# Mini-batches of at most two instances.
minibatch_features, minibatch_target = instance_queue.dequeue_up_to(2)

# read multi-file operations
def read_push_instance(filename_queue, instance_queue):
    """Build one reader pipeline and return the op that enqueues its output.

    Each call creates its own ``TextLineReader`` (skipping one header line),
    parses a three-field CSV record, and stacks the first two fields into the
    feature tensor.

    Args:
        filename_queue: Queue the reader pulls file names from.
        instance_queue: Queue that receives ``[features, target]``.

    Returns:
        The enqueue op for ``instance_queue``.
    """
    line_reader = tf.TextLineReader(skip_header_lines=1)
    _, record = line_reader.read(filename_queue)
    fields = tf.decode_csv(record, record_defaults=[[-1.], [-1.], [-1]])
    return instance_queue.enqueue([tf.stack(fields[:2]), fields[2]])

# Five independent reader pipelines, all pulling from the same filename queue.
read_and_enqueue_ops = [read_push_instance(filename_queue, instance_queue) 
                        for i in range(5)]
coord = tf.train.Coordinator()
# The runner is given one enqueue op per reader pipeline.
queue_runner = tf.train.QueueRunner(instance_queue, read_and_enqueue_ops)

with tf.Session() as sess:
    # Push the single file name, then close the filename queue so the readers
    # raise OutOfRangeError once the file has been consumed.
    sess.run(enqueue_filename, feed_dict={filename: 'my_test.csv'})
    sess.run(close_filename_queue)

    queue_threads = queue_runner.create_threads(sess, coord=coord, start=True)
    try:
        # Drain mini-batches until the instance queue reports it is exhausted.
        while True:
            print(sess.run([minibatch_features, minibatch_target]))
    except tf.errors.OutOfRangeError:
        print('no more file to read')
    finally:
        # The coordinator was previously created but never used. Ask the
        # runner threads to stop and wait for them while the session is still
        # open, so none is left running against a closed session.
        coord.request_stop()
        coord.join(queue_threads)

[array([[1., 2.],
       [7., 0.]], dtype=float32), array([0, 0])]
[array([[4., 5.]], dtype=float32), array([1])]
no more file to read


#### Data API

In [78]:
tf.reset_default_graph()

# One dataset element per integer 0..9.
data = tf.data.Dataset.from_tensor_slices(np.arange(10))

# Three passes over the data, grouped into batches of 7 (30 elements, so the last batch holds 2).
data = data.repeat(3).batch(7)

# A one-shot iterator needs no explicit initialization before use.
iterator = data.make_one_shot_iterator()

next_batch = iterator.get_next()

In [79]:
with tf.Session() as sess:
    
    try:
        while True:
            # Fetching the same tensor twice in one run call yields the same
            # batch twice (see the output); the iterator advances once per run.
            print(sess.run([next_batch, next_batch]))
            
    except tf.errors.OutOfRangeError as ex:
        # Raised by the iterator once all batches have been consumed.
        print("out of data")

[array([0, 1, 2, 3, 4, 5, 6]), array([0, 1, 2, 3, 4, 5, 6])]
[array([7, 8, 9, 0, 1, 2, 3]), array([7, 8, 9, 0, 1, 2, 3])]
[array([4, 5, 6, 7, 8, 9, 0]), array([4, 5, 6, 7, 8, 9, 0])]
[array([1, 2, 3, 4, 5, 6, 7]), array([1, 2, 3, 4, 5, 6, 7])]
[array([8, 9]), array([8, 9])]
out of data


In [81]:
tf.reset_default_graph()

# Same pipeline as before: 0..9 repeated three times, in batches of 7.
data = tf.data.Dataset.from_tensor_slices(np.arange(10))
data = data.repeat(3).batch(7)
# Turn each batch back into a dataset of scalars and interleave them:
# 3 batches are open at a time and 2 consecutive elements are taken from each in turn.
data = data.interleave(lambda v: tf.data.Dataset.from_tensor_slices(v), cycle_length=3, block_length=2)

In [82]:
# Iterate over the interleaved dataset; each get_next yields one scalar element.
iterator = data.make_one_shot_iterator()
next_elements = iterator.get_next()

In [84]:
with tf.Session() as sess:
    try:
        while True:
            # Print the elements on a single line, separated by commas.
            print(sess.run(next_elements), end=', ')
    except tf.errors.OutOfRangeError:
        # Raised by the iterator once the dataset is exhausted.
        print('done')

0, 1, 7, 8, 4, 5, 2, 3, 9, 0, 6, 7, 4, 5, 1, 2, 8, 9, 6, 3, 0, 1, 2, 8, 9, 3, 4, 5, 6, 7, done


#### Readers – the new way


In [103]:
tf.reset_default_graph()

# One dataset element per text line of the listed files.
dataset = tf.data.TextLineDataset(['my_test.csv'])

def reader(line):
    """Parse one CSV text line into a ``(features, label)`` pair.

    Args:
        line: String tensor holding a single three-field CSV record.

    Returns:
        A tuple ``(x, y)``: ``x`` stacks the first two parsed fields and ``y``
        is the third field.
    """
    column_defaults = [[-1.], [-1.], [-1]]
    fields = tf.decode_csv(line, record_defaults=column_defaults)
    return tf.stack(fields[:2]), fields[2]

# Drop the header line, then parse every remaining line with `reader`.
dataset = dataset.skip(1).map(reader)

iterator = dataset.make_one_shot_iterator()
# Each step yields one (features, label) pair.
X, y = iterator.get_next()

In [102]:
with tf.Session() as sess:
    try:
        while True:
            # Fetch both tensors in one run call so the features and the label
            # come from the same record. Descriptive names are used instead of
            # `a`/`b`, which earlier cells bind to tf.Variable objects.
            features_val, label_val = sess.run([X, y])
            print(features_val, label_val)
    except tf.errors.OutOfRangeError:
        # Raised by the iterator once every record has been consumed.
        print('done')

[1. 2.] 0
[4. 5.] 1
[7. 0.] 0
done
