Fix offset bug in periodic commit #11027
Conversation
Periodic commit used to commit BEFORE the specified number of CSV rows. For example, USING PERIODIC COMMIT 100 would commit after 99 rows, then after 199 rows, then after 299 rows, etc. After this change it commits after 100 rows, then after 200 rows, then after 300 rows, etc. Furthermore, LOAD CSV WITH HEADERS counted the header row the same as a data row, meaning that USING PERIODIC COMMIT 100 would actually commit after 98 data rows, then after 198 data rows, etc. After this change it will commit after 100 data rows, regardless of whether the CSV file has headers or not.
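The off-by-one described above can be illustrated with a minimal sketch. The names below are illustrative only, not the actual Neo4j code:

```scala
// Hypothetical sketch (not the actual Neo4j implementation) of the off-by-one.
object PeriodicCommitSketch {
  // Before the fix: the limit check ran before the current row was counted,
  // so with batch = 100 commits fired at rows 99, 199, 299, ...
  def oldCommitPoints(rows: Int, batch: Int): Seq[Int] =
    (1 to rows).filter(row => (row + 1) % batch == 0)

  // After the fix: count the row first, then check, so commits land
  // exactly on every batch-th row: 100, 200, 300, ...
  def newCommitPoints(rows: Int, batch: Int): Seq[Int] =
    (1 to rows).filter(row => row % batch == 0)
}
```

For 300 rows with a batch size of 100, `oldCommitPoints` yields 99, 199, 299 while `newCommitPoints` yields 100, 200, 300, matching the behaviour described in the PR description.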
@@ -122,7 +122,7 @@ class PeriodicCommitAcceptanceTest extends ExecutionEngineFunSuite
    val (_, txCounts) = executeAndTrackTxCounts(queryText)

    // then
    txCounts should equal(TxCounts(commits = 3, rollbacks = 0))
This test used to commit after 9 rows, then after 19 rows, and then finally when the execution was over.
Now it commits after 10 rows, and then when the execution is over (after 20 rows).
    updateCounter.resetIfPastLimit(batchRowCount)(commitAndRestartTx())
    updateCounter += 1
This change is what makes PERIODIC COMMIT commit after the specified number of rows has been processed, rather than just before that number of rows has been fed into the pipeline.
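The ordering of the check and the increment is what matters here. A simplified, hypothetical counter in the spirit of the `updateCounter` usage above (names and signatures are illustrative, not the real Neo4j API) shows the effect:

```scala
// Hypothetical, simplified counter; illustrative names only.
final class CountingBatcher {
  private var count = 0L
  def +=(n: Long): Unit = count += n
  // Run `action` (e.g. commit and restart the transaction) once the
  // counter has reached `limit`, then reset it.
  def resetIfPastLimit(limit: Long)(action: => Unit): Unit =
    if (count >= limit) { action; count = 0L }
}

object CountingBatcher {
  // Feed `rows` rows through the check-then-count loop from the diff
  // and return how many commits fire while rows are still flowing.
  def midExecutionCommits(rows: Int, batch: Int): Int = {
    var commits = 0
    val counter = new CountingBatcher
    for (_ <- 1 to rows) {
      counter.resetIfPastLimit(batch) { commits += 1 }
      counter += 1
    }
    commits
  }
}
```

With 20 rows and a batch size of 10, `midExecutionCommits(20, 10)` returns 1: the commit fires as the 11th row enters, i.e. after exactly 10 rows have been processed, and the remaining batch is committed when execution ends.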
    if (uncommittedRows != 0)
      throw new IllegalStateException("Header offset must be accounted for at the beginning")
    uncommittedRows = -1
  }
This is how we ensure that header rows are not counted the same way as data rows.
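The trick is that pre-setting the counter to -1 lets the header row increment it back to 0, so only data rows count toward the commit limit. A hypothetical, self-contained sketch of that idea:

```scala
// Hypothetical sketch: starting the counter at -1 makes the header row
// bring it back to 0, so only data rows count toward the commit limit.
object HeaderOffsetSketch {
  def commitsFor(totalCsvRows: Int, hasHeaders: Boolean, batch: Int): Int = {
    var commits = 0
    var uncommittedRows: Long = if (hasHeaders) -1L else 0L
    for (_ <- 1 to totalCsvRows) {
      uncommittedRows += 1
      if (uncommittedRows == batch) { commits += 1; uncommittedRows = 0L }
    }
    commits
  }
}
```

A file of 101 CSV rows (1 header + 100 data rows) with a batch size of 100 produces one commit, exactly as a headerless file of 100 data rows does.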
@@ -103,7 +103,7 @@ case class LoadCSVPipe(source: Pipe,
    implicit val s = state
    val url = getImportURL(urlExpression(context).asInstanceOf[String], state.query)

-   val iterator: Iterator[Array[String]] = state.resources.getCsvIterator(url, fieldTerminator)
+   val iterator: Iterator[Array[String]] = state.resources.getCsvIterator(url, fieldTerminator, format match { case HasHeaders => true; case _ => false })
The LoadCsvPeriodicCommitObserver needs to know whether it should discount a header row or not.
@@ -269,8 +269,7 @@ class ProfilerAcceptanceTest extends ExecutionEngineFunSuite with CreateTempFile
    val result = legacyProfile(query)

    // then
    val expectedTxCount = 1 + // First tx used to compile the query
                          10  // One per 10 rows of CSV file
The comment here was wrong. The extra transaction was not one used by the planner; the planner had already executed on line 264. The extra transaction was caused by the offset issue with PERIODIC COMMIT.
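The arithmetic behind this test expectation can be sketched as follows, assuming (as the reviewer comments above describe) that a final commit always happens when execution ends; the function names are hypothetical:

```scala
// Hypothetical arithmetic: transaction counts for `rows` CSV rows under
// USING PERIODIC COMMIT `batch`, assuming a final commit at the end.
object TxCountSketch {
  // Before the fix: mid-execution commits at rows 9, 19, ..., 99, plus
  // the final commit, giving 11 transactions for 100 rows with batch 10.
  def oldTxCount(rows: Int, batch: Int): Int =
    (1 to rows).count(row => (row + 1) % batch == 0) + 1

  // After the fix: commits at full batches strictly before the end, plus
  // the final commit, giving 10 transactions for 100 rows with batch 10.
  def newTxCount(rows: Int, batch: Int): Int =
    (1 until rows).count(row => row % batch == 0) + 1
}
```

For the 20-row test above, the same arithmetic gives 3 transactions before the fix and 2 after, matching the change from TxCounts(commits = 3) to the new expectation.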
I added a few comments to this PR in order to explain why these changes were made.
This PR targets 2.3 in order to get this fixed even for people who use CYPHER 2.3 ... in later versions of Neo4j.