Skip to content

Commit

Permalink
refactor API runner
Browse files Browse the repository at this point in the history
  • Loading branch information
bmesuere committed Jun 9, 2015
1 parent a8643ee commit 60288ce
Showing 1 changed file with 82 additions and 116 deletions.
198 changes: 82 additions & 116 deletions lib/commands/unipept/api_runner.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,163 +7,141 @@ def initialize(args, opts, cmd)
@configuration = Unipept::Configuration.new
set_configuration

@user_agent = 'Unipept CLI - unipept ' + Unipept::VERSION

@url = "#{@host}/api/v1/#{cmd.name}.json"
@message_url = "#{@host}/api/v1/messages.json"
end

# Sets the configurable options of the command line app:
# - the host
# - the user agent
def set_configuration
@host = get_host
@user_agent = 'Unipept CLI - unipept ' + Unipept::VERSION
end

# Returns the host. If a value is defined by both an option and the config
# file, the value of the option is used.
def get_host
# find host in opts first
if options[:host]
host = options[:host]
else
host = @configuration['host']
end
host = options[:host] ? options[:host] : @configuration['host']

# No host has been set?
if host.nil? || host.empty?
puts 'WARNING: no host has been set, you can set the host with `unipept config host http://localhost:3000/`'
exit 1
end
unless host.start_with? 'http://'
host = "http://#{host}"
abort 'WARNING: no host has been set, you can set the host with `unipept config host http://api.unipept.ugent.be:3000/`'
end

@host = host
end

def input_iterator
# Argument over file input over stdin
if !arguments.empty?
arguments.each
# add http:// if needed
if host.start_with?('http://') || host.start_with?('https://')
host
else
if options[:input]
IO.foreach(options[:input])
else
STDIN.each_line
end
"http://#{host}"
end
end

# Returns an input iterator to use for the request.
# - if arguments are given, uses arguments
# - if the input file option is given, uses file input
# - if none of the previous are given, uses stdin
def get_input_iterator
return arguments.each unless arguments.empty?
return IO.foreach(options[:input]) if options[:input]
STDIN.each_line
end

# Returns the default batch_size of a command.
def batch_size
100
end

