KeyError: 'input' error #94

Closed
xiaoyongzhu opened this issue Jun 17, 2017 · 3 comments

@xiaoyongzhu

Hello,

I am trying TFoS and have successfully converted the MNIST zip files into HDFS files following the instructions linked here. However, when I try to run the sample named "Run distributed MNIST training (using feed_dict)", I hit the following "KeyError: 'input'" error, and I have no clue what's going on.

My environment is:
Spark 2.1.1 + YARN/Hadoop 2.6 + latest TFoS (master branch) + latest TF (1.2.0), running in YARN-client mode. Below is the command line I am using. I have a small cluster of 2 nodes (I recall an issue mentioning that the number of executors should preferably match the number of cluster nodes, though I'm not sure), and I have verified that libhdfs.so and libjvm.so are on the LD_LIBRARY_PATH.

I also did a bit of research and found a similar issue, #32, along with a couple of related ones. However, since that issue dates back to February, I assumed all of its fixes had already been merged into the latest branch, which is why I am raising this issue again.

I am also attaching the full log (Untitled-1.txt) in case you need more information.

Thanks for looking at it!


spark-submit \
--master yarn \
--deploy-mode client \
--num-executors 2 \
--queue ${QUEUE} \
--executor-memory 4G \
--py-files TensorFlowOnSpark/tfspark.zip,TensorFlowOnSpark/examples/mnist/spark/mnist_dist.py \
--conf spark.dynamicAllocation.enabled=false \
--conf spark.yarn.maxAppAttempts=1 \
--archives wasb:///Python.zip#Python \
--conf spark.executorEnv.LD_LIBRARY_PATH="$JAVA_HOME/jre/lib/amd64/server:/usr/hdp/2.6.0.10-29/usr/lib/" \
--conf spark.executorEnv.PYSPARK_PYTHON="Python/bin/python" \
--jars ecosystem/hadoop/target/tensorflow-hadoop-1.0-SNAPSHOT.jar \
TensorFlowOnSpark/examples/mnist/spark/mnist_spark.py \
--images wasb:///user/sshuser/mnist/csv/train/images \
--labels wasb:///user/sshuser/mnist/csv/train/labels \
--mode train \
--model mnist_model




17/06/17 05:20:09 INFO TaskSetManager: Starting task 6.3 in stage 1.0 (TID 17, 10.0.0.11, executor 1, partition 6, PROCESS_LOCAL, 6768 bytes)
17/06/17 05:20:09 INFO TaskSetManager: Lost task 5.3 in stage 1.0 (TID 16) on 10.0.0.11, executor 1: org.apache.spark.api.python.PythonException (Traceback (most recent call last):
  File "/usr/hdp/current/spark2-client/python/lib/pyspark.zip/pyspark/worker.py", line 174, in main
    process()
  File "/usr/hdp/current/spark2-client/python/lib/pyspark.zip/pyspark/worker.py", line 169, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/hdp/current/spark2-client/python/lib/pyspark.zip/pyspark/rdd.py", line 2408, in pipeline_func
  File "/usr/hdp/current/spark2-client/python/lib/pyspark.zip/pyspark/rdd.py", line 2408, in pipeline_func
  File "/usr/hdp/current/spark2-client/python/lib/pyspark.zip/pyspark/rdd.py", line 2408, in pipeline_func
  File "/usr/hdp/current/spark2-client/python/lib/pyspark.zip/pyspark/rdd.py", line 345, in func
  File "/usr/hdp/current/spark2-client/python/lib/pyspark.zip/pyspark/rdd.py", line 793, in func
  File "/home/sshuser/TensorFlowOnSpark/tfspark.zip/tensorflowonspark/TFSparkNode.py", line 433, in _train
  File "/home/sshuser/Python/lib/python2.7/multiprocessing/managers.py", line 667, in temp
    token, exp = self._create(typeid, *args, **kwds)
  File "/home/sshuser/Python/lib/python2.7/multiprocessing/managers.py", line 567, in _create
    id, exposed = dispatch(conn, None, 'create', (typeid,)+args, kwds)
  File "/home/sshuser/Python/lib/python2.7/multiprocessing/managers.py", line 105, in dispatch
    raise convert_to_error(kind, result)
RemoteError:
---------------------------------------------------------------------------
Traceback (most recent call last):
  File "/home/sshuser/Python/lib/python2.7/multiprocessing/managers.py", line 207, in handle_request
    result = func(c, *args, **kwds)
  File "/home/sshuser/Python/lib/python2.7/multiprocessing/managers.py", line 386, in create
    obj = callable(*args, **kwds)
  File "./tfspark.zip/tensorflowonspark/TFManager.py", line 34, in <lambda>
    TFManager.register('get_queue', callable=lambda qname: qdict[qname])
KeyError: 'input'
---------------------------------------------------------------------------

@leewyang
Contributor

Hi @xiaoyongzhu, this error generally occurs when a data feeding task is assigned to the executor running the PS node, and that can only happen if Spark is configured to run more than one task per executor.

In your case, for example, there are two hosts, and a PS node was started on one executor and a worker node on the other, per:

17/06/17 05:20:02 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, 10.0.0.11, executor 1, partition 0, PROCESS_LOCAL, 6088 bytes)
17/06/17 05:20:02 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, 10.0.0.14, executor 2, partition 1, PROCESS_LOCAL, 6088 bytes)
17/06/17 05:20:02 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on 10.0.0.14:36036 (size: 7.8 KB, free: 2004.6 MB)
17/06/17 05:20:02 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on 10.0.0.11:44572 (size: 7.8 KB, free: 2004.6 MB)
2017-06-17 05:20:03,426 INFO (MainThread-15141) waiting for 1 reservations
2017-06-17 05:20:04,427 INFO (MainThread-15141) all reservations completed
2017-06-17 05:20:04,427 INFO (MainThread-15141) All TFSparkNodes started
2017-06-17 05:20:04,427 INFO (MainThread-15141) {'addr': '/tmp/pymp-mWKGgU/listener-dQ0ETu', 'task_index': 0, 'port': 43064, 'authkey': '\xb6\x05\x10\x08\xef\x94@&\x97\x89a\x16\x90\x98\xd0\xbd', 'worker_num': 1, 'host': 'wn1-xiaoyz', 'ppid': 28794, 'job_name': 'worker', 'tb_pid': 0, 'tb_port': 0}
2017-06-17 05:20:04,427 INFO (MainThread-15141) {'addr': ('wn0-xiaoyz', 35537), 'task_index': 0, 'port': 39157, 'authkey': '+\xa5\x05\x17\xd93Db\x80\x16\x19\xa9\x13\x9b%U', 'worker_num': 0, 'host': 'wn0-xiaoyz', 'ppid': 31410, 'job_name': 'ps', 'tb_pid': 0, 'tb_port': 0}

However, the data feeding job reports:

17/06/17 05:20:04 INFO YarnScheduler: Adding task set 1.0 with 10 tasks
17/06/17 05:20:04 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 2, 10.0.0.11, executor 1, partition 0, PROCESS_LOCAL, 6768 bytes)
17/06/17 05:20:04 INFO TaskSetManager: Starting task 1.0 in stage 1.0 (TID 3, 10.0.0.14, executor 2, partition 1, PROCESS_LOCAL, 6768 bytes)
17/06/17 05:20:04 INFO TaskSetManager: Starting task 2.0 in stage 1.0 (TID 4, 10.0.0.11, executor 1, partition 2, PROCESS_LOCAL, 6768 bytes)
17/06/17 05:20:04 INFO TaskSetManager: Starting task 3.0 in stage 1.0 (TID 5, 10.0.0.14, executor 2, partition 3, PROCESS_LOCAL, 6768 bytes)
...
17/06/17 05:20:05 INFO TaskSetManager: Starting task 4.0 in stage 1.0 (TID 6, 10.0.0.14, executor 2, partition 4, PROCESS_LOCAL, 6768 bytes)
...
17/06/17 05:20:05 INFO TaskSetManager: Starting task 5.0 in stage 1.0 (TID 7, 10.0.0.11, executor 1, partition 5, PROCESS_LOCAL, 6768 bytes)
17/06/17 05:20:05 INFO TaskSetManager: Starting task 6.0 in stage 1.0 (TID 8, 10.0.0.11, executor 1, partition 6, PROCESS_LOCAL, 6768 bytes)

So you will need to configure Spark to run one task per executor, e.g. by setting --conf spark.executor.cores=1.
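
For reference, here is a sketch of your command with that one setting added (everything else is unchanged from the command you posted, so paths and queue names are your own):

spark-submit \
--master yarn \
--deploy-mode client \
--num-executors 2 \
--conf spark.executor.cores=1 \
--queue ${QUEUE} \
--executor-memory 4G \
--py-files TensorFlowOnSpark/tfspark.zip,TensorFlowOnSpark/examples/mnist/spark/mnist_dist.py \
--conf spark.dynamicAllocation.enabled=false \
--conf spark.yarn.maxAppAttempts=1 \
--archives wasb:///Python.zip#Python \
--conf spark.executorEnv.LD_LIBRARY_PATH="$JAVA_HOME/jre/lib/amd64/server:/usr/hdp/2.6.0.10-29/usr/lib/" \
--conf spark.executorEnv.PYSPARK_PYTHON="Python/bin/python" \
--jars ecosystem/hadoop/target/tensorflow-hadoop-1.0-SNAPSHOT.jar \
TensorFlowOnSpark/examples/mnist/spark/mnist_spark.py \
--images wasb:///user/sshuser/mnist/csv/train/images \
--labels wasb:///user/sshuser/mnist/csv/train/labels \
--mode train \
--model mnist_model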

@renato2099

Hi @leewyang
Does this mean that we always have to run with --conf spark.executor.cores=1 and scale with the number of executors instead, i.e. create more executors rather than assigning more CPUs to each of them? Is this correct?

@leewyang
Contributor

leewyang commented Jul 19, 2017

@renato2099 yes. Keep in mind that spark.executor.cores is a resource allocation hint, not a hardware limit. Setting it to 1 just tells Spark to run only one task at a time on each executor, which in the TFoS setting means running only one TensorFlow node per executor. This was chosen as the simplest, easiest-to-reason-about level of abstraction (vs. one TensorFlow node per task); for example, each executor's log will then only contain log statements from a single TensorFlow node.
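
As a rough sketch, scaling up to 4 TensorFlow nodes would mean asking Spark for 4 single-task executors (the 4-node numbers below are purely illustrative, and depending on your version of mnist_spark.py you may also want to pass its --cluster_size argument explicitly):

spark-submit \
--master yarn \
--deploy-mode client \
--num-executors 4 \
--conf spark.executor.cores=1 \
... (remaining options as in the command above) \
TensorFlowOnSpark/examples/mnist/spark/mnist_spark.py \
--cluster_size 4 \
--images wasb:///user/sshuser/mnist/csv/train/images \
--labels wasb:///user/sshuser/mnist/csv/train/labels \
--mode train \
--model mnist_model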
