
Running the official Spark 3.1.1 JavaWordCount with firestorm-0.4.0 fails with the error below, and in yarn-client mode the driver process never exits #124

Closed
sfwang218 opened this issue May 5, 2022 · 10 comments

Comments

@sfwang218

java.io.StreamCorruptedException: invalid stream header: 74001673
	at java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:806)
	at java.io.ObjectInputStream.<init>(ObjectInputStream.java:299)
	at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.<init>(JavaSerializer.scala:64)
	at org.apache.spark.serializer.JavaDeserializationStream.<init>(JavaSerializer.scala:64)
	at org.apache.spark.serializer.JavaSerializerInstance.deserializeStream(JavaSerializer.scala:123)
	at org.apache.spark.shuffle.reader.RssShuffleDataIterator.createKVIterator(RssShuffleDataIterator.java:71)
	at org.apache.spark.shuffle.reader.RssShuffleDataIterator.hasNext(RssShuffleDataIterator.java:118)
	at org.apache.spark.shuffle.reader.RssShuffleReader$MultiPartitionIterator.hasNext(RssShuffleReader.java:213)
	at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:155)
	at org.apache.spark.Aggregator.combineCombinersByKey(Aggregator.scala:50)
	at org.apache.spark.shuffle.reader.RssShuffleReader.read(RssShuffleReader.java:125)
	at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:106)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92)
	at org.apache.spark.scheduler.Task.run(Task.scala:134)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:535)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:545)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)

@jerqi
Collaborator

jerqi commented May 5, 2022

We have never seen this problem. Can you give us the configuration, environment, and driver stack?

@jerqi
Collaborator

jerqi commented May 5, 2022

In our integration tests, we run the WordCount example with Spark 3.1.

@sfwang218
Author

./bin/spark-submit --class org.apache.spark.examples.JavaWordCount \
  --master yarn \
  --deploy-mode client \
  --driver-memory 4g \
  --executor-memory 2g \
  --executor-cores 1 \
  --queue root.offline.default \
  --conf spark.dynamicAllocation.enabled=false \
  --conf spark.sql.shuffle.partitions=1 \
  --conf spark.default.parallelism=1 \
  examples/jars/spark-examples*.jar \
  <HDFS file path>

This is the command I used to submit the job; please check whether you can reproduce it.
I tried adding --conf spark.serializer=org.apache.spark.serializer.KryoSerializer to select KryoSerializer, and the job then runs fine. The default should be JavaSerializer, which produces the error above.
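
For reference, the same workaround can be set in application code instead of on the command line. A minimal sketch (WordCountWithKryo is an illustrative name, not the actual spark-examples source):

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

object WordCountWithKryo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("WordCountWithKryo")
      // Equivalent to --conf spark.serializer=org.apache.spark.serializer.KryoSerializer
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    val spark = SparkSession.builder.config(conf).getOrCreate()
    val counts = spark.sparkContext
      .textFile(args(0))           // input path, e.g. an HDFS file
      .flatMap(_.split(" "))
      .map(word => (word, 1))
      .reduceByKey(_ + _)
    counts.collect().foreach { case (word, count) => println(s"$word: $count") }
    spark.stop()
  }
}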

@sfwang218
Author

We have never seen this problem. Can you give us the configuration, environment, and driver stack?

My environment:
spark:3.1.1
firestorm:0.4.0

Configuration:
rss-client:
spark.shuffle.manager org.apache.spark.shuffle.DelegationRssShuffleManager
spark.rss.storage.type MEMORY_LOCALFILE
spark.shuffle.service.enabled false

shuffle-server:
rss.storage.type MEMORY_LOCALFILE

driver stack:
Driver stacktrace:
22/05/06 12:11:53 INFO scheduler.DAGScheduler main: Job 0 failed: collect at JavaWordCount.java:53, took 9.802690 s
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 4 times, most recent failure: Lost task 0.3 in stage 1.0 (TID 4) (tjtx162-32-201.58os.org executor 2): java.io.StreamCorruptedException: invalid stream header: 74001673
at java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:806)
at java.io.ObjectInputStream.<init>(ObjectInputStream.java:299)
at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.<init>(JavaSerializer.scala:64)
at org.apache.spark.serializer.JavaDeserializationStream.<init>(JavaSerializer.scala:64)
at org.apache.spark.serializer.JavaSerializerInstance.deserializeStream(JavaSerializer.scala:123)
at org.apache.spark.shuffle.reader.RssShuffleDataIterator.createKVIterator(RssShuffleDataIterator.java:71)
at org.apache.spark.shuffle.reader.RssShuffleDataIterator.hasNext(RssShuffleDataIterator.java:118)
at org.apache.spark.shuffle.reader.RssShuffleReader$MultiPartitionIterator.hasNext(RssShuffleReader.java:213)
at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:155)
at org.apache.spark.Aggregator.combineCombinersByKey(Aggregator.scala:50)
at org.apache.spark.shuffle.reader.RssShuffleReader.read(RssShuffleReader.java:125)
at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:106)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:131)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2254)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2203)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2202)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2202)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1078)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1078)
at scala.Option.foreach(Option.scala:407)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1078)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2441)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2383)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2372)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:868)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2202)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2223)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2242)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2267)
at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1030)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
at org.apache.spark.rdd.RDD.collect(RDD.scala:1029)
at org.apache.spark.api.java.JavaRDDLike.collect(JavaRDDLike.scala:362)
at org.apache.spark.api.java.JavaRDDLike.collect$(JavaRDDLike.scala:361)
at org.apache.spark.api.java.AbstractJavaRDDLike.collect(JavaRDDLike.scala:45)
at org.apache.spark.examples.JavaWordCount.main(JavaWordCount.java:53)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:951)
at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1030)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1039)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.StreamCorruptedException: invalid stream header: 74001673
at java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:806)
at java.io.ObjectInputStream.<init>(ObjectInputStream.java:299)
at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.<init>(JavaSerializer.scala:64)
at org.apache.spark.serializer.JavaDeserializationStream.<init>(JavaSerializer.scala:64)
at org.apache.spark.serializer.JavaSerializerInstance.deserializeStream(JavaSerializer.scala:123)
at org.apache.spark.shuffle.reader.RssShuffleDataIterator.createKVIterator(RssShuffleDataIterator.java:71)
at org.apache.spark.shuffle.reader.RssShuffleDataIterator.hasNext(RssShuffleDataIterator.java:118)
at org.apache.spark.shuffle.reader.RssShuffleReader$MultiPartitionIterator.hasNext(RssShuffleReader.java:213)
at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:155)
at org.apache.spark.Aggregator.combineCombinersByKey(Aggregator.scala:50)
at org.apache.spark.shuffle.reader.RssShuffleReader.read(RssShuffleReader.java:125)
at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:106)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:131)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

@jerqi
Collaborator

jerqi commented May 6, 2022


KryoSerializer should be the default serializer. As I recall, JavaSerializer doesn't guarantee relocation of serialized objects, so it doesn't work with the original Spark shuffle system either.
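
For intuition, here is a minimal standalone Scala sketch (StreamHeaderDemo is an illustrative name, not from this thread) of why the failure surfaces as an invalid stream header: every Java serialization stream begins with the magic bytes ACED0005, so a reader handed bytes that begin in the middle of a stream fails immediately in the ObjectInputStream constructor, exactly as in the stack trace above. The 0x74 leading the reported header is the record tag for a String.

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

object StreamHeaderDemo {
  def main(args: Array[String]): Unit = {
    val bos = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(bos)
    oos.writeObject("hello")
    oos.close()
    val bytes = bos.toByteArray // AC ED 00 05 74 00 05 'h' 'e' 'l' 'l' 'o'

    // Start reading 4 bytes in, past the ACED0005 magic, as if this chunk
    // had been cut out of a larger concatenated stream. The constructor
    // calls readStreamHeader and throws:
    // java.io.StreamCorruptedException: invalid stream header: 74000568
    new ObjectInputStream(new ByteArrayInputStream(bytes.drop(4)))
  }
}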

@jerqi
Collaborator

jerqi commented May 6, 2022

spark.shuffle.manager org.apache.spark.shuffle.DelegationRssShuffleManager

You should use RssShuffleManager.

@sfwang218
Author


But with the native shuffle and JavaSerializer, the job runs fine. Does using RSS require KryoSerializer?

Also, doesn't 0.4.0 have an access check feature? I want to use it, which is why I configured DelegationRssShuffleManager. That should be fine, right? Or is that feature still unstable?

@jerqi
Collaborator

jerqi commented May 6, 2022


You'd better use KryoSerializer. If you use the access check, you can use DelegationRssShuffleManager; we already use it in our production environment.

@jerqi
Collaborator

jerqi commented May 6, 2022

Below is Spark's code. RSS needs the serializer to support relocation.
/**
 * :: Private ::
 * Returns true if this serializer supports relocation of its serialized objects and false
 * otherwise. This should return true if and only if reordering the bytes of serialized objects
 * in serialization stream output is equivalent to having re-ordered those elements prior to
 * serializing them. More specifically, the following should hold if a serializer supports
 * relocation:
 * {{{
 * serOut.open()
 * position = 0
 * serOut.write(obj1)
 * serOut.flush()
 * position = # of bytes written to stream so far
 * obj1Bytes = output[0:position-1]
 * serOut.write(obj2)
 * serOut.flush()
 * position2 = # of bytes written to stream so far
 * obj2Bytes = output[position:position2-1]
 * serIn.open([obj2bytes] concatenate [obj1bytes]) should return (obj2, obj1)
 * }}}
 *
 * In general, this property should hold for serializers that are stateless and that do not
 * write special metadata at the beginning or end of the serialization stream.
 *
 * This API is private to Spark; this method should not be overridden in third-party subclasses
 * or called in user code and is subject to removal in future Spark releases.
 *
 * See SPARK-7311 for more details.
 */
@Private
private[spark] def supportsRelocationOfSerializedObjects: Boolean = false
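
In line with this, KryoSerializer overrides supportsRelocationOfSerializedObjects (returning true as long as Kryo auto-reset is on), while JavaSerializer keeps the default false. Since the method is private[spark], a quick check has to live in a package under org.apache.spark; a minimal sketch (RelocationCheck is an illustrative name):

package org.apache.spark.examples // subpackage of org.apache.spark, so the private[spark] member is visible

import org.apache.spark.SparkConf
import org.apache.spark.serializer.{JavaSerializer, KryoSerializer}

object RelocationCheck {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    // Prints true: Kryo output can be cut apart and reordered safely.
    println(new KryoSerializer(conf).supportsRelocationOfSerializedObjects)
    // Prints false: Java serialization writes a stream header and back-references.
    println(new JavaSerializer(conf).supportsRelocationOfSerializedObjects)
  }
}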

@sfwang218
Author

OK, thanks.

@jerqi jerqi closed this as completed May 7, 2022