Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Commit

Permalink
Convert Instant to epoch millis when passing into RangeQueryBuilder f…
Browse files Browse the repository at this point in the history
…or rollup search request (#373)
  • Loading branch information
qreshi committed Jan 7, 2021
1 parent e1af452 commit 9207224
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,8 @@ import org.elasticsearch.search.builder.SearchSourceBuilder
fun Rollup.getRollupSearchRequest(metadata: RollupMetadata): SearchRequest {
val query = if (metadata.continuous != null) {
RangeQueryBuilder(this.getDateHistogram().sourceField)
.from(metadata.continuous.nextWindowStartTime, true)
.to(metadata.continuous.nextWindowEndTime, false)
.from(metadata.continuous.nextWindowStartTime.toEpochMilli(), true)
.to(metadata.continuous.nextWindowEndTime.toEpochMilli(), false)
} else {
MatchAllQueryBuilder()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -550,4 +550,78 @@ class RollupInterceptorIT : RollupRestTestCase() {
rawAggBucket["avg"]!!["value"], rollupAggBucket["avg"]!!["value"])
}
}

// Skipping this test for now as the date format of the 'tpep_pickup_datetime' field is causing exceptions
// when ZonedDateTime.parse() is called for determining continuous rollup start/end time.
// TODO: Enable this test to verify when the above issue is fixed
fun `skip test continuous rollup search`() {
generateNYCTaxiData("source_continuous_rollup_search")
val rollup = Rollup(
id = "basic_term_query_continuous_rollup_search",
schemaVersion = 1L,
enabled = true,
jobSchedule = IntervalSchedule(Instant.now(), 1, ChronoUnit.MINUTES),
jobLastUpdatedTime = Instant.now(),
jobEnabledTime = Instant.now(),
description = "basic search test",
sourceIndex = "source_continuous_rollup_search",
targetIndex = "target_continuous_rollup_search",
metadataID = null,
roles = emptyList(),
pageSize = 10,
delay = 0,
continuous = true,
dimensions = listOf(
DateHistogram(sourceField = "tpep_pickup_datetime", fixedInterval = "1h"),
Terms("RatecodeID", "RatecodeID"),
Terms("PULocationID", "PULocationID")
),
metrics = listOf(
RollupMetrics(sourceField = "passenger_count", targetField = "passenger_count", metrics = listOf(Sum(), Min(), Max(),
ValueCount(), Average())),
RollupMetrics(sourceField = "total_amount", targetField = "total_amount", metrics = listOf(Max(), Min()))
)
).let { createRollup(it, it.id) }

updateRollupStartTime(rollup)

waitFor {
val rollupJob = getRollup(rollupId = rollup.id)
assertNotNull("Rollup job doesn't have metadata set", rollupJob.metadataID)
val rollupMetadata = getRollupMetadata(rollupJob.metadataID!!)
assertTrue("Rollup has not caught up yet", Instant.now().isBefore(rollupMetadata.continuous!!.nextWindowEndTime))
}

refreshAllIndices()

// Term query
var req = """
{
"size": 0,
"query": {
"term": {
"RatecodeID": 1
}
},
"aggs": {
"min_passenger_count": {
"min": {
"field": "passenger_count"
}
}
}
}
""".trimIndent()
var rawRes = client().makeRequest("POST", "/source_continuous_rollup_search/_search", emptyMap(), StringEntity(req, ContentType.APPLICATION_JSON))
assertTrue(rawRes.restStatus() == RestStatus.OK)
var rollupRes = client().makeRequest("POST", "/target_continuous_rollup_search/_search", emptyMap(), StringEntity(req, ContentType.APPLICATION_JSON))
assertTrue(rollupRes.restStatus() == RestStatus.OK)
var rawAggRes = rawRes.asMap()["aggregations"] as Map<String, Map<String, Any>>
var rollupAggRes = rollupRes.asMap()["aggregations"] as Map<String, Map<String, Any>>
assertEquals(
"Source and rollup index did not return same min results",
rawAggRes.getValue("min_passenger_count")["value"],
rollupAggRes.getValue("min_passenger_count")["value"]
)
}
}

0 comments on commit 9207224

Please sign in to comment.