Merge pull request #67 from azymnis/feature/upgrade_cascading_281
Upgrade cascading to wip-281.
azymnis committed Apr 25, 2012
2 parents 3034686 + f9ccf9a commit 12e9693
Showing 6 changed files with 21 additions and 19 deletions.
8 changes: 4 additions & 4 deletions build.sbt
@@ -10,17 +10,17 @@ scalaVersion := "2.8.1"
 
 resolvers += "Concurrent Maven Repo" at "http://conjars.org/repo"
 
-libraryDependencies += "cascading" % "cascading-core" % "2.0.0-wip-278"
+libraryDependencies += "cascading" % "cascading-core" % "2.0.0-wip-281"
 
-libraryDependencies += "cascading" % "cascading-local" % "2.0.0-wip-278"
+libraryDependencies += "cascading" % "cascading-local" % "2.0.0-wip-281"
 
-libraryDependencies += "cascading" % "cascading-hadoop" % "2.0.0-wip-278"
+libraryDependencies += "cascading" % "cascading-hadoop" % "2.0.0-wip-281"
 
 libraryDependencies += "cascading.kryo" % "cascading.kryo" % "0.3.1"
 
 libraryDependencies += "com.twitter" % "meat-locker" % "0.2.1"
 
-libraryDependencies += "com.twitter" % "maple" % "0.1.3"
+libraryDependencies += "com.twitter" % "maple" % "0.1.4"
 
 libraryDependencies += "commons-lang" % "commons-lang" % "2.4"
 
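The three cascading artifacts must stay on the same wip build, or the classpath mixes incompatible binaries. An equivalent, more compact form of these settings (a sketch, not part of this commit) groups them into one expression:

libraryDependencies ++= Seq(
  "cascading" % "cascading-core"   % "2.0.0-wip-281",
  "cascading" % "cascading-local"  % "2.0.0-wip-281",
  "cascading" % "cascading-hadoop" % "2.0.0-wip-281"
)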
10 changes: 5 additions & 5 deletions src/main/scala/com/twitter/scalding/FileSource.scala
@@ -39,8 +39,8 @@ import cascading.tuple.{Tuple, TupleEntryIterator, Fields}
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.mapred.JobConf
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.OutputCollector
+import org.apache.hadoop.mapred.RecordReader
 
 import collection.mutable.{Buffer, MutableList}
 import scala.collection.JavaConverters._
@@ -145,7 +145,7 @@ abstract class FileSource extends Source {
  */
 trait TextLineScheme extends Mappable[String] {
   override def localScheme = new CLTextLine()
-  override def hdfsScheme = new CHTextLine().asInstanceOf[Scheme[_ <: FlowProcess[JobConf],JobConf,_,_,_,_]]
+  override def hdfsScheme = new CHTextLine().asInstanceOf[Scheme[FlowProcess[JobConf],JobConf,RecordReader[_,_],OutputCollector[_,_],_,_]]
   //In textline, 0 is the byte position, the actual text string is in column 1
   override val columnNums = Seq(1)
 }
@@ -163,7 +163,7 @@ trait DelimitedScheme extends Source {
   //These should not be changed:
   override def localScheme = new CLTextDelimited(fields, separator, types)
   override def hdfsScheme = {
-    new CHTextDelimited(fields, separator, types).asInstanceOf[Scheme[_ <: FlowProcess[JobConf],JobConf,_,_,_,_]]
+    new CHTextDelimited(fields, separator, types).asInstanceOf[Scheme[FlowProcess[JobConf],JobConf,RecordReader[_,_],OutputCollector[_,_],_,_]]
   }
 }
 
@@ -172,7 +172,7 @@ trait SequenceFileScheme extends Source {
   val fields = Fields.ALL
   // TODO Cascading doesn't support local mode yet
   override def hdfsScheme = {
-    new CHSequenceFile(fields).asInstanceOf[Scheme[_ <: FlowProcess[JobConf],JobConf,_,_,_,_]]
+    new CHSequenceFile(fields).asInstanceOf[Scheme[FlowProcess[JobConf],JobConf,RecordReader[_,_],OutputCollector[_,_],_,_]]
  }
 }
 
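This file carries the substance of the upgrade: in wip-281, Cascading's Scheme no longer accepts the wildcard bound on FlowProcess, so each hdfsScheme cast now spells out FlowProcess[JobConf], RecordReader[_,_], and OutputCollector[_,_] explicitly. A minimal sketch of a concrete source built on these traits follows; ClickLog and its field names are hypothetical and not part of the commit:

import cascading.tuple.Fields

// Hypothetical source: tab-separated click logs with three named columns.
// DelimitedScheme supplies both localScheme and the newly-typed hdfsScheme.
case class ClickLog(path : String) extends FixedPathSource(path) with DelimitedScheme {
  override val fields = new Fields("user", "url", "time")
}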
6 changes: 4 additions & 2 deletions src/main/scala/com/twitter/scalding/IterableSource.scala
@@ -24,6 +24,8 @@ import cascading.tuple.Tuple
 import cascading.tuple.Fields
 
 import org.apache.hadoop.mapred.JobConf
+import org.apache.hadoop.mapred.OutputCollector
+import org.apache.hadoop.mapred.RecordReader
 
 import scala.collection.mutable.Buffer
 import scala.collection.JavaConverters._
@@ -56,8 +58,8 @@ case class IterableSource[T](@transient iter: Iterable[T], inFields : Fields = F
     new CLTextDelimited(fields, "\t", null : Array[Class[_]])
   }
 
-  override def hdfsScheme : Scheme[_ <: FlowProcess[JobConf],JobConf,_,_,_,_] = {
-    hdfsTap.getScheme.asInstanceOf[Scheme[_ <: FlowProcess[JobConf],JobConf,_,_,_,_]]
+  override def hdfsScheme : Scheme[FlowProcess[JobConf],JobConf,RecordReader[_,_],OutputCollector[_,_],_,_] = {
+    hdfsTap.getScheme.asInstanceOf[Scheme[FlowProcess[JobConf],JobConf,RecordReader[_,_],OutputCollector[_,_],_,_]]
   }
 
   private lazy val hdfsTap = new MemorySourceTap(asBuffer.asJava, fields)
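IterableSource lifts an in-memory collection into a pipe, which is mainly useful in tests, and its delegation to MemorySourceTap's scheme needs the same fully specified cast. A hedged usage sketch, with class and path names hypothetical:

import com.twitter.scalding._

class IterableExample(args : Args) extends Job(args) {
  // Turn a small in-memory list into a single-column pipe and write it out.
  IterableSource(List(1, 2, 3), 'num)
    .read
    .write(Tsv("nums.tsv"))
}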
6 changes: 3 additions & 3 deletions src/main/scala/com/twitter/scalding/JoinAlgorithms.scala
@@ -172,18 +172,18 @@ trait JoinAlgorithms {
   def joinWithTiny(fs :(Fields,Fields), that : Pipe) = {
     val intersection = asSet(fs._1).intersect(asSet(fs._2))
     if (intersection.size == 0) {
-      new Join(assignName(pipe), fs._1, assignName(that), fs._2, new InnerJoin)
+      new HashJoin(assignName(pipe), fs._1, assignName(that), fs._2, new InnerJoin)
     }
     else {
       val (renamedThat, newJoinFields, temp) = renameCollidingFields(that, fs._2, intersection)
-      (new Join(assignName(pipe), fs._1, assignName(renamedThat), newJoinFields, new InnerJoin))
+      (new HashJoin(assignName(pipe), fs._1, assignName(renamedThat), newJoinFields, new InnerJoin))
         .discard(temp)
     }
   }
 
   def leftJoinWithTiny(fs :(Fields,Fields), that : Pipe) = {
     //Rename these pipes to avoid cascading name conflicts
-    new Join(assignName(pipe), fs._1, assignName(that), fs._2, new LeftJoin)
+    new HashJoin(assignName(pipe), fs._1, assignName(that), fs._2, new LeftJoin)
   }
 
   /*
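This is the behavioral rename in wip-281: the map-side Join pipe becomes HashJoin, which replicates the small right-hand pipe to every map task instead of shuffling both sides through a reduce step. Call sites like joinWithTiny are unchanged; a hedged usage sketch, with field and path names hypothetical:

import com.twitter.scalding._

class JoinExample(args : Args) extends Job(args) {
  // The users pipe is assumed small enough to replicate to every mapper.
  val users = Tsv("users.tsv", ('id, 'name)).read
  Tsv("clicks.tsv", ('userId, 'url)).read
    .joinWithTiny('userId -> 'id, users)
    .discard('id)
    .write(Tsv("joined.tsv"))
}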
8 changes: 4 additions & 4 deletions src/main/scala/com/twitter/scalding/KryoHadoopSerialization.scala
@@ -27,8 +27,8 @@ import com.esotericsoftware.kryo.{Serializer => KSerializer}
 import com.esotericsoftware.kryo.io.{Input, Output}
 
 import cascading.kryo.KryoSerialization;
-import cascading.tuple.hadoop.BufferedInputStream
 import cascading.tuple.hadoop.TupleSerialization
+import cascading.tuple.hadoop.io.BufferedInputStream
 
 import scala.annotation.tailrec
 
@@ -113,7 +113,7 @@ class SingletonSerializer[T](obj: T) extends KSerializer[T] {
 }
 
 // Lists cause stack overflows for Kryo because they are cons cells.
-class ListSerializer extends KSerializer[AnyRef] {
+class ListSerializer extends KSerializer[AnyRef] {
   def write(kser: Kryo, out: Output, obj: AnyRef) {
     val list = obj.asInstanceOf[List[AnyRef]]
     //Write the size:
@@ -132,7 +132,7 @@ class ListSerializer extends KSerializer[AnyRef] {
 
   override def create(kser: Kryo, in: Input, cls: Class[AnyRef]) : AnyRef = {
     val size = in.readInt(true);
 
     //Produce the reversed list:
     if (size == 0) {
       /*
@@ -168,7 +168,7 @@ class DateRangeSerializer() extends KSerializer[DateRange] {
     out.writeLong(range.start.value.getTime, true);
     out.writeLong(range.end.value.getTime, true);
   }
 
   override def create(kser: Kryo, in: Input, cls: Class[DateRange]): DateRange = {
     DateRange(RichDate(in.readLong(true)), RichDate(in.readLong(true)));
   }
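Aside from BufferedInputStream moving into the cascading.tuple.hadoop.io package in wip-281, the remaining hunks in this file appear to be whitespace-only. The surrounding serializers all follow the same write/create pattern; a minimal sketch of that pattern for a hypothetical Point class, mirroring the signatures above:

import com.esotericsoftware.kryo.{Kryo, Serializer => KSerializer}
import com.esotericsoftware.kryo.io.{Input, Output}

case class Point(x : Int, y : Int)

class PointSerializer extends KSerializer[Point] {
  def write(kser : Kryo, out : Output, p : Point) {
    // 'true' picks the variable-length encoding, as the serializers above do.
    out.writeInt(p.x, true)
    out.writeInt(p.y, true)
  }
  override def create(kser : Kryo, in : Input, cls : Class[Point]) : Point =
    Point(in.readInt(true), in.readInt(true))
}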
2 changes: 1 addition & 1 deletion src/main/scala/com/twitter/scalding/Source.scala
@@ -75,7 +75,7 @@ abstract class Source extends java.io.Serializable {
   def localScheme : LocalScheme = {
     error("Cascading local mode not supported for: " + toString)
   }
-  def hdfsScheme : Scheme[_ <: FlowProcess[JobConf],JobConf,_,_,_,_] = {
+  def hdfsScheme : Scheme[FlowProcess[JobConf],JobConf,RecordReader[_,_],OutputCollector[_,_],_,_] = {
     error("Cascading Hadoop mode not supported for: " + toString)
   }
 
