Kill 1 RSS server that is serving shuffle read/write: the application will be re-run by YARN with a new attempt id #48

Closed
wangyi2021 opened this issue Dec 27, 2021 · 3 comments

Comments

@wangyi2021

wangyi2021 commented Dec 27, 2021

Tested with a Terasort job.

If an application needs to run for 5 hours and one RSS server becomes abnormal halfway through, a lot of time is wasted.

Four cases were tested:

  1. shuffle write & 1 replica: app re-run
  2. shuffle write & 2 replicas: app re-run
  3. shuffle read & 1 replica: app re-run
  4. shuffle read & 2 replicas: no problem

Shuffle read can recover normally when 2 replicas are configured, but for shuffle write there is no way to avoid the application re-run.
For shuffle write, is it possible to make the driver re-register the shuffle to get a new, reachable RSS server?
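A purely illustrative sketch of that re-registration idea, in case it helps the discussion (this is not existing RSS code; CoordinatorClient, ShuffleManagerHook, and every method name below are hypothetical stand-ins for whatever the driver would actually call):

import java.util.List;

// Hypothetical sketch only -- NOT the current RSS implementation.
// Idea: when write tasks report an unreachable shuffle server, the driver asks the
// coordinator for a fresh assignment and re-registers the shuffle so retried tasks
// write to reachable servers instead of failing the whole application.
public class DriverReassignSketch {

  interface CoordinatorClient {       // hypothetical
    List<String> requestShuffleServers(String appId, int shuffleId, int partitionNum);
  }

  interface ShuffleManagerHook {      // hypothetical
    void registerShuffle(String appId, int shuffleId, List<String> servers);
  }

  private final CoordinatorClient coordinator;
  private final ShuffleManagerHook shuffleManager;

  public DriverReassignSketch(CoordinatorClient coordinator, ShuffleManagerHook shuffleManager) {
    this.coordinator = coordinator;
    this.shuffleManager = shuffleManager;
  }

  /** Called by the driver when write tasks report an unreachable shuffle server. */
  public List<String> reassign(String appId, int shuffleId, int partitionNum) {
    // Ask the coordinator for a fresh, reachable assignment...
    List<String> servers = coordinator.requestShuffleServers(appId, shuffleId, partitionNum);
    // ...then re-register the shuffle so retried write tasks send data to the new servers.
    shuffleManager.registerShuffle(appId, shuffleId, servers);
    return servers;
  }
}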

@colinmjj
Collaborator

@wangyi2021 Currently, multiple replicas with LocalFile are not ready. We have already done some refactoring of shuffle read, and multiple replicas should be supported next on top of that refactoring.

@wangyi2021
Author

> @wangyi2021 Currently, multiple replicas with LocalFile are not ready. We have already done some refactoring of shuffle read, and multiple replicas should be supported next on top of that refactoring.

Case 4 was tested with the following configuration:
spark.rss.data.replica = 2
spark.rss.storage.type = LOCALFILE
Result: the application was not re-run, and its execution time was the same as with no abnormal RSS server.
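
For reference, a minimal sketch of applying these settings programmatically. Only the two spark.rss.* keys come from the configuration above; the app name and the spark.shuffle.manager value (the RssShuffleManager class that appears in the task log) are assumed boilerplate, not part of the original report:

import org.apache.spark.SparkConf;
import org.apache.spark.sql.SparkSession;

// Case-4 test configuration applied through SparkConf.
public class Case4ConfSketch {
  public static void main(String[] args) {
    SparkConf conf = new SparkConf()
        .setAppName("terasort-rss-replica-test")                                    // assumed name
        .set("spark.shuffle.manager", "org.apache.spark.shuffle.RssShuffleManager") // class seen in the task log
        .set("spark.rss.data.replica", "2")           // two replicas per shuffle block
        .set("spark.rss.storage.type", "LOCALFILE");  // local-file storage on the shuffle servers

    SparkSession spark = SparkSession.builder().config(conf).getOrCreate();
    // ... run the Terasort job here ...
    spark.stop();
  }
}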

Task 195 error log (shuffle fetch for partition 94 data):

2021-12-27 11:35:09,568 | INFO  | [dispatcher-Executor] | Got assigned task 195 | org.apache.spark.internal.Logging.logInfo(Logging.scala:57)
2021-12-27 11:35:09,568 | INFO  | [Executor task launch worker for task 195] | Running task 94.0 in stage 1.0 (TID 195) | org.apache.spark.internal.Logging.logInfo(Logging.scala:57)
2021-12-27 11:35:09,581 | INFO  | [Executor task launch worker for task 195] | Get taskId cost 1 ms, and request expected blockIds from 100 tasks for shuffleId[0], partitionId[94] | org.apache.spark.shuffle.RssShuffleManager.getReader(RssShuffleManager.java:290)
2021-12-27 11:35:09,581 | WARN  | [Executor task launch worker for task 195] | Get shuffle result is failed from ShuffleServerInfo{id[xx.xx.xx.rss-abnormal], host[xx.xx.xx.rss-abnormal], port[19999]} for appId[application_id_20211227], shuffleId[0] | com.tencent.rss.client.impl.ShuffleWriteClientImpl.getShuffleResult(ShuffleWriteClientImpl.java:313)
2021-12-27 11:35:09,583 | INFO  | [Executor task launch worker for task 195] | Get shuffle blockId cost 2 ms, and get 150 blockIds for shuffleId[0], partitionId[94] | org.apache.spark.shuffle.RssShuffleManager.getReaderImpl(RssShuffleManager.java:353)
2021-12-27 11:35:09,615 | INFO  | [Executor task launch worker for task 195] | Shuffle read started:appId=application_id_20211227, shuffleId=0,taskId=195_0, partitions: [94, 95), maps: [0, 2147483647) | org.apache.spark.shuffle.reader.RssShuffleReader.read(RssShuffleReader.java:116)
2021-12-27 11:35:09,616 | INFO  | [Executor task launch worker for task 195] | Inserting aggregated records to sorter | org.apache.spark.shuffle.reader.RssShuffleReader.read(RssShuffleReader.java:137)
2021-12-27 11:35:09,616 | WARN  | [Executor task launch worker for task 195] | Failed to read shuffle data with ShuffleServerGrpcClient for host[xx.xx.xx.rss-abnormal], port[19999] | com.tencent.rss.storage.handler.impl.LocalFileClientReadHandler.readShuffleData(LocalFileClientReadHandler.java:69)
io.grpc.StatusRuntimeException: UNAVAILABLE: io exception
	at io.grpc.stub.ClientCalls.toStatusRuntimeException(ClientCalls.java:262)
	at io.grpc.stub.ClientCalls.getUnchecked(ClientCalls.java:243)
	at io.grpc.stub.ClientCalls.blockingUnaryCall(ClientCalls.java:156)
	at com.tencent.rss.proto.ShuffleServerGrpc$ShuffleServerBlockingStub.getShuffleData(ShuffleServerGrpc.java:607)
	at com.tencent.rss.client.impl.grpc.ShuffleServerGrpcClient.getShuffleData(ShuffleServerGrpcClient.java:422)
	at com.tencent.rss.storage.handler.impl.LocalFileClientReadHandler.readShuffleData(LocalFileClientReadHandler.java:64)
	at com.tencent.rss.client.impl.ShuffleReadClientImpl.read(ShuffleReadClientImpl.java:180)
	at com.tencent.rss.client.impl.ShuffleReadClientImpl.readShuffleBlockData(ShuffleReadClientImpl.java:119)
	at org.apache.spark.shuffle.reader.RssShuffleDataIterator.hasNext(RssShuffleDataIterator.java:97)
	at org.apache.spark.shuffle.reader.RssShuffleReader$MultiPartitionIterator.hasNext(RssShuffleReader.java:211)
	at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:200)
	at org.apache.spark.shuffle.reader.RssShuffleReader.read(RssShuffleReader.java:139)
	at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:106)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:352)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:316)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:127)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$6(Executor.scala:575)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1422)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:578)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: io.grpc.netty.shaded.io.netty.channel.AbstractChannel$AnnotatedConnectException: finishConnect(..) failed: Connection refused: /xx.xx.xx.rss-abnormal:19999
Caused by: java.net.ConnectException: finishConnect(..) failed: Connection refused
	at io.grpc.netty.shaded.io.netty.channel.unix.Errors.throwConnectException(Errors.java:124)
	at io.grpc.netty.shaded.io.netty.channel.unix.Socket.finishConnect(Socket.java:243)
	at io.grpc.netty.shaded.io.netty.channel.epoll.AbstractEpollChannel$AbstractEpollUnsafe.doFinishConnect(AbstractEpollChannel.java:672)
	at io.grpc.netty.shaded.io.netty.channel.epoll.AbstractEpollChannel$AbstractEpollUnsafe.finishConnect(AbstractEpollChannel.java:649)
	at io.grpc.netty.shaded.io.netty.channel.epoll.AbstractEpollChannel$AbstractEpollUnsafe.epollOutReady(AbstractEpollChannel.java:529)
	at io.grpc.netty.shaded.io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:465)
	at io.grpc.netty.shaded.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378)
	at io.grpc.netty.shaded.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
	at io.grpc.netty.shaded.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.grpc.netty.shaded.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.lang.Thread.run(Thread.java:748)

The read is retried 3 times, and then the task succeeds.

2021-12-27 11:35:10,075 | INFO  | [Executor task launch worker for task 195] | GetShuffleData for appId[application_id_20211227], shuffleId[0], partitionId[94] cost 1 ms | com.tencent.rss.client.impl.grpc.ShuffleServerGrpcClient.getShuffleData(ShuffleServerGrpcClient.java:425)
2021-12-27 11:35:10,075 | INFO  | [Executor task launch worker for task 195] | Metrics for shuffleId[0], partitionId[94], read data cost 129 ms, copy data cost 4 ms, crc check cost 2 ms | com.tencent.rss.client.impl.ShuffleReadClientImpl.logStatics(ShuffleReadClientImpl.java:218)
2021-12-27 11:35:10,076 | INFO  | [Executor task launch worker for task 195] | Fetch 0 bytes cost 137 ms and 3 ms to serialize, 87 ms to decompress with unCompressionLength[105941276] | org.apache.spark.shuffle.reader.RssShuffleDataIterator.hasNext(RssShuffleDataIterator.java:126)
2021-12-27 11:35:10,076 | INFO  | [Executor task launch worker for task 195] | Inserted aggregated records to sorter: millis:460 | org.apache.spark.shuffle.reader.RssShuffleReader.read(RssShuffleReader.java:140)
2021-12-27 11:35:19,555 | INFO  | [Executor task launch worker for task 195] | Finished task 94.0 in stage 1.0 (TID 195). 1364 bytes result sent to driver | org.apache.spark.internal.Logging.logInfo(Logging.scala:57)

In the rss-abnormal server's log there is no entry about this reduce task for partition 94.
In the backup RSS server's log, the entries for partition 94 are:

[INFO] 2021-12-27 11:35:10,027 Grpc-454 ShuffleServerGrpcService getShuffleData - Successfully getShuffleData cost 4 ms for appId[application_id_20211227], shuffleId[0], partitionId[94] with 4682982 bytes and 19 blocks
[INFO] 2021-12-27 11:35:10,075 Grpc-462 ShuffleServerGrpcService getShuffleData - Successfully getShuffleData cost 0 ms for appId[application_id_20211227], shuffleId[0], partitionId[94] with 0 bytes and 0 blocks
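
The log sequence above matches a per-replica fallback on read: the killed server returns UNAVAILABLE and the backup replica serves the data. A simplified, illustrative sketch of that pattern (ReplicaReader and its method are hypothetical; this is not the actual com.tencent.rss.client.impl.ShuffleReadClientImpl code):

import java.util.List;

// Illustrative fallback-read sketch, not the real client implementation.
public class ReplicaFallbackReadSketch {

  interface ReplicaReader {           // hypothetical
    byte[] readShuffleData(int shuffleId, int partitionId) throws Exception;
  }

  /** Try each replica in turn; fail only if every replica is unreachable. */
  public static byte[] readWithFallback(List<ReplicaReader> replicas, int shuffleId, int partitionId) {
    Exception last = null;
    for (ReplicaReader replica : replicas) {
      try {
        return replica.readShuffleData(shuffleId, partitionId);
      } catch (Exception e) {
        // e.g. io.grpc.StatusRuntimeException: UNAVAILABLE when the server was killed
        last = e;
      }
    }
    throw new RuntimeException("All replicas failed for partition " + partitionId, last);
  }
}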

@colinmjj
Collaborator

@wangyi2021 For the shuffle write phase, we implemented marking a task as successful if it wrote its data to any shuffle server successfully, but that change was reverted because many situations have to be considered to make sure the data stays correct. We already have some ideas for better replica support, and I hope it can be available next month.
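
To make the reverted approach concrete, a minimal sketch of "mark the write successful once any shuffle server accepts the block" (ReplicaWriter is a hypothetical interface, not the real client API; a real implementation would also have to record which replica holds each block so readers can find it, which is part of why correctness is hard to guarantee):

import java.util.List;

// Hypothetical any-replica-success write sketch, not the actual RSS client code.
public class AnyReplicaWriteSketch {

  interface ReplicaWriter {           // hypothetical
    boolean sendBlock(byte[] block) throws Exception;
  }

  /** Returns true as soon as one replica acknowledges the block. */
  public static boolean writeBlock(List<ReplicaWriter> writers, byte[] block) {
    for (ReplicaWriter writer : writers) {
      try {
        if (writer.sendBlock(block)) {
          return true; // one replica accepted the block; the task could be marked successful here
        }
      } catch (Exception e) {
        // unreachable server (e.g. Connection refused) -- fall through to the next replica
      }
    }
    return false; // no replica accepted the block
  }
}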

@jerqi jerqi closed this as completed Jan 4, 2022