Skip to content

Commit

Permalink
poc of time interval calculation
Browse files Browse the repository at this point in the history
  • Loading branch information
adrian-wang committed Jul 22, 2015
1 parent c506661 commit 1a68e03
Show file tree
Hide file tree
Showing 3 changed files with 135 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
import org.apache.spark.unsafe.types.{Interval, UTF8String}

/**
* Returns the current date at the start of query evaluation.
Expand Down Expand Up @@ -305,3 +305,59 @@ case class DateFormatClass(left: Expression, right: Expression) extends BinaryEx
})
}
}

/**
 * Adds an interval to a date or timestamp value.
 *
 * The result type is always TimestampType, regardless of whether `start` is a date or a
 * timestamp.
 *
 * @param start    expression producing a DateType or TimestampType value
 * @param interval expression producing an IntervalType value
 */
case class TimeAdd(start: Expression, interval: Expression)
  extends BinaryExpression with ExpectsInputTypes with CodegenFallback {

  override def left: Expression = start
  override def right: Expression = interval

  override def toString: String = s"$left + $right"
  override def inputTypes: Seq[AbstractDataType] =
    Seq(TypeCollection(DateType, TimestampType), IntervalType)

  override def dataType: DataType = TimestampType

  override def nullSafeEval(start: Any, inter: Any): Any = {
    val itvl = inter.asInstanceOf[Interval]
    // Dispatch on the type of the *input* expression, not on `dataType`: `dataType` is the
    // result type and is always TimestampType, so matching on it would never reach the
    // DateType branch and a date input (an Int) would be miscast to Long.
    // `left` is used because the `start` parameter shadows the constructor field here.
    left.dataType match {
      case DateType =>
        DateTimeUtils.dateAddFullInterval(
          start.asInstanceOf[Int], itvl.months, itvl.microseconds)
      case TimestampType =>
        DateTimeUtils.timestampAddFullInterval(
          start.asInstanceOf[Long], itvl.months, itvl.microseconds)
    }
  }
}

/**
 * Subtracts an interval from a date or timestamp value.
 *
 * The result type is always TimestampType, regardless of whether `start` is a date or a
 * timestamp. Subtraction is performed by adding the negated interval components.
 *
 * @param start    expression producing a DateType or TimestampType value
 * @param interval expression producing an IntervalType value
 */
