Add TemplateSource trait #484

Closed
johnynek opened this issue Jun 28, 2013 · 14 comments

@johnynek
Collaborator

See: http://docs.cascading.org/cascading/2.1/javadoc/cascading/tap/hadoop/TemplateTap.html

I guess this can be a stackable mixin trait in Scala that overrides createTap for the source it is mixed into; see:
http://www.artima.com/scalazine/articles/stackable_trait_pattern.html

Ideally I'd like a function T => String that produces the path, relative to the parent, for each item T.

I think it is essential to be able to remove the fields used for templating from the written tuple. This might require subclassing TemplateTap.
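For anyone unfamiliar with the stackable trait pattern linked above, here is a minimal sketch in plain Scala with no Cascading on the classpath. `Sink`, `PlainSink`, `TemplatedPaths` and `Event` are hypothetical stand-ins, not Scalding classes; `pathOf` plays the role of the T => String function, and `abstract override` is what lets the trait wrap whichever `writePath` it is mixed onto.

```scala
object StackableTemplateSketch {
  // Hypothetical stand-in for a source: decides where each record is written.
  abstract class Sink[T] {
    def writePath(item: T): String
  }

  // Base behaviour: every record goes straight into the parent directory.
  class PlainSink[T](parent: String) extends Sink[T] {
    def writePath(item: T): String = parent
  }

  // Stackable trait: `abstract override` calls whichever writePath it is mixed onto,
  // then appends the per-item relative path (the T => String idea).
  trait TemplatedPaths[T] extends Sink[T] {
    def pathOf: T => String
    abstract override def writePath(item: T): String =
      super.writePath(item) + "/" + pathOf(item)
  }

  final case class Event(day: String, user: String)

  // The trait is stacked on top of PlainSink at construction time.
  val byDay: Sink[Event] = new PlainSink[Event]("/logs") with TemplatedPaths[Event] {
    val pathOf: Event => String = _.day
  }
}
```

Here `byDay.writePath(Event("2013-06-28", "alice"))` yields `/logs/2013-06-28`.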

@sritchie
Collaborator

First pass:

```scala
trait Templated extends Source {
  def template: String
  def templateFields: Fields

  override def createTap(readOrWrite : AccessMode)(implicit mode : Mode) : Tap[_,_,_] = {
    mode match {
      case hdfsMode@Hdfs(_, _) => readOrWrite match {
        case Read => super.createTap(Read)(mode)
        case Write => castHfsTap {
          val parentTap = super.createTap(Write)(mode)
          new TemplateTap(parentTap, template, templateFields)
        }
      }
      case _ => super.createTap(readOrWrite)(mode)
    }
  }
}
```

You won't be able to strip out the template fields by doing anything with TemplateTap. The way Cascading usually does this is in the scheme, with the sinkFields argument. See TextLine:

http://docs.cascading.org/cascading/2.0/javadoc/cascading/scheme/local/TextLine.html
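To make that division of labour concrete, here is a Cascading-free sketch under simplifying assumptions: a record is modelled as a `Map[String, Any]` standing in for a TupleEntry, and `route` is a hypothetical helper. The template fields only select the output directory, while only the sink fields are serialized, which is roughly what a scheme's sinkFields achieves.

```scala
object SinkFieldsSketch {
  // A record as field name -> value, standing in for a Cascading TupleEntry.
  type Record = Map[String, Any]

  // Returns (relative output directory, line actually written) for one record.
  def route(record: Record,
            template: String,
            templateFields: Seq[String],
            sinkFields: Seq[String],
            separator: String = "\t"): (String, String) = {
    // The template fields only choose the directory...
    val dir = template.format(templateFields.map(record): _*)
    // ...and only the sink fields are serialized, so the template fields are dropped.
    val line = sinkFields.map(f => record(f).toString).mkString(separator)
    (dir, line)
  }
}
```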

@sritchie
Collaborator

The other piece we need is GlobHfs integration instead of Hfs to actually read data produced with the TemplateTap:

http://docs.cascading.org/cascading/2.0/javadoc/cascading/tap/hadoop/GlobHfs.html

@granthenke
Contributor

I am starting to work on this and wanted to give an update. I am fairly new to Scalding/Scala so please correct any novice mistakes. Below is a rough start with some comments (not much is changed from Sam's first pass):

```scala
trait Templated extends FileSource {
  def template: String
  def templateFields: Fields

  override def createTap(readOrWrite : AccessMode)(implicit mode : Mode) : Tap[_,_,_] = {
    mode match {
      case hdfsMode @ Hdfs(_, _) => readOrWrite match {
        case Read => createHdfsReadTap(hdfsMode)
        case Write => {
          val hfs = new Hfs(hdfsScheme, hdfsWritePath, SinkMode.REPLACE)
          new TemplateTap(hfs, template, templateFields)
        }
      }
      case _ => super.createTap(readOrWrite)(mode)
    }
  }
}
```

_Thoughts:_

  • Extending a source with the stackable trait pattern may be tough: TemplateTap expects an Hfs, but super.createTap returns a generic Tap.
  • As a temporary solution, the output data could be read with existing sources that use MultiSourceTap. (I write out to /yyyy/MM/dd/ folders and read them back in with TimePathedSource.)
  • We may want to let SinkMode.REPLACE be overridden, since it may be dangerous with templated output. (An easy change, but I have not seen this elsewhere in Scalding, so I wanted to bring it up.)
  • I am not sure how to support the ideal "T => String" function, since TemplateTap builds the path with Tuple.format on the templateFields.

I have done some minor testing on various sources. I also ran a large map-only job with many unique output paths, and it ran well. Below is an example source using the trait:

```scala
case class TemplateCSV(p : String,
                       override val template: String,
                       override val templateFields: Fields,
                       override val separator : String = ",",
                       override val fields : Fields = Fields.ALL,
                       override val skipHeader : Boolean = false,
                       override val writeHeader : Boolean = false,
                       override val quote : String = "\"")
  extends FixedPathSource(p) with DelimitedScheme with Templated
```

_Use:_

```scala
TemplateCSV(outputPath, "%s", 'templateField, fields = 'outputField)
```

_Thoughts:_

  • This supports removing the fields used for templating: in the example above, only 'outputField is written, while 'templateField is used only to build the output path.

Below is an example source with scalding.avro, where the first field is the output object and any remaining fields can be used for templating:

```scala
case class TemplatePackedAvroSource[AvroType : Manifest : AvroSchemaType : TupleConverter](
    paths: Seq[String], template: String, templateFields: Fields)
  extends FixedPathSource(paths: _*) with PackedAvroFileScheme[AvroType] with Templated {
  val schemaType = implicitly[AvroSchemaType[AvroType]]
  override val schema = schemaType.schema
  override val converter = implicitly[TupleConverter[AvroType]]
}
```

This still needs a lot of cleanup, improvements and testing. However, I wanted to post it in case anyone has an immediate need, and to get comments on some of these thoughts in case I can't get to it for a while.

@sritchie
Collaborator

sritchie commented Aug 8, 2013

Nice! I can look at this in more detail later, but one comment on reading: in Cascalog, I've used GlobHfs on the read side. The problem is that, unlike TemplateTap (which wraps an Hfs instance), GlobHfs has to be substituted for Hfs, taking the scheme directly. I found that the glob pattern is a nice complement to the format string used for writing.

@johnynek
Collaborator Author

johnynek commented Aug 8, 2013

Looks pretty good. I look forward to the pull req with tests.

I definitely want to add a TemplateSink that extends TypedSink with a T => Seq[Any] function, then turns that Seq[Any] into a Tuple, which we use for the format string but omit when writing.
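A rough sketch of that TemplateSink idea, without Cascading or TypedSink: `TemplateSink`, `templateValues`, `toRow` and `Click` are hypothetical names. `templateValues` is the T => Seq[Any] that feeds the format string, and `toRow` chooses what is actually written, so the template values never reach the output file.

```scala
object TemplateSinkSketch {
  // Hypothetical description of a templated sink for items of type T.
  final case class TemplateSink[T](template: String,
                                   templateValues: T => Seq[Any], // fills the format string
                                   toRow: T => Seq[Any]) {        // what actually gets written
    // Path relative to the parent, built from the template values only.
    def relativePath(item: T): String = template.format(templateValues(item): _*)

    // The written line omits the template values entirely.
    def line(item: T, separator: String = "\t"): String = toRow(item).mkString(separator)
  }

  final case class Click(country: String, day: String, url: String)

  val clicks: TemplateSink[Click] =
    TemplateSink[Click]("%s/%s", c => Seq(c.country, c.day), c => Seq(c.url))
}
```

For `Click("us", "2013-08-08", "/home")` the relative path is `us/2013-08-08` and the written line is just `/home`.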

@granthenke
Contributor

http://docs.cascading.org/cascading/2.1/userguide/html/ch07s07.html

The Cascading docs recommend the following:

"...whether binning happens during the Map phase or the Reduce phase. By doing a GroupBy on the values used to populate the template, binning will happen during the Reduce phase, and will likely scale much better in cases where there are a very large number of unique values used in the template resulting in a large number of directories."

Is this something the tap should do by default?
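A toy model of the scaling argument in that quote, in plain Scala with no Hadoop involved. The helper names are hypothetical, and it assumes each task writes one part file into every directory it touches. With map-side binning, every mapper opens a file per distinct template value it sees; after a GroupBy on the template value, each value lands on a single reducer.

```scala
object BinningSketch {
  // Each inner Seq holds the template values seen by one mapper (one input split).
  type Splits = Seq[Seq[String]]

  // Map-side binning: every mapper opens one output file per distinct value it sees.
  def filesWithMapSideBinning(splits: Splits): Int =
    splits.map(_.distinct.size).sum

  // Reduce-side binning: after a GroupBy on the template value, all records with the
  // same value reach the same reducer, so each distinct value yields exactly one file.
  def filesWithReduceSideBinning(splits: Splits): Int =
    splits.flatten.distinct.size
}
```

With three splits where two see the values a, b and c and one sees only a, map-side binning produces 7 files, versus 3 after grouping. The gap grows with the number of mappers times the number of distinct values.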

@granthenke
Contributor

If my understanding of "pass" is correct, the grouping by the template fields described above could be done automatically by the source by adding the following method:

```scala
override def transformForWrite(pipe : Pipe): Pipe = {
  val parentPipe = super.transformForWrite(pipe)
  parentPipe.groupBy(templateFields){ _.pass }
}
```

@sritchie
Collaborator

You only want to do that if the job doesn't already have a grouping phase.

@granthenke
Contributor

I have a working implementation of this, though improvements can and definitely will be made. I can send a pull request with a sample use if you like.

However, I am not sure how to unit test it without writing out files. A standard job test will not validate the actual output paths. Does anyone have a recommended approach?

@sritchie
Collaborator

I did this in Cascalog by sinking with the TemplateTap, then reading back the files using GlobHfs and verifying that the tuples matched. I bet even without GlobHfs you could just read from all of the subdirectories, union together and check that they match.
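A minimal local sketch of that round-trip check using only java.nio (Scala 2.13 or later for `scala.jdk.CollectionConverters`; no Cascading, no GlobHfs). `writeTemplated` is a hypothetical stand-in for sinking through a TemplateTap with a single %s template, and `readAll` unions the lines of every file under every subdirectory. Comparing sorted lines makes the check order-insensitive; cleanup of the temporary directory is omitted for brevity.

```scala
import java.nio.charset.StandardCharsets.UTF_8
import java.nio.file.{Files, Path}
import scala.jdk.CollectionConverters._

object TemplateRoundTrip {
  // Hypothetical stand-in for a TemplateTap sink: rows sharing a key go to
  // parent/<template.format(key)>/part-00000, one line per row.
  def writeTemplated(parent: Path, template: String, rows: Seq[(String, String)]): Unit =
    rows.groupBy(_._1).foreach { case (key, group) =>
      val dir = parent.resolve(template.format(key))
      Files.createDirectories(dir)
      Files.write(dir.resolve("part-00000"), group.map(_._2).mkString("\n").getBytes(UTF_8))
    }

  // Reads every regular file under every subdirectory and unions the lines together.
  def readAll(parent: Path): List[String] = {
    val walk = Files.walk(parent)
    try {
      walk.iterator().asScala
        .filter(p => Files.isRegularFile(p))
        .flatMap(p => new String(Files.readAllBytes(p), UTF_8).split("\n").toList)
        .toList
    } finally walk.close()
  }

  // The suggested check: what was sunk must equal what is read back, ignoring order.
  def roundTripMatches(rows: Seq[(String, String)]): Boolean = {
    val tmp = Files.createTempDirectory("template-tap-test") // cleanup omitted for brevity
    writeTemplated(tmp, "%s", rows)
    readAll(tmp).sorted == rows.map(_._2).sorted
  }
}
```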

@rubanm
Contributor

rubanm commented Nov 7, 2013

I took a stab at adding read support using GlobHfs. I will work on tests and a pull request once I have my job up and running with this. Here's what worked for me:

```scala
// Subclass needed because GlobHfs.getTaps is protected
class MyGlobHfs(scheme: cascading.scheme.Scheme[JobConf,RecordReader[_,_],OutputCollector[_,_],_,_], pattern: String)
    extends GlobHfs(scheme, pattern) {
  def getTap = new ScaldingMultiSourceTap(super.getTaps.toList)
}

trait Templated extends FileSource {
  def template: String
  def templateFields: Fields

  override def createTap(readOrWrite : AccessMode)(implicit mode : Mode) : Tap[_,_,_] = {
    mode match {
      case hdfsMode@Hdfs(_, _) => readOrWrite match {
        case Read =>
          // works with %s only, need better regex
          new MyGlobHfs(hdfsScheme, hdfsPaths.head + "/" + template.format("*") + "/*")
            .getTap
        case Write =>
          val hfs = new Hfs(hdfsScheme, hdfsWritePath, SinkMode.REPLACE)
          new TemplateTap(hfs, template, templateFields)
      }
      case _ => super.createTap(readOrWrite)(mode)
    }
  }
}
```
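On the "works with %s only" limitation: one possible approach (an assumption, not something Cascading provides) is to replace every format specifier with a glob wildcard using a regex. `TemplateGlob.toGlob` is a hypothetical helper; it handles simple specifiers such as %s, %05d and %1$s plus the literal %%, but not date/time conversions like %tY.

```scala
import scala.util.matching.Regex

object TemplateGlob {
  // Matches a literal "%%" or a simple java.util.Formatter specifier such as
  // %s, %d, %05d or %1$s. Assumption: no date/time (%t...) conversions.
  private val Specifier: Regex = """%%|%(?:\d+\$)?[-#+ 0,(]*\d*(?:\.\d+)?[a-zA-Z]""".r

  // Turns a write template into a read glob: each specifier becomes "*", "%%" becomes "%".
  def toGlob(template: String): String =
    Specifier.replaceAllIn(template, (m: Regex.Match) => if (m.matched == "%%") "%" else "*")
}
```

For example, `toGlob("year=%04d/%s")` gives `year=*/*`, so the read pattern in the trait above could become `hdfsPaths.head + "/" + TemplateGlob.toGlob(template) + "/*"`.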

@rubanm
Contributor

rubanm commented Nov 7, 2013

This currently reads all paths under the parent. Adding support for filtering by field value might be useful.
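A sketch of how a filter by field value could work on the read side, assuming a template made only of %s specifiers. `FilteredGlob.globFor` is hypothetical: it pins the positions whose value is known and leaves the others as `*`. Values containing glob metacharacters such as `*` or `{` would need escaping, which this sketch does not do.

```scala
object FilteredGlob {
  // Assumes the template consists of literal text plus %s specifiers only.
  // Some(v) pins that position to v; None leaves it as a "*" wildcard.
  def globFor(template: String, values: Seq[Option[String]]): String = {
    val literals = template.split("%s", -1) // -1 keeps a trailing empty segment
    require(literals.length == values.length + 1, "expected one Option per %s in the template")
    literals.tail.zip(values).foldLeft(literals.head) { case (acc, (literal, value)) =>
      acc + value.getOrElse("*") + literal
    }
  }
}
```

For example, `globFor("%s/%s", Seq(Some("2013-11-07"), None))` gives `2013-11-07/*`; appending `/*` for the part files would read a single day instead of every path under the parent.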

@smarden1
Contributor

smarden1 commented Nov 7, 2013

I recently added a version of TemplateTap to our code base. The first time I used it, I found that the number of directory levels isn't always determined by your template.

For example, our template was "%s", and our template field occasionally had values containing slashes, which created extra directory levels. The tap choked when reading these files. To handle that, I added an additional trait value (globLevels) that can be set in cases where we know this is an issue.
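To illustrate the slash problem with the JDK's glob matcher, where `*` never crosses a directory boundary: `readGlob` is a hypothetical helper that builds a glob with `globLevels` wildcard directory levels plus one level for the part files. It mirrors the idea described above and is not an existing Scalding option. A value like `a/b` written with template `%s` lands two levels deep, so a one-level glob misses it.

```scala
import java.nio.file.{FileSystems, Paths}

object GlobLevelsSketch {
  // Builds parent/*/.../* with `globLevels` directory wildcards plus one for the part files.
  // `globLevels` is a hypothetical setting, not an existing Scalding option.
  def readGlob(parent: String, globLevels: Int): String =
    ((parent +: Seq.fill(globLevels)("*")) :+ "*").mkString("/")

  // Checks a file path against a glob using the JDK's glob syntax,
  // in which * matches within a single directory level only.
  def matches(glob: String, file: String): Boolean =
    FileSystems.getDefault.getPathMatcher("glob:" + glob).matches(Paths.get(file))
}
```

For example, `readGlob("/out", 1)` is `/out/*/*`, which does not match `/out/a/b/part-00000`, while `readGlob("/out", 2)` does.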

Additionally, there is a version of TemplateTap that works locally, which was necessary for us to include as we do quite a bit of local development and it's important to be able to run the same jobs locally and on the cluster.

@jcoveney
Contributor

Looks like this landed long ago? Open a new issue if there are desired enhancements.


6 participants