def url_options(sub_part)
filter = options[:select] ? options[:select] : []
if filter.empty?
names = true
else
names = filter.any? { |f| /.*name.*/.match f }
end
{ input: sub_part,
# Constructs a request body (a Hash) for set of input strings, using the
# options supplied by the user.
def get_request_body(input, selected_fields)
names = selected_fields.empty? || selected_fields.any? { |f| /.*name.*/.match f }
{ input: input,
equate_il: options[:equate],
extra: options[:all],
names: names
names: options[:all] && names
}
end

def get_server_message
# Checks if the server has a message and prints it if not empty.
# We will only check this once a day and won't print anything if the quiet
# option is set or if we output to a file.
def print_server_message
return if options[:quiet]
return unless STDOUT.tty?
last_fetched = @configuration['last_fetch_date']
return unless last_fetched.nil? || (last_fetched + 60 * 60 * 24) < Time.now
version = Unipept::VERSION
resp = Typhoeus.get(@message_url, params: { version: version })
puts resp.body unless resp.body.chomp.empty?
return if recently_fetched?
@configuration['last_fetch_date'] = Time.now
@configuration.save
resp = Typhoeus.get(@message_url, params: { version: Unipept::VERSION }).body.chomp
puts resp unless resp.empty?
end

def run
get_server_message
# Returns true if the last check for a server message was less than a day
# ago.
def recently_fetched?
last_fetched = @configuration['last_fetch_date']
!last_fetched.nil? && (last_fetched + 60 * 60 * 24) > Time.now
end

def run
print_server_message
hydra = Typhoeus::Hydra.new(max_concurrency: 10)
formatter = Unipept::Formatter.new_for_format(options[:format])
peptides = input_iterator

filter_list = options[:select] ? options[:select] : []
# Parse filter list: convert to regex and split on commas
filter_list = filter_list.map { |f| f.include?(',') ? f.split(',') : f }.flatten.map { |f| glob_to_regex(f) }

batch_order = Unipept::BatchOrder.new

printed_header = false
result = []

hydra = Typhoeus::Hydra.new(max_concurrency: 10)
num_req = 0
input = get_input_iterator
selected_fields = options[:select] ? options[:select] : []
selected_fields = selected_fields.map { |f| f.include?(',') ? f.split(',') : f }.flatten.map { |f| glob_to_regex(f) }

peptide_iterator(peptides) do |sub_division, i, fasta_input|
line_iterator(input) do |input_slice, batch_id, fasta_input|
request = Typhoeus::Request.new(
@url,
method: :post,
body: url_options(sub_division),
body: get_request_body(input_slice, selected_fields),
accept_encoding: 'gzip',
headers: { 'User-Agent' => @user_agent }
)
request.on_complete do |resp|
if resp.success?
# if JSON parsing goes wrong
sub_result = JSON[resp.response_body] rescue []
sub_result = [sub_result] unless sub_result.is_a? Array

sub_result.map! { |r| r.select! { |k, _v| filter_list.any? { |f| f.match k } } } unless filter_list.empty?

if options[:xml]
result << sub_result
end
result = JSON[resp.response_body] rescue []
result = [result] unless result.is_a? Array
result.map! { |r| r.select! { |k, _v| selected_fields.any? { |f| f.match k } } } unless selected_fields.empty?

# wait till it's our turn to write
batch_order.wait(i) do
unless sub_result.empty?
unless printed_header
write_to_output formatter.header(sub_result, fasta_input)
printed_header = true
end
write_to_output formatter.format(sub_result, fasta_input)
batch_order.wait(batch_id) do
unless result.empty?
write_to_output formatter.header(result, fasta_input) if batch_id == 0
write_to_output formatter.format(result, fasta_input)
end
end

elsif resp.timed_out?

batch_order.wait(i) do
batch_order.wait(batch_id) do
$stderr.puts 'request timed out, continuing anyway, but results might be incomplete'
save_error('request timed out, continuing anyway, but results might be incomplete')
end

elsif resp.code == 0

batch_order.wait(i) do
batch_order.wait(batch_id) do
$stderr.puts 'could not get an http response, continuing anyway, but results might be incomplete'
save_error(resp.return_message)
end

else

batch_order.wait(i) do
batch_order.wait(batch_id) do
$stderr.puts "received a non-successful http response #{resp.code}, continuing anyway, but results might be incomplete"
save_error("Got #{resp.code}: #{resp.response_body}\nRequest headers: #{resp.request.options}\nRequest body:\n#{resp.request.encoded_body}\n\n")
end

end
end

hydra.queue request

num_req += 1
if num_req % 200 == 0
if batch_id % 200 == 0
hydra.run
end
end

hydra.run

begin
download_xml(result)
rescue
STDERR.puts 'Something went wrong while downloading xml information! please check the output'
end
end

def save_error(message)
Expand All @@ -185,46 +163,34 @@ def write_to_output(string)
end
end

def download_xml(result)
return unless options[:xml]
File.open(options[:xml] + '.xml', 'wb') do |f|
f.write Typhoeus.get("http://eutils.ncbi.nlm.nih.gov/entrez/eutils/efetch.fcgi?db=taxonomy&id=#{result.first.map { |h| h['taxon_id'] }.join(',')}&retmode=xml").response_body
end
end

def peptide_iterator(peptides, &block)
first = peptides.next rescue return
if first.start_with? '>'
# FASTA MODE ENGAGED
fasta_header = first.chomp
peptides.each_slice(batch_size).with_index do |sub, i|
fasta_input = []
# Use a set so we don't ask data twice
newsub = Set.new

# Iterate to find fasta headers
sub.each do |s|
s.chomp!
if s.start_with? '>'
# Save the FASTA header when found
fasta_header = s
def line_iterator(lines, &block)
first_line = lines.next rescue return
if first_line.start_with? '>'
current_fasta_header = first_line.chomp
lines.each_slice(batch_size).with_index do |slice, i|
fasta_mapper = []
input_set = Set.new

slice.each do |line|
line.chomp!
if line.start_with? '>'
current_fasta_header = line
else
# Add the input pair to our input list
fasta_input << [fasta_header, s]
newsub << s
fasta_mapper << [current_fasta_header, line]
input_set << line
end
end

block.call(newsub.to_a, i, fasta_input)
block.call(input_set.to_a, i, fasta_mapper)
end
else
# shame we have to be this explicit, but it appears to be the only way
Enumerator.new do |y|
y << first
loop do
y << peptides.next
y << lines.next
end
end.each_slice(batch_size).with_index(&block)

end
end

Expand Down

1 comment on commit 60288ce

@bmesuere
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When the first batch fails, the header won't be written out this way.

Original comment by @silox on Tue Jun 09 2015 at 18:07.

Please sign in to comment.