case class TimeSub(start: Expression, interval: Expression)
  extends BinaryExpression with ExpectsInputTypes with CodegenFallback {

  override def left: Expression = start
  override def right: Expression = interval

  override def toString: String = s"$left - $right"
  override def inputTypes: Seq[AbstractDataType] =
    Seq(TypeCollection(DateType, TimestampType), IntervalType)

  override def dataType: DataType = TimestampType

  override def nullSafeEval(start: Any, inter: Any): Any = {
    val itvl = inter.asInstanceOf[Interval]
    // Dispatch on the type of the *input* expression, not on `dataType`: `dataType` is the
    // result type and is always TimestampType, so matching on it would never reach the
    // DateType branch and a date input (an Int) would be miscast to Long.
    // `left` is used because the `start` parameter shadows the constructor field here.
    left.dataType match {
      case DateType =>
        DateTimeUtils.dateAddFullInterval(
          start.asInstanceOf[Int], -itvl.months, -itvl.microseconds)
      case TimestampType =>
        DateTimeUtils.timestampAddFullInterval(
          start.asInstanceOf[Long], -itvl.months, -itvl.microseconds)
    }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -573,4 +573,61 @@ object DateTimeUtils {
dayInYear - 334
}
}

/**
 * Returns the date value (days since 1.1.1970) for the first day of the given month.
 * The month is expressed in months since 1.1.1970, starting from 0
 * (0 = January 1970, 12 = January 1971, -1 = December 1969).
 */
def getDaysFromMonths(months: Int): Int = {
  // Floor division, so that negative month counts land in the correct year:
  // -1 .. -12 belong to year -1 (1969), -13 .. -24 to year -2, and so on.
  val yearSinceEpoch = if (months >= 0) months / 12 else (months - 11) / 12
  // Always in [0, 11] thanks to the floor division above.
  val monthInYearFromZero = months - yearSinceEpoch * 12
  val febDays = if (isLeapYear(1970 + yearSinceEpoch)) 29 else 28
  val daysForMonths = Seq(31, febDays, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31)
  // Only the months *before* the target month contribute to its first day, so take
  // `monthInYearFromZero` entries (none for January), not `monthInYearFromZero + 1`.
  getDaysFromYears(yearSinceEpoch) + daysForMonths.take(monthInYearFromZero).sum
}

/**
 * Returns the date value (days since 1.1.1970) for January 1 of the given year.
 * The year is expressed in years since 1970, starting from 0
 * (0 = 1970, 30 = 2000, -1 = 1969).
 */
def getDaysFromYears(years: Int): Int = {
  // Position inside one 400-year Gregorian cycle, always in [0, 399], and the number of
  // whole cycles (negative for years before 1970).
  val remainYear = (years % 400 + 400) % 400
  val cycles = (years - remainYear) / 400
  // Every 4th year is a leap-year candidate. The first one after 1970 is 1972 (offset 2),
  // so by January 1 of offset r exactly (r + 1) / 4 candidates have fully elapsed.
  val everyFourthYear = (remainYear + 1) / 4
  // 2100, 2200 and 2300 (offsets 130, 230, 330) are divisible by 100 but not by 400 and
  // therefore not leap years; drop each one once January 1 lies after it.
  val skippedCenturies = Seq(130, 230, 330).count(remainYear > _)
  val numLeaps = everyFourthYear - skippedCenturies
  // A full 400-year cycle contains 97 leap days.
  cycles * (365 * 400 + 97) + remainYear * 365 + numLeaps
}

/**
 * Add date and year-month interval.
 * Returns a date value, expressed in days since 1.1.1970.
 *
 * The day of month is preserved where possible; if the target month is shorter than the
 * source day of month, the result is the last day of the target month.
 *
 * @param days   source date, in days since 1.1.1970
 * @param months number of months to add (may be negative)
 */
def dateAddYearMonthInterval(days: Int, months: Int): Int = {
  // NOTE(review): assumes getYear returns the calendar year (e.g. 2016) and that getMonth
  // and getDayOfMonth are 1-based - confirm against their definitions in this object.
  // getDaysFromMonths is documented as taking months since 1.1.1970 starting from 0, so
  // convert the calendar year/month to that scale before adding the interval.
  val targetMonth = (getYear(days) - 1970) * 12 + (getMonth(days) - 1) + months
  val firstOfTargetMonth = getDaysFromMonths(targetMonth)
  // The day before the first day of the following month is the last day of the target month.
  val lastOfTargetMonth = getDaysFromMonths(targetMonth + 1) - 1
  // Day of month is 1-based, so day 1 maps onto the first day itself.
  math.min(firstOfTargetMonth + getDayOfMonth(days) - 1, lastOfTargetMonth)
}

/**
 * Adds a full interval (months plus microseconds) to a date.
 *
 * @param days         source date, in days since 1.1.1970
 * @param months       month component of the interval
 * @param microseconds microsecond component of the interval
 * @return a timestamp value, expressed in microseconds since 1.1.1970 00:00:00
 */
def dateAddFullInterval(days: Int, months: Int, microseconds: Long): Long = {
  // Apply the month component on the date first ...
  val shiftedDays = dateAddYearMonthInterval(days, months)
  // ... convert the resulting day to microseconds (milliseconds * 1000) ...
  val shiftedMicros = daysToMillis(shiftedDays) * 1000
  // ... and finally apply the microsecond component.
  shiftedMicros + microseconds
}

/**
 * Adds a full interval (months plus microseconds) to a timestamp.
 *
 * @param micros       source timestamp, in microseconds since 1.1.1970 00:00:00
 * @param months       month component of the interval
 * @param microseconds microsecond component of the interval
 * @return a timestamp value, expressed in microseconds since 1.1.1970 00:00:00
 */
def timestampAddFullInterval(micros: Long, months: Int, microseconds: Long): Long = {
  val days = millisToDays(micros / 1000)
  val shiftedDays = dateAddYearMonthInterval(days, months)
  // Offset of the timestamp within its day, measured against the same day boundary that
  // millisToDays/daysToMillis use, so the two conversions stay consistent with each other.
  // NOTE(review): presumably those helpers are time-zone aware - confirm.
  val microsInDay = micros - daysToMillis(days) * 1000
  // The shifted day must be converted back to microseconds before the time-of-day and
  // interval microseconds are added; adding a raw day count to microseconds mixes units.
  daysToMillis(shiftedDays) * 1000 + microsInDay + microseconds
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import java.util.Calendar
import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.types.{IntegerType, StringType, TimestampType, DateType}
import org.apache.spark.unsafe.types.Interval

class DateExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {

Expand Down Expand Up @@ -279,4 +280,24 @@ class DateExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
checkResult(
DateSub(Literal("2015-01-01"), Literal.create(null, IntegerType)), null)
}

test("time_add") {
  // NOTE(review): assumes Interval(months, microseconds) - TimeAdd reads `months` and
  // `microseconds` from the interval - so Interval(1, 0) means one month, not one day.
  // TimeAdd always produces a timestamp, so a date input is compared against a timestamp.
  checkResult(
    TimeAdd(Literal(Date.valueOf("2016-02-28")), Literal(new Interval(1, 0))),
    DateTimeUtils.fromJavaTimestamp(Timestamp.valueOf("2016-03-28 00:00:00")))
  // One month plus two seconds (2,000,000 microseconds).
  checkResult(
    TimeAdd(Literal(Date.valueOf("2016-03-01")), Literal(new Interval(1, 2000000.toLong))),
    DateTimeUtils.fromJavaTimestamp(Timestamp.valueOf("2016-04-01 00:00:02")))
}

test("time_sub") {
  // NOTE(review): assumes Interval(months, microseconds) - TimeSub reads `months` and
  // `microseconds` from the interval - so Interval(1, 0) means one month, not one day.
  checkResult(
    TimeSub(Literal(Timestamp.valueOf("2016-02-28 10:00:00")), Literal(new Interval(1, 0))),
    DateTimeUtils.fromJavaTimestamp(Timestamp.valueOf("2016-01-28 10:00:00")))
  // One month plus two seconds (2,000,000 microseconds) before 2016-03-01 00:00:02.
  checkResult(
    TimeSub(
      Literal(Timestamp.valueOf("2016-03-01 00:00:02")),
      Literal(new Interval(1, 2000000.toLong))),
    DateTimeUtils.fromJavaTimestamp(Timestamp.valueOf("2016-02-01 00:00:00")))
}
}

0 comments on commit 1a68e03

Please sign in to comment.