Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Config support for legacy quote escaping in LOAD CSV #8903

Merged
merged 2 commits into from Feb 28, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
@@ -0,0 +1,131 @@
/*
* Copyright (c) 2002-2017 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.internal.cypher.acceptance

import java.io.PrintWriter

import org.neo4j.csv.reader.MissingEndQuoteException
import org.neo4j.cypher.internal.{ExecutionEngine, RewindableExecutionResult}
import org.neo4j.cypher.internal.compatibility.ExecutionResultWrapperFor3_0
import org.neo4j.cypher.internal.compiler.v3_0.test_helpers.CreateTempFileTestSupport
import org.neo4j.cypher.javacompat.internal.GraphDatabaseCypherService
import org.neo4j.cypher.{ExecutionEngineFunSuite, NewPlannerTestSupport, RunWithConfigTestSupport}
import org.neo4j.graphdb.factory.GraphDatabaseSettings

class LoadCsvWithQuotesAcceptanceTest extends ExecutionEngineFunSuite with NewPlannerTestSupport with RunWithConfigTestSupport with CreateTempFileTestSupport {
def csvUrls(f: PrintWriter => Unit) = Seq(
createCSVTempFileURL(f),
createGzipCSVTempFileURL(f),
createZipCSVTempFileURL(f)
)

test("import rows with messy quotes using legacy mode as default") {
runWithConfig() { db =>
val urls = csvUrls({
writer =>
writer.println("name,x")
writer.println("'Quotes 0',\"\"")
writer.println("'Quotes 1',\"\\\"\"")
writer.println("'Quotes 2',\"\"\"\"")
writer.println("'Quotes 3',\"\\\"\\\"\"")
writer.println("'Quotes 4',\"\"\"\"\"\"")
})
for (url <- urls) {
val result = executeUsingCostPlannerOnly(db, s"LOAD CSV WITH HEADERS FROM '$url' AS line RETURN line.x")
assert(result.toList === List(
Map("line.x" -> ""),
Map("line.x" -> "\""),
Map("line.x" -> "\""),
Map("line.x" -> "\"\""),
Map("line.x" -> "\"\"")
))
}
}
}

test("import rows with messy quotes using legacy mode") {
runWithConfig(GraphDatabaseSettings.csv_legacy_quote_escaping -> "true") { db =>
val urls = csvUrls({
writer =>
writer.println("name,x")
writer.println("'Quotes 0',\"\"")
writer.println("'Quotes 1',\"\\\"\"")
writer.println("'Quotes 2',\"\"\"\"")
writer.println("'Quotes 3',\"\\\"\\\"\"")
writer.println("'Quotes 4',\"\"\"\"\"\"")
})
for (url <- urls) {
val result = executeUsingCostPlannerOnly(db, s"LOAD CSV WITH HEADERS FROM '$url' AS line RETURN line.x")
assert(result.toList === List(
Map("line.x" -> ""),
Map("line.x" -> "\""),
Map("line.x" -> "\""),
Map("line.x" -> "\"\""),
Map("line.x" -> "\"\"")
))
}
}
}

test("import rows with messy quotes using rfc4180 mode") {
runWithConfig(GraphDatabaseSettings.csv_legacy_quote_escaping -> "false") { db =>
val urls = csvUrls({
writer =>
writer.println("name,x")
writer.println("'Quotes 0',\"\"")
writer.println("'Quotes 2',\"\"\"\"")
writer.println("'Quotes 4',\"\"\"\"\"\"")
writer.println("'Quotes 5',\"\\\"\"\"")
})
for (url <- urls) {
val result = executeUsingCostPlannerOnly(db, s"LOAD CSV WITH HEADERS FROM '$url' AS line RETURN line.x")
assert(result.toList === List(
Map("line.x" -> ""),
Map("line.x" -> "\""),
Map("line.x" -> "\"\""),
Map("line.x" -> "\\\"")
))
}
}
}

test("fail to import rows with java quotes when in rfc4180 mode") {
runWithConfig(GraphDatabaseSettings.csv_legacy_quote_escaping -> "false") { db =>
val urls = csvUrls({
writer =>
writer.println("name,x")
writer.println("'Quotes 0',\"\"")
writer.println("'Quotes 1',\"\\\"\"")
writer.println("'Quotes 2',\"\"\"\"")
})
for (url <- urls) {
intercept[MissingEndQuoteException] {
executeUsingCostPlannerOnly(db, s"LOAD CSV WITH HEADERS FROM '$url' AS line RETURN line.x")
}.getMessage should include("which started on line 2")
}
}
}

def executeUsingCostPlannerOnly(db: GraphDatabaseCypherService, query: String) =
new ExecutionEngine(db).execute(s"CYPHER planner=COST $query", Map.empty[String, Any], db.session()) match {
case e: ExecutionResultWrapperFor3_0 => RewindableExecutionResult(e)
}

}
Expand Up @@ -67,6 +67,7 @@ case class CypherCompilerConfiguration(queryCacheSize: Int,
idpIterationDuration: Long,
errorIfShortestPathFallbackUsedAtRuntime: Boolean,
errorIfShortestPathHasCommonNodesAtRuntime: Boolean,
legacyCsvQuoteEscaping: Boolean,
nonIndexedLabelWarningThreshold: Long)

object CypherCompilerFactory {
Expand Down
Expand Up @@ -31,8 +31,8 @@ class LoadCsvPeriodicCommitObserver(batchRowCount: Long, resources: ExternalCSVR
val updateCounter = new UpdateCounter
var outerLoadCSVIterator: Option[LoadCsvIterator] = None

def getCsvIterator(url: URL, fieldTerminator: Option[String] = None): Iterator[Array[String]] = {
val innerIterator = resources.getCsvIterator(url, fieldTerminator)
def getCsvIterator(url: URL, fieldTerminator: Option[String], legacyCsvQuoteEscaping: Boolean): Iterator[Array[String]] = {
val innerIterator = resources.getCsvIterator(url, fieldTerminator, legacyCsvQuoteEscaping)
if (outerLoadCSVIterator.isEmpty) {
val iterator = new LoadCsvIterator(url, innerIterator)(onNext())
outerLoadCSVIterator = Some(iterator)
Expand Down
Expand Up @@ -39,7 +39,8 @@ class LoadCSVBuilder extends PlanBuilder {
val item: LoadCSV = findLoadCSVItem(plan).get
plan.copy(
query = plan.query.copy(start = plan.query.start.replace(Unsolved(item), Solved(item))),
pipe = new LoadCSVPipe(plan.pipe, if (item.withHeaders) HasHeaders else NoHeaders, item.url, item.variable, item.fieldTerminator)()
pipe = new LoadCSVPipe(plan.pipe, if (item.withHeaders) HasHeaders else NoHeaders, item.url, item.variable,
item.fieldTerminator, legacyCsvQuoteEscaping = true)() // Rule planner supports only legacy quotes
)
}
}
Expand Up @@ -22,11 +22,11 @@ package org.neo4j.cypher.internal.compiler.v3_0.pipes
import java.net.URL

trait ExternalCSVResource {
def getCsvIterator(url: URL, fieldTerminator: Option[String] = None): Iterator[Array[String]]
def getCsvIterator(url: URL, fieldTerminator: Option[String], legacyCsvQuoteEscaping: Boolean): Iterator[Array[String]]
}

object ExternalCSVResource {
def empty: ExternalCSVResource = new ExternalCSVResource {
override def getCsvIterator(url: URL, fieldTerminator: Option[String]): Iterator[Array[String]] = Iterator.empty
override def getCsvIterator(url: URL, fieldTerminator: Option[String], legacyCsvQuoteEscaping: Boolean): Iterator[Array[String]] = Iterator.empty
}
}
Expand Up @@ -39,7 +39,8 @@ case class LoadCSVPipe(source: Pipe,
format: CSVFormat,
urlExpression: Expression,
variable: String,
fieldTerminator: Option[String])
fieldTerminator: Option[String],
legacyCsvQuoteEscaping: Boolean)
(val estimatedCardinality: Option[Double] = None)(implicit pipeMonitor: PipeMonitor)
extends PipeWithSource(source, pipeMonitor) with RonjaPipe {

Expand Down Expand Up @@ -105,7 +106,7 @@ case class LoadCSVPipe(source: Pipe,
val urlString: String = urlExpression(context).asInstanceOf[String]
val url = getImportURL(urlString, state.query)

val iterator: Iterator[Array[String]] = state.resources.getCsvIterator(url, fieldTerminator)
val iterator: Iterator[Array[String]] = state.resources.getCsvIterator(url, fieldTerminator, legacyCsvQuoteEscaping)
format match {
case HasHeaders =>
val headers = iterator.next().toSeq // First row is headers
Expand Down
Expand Up @@ -91,7 +91,8 @@ case class CostBasedExecutablePlanBuilder(monitors: Monitors,
queryGraphSolver, notificationLogger = notificationLogger, useErrorsOverWarnings = config.useErrorsOverWarnings,
errorIfShortestPathFallbackUsedAtRuntime = config.errorIfShortestPathFallbackUsedAtRuntime,
errorIfShortestPathHasCommonNodesAtRuntime = config.errorIfShortestPathHasCommonNodesAtRuntime,
config = QueryPlannerConfiguration.default.withUpdateStrategy(updateStrategy))
legacyCsvQuoteEscaping = config.legacyCsvQuoteEscaping,
config = QueryPlannerConfiguration.default.withUpdateStrategy(updateStrategy))

val (periodicCommit, plan) = queryPlanner.plan(unionQuery)(context)

Expand Down
Expand Up @@ -318,8 +318,8 @@ case class ActualPipeBuilder(monitors: Monitors, recurse: LogicalPlan => Pipe, r
val rowProcessing = ProcedureCallRowProcessing(signature)
ProcedureCallPipe(source, signature.name, callMode, callArgumentCommands, rowProcessing, call.callResultTypes, call.callResultIndices)()

case LoadCSVPlan(_, url, variableName, format, fieldTerminator) =>
LoadCSVPipe(source, format, toCommandExpression(url), variableName.name, fieldTerminator)()
case LoadCSVPlan(_, url, variableName, format, fieldTerminator, legacyCsvQuoteEscaping) =>
LoadCSVPipe(source, format, toCommandExpression(url), variableName.name, fieldTerminator, legacyCsvQuoteEscaping)()

case ProduceResult(columns, _) =>
ProduceResultsPipe(source, columns)()
Expand Down
Expand Up @@ -19,6 +19,7 @@
*/
package org.neo4j.cypher.internal.compiler.v3_0.planner.logical

