Help please: I get an error when trying to feed data via Spark RDD - Jupyter notebook #86

Closed
jaideepjoshi opened this Issue May 26, 2017 · 3 comments

@jaideepjoshi

jaideepjoshi commented May 26, 2017

I am running TFOS_spark_demo.ipynb on a Standalone 6-node Spark cluster.
#Feed data via Spark RDD
images = sc.textFile(args.images).map(lambda ln: [int(x) for x in ln.split(',')])
labels = sc.textFile(args.labels).map(lambda ln: [float(x) for x in ln.split(',')])
dataRDD = images.zip(labels)
cluster.train(dataRDD, args.epochs)

Error below:

Py4JJavaError Traceback (most recent call last)
<ipython-input> in <module>()
3 labels = sc.textFile(args.labels).map(lambda ln: [float(x) for x in ln.split(',')])
4 dataRDD = images.zip(labels)
----> 5 cluster.train(dataRDD, args.epochs)

/usr/lib/python2.7/site-packages/tensorflowonspark/TFCluster.pyc in train(self, dataRDD, num_epochs, qname)
83 rdds.append(dataRDD)
84 unionRDD = self.sc.union(rdds)
---> 85 unionRDD.foreachPartition(TFSparkNode.train(self.cluster_info, self.cluster_meta, qname))
86
87 def inference(self, dataRDD, qname='input'):

/opt/mapr/spark/spark-2.0.1/python/pyspark/rdd.pyc in foreachPartition(self, f)
765 except TypeError:
766 return iter([])
--> 767 self.mapPartitions(func).count() # Force evaluation
768
769 def collect(self):

/opt/mapr/spark/spark-2.0.1/python/pyspark/rdd.pyc in count(self)
1006 3
1007 """
-> 1008 return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
1009
1010 def stats(self):

/opt/mapr/spark/spark-2.0.1/python/pyspark/rdd.pyc in sum(self)
997 6.0
998 """
--> 999 return self.mapPartitions(lambda x: [sum(x)]).fold(0, operator.add)
1000
1001 def count(self):

/opt/mapr/spark/spark-2.0.1/python/pyspark/rdd.pyc in fold(self, zeroValue, op)
871 # zeroValue provided to each partition is unique from the one provided
872 # to the final reduce call
--> 873 vals = self.mapPartitions(func).collect()
874 return reduce(op, vals, zeroValue)
875

/opt/mapr/spark/spark-2.0.1/python/pyspark/rdd.pyc in collect(self)
774 """
775 with SCCallSiteSync(self.context) as css:
--> 776 port = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
777 return list(_load_from_socket(port, self._jrdd_deserializer))
778

/opt/mapr/spark/spark-2.0.1/python/lib/py4j-0.10.3-src.zip/py4j/java_gateway.py in call(self, *args)
1131 answer = self.gateway_client.send_command(command)
1132 return_value = get_return_value(
-> 1133 answer, self.gateway_client, self.target_id, self.name)
1134
1135 for temp_arg in temp_args:

/opt/mapr/spark/spark-2.0.1/python/pyspark/sql/utils.pyc in deco(*a, **kw)
61 def deco(*a, **kw):
62 try:
---> 63 return f(*a, **kw)
64 except py4j.protocol.Py4JJavaError as e:
65 s = e.java_exception.toString()

/opt/mapr/spark/spark-2.0.1/python/lib/py4j-0.10.3-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
317 raise Py4JJavaError(
318 "An error occurred while calling {0}{1}{2}.\n".
--> 319 format(target_id, ".", name), value)
320 else:
321 raise Py4JError(

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 44 in stage 1.0 failed 4 times, most recent failure: Lost task 44.3 in stage 1.0 (TID 96, 172.16.10.54): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/opt/mapr/spark/spark-2.0.1/python/lib/pyspark.zip/pyspark/worker.py", line 172, in main
process()
File "/opt/mapr/spark/spark-2.0.1/python/lib/pyspark.zip/pyspark/worker.py", line 167, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File "/opt/mapr/spark/spark-2.0.1/python/pyspark/rdd.py", line 2371, in pipeline_func
return func(split, prev_func(split, iterator))
File "/opt/mapr/spark/spark-2.0.1/python/pyspark/rdd.py", line 2371, in pipeline_func
return func(split, prev_func(split, iterator))
File "/opt/mapr/spark/spark-2.0.1/python/pyspark/rdd.py", line 2371, in pipeline_func
return func(split, prev_func(split, iterator))
File "/opt/mapr/spark/spark-2.0.1/python/pyspark/rdd.py", line 317, in func
return f(iterator)
File "/opt/mapr/spark/spark-2.0.1/python/pyspark/rdd.py", line 762, in func
r = f(it)
File "/usr/lib/python2.7/site-packages/tensorflowonspark/TFSparkNode.py", line 432, in _train
mgr = _get_manager(cluster_info, socket.gethostname(), os.getppid())
File "/usr/lib/python2.7/site-packages/tensorflowonspark/TFSparkNode.py", line 65, in _get_manager
logging.info("Connected to TFSparkNode.mgr on {0}, ppid={1}, state={2}".format(host, ppid, str(TFSparkNode.mgr.get('state'))))
AttributeError: 'NoneType' object has no attribute 'get'

at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:86)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1454)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1442)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1441)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1441)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1667)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1622)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1611)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1893)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1906)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1919)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1933)
at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:912)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
at org.apache.spark.rdd.RDD.collect(RDD.scala:911)
at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:453)
at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:237)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:280)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/opt/mapr/spark/spark-2.0.1/python/lib/pyspark.zip/pyspark/worker.py", line 172, in main
process()
File "/opt/mapr/spark/spark-2.0.1/python/lib/pyspark.zip/pyspark/worker.py", line 167, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File "/opt/mapr/spark/spark-2.0.1/python/pyspark/rdd.py", line 2371, in pipeline_func
return func(split, prev_func(split, iterator))
File "/opt/mapr/spark/spark-2.0.1/python/pyspark/rdd.py", line 2371, in pipeline_func
return func(split, prev_func(split, iterator))
File "/opt/mapr/spark/spark-2.0.1/python/pyspark/rdd.py", line 2371, in pipeline_func
return func(split, prev_func(split, iterator))
File "/opt/mapr/spark/spark-2.0.1/python/pyspark/rdd.py", line 317, in func
return f(iterator)
File "/opt/mapr/spark/spark-2.0.1/python/pyspark/rdd.py", line 762, in func
r = f(it)
File "/usr/lib/python2.7/site-packages/tensorflowonspark/TFSparkNode.py", line 432, in _train
mgr = _get_manager(cluster_info, socket.gethostname(), os.getppid())
File "/usr/lib/python2.7/site-packages/tensorflowonspark/TFSparkNode.py", line 65, in _get_manager
logging.info("Connected to TFSparkNode.mgr on {0}, ppid={1}, state={2}".format(host, ppid, str(TFSparkNode.mgr.get('state'))))
AttributeError: 'NoneType' object has no attribute 'get'

at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:86)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
@leewyang

leewyang commented May 26, 2017

@jaideepjoshi can you try modifying the ipynb script to set num_executors = 6? In that particular script, we aren't inferring the number of executors from the environment, so you'll have to tell the app how many executors are actually present.
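
For illustration, here is a minimal sketch of that kind of setup cell with the executor count pinned to the real cluster size. The names mnist_dist.map_fun, args, and dataRDD are assumptions borrowed from the MNIST demo, not the exact contents of TFOS_spark_demo.ipynb:

from tensorflowonspark import TFCluster

num_executors = 6   # must match the number of Spark executors actually running
num_ps = 1          # parameter servers are carved out of the same executor pool
# On YARN one could infer this from the Spark conf instead, e.g.:
# num_executors = int(sc._conf.get("spark.executor.instances", "1"))

# Reserve one TF node per executor, then feed the RDD built earlier in the notebook.
cluster = TFCluster.run(sc, mnist_dist.map_fun, args, num_executors, num_ps,
                        tensorboard=False, input_mode=TFCluster.InputMode.SPARK)
cluster.train(dataRDD, args.epochs)
cluster.shutdown()

The key point is that num_executors must equal the number of executors Spark actually gives the job; otherwise training partitions can land on executors that never started a TF node, which appears to be what surfaces here as the "'NoneType' object has no attribute 'get'" error in _get_manager.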

@jaideepjoshi

jaideepjoshi commented May 26, 2017

@leewyang

leewyang commented May 26, 2017

At the moment, yes, since we're mostly targeting YARN use cases internally. In these use cases, you would typically use spark-submit --num-executors N to start a TF cluster of size N. It would be a bit weird to start 10 spark executors to run 2 TF nodes. And even if we made this work, then data feeding via InputMode.SPARK would require another network hop to send data from a non-TF node to a TF node.
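
For context, the kind of YARN launch described here would look roughly like the following; the script name and resource settings are placeholders for illustration, not values taken from this issue:

spark-submit \
  --master yarn \
  --deploy-mode cluster \
  --num-executors 6 \
  --executor-memory 4G \
  mnist_spark.py <app args>

The --num-executors value and the num_executors passed to TFCluster.run are kept equal, so every Spark executor hosts exactly one TF node and RDD partitions fed via InputMode.SPARK always go to a local TF process.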

leewyang closed this Jul 19, 2017
