In [1]:
import tensorflow as tf
import numpy as np
import csv
import os
import math

In [2]:
# Hyper-parameters shared by the input pipeline and the model:
#   batch_size - CSV lines (training pairs) per batch
#   emb_dim    - length of each embedding vector
#   vocab_size - number of rows in the embedding / NCE tables; word and
#                context ids index rows of these tables, so every id in the
#                skipgram CSVs must be < vocab_size
batch_size, emb_dim, vocab_size = 4096, 300, 539952

In [3]:
# Directory holding the skip-gram training shards ("0.csv", "1.csv", ...).
# NOTE(review): absolute local path; consider making it configurable.
in_dir = r"D:\stocktwits_text\2018 all\skipgrams_noneg"
# os.listdir returns entries in arbitrary order: sort so the shards are read in
# the same order on every run (the input pipeline reads them sequentially), and
# keep only CSV files so stray entries are never handed to the CSV parser.
# Sorting is lexicographic, so "10.csv" would precede "2.csv" if more shards
# are added.
file_names = sorted(
    os.path.join(in_dir, file_name)
    for file_name in os.listdir(in_dir)
    if file_name.lower().endswith(".csv")
)
file_names

['D:\\stocktwits_text\\2018 all\\skipgrams_noneg\\0.csv',
 'D:\\stocktwits_text\\2018 all\\skipgrams_noneg\\1.csv',
 'D:\\stocktwits_text\\2018 all\\skipgrams_noneg\\2.csv',
 'D:\\stocktwits_text\\2018 all\\skipgrams_noneg\\3.csv',
 'D:\\stocktwits_text\\2018 all\\skipgrams_noneg\\4.csv',
 'D:\\stocktwits_text\\2018 all\\skipgrams_noneg\\5.csv',
 'D:\\stocktwits_text\\2018 all\\skipgrams_noneg\\6.csv',
 'D:\\stocktwits_text\\2018 all\\skipgrams_noneg\\7.csv',
 'D:\\stocktwits_text\\2018 all\\skipgrams_noneg\\8.csv',
 'D:\\stocktwits_text\\2018 all\\skipgrams_noneg\\9.csv']

In [4]:
# Number of lines in each skipgram shard (presumably in file_names order);
# each line is one training pair.
line_counts = [586828054, 677051750, 598760681, 583052985, 547069608,
               536936140, 551959422, 552639908, 551994643, 571124852]
total_lines = sum(line_counts)
# Batches needed to see every line once.
steps = math.ceil(total_lines / batch_size)
# Assumes each of the 2 GPUs consumes its own batch per global step.
steps_gpu = math.ceil(steps / 2)
# Timing assumption: ~12 s per 100 steps. The training log further down
# reports ~7.5-8.4 s per 100 steps, so this estimate is conservative.
minutes = int(steps_gpu / 100 * 12 / 60)
for caption, value in [("Total steps:", steps),
                       ("Total steps with 2 GPUs:", steps_gpu),
                       ("Total estimated minutes (GPU):", minutes)]:
    print(caption, value)

Total steps: 1405620
Total steps with 2 GPUs: 702810
Total estimated minutes (GPU): 1405


In [5]:
def _parse_line(line):
    """Decode CSV text into an Estimator ``(features, labels)`` pair.

    Each record has two integer columns. The ``[0]`` defaults make both
    columns int32 and substitute 0 for an empty field. ``line`` may be a
    single string or, as in the input functions here, a whole batch of lines.

    Returns:
        ``({"word": first_column}, second_column)``
    """
    word_ids, label_ids = tf.decode_csv(line, record_defaults=[[0], [0]])
    return dict(word=word_ids), label_ids

