Skip to content

Commit

Permalink
Updates to cascading wip-234
Browse files Browse the repository at this point in the history
  • Loading branch information
johnynek committed Feb 24, 2012
1 parent 6bb2b69 commit cf9166c
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 9 deletions.
8 changes: 4 additions & 4 deletions build.sbt
Expand Up @@ -10,15 +10,15 @@ scalaVersion := "2.8.1"

resolvers += "Concurrent Maven Repo" at "http://conjars.org/repo"

libraryDependencies += "cascading" % "cascading-core" % "2.0.0-wip-227"
libraryDependencies += "cascading" % "cascading-core" % "2.0.0-wip-234"

libraryDependencies += "cascading" % "cascading-local" % "2.0.0-wip-227"
libraryDependencies += "cascading" % "cascading-local" % "2.0.0-wip-234"

libraryDependencies += "cascading" % "cascading-hadoop" % "2.0.0-wip-227"
libraryDependencies += "cascading" % "cascading-hadoop" % "2.0.0-wip-234"

libraryDependencies += "cascading.kryo" % "cascading.kryo" % "0.2.1"

libraryDependencies += "com.twitter" % "meat-locker" % "0.1.5"
libraryDependencies += "com.twitter" % "meat-locker" % "0.1.6"

libraryDependencies += "commons-lang" % "commons-lang" % "2.4"

Expand Down
9 changes: 4 additions & 5 deletions src/main/scala/com/twitter/scalding/Source.scala
Expand Up @@ -151,7 +151,7 @@ abstract class Source extends java.io.Serializable {
}

protected def createHadoopTestReadTap(buffer : Iterable[Tuple]) :
Tap[HadoopFlowProcess, JobConf, RecordReader[_,_], OutputCollector[_,_]] = {
Tap[HadoopFlowProcess, JobConf, RecordReader[_,_], _] = {
new MemorySourceTap(buffer.toList.asJava, hdfsScheme.getSourceFields())
}

Expand Down Expand Up @@ -205,7 +205,7 @@ abstract class Source extends java.io.Serializable {
}

protected def createHdfsReadTap(hdfsMode : Hdfs) :
Tap[HadoopFlowProcess, JobConf, RecordReader[_,_], OutputCollector[_,_]] = {
Tap[HadoopFlowProcess, JobConf, RecordReader[_,_], _] = {
val goodPaths = if (hdfsMode.sourceStrictness) {
//we check later that all the paths are good
hdfsPaths
Expand All @@ -223,12 +223,11 @@ abstract class Source extends java.io.Serializable {
new Hfs(hdfsScheme, hdfsPaths.head, SinkMode.KEEP)
}
case 1 => taps.head
case _ => new MultiSourceTap[HadoopFlowProcess,
JobConf, RecordReader[_,_], OutputCollector[_,_]](taps.toSeq : _*)
case _ => new MultiSourceTap[Hfs, HadoopFlowProcess, JobConf, RecordReader[_,_]]( taps.toSeq : _*)
}
}
protected def createHdfsWriteTap(hdfsMode : Hdfs) :
Tap[HadoopFlowProcess, JobConf, RecordReader[_,_], OutputCollector[_,_]] = {
Tap[HadoopFlowProcess, JobConf, _, OutputCollector[_,_]] = {
new Hfs(hdfsScheme, hdfsWritePath, SinkMode.REPLACE)
}

Expand Down

0 comments on commit cf9166c

Please sign in to comment.