Skip to content

Commit

Permalink
Use native timestamps in mediawiki history
Browse files Browse the repository at this point in the history
We were using original mediawiki formatted string
as timetamps (YYYYMMDDHHMMSS). This patch updates
the fields to now use java.sql.Timestamp, but still
saves them as JDBC compliant strings because of
our version of hive not yet accepting Timestamps
in Parquet format, see:
https://issues.apache.org/jira/browse/HIVE-6384

Bug: T161150

Change-Id: I021c70528aa992b9a958bb1f70ba7878f65ebe54
  • Loading branch information
jobar committed Jun 16, 2017
1 parent 0fcdd84 commit 7a887f2
Show file tree
Hide file tree
Showing 26 changed files with 592 additions and 383 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package org.wikimedia.analytics.refinery.job.mediawikihistory.denormalized

import java.sql.Timestamp

import org.apache.spark.Partitioner

import scala.math.Ordered.orderingToOrdered
Expand All @@ -9,8 +11,8 @@ import scala.math.Ordered.orderingToOrdered
* Used to generate the set of years covered by such an object.
*/
trait TimeBoundaries {
def startTimestamp: Option[String]
def endTimestamp: Option[String]
def startTimestamp: Option[Timestamp]
def endTimestamp: Option[Timestamp]
}

