Skip to content

Commit

Permalink
Fixes buggy fromMillisecs
Browse files Browse the repository at this point in the history
  • Loading branch information
johnynek committed Jan 10, 2013
1 parent 78edf58 commit 88aa8cb
Show file tree
Hide file tree
Showing 6 changed files with 88 additions and 33 deletions.
58 changes: 56 additions & 2 deletions src/main/scala/com/twitter/scalding/AbsoluteDuration.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,67 @@ package com.twitter.scalding

import java.util.Calendar

import scala.annotation.tailrec

/*
* These are reasonably indepedendent of calendars (or we will pretend)
*/
object AbsoluteDuration extends java.io.Serializable {
def max(a : AbsoluteDuration, b : AbsoluteDuration) = if(a > b) a else b

type TimeCons = ((Int) => AbsoluteDuration, Int)

val SEC_IN_MS = 1000
val MIN_IN_MS = 60 * SEC_IN_MS
val HOUR_IN_MS = 60 * MIN_IN_MS
val UTC_UNITS = List[TimeCons]((Hours,HOUR_IN_MS),
(Minutes,MIN_IN_MS),
(Seconds,SEC_IN_MS),
(Millisecs,1)).reverse

def exact(fnms: TimeCons): (Long) => Option[AbsoluteDuration] = { ms: Long =>
if( ms % fnms._2 == 0 ) {
Some(fnms._1( (ms / fnms._2).toInt ))
}
else {
None
}
}

def fromMillisecs(diffInMs: Long): AbsoluteDuration = fromMillisecs(diffInMs, UTC_UNITS, Nil)

@tailrec
private def fromMillisecs(diffInMs: Long, units: List[TimeCons], acc: List[AbsoluteDuration]): AbsoluteDuration = {
units match {
case (tc0 :: tc1 :: tail) => {
//Only get as many as the next guy can't get:
val nextSize = tc1._2
val thisDiff = diffInMs % nextSize
val theseMillis = thisDiff / tc0._2
val thisPart = tc0._1(theseMillis.toInt)
fromMillisecs(diffInMs - theseMillis, (tc1 :: tail), thisPart :: acc)
}
case (tc :: Nil) => {
// We can't go any further, try to jam the rest into this unit:
val (fn, cnt) = tc
require((diffInMs / cnt <= Int.MaxValue) && ((diffInMs / cnt) >= Int.MinValue),
"diff not representable in an Int")
val thisPart = fn((diffInMs / cnt).toInt)

if (acc.isEmpty)
thisPart
else
AbsoluteDurationList(thisPart :: acc)
}
case Nil => {
// These are left over millisecs, but should be unreachable
sys.error("this is only reachable if units is passed with a length == 0, which should never happen")
}
}
}
}
trait AbsoluteDuration extends Duration with Ordered[AbsoluteDuration] {

sealed trait AbsoluteDuration extends Duration with Ordered[AbsoluteDuration] {
def toSeconds : Double = {
calField match {
case Calendar.MILLISECOND => count / 1000.0
Expand All @@ -44,7 +98,7 @@ trait AbsoluteDuration extends Duration with Ordered[AbsoluteDuration] {
this.toMillisecs.compareTo(that.toMillisecs)
}
def +(that : AbsoluteDuration) = {
Duration.fromMillisecs(this.toMillisecs + that.toMillisecs)
AbsoluteDuration.fromMillisecs(this.toMillisecs + that.toMillisecs)
}
}

Expand Down
30 changes: 3 additions & 27 deletions src/main/scala/com/twitter/scalding/Duration.scala
Original file line number Diff line number Diff line change
Expand Up @@ -27,38 +27,14 @@ import scala.annotation.tailrec
* calField should be a java.util.Calendar field
*/
object Duration extends java.io.Serializable {
// TODO: remove this in 0.9.0
val SEC_IN_MS = 1000
val MIN_IN_MS = 60 * SEC_IN_MS
val HOUR_IN_MS = 60 * MIN_IN_MS
val UTC_UNITS = List((Hours,HOUR_IN_MS),(Minutes,MIN_IN_MS),(Seconds,SEC_IN_MS),(Millisecs,1))

// Creates the largest unit corresponding to this number of milliseconds (or possibly a duration list)
def fromMillisecs(diffInMs : Long) : AbsoluteDuration = {
// Try to see if we have an even number of any "calendar-free" units
// Note, no unit is truly calendar free due to leap years, seconds, etc... so
// so this is approximate
// We can't fail the last one, x % 1 == 0
UTC_UNITS.find { u_ms : (Function1[Int,AbsoluteDuration],Int) =>
//This always returns something, maybe the last item:
(diffInMs % u_ms._2) == 0
}.map { u_ms : (Function1[Int,AbsoluteDuration],Int) =>
val count = diffInMs / u_ms._2
if (count <= Int.MaxValue)
u_ms._1(count.toInt)
else {
if (diffInMs / HOUR_IN_MS > Int.MaxValue) throw new RuntimeException("difference not expressable in 2^{31} hours")

AbsoluteDurationList(UTC_UNITS.foldLeft((diffInMs, Nil : List[AbsoluteDuration])) { (state, u_ms) =>
val (rest, durList) = state
val (constructor, timeInterval) = u_ms
val thisCnt = (rest / timeInterval).toInt
val next = rest - (thisCnt) * timeInterval
(next, constructor(thisCnt) :: durList)
}._2)
}
//This get should never throw because the list always finds something
}.get
}
@deprecated("Use AbsoluteDuration.fromMillisecs", "0.8.2")
def fromMillisecs(diffInMs : Long) : AbsoluteDuration = AbsoluteDuration.fromMillisecs(diffInMs)
}

abstract class Duration(val calField : Int, val count : Int, val tz : TimeZone)
Expand Down
3 changes: 1 addition & 2 deletions src/main/scala/com/twitter/scalding/FileSource.scala
Original file line number Diff line number Diff line change
Expand Up @@ -424,7 +424,6 @@ case class JsonLine(p : String) extends FixedPathSource(p) with TextLineScheme {
import Dsl._

override def transformForWrite(pipe : Pipe) = pipe.mapTo(Fields.ALL -> 'json) {
t : TupleEntry =>
Json.generate(t.getFields.asScala.map(f => f.toString -> t.getString(f.toString)).toMap)
t: TupleEntry => Json.generate(toMap(t).mapValues { _.toString })
}
}
2 changes: 1 addition & 1 deletion src/main/scala/com/twitter/scalding/RichDate.scala
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ case class RichDate(val value : Date) extends Ordered[RichDate] {
def -(interval : Duration) = interval.subtractFrom(this)

//Inverse of the above, d2 + (d1 - d2) == d1
def -(that : RichDate) = Duration.fromMillisecs(value.getTime - that.value.getTime)
def -(that : RichDate) = AbsoluteDuration.fromMillisecs(value.getTime - that.value.getTime)

override def compare(that : RichDate) : Int = {
if (value.before(that.value)) {
Expand Down
7 changes: 7 additions & 0 deletions src/main/scala/com/twitter/scalding/TupleConversions.scala
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,13 @@ trait TupleConversions extends GeneratedConversions {
}.toList
}

def toMap(tupe: TupleEntry): Map[String, AnyRef] = {
val keys = tupe.getFields
(0 until keys.size).map { idx =>
(keys.get(idx).toString, tupe.getObject(idx))
}.toMap
}

// Convert a Cascading TupleEntryIterator into a Stream of a given type
def toStream[T](it : TupleEntryIterator)(implicit conv : TupleConverter[T]) : Stream[T] = {
if(null != it && it.hasNext) {
Expand Down
21 changes: 20 additions & 1 deletion src/test/scala/com/twitter/scalding/DateProperties.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import org.scalacheck.Arbitrary
import org.scalacheck.Properties
import org.scalacheck.Prop.forAll
import org.scalacheck.Gen.choose
import org.scalacheck.Prop._

object DateProperties extends Properties("Date Properties") {

Expand All @@ -27,6 +28,13 @@ object DateProperties extends Properties("Date Properties") {
(dr + r) - r == dr &&
(dr.start + r) - r == dr.start
}
property("fromMillisecs toMillisecs") = forAll { (unsafems: Long) =>
val ms = unsafems/50
val hours = ms/AbsoluteDuration.HOUR_IN_MS
(Int.MinValue <= hours && hours <= Int.MaxValue) ==>
(AbsoluteDuration.fromMillisecs(ms).toMillisecs == ms)
}

def asInt(b: Boolean) = if(b) 1 else 0

property("Before/After works") = forAll { (dr: DateRange, rd: RichDate) =>
Expand All @@ -35,7 +43,7 @@ object DateProperties extends Properties("Date Properties") {
(dr.isAfter(dr.start - (dr.end - dr.start)))
}

def divDur(ad: AbsoluteDuration, div: Int) = Duration.fromMillisecs(ad.toMillisecs/div)
def divDur(ad: AbsoluteDuration, div: Int) = AbsoluteDuration.fromMillisecs(ad.toMillisecs/div)

property("each output is contained") = forAll { (dr: DateRange) =>
val r = divDur(dr.end - dr.start, 10)
Expand All @@ -47,6 +55,17 @@ object DateProperties extends Properties("Date Properties") {
dr.extend(d).contains(dr)
}

property("RichDate subtraction Roundtrip") = forAll { (utimestamp0: Long, utimestamp1: Long) =>
val timestamp0 = utimestamp0/50
val timestamp1 = utimestamp1/50
val hours = (timestamp0 - timestamp1)/AbsoluteDuration.HOUR_IN_MS
(Int.MinValue <= hours && hours <= Int.MaxValue) ==>
((RichDate(timestamp0) - RichDate(timestamp1)).toMillisecs == (timestamp0 - timestamp1))
}
property("Millisecs rt") = forAll { (ms: Int) =>
Millisecs(ms).toMillisecs.toInt == ms
}

def toRegex(glob: String) = (glob.flatMap { c => if(c == '*') ".*" else c.toString }).r

def matches(l: List[String], arg: String): Int = l
Expand Down

0 comments on commit 88aa8cb

Please sign in to comment.