import org.neo4j.csv.reader.Configuration.DEFAULT_LEGACY_STYLE_QUOTING
import org.neo4j.cypher.internal.compiler.v3_0.planner.logical.Metrics.QueryGraphSolverInput
import org.neo4j.cypher.internal.compiler.v3_0.planner.logical.plans.{IdName, LogicalPlan, StrictnessMode}
import org.neo4j.cypher.internal.compiler.v3_0.planner.logical.steps.LogicalPlanProducer
Expand All @@ -37,6 +38,7 @@ case class LogicalPlanningContext(planContext: PlanContext,
useErrorsOverWarnings: Boolean = false,
errorIfShortestPathFallbackUsedAtRuntime: Boolean = false,
errorIfShortestPathHasCommonNodesAtRuntime: Boolean = true,
legacyCsvQuoteEscaping: Boolean = DEFAULT_LEGACY_STYLE_QUOTING,
config: QueryPlannerConfiguration = QueryPlannerConfiguration.default,
leafPlanUpdater: LogicalPlan => LogicalPlan = identity) {
def withStrictness(strictness: StrictnessMode) = copy(input = input.withPreferredStrictness(strictness))
Expand Down
Expand Up @@ -27,7 +27,8 @@ case class LoadCSV(source: LogicalPlan,
url: Expression,
variableName: IdName,
format: CSVFormat,
fieldTerminator: Option[String])
fieldTerminator: Option[String],
legacyCsvQuoteEscaping: Boolean)
(val solved: PlannerQuery with CardinalityEstimation) extends LogicalPlan {

override def availableSymbols = source.availableSymbols + variableName
Expand Down
Expand Up @@ -35,7 +35,7 @@ case object cleanUpEager extends Rewriter {
unwind.copy(left = eager.copy(inner = source)(eager.solved))(eager.solved)

// E LCSV => LCSV E
case eager@Eager(loadCSV@LoadCSV(source, _, _, _, _)) =>
case eager@Eager(loadCSV@LoadCSV(source, _, _, _, _, _)) =>
loadCSV.copy(source = eager.copy(inner = source)(eager.solved))(eager.solved)
})

