Skip to content
This repository has been archived by the owner on Jan 20, 2022. It is now read-only.

Commit

Permalink
Add BatchID Successible/Predecessible (#716)
Browse files Browse the repository at this point in the history
* Add BatchID Successible/Predecessible

* Remove stray import

* Fix possible end-point issue

* Return an old method for binary compat

* simplify the fix

* Use successible in toInterval
  • Loading branch information
johnynek committed Mar 7, 2017
1 parent 476d58e commit df8b4b0
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 18 deletions.
Expand Up @@ -16,15 +16,17 @@ limitations under the License.

package com.twitter.summingbird.batch

import com.twitter.algebird.Monoid
import com.twitter.algebird.{
Empty,
Interval,
Intersection,
InclusiveLower,
ExclusiveLower,
ExclusiveUpper,
InclusiveLower,
InclusiveUpper,
ExclusiveLower,
Intersection,
Interval,
Monoid,
Successible,
Predecessible,
Universe
}
import com.twitter.bijection.{ Bijection, Injection }
Expand All @@ -49,6 +51,16 @@ object BatchID {
import OrderedFromOrderingExt._
// Two BatchIDs are equivalent exactly when their underlying `id` values are.
implicit val equiv: Equiv[BatchID] = Equiv.by { (batch: BatchID) => batch.id }

/**
 * Successible instance for BatchID. The successor is `next`, except for the
 * batch whose id is Long.MaxValue, which has no successor (None).
 */
implicit def batchIdSuccessible: Successible[BatchID] =
  Successible.fromNextOrd[BatchID] { batch =>
    batch.id match {
      case Long.MaxValue => None
      case _ => Some(batch.next)
    }
  }

/**
 * Predecessible instance for BatchID. The predecessor is `prev`, except for the
 * batch whose id is Long.MinValue, which has no predecessor (None).
 */
implicit def batchIdPredecessible: Predecessible[BatchID] =
  Predecessible.fromPrevOrd[BatchID] { batch =>
    if (batch.id == Long.MinValue) None
    else Some(batch.prev)
  }

// Parses the string form of a BatchID so that BatchID(someBatchID.toString)
// round-trips. The numeric id is read from the second "."-separated field.
def apply(str: String) = {
  val fields = str.split("\\.")
  new BatchID(fields(1).toLong)
}

Expand All @@ -57,8 +69,14 @@ object BatchID {
* `[startBatch, endBatch]` (inclusive).
*/
def range(start: BatchID, end: BatchID): Iterable[BatchID] =
  // An inverted range contains no batches.
  if (end < start) Nil
  else {
    new Iterable[BatchID] {
      def iterator = {
        // Stop strictly before `end` and append `end` explicitly. Testing
        // `_ <= end` can never become false when `end` is the largest BatchID,
        // so the inclusive end-point is emitted separately instead.
        val beforeEnd = iterate(start)(_.next).takeWhile(_ < end)
        beforeEnd ++ Iterator.single(end)
      }
    }
  }

/**
Expand Down
Expand Up @@ -17,15 +17,16 @@ limitations under the License.
package com.twitter.summingbird.batch

import com.twitter.algebird.{
Universe,
Empty,
Interval,
Intersection,
InclusiveLower,
ExclusiveLower,
ExclusiveUpper,
InclusiveLower,
InclusiveUpper,
ExclusiveLower,
Intersection,
Interval,
Lower,
Successible,
Universe,
Upper
}

Expand Down Expand Up @@ -169,7 +170,13 @@ trait Batcher extends Serializable {
dateToBatch(interval)(truncateUp)(truncateDown)

/**
 * The interval of time covered by batch `b`: from its earliest time (inclusive)
 * up to the earliest time of the following batch (exclusive).
 */
def toInterval(b: BatchID): Interval[Timestamp] = {
  val lower = InclusiveLower(earliestTimeOf(b))
  Successible.next(b) match {
    case Some(bnext) =>
      Intersection(lower, ExclusiveUpper(earliestTimeOf(bnext)))
    case None =>
      // this is defensive, but if BatchID == Long.MaxValue, probably the timestamp has overflowed
      Intersection(lower, InclusiveUpper(latestTimeOf(b)))
  }
}

def toTimestamp(b: Interval[BatchID]): Interval[Timestamp] =
b match {
Expand Down
Expand Up @@ -21,10 +21,11 @@ import org.scalacheck.Prop._

import java.util.concurrent.TimeUnit

import com.twitter.algebird.Interval
import com.twitter.algebird.{ Interval, Successible, Predecessible }

object BatchLaws extends Properties("BatchID") {
import Generators._
import OrderedFromOrderingExt._

property("BatchIDs should RT to String") =
forAll { batch: BatchID => batch == BatchID(batch.toString) }
Expand All @@ -37,6 +38,21 @@ object BatchLaws extends Properties("BatchID") {
a.compare(b) == implicitly[Ordering[BatchID]].compare(BatchID(a), BatchID(b))
}

// Successible.next must agree with BatchID.next whenever a successor exists;
// when it reports none, BatchID.next is expected to have wrapped around.
property("BatchID.next matches Successible.next(b)") =
  forAll { b: BatchID =>
    Successible.next(b).fold(b.next <= b) { succ => succ == b.next }
  }
// Predecessible.prev must agree with BatchID.prev whenever a predecessor exists;
// when it reports none, BatchID.prev is expected to have wrapped around.
property("BatchID.prev matches Predecessible.prev(b)") =
  forAll { b: BatchID =>
    Predecessible.prev(b).fold(b.prev >= b) { pred => pred == b.prev }
  }

property("BatchID should respect addition and subtraction") =
forAll { (init: Long, forward: Long, backward: Long) =>
val batchID = BatchID(init)
Expand Down
Expand Up @@ -37,12 +37,12 @@ private class BatchedOperations(batcher: Batcher) {
BatchID.toIterable(batchInterval)
}

// Only here for binary compatibility reasons; delegates to batcher.toTimestamp.
// The earlier body mapped batcher.earliestTimeOf over the interval bounds, which
// did not look correct for closed intervals.
def batchToTimestamp(bint: Interval[BatchID]): Interval[Timestamp] =
  batcher.toTimestamp(bint)

// Converts the batch interval to a timestamp interval via the batcher and
// intersects it with `ts`.
def intersect(batches: Interval[BatchID], ts: Interval[Timestamp]): Interval[Timestamp] =
  batcher.toTimestamp(batches) && ts

// Builds an interval from `batches` via BatchID.toInterval and, when one is
// produced, intersects it with `ts` using the Interval overload of intersect.
def intersect(batches: Iterable[BatchID], ts: Interval[Timestamp]): Option[Interval[Timestamp]] =
  BatchID.toInterval(batches).map { batchInterval => intersect(batchInterval, ts) }
Expand All @@ -57,7 +57,7 @@ private class BatchedOperations(batcher: Batcher) {
}

def readBatched[T](inBatches: Interval[BatchID], mode: Mode, in: PipeFactory[T]): Try[(Interval[BatchID], FlowToPipe[T])] = {
val inTimes = batchToTimestamp(inBatches)
val inTimes = batcher.toTimestamp(inBatches)
// Read the delta stream for the needed times
in((inTimes, mode))
.right
Expand Down

0 comments on commit df8b4b0

Please sign in to comment.