-
Notifications
You must be signed in to change notification settings - Fork 706
/
scald.rb
executable file
·626 lines (540 loc) · 20 KB
/
scald.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
#!/usr/bin/env ruby
$LOAD_PATH << File.join(File.expand_path(File.dirname(File.symlink?(__FILE__) ? File.readlink(__FILE__) : __FILE__)), 'lib')
require 'fileutils'
require 'open-uri'
require 'thread'
require 'trollop'
require 'yaml'
require 'tmpdir'
#Help text printed (together with the option summary) when scald.rb is
#invoked without a job argument.
USAGE = <<END
Usage : scald.rb [options] job <job args>
If job ends in ".scala" or ".java" and the file exists, then link
it against JARFILE (default: versioned scalding-core jar) and run
it (default: on HOST). Otherwise, it is assumed to be a full
classname to an item in the JARFILE, which is run.
END
##############################################################
# Default configuration:
#Get the absolute path of the original (non-symlink) file.
CONFIG_DEFAULT = begin
  #Resolve a symlinked scald.rb so repo_root points at the real checkout.
  original_file = File.symlink?(__FILE__) ? File.readlink(__FILE__) : __FILE__
  repo_root = File.expand_path(File.dirname(original_file)+"/../")
  #NOTE: repo_root is a local, not a hash entry; it is also referenced
  #directly further down the script when building module jar paths.
  { "host" => "my.host.here", #where the job is rsynced to and run
    "repo_root" => repo_root, #full path to the repo you use, Twitter specific
    "cp" => ENV['CLASSPATH'] || "", #extra user classpath entries
    "localmem" => "3g", #how much memory for java to use when running in local mode
    "namespaces" => { "abj" => "com.twitter.ads.batch.job", "s" => "com.twitter.scalding" }, #shorthand prefixes for "ns:Class" job names
    "hadoop_opts" => { "mapred.reduce.tasks" => 20, #be conservative by default
      "mapred.min.split.size" => "2000000000" }, #2 billion bytes!!!
    #maven coordinates (group/artifact/version) downloaded for the
    #local/compile classpath; see maven_get below.
    "depends" => [ "org.apache.hadoop/hadoop-core/1.1.2",
      "commons-codec/commons-codec/1.8",
      "commons-configuration/commons-configuration/1.9",
      "org.codehaus.jackson/jackson-asl/0.9.5",
      "org.codehaus.jackson/jackson-mapper-asl/1.9.13",
      "commons-lang/commons-lang/2.6",
      "org.slf4j/slf4j-log4j12/1.6.6",
      "log4j/log4j/1.2.15",
      "commons-httpclient/commons-httpclient/3.1",
      "commons-cli/commons-cli/1.2",
      "commons-logging/commons-logging/1.1.1",
      "org.apache.zookeeper/zookeeper/3.3.4" ],
    "default_mode" => "--hdfs" #mode used when none of --hdfs/--local/... is given
  }
end
##############################################################
#User overrides: a YAML hash read from ~/.scaldrc, merged over the defaults.
CONFIG_RC = begin
  #put configuration in .scaldrc in HOME to override the defaults below:
  YAML.load_file(ENV['HOME'] + "/.scaldrc") || {} #seems that on ruby 1.9, this returns false on failure
rescue
  {}
