Permalink
Browse files

Updates meat-locker and maple

  • Loading branch information...
1 parent a49099b commit f05fc24a1f8de4bdf82732f9f5c71700f216515a @johnynek johnynek committed Apr 12, 2012
View
@@ -18,7 +18,9 @@ libraryDependencies += "cascading" % "cascading-hadoop" % "2.0.0-wip-238"
libraryDependencies += "cascading.kryo" % "cascading.kryo" % "0.3.0"
-libraryDependencies += "com.twitter" % "meat-locker" % "0.1.6"
+libraryDependencies += "com.twitter" % "meat-locker" % "0.2.0"
+
+libraryDependencies += "com.twitter" % "maple" % "0.1.1"
libraryDependencies += "commons-lang" % "commons-lang" % "2.4"
@@ -15,7 +15,7 @@ limitations under the License.
*/
package com.twitter.scalding
-import com.twitter.meatlocker.tap.MemorySourceTap
+import com.twitter.maple.tap.MemorySourceTap
import cascading.flow.FlowProcess
import cascading.scheme.local.{TextDelimited => CLTextDelimited}
@@ -105,8 +105,11 @@ class Job(val args : Args) extends TupleConversions with FieldConversions {
flow.complete
flow.getFlowStats.isSuccessful
}
- // Add any serializations you need to deal with here:
- def ioSerializations = List[String]()
+ // Add any serializations you need to deal with here (after these)
+ def ioSerializations = List[String](
+ "org.apache.hadoop.io.serializer.WritableSerialization",
+ "cascading.tuple.hadoop.TupleSerialization"
+ )
// Override this if you want to customize comparisons/hashing for your job
def defaultComparator : Option[String] = {
Some("com.twitter.scalding.IntegralComparator")
@@ -113,6 +113,7 @@ trait HadoopMode extends Mode {
new HadoopFlowConnector(unionValues(jobConf, props))
}
+ // TODO unlike newFlowConnector, this does not look at the Job.config
override def openForRead(tap : Tap[_,_,_,_]) = {
val htap = tap.asInstanceOf[Tap[HadoopFlowProcess,_,_,_]]
val fp = new HadoopFlowProcess(new JobConf(jobConf))
@@ -15,7 +15,7 @@ limitations under the License.
*/
package com.twitter.scalding
-import com.twitter.meatlocker.tap.MemorySourceTap
+import com.twitter.maple.tap.MemorySourceTap
import java.io.File
import java.util.TimeZone
@@ -137,7 +137,7 @@ abstract class Source extends java.io.Serializable {
case Read => {
val buffer = buffers(this)
val fields = hdfsScheme.getSourceFields
- new MemorySourceTap(buffer.toList.asJava, fields)
+ (new MemorySourceTap(buffer.toList.asJava, fields)).asInstanceOf[RawTap]
}
case Write => {
val path = hdfsTest.getWritePathFor(this)

0 comments on commit f05fc24

Please sign in to comment.