Permalink
Browse files

Add long polling support

  • Loading branch information...
1 parent 6818c69 commit 7a87bee999fc82a8f175bf533e59ef216575d541 Jeff Garzik committed with jgarzik Mar 18, 2011
Showing with 196 additions and 21 deletions.
  1. +104 −18 cpu-miner.c
  2. +12 −2 miner.h
  3. +80 −1 util.c
View
@@ -31,12 +31,6 @@
#define DEF_RPC_URL "http://127.0.0.1:8332/"
#define DEF_RPC_USERPASS "rpcuser:rpcpass"
-struct thr_info {
- int id;
- pthread_t pth;
- struct thread_q *q;
-};
-
enum workio_commands {
WC_GET_WORK,
WC_SUBMIT_WORK,
@@ -78,18 +72,21 @@ static const char *algo_names[] = {
bool opt_debug = false;
bool opt_protocol = false;
+bool want_longpoll = true;
+bool have_longpoll = false;
static bool opt_quiet = false;
static int opt_retries = 10;
static int opt_fail_pause = 30;
-static int opt_scantime = 5;
+int opt_scantime = 5;
static json_t *opt_config;
static const bool opt_time = true;
static enum sha256_algos opt_algo = ALGO_C;
static int opt_n_threads = 1;
static char *rpc_url;
static char *userpass;
-static struct thr_info *thr_info;
+struct thr_info *thr_info;
static int work_thr_id;
+int longpoll_thr_id;
struct work_restart *work_restart = NULL;
@@ -130,6 +127,9 @@ static struct option_help options_help[] = {
{ "debug",
"(-D) Enable debug output (default: off)" },
+ { "no-longpoll",
+ "Disable X-Long-Polling support (default: enabled)" },
+
{ "protocol-dump",
"(-P) Verbose dump of protocol-level activities (default: off)" },
@@ -170,6 +170,7 @@ static struct option options[] = {
{ "scantime", 1, NULL, 's' },
{ "url", 1, NULL, 1001 },
{ "userpass", 1, NULL, 1002 },
+ { "no-longpoll", 0, NULL, 1003 },
{ }
};
@@ -262,7 +263,7 @@ static bool submit_upstream_work(CURL *curl, const struct work *work)
fprintf(stderr, "DBG: sending RPC call:\n%s", s);
/* issue JSON-RPC request */
- val = json_rpc_call(curl, rpc_url, userpass, s);
+ val = json_rpc_call(curl, rpc_url, userpass, s, false, false);
if (!val) {
fprintf(stderr, "submit_upstream_work json_rpc_call failed\n");
goto out;
@@ -285,14 +286,16 @@ static bool submit_upstream_work(CURL *curl, const struct work *work)
return rc;
}
+static const char *rpc_req =
+ "{\"method\": \"getwork\", \"params\": [], \"id\":0}\r\n";
+
static bool get_upstream_work(CURL *curl, struct work *work)
{
- static const char *rpc_req =
- "{\"method\": \"getwork\", \"params\": [], \"id\":0}\r\n";
json_t *val;
bool rc;
- val = json_rpc_call(curl, rpc_url, userpass, rpc_req);
+ val = json_rpc_call(curl, rpc_url, userpass, rpc_req,
+ want_longpoll, false);
if (!val)
return false;
@@ -593,14 +596,76 @@ static void *miner_thread(void *userdata)
return NULL;
}
-void restart_threads(void)
+static void restart_threads(void)
{
int i;
for (i = 0; i < opt_n_threads; i++)
work_restart[i].restart = 1;
}
+static void *longpoll_thread(void *userdata)
+{
+ struct thr_info *mythr = userdata;
+ CURL *curl = NULL;
+ char *copy_start, *hdr_path, *lp_url = NULL;
+ bool need_slash = false;
+ int failures = 0;
+
+ hdr_path = tq_pop(mythr->q, NULL);
+ if (!hdr_path)
+ goto out;
+ copy_start = (*hdr_path == '/') ? (hdr_path + 1) : hdr_path;
+ if (rpc_url[strlen(rpc_url) - 1] != '/')
+ need_slash = true;
+
+ lp_url = malloc(strlen(rpc_url) + strlen(copy_start) + 2);
+ if (!lp_url)
+ goto out;
+
+ sprintf(lp_url, "%s%s%s", rpc_url, need_slash ? "/" : "", copy_start);
+
+ fprintf(stderr, "Long-polling activated for %s\n", lp_url);
+
+ curl = curl_easy_init();
+ if (!curl) {
+ fprintf(stderr, "CURL initialization failed\n");
+ goto out;
+ }
+
+ while (1) {
+ json_t *val;
+
+ val = json_rpc_call(curl, lp_url, userpass, rpc_req,
+ false, true);
+ if (val) {
+ failures = 0;
+ json_decref(val);
+ fprintf(stderr, "LONGPOLL detected new block\n");
+ restart_threads();
+ } else {
+ if (failures++ < 10) {
+ sleep(30);
+ fprintf(stderr,
+ "longpoll failed, sleeping for 30s\n");
+ } else {
+ fprintf(stderr,
+ "longpoll failed, ending thread\n");
+ goto out;
+ }
+ }
+ }
+
+out:
+ free(hdr_path);
+ free(lp_url);
+ tq_freeze(mythr->q);
+ if (curl)
+ curl_easy_cleanup(curl);
+
+ return NULL;
+}
+
static void show_usage(void)
{
int i;
@@ -696,6 +761,9 @@ static void parse_arg (int key, char *arg)
free(userpass);
userpass = strdup(arg);
break;
+ case 1003:
+ want_longpoll = false;
+ break;
default:
show_usage();
}
@@ -763,17 +831,18 @@ int main (int argc, char *argv[])
if (setpriority(PRIO_PROCESS, 0, 19))
perror("setpriority");
- thr_info = calloc(opt_n_threads + 1, sizeof(*thr));
- if (!thr_info)
- return 1;
-
work_restart = calloc(opt_n_threads, sizeof(*work_restart));
if (!work_restart)
return 1;
+ thr_info = calloc(opt_n_threads + 2, sizeof(*thr));
+ if (!thr_info)
+ return 1;
+
+ /* init workio thread info */
work_thr_id = opt_n_threads;
thr = &thr_info[work_thr_id];
- thr->id = opt_n_threads;
+ thr->id = work_thr_id;
thr->q = tq_new();
if (!thr->q)
return 1;
@@ -784,6 +853,23 @@ int main (int argc, char *argv[])
return 1;
}
+ /* init longpoll thread info */
+ if (want_longpoll) {
+ longpoll_thr_id = opt_n_threads + 1;
+ thr = &thr_info[longpoll_thr_id];
+ thr->id = longpoll_thr_id;
+ thr->q = tq_new();
+ if (!thr->q)
+ return 1;
+
+ /* start longpoll thread */
+ if (pthread_create(&thr->pth, NULL, longpoll_thread, thr)) {
+ fprintf(stderr, "longpoll thread create failed\n");
+ return 1;
+ }
+ } else
+ longpoll_thr_id = -1;
+
/* start mining threads */
for (i = 0; i < opt_n_threads; i++) {
thr = &thr_info[i];
View
@@ -42,6 +42,12 @@
#define ARRAY_SIZE(arr) (sizeof(arr) / sizeof((arr)[0]))
#endif
+struct thr_info {
+ int id;
+ pthread_t pth;
+ struct thread_q *q;
+};
+
static inline uint32_t swab32(uint32_t v)
{
#ifdef WANT_BUILTIN_BSWAP
@@ -70,7 +76,7 @@ extern bool opt_debug;
extern bool opt_protocol;
extern const uint32_t sha256_init_state[];
extern json_t *json_rpc_call(CURL *curl, const char *url, const char *userpass,
- const char *rpc_req);
+ const char *rpc_req, bool, bool);
extern char *bin2hex(const unsigned char *p, size_t len);
extern bool hex2bin(unsigned char *p, const char *hexstr, size_t len);
@@ -110,15 +116,19 @@ timeval_subtract (struct timeval *result, struct timeval *x, struct timeval *y);
extern bool fulltest(const unsigned char *hash, const unsigned char *target);
+extern int opt_scantime;
+extern bool want_longpoll;
+extern bool have_longpoll;
struct thread_q;
struct work_restart {
volatile unsigned long restart;
char padding[128 - sizeof(unsigned long)];
};
+extern struct thr_info *thr_info;
+extern int longpoll_thr_id;
extern struct work_restart *work_restart;
-extern void restart_threads(void);
extern struct thread_q *tq_new(void);
extern void tq_free(struct thread_q *tq);
View
@@ -13,6 +13,7 @@
#include <stdio.h>
#include <stdlib.h>
+#include <ctype.h>
#include <string.h>
#include <jansson.h>
#include <curl/curl.h>
@@ -29,6 +30,10 @@ struct upload_buffer {
size_t len;
};
+struct header_info {
+ char *lp_path;
+};
+
struct tq_ent {
void *data;
struct list_head q_node;
@@ -95,8 +100,62 @@ static size_t upload_data_cb(void *ptr, size_t size, size_t nmemb,
return len;
}
+static size_t resp_hdr_cb(void *ptr, size_t size, size_t nmemb, void *user_data)
+{
+ struct header_info *hi = user_data;
+ size_t remlen, slen, ptrlen = size * nmemb;
+ char *rem, *val = NULL, *key = NULL;
+ void *tmp;
+
+ if (opt_protocol)
+ printf("In resp_hdr_cb\n");
+
+ val = calloc(1, ptrlen);
+ key = calloc(1, ptrlen);
+ if (!key || !val)
+ goto out;
+
+ tmp = memchr(ptr, ':', ptrlen);
+ if (!tmp || (tmp == ptr)) /* skip empty keys / blanks */
+ goto out;
+ slen = tmp - ptr;
+ if ((slen + 1) == ptrlen) /* skip key w/ no value */
+ goto out;
+ memcpy(key, ptr, slen); /* store & nul term key */
+ key[slen] = 0;
+
+ rem = ptr + slen + 1; /* trim value's leading whitespace */
+ remlen = ptrlen - slen - 1;
+ while ((remlen > 0) && (isspace(*rem))) {
+ remlen--;
+ rem++;
+ }
+
+ memcpy(val, rem, remlen); /* store value, trim trailing ws */
+ val[remlen] = 0;
+ while ((*val) && (isspace(val[strlen(val) - 1]))) {
+ val[strlen(val) - 1] = 0;
+ }
+ if (!*val) /* skip blank value */
+ goto out;
+
+ if (opt_protocol)
+ printf("HTTP hdr(%s): %s\n", key, val);
+
+ if (!strcasecmp("X-Long-Polling", key)) {
+ hi->lp_path = val; /* steal memory reference */
+ val = NULL;
+ }
+
+out:
+ free(key);
+ free(val);
+ return ptrlen;
+}
+
json_t *json_rpc_call(CURL *curl, const char *url,
- const char *userpass, const char *rpc_req)
+ const char *userpass, const char *rpc_req,
+ bool longpoll_scan, bool longpoll)
{
json_t *val, *err_val, *res_val;
int rc;
@@ -106,9 +165,15 @@ json_t *json_rpc_call(CURL *curl, const char *url,
struct curl_slist *headers = NULL;
char len_hdr[64];
char curl_err_str[CURL_ERROR_SIZE];
+ long timeout = longpoll ? (60 * 60) : (60 * 10);
+ struct header_info hi = { };
+ bool lp_scanning = false;
/* it is assumed that 'curl' is freshly [re]initialized at this pt */
+ if (longpoll_scan)
+ lp_scanning = want_longpoll && !have_longpoll;
+
if (opt_protocol)
curl_easy_setopt(curl, CURLOPT_VERBOSE, 1);
curl_easy_setopt(curl, CURLOPT_URL, url);
@@ -121,6 +186,11 @@ json_t *json_rpc_call(CURL *curl, const char *url,
curl_easy_setopt(curl, CURLOPT_READDATA, &upload_data);
curl_easy_setopt(curl, CURLOPT_ERRORBUFFER, curl_err_str);
curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 1);
+ curl_easy_setopt(curl, CURLOPT_TIMEOUT, timeout);
+ if (lp_scanning) {
+ curl_easy_setopt(curl, CURLOPT_HEADERFUNCTION, resp_hdr_cb);
+ curl_easy_setopt(curl, CURLOPT_HEADERDATA, &hi);
+ }
if (userpass) {
curl_easy_setopt(curl, CURLOPT_USERPWD, userpass);
curl_easy_setopt(curl, CURLOPT_HTTPAUTH, CURLAUTH_BASIC);
@@ -148,6 +218,15 @@ json_t *json_rpc_call(CURL *curl, const char *url,
goto err_out;
}
+ /* If X-Long-Polling was found, activate long polling */
+ if (hi.lp_path) {
+ have_longpoll = true;
+ opt_scantime = 60;
+ tq_push(thr_info[longpoll_thr_id].q, hi.lp_path);
+ } else
+ free(hi.lp_path);
+ hi.lp_path = NULL;
+
val = json_loads(all_data.buf, &err);
if (!val) {
fprintf(stderr, "JSON decode failed(%d): %s\n", err.line, err.text);

0 comments on commit 7a87bee

Please sign in to comment.