/**
Expand All @@ -31,7 +33,7 @@ trait HasPartitionKey {
* @param id The interesting id (can be userId, pageId, revId...)
* @param year The year (because whole-time groups are too big)
*/
case class PartitionKey(db: String, id: Long, year: String)
case class PartitionKey(db: String, id: Long, year: Int)
extends Ordered[PartitionKey] {
override def compare(that: PartitionKey): Int =
(this.db, this.id, this.year) compare (that.db, that.id, that.year)
Expand All @@ -50,14 +52,16 @@ case class PartitionKey(db: String, id: Long, year: String)
* @param endTimestamp The state endTimestamp
*/
case class StateKey(partitionKey: PartitionKey,
startTimestamp: Option[String],
endTimestamp: Option[String])
startTimestamp: Option[Timestamp],
endTimestamp: Option[Timestamp])
extends Ordered[StateKey]
with HasPartitionKey {
override def compare(that: StateKey): Int = {
val partitionComp = this.partitionKey.compare(that.partitionKey)
if (partitionComp == 0)
(this.startTimestamp, this.endTimestamp) compare (that.startTimestamp, that.endTimestamp)
if (partitionComp == 0) {
// No option comparator defined for Timestamp, so we use the Long one using getTime
(this.startTimestamp.map(_.getTime), this.endTimestamp.map(_.getTime)) compare (that.startTimestamp.map(_.getTime), that.endTimestamp.map(_.getTime))
}
else partitionComp
}
}
Expand All @@ -76,14 +80,14 @@ case class StateKey(partitionKey: PartitionKey,
* @param sortingId The MW Event sorting id
*/
case class MediawikiEventKey(partitionKey: PartitionKey,
timestamp: Option[String],
timestamp: Option[Timestamp],
sortingId: Option[Long])
extends Ordered[MediawikiEventKey]
with HasPartitionKey {
override def compare(that: MediawikiEventKey): Int = {
val partitionComp = this.partitionKey.compare(that.partitionKey)
if (partitionComp == 0)
(this.timestamp, this.sortingId) compare (that.timestamp, that.sortingId)
(this.timestamp.map(_.getTime), this.sortingId) compare (that.timestamp.map(_.getTime), that.sortingId)
else partitionComp
}
}
Expand Down Expand Up @@ -117,26 +121,29 @@ object DenormalizedKeysHelper extends Serializable {
import org.wikimedia.analytics.refinery.job.mediawikihistory.user.UserState

/**
* Extracts the year part of a YYYYMMDDHHmmss formatted timestamp.
* Extracts the year part of a timestamp.
*
* @param timestamp the timestamp to extract the year from
* @param default The value to return in case of invalid or None timestamp
* @return The year as a 4 character string or the default value
* @param default The value to return in case of None timestamp
* @return The year or the default value
*/
def year(timestamp: Option[String], default: String): String = timestamp match {
def year(timestamp: Option[Timestamp], default: Integer): Int = timestamp match {
case None => default
case Some(validTimestamp) if validTimestamp.length < 4 => default
case Some(validTimestamp) => validTimestamp.substring(0, 4)
case Some(validTimestamp) => new DateTime(validTimestamp.getTime).getYear
}


/**
* 1999 to current-year year list used to generate years covered
* by an object implementing [[TimeBoundaries]].
*/
val yearList = (1999 to DateTime.now.getYear).map(_.toString)
def years[B <: TimeBoundaries](boundaries: B): Seq[String] =
yearList.filter(y => y >= year(boundaries.startTimestamp, "-1") && y <= year(boundaries.endTimestamp, "9"))
val yearList = 1999 to DateTime.now.getYear
def years[B <: TimeBoundaries](boundaries: B): Seq[Int] =
yearList.filter(y =>
// default to a year smaller than any valid one
y >= year(boundaries.startTimestamp, -1) &&
// default to a year bigger than any valid one
y <= year(boundaries.endTimestamp, 10000))


/**
Expand Down Expand Up @@ -170,16 +177,16 @@ object DenormalizedKeysHelper extends Serializable {


/**
* Generate the user-centered [[StateKey]] ((wikiDb, UserId, ""), start, end)
* (empty string as year) using a fake value in place of user id
* Generate the user-centered [[StateKey]] ((wikiDb, UserId, -1), start, end)
* (-1 as year) using a fake value in place of user id
* if invalid (see [[idOrHashNegative]]).
*
* @param userState The user state to generate key for
* @return The key for the userState
*/
def userStateKeyNoYear(userState: UserState): StateKey = {
val userId = DenormalizedKeysHelper.idOrHashNegative(Some(userState.userId), userState)
StateKey(PartitionKey(userState.wikiDb, userId, ""),
StateKey(PartitionKey(userState.wikiDb, userId, -1),
userState.startTimestamp, userState.endTimestamp)
}

Expand All @@ -194,11 +201,10 @@ object DenormalizedKeysHelper extends Serializable {
*/
def userMediawikiEventKey(mwEvent: MediawikiEvent): MediawikiEventKey = {
val userId: Long = idOrHashNegative(mwEvent.eventUserDetails.userId, mwEvent)
MediawikiEventKey(PartitionKey(mwEvent.wikiDb, userId, year(mwEvent.eventTimestamp, "-1")),
MediawikiEventKey(PartitionKey(mwEvent.wikiDb, userId, year(mwEvent.eventTimestamp, 0)),
mwEvent.eventTimestamp, mwEvent.revisionDetails.revId)
}


/**
* Generate a list of page-centered [[StateKey]] for
* a given [[PageState]], having the same values except
Expand All @@ -217,16 +223,16 @@ object DenormalizedKeysHelper extends Serializable {


/**
* Generate the page-centered [[StateKey]] ((wikiDb, pageId, ""), start, end)
* (empty string as year) using a fake value in place of page id
* Generate the page-centered [[StateKey]] ((wikiDb, pageId, -1), start, end)
* (-1 as year) using a fake value in place of page id
* if invalid (see [[idOrHashNegative]]).
*
* @param pageState The page state to generate key for
* @return The key for the pageState
*/
def pageStateKeyNoYear(pageState: PageState): StateKey = {
val pageId = DenormalizedKeysHelper.idOrHashNegative(pageState.pageId, pageState)
StateKey(PartitionKey(pageState.wikiDb, pageId, ""),
StateKey(PartitionKey(pageState.wikiDb, pageId, -1),
pageState.startTimestamp, pageState.endTimestamp)
}

Expand All @@ -236,23 +242,23 @@ object DenormalizedKeysHelper extends Serializable {
* given [[MediawikiEvent]] using a fake value in place
* of page id if invalid (see [[idOrHashNegative]]).
*
* Year value of the [[PartitionKey]] can be empty-string
* Year value of the [[PartitionKey]] can be -1
* if the useYear parameter is false (default true).
*
* @param mwEvent The MW Event to generate key for
* @param useYear Flag for using MW Event year (true) or
* empty string (false) in [[PartitionKey]]
* -1 (false) in [[PartitionKey]]
* @return The MW Event key
*/
def pageMediawikiEventKey(mwEvent: MediawikiEvent, useYear: Boolean = true): MediawikiEventKey = {
val pageId: Long = idOrHashNegative(mwEvent.pageDetails.pageId, mwEvent)
MediawikiEventKey(PartitionKey(mwEvent.wikiDb, pageId, if (useYear) year(mwEvent.eventTimestamp, "-1") else ""),
MediawikiEventKey(PartitionKey(mwEvent.wikiDb, pageId, if (useYear) year(mwEvent.eventTimestamp, 0) else -1),
mwEvent.eventTimestamp, mwEvent.revisionDetails.revId)
}

/**
* Generate a page-centered [[MediawikiEventKey]] for a
* given MW Event with empty-string as year using a fake
* given MW Event with -1 as year using a fake
* value in place of page id if invalid (see [[idOrHashNegative]]).
*
* @param mwEvent The MW Event to generate key for
Expand All @@ -265,15 +271,15 @@ object DenormalizedKeysHelper extends Serializable {

/**
* Generate a revision-centered [[MediawikiEventKey]] for a
* given MW Event with empty-string as year using a fake
* given MW Event with -1 as year using a fake
* value in place of revision id if invalid (see [[idOrHashNegative]]).
*
* @param mwEvent The MW Event to generate key for
* @return The MW Event key
*/
def revisionMediawikiEventKeyNoYear(mwEvent: MediawikiEvent): MediawikiEventKey = {
val revisionId: Long = idOrHashNegative(mwEvent.revisionDetails.revId, mwEvent)
MediawikiEventKey(PartitionKey(mwEvent.wikiDb, revisionId, ""),
MediawikiEventKey(PartitionKey(mwEvent.wikiDb, revisionId, -1),
mwEvent.eventTimestamp, mwEvent.revisionDetails.revId)
}

Expand All @@ -292,11 +298,11 @@ object DenormalizedKeysHelper extends Serializable {
val partitionKeyComp = mweKey.partitionKey.compare(sKey.partitionKey)
if (partitionKeyComp != 0) partitionKeyComp
else { // Same partition, check timestamps
val hTimestamp = mweKey.timestamp.getOrElse("-1")
val sStartTimestamp = sKey.startTimestamp.getOrElse("-1")
val hTimestamp = mweKey.timestamp.map(_.getTime).getOrElse(-1L)
val sStartTimestamp = sKey.startTimestamp.map(_.getTime).getOrElse(-1L)
if (hTimestamp < sStartTimestamp) -1
else if (hTimestamp >= sStartTimestamp &&
(hTimestamp < sKey.endTimestamp.getOrElse("9"))) 0
(hTimestamp < sKey.endTimestamp.map(_.getTime).getOrElse(Long.MaxValue))) 0
else 1
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package org.wikimedia.analytics.refinery.job.mediawikihistory.denormalized


/**
* This file defines the functions for revisions-enrichment.
*
Expand All @@ -18,22 +17,21 @@ package org.wikimedia.analytics.refinery.job.mediawikihistory.denormalized
*/
object DenormalizedRevisionsBuilder extends Serializable {

import com.github.nscala_time.time.Imports._
import org.apache.log4j.Logger
import org.apache.spark.rdd.RDD
import org.wikimedia.analytics.refinery.job.mediawikihistory.page.PageState
import java.sql.Timestamp
// Implicit needed to sort by timestamps
import org.wikimedia.analytics.refinery.job.mediawikihistory.utils.TimestampFormats.orderedTimestamp

import scala.annotation.tailrec

// Mutable ordered set to manage reverts by timestamp
type MutableOrderedReverts = scala.collection.mutable.TreeSet[((Option[String], Option[Long]), Option[Long])]
type MutableOrderedReverts = scala.collection.mutable.TreeSet[((Option[Timestamp], Option[Long]), Option[Long])]

@transient
lazy val log: Logger = Logger.getLogger(this.getClass)

@transient
lazy val timestampParser = DateTimeFormat.forPattern("YYYYMMddHHmmss")

/**
* Return the first value from sortedTimestamps
* that is bigger than refTs, None if not existing.
Expand All @@ -47,9 +45,9 @@ object DenormalizedRevisionsBuilder extends Serializable {
*/
@tailrec
final def firstBigger(
refTimestamp: String,
sortedTimestamps: Vector[String]
): Option[String] = {
refTimestamp: Long,
sortedTimestamps: Vector[Long]
): Option[Long] = {
sortedTimestamps match {
case IndexedSeq() => None
case timestamp +: restTimestamps =>
Expand Down Expand Up @@ -84,8 +82,7 @@ object DenormalizedRevisionsBuilder extends Serializable {
else Seq.empty)
.groupByKey()
// We assume there will not be so many deletion events per page for this sort to fail
.mapValues(_.toVector.sorted)

.mapValues(_.toVector.map(_.getTime).sorted)

// Max archived revision timestamp by page (when pageId is defined)
// RDD[((wikiDb, PageId), MaxTimestamp)]
Expand All @@ -95,7 +92,7 @@ object DenormalizedRevisionsBuilder extends Serializable {
.reduceByKey {
case (None, ts2) => ts2
case (ts1, None) => ts1
case (ts1, ts2) => if (ts1.get >= ts2.get) ts1 else ts2
case (ts1, ts2) => if (ts1.get.getTime >= ts2.get.getTime) ts1 else ts2
}

// (Max Rev Ts, sorted deletes Ts) by page
Expand All @@ -111,7 +108,7 @@ object DenormalizedRevisionsBuilder extends Serializable {
if (deleteTs.isEmpty) r.isDeleted(maxTs) // No delete timestamp -- Use max archived revision one
else {
// In case event timestamp is empty, firstBigger will return deleteTs first value
val foundDeleteTs = firstBigger(r.eventTimestamp.getOrElse(""), deleteTs.get)
val foundDeleteTs = firstBigger(r.eventTimestamp.map(_.getTime).getOrElse(-1L), deleteTs.get).map(new Timestamp(_))
foundDeleteTs match {
case None => r.isDeleted(maxTs) // No delete timestamp -- Use max archived revision one
case _ => r.isDeleted(foundDeleteTs) // Use delete timestamp
Expand Down Expand Up @@ -185,7 +182,7 @@ object DenormalizedRevisionsBuilder extends Serializable {
*/
def prepareRevertsLists(
revisions: RDD[MediawikiEvent]
): RDD[(MediawikiEventKey, Vector[(Option[String], Option[Long])])] = {
): RDD[(MediawikiEventKey, Vector[(Option[Timestamp], Option[Long])])] = {
revisions
.filter(_.pageDetails.pageId.getOrElse(-1L) > 0L) // remove invalid pageIds
.map(r =>
Expand All @@ -207,8 +204,8 @@ object DenormalizedRevisionsBuilder extends Serializable {
else {
// Years spanned by the reverts-lists (from base to last revert)
val yearsSpanned = DenormalizedKeysHelper.years(new TimeBoundaries {
override def startTimestamp: Option[String] = baseRevision._1
override def endTimestamp: Option[String] = reverts.last._1
override def startTimestamp: Option[Timestamp] = baseRevision._1
override def endTimestamp: Option[Timestamp] = reverts.last._1
})
// Generate one revert-list event by year to be zipped with revisions-by-year
yearsSpanned.map(y => {
Expand All @@ -227,14 +224,10 @@ object DenormalizedRevisionsBuilder extends Serializable {
**/

def getTimestampDifference(
revertTimestamp: Option[String],
revisionTimestamp: Option[String]
revertTimestamp: Option[Timestamp],
revisionTimestamp: Option[Timestamp]
): Option[Long] = (revertTimestamp, revisionTimestamp) match {
case (Some(t1), Some(t2)) => {
val revertTimestampMs = timestampParser.parseMillis(t1)
val revisionTimestampMs = timestampParser.parseMillis(t2)
Option((revertTimestampMs - revisionTimestampMs) / 1000)
}
case (Some(t1), Some(t2)) => Option((t1.getTime - t2.getTime) / 1000)
case _ => None
}

Expand Down Expand Up @@ -303,7 +296,7 @@ object DenormalizedRevisionsBuilder extends Serializable {
def updateRevisionWithOptionalRevertsList(innerState: RevertsListsState)
(
keyAndRevision: (MediawikiEventKey, MediawikiEvent),
optionalKeyAndRevertsList: Option[(MediawikiEventKey, Vector[(Option[String], Option[Long])])]
optionalKeyAndRevertsList: Option[(MediawikiEventKey, Vector[(Option[Timestamp], Option[Long])])]
): MediawikiEvent = {

val (revKey, revision) = keyAndRevision
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
package org.wikimedia.analytics.refinery.job.mediawikihistory.denormalized

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{SaveMode, SQLContext}

import scala.reflect.ClassTag
import org.apache.spark.sql.SQLContext

/**
* This class defines the functions for revisions-denormalization process.
Expand All @@ -21,11 +18,17 @@ import scala.reflect.ClassTag
*/
class DenormalizedRunner(sqlContext: SQLContext) extends Serializable {

import org.apache.spark.sql.SaveMode
import scala.reflect.ClassTag
import com.databricks.spark.avro._
import org.apache.log4j.Logger
import org.apache.spark.rdd.RDD
import org.wikimedia.analytics.refinery.job.mediawikihistory.page.PageState
import org.wikimedia.analytics.refinery.job.mediawikihistory.user.UserState
import org.wikimedia.analytics.refinery.job.mediawikihistory.denormalized.MediawikiEvent
import java.sql.Timestamp
// Implicit needed to sort by timestamps
import org.wikimedia.analytics.refinery.job.mediawikihistory.utils.TimestampFormats.orderedTimestamp

@transient
lazy val log: Logger = Logger.getLogger(this.getClass)
Expand Down Expand Up @@ -67,7 +70,7 @@ class DenormalizedRunner(sqlContext: SQLContext) extends Serializable {
.flatMap {
case (key, count) =>
if (count > 1L)
Seq((new StateKey(key, None.asInstanceOf[Option[String]], None.asInstanceOf[Option[String]]), true))
Seq((new StateKey(key, None.asInstanceOf[Option[Timestamp]], None.asInstanceOf[Option[Timestamp]]), true))
else
Seq.empty[(StateKey, Boolean)]
}
Expand Down
Loading

0 comments on commit 7a887f2

Please sign in to comment.