Asynchronous mode not working #102
Hi, I have the same problem. I am trying to run a simple keras MLP model in asynchronous mode and it gives a traceback similar to yours, but there is another message about the Spark Context too:
I tried to set this option but it doesn't fix it.
Hi, just to follow up: did you have any success with async mode?
@iam-armanahmed @GiuliaBarone that urllib error indicates the address can't be reached (i.e. you likely don't have the right connection string; it's not an access-rights problem). Here's where to look:
The HTTP parameter server is started here, where it determines and sets the master url: elephas/elephas/parameter/server.py Line 81 in e0105e8
The respective clients get initialized by the same method call: elephas/elephas/parameter/client.py Line 49 in 9eb3252
Now, I don't know how you set up your experiments or what your cluster looks like. What you can help me (and yourself) with is figuring out what the connection strings are. Maybe the master url is off? A port mismatch? If you can give me some feedback on what you see, we can figure this out.
e.g. print the connection strings on the server and client side.
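As a quick sanity check in the meantime, you can test from a worker node whether anything is listening at the address the parameter server claims to use. A minimal sketch (the host `'localhost'` and port `4000` below are placeholders, not elephas defaults — substitute whatever connection string your setup actually prints):

```python
import socket

def can_reach(host, port, timeout=3.0):
    """Return True if a plain TCP connection to host:port succeeds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers ConnectionRefusedError ([Errno 111]) and timeouts alike.
        return False

# Placeholder address -- replace with the master url/port elephas reports
# when the parameter server starts.
print(can_reach('localhost', 4000))
```

If this prints `False` on the worker but the Flask server is up on the driver, the problem is the address (wrong host, wrong port, or a firewall), not elephas itself.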
I ran into such issues when running code in a Jupyter notebook. The issue had to do with the way the flask server was being started within an ipython session. I will send a PR today and hopefully that solves your issues @iam-armanahmed (if you're running in Jupyter).
@jomivega can you give more specifics? I'd be curious to hear what went wrong. In general, I'd just like to state here again that elephas wraps Flask in the most boring way possible. So if asynchronous mode does "not work", in most cases it will have to do with setup (app not starting, app down, app not reachable, no authentication or access rights, etc.).
Disclaimer: on a mobile device atm. @maxpumperla https://stackoverflow.com/questions/41831929/debug-flask-server-inside-jupyter-notebook and https://stackoverflow.com/questions/52457582/flask-application-inside-jupyter-notebook Basically, it has to do with passing debug=True and use_reloader=True when starting the flask server. My fix detects whether we are running in an ipython session and sets the correct parameters.
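A sketch of that detection (the helper name and the `app.run` line are illustrative, not the actual PR code):

```python
def in_ipython_session():
    """Best-effort check for an active IPython/Jupyter session."""
    try:
        from IPython import get_ipython
        return get_ipython() is not None
    except ImportError:
        return False

interactive = in_ipython_session()

# Flask's reloader re-executes the main module in a subprocess, which
# breaks inside an ipython kernel, so both the debugger and the reloader
# should be disabled when running in a notebook, e.g.:
# app.run(host='0.0.0.0', debug=not interactive, use_reloader=not interactive)
print(interactive)
```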
@maxpumperla can you point out the paper that was used to implement the synchronous mode in elephas?
@iam-armanahmed async mode is modeled after
synchronous is just the implementation closest to that which works with what Spark provides out of the box (simple map-reduce). Maybe there's a paper for this too; I didn't check.
I solved this issue. |
@jovigb can you please open a pull request? |
I am trying to run a keras autoencoder model in asynchronous mode and it gives the following error:
------------------------ CODE --------------------------------------------
# Create Spark model
spark_model = SparkModel(model, mode='asynchronous', frequency='epoch')

# Fit the Spark model
start = time.time()
spark_model.fit(train_rdd, epochs=hyperparameters['epochs'], batch_size=1, verbose=2, validation_split=0.1)
print('Time Taken : ', time.time() - start)
------------------------ ERROR -------------------------------------------
WARNING: Do not use the development server in a production environment.
Use a production WSGI server instead.
Py4JJavaError Traceback (most recent call last)
in ()
4 # Fit the Spark model
5 start= time.time()
----> 6 spark_model.fit(train_rdd, epochs=hyperparameters['epochs'], batch_size=1, verbose=2, validation_split=0.1)
7 print('Time Taken : ', time.time() - start)
~/anaconda3/envs/thesis/lib/python3.5/site-packages/elephas/spark_model.py in fit(self, rdd, epochs, batch_size, verbose, validation_split)
143
144 if self.mode in ['asynchronous', 'synchronous', 'hogwild']:
--> 145 self._fit(rdd, epochs, batch_size, verbose, validation_split)
146 else:
147 raise ValueError("Choose from one of the modes: asynchronous, synchronous or hogwild")
~/anaconda3/envs/thesis/lib/python3.5/site-packages/elephas/spark_model.py in _fit(self, rdd, epochs, batch_size, verbose, validation_split)
169 if self.mode in ['asynchronous', 'hogwild']:
170 worker = AsynchronousSparkWorker(yaml, parameters, mode, train_config, freq, optimizer, loss, metrics, custom)
--> 171 rdd.mapPartitions(worker.train).collect()
172 new_parameters = self.client.get_parameters()
173 elif self.mode == 'synchronous':
~/anaconda3/envs/thesis/lib/python3.5/site-packages/pyspark/rdd.py in collect(self)
832 """
833 with SCCallSiteSync(self.context) as css:
--> 834 sock_info = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
835 return list(_load_from_socket(sock_info, self._jrdd_deserializer))
836
~/anaconda3/envs/thesis/lib/python3.5/site-packages/py4j/java_gateway.py in call(self, *args)
1255 answer = self.gateway_client.send_command(command)
1256 return_value = get_return_value(
-> 1257 answer, self.gateway_client, self.target_id, self.name)
1258
1259 for temp_arg in temp_args:
~/anaconda3/envs/thesis/lib/python3.5/site-packages/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
326 raise Py4JJavaError(
327 "An error occurred while calling {0}{1}{2}.\n".
--> 328 format(target_id, ".", name), value)
329 else:
330 raise Py4JError(
Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 0.0 failed 1 times, most recent failure: Lost task 1.0 in stage 0.0 (TID 1, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/home/dexter/anaconda3/envs/thesis/lib/python3.5/urllib/request.py", line 1254, in do_open
h.request(req.get_method(), req.selector, req.data, headers)
File "/home/dexter/anaconda3/envs/thesis/lib/python3.5/http/client.py", line 1107, in request
self._send_request(method, url, body, headers)
File "/home/dexter/anaconda3/envs/thesis/lib/python3.5/http/client.py", line 1152, in _send_request
self.endheaders(body)
File "/home/dexter/anaconda3/envs/thesis/lib/python3.5/http/client.py", line 1103, in endheaders
self._send_output(message_body)
File "/home/dexter/anaconda3/envs/thesis/lib/python3.5/http/client.py", line 934, in _send_output
self.send(msg)
File "/home/dexter/anaconda3/envs/thesis/lib/python3.5/http/client.py", line 877, in send
self.connect()
File "/home/dexter/anaconda3/envs/thesis/lib/python3.5/http/client.py", line 849, in connect
(self.host,self.port), self.timeout, self.source_address)
File "/home/dexter/anaconda3/envs/thesis/lib/python3.5/socket.py", line 712, in create_connection
raise err
File "/home/dexter/anaconda3/envs/thesis/lib/python3.5/socket.py", line 703, in create_connection
sock.connect(sa)
ConnectionRefusedError: [Errno 111] Connection refused
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/home/dexter/anaconda3/envs/thesis/lib/python3.5/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 230, in main
process()
File "/home/dexter/anaconda3/envs/thesis/lib/python3.5/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 225, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File "/home/dexter/anaconda3/envs/thesis/lib/python3.5/site-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 372, in dump_stream
vs = list(itertools.islice(iterator, batch))
File "/home/dexter/anaconda3/envs/thesis/lib/python3.5/site-packages/elephas/worker.py", line 98, in train
weights_before_training = self.client.get_parameters()
File "/home/dexter/anaconda3/envs/thesis/lib/python3.5/site-packages/elephas/parameter/client.py", line 55, in get_parameters
pickled_weights = urllib2.urlopen(request).read()
File "/home/dexter/anaconda3/envs/thesis/lib/python3.5/urllib/request.py", line 163, in urlopen
return opener.open(url, data, timeout)
File "/home/dexter/anaconda3/envs/thesis/lib/python3.5/urllib/request.py", line 466, in open
response = self._open(req, data)
File "/home/dexter/anaconda3/envs/thesis/lib/python3.5/urllib/request.py", line 484, in _open
'_open', req)
File "/home/dexter/anaconda3/envs/thesis/lib/python3.5/urllib/request.py", line 444, in _call_chain
result = func(*args)
File "/home/dexter/anaconda3/envs/thesis/lib/python3.5/urllib/request.py", line 1282, in http_open
return self.do_open(http.client.HTTPConnection, req)
File "/home/dexter/anaconda3/envs/thesis/lib/python3.5/urllib/request.py", line 1256, in do_open
raise URLError(err)
urllib.error.URLError: <urlopen error [Errno 111] Connection refused>
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1602)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1590)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1589)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1589)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1823)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1772)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1761)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2034)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2055)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2074)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2099)
at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:939)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
at org.apache.spark.rdd.RDD.collect(RDD.scala:938)
at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:162)
at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.api.python.PythonException: [same Python traceback as above, ending in urllib.error.URLError: <urlopen error [Errno 111] Connection refused>]