Skip to content

Commit

Permalink
Merge pull request #195 from inverted-ai/cpp-retry
Browse files Browse the repository at this point in the history
Cpp retry
  • Loading branch information
Ruishenl committed Mar 21, 2024
2 parents 812467c + cc2e4db commit 43ded39
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 38 deletions.
1 change: 1 addition & 0 deletions invertedai/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import csv
import math
import logging
import time

from typing import Dict, Optional, List, Tuple
from tqdm.contrib import tmap
Expand Down
92 changes: 55 additions & 37 deletions invertedai_cpp/invertedai/session.cc
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,13 @@ void Session::shutdown() {
const std::string Session::request(
const std::string &mode,
const std::string &body_str,
const std::string &url_query_string) {
const std::string &url_query_string,
double max_retries,
const std::vector<int>& status_force_list,
int base_backoff,
int backoff_factor,
int max_backoff
) {
std::string target = subdomain + mode + url_query_string;

http::request<http::string_body> req{
Expand All @@ -114,45 +120,57 @@ const std::string Session::request(
req.body() = body_str;
req.prepare_payload();

if (local_mode){
http::write(this->tcp_stream_, req);

}
else {
http::write(this->ssl_stream_, req);
}

beast::flat_buffer buffer;
http::response<http::string_body> res;
beast::error_code ec;
if (local_mode){
http::read(this->tcp_stream_, buffer, res, ec);

}
else{
http::read(this->ssl_stream_, buffer, res, ec);
int retry_count = 0;
while (retry_count < max_retries || max_retries == std::numeric_limits<double>::infinity()) {
beast::error_code ec;
if (local_mode){
http::write(this->tcp_stream_, req);
}
else {
http::write(this->ssl_stream_, req);
}

}
if (!(res.result() == http::status::ok)) {
throw std::runtime_error(
"response status: " + std::to_string(res.result_int()) + "\nbody:\n" + res.body());
}
if (debug_mode) {
std::cout << "res body content:\n";
std::cout << res.body().data() << std::endl;
}
beast::flat_buffer buffer;
http::response<http::string_body> res;
if (local_mode){
http::read(this->tcp_stream_, buffer, res, ec);
}
else{
http::read(this->ssl_stream_, buffer, res, ec);
}
if (!(res.result() == http::status::ok)) {
if (std::find(status_force_list.begin(), status_force_list.end(), res.result_int()) != status_force_list.end() || ec) {
int delay_seconds = base_backoff * std::pow(backoff_factor, retry_count);
if (max_backoff > 0 && delay_seconds > max_backoff) {
delay_seconds = max_backoff;
}
std::this_thread::sleep_for(std::chrono::seconds(delay_seconds));
retry_count++;
std::cout << "Retrying" << mode << ":" << "Status" << res.result() << std::endl;
continue;
} else {
throw std::runtime_error(
"response status: " + std::to_string(res.result_int()) + "\nbody:\n" + res.body());
}
}
if (debug_mode) {
std::cout << "res body content:\n";
std::cout << res.body().data() << std::endl;
}

if (res["Content-Encoding"] == "gzip") {
boost::iostreams::array_source src{res.body().data(), res.body().size()};
boost::iostreams::filtering_istream is;
is.push(boost::iostreams::gzip_decompressor{}); // gzip
is.push(src);
std::stringstream strstream;
boost::iostreams::copy(is, strstream);
return strstream.str();
} else {
return res.body().data();
if (res["Content-Encoding"] == "gzip") {
boost::iostreams::array_source src{res.body().data(), res.body().size()};
boost::iostreams::filtering_istream is;
is.push(boost::iostreams::gzip_decompressor{}); // gzip
is.push(src);
std::stringstream strstream;
boost::iostreams::copy(is, strstream);
return strstream.str();
} else {
return res.body().data();
}
}
throw std::runtime_error("max retries exceeded");
}

} // namespace invertedai
17 changes: 16 additions & 1 deletion invertedai_cpp/invertedai/session.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
#include <string>
#include <vector>
#include <cmath>
#include <limits>
#include <chrono>
#include <thread>

#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/ssl/stream.hpp>
Expand All @@ -24,6 +28,12 @@ class Session {
const char *debug_mode = std::getenv("DEBUG");
const char *iai_dev = std::getenv("IAI_DEV");
const bool local_mode = iai_dev && (std::string(iai_dev) == "1" || std::string(iai_dev) == "True");
double max_retries = std::numeric_limits<double>::infinity(); // Allows for infinite retries by default
std::vector<int> status_force_list = {408, 429, 500, 502, 503, 504};
int base_backoff = 1; // Base backoff time in seconds
int backoff_factor = 2;
int current_backoff = base_backoff;
int max_backoff = 0; // No max backoff by default, 0 signifies no limit

public:
const char* host_ = local_mode ? "localhost" : "api.inverted.ai";
Expand Down Expand Up @@ -65,7 +75,12 @@ class Session {
const std::string request(
const std::string &mode,
const std::string &body_str,
const std::string &url_params
const std::string &url_params,
double max_retries = std::numeric_limits<double>::infinity(),
const std::vector<int>& status_force_list = {408, 429, 500, 502, 503, 504},
int base_backoff = 1,
int backoff_factor = 2,
int max_backoff = 0 // No max by default
);
};

Expand Down

0 comments on commit 43ded39

Please sign in to comment.