
Shuffle read does not read all data completely? #175

Closed
xunxunmimi5577 opened this issue Jun 14, 2022 · 31 comments

Comments

@xunxunmimi5577
Contributor

When running query64 on the 10 TB TPC-DS dataset, I found a stage that wrote 1.3 TB of shuffle data, but I never found a stage that reads 1.3 TB accordingly.

@colinmjj
Collaborator

Can you share the Spark UI for stage IO?

@xunxunmimi5577
Contributor Author

xunxunmimi5577 commented Jun 14, 2022

I can't upload pictures from within my company network.

@xunxunmimi5577
Contributor Author

Compared to native Spark, shuffle write produces the same amount of data, but Firestorm reads very little data during shuffle read. The Tasks: Succeeded/Total label in the Spark UI shows only one task for Firestorm, while native Spark shows 5000 tasks executed successfully.

@colinmjj
Collaborator

How about the result? Is it the same as the result with native Spark?
We passed a result comparison based on 1 TB of data, but haven't done this with 10 TB.

@xunxunmimi5577
Contributor Author

Uploading IMG_20220615_092133.jpg…

@xunxunmimi5577
Contributor Author

Uploading IMG_20220615_092112.jpg…

@xunxunmimi5577
Contributor Author

How about the result? Is it the same as the result with native Spark? We passed a result comparison based on 1 TB of data, but haven't done this with 10 TB.

I need to confirm this, because we modified the SQL and did not collect the results.

@xunxunmimi5577
Contributor Author

Does Firestorm record the partition lengths in MapStatus?

@jerqi
Collaborator

jerqi commented Jun 21, 2022

We record the lengths; AQE needs these metrics.

@xunxunmimi5577
Contributor Author

xunxunmimi5577 commented Jun 21, 2022 via email

@jerqi
Collaborator

jerqi commented Jun 21, 2022

Could you give me more detailed information?

@xunxunmimi5577
Contributor Author

Does Firestorm for Spark 2 not support AQE? I saw that the implementation of the stop() method in RssShuffleWriter (Spark 2) seems to fill partitionLengths with a dummy value.
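A minimal sketch of the writer behavior described above, with hypothetical names (the real RssShuffleWriter is part of Firestorm and is not reproduced here): every partition gets the same placeholder length instead of the real bytes written, so any downstream size-based optimization sees meaningless statistics.

```java
import java.util.Arrays;

// Hypothetical illustration only: a Spark 2 writer that reports a
// fixed dummy length for every shuffle partition in its MapStatus,
// rather than the actual per-partition byte counts.
public class DummyLengthsSketch {
    static long[] dummyPartitionLengths(int numPartitions, long dummyValue) {
        long[] lengths = new long[numPartitions];
        Arrays.fill(lengths, dummyValue); // no relation to real data sizes
        return lengths;
    }
}
```

With such uniform placeholder values, two partitions holding wildly different amounts of data are indistinguishable to any consumer of the statistics.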

@xunxunmimi5577
Contributor Author

xunxunmimi5577 commented Jun 21, 2022

However, Spark 2 does support the configuration spark.sql.adaptive.enabled. If it is as mentioned above, then spark.sql.adaptive.enabled can't be set to true?

@jerqi
Collaborator

jerqi commented Jun 21, 2022

Does Firestorm for Spark 2 not support AQE? I saw that the implementation of the stop() method in RssShuffleWriter (Spark 2) seems to fill partitionLengths with a dummy value.

Spark 2 doesn't support AQE.

@jerqi
Collaborator

jerqi commented Jun 21, 2022

Open source Spark 2 doesn't support AQE either.

@xunxunmimi5577
Contributor Author

Does Firestorm for Spark 2 not support AQE? I saw that the implementation of the stop() method in RssShuffleWriter (Spark 2) seems to fill partitionLengths with a dummy value.

Spark 2 doesn't support AQE.

But if I set spark.sql.adaptive.enabled=true, I get wrong results.

@jerqi
Collaborator

jerqi commented Jun 21, 2022

https://spark.apache.org/releases/spark-release-3-0-0.html
AQE is a Spark 3.0 feature.

@xunxunmimi5577
Contributor Author

As far as I know, Spark 2 can also use the configuration spark.sql.adaptive.enabled.

@xunxunmimi5577
Contributor Author

Then the ExchangeCoordinator.doEstimationIfNecessary() method needs mapOutputStatistics to determine the number of post-shuffle partitions.

@colinmjj
Collaborator

@xunxunmimi5577 For RSS + Spark 2, AQE is not supported by the current implementation. This feature was introduced in Spark 3, so there is no plan to support AQE with Spark 2.

@jerqi
Collaborator

jerqi commented Jun 21, 2022

It's not an available feature in Spark 2. Some configurations may have been added first, but the implementation isn't complete.

@xunxunmimi5577
Contributor Author

If I use Spark 2 + Firestorm + spark.sql.adaptive.enabled=true, the partitionStartIndices from ExchangeCoordinator will be [0,200) instead of [0,1), [1,2), ..., so the shuffle reader only reads partition 0. This is the phenomenon described in my issue: there were supposed to be 200 tasks, but only one was executed.
I think users should at least be warned about this.
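A minimal sketch of why this can happen, under stated assumptions (the class and method names here are hypothetical, not Spark's): Spark 2's ExchangeCoordinator coalesces adjacent post-shuffle partitions until their accumulated reported size reaches a target. If the writer reports dummy (zero) lengths for every partition, the accumulated size never reaches the target, and all 200 partitions collapse into the single range [0,200).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of size-based post-shuffle partition coalescing.
public class CoalesceSketch {
    // Returns the start index of each post-shuffle partition range:
    // adjacent pre-shuffle partitions are merged until the accumulated
    // reported size would exceed targetSize.
    static int[] partitionStartIndices(long[] partitionSizes, long targetSize) {
        List<Integer> starts = new ArrayList<>();
        starts.add(0);
        long accumulated = 0;
        for (int i = 0; i < partitionSizes.length; i++) {
            if (accumulated + partitionSizes[i] > targetSize && accumulated > 0) {
                starts.add(i);      // close the current range, open a new one
                accumulated = 0;
            }
            accumulated += partitionSizes[i];
        }
        int[] out = new int[starts.size()];
        for (int i = 0; i < out.length; i++) out[i] = starts.get(i);
        return out;
    }
}
```

With 200 all-zero sizes this yields a single start index, i.e. one reader task covering every partition's (unreported) data, matching the one-task symptom above; with realistic nonzero sizes it yields multiple ranges.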

@jerqi
Collaborator

jerqi commented Jun 21, 2022

If I use Spark 2 + Firestorm + spark.sql.adaptive.enabled=true, the partitionStartIndices from ExchangeCoordinator will be [0,200) instead of [0,1), [1,2), ..., so the shuffle reader only reads partition 0. This is the phenomenon described in my issue: there were supposed to be 200 tasks, but only one was executed. I think users should at least be warned about this.

OK. We can check whether ADAPTIVE_EXECUTION_ENABLED is enabled in RssShuffleManager and, if it is, throw an IllegalArgumentException. Would you like to contribute this?
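The suggested fail-fast check could look something like the following sketch. It is an assumption of what the contribution might do, not Firestorm's actual code; the class name and the plain-Map conf lookup are stand-ins for the real RssShuffleManager and SparkConf.

```java
import java.util.Map;

// Hypothetical sketch: reject AQE up front in the Spark 2 shuffle
// manager instead of silently returning wrong results.
public class AdaptiveCheckSketch {
    static final String ADAPTIVE_EXECUTION_ENABLED = "spark.sql.adaptive.enabled";

    static void checkAdaptiveDisabled(Map<String, String> sparkConf) {
        String value = sparkConf.getOrDefault(ADAPTIVE_EXECUTION_ENABLED, "false");
        if ("true".equalsIgnoreCase(value)) {
            throw new IllegalArgumentException(
                ADAPTIVE_EXECUTION_ENABLED + " is not supported by the Spark 2 client");
        }
    }
}
```

Failing at shuffle-manager construction surfaces the misconfiguration immediately, before any stage runs with bogus partition statistics.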

@colinmjj
Collaborator

@xunxunmimi5577 Thanks for reporting this. I think such an unsupported case should be described in the README.

@xunxunmimi5577
Contributor Author

If I use Spark 2 + Firestorm + spark.sql.adaptive.enabled=true, the partitionStartIndices from ExchangeCoordinator will be [0,200) instead of [0,1), [1,2), ..., so the shuffle reader only reads partition 0. This is the phenomenon described in my issue: there were supposed to be 200 tasks, but only one was executed. I think users should at least be warned about this.

OK. We can check whether ADAPTIVE_EXECUTION_ENABLED is enabled in RssShuffleManager and, if it is, throw an IllegalArgumentException. Would you like to contribute this?

I would like to. Or maybe you just want to describe it in the README?

@xunxunmimi5577
Contributor Author

Moreover, is it possible to record an array of partitionLengths like in Spark 3?

@jerqi
Collaborator

jerqi commented Jun 21, 2022

If I use Spark 2 + Firestorm + spark.sql.adaptive.enabled=true, the partitionStartIndices from ExchangeCoordinator will be [0,200) instead of [0,1), [1,2), ..., so the shuffle reader only reads partition 0. This is the phenomenon described in my issue: there were supposed to be 200 tasks, but only one was executed. I think users should at least be warned about this.

OK. We can check whether ADAPTIVE_EXECUTION_ENABLED is enabled in RssShuffleManager and, if it is, throw an IllegalArgumentException. Would you like to contribute this?

I would like to. Or maybe you just want to describe it in the README?

Actually, we want to do two things: add the parameter check in the code, and expand the documentation.

@jerqi
Collaborator

jerqi commented Jun 21, 2022

Moreover, is it possible to record an array of partitionLengths like in Spark 3?

It's not an available feature in Spark 2, so we won't do it.

@xunxunmimi5577
Contributor Author

OK

@jerqi
Collaborator

jerqi commented Jun 30, 2022

Could I close this issue? Is it solved?

@xunxunmimi5577
Contributor Author

I think it's solved. Let me close this issue.
