Commit e4103d1

Conflicts:
	.travis.yml
	project/Build.scala

ianoc committed Jun 29, 2015
1 parent f283c25 commit e4103d1
Showing 11 changed files with 358 additions and 348 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -33,3 +33,4 @@ tutorial/data/jsonoutput0.tsv
tutorial/data/avrooutput0.avro
.scalding_repl
scalding-hadoop-test/NOTICE
+ NOTICE
8 changes: 4 additions & 4 deletions .travis.yml
@@ -58,18 +58,18 @@ matrix:
env: BUILD="test tutorials and matrix tutorials and repl" TEST_TARGET="scalding-repl"
script:
- "scripts/run_test.sh"
- "scripts/build_assembly_no_test.sh scalding"
- "scripts/build_assembly_no_test.sh scalding-assembly"
- "scripts/test_tutorials.sh"
- "scripts/build_assembly_no_test.sh scalding"
- "scripts/build_assembly_no_test.sh scalding-assembly"
- "scripts/test_matrix_tutorials.sh"


- scala: 2.11.5
env: BUILD="test tutorials and matrix tutorials"
script:
- "scripts/build_assembly_no_test.sh scalding"
- "scripts/build_assembly_no_test.sh scalding-assembly"
- "scripts/test_tutorials.sh"
- "scripts/build_assembly_no_test.sh scalding"
- "scripts/build_assembly_no_test.sh scalding-assembly"
- "scripts/test_matrix_tutorials.sh"

- scala: 2.10.5
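Note on the script blocks above: only the assembly target changes. `scripts/build_assembly_no_test.sh` now builds the new `scalding-assembly` aggregate, defined in project/Build.scala below, instead of the root `scalding` project.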
63 changes: 47 additions & 16 deletions project/Build.scala
@@ -28,7 +28,7 @@ object ScaldingBuild extends Build {
val dfsDatastoresVersion = "1.3.4"
val elephantbirdVersion = "4.8"
val hadoopLzoVersion = "0.4.19"
val hadoopVersion = "1.2.1"
val hadoopVersion = "2.5.0"
val hbaseVersion = "0.94.10"
val hravenVersion = "0.9.13"
val jacksonVersion = "2.4.2"
@@ -220,6 +220,32 @@ object ScaldingBuild extends Build {
scaldingSerializationMacros
)

+ lazy val scaldingAssembly = Project(
+ id = "scalding-assembly",
+ base = file("assembly"),
+ settings = sharedSettings
+ ).settings(
+ test := {},
+ publish := {}, // skip publishing for this root project.
+ publishLocal := {}
+ ).aggregate(
+ scaldingArgs,
+ scaldingDate,
+ scaldingCore,
+ scaldingCommons,
+ scaldingAvro,
+ scaldingParquet,
+ scaldingParquetScrooge,
+ scaldingHRaven,
+ scaldingRepl,
+ scaldingJson,
+ scaldingJdbc,
+ scaldingMacros,
+ maple,
+ scaldingSerialization,
+ scaldingSerializationMacros
+ )

lazy val formattingPreferences = {
import scalariform.formatter.preferences._
FormattingPreferences().
@@ -279,7 +305,7 @@ object ScaldingBuild extends Build {
"com.twitter" %% "bijection-core" % bijectionVersion,
"com.twitter" %% "algebird-core" % algebirdVersion,
"com.twitter" %% "algebird-test" % algebirdVersion % "test",
"org.apache.hadoop" % "hadoop-core" % hadoopVersion % "provided",
"org.apache.hadoop" % "hadoop-client" % hadoopVersion % "provided",
"org.slf4j" % "slf4j-api" % slf4jVersion,
"org.slf4j" % "slf4j-log4j12" % slf4jVersion % "provided"
)
@@ -311,7 +337,7 @@ object ScaldingBuild extends Build {
"cascading.avro" % "avro-scheme" % cascadingAvroVersion,
"org.apache.avro" % "avro" % avroVersion,
"org.slf4j" % "slf4j-api" % slf4jVersion,
"org.apache.hadoop" % "hadoop-core" % hadoopVersion % "provided"
"org.apache.hadoop" % "hadoop-client" % hadoopVersion % "provided"
)
).dependsOn(scaldingCore)

@@ -324,13 +350,13 @@ object ScaldingBuild extends Build {
exclude("com.twitter.elephantbird", "elephant-bird-core"),
"org.apache.thrift" % "libthrift" % "0.7.0",
"org.slf4j" % "slf4j-api" % slf4jVersion,
"org.apache.hadoop" % "hadoop-core" % hadoopVersion % "provided",
"org.apache.hadoop" % "hadoop-client" % hadoopVersion % "provided",
"org.scala-lang" % "scala-reflect" % scalaVersion,
"com.twitter" %% "bijection-macros" % bijectionVersion,
"com.twitter" %% "chill-bijection" % chillVersion
) ++ (if(isScala210x(scalaVersion)) Seq("org.scalamacros" %% "quasiquotes" % quasiquotesVersion) else Seq())
}, addCompilerPlugin("org.scalamacros" % "paradise" % paradiseVersion cross CrossVersion.full))
- .dependsOn(scaldingCore, scaldingHadoopTest)
+ .dependsOn(scaldingCore, scaldingHadoopTest % "test")

def scaldingParquetScroogeDeps(version: String) = {
if (isScala210x(version))
@@ -342,7 +368,7 @@ object ScaldingBuild extends Build {
exclude("com.twitter.elephantbird", "elephant-bird-core"),
"com.twitter" %% "parquet-scrooge" % parquetVersion,
"org.slf4j" % "slf4j-api" % slf4jVersion,
"org.apache.hadoop" % "hadoop-core" % hadoopVersion % "provided"
"org.apache.hadoop" % "hadoop-client" % hadoopVersion % "provided"
)
else
Seq()
@@ -360,7 +386,7 @@ object ScaldingBuild extends Build {
"com.twitter.hraven" % "hraven-core" % hravenVersion,
"org.apache.hbase" % "hbase" % hbaseVersion,
"org.slf4j" % "slf4j-api" % slf4jVersion,
"org.apache.hadoop" % "hadoop-core" % hadoopVersion % "provided"
"org.apache.hadoop" % "hadoop-client" % hadoopVersion % "provided"
)
).dependsOn(scaldingCore)

@@ -385,8 +411,8 @@ object ScaldingBuild extends Build {
"jline" % "jline" % scalaVersion.take(4),
"org.scala-lang" % "scala-compiler" % scalaVersion,
"org.scala-lang" % "scala-reflect" % scalaVersion,
"org.apache.hadoop" % "hadoop-core" % hadoopVersion % "provided",
"org.apache.hadoop" % "hadoop-core" % hadoopVersion % "unprovided",
"org.apache.hadoop" % "hadoop-client" % hadoopVersion % "provided",
"org.apache.hadoop" % "hadoop-client" % hadoopVersion % "unprovided",
"org.slf4j" % "slf4j-api" % slf4jVersion,
"org.slf4j" % "slf4j-log4j12" % slf4jVersion % "provided",
"org.slf4j" % "slf4j-log4j12" % slf4jVersion % "unprovided"
@@ -417,7 +443,7 @@ object ScaldingBuild extends Build {

lazy val scaldingJson = module("json").settings(
libraryDependencies <++= (scalaVersion) { scalaVersion => Seq(
"org.apache.hadoop" % "hadoop-core" % hadoopVersion % "provided",
"org.apache.hadoop" % "hadoop-client" % hadoopVersion % "provided",
"com.fasterxml.jackson.module" %% "jackson-module-scala" % jacksonVersion,
"org.json4s" %% "json4s-native" % json4SVersion,
"com.twitter.elephantbird" % "elephant-bird-cascading2" % elephantbirdVersion % "provided"
@@ -427,7 +453,7 @@ object ScaldingBuild extends Build {

lazy val scaldingJdbc = module("jdbc").settings(
libraryDependencies <++= (scalaVersion) { scalaVersion => Seq(
"org.apache.hadoop" % "hadoop-core" % hadoopVersion % "provided",
"org.apache.hadoop" % "hadoop-client" % hadoopVersion % "provided",
"cascading" % "cascading-jdbc-core" % cascadingJDBCVersion,
"cascading" % "cascading-jdbc-mysql" % cascadingJDBCVersion
)
@@ -436,8 +462,13 @@ object ScaldingBuild extends Build {

lazy val scaldingHadoopTest = module("hadoop-test").settings(
libraryDependencies <++= (scalaVersion) { scalaVersion => Seq(
("org.apache.hadoop" % "hadoop-core" % hadoopVersion),
("org.apache.hadoop" % "hadoop-minicluster" % hadoopVersion),
"org.apache.hadoop" % "hadoop-client" % hadoopVersion,
"org.apache.hadoop" % "hadoop-minicluster" % hadoopVersion,
"org.apache.hadoop" % "hadoop-yarn-server-tests" % hadoopVersion classifier "tests",
"org.apache.hadoop" % "hadoop-yarn-server" % hadoopVersion,
"org.apache.hadoop" % "hadoop-hdfs" % hadoopVersion classifier "tests",
"org.apache.hadoop" % "hadoop-common" % hadoopVersion classifier "tests",
"org.apache.hadoop" % "hadoop-mapreduce-client-jobclient" % hadoopVersion classifier "tests",
"com.twitter" %% "chill-algebird" % chillVersion,
"org.slf4j" % "slf4j-api" % slf4jVersion,
"org.slf4j" % "slf4j-log4j12" % slf4jVersion,
@@ -455,7 +486,7 @@ object ScaldingBuild extends Build {
) ++ (if(isScala210x(scalaVersion)) Seq("org.scalamacros" %% "quasiquotes" % quasiquotesVersion) else Seq())
},
addCompilerPlugin("org.scalamacros" % "paradise" % paradiseVersion cross CrossVersion.full)
- ).dependsOn(scaldingCore, scaldingHadoopTest)
+ ).dependsOn(scaldingCore, scaldingHadoopTest % "test")

// This one uses a different naming convention
lazy val maple = Project(
@@ -468,7 +499,7 @@ object ScaldingBuild extends Build {
crossPaths := false,
autoScalaLibrary := false,
libraryDependencies <++= (scalaVersion) { scalaVersion => Seq(
"org.apache.hadoop" % "hadoop-core" % hadoopVersion % "provided",
"org.apache.hadoop" % "hadoop-client" % hadoopVersion % "provided",
"org.apache.hbase" % "hbase" % hbaseVersion % "provided",
"cascading" % "cascading-hadoop" % cascadingVersion
)
@@ -484,7 +515,7 @@ object ScaldingBuild extends Build {
libraryDependencies <++= (scalaVersion) { scalaVersion => Seq(
"org.scala-lang" % "scala-library" % scalaVersion,
"org.scala-lang" % "scala-reflect" % scalaVersion,
"org.apache.hadoop" % "hadoop-core" % hadoopVersion,
"org.apache.hadoop" % "hadoop-client" % hadoopVersion,
"org.slf4j" % "slf4j-api" % slf4jVersion,
"org.slf4j" % "slf4j-log4j12" % slf4jVersion,
"cascading" % "cascading-hadoop" % cascadingVersion
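Three themes run through the Build.scala changes above: hadoopVersion jumps from 1.2.1 to 2.5.0 and every module swaps Hadoop 1.x's monolithic `hadoop-core` artifact for the Hadoop 2.x `hadoop-client` umbrella; the new `scalding-assembly` aggregate gives the Travis scripts a single target that fans tasks out to every module while keeping its own `test` and `publish` as no-ops; and the modules depending on `scalding-hadoop-test` (which now drags in the full Hadoop 2 minicluster stack, including the `tests`-classifier jars) scope it to `% "test"` so it stays off their compile classpath. A minimal sbt sketch of the resulting dependency shape, with a hypothetical module name:

```scala
// Hypothetical module, not from this commit: "hadoop-core" exists only for
// Hadoop 1.x, so Hadoop 2.x clients depend on the "hadoop-client" umbrella POM.
lazy val example = Project("example", file("example")).settings(
  libraryDependencies ++= Seq(
    // "provided": compiled against, but supplied by the cluster at runtime.
    "org.apache.hadoop" % "hadoop-client" % "2.5.0" % "provided",
    // Minicluster test fixtures belong on the test classpath only.
    "org.apache.hadoop" % "hadoop-minicluster" % "2.5.0" % "test"
  )
)
```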
15 changes: 10 additions & 5 deletions scalding-core/src/main/scala/com/twitter/scalding/FileSource.scala
@@ -35,7 +35,7 @@ import cascading.tuple.Fields
import com.etsy.cascading.tap.local.LocalTap

import org.apache.hadoop.conf.Configuration
- import org.apache.hadoop.fs.{ FileStatus, PathFilter, Path }
+ import org.apache.hadoop.fs.{ FileStatus, FileSystem, PathFilter, Path }
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.mapred.OutputCollector
import org.apache.hadoop.mapred.RecordReader
@@ -114,10 +114,15 @@ object FileSource {

def glob(glob: String, conf: Configuration, filter: PathFilter = AcceptAllPathFilter): Iterable[FileStatus] = {
val path = new Path(glob)
- Option(path.getFileSystem(conf).globStatus(path, filter)).map {
- _.toIterable // convert java Array to scala Iterable
- } getOrElse {
- Iterable.empty
+ val fs = FileSystem.newInstance(path.toUri, conf)
+ try {
+ Option(fs.globStatus(path, filter)).map {
+ _.toIterable // convert java Array to scala Iterable
+ } getOrElse {
+ Iterable.empty
+ }
+ } finally {
+ fs.close
+ }
}

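The rewritten `glob` above is about `FileSystem` lifecycle, not just a rename: `path.getFileSystem(conf)` returns a JVM-wide cached instance, so closing it would break every other caller sharing it, whereas `FileSystem.newInstance` bypasses the cache, which is what makes the `finally { fs.close }` safe. A self-contained sketch of the distinction; the glob path is illustrative:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{ FileSystem, Path }

object GlobSketch {
  def main(args: Array[String]): Unit = {
    val conf = new Configuration()
    val path = new Path("file:///tmp/scalding-demo/part-*") // illustrative glob

    // Cached instance: shared across the whole JVM, so never close it.
    val shared = path.getFileSystem(conf)
    println(s"cached fs: ${shared.getUri}")

    // Fresh instance: bypasses the cache, so closing it disturbs nobody else.
    val fs = FileSystem.newInstance(path.toUri, conf)
    try {
      // globStatus returns null, not an empty array, when nothing matches.
      val statuses = Option(fs.globStatus(path)).getOrElse(Array.empty)
      statuses.foreach(s => println(s.getPath))
    } finally {
      fs.close()
    }
  }
}
```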
@@ -6,7 +6,7 @@ import java.io.File
import java.net.URI
import java.nio.ByteBuffer
import org.apache.hadoop.conf.Configuration
- import org.apache.hadoop.filecache.{ DistributedCache => HDistributedCache }
+ import org.apache.hadoop.mapreduce.filecache.{ DistributedCache => HDistributedCache }
import org.apache.hadoop.fs.Path

object URIHasher {
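The import swap above tracks the class's new home: `org.apache.hadoop.filecache.DistributedCache` is where Hadoop 1.x kept it, and Hadoop 2 moves it to `org.apache.hadoop.mapreduce.filecache` while deprecating the static helpers in favor of methods on `Job`. A hedged sketch of the Job-based idiom; the HDFS URI is illustrative:

```scala
import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce.Job

object CacheFileSketch {
  def main(args: Array[String]): Unit = {
    val job = Job.getInstance(new Configuration())
    // The "#lookup" fragment names the symlink each task sees in its working dir.
    job.addCacheFile(new URI("hdfs:///apps/shared/lookup.dat#lookup"))
  }
}
```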
@@ -21,7 +21,7 @@ import java.io.{ File, RandomAccessFile }
import java.nio.channels.FileLock

import org.apache.hadoop.conf.Configuration
- import org.apache.hadoop.filecache.DistributedCache
+ import org.apache.hadoop.mapreduce.filecache.DistributedCache
import org.apache.hadoop.fs.{ FileUtil, Path }
import org.apache.hadoop.hdfs.MiniDFSCluster
import org.apache.hadoop.mapred.{ JobConf, MiniMRCluster }
@@ -36,6 +36,11 @@ object LocalCluster {
}

class LocalCluster(mutex: Boolean = true) {
+ org.apache.log4j.Logger.getLogger("org.apache.hadoop").setLevel(org.apache.log4j.Level.ERROR)
+ org.apache.log4j.Logger.getLogger("org.mortbay").setLevel(org.apache.log4j.Level.ERROR)
+ org.apache.log4j.Logger.getLogger("BlockStateChange").setLevel(org.apache.log4j.Level.ERROR)
+ org.apache.log4j.Logger.getLogger("SecurityLogger").setLevel(org.apache.log4j.Level.ERROR)

private val LOG = LoggerFactory.getLogger(getClass)

private var hadoop: Option[(MiniDFSCluster, MiniMRCluster, JobConf)] = None
@@ -91,9 +96,15 @@ class LocalCluster(mutex: Boolean = true) {
mrJobConf.set("mapred.reduce.max.attempts", "2")
mrJobConf.set("mapred.child.java.opts", "-Xmx512m")
mrJobConf.setInt("mapred.job.reuse.jvm.num.tasks", -1)
mrJobConf.setInt("jobclient.completion.poll.interval", 50)
mrJobConf.setInt("jobclient.progress.monitor.poll.interval", 50)
mrJobConf.setInt("ipc.ping.interval", 5000)
mrJobConf.setInt("mapreduce.client.completion.pollinterval", 20)
mrJobConf.setInt("mapreduce.client.progressmonitor.pollinterval", 20)
mrJobConf.setInt("ipc.ping.interval", 500)
mrJobConf.setInt("dfs.client.socket-timeout", 50)
mrJobConf.set("mapreduce.job.ubertask.enable", "true")
mrJobConf.setInt("mapreduce.job.ubertask.maxmaps", 500)
mrJobConf.setInt("mapreduce.job.ubertask.maxreduces", 500)
mrJobConf.setInt("ipc.client.connection.maxidletime", 50)

mrJobConf.setMapSpeculativeExecution(false)
mrJobConf.setReduceSpeculativeExecution(false)
mrJobConf.set("mapreduce.user.classpath.first", "true")
@@ -114,6 +125,8 @@ class LocalCluster(mutex: Boolean = true) {
classOf[Option[_]],
classOf[LoggerFactory],
classOf[Log4jLoggerAdapter],
+ classOf[org.apache.hadoop.net.StaticMapping],
+ classOf[org.apache.hadoop.yarn.server.MiniYARNCluster],
classOf[com.twitter.scalding.Args],
classOf[org.apache.log4j.LogManager],
classOf[com.twitter.scalding.RichDate],
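The LocalCluster changes are all about test latency on the Hadoop 2 minicluster: noisy loggers are silenced up front, the Hadoop 1 `jobclient.*` polling keys give way to their tighter `mapreduce.client.*` successors, and uber mode (`mapreduce.job.ubertask.enable`) runs any job under the deliberately generous 500-map/500-reduce thresholds inside the MRAppMaster's own JVM, skipping per-task container launches. The extra `classOf` entries appear to pull the YARN minicluster and `StaticMapping` jars onto the cluster's generated classpath.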

This file was deleted.
