From afa224100191755394dbfa99b4559aa23ed5b111 Mon Sep 17 00:00:00 2001 From: Anthony Fox Date: Fri, 25 Apr 2014 08:28:54 -0400 Subject: [PATCH] GEOMESA-65 IngestFeatureCommand does not properly close FeatureWriter leading to truncated ingest Added a CloseableFeatureWriter closure which is threaded through the computation using Scalding's `using` facility. --- .../core/util/shell/IngestFeatureCommand.scala | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/geomesa-shell/src/main/scala/geomesa/core/util/shell/IngestFeatureCommand.scala b/geomesa-shell/src/main/scala/geomesa/core/util/shell/IngestFeatureCommand.scala index 34720f6c5682..fea350640d6d 100644 --- a/geomesa-shell/src/main/scala/geomesa/core/util/shell/IngestFeatureCommand.scala +++ b/geomesa-shell/src/main/scala/geomesa/core/util/shell/IngestFeatureCommand.scala @@ -31,12 +31,13 @@ import org.apache.commons.cli.{Option => Opt, Options, CommandLine} import org.apache.commons.vfs2.impl.VFSClassLoader import org.apache.hadoop.mapred.JobConf import org.apache.hadoop.util.ToolRunner -import org.geotools.data.{Transaction, DataStoreFinder, DataUtilities} +import org.geotools.data.{FeatureWriter, Transaction, DataStoreFinder, DataUtilities} import org.geotools.factory.Hints import org.geotools.filter.identity.FeatureIdImpl import org.geotools.geometry.jts.JTSFactoryFinder import org.joda.time.DateTime import org.joda.time.format.DateTimeFormat +import org.opengis.feature.simple.{SimpleFeatureType, SimpleFeature} class IngestFeatureCommand extends Command { @@ -227,17 +228,21 @@ class SFTIngest(args: Args) extends Job(args) { "auths" -> auths ) lazy val ds = DataStoreFinder.getDataStore(params) - lazy val fw = ds.getFeatureWriter(typeName, Transaction.AUTO_COMMIT) + + class CloseableFeatureWriter { + val fw = ds.getFeatureWriter(typeName, Transaction.AUTO_COMMIT) + + def release(): Unit = { fw.close() } + } lazy val attributes = sft.getAttributeDescriptors lazy val dtBuilder = buildDtBuilder lazy val idBuilder = buildIDBuilder - TextLine(path) - .flatMap('line -> List('key, 'value)) { line: String => parseFeature(line) } - .write(NullSource) + TextLine(path).using(new CloseableFeatureWriter) + .foreach('line) { (cfw: CloseableFeatureWriter, line: String) => parseFeature(cfw.fw, line) } - def parseFeature(line: String): List[geomesa.core.index.KeyValuePair] = { + def parseFeature(fw: FeatureWriter[SimpleFeatureType, SimpleFeature], line: String): Unit = { try { val attrs = line.toString.split(delim) val id = idBuilder(attrs) @@ -261,7 +266,6 @@ class SFTIngest(args: Args) extends Job(args) { } catch { case t: Throwable => t.printStackTrace() } - Nil } def buildIDBuilder: (Array[String]) => String = {