In [6]:
def train_input_fn_distributed(prefetch_batches=2, num_parallel_calls=2):
    """Build the shuffled, batched skip-gram training ``tf.data.Dataset``.

    The Dataset itself is returned (not an iterator), which is what the
    Estimator expects when it runs with a distribution strategy.

    Pipeline: read every line of ``file_names`` -> shuffle lines -> group
    into batches of ``batch_size`` -> parse each batch with ``_parse_line``
    -> prefetch.

    Args:
        prefetch_batches: number of parsed *batches* buffered ahead of the
            consumer. ``prefetch`` comes after ``batch``, so its buffer is
            counted in batches, not lines. The former value ``batch_size``
            buffered 4096 batches (~16.8M lines). A small buffer is enough
            to overlap input processing with training.
        num_parallel_calls: number of batches parsed concurrently.

    Returns:
        A Dataset of ``(features, labels)`` pairs as produced by
        ``_parse_line``. There is no ``.repeat()``, so it yields one pass
        over the files.
    """
    dataset = tf.data.TextLineDataset(file_names)
    # NOTE(review): TextLineDataset reads the files one after another, so this
    # buffer only mixes lines within a window of batch_size*100 consecutive
    # lines (never across files). Interleaving the files would give a more
    # global shuffle if the on-disk order is correlated.
    dataset = dataset.shuffle(batch_size * 100)
    dataset = dataset.batch(batch_size)
    # Parse after batching so decode_csv runs once per batch (vectorised).
    dataset = dataset.map(_parse_line, num_parallel_calls=num_parallel_calls)
    return dataset.prefetch(prefetch_batches)

In [7]:
def train_input_fn():
    """Return the next ``(features, labels)`` batch of the training pipeline.

    Single-device variant of ``train_input_fn_distributed``. It builds the
    very same Dataset but exposes it through a one-shot iterator instead of
    returning the Dataset itself.
    """
    # Delegate so the two input functions cannot drift apart (previously the
    # whole pipeline was duplicated line for line).
    dataset = train_input_fn_distributed()
    return dataset.make_one_shot_iterator().get_next()

In [8]:
def sg_model_fn(features, labels, mode):
    """Skip-gram word2vec ``model_fn`` for ``tf.estimator.Estimator``.

    Looks up the embedding of each input word and scores it against its
    label word with a sampled NCE loss (5 sampled classes), optimised with
    Adam.

    Args:
        features: dict whose ``"word"`` entry holds integer word ids (from
            ``_parse_line``). Every id must be ``< vocab_size``.
        labels: integer label-word ids aligned with ``features["word"]``,
            presumably a 1-D ``[batch]`` tensor since it is expanded with
            ``[:, None]`` below (TODO confirm). It is ``None`` in PREDICT
            mode.
        mode: a ``tf.estimator.ModeKeys`` value.

    Returns:
        ``tf.estimator.EstimatorSpec``:
          * PREDICT - ``predictions={"embedding": ...}``, the looked-up vectors;
          * EVAL    - the mean NCE ``loss``;
          * TRAIN   - the loss plus an Adam ``train_op`` advancing global_step.

    Raises:
        ValueError: for any other ``mode``.
    """
    # tf.get_variable ignores tf.name_scope, so these variables are named
    # "embedding", "W" and "b" at the top variable scope. The names are kept
    # unchanged so checkpoints already written to model_dir stay loadable.
    with tf.name_scope("embeddings"):
        embeddings = tf.get_variable("embedding", shape=[vocab_size, emb_dim])
        embed = tf.nn.embedding_lookup(embeddings, features["word"])
        print("Embedded shape:", embed.shape)
    with tf.name_scope("weights"):
        nce_weights = tf.get_variable("W", shape=[vocab_size, emb_dim])
        print("nce_weights shape:", nce_weights.shape)
    with tf.name_scope("biases"):
        nce_biases = tf.get_variable("b", shape=[vocab_size])
        print("nce_biases shape:", nce_biases.shape)

    if mode == tf.estimator.ModeKeys.PREDICT:
        # No labels at inference time, so the NCE loss cannot be built; expose
        # the learned word vectors instead.
        return tf.estimator.EstimatorSpec(
            mode=mode, predictions={"embedding": embed})

    with tf.name_scope("loss"):
        loss = tf.reduce_mean(tf.nn.nce_loss(
            weights=nce_weights, biases=nce_biases,
            # nce_loss expects labels shaped [batch, num_true]; here num_true=1.
            inputs=embed, labels=labels[:, None],
            num_sampled=5, num_classes=vocab_size))
    tf.summary.scalar("loss", loss)
    tf.summary.scalar("my_global_step", tf.train.get_global_step())

    if mode == tf.estimator.ModeKeys.EVAL:
        return tf.estimator.EstimatorSpec(mode=mode, loss=loss)

    if mode == tf.estimator.ModeKeys.TRAIN:
        # NOTE(review): per the TF 1.x AdamOptimizer docs, its sparse update
        # (used for embedding_lookup gradients) still decays the moment slots
        # of every row, not only the looked-up ones. For a table this large,
        # tf.contrib.opt.LazyAdamOptimizer may be much faster. Verify before
        # switching, because it changes the update rule.
        optimizer = tf.train.AdamOptimizer()
        train_op = optimizer.minimize(loss, global_step=tf.train.get_global_step())
        return tf.estimator.EstimatorSpec(mode=mode, loss=loss, train_op=train_op)

    raise ValueError("Unsupported mode for sg_model_fn: %r" % (mode,))

