Skip to content

Commit

Permalink
Fix concurrent dagrequest issue (#714)
Browse files Browse the repository at this point in the history
  • Loading branch information
birdstorm committed Jun 4, 2019
1 parent 0f327f8 commit 2ae1a4a
Show file tree
Hide file tree
Showing 3 changed files with 6 additions and 5 deletions.
Expand Up @@ -261,7 +261,7 @@ case class RegionTaskExec(child: SparkPlan,
taskCount += 1
val task = new Callable[util.Iterator[TiRow]] {
override def call(): util.Iterator[TiRow] =
CoprocessIterator.getRowIterator(dagRequest.copy(), tasks, session)
CoprocessIterator.getRowIterator(dagRequest, tasks, session)
}
completionService.submit(task)
}
Expand Down
Expand Up @@ -43,7 +43,7 @@ public static SchemaInfer create(TiDAGRequest dagRequest) {
}

public static SchemaInfer create(TiDAGRequest dagRequest, boolean readHandle) {
return new SchemaInfer(dagRequest, readHandle);
return new SchemaInfer(dagRequest.copy(), readHandle);
}

private SchemaInfer(TiDAGRequest dagRequest, boolean readHandle) {
Expand Down
Expand Up @@ -71,12 +71,13 @@ public abstract class CoprocessIterator<T> implements Iterator<T> {
*/
public static CoprocessIterator<Row> getRowIterator(
TiDAGRequest req, List<RegionTask> regionTasks, TiSession session) {
TiDAGRequest dagRequest = req.copy();
return new DAGIterator<Row>(
req.buildTableScan(),
dagRequest.buildTableScan(),
regionTasks,
session,
SchemaInfer.create(req),
req.getPushDownType()) {
SchemaInfer.create(dagRequest),
dagRequest.getPushDownType()) {
@Override
public Row next() {
if (hasNext()) {
Expand Down

0 comments on commit 2ae1a4a

Please sign in to comment.