Skip to content

Commit

Permalink
Merge pull request #856 from munen/improve-audio-worker
Browse files Browse the repository at this point in the history
improve audio worker
  • Loading branch information
branch14 committed Apr 27, 2018
2 parents 98480f7 + 4732ec2 commit b3163c3
Show file tree
Hide file tree
Showing 6 changed files with 118 additions and 75 deletions.
4 changes: 4 additions & 0 deletions app/models/instance/audio_worker.rb
Expand Up @@ -16,4 +16,8 @@ def aws_secret_key
Settings.fog.storage.aws_secret_access_key
end

def slack_channel
Settings.slack.channel
end

end
2 changes: 2 additions & 0 deletions app/models/talk.rb
Expand Up @@ -96,6 +96,8 @@ class Talk < ActiveRecord::Base
transitions from: :prelive, to: :archived
# or which failed while processing and got suspended
transitions from: :suspended, to: :archived
# or it was overridden
transitions from: :archived, to: :archived
end
event :reset do
transitions from: :processing, to: :queued
Expand Down
4 changes: 2 additions & 2 deletions app/services/mediator/dj_callback.rb
Expand Up @@ -19,9 +19,9 @@ module DjCallback
#'DestroyTalk' => 'Test Talk %s has been destroyed.',
'EndTalk' => 'Talk %s has been ended by the system.',
#'GenerateFlyer' => 'Flyer for Talk %s has been generated.',
'ProcessOverride' => 'Override for Talk %s has been processed.',
'ProcessOverride' => 'Override for Talk %s has been disptached. Stand by for processing...',
'ProcessSlides' => 'Slides for Talk %s have been processed.',
'UserOverride' => 'Upload for Talk %s has been processed.'
'UserOverride' => 'Upload for Talk %s has been dispatched. Stand by for processing...'
}
}

Expand Down
15 changes: 15 additions & 0 deletions lib/audio_worker/README.md
Expand Up @@ -48,6 +48,21 @@ Done.

Pull an AMI, add reference to AMI to `settings.yml`, deploy, test.

## Debugging

AudioWorkers inherit the authorized keys from the application
server. So you should be able to login with you regular key.

The AudioWorkers IP is announced on Slack or can be retrieved from the
AWS Web Console.

Once logged in you will find the env.list file and the run script in
/tmp.

Unfortunatley the volume isn't set up proberly so no acces to the logs
yet.


## Cheat Sheet

```
Expand Down
154 changes: 83 additions & 71 deletions lib/audio_worker/runner.rb
Expand Up @@ -5,20 +5,44 @@
require 'tmpdir'
require 'fileutils'
require 'yaml'
require 'logger'

log_path = File.join(ENV['HOME'], 'job.log')
LOGGER = Logger.new(log_path)

SLACKLEVELS = [:error, :fatal, :info]

LEVELMAP = {
debug: Logger::Severity::DEBUG,
error: Logger::Severity::ERROR,
fatal: Logger::Severity::FATAL,
info: Logger::Severity::INFO,
warn: Logger::Severity::WARN
# unknown: Logger::Severity::UNKNOWN
}

def log(arg1, arg2)
level, msg = LEVELMAP.keys.include?(arg1) ? [arg1, arg2] : [:info, arg1]
LOGGER.log(LEVELMAP[level] || 0, msg)
msg = '[%s] %s' % [level, msg]
puts msg
slack(msg) if SLACKLEVELS.include?(level)
end

# make it autoflush
STDOUT.sync = true

INSTANCE_ENDPOINT = ENV['INSTANCE_ENDPOINT']
QUEUE_ENDPOINT = ENV['QUEUE_ENDPOINT']
INSTANCE = ENV['INSTANCE']
SLACK_CHANNEL = ENV['SLACK_CHANNEL'] || '#simon'

# terminate if there is nothing to do for 6 hours
MAX_WAIT_COUNT = 60 * 6

def faraday
@faraday ||= Faraday.new(url: QUEUE_ENDPOINT) do |f|
puts "Setup Faraday with endpoint #{QUEUE_ENDPOINT}"
log :debug, "Setup Faraday with endpoint #{QUEUE_ENDPOINT}"
uri = URI.parse(QUEUE_ENDPOINT)
f.basic_auth(uri.user, uri.password)
f.request :url_encoded
Expand All @@ -32,21 +56,21 @@ def die(response)
end

def job_list
puts "Retrieving job list..."
log :debug, "Retrieving job list..."
response = faraday.get
while response.status != 200 do
puts "Response Status #{response.status}, endpoint unavailable?"
puts "Waiting for 10 seconds then retry..."
log :error, "Response Status #{response.status}, endpoint unavailable?"
log :debug, "Waiting for 10 seconds then retry..."
sleep 10
puts "Retrying..."
log :debug, "Retrying..."
response = faraday.get
end
JSON.parse(response.body)
end

def terminate
faraday.put(instance_url, instance: { event: 'terminate' })
puts "Terminate!"
log :info, "`#{INSTANCE}` terminating."
exit 0
end