In [9]:
# %%time
# # batch 65536; no device specification
# classifier = tf.estimator.Estimator(model_fn=sg_model_fn, model_dir=r"F:\tf_model_dir6")
# classifier.train(input_fn=train_input_fn, steps=1000)

In [10]:
# Mirror the model across 2 GPUs, prefetching input data onto the devices.
strategy = tf.contrib.distribute.MirroredStrategy(
    num_gpus=2,
    prefetch_on_device=True,
)
# Write summaries every 500 steps and checkpoint every 5000 steps, keeping the
# 3 newest checkpoints. RunConfig takes either save_checkpoints_steps or
# save_checkpoints_secs, not both, so the time-based option is left unset.
config = tf.estimator.RunConfig(
    model_dir=r"F:\w2v_model_dir_4096_shuffle",
    train_distribute=strategy,
    save_summary_steps=500,
    save_checkpoints_steps=5000,
    keep_checkpoint_max=3,
)
classifier = tf.estimator.Estimator(model_fn=sg_model_fn, config=config)

INFO:tensorflow:Initializing RunConfig with distribution strategies.
INFO:tensorflow:Not using Distribute Coordinator.
INFO:tensorflow:Using config: {'_model_dir': 'F:\\w2v_model_dir_4096_shuffle', '_tf_random_seed': None, '_save_summary_steps': 500, '_save_checkpoints_steps': 5000, '_save_checkpoints_secs': None, '_session_config': allow_soft_placement: true
graph_options {
  rewrite_options {
    meta_optimizer_iterations: ONE
  }
}
, '_keep_checkpoint_max': 3, '_keep_checkpoint_every_n_hours': 10000, '_log_step_count_steps': 100, '_train_distribute': <tensorflow.contrib.distribute.python.mirrored_strategy.MirroredStrategy object at 0x0000029CC6E2DE80>, '_device_fn': None, '_protocol': None, '_eval_distribute': None, '_experimental_distribute': None, '_service': None, '_cluster_spec': <tensorflow.python.training.server_lib.ClusterSpec object at 0x0000029CC6E2DFD0>, '_task_type': 'worker', '_task_id': 0, '_global_id_in_cluster': 0, '_master': '', '_evaluation_master': '', '_is_chief':

In [None]:
%%time
# Train until the input pipeline is exhausted: no steps/max_steps are passed,
# and train_input_fn_distributed's dataset has no .repeat(), so this is a
# single pass over all skipgram files.
classifier.train(input_fn=train_input_fn_distributed)

INFO:tensorflow:Device is available but not used by distribute strategy: /device:CPU:0
INFO:tensorflow:Configured nccl all-reduce.
INFO:tensorflow:Calling model_fn.
Embedded shape: (?, 300)
nce_weights shape: (539952, 300)
nce_biases shape: (539952,)
INFO:tensorflow:Calling model_fn.
Embedded shape: (?, 300)
nce_weights shape: (539952, 300)
nce_biases shape: (539952,)
INFO:tensorflow:Done calling model_fn.
INFO:tensorflow:Done calling model_fn.
INFO:tensorflow:Create CheckpointSaverHook.
INFO:tensorflow:Graph was finalized.
INFO:tensorflow:Running local_init_op.
INFO:tensorflow:Done running local_init_op.
INFO:tensorflow:Saving checkpoints for 0 into F:\w2v_model_dir_4096_shuffle\model.ckpt.
INFO:tensorflow:loss = 38.72106, step = 0
INFO:tensorflow:global_step/sec: 11.9008
INFO:tensorflow:loss = 36.711246, step = 100 (8.403 sec)
INFO:tensorflow:global_step/sec: 13.3508
INFO:tensorflow:loss = 50.080864, step = 200 (7.489 sec)
INFO:tensorflow:global_step/sec: 13.6513
INFO:tensorflow:loss