Commit 7fd4b15

Create DistributedCacheFile, a pleasant API for managing files to be distributed via hadoop's DistributedCache

Dealing with hadoop's filecache.DistributedCache is kind of awkward.
This gives users an easy way of setting up paths to be added to the
distributed cache, and hides the details of consistent-symlink-naming
and such. This is a prerequisite for loading our indexes in hadoop.
slyphon committed Jun 4, 2013
1 parent b9d18b9 commit 7fd4b15
Showing 5 changed files with 192 additions and 1 deletion.
3 changes: 2 additions & 1 deletion project/Build.scala
@@ -22,7 +22,8 @@ object ScaldingBuild extends Build {

  libraryDependencies ++= Seq(
    "org.scalacheck" %% "scalacheck" % "1.10.0" % "test",
-   "org.scala-tools.testing" %% "specs" % "1.6.9" % "test"
+   "org.scala-tools.testing" %% "specs" % "1.6.9" % "test",
+   "org.mockito" % "mockito-all" % "1.8.5" % "test"
  ),

resolvers ++= Seq(
@@ -0,0 +1,35 @@
package com.twitter.scalding.filecache

import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.filecache.{DistributedCache => HDistributedCache}
import org.apache.hadoop.fs.Path

trait DistributedCache {
  def createSymlink(conf: Configuration)
  def addCacheFile(uri: URI, conf: Configuration)
  def makeQualified(path: String, conf: Configuration): URI
  def makeQualified(uri: URI, conf: Configuration): URI
  def makeQualified(p: Path, conf: Configuration): URI
}

// used to supply the implicit cache argument to UncachedFile, allows us to stub this in tests
class HadoopDistributedCache extends DistributedCache {
  def createSymlink(conf: Configuration) {
    HDistributedCache.createSymlink(conf)
  }

  def addCacheFile(uri: URI, conf: Configuration) {
    HDistributedCache.addCacheFile(uri, conf)
  }

  def makeQualified(path: String, conf: Configuration): URI =
    makeQualified(new Path(path), conf)

  def makeQualified(uri: URI, conf: Configuration): URI =
    makeQualified(new Path(uri.toString), conf) // uri.toString because hadoop 0.20.2 doesn't take a URI

  def makeQualified(p: Path, conf: Configuration): URI =
    p.makeQualified(p.getFileSystem(conf)).toUri // make sure we have a fully-qualified URI
}

@@ -0,0 +1,11 @@
package com.twitter.scalding.filecache

trait DistributedCacheContextLike {
  implicit val distributedCache: DistributedCache
}

trait DistributedCacheContext extends DistributedCacheContextLike {
  @transient
  implicit lazy val distributedCache: DistributedCache = new HadoopDistributedCache
}
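A quick sketch of how this is meant to be used (the class name and path are illustrative; any class that mixes in DistributedCacheContext gets the implicit cache in scope):

import com.twitter.scalding.filecache.{DistributedCacheContext, DistributedCacheFile}

// mixing in DistributedCacheContext supplies the implicit DistributedCache
// required by DistributedCacheFile.apply (defined in the next file below)
class MyJobSetup extends DistributedCacheContext {
  val indexFile = DistributedCacheFile("hdfs://namenode/some/index.dat")
}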

@@ -0,0 +1,95 @@
package com.twitter.scalding.filecache

import com.google.common.hash.Hashing
import java.io.File
import java.net.URI
import org.apache.hadoop.conf.Configuration


object DistributedCacheFile {
  // TODO: make this pluggable
  private val HashFunc = Hashing.md5()

  /**
   * Create an object that can be used to register a given URI (representing an HDFS file)
   * that should be added to the DistributedCache.
   *
   * @param uri The fully qualified URI that points to the HDFS file to add
   * @return An UncachedFile whose add() method must be called with the job's
   *         Configuration before use.
   */
  def apply(uri: URI)(implicit distCache: DistributedCache): UncachedFile =
    UncachedFile(Right(uri))

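  /**
   * As above, but takes a String path, which will be fully qualified against
   * the path's FileSystem (see HadoopDistributedCache.makeQualified).
   */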
  def apply(path: String)(implicit distCache: DistributedCache): UncachedFile =
    UncachedFile(Left(path))

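  // For example, "hdfs://host/path/thefile.dat" becomes "thefile.dat-<md5 of the full URI>",
  // giving each cached file a symlink name that's unique within the job.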
  def symlinkNameFor(uri: URI): String = {
    val hexsum = HashFunc.hashString(uri.toString).toString
    val fileName = new File(uri.toString).getName

    Seq(fileName, hexsum).mkString("-")
  }

  def symlinkedUriFor(sourceUri: URI): URI =
    new URI(sourceUri.getScheme, sourceUri.getSchemeSpecificPart, symlinkNameFor(sourceUri))
}


/**
 * The distributed cache is simply hadoop's mechanism for giving each node local access to a
 * specific file. The file must be registered with the job's Configuration at job-configuration
 * time, not from a mapper or reducer. Additionally, the name used for the node-local access
 * path must be unique across the cluster to prevent collisions. This class provides both.
 *
 * In the configuration phase, the file URI is used to construct an UncachedFile instance. The
 * name of the symlink to use on the mappers is only available after calling the add() method,
 * which registers the file, computes the unique symlink name, and returns a CachedFile
 * instance. The CachedFile instance is Serializable; it's designed to be assigned to a val
 * and accessed later from the mappers.
 *
 * The local symlink is available through .file or .path, depending on which type you need.
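 *
 * A sketch of typical usage (the path here is illustrative):
 *
 * {{{
 * implicit val distCache = new HadoopDistributedCache
 *
 * // during job configuration (conf is the job's Configuration):
 * val cachedFile = DistributedCacheFile("hdfs://namenode/some/index.dat").add(conf)
 *
 * // later, on a mapper or reducer:
 * val localCopy: java.io.File = cachedFile.file
 * }}}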
*/
sealed abstract class DistributedCacheFile {
  def isDefined: Boolean

  def add(conf: Configuration): CachedFile
}

// the reason we use an implicit here is that we don't want to concern our users with
// the DistributedCache class, which is a hack for wrapping the actual Hadoop DistributedCache
// object to allow for stubbing during tests.
//
final case class UncachedFile private[scalding] (source: Either[String, URI])(implicit cache: DistributedCache)
    extends DistributedCacheFile {

  import DistributedCacheFile._

  def isDefined = false

  def add(conf: Configuration): CachedFile = {
    cache.createSymlink(conf)

    val sourceUri =
      source match {
        case Left(strPath) => cache.makeQualified(strPath, conf)
        case Right(uri)    => cache.makeQualified(uri, conf)
      }

    cache.addCacheFile(symlinkedUriFor(sourceUri), conf)
    CachedFile(sourceUri)
  }
}

final case class CachedFile private[scalding] (sourceUri: URI) extends DistributedCacheFile {

  import DistributedCacheFile._

  def path: String =
    Seq("./", symlinkNameFor(sourceUri)).mkString("")

  def file: File =
    new File(path)

  def isDefined = true
  def add(conf: Configuration) = this
}
@@ -0,0 +1,49 @@
package com.twitter.scalding.filecache

import com.google.common.hash.Hashing
import java.io.File
import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.specs.mock.Mockito
import org.specs.Specification

class DistributedCacheFileSpec extends Specification with Mockito {
  implicit val distCache = smartMock[DistributedCache]
  val conf = smartMock[Configuration]
  val uriString = "hdfs://foo.example:1234/path/to/the/stuff/thefilename.blah"
  val md5Hex = Hashing.md5().hashString(uriString).toString
  val hashedFilename = "thefilename.blah-" + md5Hex
  val uri = new URI(uriString)

  distCache.makeQualified(uri, conf) returns uri
  distCache.makeQualified(uriString, conf) returns uri

  "DistributedCacheFile" should {
    "symlinkNameFor must return a hashed name" in {
      DistributedCacheFile.symlinkNameFor(uri) must_== hashedFilename
    }
  }

  "UncachedFile" should {
    "not be defined" in {
      DistributedCacheFile(uri).isDefined must beFalse
    }
  }

  "UncachedFile.add" should {
    "register the uri with the cache and return the appropriate CachedFile" in {
      val expectedUri = new URI("%s#%s".format(uriString, hashedFilename))

      val dcf = new UncachedFile(Right(uri))
      val cf = dcf.add(conf)

      there was one(distCache).createSymlink(conf)
      there was one(distCache).addCacheFile(expectedUri, conf)

      val cachedPath = "./" + hashedFilename
      cf.path must_== cachedPath
      cf.file must_== (new File(cachedPath))
    }
  }
}

3 comments on commit 7fd4b15

@wli12 commented on 7fd4b15, Jun 18, 2014
Dear slyphon, I read about "Using the distributed cache" and Maxmind's geoip LookupService, but I am still very confused. It seems that the LookupService is highly domain-specific and would not fit other projects. Could you show us a few lines of sample data (GeoLiteCity.dat) that can be used with the distributed cache, and explain how I can share an arbitrary table (like a map[String, Int, Int]) across the cluster via the distributed cache?

@slyphon (Contributor, Author) commented:
Yeah, the LookupService is only an example of what is essentially a custom index implemented as a binary file (a database, more or less).

The DistributedCacheFile is a thin wrapper around functionality provided in hadoop that allows you to register files that should be present on the local disk on each of your mappers (and I believe reducers as well). You tell hadoop "I need file X on the mappers at location ./x" and hadoop takes care of copying it and providing you access at the name you specify. That's it. The DistributedCacheFile takes care of registering a unique name for you and hiding the details so you can avoid collisions and focus on solving the problem at hand.

What you put in that file is up to you. If you want a general-purpose index, you would either need to implement your own or, somewhat more sensibly, use one of a number of key-value stores available for Java.

So the workflow looks like the sketch below:

  • Generate a custom distributable (embedded) index
  • Copy that index into HDFS at a known location
  • Register that location using DistributedCacheFile as part of your job
  • Access the path at map/reduce time and load the index
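In code, that might look something like this (the paths, method names, and index-loading details are illustrative assumptions, not part of this commit):

import java.io.File
import org.apache.hadoop.conf.Configuration
import com.twitter.scalding.filecache.{CachedFile, DistributedCacheFile, HadoopDistributedCache}

implicit val distCache = new HadoopDistributedCache

// step 3: during job configuration, register the index that was copied into HDFS
def registerIndex(conf: Configuration): CachedFile =
  DistributedCacheFile("hdfs://namenode/indexes/my-index.dat").add(conf)

// step 4: at map/reduce time, open the node-local symlink and load the index
def loadIndex(cachedIndex: CachedFile): Unit = {
  val localCopy: File = cachedIndex.file
  // ... read your embedded index / key-value store from localCopy ...
}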

@gerashegalov (Contributor) commented:

I worked on similar functionality for HashJoin in Cascading (gerashegalov/cascading@dd880d3). There we found that changing a file name's extension can make the path unusable for some InputFormats. I therefore suggest changing the renaming scheme here as well, so that the file name extension is preserved, to be safe.
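For example, a variant of symlinkNameFor that keeps the extension might look like this (a sketch of the suggestion, not what the commit currently does):

import java.io.File
import java.net.URI
import com.google.common.hash.Hashing

// insert the hash before the extension instead of appending it after, so
// "thefile.dat" becomes "thefile-<md5>.dat" rather than "thefile.dat-<md5>"
def symlinkNameFor(uri: URI): String = {
  val hexsum = Hashing.md5().hashString(uri.toString).toString
  val fileName = new File(uri.toString).getName
  fileName.lastIndexOf('.') match {
    case -1  => fileName + "-" + hexsum
    case dot => fileName.substring(0, dot) + "-" + hexsum + fileName.substring(dot)
  }
}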
