Skip to content

Commit

Permalink
GEOMESA-65 IngestFeatureCommand does not properly close FeatureWriter…
Browse files Browse the repository at this point in the history
… leading to truncated ingest

Added a CloseableFeatureWriter closure which is threaded through
the computation using Scalding's `using` facility.
  • Loading branch information
anthonyccri committed Apr 25, 2014
1 parent 9b07e5e commit afa2241
Showing 1 changed file with 11 additions and 7 deletions.
Expand Up @@ -31,12 +31,13 @@ import org.apache.commons.cli.{Option => Opt, Options, CommandLine}
import org.apache.commons.vfs2.impl.VFSClassLoader
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.util.ToolRunner
import org.geotools.data.{Transaction, DataStoreFinder, DataUtilities}
import org.geotools.data.{FeatureWriter, Transaction, DataStoreFinder, DataUtilities}
import org.geotools.factory.Hints
import org.geotools.filter.identity.FeatureIdImpl
import org.geotools.geometry.jts.JTSFactoryFinder
import org.joda.time.DateTime
import org.joda.time.format.DateTimeFormat
import org.opengis.feature.simple.{SimpleFeatureType, SimpleFeature}

class IngestFeatureCommand extends Command {

Expand Down Expand Up @@ -227,17 +228,21 @@ class SFTIngest(args: Args) extends Job(args) {
"auths" -> auths
)
lazy val ds = DataStoreFinder.getDataStore(params)
lazy val fw = ds.getFeatureWriter(typeName, Transaction.AUTO_COMMIT)

class CloseableFeatureWriter {
val fw = ds.getFeatureWriter(typeName, Transaction.AUTO_COMMIT)

def release(): Unit = { fw.close() }
}

lazy val attributes = sft.getAttributeDescriptors
lazy val dtBuilder = buildDtBuilder
lazy val idBuilder = buildIDBuilder

TextLine(path)
.flatMap('line -> List('key, 'value)) { line: String => parseFeature(line) }
.write(NullSource)
TextLine(path).using(new CloseableFeatureWriter)
.foreach('line) { (cfw: CloseableFeatureWriter, line: String) => parseFeature(cfw.fw, line) }

def parseFeature(line: String): List[geomesa.core.index.KeyValuePair] = {
def parseFeature(fw: FeatureWriter[SimpleFeatureType, SimpleFeature], line: String): Unit = {
try {
val attrs = line.toString.split(delim)
val id = idBuilder(attrs)
Expand All @@ -261,7 +266,6 @@ class SFTIngest(args: Args) extends Job(args) {
} catch {
case t: Throwable => t.printStackTrace()
}
Nil
}

def buildIDBuilder: (Array[String]) => String = {
Expand Down

0 comments on commit afa2241

Please sign in to comment.