GEOMESA-3353 Add geomesa-kafka playback command
Walter Schultz authored and elahrvivaz committed Apr 17, 2024
1 parent f1b6b84 commit 0f911a3
Showing 5 changed files with 318 additions and 0 deletions.
65 changes: 65 additions & 0 deletions docs/user/kafka/commandline.rst
@@ -96,6 +96,71 @@ The ``--delay`` argument should be specified as a duration, in plain language. F
or ``1 second``. The ingest will pause after creating each ``SimpleFeature`` for the specified delay.
This can be used to simulate a live data stream.

``playback``
^^^^^^^^^^^^

The playback command simulates a data stream by replaying features from a file directly onto a Kafka Data Store.
Features are emitted according to a date attribute in each feature. For example, if replaying three features with
dates that are each one second apart, each feature will be emitted one second after the previous one. The rate of
replay can be modified to speed up or slow down the original time differences.

======================== =========================================================
Argument Description
======================== =========================================================
``-c, --catalog *`` The catalog table containing schema metadata
``-f, --feature-name *`` The name of the schema
``--dtg`` Date attribute to base playback on. If not specified,
will use the default schema date field
``--rate`` Rate multiplier to speed-up (or slow down) features being
returned, as a float
``--live`` Will modify the returned dates to match the current time
``--config`` Properties file used to configure the Kafka producer
``-s, --spec`` SimpleFeatureType specification as a GeoTools spec string, SFT config, or file with either
``-C, --converter`` GeoMesa converter specification as a config string, file name, or name of an available converter
``--input-format`` File format of input files (shp, csv, tsv, avro, etc). Optional, auto-detection will be attempted
``--src-list``           Treat input files as text files listing the files to ingest, one per line
``--partitions`` The number of partitions used for the Kafka topic
``--replication`` The replication factor for the Kafka topic
``--serialization`` The serialization format to use
``--schema-registry`` URL to a Confluent Schema Registry
``<files>...`` Input files to ingest
======================== =========================================================

The playback command is an extension of the :ref:`cli_ingest` command, and accepts all the parameters outlined there.
However, playback cannot run in distributed mode.

Also, note that the input files (specified in ``--src-list`` or ``<files>...``) must be time-ordered by the ``--dtg``
attribute before ingest, or the playback will not work as expected.

The ``--rate`` parameter can be used to speed up or slow down the replay. It is specified as a floating point
number. For example ``--rate 10`` will make replay ten times faster, while ``--rate 0.1`` will make replay
ten times slower.
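
For example, the following hypothetical invocation (the catalog name, schema name, and input file are
placeholders) replays a CSV file at ten times the original speed::

    $ geomesa-kafka playback -c my-catalog -f my-schema --dtg dtg --rate 10 features.csv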

The ``--src-list`` argument is useful when you have more files to ingest than the command line will allow you to
specify. It instructs GeoMesa to treat the input files as newline-separated lists of files to ingest. Since this
makes it very easy to launch ingest jobs that can take days, it is recommended to split lists into reasonable
chunks that can each be completed in a couple of hours.
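
As a sketch, a newline-separated file list and a hypothetical invocation (all paths are placeholders) might
look like::

    $ cat file-list.txt
    /data/chunk-01.csv
    /data/chunk-02.csv
    $ geomesa-kafka playback -c my-catalog -f my-schema --src-list file-list.txt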

The ``--force`` argument can be used to suppress any confirmation prompts (generally from converter inference),
which can be useful when scripting commands.

The ``<files>...`` argument specifies the files to be ingested. ``*`` may be used as a wildcard in file paths.
GeoMesa can handle **gzip**, **bzip** and **xz** file compression as long as the file extensions match the
compression type. GeoMesa supports ingesting files from local disks or HDFS. In addition, Amazon's S3
and Microsoft's Azure file systems are supported with a few configuration changes. See
:doc:`/user/cli/filesystems` for details. Note: The behavior of this argument is changed by the ``--src-list`` argument.

By using a single ``-`` for the input files, input data may be piped directly to the command using standard
shell redirection. Note that this will only work in local mode, and will only use a single thread for ingestion.
Schema inference is disabled in this case, and progress indicators may not be entirely accurate, as the total size
isn't known up front.

For example::

    $ cat foo.csv | geomesa-kafka playback ... -
    $ geomesa-kafka playback ... - <foo.csv

``listen``
^^^^^^^^^^

@@ -8,6 +8,7 @@

package org.locationtech.geomesa.kafka.tools

import org.locationtech.geomesa.kafka.tools.ingest.KafkaPlaybackCommand
import org.locationtech.geomesa.tools.{Command, Runner}

object KafkaRunner extends Runner {
@@ -23,6 +24,7 @@ object KafkaRunner extends Runner {
new export.KafkaListenCommand,
new export.KafkaExportCommand,
new ingest.KafkaIngestCommand,
new KafkaPlaybackCommand,
new status.KafkaDescribeSchemaCommand,
new status.KafkaGetSftConfigCommand,
new status.KafkaGetTypeNamesCommand
@@ -0,0 +1,74 @@
/***********************************************************************
* Copyright (c) 2013-2024 Commonwealth Computer Research, Inc.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Apache License, Version 2.0
* which accompanies this distribution and is available at
* http://www.opensource.org/licenses/apache2.0.php.
***********************************************************************/

package org.locationtech.geomesa.kafka.tools.ingest

import com.beust.jcommander.{Parameter, ParameterException, Parameters}
import com.typesafe.config.Config
import org.geotools.api.feature.simple.{SimpleFeature, SimpleFeatureType}
import org.locationtech.geomesa.jobs.Awaitable
import org.locationtech.geomesa.kafka.data.KafkaDataStore
import org.locationtech.geomesa.kafka.tools.{KafkaDataStoreCommand, ProducerDataStoreParams}
import org.locationtech.geomesa.tools.Command
import org.locationtech.geomesa.tools.DistributedRunParam.RunModes
import org.locationtech.geomesa.tools.DistributedRunParam.RunModes.RunMode
import org.locationtech.geomesa.tools.ingest.IngestCommand.{IngestParams, Inputs}
import org.locationtech.geomesa.tools.ingest._
import org.locationtech.geomesa.utils.collection.CloseableIterator
import org.locationtech.geomesa.utils.iterators.SimplePlaybackIterator

class KafkaPlaybackCommand extends IngestCommand[KafkaDataStore] with KafkaDataStoreCommand {

import scala.collection.JavaConverters._

override val name = "playback"
override val params = new KafkaPlaybackCommand.KafkaPlaybackParams()

// override to add delay in writing messages
override protected def startIngest(
mode: RunMode,
ds: KafkaDataStore,
sft: SimpleFeatureType,
converter: Config,
inputs: Inputs): Awaitable = {
if (mode != RunModes.Local) {
throw new ParameterException("Distributed ingest is not supported for playback")
}

val dtg = Option(params.dtg)
val rate: Float = Option(params.rate).map(_.floatValue()).getOrElse(1f)
val live = Option(params.live).exists(_.booleanValue())

    Command.user.info("Starting playback...")
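    // the final argument (1) keeps the local ingest single-threaded, so features are emitted in their original time order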
new LocalConverterIngest(ds, connection.asJava, sft, converter, inputs, 1) {
override protected def features(iter: CloseableIterator[SimpleFeature]): CloseableIterator[SimpleFeature] =
        new SimplePlaybackIterator(iter, sft, dtg, rate = rate, live = live)
}
}
}

object KafkaPlaybackCommand {
@Parameters(commandDescription = "Playback features onto Kafka from time-ordered file(s), based on the feature date")
class KafkaPlaybackParams extends IngestParams with ProducerDataStoreParams {

@Parameter(names = Array("--dtg"), description = "Date attribute to base playback on")
var dtg: String = _

@Parameter(names = Array("--rate"), description = "Rate multiplier to speed-up (or slow down) features being returned")
var rate: java.lang.Float = _

@Parameter(names = Array("--live"), description = "Simulate live data by projecting the dates to current time")
var live: java.lang.Boolean = _
}
}
@@ -0,0 +1,86 @@
/***********************************************************************
* Copyright (c) 2013-2024 Commonwealth Computer Research, Inc.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Apache License, Version 2.0
* which accompanies this distribution and is available at
* http://www.opensource.org/licenses/apache2.0.php.
***********************************************************************/

package org.locationtech.geomesa.utils.iterators

import com.typesafe.scalalogging.StrictLogging
import org.geotools.api.feature.simple.{SimpleFeature, SimpleFeatureType}
import org.locationtech.geomesa.utils.collection.CloseableIterator

import java.util.Date

/**
 * Replays features in sorted order, delaying each one based on its date attribute
 * to simulate the original ingestion stream
 *
 * Requires the underlying iterator to be sorted by the date attribute
 *
 * @param iterator features to replay, sorted by date
 * @param sft the simple feature type
 * @param dtg date attribute to base playback on; defaults to the schema's default date field
 * @param transforms query transforms, if any
 * @param rate multiplier for the rate of returning features, applied to the original delay between features
 * @param live project dates to current time
 */
class SimplePlaybackIterator(
iterator: CloseableIterator[SimpleFeature],
sft: SimpleFeatureType,
dtg: Option[String] = None,
transforms: Array[String] = null,
rate: Float = 10f,
live: Boolean = false
) extends CloseableIterator[SimpleFeature] with StrictLogging {

import org.locationtech.geomesa.utils.geotools.RichSimpleFeatureType.RichSimpleFeatureType

private val dtgName = dtg.orElse(sft.getDtgField).getOrElse {
throw new IllegalArgumentException("Schema does not have a default date field")
}
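  // if transforms are specified, ensure the date attribute is included so it can be read from each feature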
private val tdefs = transforms match {
case null => null
case t if t.indexOf(dtgName) == -1 => t :+ dtgName
case t => t
}
private val dtgIndex = tdefs match {
case null => sft.indexOf(dtgName)
case t => t.indexOf(dtgName)
}
require(dtgIndex != -1, "Invalid date field")
private var start: Long = -1
private var eventStart: Long = -1

override def hasNext: Boolean = iterator.hasNext

override def next(): SimpleFeature = {
    val feature = iterator.next()
val featureTime = feature.getAttribute(dtgIndex).asInstanceOf[Date].getTime
if (start == -1L) {
// emit the first feature as soon as it's available, and set the clock to start timing from here
start = System.currentTimeMillis()
logger.info("Starting replay clock at: {}", start)
eventStart = featureTime
}
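    // scale the original gap between event times by the rate multiplier to get the target emit time on the replay clock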
val featureRelativeTime = start + ((featureTime - eventStart) / rate).toLong
val sleep = featureRelativeTime - System.currentTimeMillis()
if (sleep > 0) {
Thread.sleep(sleep)
}
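    // in live mode, rewrite the date to the projected wall-clock time at which the feature is emitted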
if (live) {
feature.setAttribute(dtgIndex, new Date(featureRelativeTime))
}
feature
}

override def close(): Unit = iterator.close()
}
@@ -0,0 +1,91 @@
/***********************************************************************
* Copyright (c) 2013-2024 Commonwealth Computer Research, Inc.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Apache License, Version 2.0
* which accompanies this distribution and is available at
* http://www.opensource.org/licenses/apache2.0.php.
***********************************************************************/

package org.locationtech.geomesa.utils.iterators

import com.typesafe.scalalogging.LazyLogging
import org.geotools.feature.simple.SimpleFeatureBuilder
import org.junit.runner.RunWith
import org.locationtech.geomesa.utils.collection.CloseableIterator
import org.locationtech.geomesa.utils.geotools.SimpleFeatureTypes
import org.locationtech.geomesa.utils.io.WithClose
import org.specs2.mutable.Specification
import org.specs2.runner.JUnitRunner

import java.util.Date

@RunWith(classOf[JUnitRunner])
class SimplePlaybackIteratorTest extends Specification with LazyLogging {

val sft = SimpleFeatureTypes.createType("test", "name:String,dtg:Date,*geom:Point:srid=4326")
val builder = new SimpleFeatureBuilder(sft)
val features = Seq.tabulate(10) { i =>
builder.addAll(s"name$i", s"2018-01-01T00:00:0${9 - i}.000Z", s"POINT (4$i 55)")
builder.buildFeature(s"$i")
}

val dtg = Some("dtg")

"PlaybackIterator" should {
"replicate original rate" in {
val filteredFeatures = features.toStream
.filter(sf => sf.getAttribute("name").equals("name7") || sf.getAttribute("name").equals("name8"))
.sortWith(_.getAttribute("dtg").asInstanceOf[Date].getTime < _.getAttribute("dtg").asInstanceOf[Date].getTime)
WithClose(new SimplePlaybackIterator(CloseableIterator(filteredFeatures.iterator), sft, dtg, rate = 10f)) { iter =>
// don't time the first result, as it will be inconsistent due to setup and querying
iter.hasNext must beTrue
iter.next()
val start = System.currentTimeMillis()
iter.hasNext must beTrue
// should block until second feature time has elapsed, 100 millis from first feature (due to 10x rate)
iter.next()
val elapsed = System.currentTimeMillis() - start
        // due to system load, this test tends to fail in CI, so we don't cause an explicit test failure
        if (elapsed > 130L) {
          logger.warn(s"SimplePlaybackIteratorTest - playback result was delayed longer than the expected 100ms: ${elapsed}ms")
        }

iter.hasNext must beFalse
}
}
"project to current time" in {
      // the two features' dates are one second apart
val filteredFeatures = features.toStream
.filter(sf => sf.getAttribute("name").equals("name7") || sf.getAttribute("name").equals("name8"))
.sortWith(_.getAttribute("dtg").asInstanceOf[Date].getTime < _.getAttribute("dtg").asInstanceOf[Date].getTime)

WithClose(new SimplePlaybackIterator(CloseableIterator(filteredFeatures.iterator), sft, dtg, rate = 10f, live = true)) { iter =>
// don't time the first result, as it will be inconsistent due to setup and querying
iter.hasNext must beTrue
val dtg1 = iter.next().getAttribute("dtg").asInstanceOf[Date]
val start = System.currentTimeMillis()
iter.hasNext must beTrue
// should block until second feature time has elapsed, 100 millis from first feature (due to 10x rate)
val dtg2 = iter.next().getAttribute("dtg").asInstanceOf[Date]
val elapsed = System.currentTimeMillis() - start
        // due to system load, this test tends to fail in CI, so we don't cause an explicit test failure
        if (elapsed > 130L) {
          logger.warn(s"SimplePlaybackIteratorTest - playback result was delayed longer than the expected 100ms: ${elapsed}ms")
        }
dtg1.getTime must beCloseTo(start, 30)
dtg2.getTime must beCloseTo(start + 100, 30) // + 100 due to 10x rate

iter.hasNext must beFalse
}
}
}
}
