
Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe. #6873

Closed
fatenlouati opened this issue Dec 5, 2022 · 22 comments
@fatenlouati

Hello,
I'm using a Docker container built from this BigDL image.
When I try to collect the predictions using collect(), this error occurs:
Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.

This is the code:

def retrain(self, batch_size):
    minibatch = random.sample(self.experience_replay, batch_size)
    for state, action, reward, next_state in minibatch:
        state = np.asmatrix(state)
        next_state = np.asmatrix(next_state)
        print('state type', state)
        print('next state type', next_state)
        target = self.q_network.predict(state)
        p = target.collect()
        tt = self.target_network.predict(next_state)
        t = tt.collect()
        p[0][action] = reward + self.gamma * np.amax(t)
        self.q_network.fit(state, p, verbose=0)
    self.dqn_update_time -= 1
    print("***********", self.dqn_update_time, "************ ")
    if self.dqn_update_time == 0:
        self.dqn_update_time = 100  # dqn_time
        self.alighn_target_model()
        print('model updated')

This is the error:

/tmp/ipykernel_1032/2958540146.py in retrain(self, batch_size)
     71             print('next state type',next_state)
     72             target = self.q_network.predict(state)
---> 73             p= target.collect()
     74 
     75             tt = self.target_network.predict(next_state)

/opt/work/spark-3.1.2/python/lib/pyspark.zip/pyspark/rdd.py in collect(self)
    947         """
    948         with SCCallSiteSync(self.context) as css:
--> 949             sock_info = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
    950         return list(_load_from_socket(sock_info, self._jrdd_deserializer))
    951 

/usr/local/envs/bigdl/lib/python3.7/site-packages/py4j/java_gateway.py in __call__(self, *args)
   1303         answer = self.gateway_client.send_command(command)
   1304         return_value = get_return_value(
-> 1305             answer, self.gateway_client, self.target_id, self.name)
   1306 
   1307         for temp_arg in temp_args:

/opt/work/spark-3.1.2/python/lib/pyspark.zip/pyspark/sql/utils.py in deco(*a, **kw)
    109     def deco(*a, **kw):
    110         try:
--> 111             return f(*a, **kw)
    112         except py4j.protocol.Py4JJavaError as e:
    113             converted = convert_exception(e.java_exception)

/usr/local/envs/bigdl/lib/python3.7/site-packages/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    326                 raise Py4JJavaError(
    327                     "An error occurred while calling {0}{1}{2}.\n".
--> 328                     format(target_id, ".", name), value)
    329             else:
    330                 raise Py4JError(

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.

Could anyone explain why this error occurred and how to fix it?

@qiuxin2012
Contributor

Could you give a simple example to reproduce your error? I don't know what the variables' types are in your code. @ToutaF

@fatenlouati
Author

@qiuxin2012 OK. First of all, thank you for your reply.
The code ran perfectly on my PC; the problem occurred when I used the BigDL container. I believe the issue is that BigDL uses RDDs in predict(), which is why I used collect(), but the error remains the same.
Here, in the code, the variable state is a sample of a dataset.
The variables q_network and target_network are two neural networks:

        # Build networks
        self.q_network = self._build_compile_model()
        self.target_network = self._build_compile_model()
        self.alighn_target_model()
        print('agent built')

    def store(self, state, action, reward, next_state):
        self.experience_replay.append((state, action, reward, next_state))

    def _build_compile_model(self):
        hidden_size = 100
        col = self.environment.state_size
        model = Sequential()
        # Add input layer
        model.add(Dense(hidden_size, input_dim=col, name='input'))

        # Add hidden layers
        for layers in range(3):
            model.add(Dense(hidden_size, activation='relu'))
        # Add output layer
        model.add(Dense(self._action_size, activation='softmax', name='output'))
        model.compile(loss='categorical_crossentropy', optimizer=self._optimizer)
        return model


@qiuxin2012
Contributor

qiuxin2012 commented Dec 7, 2022

Is model = Sequential() bigdl.dllib.keras.Sequential?
It will predict the dataset in distributed mode by default: https://github.com/intel-analytics/BigDL/blob/47988fc6da4ce3a0f1eb2d3781ae312b06d88e2b/python/dllib/src/bigdl/dllib/keras/engine/topology.py#L381
So there may be some problem with your SparkContext. Can you try the following test case (one of our unit tests) in the Docker container:

from bigdl.dllib.nncontext import *
from bigdl.dllib.keras.layers import *
from bigdl.dllib.keras.models import *
import numpy as np

sc = init_nncontext()

input = Input(shape=(32, ))
dense1 = Dense(10)(input)
dense2 = Dense(12)(input)
model = Model(input, [dense1, dense2])
data = np.random.random([10, 32])
result = model.predict(data).collect()

In my docker container, it works fine.

And I need more information to locate the problem:

  1. How do you initialize your nncontext?
  2. The error logs below the line Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe. (the full Java stack trace)
  3. The log output above the /tmp/ipykernel_1032/2958540146.py in retrain(self, batch_size) line

@fatenlouati
Author

fatenlouati commented Dec 7, 2022

@qiuxin2012 OK, sir.
The nncontext:

sc = init_nncontext("Anomaly Detection Example")
spark = SparkSession(sc)

The state:
state = train_env.reset()
where reset() is:

def reset(self):
    self.i = np.random.randint(0, self.data.shape[0])
    self.obs = self.data.iloc[self.i]
    return self.obs.values

The Java stack trace:

[Stage 0:>                                                          (0 + 7) / 8]
---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
/tmp/ipykernel_190/3129113790.py in <module>
     65 
     66       if len(agent.experience_replay) > batch_size:
---> 67           agent.retrain(batch_size)
     68   agent.epsilon = max(.05, agent.epsilon*decay)
     69   epsilon=agent.epsilon

/tmp/ipykernel_190/3663109806.py in retrain(self, batch_size)
     67             print('next state type',next_state)
     68             target = self.q_network.predict(state)
---> 69             p= target.collect()
     70             tt = self.target_network.predict(next_state)
     71             t=tt.collect()

/opt/work/spark-3.1.2/python/lib/pyspark.zip/pyspark/rdd.py in collect(self)
    947         """
    948         with SCCallSiteSync(self.context) as css:
--> 949             sock_info = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
    950         return list(_load_from_socket(sock_info, self._jrdd_deserializer))
    951 

/usr/local/envs/bigdl/lib/python3.7/site-packages/py4j/java_gateway.py in __call__(self, *args)
   1303         answer = self.gateway_client.send_command(command)
   1304         return_value = get_return_value(
-> 1305             answer, self.gateway_client, self.target_id, self.name)
   1306 
   1307         for temp_arg in temp_args:

/opt/work/spark-3.1.2/python/lib/pyspark.zip/pyspark/sql/utils.py in deco(*a, **kw)
    109     def deco(*a, **kw):
    110         try:
--> 111             return f(*a, **kw)
    112         except py4j.protocol.Py4JJavaError as e:
    113             converted = convert_exception(e.java_exception)

/usr/local/envs/bigdl/lib/python3.7/site-packages/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    326                 raise Py4JJavaError(
    327                     "An error occurred while calling {0}{1}{2}.\n".
--> 328                     format(target_id, ".", name), value)
    329             else:
    330                 raise Py4JError(

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 7 in stage 0.0 failed 1 times, most recent failure: Lost task 7.0 in stage 0.0 (TID 7) (faten-VivoBook-ASUSLaptop-X509JB-X509JB.router executor driver): com.intel.analytics.bigdl.dllib.utils.InvalidOperationException: Linear: 
 The input to the layer needs to be a vector(or a mini-batch of vectors);
 please use the Reshape module to convert multi-dimensional input into vectors
 if appropriate"
    input dim 3
    at com.intel.analytics.bigdl.dllib.utils.Log4Error$.invalidOperationError(Log4Error.scala:38)
    at com.intel.analytics.bigdl.dllib.nn.abstractnn.AbstractModule.forward(AbstractModule.scala:291)
    at com.intel.analytics.bigdl.dllib.keras.Predictor$.$anonfun$predict$3(Predictor.scala:189)
    at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:484)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:490)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
    at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.hasNext(SerDeUtil.scala:86)
    at scala.collection.Iterator.foreach(Iterator.scala:941)
    at scala.collection.Iterator.foreach$(Iterator.scala:941)
    at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.foreach(SerDeUtil.scala:80)
    at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:307)
    at org.apache.spark.api.python.PythonRunner$$anon$2.writeIteratorToStream(PythonRunner.scala:621)
    at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:397)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1996)
    at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:232)
Caused by: com.intel.analytics.bigdl.dllib.utils.InvalidOperationException: Linear: 
 The input to the layer needs to be a vector(or a mini-batch of vectors);
 please use the Reshape module to convert multi-dimensional input into vectors
 if appropriate"
    input dim 3
    at com.intel.analytics.bigdl.dllib.utils.Log4Error$.invalidOperationError(Log4Error.scala:38)
    at com.intel.analytics.bigdl.dllib.nn.abstractnn.AbstractModule.forward(AbstractModule.scala:288)
    at com.intel.analytics.bigdl.dllib.nn.Sequential.updateOutput(Sequential.scala:39)
    at com.intel.analytics.bigdl.dllib.nn.internal.KerasLayer.updateOutput(KerasLayer.scala:275)
    at com.intel.analytics.bigdl.dllib.nn.abstractnn.AbstractModule.forward(AbstractModule.scala:285)
    ... 13 more
Caused by: java.lang.IllegalArgumentException: Linear: 
 The input to the layer needs to be a vector(or a mini-batch of vectors);
 please use the Reshape module to convert multi-dimensional input into vectors
 if appropriate"
    input dim 3
    at com.intel.analytics.bigdl.dllib.utils.Log4Error$.invalidInputError(Log4Error.scala:28)
    at com.intel.analytics.bigdl.dllib.nn.Linear.updateOutput(Linear.scala:85)
    at com.intel.analytics.bigdl.dllib.nn.Linear.updateOutput(Linear.scala:44)
    at com.intel.analytics.bigdl.dllib.nn.internal.KerasLayer.updateOutput(KerasLayer.scala:275)
    at com.intel.analytics.bigdl.dllib.nn.abstractnn.AbstractModule.forward(AbstractModule.scala:285)
    ... 16 more

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2258)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2207)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2206)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2206)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1079)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1079)
    at scala.Option.foreach(Option.scala:407)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1079)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2445)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2387)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2376)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:868)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2196)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2217)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2236)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2261)
    at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1030)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
    at org.apache.spark.rdd.RDD.collect(RDD.scala:1029)
    at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:180)
    at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)
Caused by: com.intel.analytics.bigdl.dllib.utils.InvalidOperationException: Linear: 
 The input to the layer needs to be a vector(or a mini-batch of vectors);
 please use the Reshape module to convert multi-dimensional input into vectors
 if appropriate"
    input dim 3
    at com.intel.analytics.bigdl.dllib.utils.Log4Error$.invalidOperationError(Log4Error.scala:38)
    at com.intel.analytics.bigdl.dllib.nn.abstractnn.AbstractModule.forward(AbstractModule.scala:291)
    at com.intel.analytics.bigdl.dllib.keras.Predictor$.$anonfun$predict$3(Predictor.scala:189)
    at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:484)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:490)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
    at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.hasNext(SerDeUtil.scala:86)
    at scala.collection.Iterator.foreach(Iterator.scala:941)
    at scala.collection.Iterator.foreach$(Iterator.scala:941)
    at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.foreach(SerDeUtil.scala:80)
    at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:307)
    at org.apache.spark.api.python.PythonRunner$$anon$2.writeIteratorToStream(PythonRunner.scala:621)
    at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:397)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1996)
    at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:232)
Caused by: com.intel.analytics.bigdl.dllib.utils.InvalidOperationException: Linear: 
 The input to the layer needs to be a vector(or a mini-batch of vectors);
 please use the Reshape module to convert multi-dimensional input into vectors
 if appropriate"
    input dim 3
    at com.intel.analytics.bigdl.dllib.utils.Log4Error$.invalidOperationError(Log4Error.scala:38)
    at com.intel.analytics.bigdl.dllib.nn.abstractnn.AbstractModule.forward(AbstractModule.scala:288)
    at com.intel.analytics.bigdl.dllib.nn.Sequential.updateOutput(Sequential.scala:39)
    at com.intel.analytics.bigdl.dllib.nn.internal.KerasLayer.updateOutput(KerasLayer.scala:275)
    at com.intel.analytics.bigdl.dllib.nn.abstractnn.AbstractModule.forward(AbstractModule.scala:285)
    ... 13 more
Caused by: java.lang.IllegalArgumentException: Linear: 
 The input to the layer needs to be a vector(or a mini-batch of vectors);
 please use the Reshape module to convert multi-dimensional input into vectors
 if appropriate"
    input dim 3
    at com.intel.analytics.bigdl.dllib.utils.Log4Error$.invalidInputError(Log4Error.scala:28)
    at com.intel.analytics.bigdl.dllib.nn.Linear.updateOutput(Linear.scala:85)
    at com.intel.analytics.bigdl.dllib.nn.Linear.updateOutput(Linear.scala:44)
    at com.intel.analytics.bigdl.dllib.nn.internal.KerasLayer.updateOutput(KerasLayer.scala:275)
    at com.intel.analytics.bigdl.dllib.nn.abstractnn.AbstractModule.forward(AbstractModule.scala:285)
    ... 16 more

@qiuxin2012
Contributor

It seems the input state doesn't match the first Dense layer, model.add(Dense(hidden_size, input_dim=col, name='input')).
Is your state a 3D tensor? You can try to adjust the input_dim or input_shape to match it, according to https://github.com/intel-analytics/BigDL/blob/a534b983a807478a71217597204f1156f5f1235c/python/dllib/src/bigdl/dllib/keras/layers/core.py#L319
Here is a simple unit test for reference: https://github.com/intel-analytics/BigDL/blob/dcd0e0be001a5f9ed81a192ac5ccb61f639307b6/python/dllib/test/bigdl/keras/test_simple_integration.py#L62
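For illustration, a minimal sketch of that adjustment, assuming the BigDL dllib Keras-style API used in this thread and a flat 94-feature state:

from bigdl.dllib.nncontext import *
from bigdl.dllib.keras.layers import Dense
from bigdl.dllib.keras.models import Sequential

sc = init_nncontext()

hidden_size = 100
model = Sequential()
# Declare the expected input size on the first layer; either form should work
# for a 2D input of shape (batch, 94):
model.add(Dense(hidden_size, input_shape=(94,), name='input'))
# model.add(Dense(hidden_size, input_dim=94, name='input'))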

@fatenlouati
Author

fatenlouati commented Dec 8, 2022

@qiuxin2012 Thank you for your responses.
state is a 2D tensor, state.shape = (1, 94):

# input layer
model.add(Dense(hidden_size, input_shape=(col,), name='input'))  # col = 94

However, in addition to this error, I encountered another one when I restarted the container and tried to run the nncontext:
Exception: Java gateway process exited before sending its port number

@qiuxin2012
Contributor

qiuxin2012 commented Dec 9, 2022

For the

Caused by: java.lang.IllegalArgumentException: Linear: 
 The input to the layer needs to be a vector(or a mini-batch of vectors);
 please use the Reshape module to convert multi-dimensional input into vectors
 if appropriate"
    input dim 3

I found it's caused by np.asmatrix(next_state): asmatrix converts state to the np.matrix type, and np.matrix causes an error when it is converted to an RDD.
There are two workarounds (see the sketch after this list):

  1. Use local predict: add distributed=False to the predict call.
  2. Before you predict on state, use resize to convert state back to an np.ndarray: state = np.resize(state, state.shape)
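A minimal sketch of the two workarounds (shapes and names follow this thread; the predict lines are left commented since they need the q_network model defined earlier):

import numpy as np

state = np.asmatrix(np.random.random(94))   # np.matrix of shape (1, 94)

# Workaround 2: convert state back to a plain array before the distributed predict:
state = np.resize(state, state.shape)
# p = self.q_network.predict(state).collect()

# Workaround 1: skip the RDD path entirely and predict locally:
# p = self.q_network.predict(state, distributed=False)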

@qiuxin2012
Contributor

For Exception: Java gateway process exited before sending its port number:
I don't know how to reproduce this error, but I found a similar error in #3130.
Maybe something is wrong in your container; you can try to create a new one.

@fatenlouati
Author

fatenlouati commented Dec 9, 2022

@qiuxin2012 Indeed, I removed the container and even the image, then pulled it again and created a new container, but the error remains the same.
Do you use this image, intelanalytics/bigdl:2.1.0, to create your container?

@qiuxin2012
Contributor

Yes, it's intelanalytics/bigdl:2.1.0

(base) xin@xin-dev:~$ sudo docker ps -a
CONTAINER ID   IMAGE                        COMMAND                  CREATED         STATUS                       PORTS     NAMES
d75aed7a49ee   intelanalytics/bigdl:2.1.0   "/opt/work/start-not…"   4 days ago      Up 4 days                              nervous_villani

Have you changed any environment variables? Could you give me the steps you followed, starting from creating the container (inclusive)? @fatenlouati

@fatenlouati
Author

No, I changed nothing.
I pulled the image with docker pull intelanalytics/bigdl:2.1.0,
then I ran sudo docker run -it --name bigDL --hostname bigDL --net=host -e NotebookPort=12345 intelanalytics/bigdl:2.1.0

@Le-Zheng
Contributor

Hi, I verified this by creating a container with sudo docker run -itd --net=host intelanalytics/bigdl:2.1.0 bash
and running init_nncontext() in the container; it works well.
(screenshot: init_nncontext() running successfully in the container)

@fatenlouati
Author

@Le-Zheng Thank you for your reply. That error is fixed.
However, the main issue is not fixed yet. Following the answer of @qiuxin2012, I used:

state = np.resize(state, state.shape)
next_state = np.resize(next_state, next_state.shape)
target = self.q_network.predict(state, distributed=False)
t = self.target_network.predict(next_state, distributed=False)

I got this error:

/tmp/ipykernel_163/3024936701.py in retrain(self, batch_size)
---> 73             target = self.q_network.predict(state, distributed=False)
     74             t = self.target_network.predict(next_state, distributed=False)

/opt/work/bigdl-2.1.0/python/bigdl-spark_3.1.2-2.1.0-python-api.zip/bigdl/dllib/keras/engine/topology.py in predict(self, x, batch_per_thread, distributed, feature_cols, prediction_col, transform)
    438                                       self.value,
    439                                       self._to_jtensors(x),
--> 440                                       batch_per_thread)
    441                 return [Layer.convert_output(result) for result in results]
    442             else:

/opt/work/bigdl-2.1.0/python/bigdl-spark_3.1.2-2.1.0-python-api.zip/bigdl/dllib/utils/file_utils.py in callZooFunc(bigdl_type, name, *args)
    262             if not ("does not exist" in str(e)
    263                     and "Method {}".format(name) in str(e)):
--> 264                 invalidOperationError(False, str(e), cause=e)
    265         else:
    266             return result

/opt/work/bigdl-2.1.0/python/bigdl-spark_3.1.2-2.1.0-python-api.zip/bigdl/dllib/utils/log4Error.py in invalidOperationError(condition, errMsg, fixMsg, cause)
     38         outputUserMessage(errMsg, fixMsg)
     39         if cause:
---> 40             raise cause
     41         else:
     42             raise RuntimeError(errMsg)

/opt/work/bigdl-2.1.0/python/bigdl-spark_3.1.2-2.1.0-python-api.zip/bigdl/dllib/utils/file_utils.py in callZooFunc(bigdl_type, name, *args)
    256         try:
    257             api = getattr(jinvoker, name)
--> 258             java_result = api(*args)
    259             result = _java2py(gateway, java_result)
    260         except Exception as e:

/usr/local/envs/bigdl/lib/python3.7/site-packages/py4j/java_gateway.py in __call__(self, *args)
   1303         answer = self.gateway_client.send_command(command)
   1304         return_value = get_return_value(
-> 1305             answer, self.gateway_client, self.target_id, self.name)
   1306 
   1307         for temp_arg in temp_args:

/usr/local/envs/bigdl/lib/python3.7/site-packages/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    326                 raise Py4JJavaError(
    327                     "An error occurred while calling {0}{1}{2}.\n".
--> 328                     format(target_id, ".", name), value)
    329             else:
    330                 raise Py4JError(

Py4JJavaError: An error occurred while calling o45.zooPredict.
: com.intel.analytics.bigdl.dllib.utils.UnKnownException: Sequential[2de6b34b]
Dense[input]
mat.size(2) 94 should match vec.size(1) 4
	at com.intel.analytics.bigdl.dllib.utils.Log4Error$.unKnowExceptionError(Log4Error.scala:60)
	at com.intel.analytics.bigdl.dllib.nn.abstractnn.AbstractModule.forward(AbstractModule.scala:295)
	at com.intel.analytics.bigdl.dllib.optim.Predictor$.$anonfun$predictSamples$1(Predictor.scala:71)
	at com.intel.analytics.bigdl.dllib.optim.Predictor$.$anonfun$predictSamples$1$adapted(Predictor.scala:70)
	at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:484)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:490)
	at scala.collection.Iterator.foreach(Iterator.scala:941)
	at scala.collection.Iterator.foreach$(Iterator.scala:941)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
	at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
	at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
	at scala.collection.immutable.VectorBuilder.$plus$plus$eq(Vector.scala:662)
	at scala.collection.immutable.VectorBuilder.$plus$plus$eq(Vector.scala:639)
	at scala.collection.generic.GenericTraversableTemplate.$anonfun$flatten$1(GenericTraversableTemplate.scala:174)
	at scala.collection.Iterator.foreach(Iterator.scala:941)
	at scala.collection.Iterator.foreach$(Iterator.scala:941)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
	at scala.collection.IterableLike.foreach(IterableLike.scala:74)
	at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
	at scala.collection.generic.GenericTraversableTemplate.flatten(GenericTraversableTemplate.scala:173)
	at scala.collection.generic.GenericTraversableTemplate.flatten$(GenericTraversableTemplate.scala:171)
	at scala.collection.AbstractTraversable.flatten(Traversable.scala:108)
	at com.intel.analytics.bigdl.dllib.optim.LocalPredictor.$anonfun$predict$6(LocalPredictor.scala:150)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
	at scala.collection.TraversableOnce$FlattenOps$$anon$2.hasNext(TraversableOnce.scala:469)
	at scala.collection.Iterator.foreach(Iterator.scala:941)
	at scala.collection.Iterator.foreach$(Iterator.scala:941)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
	at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
	at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
	at scala.collection.TraversableOnce.to(TraversableOnce.scala:315)
	at scala.collection.TraversableOnce.to$(TraversableOnce.scala:313)
	at scala.collection.AbstractIterator.to(Iterator.scala:1429)
	at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:307)
	at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:307)
	at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1429)
	at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:294)
	at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:288)
	at scala.collection.AbstractIterator.toArray(Iterator.scala:1429)
	at com.intel.analytics.bigdl.dllib.optim.LocalPredictor.predict(LocalPredictor.scala:151)
	at com.intel.analytics.bigdl.dllib.common.PythonZoo.zooPredict(PythonZoo.scala:225)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: com.intel.analytics.bigdl.dllib.utils.UnKnownException: Dense[input]
mat.size(2) 94 should match vec.size(1) 4
	at com.intel.analytics.bigdl.dllib.utils.Log4Error$.unKnowExceptionError(Log4Error.scala:60)
	at com.intel.analytics.bigdl.dllib.nn.abstractnn.AbstractModule.forward(AbstractModule.scala:295)
	at com.intel.analytics.bigdl.dllib.nn.Sequential.updateOutput(Sequential.scala:39)
	at com.intel.analytics.bigdl.dllib.nn.internal.KerasLayer.updateOutput(KerasLayer.scala:275)
	at com.intel.analytics.bigdl.dllib.nn.abstractnn.AbstractModule.forward(AbstractModule.scala:285)
	... 54 more
Caused by: com.intel.analytics.bigdl.dllib.utils.UnKnownException: mat.size(2) 94 should match vec.size(1) 4
	at com.intel.analytics.bigdl.dllib.utils.Log4Error$.unKnowExceptionError(Log4Error.scala:60)
	at com.intel.analytics.bigdl.dllib.tensor.DenseTensorMath$.addmv(DenseTensorMath.scala:702)
	at com.intel.analytics.bigdl.dllib.tensor.DenseTensor.addmv(DenseTensor.scala:1436)
	at com.intel.analytics.bigdl.dllib.nn.Linear.updateOutput(Linear.scala:93)
	at com.intel.analytics.bigdl.dllib.nn.Linear.updateOutput(Linear.scala:44)
	at com.intel.analytics.bigdl.dllib.nn.internal.KerasLayer.updateOutput(KerasLayer.scala:275)
	at com.intel.analytics.bigdl.dllib.nn.abstractnn.AbstractModule.forward(AbstractModule.scala:285)
	... 57 more

How can I fix it, please? Where is the problem exactly? The code worked fine outside of BigDL :(

@qiuxin2012
Contributor

qiuxin2012 commented Dec 14, 2022

It's caused by an incorrect input size.
Did you remove the asmatrix calls? Please don't remove them.

state = np.asmatrix(state)
next_state = np.asmatrix(next_state)

Your state is a 1D vector, but our predict API assumes the input is a batch of inputs. asmatrix helps to resize your input into a batch-like input (2D, [1, 94]).
So your loop should be something like this:

for state, action, reward, next_state in minibatch:
    state = np.asmatrix(state)
    state = np.resize(state, state.shape)
    next_state = np.asmatrix(next_state)
    next_state = np.resize(next_state, next_state.shape)
    p = self.q_network.predict(state, distributed=False)
    t = self.target_network.predict(next_state, distributed=False)
    p[0][action] = reward + self.gamma * np.amax(t)
    self.q_network.fit(state, p, verbose=0)

@jason-dai
Contributor

Can we add more checks and a better error message?

@fatenlouati
Author

fatenlouati commented Dec 14, 2022

@qiuxin2012 Thank you for your responses.
Unfortunately, I got another error:

/tmp/ipykernel_26/244920096.py in retrain(self, batch_size)
     82             target[0][action] = reward+self.gamma * np.amax(t)
     83 
---> 84             self.q_network.fit(state, target)
     85 
     86 

/opt/work/bigdl-2.1.0/python/bigdl-spark_3.1.2-2.1.0-python-api.zip/bigdl/dllib/keras/engine/topology.py in fit(self, x, y, batch_size, nb_epoch, validation_split, validation_data, distributed, feature_cols, label_cols, transform)
    289                     return
    290             else:
--> 291                 invalidInputError(False, "Unsupported training data type: %s" % type(x))
    292             callZooFunc(self.bigdl_type, "zooFit",
    293                         self.value,

/opt/work/bigdl-2.1.0/python/bigdl-spark_3.1.2-2.1.0-python-api.zip/bigdl/dllib/utils/log4Error.py in invalidInputError(condition, errMsg, fixMsg)
     31     if not condition:
     32         outputUserMessage(errMsg, fixMsg)
---> 33         raise RuntimeError(errMsg)
     34 
     35 

RuntimeError: Unsupported training data type: <class 'numpy.ndarray'>


According to this issue, it may be caused by a compatibility issue between Spark and Java.
How can I fix it, please?

@qiuxin2012
Contributor

qiuxin2012 commented Dec 15, 2022

The error means your state is not a numpy.ndarray, but state should be a numpy array; we just used it in predict.
So I checked the code, and maybe it's caused by the target's type: target is not a numpy array. Please check its type and convert target to a numpy array.

Does your q_network.fit mean training a single batch? Please add batch_size=1, nb_epoch=1 to your fit (a sketch follows the snippet below).
Also, your batch size is 1, and dllib requires batch_size > cores, but init_nncontext uses all cores by default, so you should change your init_nncontext("Anomaly Detection Example") to init_spark_on_local(cores=1) like below:

from bigdl.dllib.nncontext import *
sc = init_spark_on_local(cores=1)
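Putting the first two suggestions together, the fit call in your retrain loop would look roughly like this (a fragment using the names from this thread, not a standalone script):

target = np.array(target)   # make sure fit receives a plain numpy array
self.q_network.fit(state, target, batch_size=1, nb_epoch=1)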

@fatenlouati
Author

fatenlouati commented Dec 15, 2022

@qiuxin2012 I followed your instructions,

from bigdl.dllib.nncontext import *
sc = init_spark_on_local(cores=1)

This is the updated loop:

def retrain(self, batch_size, num_actions):
    minibatch = random.sample(self.experience_replay, batch_size)
    for state, action, reward, next_state in minibatch:
        state = np.asmatrix(state)
        state = np.resize(state, state.shape)
        next_state = np.asmatrix(next_state)
        next_state = np.resize(next_state, next_state.shape)
        target = self.q_network.predict(state, distributed=False)
        t = self.target_network.predict(next_state, distributed=False)
        target[0][action] = reward + self.gamma * np.amax(t)
        target = np.array(target)
        self.q_network.fit(state, target.reshape(-1, num_actions), nb_epoch=1, batch_size=1)

But I got an endless repetition of these errors (or warnings):

22-12-15 23:40:35 INFO  DistriOptimizer$:830 - caching training rdd ...
                                                                                
22-12-15 23:40:36 INFO  DistriOptimizer$:655 - Cache thread models...
22-12-15 23:40:36 INFO  ThreadPool$:95 - Set mkl threads to 1 on thread 1712
22-12-15 23:40:36 INFO  ThreadPool$:95 - Set mkl threads to 1 on thread 1712
22-12-15 23:40:36 INFO  DistriOptimizer$:638 - model thread pool size is 1
22-12-15 23:40:36 WARN  BlockManager:69 - Asked to remove block test_0weights0, which does not exist
2022-12-15 23:40:36 WARN  BlockManager:69 - Asked to remove block test_0weights0, which does not exist
2022-12-15 23:40:36 WARN  BlockManager:69 - Asked to remove block test_0weights0, which does not exist
2022-12-15 23:40:36 WARN  BlockManager:69 - Asked to remove block test_0weights0, which does not exist
2022-12-15 23:40:36 WARN  BlockManager:69 - Asked to remove block test_0weights0, which does not exist
2022-12-15 23:40:36 WARN  BlockManager:69 - Asked to remove block test_0weights0, which does not exist
2022-12-15 23:40:36 WARN  BlockManager:69 - Asked to remove block test_0weights0, which does not exist
2022-12-15 23:40:36 WARN  BlockManager:69 - Asked to remove block test_0weights0, which does not exist
2022-12-15 23:40:36 WARN  BlockManager:69 - Asked to remove block test_0weights0, which does not exist
2022-12-15 23:40:36 WARN  BlockManager:69 - Asked to remove block test_0weights0, which does not exist
2022-12-15 23:40:36 WARN  BlockManager:69 - Asked to remove block test_0weights0, which does not exist
2022-12-15 23:40:36 WARN  BlockManager:69 - Asked to remove block test_0weights0, which does not exist
2022-12-15 23:40:36 WARN  BlockManager:69 - Asked to remove block test_0weights0, which does not exist
2022-12-15 23:40:36 WARN  BlockManager:69 - Asked to remove block test_0weights0, which does not exist
2022-12-15 23:40:36 WARN  BlockManager:69 - Asked to remove block test_0weights0, which does not exist
2022-12-15 23:40:36 WARN  BlockManager:69 - Asked to remove block test_0weights0, which does not exist
2022-12-15 23:40:36 WARN  BlockManager:69 - Asked to remove block test_0weights0, which does not exist
2022-12-15 23:40:36 WARN  BlockManager:69 - Asked to remove block test_0weights0, which does not exist
2022-12-15 23:40:36 WARN  BlockManager:69 - Asked to remove block test_0weights0, which does not exist
2022-12-15 23:40:36 WARN  BlockManager:69 - Asked to remove block test_0weights0, which does not exist
2022-12-15 23:40:36 WARN  BlockManager:69 - Asked to remove block test_0weights0, which does not exist
2022-12-15 23:40:36 WARN  BlockManager:69 - Asked to remove block test_0weights0, which does not exist
2022-12-15 23:40:36 WARN  BlockManager:69 - Asked to remove block test_0weights0, which does not exist
22-12-15 23:40:36 WARN  BlockManager:69 - Asked to remove block test_0gradients0, which does not exist
2022-12-15 23:40:36 WARN  BlockManager:69 - Asked to remove block test_0gradients0, which does not exist
2022-12-15 23:40:36 WARN  BlockManager:69 - Asked to remove block test_0gradients0, which does not exist
2022-12-15 23:40:36 WARN  BlockManager:69 - Asked to remove block test_0gradients0, which does not exist
2022-12-15 23:40:36 WARN  BlockManager:69 - Asked to remove block test_0gradients0, which does not exist
2022-12-15 23:40:36 WARN  BlockManager:69 - Asked to remove block test_0gradients0, which does not exist
2022-12-15 23:40:36 WARN  BlockManager:69 - Asked to remove block test_0gradients0, which does not exist
2022-12-15 23:40:36 WARN  BlockManager:69 - Asked to remove block test_0gradients0, which does not exist
2022-12-15 23:40:36 WARN  BlockManager:69 - Asked to remove block test_0gradients0, which does not exist
2022-12-15 23:40:36 WARN  BlockManager:69 - Asked to remove block test_0gradients0, which does not exist
2022-12-15 23:40:36 WARN  BlockManager:69 - Asked to remove block test_0gradients0, which does not exist
2022-12-15 23:40:36 WARN  BlockManager:69 - Asked to remove block test_0gradients0, which does not exist
2022-12-15 23:40:36 WARN  BlockManager:69 - Asked to remove block test_0gradients0, which does not exist
2022-12-15 23:40:36 WARN  BlockManager:69 - Asked to remove block test_0gradients0, which does not exist
2022-12-15 23:40:36 WARN  BlockManager:69 - Asked to remove block test_0gradients0, which does not exist
2022-12-15 23:40:36 WARN  BlockManager:69 - Asked to remove block test_0gradients0, which does not exist
2022-12-15 23:40:36 WARN  BlockManager:69 - Asked to remove block test_0gradients0, which does not exist
2022-12-15 23:40:36 WARN  BlockManager:69 - Asked to remove block test_0gradients0, which does not exist
2022-12-15 23:40:36 WARN  BlockManager:69 - Asked to remove block test_0gradients0, which does not exist
2022-12-15 23:40:36 WARN  BlockManager:69 - Asked to remove block test_0gradients0, which does not exist
2022-12-15 23:40:36 WARN  BlockManager:69 - Asked to remove block test_0gradients0, which does not exist
2022-12-15 23:40:36 WARN  BlockManager:69 - Asked to remove block test_0gradients0, which does not exist
2022-12-15 23:40:36 WARN  BlockManager:69 - Asked to remove block test_0gradients0, which does not exist
22-12-15 23:40:36 INFO  DistriOptimizer$:657 - Cache thread models... done
22-12-15 23:40:36 INFO  DistriOptimizer$:163 - Count dataset
22-12-15 23:40:36 INFO  DistriOptimizer$:167 - Count dataset complete. Time elapsed: 0.036979541s
22-12-15 23:40:36 INFO  DistriOptimizer$:175 - config  {
	computeThresholdbatchSize: 100
	maxDropPercentage: 0.0
	warmupIterationNum: 200
	isLayerwiseScaled: false
	dropPercentage: 0.0
 }
22-12-15 23:40:36 INFO  DistriOptimizer$:179 - Shuffle data
22-12-15 23:40:36 INFO  DistriOptimizer$:182 - Shuffle data complete. Takes 0.016121103s
22-12-15 23:40:36 WARN  DistriOptimizer$:239 - Warning: for better training speed, total batch size is recommended to be at least two times of core number1, please tune your batch size accordingly
22-12-15 23:40:37 INFO  DistriOptimizer$:432 - [Epoch 1 1/1][Iteration 1][Wall Clock 0.122008065s] Trained 1.0 records in 0.122008065 seconds. Throughput is 8.196179 records/second. Loss is 0.0. Sequentialc87d69f3's hyper parameters: Current learning rate is 0.01. Current dampening is 1.7976931348623157E308.  
22-12-15 23:40:37 INFO  DistriOptimizer$:474 - [Epoch 1 1/1][Iteration 1][Wall Clock 0.122008065s] Epoch finished. Wall clock time is 129.846862 ms

@qiuxin2012
Contributor

The fit runs successfully on Spark now, no errors:
22-12-15 23:40:37 INFO DistriOptimizer$:474 - [Epoch 1 1/1][Iteration 1][Wall Clock 0.122008065s] Epoch finished.
You can turn off the Spark warnings using sc = init_spark_on_local(cores=1, spark_log_level="ERROR").
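For reference, with the same init_spark_on_local helper shown earlier, the initialization would be:

from bigdl.dllib.nncontext import *
sc = init_spark_on_local(cores=1, spark_log_level="ERROR")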
I think it will be slower than your original framework. As the data is small, Spark is too heavy for such a small dataset; Spark is good at very large-scale datasets.

If you just want to accelerate your TF or PyTorch application, our Nano may be a better option. See Nano in choosing-the-right-bigdl-library.

@fatenlouati
Author

fatenlouati commented Dec 16, 2022

@qiuxin2012 I set spark_log_level="ERROR", but this did not turn off the warnings and info messages.

@qiuxin2012
Contributor

@fatenlouati I have opened another issue for it. spark_log_level="ERROR" should suppress the warnings and info from Spark, but suppressing the info from BigDL doesn't work.

@fatenlouati
Author

@qiuxin2012 Thank you very much