Expand All @@ -55,24 +79,24 @@ def queue_url(job)
end

def claim(job)
puts "Claiming job #{job['id']}..."
log :debug, "Claiming job #{job['id']}..."
response = faraday.put(queue_url(job), job: {event: 'start', locked_by: INSTANCE})
response.status == 200
end

def fidelity(path)
puts "Running fidelity..."
puts %x[./fidelity/bin/fidelity run #{path}/manifest.yml]
log :debug, "Running fidelity..."
log :debug, %x[./fidelity/bin/fidelity run #{path}/manifest.yml]
end

def wav2json(path, file)
puts "Running wav2json..."
puts %x[./wav2json.sh #{path}/#{file}]
log :debug, "Running wav2json..."
log :debug, %x[./wav2json.sh #{path}/#{file}]
end

# find the first mp3 in path, convert it to wav and return its name
def prepare_wave(path)
puts "Preparing wave file..."
log :debug, "Preparing wave file..."
wav = nil
Dir.chdir(path) do
mp3 = Dir.glob('*.mp3').first
Expand All @@ -84,26 +108,23 @@ def prepare_wave(path)
end

def complete(job)
puts "Marking job #{job['id']} as complete."
log :debug, "Marking job #{job['id']} as complete."
faraday.put(queue_url(job), job: {event: 'complete'})

File.open(File.join(ENV['HOME'], 'job.log'), 'a') do |f|
f.puts "Marked job #{job['id']} as completed."
end
log :info, "Marked job #{job['id']} as completed."
end

def s3_cp(source, target, region=nil)
cmd = "aws s3 cp #{source} #{target}"
cmd += " --region #{region}" unless region.nil?
puts cmd
puts %x[#{cmd}]
log :debug, cmd
log :debug, %x[#{cmd}]
end

def s3_sync(source, target, region=nil)
cmd = "aws s3 sync #{source} #{target}"
cmd += " --region #{region}" unless region.nil?
puts cmd
puts %x[#{cmd}]
log :debug, cmd
log :debug, %x[#{cmd}]
end

def probe_duration(path)
Expand Down Expand Up @@ -140,11 +161,7 @@ def whatever2ogg(path)
end

def run(job)
puts "Running job #{job['id']}..."

File.open(File.join(ENV['HOME'], 'job.log'), 'a') do |f|
f.puts "Claimed job #{job['id']}."
end
log :info, "Claimed job #{job['id']} on #{public_ip_address}. Processing..."

tmp_prefix = "job_#{job['id']}_"

Expand All @@ -162,10 +179,10 @@ def run(job)

type = job['type']

puts "Working directory: #{path}"
puts "Source bucket: #{source_bucket}"
puts "Target bucket: #{target_bucket}"
puts "Job Type: #{type}"
log :debug, "Working directory: #{path}"
log :debug, "Source bucket: #{source_bucket}"
log :debug, "Target bucket: #{target_bucket}"
log :debug, "Job Type: #{type}"

# pull manifest file
manifest_url = "#{target_bucket}/manifest.yml"
Expand All @@ -187,26 +204,27 @@ def run(job)
when "Job::ProcessUpload"

url = job['details']['upload_url']
puts "Upload URL: #{url}"
log :info, "Upload URL: `#{url}`"

filename = url.split('/').last
puts "Filename: #{filename}"
filename = url.split('/').last.split('?').first
log :info, "Filename: `#{filename}`"

if url.match(/^s3:\/\//)
puts "Copy from S3..."
log :debug, "Copy from S3..."
s3_cp(url, path, source_region)
else
cmd = "cd #{path}; wget --no-check-certificate -q '#{url}'"
puts cmd
cmd = "cd #{path}; wget -O '#{filename}' --no-check-certificate -q '#{url}'"
log :debug, cmd
%x[#{cmd}]
end

upload = File.join(path, filename)
puts "Source: #{upload}"
log :info, "Source: `#{upload}`"
log :info, "Data in "+%x[file #{upload}]

wav, ogg = whatever2ogg(upload)
puts "Wav file: #{wav}"
puts "Ogg File: #{ogg}"
log :debug, "Wav file: #{wav}"
log :debug, "Ogg File: #{ogg}"

File.unlink(upload)
File.rename(ogg, "#{path}/override.ogg")
Expand All @@ -219,12 +237,12 @@ def run(job)
name = manifest[:id]

expected = "#{path}/#{name}.wav"
puts "Expected wav: #{expected}"
log :debug, "Expected wav: #{expected}"
File.rename(wav, expected)

else

slack "Unknown job type: `#{type}`, job: `#{job.inspect}`"
log :fatal, "Unknown job type: `#{type}`, job: `#{job.inspect}`"
terminate

end
Expand All @@ -251,25 +269,25 @@ def run(job)

# write index file
index_yaml = File.join(path, 'index.yml')
puts "Writing #{index_yaml}"
log :debug, "Writing #{index_yaml}"
File.open(index_yaml, 'w') do |f|
f.write(YAML.dump(index))
end

# upload all files from path to target_bucket
puts "Syncing to #{target_bucket} in region #{target_region}..."
log :info, "Syncing to #{target_bucket} in region #{target_region}..."
s3_sync(path, target_bucket+'/', target_region)

# cleanup: delete everything
puts "Cleaning up..."
log :debug, "Cleaning up..."
FileUtils.rm_rf(path)

# mark job as completed
complete(job)
end

def wait
puts 'Sleeping for 1 min. Then poll queue again...'
log :debug, 'Sleeping for 1 min. Then poll queue again...'
sleep 60
end

Expand Down Expand Up @@ -297,10 +315,10 @@ def slack(message)
url = "https://voicerepublic.slack.com/services/hooks/incoming-webhook"+
"?token=VtybT1KujQ6EKstsIEjfZ4AX"
payload = {
channel: '#voicerepublic_tech',
username: 'audio_worker',
channel: SLACK_CHANNEL,
username: 'AudioWorker',
text: message,
icon_emoji: ':zombie:'
icon_emoji: ':cloud:'
}
json = JSON.unparse(payload)
cmd = "curl -X POST --data-urlencode 'payload=#{json}' '#{url}' 2>&1"
Expand All @@ -311,7 +329,7 @@ def slack(message)
wait_count = 0

# this is just a test
slack "`#{INSTANCE}` up and running..."
log :info, "`#{INSTANCE}` up and running on #{public_ip_address}..."

# with a region given this should always work
%x[aws configure set default.s3.signature_version s3v4]
Expand All @@ -322,12 +340,12 @@ def slack(message)
while true
jobs = job_list
if jobs.empty?
puts "Job list empty."
log :debug, "Job list empty."
if job_count > 0
terminate
end
if wait_count >= MAX_WAIT_COUNT
slack "`#{INSTANCE}` terminating after 6 hours idle time."
log :fatal, "`#{INSTANCE}` terminating after 6 hours idle time. But that's ok."
terminate
end
wait
Expand All @@ -339,30 +357,24 @@ def slack(message)
job_count += 1
wait_count = 0
else
puts "Failed to claim job #{job['id']}"
log :error, "Failed to claim job #{job['id']}. Maybe it has been snatched already. Retry in 5s."
sleep 5
end
end
end
rescue => e
report_failure
case e.message
when "no inputs?"
slack "Something went wrong: `#{e.message}`"
slack "`#{INSTANCE}` terminated."
exit 0
else
slack "Something went wrong: `#{e.message}`"
# NOTE ideally this would not terminate the worker to keep it
# running for inspection
#
# slack "`#{INSTANCE}` on `#{public_ip_address}`" +
# " NOT terminating. Action required!"
# exit 1
#
# But since VR is not developed actively anymore, this will result
# in a lot of orphaned servers running on EC2, so instead we will
# return exit code 0, to make the server shut down.
exit 0
end
log :fatal, "Something went wrong: `#{e.message}`"
terminate
exit 0
# NOTE ideally this would not terminate the worker to keep it
# running for inspection
#
# slack "`#{INSTANCE}` on `#{public_ip_address}`" +
# " NOT terminating. Action required!"
# exit 1
#
# But since VR is not developed actively anymore, this will result
# in a lot of orphaned servers running on EC2, so instead we will
# return exit code 0, to make the server shut down.
end

0 comments on commit b3163c3

Please sign in to comment.