end
CONFIG = CONFIG_DEFAULT.merge!(CONFIG_RC)
#Read the sbt build definition and version file to discover versions below.
BUILDFILE = open(CONFIG["repo_root"] + "/project/Build.scala").read
VERSIONFILE = open(CONFIG["repo_root"] + "/version.sbt").read
#extract the scalding version from e.g.: version := "0.9.0"
SCALDING_VERSION=VERSIONFILE.match(/version.*:=\s*\"([^\"]+)\"/)[1]
#optionally set variables (not linux often doesn't have this set, and falls back to TMP. Set up a
#YAML file in .scaldrc with "tmpdir: my_tmp_directory_name" or export TMPDIR="/my/tmp" to set on
#linux
TMPDIR=CONFIG["tmpdir"] || ENV['TMPDIR'] || "/tmp"
TMPMAVENDIR = File.join(TMPDIR, "maven") #downloaded maven dependency jars live here
BUILDDIR=CONFIG["builddir"] || File.join(TMPDIR,"script-build") #scratch dir for compiled job classes
LOCALMEM=CONFIG["localmem"] || "3g" #java -Xmx for local mode
DEPENDENCIES=CONFIG["depends"] || [] #maven coordinates to fetch
RSYNC_STATFILE_PREFIX = TMPDIR + "/scald.touch." #touch-files recording last successful rsyncs
#Recall that usage is of the form scald.rb [--jar jarfile] [--hdfs|--hdfs-local|--local|--print] [--print_cp] [--scalaversion version] job <job args>
#This parser holds the {job <job args>} part of the command.
OPTS_PARSER = Trollop::Parser.new do
  opt :clean, "Clean all rsync and maven state before running"
  opt :cp, "Scala classpath", :type => String
  opt :hdfs, "Run on HDFS"
  opt :hdfs_local, "Run in Hadoop local mode"
  opt :local, "Run in Cascading local mode (does not use Hadoop)"
  opt :print, "Print the command YOU SHOULD enter on the remote node. Useful for screen sessions"
  opt :scalaversion, "version of Scala for scalac (defaults to scalaVersion in project/Build.scala)", :type => String
  opt :print_cp, "Print the Scala classpath"
  opt :jar, "Specify the jar file", :type => String
  opt :host, "Specify the hadoop host where the job runs", :type => String
  opt :reducers, "Specify the number of reducers", :type => :int
  #module flags: each adds the corresponding scalding module assembly jar
  opt :avro, "Add scalding-avro to classpath"
  opt :commons, "Add scalding-commons to classpath"
  opt :jdbc, "Add scalding-jdbc to classpath"
  opt :json, "Add scalding-json to classpath"
  opt :parquet, "Add scalding-parquet to classpath"
  opt :repl, "Add scalding-repl to classpath"
  opt :tool, "The scalding main class, defaults to com.twitter.scalding.Tool", :type => String
  stop_on_unknown #Stop parsing for options parameters once we reach the job file.
end
#OPTS holds the option parameters that come before {job}, i.e., the
#[--jar jarfile] [--hdfs|--hdfs-local|--local|--print] part of the command.
OPTS = Trollop::with_standard_exception_handling OPTS_PARSER do
  OPTS_PARSER.parse ARGV
end
#Make sure one of the execution modes is set.
#When the user passed no mode flag, fall back to CONFIG["default_mode"]
#(--hdfs unless overridden in .scaldrc).
unless [OPTS[:hdfs], OPTS[:hdfs_local], OPTS[:local], OPTS[:print]].any?
  #Modes in CONFIG file are in the form "--hdfs" or "--local", but the OPTS hash assumes
  #them to be in the form :hdfs or :local.
  #TODO: Add error checking on CONFIG["default_mode"]?
  mode = CONFIG["default_mode"] ? CONFIG["default_mode"].gsub("--", "").to_sym : :hdfs
  OPTS[mode] = true
end
#Absolute path of a downloaded dependency jar inside the temporary
#maven directory (TMPMAVENDIR).
def maven_filename(jar_name)
  File.join(TMPMAVENDIR, jar_name)
end
#--clean: delete all cached state (rsync touch files and downloaded maven
#jars), then exit without running anything.
if OPTS[:clean]
  rsync_files = Dir.glob(RSYNC_STATFILE_PREFIX + "*")
  $stderr.puts("Cleaning rsync stat files: #{rsync_files.join(', ')}")
  rsync_files.each { |f| File.delete(f) }
  maven_files = Dir.glob(maven_filename("*"))
  $stderr.puts("Cleaning maven jars: #{maven_files.join(', ')}")
  maven_files.each { |f| File.delete(f) }
  Dir.rmdir(TMPMAVENDIR) if File.directory?(TMPMAVENDIR)
  #HACK -- exit immediately because other parts of this script assume more
  #arguments are passed in.
  exit(0)
end
#No job argument (and not starting the repl): print usage/options and quit.
if ARGV.size < 1 && OPTS[:repl].nil?
  $stderr.puts USAGE
  OPTS_PARSER::educate
  exit(0)
end
#Scala version: the --scalaversion flag wins, otherwise the scalaVersion
#setting parsed from project/Build.scala.
SCALA_VERSION= OPTS[:scalaversion] || BUILDFILE.match(/scalaVersion\s*:=\s*\"([^\"]+)\"/)[1]
#Binary-compatible short version ("2.10"/"2.11") used in artifact paths.
SHORT_SCALA_VERSION = if SCALA_VERSION.start_with?("2.10")
  "2.10"
elsif SCALA_VERSION.start_with?("2.11")
  "2.11"
else
  SCALA_VERSION
end
SBT_HOME="#{ENV['HOME']}/.sbt" #NOTE(review): appears unused below — confirm before removing
SCALA_LIB_DIR = Dir.tmpdir + "/scald.rb/scala_home/#{SCALA_VERSION}" #local cache of scala jars
#Names of the Scala jars needed on the compile classpath.
#(The version argument is currently ignored: the same three artifacts
#apply to every supported Scala version.)
def scala_libs(version)
  %w[scala-library scala-reflect scala-compiler]
end
#Ask sbt to resolve org % dep % version and print the resulting classpath.
#Returns a Hash mapping "org:artifact:version" -> absolute jar path.
#Slow: spawns a full sbt run. Assumes this repo's build defines the
#'printDependencyClasspath' task.
def find_dependencies(org, dep, version)
  res = %x[./sbt 'set libraryDependencies := Seq("#{org}" % "#{dep}" % "#{version}")' 'printDependencyClasspath'].split("\n")
  mapVer = {}
  res.map { |l|
    #interesting lines look like: Some(org:artifact:version ...) => /path/to.jar
    l,m,r = l.partition(" => ")
    if (m == " => ")
      removedSome = l.sub(/Some\(/, '').sub(/\)$/,'')
      removeExtraBraces = removedSome.sub(/ .*/, '') # In 2.10.4 for resolution for some reason there is a " ()" at the end
      mapVer[removeExtraBraces] = r
    else
      []
    end
  }
  mapVer
end
#Resolve one maven coordinate to its jar path via find_dependencies.
#Raises when sbt did not report the requested artifact.
def find_dependency(org, reqDep, version)
  key = "#{org}:#{reqDep}:#{version}"
  resolved = find_dependencies(org, reqDep, version)
  path = resolved[key]
  raise "Dependency #{key} not found\n#{resolved}" unless path
  path
end
#Locate the jar for org/dep/version, in order of preference:
#  1. the local scala lib cache (SCALA_LIB_DIR),
#  2. the ivy cache under $HOME/.ivy2,
#  3. as a last resort, ask sbt to resolve it (slow).
#Returns the jar path; raises when it cannot be found anywhere.
def get_dep_location(org, dep, version)
  f = "#{SCALA_LIB_DIR}/#{dep}-#{version}.jar"
  ivyPath = "#{ENV['HOME']}/.ivy2/cache/#{org}/#{dep}/jars/#{dep}-#{version}.jar"
  #File.exists? is deprecated and removed in Ruby 3.2 — use File.exist?.
  if File.exist?(f)
    f
  elsif File.exist?(ivyPath)
    puts "Found #{dep} in ivy path"
    f = ivyPath
  else
    puts "#{dep} was not where it was expected, #{SCALA_LIB_DIR}...finding..."
    f = find_dependency(org, dep, version)
    raise "Unable to find jar library: #{dep}" unless f and File.exist?(f)
    puts "Found #{dep} in #{File.dirname(f)}"
    f
  end
end
#Locate the scala compiler/runtime jars and copy any found elsewhere
#(e.g. the ivy cache) into SCALA_LIB_DIR, giving COMPILE_CMD a stable classpath.
libs = scala_libs(SCALA_VERSION).map { |l| get_dep_location("org.scala-lang", l, SCALA_VERSION) }
lib_dirs = libs.map { |f| File.dirname(f) } #NOTE(review): looks unused below — confirm
FileUtils.mkdir_p(SCALA_LIB_DIR)
libs.map! do |l|
  if File.dirname(l) != SCALA_LIB_DIR
    FileUtils.cp(l, SCALA_LIB_DIR)
  end
  #refer to every jar by its cached location
  "#{SCALA_LIB_DIR}/#{File.basename(l)}"
end
LIBCP= libs.join(":") #colon-joined classpath of the scala jars
#invoke scalac (scala.tools.nsc.Main) through java against the cached jars
COMPILE_CMD="java -cp #{LIBCP} -Dscala.home=#{SCALA_LIB_DIR} scala.tools.nsc.Main"
HOST = OPTS[:host] || CONFIG["host"] #remote hadoop host
#user classpath: the config "cp" entry plus anything passed via --cp
CLASSPATH =
  if OPTS[:cp]
    CONFIG["cp"] + ":" + OPTS[:cp]
  else
    CONFIG["cp"]
  end
#Assembly jars of the optional scalding modules requested on the command line.
MODULEJARPATHS=[]
if OPTS[:avro]
  MODULEJARPATHS.push(repo_root + "/scalding-avro/target/scala-#{SHORT_SCALA_VERSION}/scalding-avro-assembly-#{SCALDING_VERSION}.jar")
end
if OPTS[:commons]
  MODULEJARPATHS.push(repo_root + "/scalding-commons/target/scala-#{SHORT_SCALA_VERSION}/scalding-commons-assembly-#{SCALDING_VERSION}.jar")
end
if OPTS[:jdbc]
  MODULEJARPATHS.push(repo_root + "/scalding-jdbc/target/scala-#{SHORT_SCALA_VERSION}/scalding-jdbc-assembly-#{SCALDING_VERSION}.jar")
end
if OPTS[:json]
  MODULEJARPATHS.push(repo_root + "/scalding-json/target/scala-#{SHORT_SCALA_VERSION}/scalding-json-assembly-#{SCALDING_VERSION}.jar")
end
if OPTS[:parquet]
  MODULEJARPATHS.push(repo_root + "/scalding-parquet/target/scala-#{SHORT_SCALA_VERSION}/scalding-parquet-assembly-#{SCALDING_VERSION}.jar")
end
if OPTS[:repl]
  MODULEJARPATHS.push(repo_root + "/scalding-repl/target/scala-#{SHORT_SCALA_VERSION}/scalding-repl-assembly-#{SCALDING_VERSION}.jar")
  # Here we don't need the overall assembly to work with the repl
  # the repl target itself should suffice (depends on scalding-core)
  if CONFIG["jar"].nil?
    repl_assembly_path = repo_root + "/scalding-repl/target/scala-#{SHORT_SCALA_VERSION}/scalding-repl-assembly-#{SCALDING_VERSION}.jar"
    if (!File.exist?(repl_assembly_path))
      puts("When trying to run the repl, the #{repl_assembly_path} is missing, you probably need to run ./sbt scalding-repl/assembly")
      exit(1)
    end
    CONFIG["jar"] = repl_assembly_path
  end
  #the repl uses its own main class instead of com.twitter.scalding.Tool
  if OPTS[:tool].nil?
    OPTS[:tool] = "com.twitter.scalding.ScaldingShell"
  end
end
if (!CONFIG["jar"])
  #what jar has all the dependencies for this job
  CONFIG["jar"] = repo_root + "/scalding-core/target/scala-#{SHORT_SCALA_VERSION}/scalding-core-assembly-#{SCALDING_VERSION}.jar"
end
#Check that we can find the jar:
if (!File.exist?(CONFIG["jar"]))
  puts("#{CONFIG["jar"]} is missing, you probably need to run ./sbt assembly")
  exit(1)
end
#Jar shipped to the cluster: --jar NAME resolves to dist/NAME-deploy.jar,
#otherwise the assembly chosen above (repl or scalding-core).
JARFILE =
  if OPTS[:jar]
    jarname = OPTS[:jar]
    #highly Twitter specific here:
    CONFIG["repo_root"] + "/dist/#{jarname}-deploy.jar"
  else
    CONFIG["jar"]
  end
#Everything after the recognized options: the job (file or classname) and its args.
JOBFILE= OPTS_PARSER.leftovers.first
JOB_ARGS= JOBFILE.nil? ? "" : OPTS_PARSER.leftovers[1..-1].join(" ")
JOB_ARGS << " --repl " if OPTS[:repl]
TOOL = OPTS[:tool] || 'com.twitter.scalding.Tool' #main class passed to java/hadoop
#Check that we have all the dependencies, and download any we don't.
#Each dependency is a "group/artifact/version" maven coordinate; jars are
#cached in TMPMAVENDIR, so this is a no-op once everything is present.
#Download errors are reported but not fatal (best-effort).
def maven_get(dependencies = DEPENDENCIES)
  #First, make sure the TMPMAVENDIR exists and create it if not.
  FileUtils.mkdir_p(TMPMAVENDIR)
  #Now make sure all the dependencies exist.
  dependencies.each do |dependency|
    jar_filename = dependency_to_jar(dependency)
    #Check if we already have the jar; skip it if so.
    #(File.exists? is deprecated and removed in Ruby 3.2.)
    next if File.exist?(maven_filename(jar_filename))
    #build the URL once (the old code computed it twice)
    url = dependency_to_url(dependency)
    $stderr.puts("downloading #{jar_filename} from #{url}...")
    File.open(maven_filename(jar_filename), "wb") do |f|
      begin
        #NOTE(review): Kernel#open on a URL relies on open-uri and was removed
        #in Ruby 3.0 — switch to URI.open when older rubies can be dropped.
        f.print open(url, 'User-Agent' => 'ruby').read
        $stderr.puts "Successfully downloaded #{jar_filename}!"
      rescue SocketError => e
        $stderr.puts "SocketError in downloading #{jar_filename}: #{e}"
      rescue SystemCallError => e
        $stderr.puts "SystemCallError in downloading #{jar_filename}: #{e}"
      end
    end
  end
end
#Converts an array of dependencies into jar filenames, or into a single
#colon-joined classpath string when as_classpath is true.
#Example:
#Input dependencies: ["org.apache.hadoop/hadoop-core/0.20.0", "com.twitter/scalding/0.2.0"]
#Output jar filenames: ["/tmp/mvn/hadoop-core-0.20.0.jar","/tmp/mvn/scalding-0.2.0.jar"]
def convert_dependencies_to_jars(dependencies = DEPENDENCIES, as_classpath = false)
  jars = dependencies.map { |dep| maven_filename(dependency_to_jar(dep)) }
  as_classpath ? jars.join(":") : jars
end
#Convert a maven dependency to a url for downloading from maven central.
#Example:
#Input dependency: org.apache.hadoop/hadoop-core/0.20.2
#Output url: http://repo1.maven.org/maven2/org/apache/hadoop/hadoop-core/0.20.2/hadoop-core-0.20.2.jar
def dependency_to_url(dependency)
  #Each dependency is in the form group/artifact/version.
  group, artifact, version = dependency.split("/")
  group_path = group.tr(".", "/")
  "http://repo1.maven.org/maven2/#{group_path}/#{artifact}/#{version}/#{dependency_to_jar(dependency)}"
end
#Convert a maven dependency ("group/artifact/version") to the name of
#its jar file: "artifact-version.jar".
#Example:
#Input dependency: org.apache.hadoop/hadoop-core/0.20.2
#Output: hadoop-core-0.20.2.jar
def dependency_to_jar(dependency)
  parts = dependency.split("/")
  parts[1] + "-" + parts[2] + ".jar"
end
#Builds the "-Dkey=value" options handed to hadoop. --reducers overrides
#the configured reducer count; a reducer count must be set somewhere.
#NOTE: this mutates CONFIG["hadoop_opts"] in place (as the original did).
def hadoop_opts
  opts = CONFIG["hadoop_opts"] || {}
  opts["mapred.reduce.tasks"] = OPTS[:reducers] if OPTS[:reducers]
  Trollop::die "number of reducers not set" unless opts.has_key?("mapred.reduce.tasks")
  opts.map { |key, value| "-D#{key}=#{value}" }.join(" ")
end
#Extension of JOBFILE ("scala" or "java"), or nil when the job argument
#is not a source file.
def file_type
  JOBFILE =~ /\.(scala|java)$/
  Regexp.last_match(1)
end
#True when the job argument names a .scala/.java source file.
def is_file?
  !!file_type
end
#Matches "<base>.scala"/"<base>.java" and captures the base name.
EXTENSION_RE = /(.*)\.(scala|java)$/
#Get the name of the job from the file.
#the rule is: last class in the file, or the one that matches the filename
def get_job_name(file)
  package = ""
  job = nil
  default = nil
  if file =~ EXTENSION_RE
    default = $1
    File.readlines(file).each { |s|
      if s =~ /^package ([^;]+)/
        #remember the package prefix; chop drops the trailing newline
        package = $1.chop + "."
      elsif s =~ /class\s+([^\s(]+).*extends\s+.*Job/
        unless job and default and (job.downcase == default.downcase)
          #use either the last class, or the one with the same name as the file
          job = $1
        end
      end
    }
    raise "Could not find job name" unless job
    "#{package}#{job}"
  elsif file =~ /(.*):(.*)/
    #"ns:Class" shorthand, resolved through CONFIG["namespaces"]
    begin
      CONFIG["namespaces"][$1] + "." + $2
    rescue
      $stderr.puts "Unknown namespace: #{$1}"
      exit(1)
    end
  else
    #otherwise assume the argument is already a fully-qualified classname
    file
  end
end
JARPATH=File.expand_path(JARFILE) #local absolute path of the assembly jar
JARBASE=File.basename(JARFILE) #its filename on the remote host
JOBPATH=JOBFILE.nil? ? nil : File.expand_path(JOBFILE) #absolute path of the job source, if any
JOB=JOBFILE.nil? ? nil : get_job_name(JOBFILE) #fully-qualified job class name
JOBJAR=JOB.nil? ? nil : JOB+".jar" #name of the compiled job jar
JOBJARPATH=JOBJAR.nil? ? nil : TMPDIR+"/"+JOBJAR #where the job jar is built locally
#Collects background threads (the rsyncs below) plus failure messages from
#them, so the main script can block on all of them once, at the end.
class ThreadList
  def initialize
    @threads = []
    @failures = []
    @mtx = Mutex.new
    @open = true #set to false once waitall has run; no new threads after that
  end

  #Start a new thread running the given block; raises if the list has
  #already been closed by waitall.
  def thread(*targs, &block)
    @mtx.synchronize {
      if @open
        @threads << Thread.new(*targs, &block)
      else
        raise "ThreadList is closed"
      end
    }
  end

  #Record a failure message from a worker thread (thread-safe).
  def failure(f)
    @mtx.synchronize {
      @failures << f
    }
  end

  #if the size > 0, execute a block then join, else do not yield.
  #returns thread count
  def waitall
    block = false
    size = @mtx.synchronize {
      if @open
        @open = false
        block = true #only the first caller performs the join
      end
      @threads.size
    }
    if block
      yield size
      @threads.each { |t| t.join }
      #at this point, all threads are finished.
      #print every recorded failure, then raise if there were any
      if @failures.each { |f| $stderr.puts f }.size > 0
        raise "There were failures"
      end
    end
    size
  end
end
#Global list of background rsync threads; joined near the end of the script.
THREADS = ThreadList.new
# Returns the size of the named file formatted with a human-readable
# suffix (bytes, K, M or G).
def human_filesize(filename)
  bytes = File.size(filename)
  if bytes < 1024
    format('%d bytes', bytes)
  elsif bytes < 1024 ** 2
    format('%.1fK', bytes.to_f / 1024)
  elsif bytes < 1024 ** 3
    format('%.1fM', bytes.to_f / 1024 ** 2)
  else
    format('%.1fG', bytes.to_f / 1024 ** 3)
  end
end
#Path of the touch-file recording the last successful rsync of filename
#to HOST (slashes are mangled into dots to keep it a flat name).
def rsync_stat_file(filename)
  mangled = filename.tr("/", ".")
  "#{RSYNC_STATFILE_PREFIX}#{mangled}.#{HOST}"
end
#In another thread, rsync the local file +from+ to +to+ on HOST. On
#success, touch the rsync_stat_file so unchanged files are skipped on
#the next run; on failure, record the error and remove the touch-file.
def rsync(from, to)
  rtouch = rsync_stat_file(from)
  #Ship only when we never succeeded, or the source is newer than the last
  #recorded success. (File.exists? is deprecated, removed in Ruby 3.2.)
  if !File.exist?(rtouch) || File.stat(rtouch).mtime < File.stat(from).mtime
    #fixed log message: it previously printed the remote name after "from"
    $stderr.puts("rsyncing #{human_filesize(from)} from #{from} to #{HOST} in background...")
    THREADS.thread(from, to) { |ff,tt|
      if system("rsync -e ssh -z #{ff} #{HOST}:#{tt}")
        #this indicates success and notes the time
        FileUtils.touch(rtouch)
      else
        #indicate failure
        THREADS.failure("Could not rsync: #{ff} to #{HOST}:#{tt}")
        FileUtils.rm_f(rtouch)
      end
    }
  end
end
#True for either of the two local execution modes.
def is_local?
  [:local, :hdfs_local].any? { |mode| OPTS[mode] }
end
#Does the compiled job jar need to be rebuilt?  True when it does not
#exist yet, or when either the job source (JOBPATH) or the main assembly
#jar (JARPATH) is newer than it.
def needs_rebuild?
  #File.exists? is deprecated and removed in Ruby 3.2 — use File.exist?.
  return true unless File.exist?(JOBJARPATH)
  #the jar exists, but is it fresh enough:
  jobjar_mtime = File.stat(JOBJARPATH).mtime
  jobjar_mtime < File.stat(JOBPATH).mtime || jobjar_mtime < File.stat(JARPATH).mtime
end
#Compile JOBFILE against the full classpath and package the resulting
#classes into JOBJARPATH via `jar cf`. Exits the script on compile failure.
def build_job_jar
  $stderr.puts("compiling " + JOBFILE)
  FileUtils.mkdir_p(BUILDDIR)
  classpath = (([LIBCP, JARPATH, MODULEJARPATHS, CLASSPATH].select { |s| s != "" }) + convert_dependencies_to_jars).flatten.join(":")
  #NOTE(review): this echo says "#{file_type}c" but the compile below actually
  #runs COMPILE_CMD (java ... scala.tools.nsc.Main) — display only; confirm intent.
  puts("#{file_type}c -classpath #{classpath} -d #{BUILDDIR} #{JOBFILE}")
  unless system("#{COMPILE_CMD} -classpath #{classpath} -d #{BUILDDIR} #{JOBFILE}")
    puts "[SUGGESTION]: Try scald.rb --clean, you may have corrupt jars lying around"
    #drop stale state so the next run starts clean
    FileUtils.rm_f(rsync_stat_file(JOBJARPATH))
    FileUtils.rm_rf(BUILDDIR)
    exit(1)
  end
  FileUtils.rm_f(JOBJARPATH)
  system("jar cf #{JOBJARPATH} -C #{BUILDDIR} .")
  FileUtils.rm_rf(BUILDDIR)
end
#Classpath used on the remote hadoop host: the lzo jar, the scalding
#assembly (JARBASE), any module jars and the compiled job jar.
def hadoop_classpath
  entries = ["/usr/share/java/hadoop-lzo-0.4.15.jar", JARBASE]
  entries.concat(MODULEJARPATHS.map { |path| File.basename(path) })
  entries << "job-jars/#{JOBJAR}"
  entries.reject { |entry| entry == "" }.join(":")
end
#Full remote command for running a compiled job file on the hadoop host:
#module jars and the job jar ride along via -libjars.
def hadoop_command
  libjars = (MODULEJARPATHS.map { |path| File.basename(path) } + ["job-jars/#{JOBJAR}"]).join(",")
  command = "HADOOP_CLASSPATH=#{hadoop_classpath} "
  command << "hadoop jar #{JARBASE} -libjars #{libjars} #{hadoop_opts} #{JOB} --hdfs "
  command << JOB_ARGS
end
#Remote command when the job is a classname already packaged inside the
#assembly jar (no separate job jar to ship).
def jar_mode_command
  "HADOOP_CLASSPATH=#{JARBASE} hadoop jar #{JARBASE} #{hadoop_opts} #{JOB} --hdfs #{JOB_ARGS}"
end
#Always sync the remote JARFILE
rsync(JARPATH, JARBASE) if !is_local?
#Sync any required scalding modules
if OPTS[:hdfs] && MODULEJARPATHS != []
  MODULEJARPATHS.each do|n|
    rsync(n, File.basename(n))
  end
  $stderr.puts("[INFO]: Modules support with --hdfs is experimental.")
end
#make sure we have the dependencies to compile and run locally (these are not in the above jar)
#this does nothing if we already have the deps.
maven_get
if is_file?
  #(re)compile the job jar when the source or the assembly changed
  build_job_jar if needs_rebuild?
  if !is_local?
    #Make sure the job-jars/ directory exists before rsyncing to it
    system("ssh #{HOST} '[ ! -d job-jars/ ] && mkdir job-jars/'")
    #rsync only acts if the file is out of date
    rsync(JOBJARPATH, "job-jars/" + JOBJAR)
  end
end
#Command line for running the job locally (--local or --hdfs-local).
#In --hdfs-local mode, hadoop-core (at the version pinned in Build.scala)
#and its transitive dependencies are resolved via sbt and added to the
#classpath; this is slow on the first run.
def local_cmd(mode)
  localHadoopDepPaths = if OPTS[:hdfs_local]
    hadoop_version = BUILDFILE.match(/val hadoopVersion\s*=\s*\"([^\"]+)\"/)[1]
    find_dependencies("org.apache.hadoop", "hadoop-core", hadoop_version).values
  else
    []
  end
  #assembly jar + modules + maven deps (+ the compiled job jar, if any) + user classpath
  classpath = ([JARPATH, MODULEJARPATHS].select { |s| s != "" } + convert_dependencies_to_jars + localHadoopDepPaths).flatten.join(":") + (is_file? ? ":#{JOBJARPATH}" : "") +
    ":" + CLASSPATH
  "java -Xmx#{LOCALMEM} -cp #{classpath} #{TOOL} #{JOB} #{mode} #{JOB_ARGS}"
end
#The single shell command this script ultimately executes, chosen by mode.
SHELL_COMMAND =
  if OPTS[:print_cp]
    #just echo the classpath a local run would use
    classpath = ([JARPATH, MODULEJARPATHS].select { |s| s != "" } + convert_dependencies_to_jars).flatten.join(":") + (is_file? ? ":#{JOBJARPATH}" : "") +
      ":" + CLASSPATH
    "echo #{classpath}"
  elsif OPTS[:hdfs]
    #run remotely over ssh; -t keeps a tty so ctrl-c reaches hadoop
    if is_file?
      "ssh -t -C #{HOST} #{hadoop_command}"
    else
      "ssh -t -C #{HOST} #{jar_mode_command}"
    end
  elsif OPTS[:hdfs_local]
    local_cmd("--hdfs")
  elsif OPTS[:local]
    local_cmd("--local")
  elsif OPTS[:print]
    #echo the remote command instead of running it
    if is_file?
      "echo #{hadoop_command}"
    else
      "echo #{jar_mode_command}"
    end
  else
    Trollop::die "no mode set"
  end
#Snapshot the current terminal settings (stty -g); empty string when
#stdin is not a tty.
def getStty()
  %x[stty -g 2> /dev/null].strip
end
#Restore terminal settings previously saved by getStty; short/empty
#values are ignored so we never run stty with a bogus argument.
def restoreStty(stty)
  return unless stty.length > 10
  `stty #{stty}`
end
savedStty="" #terminal settings to restore if the child command mangles them
#Now block on all the threads:
begin
  THREADS.waitall { |c| puts "Waiting for #{c} background thread#{c > 1 ? 's' : ''}..." if c > 0 }
  savedStty = getStty
  #If there are no errors:
  exitCode = system(SHELL_COMMAND) #true/false (or nil if the command could not run)
  restoreStty(savedStty)
  #Kernel#exit accepts a boolean; a nil exitCode raises and lands in the rescue
  exit(exitCode)
rescue
  restoreStty(savedStty)
  exit(1)
end