Permalink
Browse files

Fix FileSystem.get issue: resolve the filesystem from the store's root-path URI (FileSystem.get(new URI(rootPath), conf) / path.getFileSystem(conf)) instead of the configuration's default filesystem, so stores on non-default filesystems work

  • Loading branch information...
1 parent 4385fe5 commit dddff48716d600e5b7bd7a72beef7f6c606c2811 @oscar-stripe oscar-stripe committed Jan 30, 2016
View
@@ -26,8 +26,7 @@ object SummingbirdBuild extends Build {
val chillVersion = "0.7.0"
val slf4jVersion = "1.6.6"
- val dfsDatastoresVersion = "1.3.6"
- val scaldingVersion = "0.15.1-RC13"
+ val scaldingVersion = "0.15.1-SNAPSHOT"
val storehausVersion = "0.12.0"
val utilVersion = "6.26.0"
@@ -49,7 +48,7 @@ object SummingbirdBuild extends Build {
val sharedSettings = extraSettings ++ Seq(
organization := "com.twitter",
- scalaVersion := "2.10.5",
+ scalaVersion := "2.11.7",
crossScalaVersions := Seq("2.10.5", "2.11.7"),
// To support hadoop 1.x
javacOptions ++= Seq("-source", "1.6", "-target", "1.6"),
@@ -282,8 +281,6 @@ object SummingbirdBuild extends Build {
lazy val summingbirdScalding = module("scalding").settings(
libraryDependencies ++= Seq(
- "com.backtype" % "dfs-datastores" % dfsDatastoresVersion,
- "com.backtype" % "dfs-datastores-cascading" % dfsDatastoresVersion,
"com.twitter" %% "algebird-core" % algebirdVersion,
"com.twitter" %% "algebird-util" % algebirdVersion,
"com.twitter" %% "algebird-bijection" % algebirdVersion,
@@ -319,11 +316,11 @@ object SummingbirdBuild extends Build {
lazy val summingbirdBatchHadoop = module("batch-hadoop").settings(
libraryDependencies ++= Seq(
- "com.backtype" % "dfs-datastores" % dfsDatastoresVersion,
"com.twitter" %% "algebird-core" % algebirdVersion,
"com.twitter" %% "bijection-json" % bijectionVersion,
"com.twitter" %% "scalding-date" % scaldingVersion,
"org.slf4j" % "slf4j-log4j12" % slf4jVersion % "test",
+ "com.twitter" %% "scalding-commons" % scaldingVersion,
"org.apache.hadoop" % "hadoop-core" % hadoopVersion % "provided"
)
).dependsOn(
@@ -17,6 +17,7 @@ package com.twitter.summingbird.batch.state
import com.twitter.algebird.{ ExclusiveUpper, InclusiveLower, Intersection }
import com.twitter.summingbird.batch.{ BatchID, Batcher, Timestamp }
+import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
import org.slf4j.LoggerFactory
@@ -63,7 +64,7 @@ class HDFSCheckpointStore(val config: HDFSState.Config)(implicit val batcher: Ba
@transient private val logger = LoggerFactory.getLogger(classOf[HDFSState])
protected lazy val versionedStore =
- new FileVersionTracking(config.rootPath, FileSystem.get(config.conf))
+ new FileVersionTracking(config.rootPath, FileSystem.get(new URI(config.rootPath), config.conf))
private def version(b: BatchID) =
batcher.earliestTimeOf(b).milliSinceEpoch
@@ -16,9 +16,10 @@ limitations under the License.
package com.twitter.summingbird.batch.store
-import com.backtype.hadoop.datastores.{ VersionedStore => BacktypeVersionedStore }
+import com.twitter.scalding.commons.datastores.{ VersionedStore => BacktypeVersionedStore }
import com.twitter.bijection.json.{ JsonInjection, JsonNodeInjection }
import java.io.{ DataOutputStream, DataInputStream }
+import java.net.URI
import org.apache.hadoop.io.WritableUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{ FileSystem, Path }
@@ -79,7 +80,7 @@ private[summingbird] object HDFSMetadata {
*/
class HDFSMetadata(conf: Configuration, rootPath: String) {
protected val versionedStore: BacktypeVersionedStore = {
- val fs = FileSystem.get(conf)
+ val fs = FileSystem.get(new URI(rootPath), conf)
new BacktypeVersionedStore(fs, rootPath)
}
@@ -138,10 +139,10 @@ class HDFSMetadata(conf: Configuration, rootPath: String) {
* Refers to a specific version on disk. Allows reading and writing metadata to specific locations
*/
private[summingbird] class HDFSVersionMetadata private[store] (val version: Long, conf: Configuration, val path: Path) {
+ private def getFS = path.getFileSystem(conf)
private def getString: Try[String] =
Try {
- val fs = FileSystem.get(conf)
- val is = new DataInputStream(fs.open(path))
+ val is = new DataInputStream(getFS.open(path))
val str = WritableUtils.readString(is)
is.close
str
@@ -153,8 +154,7 @@ private[summingbird] class HDFSVersionMetadata private[store] (val version: Long
getString.flatMap { JsonInjection.fromString[T](_) }
private def putString(str: String) {
- val fs = FileSystem.get(conf)
- val os = new DataOutputStream(fs.create(path))
+ val os = new DataOutputStream(getFS.create(path))
WritableUtils.writeString(os, str)
os.close
}
@@ -87,48 +87,8 @@ abstract class VersionedBatchStoreBase[K, V](val rootPath: String) extends Batch
protected def lastBatch(exclusiveUB: BatchID, mode: HdfsMode): Option[(BatchID, FlowProducer[TypedPipe[(K, V)]])] = {
val meta = HDFSMetadata(mode.conf, rootPath)
- /*
- * The deprecated Summingbird builder API coordinated versioning
- * through a _summingbird.json dropped into each version of this
- * store's VersionedStore.
- *
- * The new API (as of 0.1.0) coordinates this state via the actual
- * version numbers within the VersionedStore. This function
- * resolves the BatchID out of a version by first checking the
- * metadata inside of the version; if the metadata exists, it
- * takes preference over the version number (which was garbage,
- * just wall clock time, in the deprecated API). If the metadata
- * does NOT exist we know that the version is meaningful and
- * convert it to a batchID.
- *
- * TODO (https://github.com/twitter/summingbird/issues/95): remove
- * this when all internal Twitter jobs have run for a while with
- * the new version format.
- */
- def versionToBatchIDCompat(ver: Long): BatchID = {
- /**
- * Old style writes the UPPER BOUND batchID, so all times
- * are in a batch LESS than the value in the file.
- */
- meta(ver)
- .get[String]
- .flatMap { str => ScalaTry(BatchID(str).prev) }
- .map { oldbatch =>
- val newBatch = versionToBatchID(ver)
- if (newBatch > oldbatch) {
- println("## WARNING ##")
- println("in BatchStore(%s)".format(rootPath))
- println("Old-style version number is ahead of what the new-style would be.")
- println("Until batchID: %s (%s) you will see this warning"
- .format(newBatch, batcher.earliestTimeOf(newBatch)))
- println("##---------##")
- }
- oldbatch
- }
- .getOrElse(versionToBatchID(ver))
- }
meta
- .versions.map { ver => (versionToBatchIDCompat(ver), readVersion(ver)) }
+ .versions.map { ver => (versionToBatchID(ver), readVersion(ver)) }
.filter { _._1 < exclusiveUB }
.reduceOption { (a, b) => if (a._1 > b._1) a else b }
}
@@ -168,46 +128,13 @@ class VersionedBatchStore[K, V, K2, V2](rootPath: String, versionsToKeep: Int, o
* EXCLUSIVE upper bound on batchID, or "batchID.next".
*/
override def writeLast(batchID: BatchID, lastVals: TypedPipe[(K, V)])(implicit flowDef: FlowDef, mode: Mode): Unit = {
- val batchVersion = batchIDToVersion(batchID)
- /**
- * The Builder API used to not specify a sinkVersion, leading to
- * versions tagged with the wall clock time. When builder API
- * users migrate over to the new code, they can run into a
- * situation where the new version created has a lower version
- * than the current maximum version in the directory.
- *
- * This behavior clashes with the current VersionedState
- * implementation, which decides what data to source by querying
- * meta.mostRecentVersion. If mostRecentVersion doesn't change
- * from run to run, the job will process the same data over and
- * over.
- *
- * To solve this issue and assist with migrations, if the
- * existing max version in the directory has a timestamp that's
- * greater than that of the batchID being committed, we add a
- * single millisecond to the current version, guaranteeing that
- * we're writing a new max version (but only bumping a tiny bit
- * forward).
- *
- * After a couple of job runs the batchID version should start
- * winning.
- */
- val newVersion: Option[Long] = mode match {
- case m: HdfsMode => {
- val meta = HDFSMetadata(m.conf, rootPath)
- meta.mostRecentVersion.map(_.version)
- .filter(_ > batchVersion)
- .map(_ + 1L)
- .orElse(Some(batchVersion))
- }
- case _ => Some(batchVersion)
- }
-
+ val newVersion = batchIDToVersion(batchID)
val target = VersionedKeyValSource[K2, V2](rootPath,
sourceVersion = None,
- sinkVersion = newVersion,
+ sinkVersion = Some(newVersion),
maxFailures = 0,
versionsToKeep = versionsToKeep)
+
if (!target.sinkExists(mode)) {
logger.info(s"Versioned batched store version for $this @ $newVersion doesn't exist. Will write out.")
lastVals.map(pack(batchID, _))

0 comments on commit dddff48

Please sign in to comment.