Expand Down
Expand Up @@ -413,7 +413,7 @@ case class LogicalPlanProducer(cardinalityModel: CardinalityModel) extends ListS
def planLoadCSV(inner: LogicalPlan, variableName: IdName, url: Expression, format: CSVFormat, fieldTerminator: Option[StringLiteral])
(implicit context: LogicalPlanningContext) = {
val solved = inner.solved.updateTailOrSelf(_.withHorizon(LoadCSVProjection(variableName, url, format, fieldTerminator)))
LoadCSVPlan(inner, url, variableName, format, fieldTerminator.map(_.value))(solved)
LoadCSVPlan(inner, url, variableName, format, fieldTerminator.map(_.value), context.legacyCsvQuoteEscaping)(solved)
}

def planUnwind(inner: LogicalPlan, name: IdName, expression: Expression)(implicit context: LogicalPlanningContext) = {
Expand Down
Expand Up @@ -40,20 +40,22 @@ object CSVResources {
val DEFAULT_BUFFER_SIZE: Int = 2 * 1024 * 1024
val DEFAULT_QUOTE_CHAR: Char = '"'

private val defaultConfig = new Configuration {
private def config(legacyCsvQuoteEscaping: Boolean) = new Configuration {
override def quotationCharacter(): Char = DEFAULT_QUOTE_CHAR

override def bufferSize(): Int = DEFAULT_BUFFER_SIZE

override def multilineFields(): Boolean = true

override def emptyQuotedStringsAsNull(): Boolean = true

override def legacyStyleQuoting(): Boolean = legacyCsvQuoteEscaping
}
}

class CSVResources(cleaner: TaskCloser) extends ExternalCSVResource {

def getCsvIterator(url: URL, fieldTerminator: Option[String] = None): Iterator[Array[String]] = {
def getCsvIterator(url: URL, fieldTerminator: Option[String], legacyCsvQuoteEscaping: Boolean): Iterator[Array[String]] = {
val inputStream = openStream(url)

val reader = if (url.getProtocol == "file") {
Expand All @@ -62,7 +64,7 @@ class CSVResources(cleaner: TaskCloser) extends ExternalCSVResource {
Readables.wrap(inputStream, url.toString, StandardCharsets.UTF_8)
}
val delimiter: Char = fieldTerminator.map(_.charAt(0)).getOrElse(CSVResources.DEFAULT_FIELD_TERMINATOR)
val seeker = CharSeekers.charSeeker(reader, CSVResources.defaultConfig, true)
val seeker = CharSeekers.charSeeker(reader, CSVResources.config(legacyCsvQuoteEscaping), true)
val extractor = new Extractors(delimiter).string()
val intDelimiter = delimiter.toInt
val mark = new Mark
Expand Down
Expand Up @@ -28,13 +28,13 @@ class CheckForEagerLoadCsvTest extends CypherFunSuite {
implicit val monitor = mock[PipeMonitor]

test("should notify for EagerPipe on top of LoadCsvPipe") {
val pipe = EagerPipe(LoadCSVPipe(AllNodesScanPipe("a")(), HasHeaders, Literal("foo"), "bar", None)())()
val pipe = EagerPipe(LoadCSVPipe(AllNodesScanPipe("a")(), HasHeaders, Literal("foo"), "bar", None, false)())()

checkForEagerLoadCsv(pipe) should equal(Some(EagerLoadCsvNotification))
}

test("should not notify for LoadCsv on top of eager pipe") {
val pipe = LoadCSVPipe(EagerPipe(AllNodesScanPipe("a")())(), HasHeaders, Literal("foo"), "bar", None)()
val pipe = LoadCSVPipe(EagerPipe(AllNodesScanPipe("a")())(), HasHeaders, Literal("foo"), "bar", None, false)()

checkForEagerLoadCsv(pipe) should equal(None)
}
Expand Down
Expand Up @@ -51,15 +51,15 @@ class CheckForLoadCsvAndMatchOnLargeLabelTest extends CypherFunSuite {
private val checker = CheckForLoadCsvAndMatchOnLargeLabel(planContext, THRESHOLD)

test("should notify when doing LoadCsv on top of large label scan") {
val loadCsvPipe = LoadCSVPipe(SingleRowPipe(), HasHeaders, Literal("foo"), "bar", None)()
val loadCsvPipe = LoadCSVPipe(SingleRowPipe(), HasHeaders, Literal("foo"), "bar", None, false)()
val pipe = NodeStartPipe(loadCsvPipe, "foo",
NodeByLabelEntityProducer(NodeByLabel("bar", labelOverThreshold), indexFor(labelOverThreshold)))()

checker(pipe) should equal(Some(LargeLabelWithLoadCsvNotification))
}

test("should not notify when doing LoadCsv on top of a large label scan") {
val loadCsvPipe = LoadCSVPipe(SingleRowPipe(), HasHeaders, Literal("foo"), "bar", None)()
val loadCsvPipe = LoadCSVPipe(SingleRowPipe(), HasHeaders, Literal("foo"), "bar", None, false)()
val pipe = NodeStartPipe(loadCsvPipe, "foo",
NodeByLabelEntityProducer(NodeByLabel("bar", labelUnderThrehsold), indexFor(labelUnderThrehsold)))()

Expand All @@ -68,7 +68,7 @@ class CheckForLoadCsvAndMatchOnLargeLabelTest extends CypherFunSuite {

test("should not notify when doing LoadCsv on top of large label scan") {
val startPipe = NodeStartPipe(SingleRowPipe(), "foo", NodeByLabelEntityProducer(NodeByLabel("bar", labelOverThreshold), indexFor(labelOverThreshold)))()
val pipe = LoadCSVPipe(startPipe, HasHeaders, Literal("foo"), "bar", None)()
val pipe = LoadCSVPipe(startPipe, HasHeaders, Literal("foo"), "bar", None, false)()

checker(pipe) should equal(None)
}
Expand Down
Expand Up @@ -22,10 +22,12 @@ package org.neo4j.cypher.internal.compiler.v3_0.executionplan
import java.net.URL

import org.mockito.Matchers
import org.mockito.Matchers._
import org.mockito.Mockito._
import org.neo4j.cypher.internal.compiler.v3_0.pipes.ExternalCSVResource
import org.neo4j.cypher.internal.compiler.v3_0.spi.{QueryTransactionalContext, QueryContext}
import org.neo4j.cypher.internal.frontend.v3_0.test_helpers.CypherFunSuite

class LoadCsvPeriodicCommitObserverTest extends CypherFunSuite {

var resourceUnderTest: LoadCsvPeriodicCommitObserver = _
Expand All @@ -35,10 +37,10 @@ class LoadCsvPeriodicCommitObserverTest extends CypherFunSuite {

test("writing should not trigger tx restart until next csv line is fetched") {
// Given
when(resource.getCsvIterator(Matchers.eq(url), Matchers.any())).thenReturn(Iterator(Array("yo")))
when(resource.getCsvIterator(Matchers.eq(url), any(), any())).thenReturn(Iterator(Array("yo")))

// When
val iterator = resourceUnderTest.getCsvIterator(url)
val iterator = resourceUnderTest.getCsvIterator(url, None, false)
verify(transactionalContext, never()).commitAndRestartTx()

iterator.next()
Expand All @@ -48,11 +50,11 @@ class LoadCsvPeriodicCommitObserverTest extends CypherFunSuite {

test("multiple iterators are still handled correctly only commit when the first iterator advances") {
// Given
when(resource.getCsvIterator(Matchers.eq(url), Matchers.any())).
when(resource.getCsvIterator(Matchers.eq(url), any(), any())).
thenReturn(Iterator(Array("yo"))).
thenReturn(Iterator(Array("yo")))
val iterator1 = resourceUnderTest.getCsvIterator(url)
val iterator2 = resourceUnderTest.getCsvIterator(url)
val iterator1 = resourceUnderTest.getCsvIterator(url, None, false)
val iterator2 = resourceUnderTest.getCsvIterator(url, None, false)

// When
iterator2.next()
Expand All @@ -65,10 +67,10 @@ class LoadCsvPeriodicCommitObserverTest extends CypherFunSuite {

test("if a custom iterator is specified should be passed to the wrapped resource") {
// Given
resourceUnderTest.getCsvIterator(url, Some(";"))
resourceUnderTest.getCsvIterator(url, Some(";"), false)

// When
verify(resource, times(1)).getCsvIterator(url, Some(";"))
verify(resource, times(1)).getCsvIterator(url, Some(";"), false)
}

override protected def beforeEach() {
Expand Down
Expand Up @@ -43,7 +43,8 @@ class RuleExecutablePlanBuilderTest extends CypherFunSuite {
idpMaxTableSize = DefaultIDPSolverConfig.maxTableSize,
idpIterationDuration = DefaultIDPSolverConfig.iterationDurationLimit,
errorIfShortestPathFallbackUsedAtRuntime = false,
errorIfShortestPathHasCommonNodesAtRuntime = true
errorIfShortestPathHasCommonNodesAtRuntime = true,
legacyCsvQuoteEscaping = false
)
val planBuilder = new LegacyExecutablePlanBuilder(mock[Monitors], config, RewriterStepSequencer.newValidating,
typeConverter = IdentityTypeConverter)
Expand Down