Skip to content


Subversion checkout URL

You can clone with
Download ZIP
Fetching contributors…
Cannot retrieve contributors at this time
397 lines (332 sloc) 13.4 KB
/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
#include "config.h"
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/time.h>
#include <netinet/in.h>
#include <event.h>
#include <netdb.h>
#include <stdbool.h>
#include <stdint.h>
#include <pthread.h>
#include "protocol_binary.h"
/* Maximum length of a key. */
#define KEY_MAX_LENGTH 250
#define DATA_BUFFER_SIZE 2048
#define UDP_READ_BUFFER_SIZE 65536
#define MAX_SENDBUF_SIZE (256 * 1024 * 1024)
/* I'm told the max legnth of a 64-bit num converted to string is 20 bytes.
* Plus a few for spaces, \r\n, \0 */
#define SUFFIX_SIZE 24
/** Initial size of list of items being returned by "get". */
/** Initial size of list of CAS suffixes appended to "gets" lines. */
/** Initial size of the sendmsg() scatter/gather array. */
#define IOV_LIST_INITIAL 400
/** Initial number of sendmsg() argument structures to allocate. */
/** High water marks for buffer shrinking */
#define IOV_LIST_HIGHWAT 600
#define MSG_LIST_HIGHWAT 100
/* Binary protocol stuff */
#define BIN_PKT_HDR_WORDS (MIN_BIN_PKT_LENGTH/sizeof(uint32_t))
/* unistd.h is here */
# include <unistd.h>
/* Slab sizing definitions. */
#define POWER_LARGEST 200
#define POWER_BLOCK 1048576
/** Time relative to server start. Smaller than time_t on 64-bit systems. */
typedef unsigned int rel_time_t;
/* Stats stored per slab (and per thread). */
struct slab_stats {
uint64_t set_cmds;
uint64_t get_hits;
uint64_t delete_hits;
uint64_t cas_hits;
uint64_t cas_badval;
uint64_t incr_hits;
uint64_t decr_hits;
struct thread_stats {
pthread_mutex_t mutex;
uint64_t get_cmds;
uint64_t get_misses;
uint64_t delete_misses;
uint64_t incr_misses;
uint64_t decr_misses;
uint64_t cas_misses;
uint64_t bytes_read;
uint64_t bytes_written;
struct slab_stats slab_stats[MAX_NUMBER_OF_SLAB_CLASSES];
struct stats {
pthread_mutex_t mutex;
unsigned int curr_items;
unsigned int total_items;
uint64_t curr_bytes;
unsigned int curr_conns;
unsigned int total_conns;
unsigned int conn_structs;
uint64_t evictions;
struct settings {
size_t maxbytes;
int maxconns;
int port;
int udpport;
char *inter;
int verbose;
rel_time_t oldest_live; /* ignore existing items older than this */
int evict_to_free;
char *socketpath; /* path to unix socket if using local socket */
int access; /* access mask (a la chmod) for unix domain socket */
double factor; /* chunk size growth factor */
int chunk_size;
int num_threads; /* number of libevent threads to run */
char prefix_delimiter; /* character that marks a key prefix (for stats) */
int detail_enabled; /* nonzero if we're collecting detailed stats */
int reqs_per_event; /* Maximum number of io to process on each
io-event. */
bool use_cas;
extern struct stats stats;
extern time_t process_started;
extern struct settings settings;
#define ITEM_LINKED 1
#define ITEM_CAS 2
/* temp */
#define ITEM_SLABBED 4
typedef struct _stritem {
struct _stritem *next;
struct _stritem *prev;
struct _stritem *h_next; /* hash chain next */
rel_time_t time; /* least recent access */
rel_time_t exptime; /* expire time */
int nbytes; /* size of data */
unsigned short refcount;
uint8_t nsuffix; /* length of flags-and-length string */
uint8_t it_flags; /* ITEM_* above */
uint8_t slabs_clsid;/* which slab class we're in */
uint8_t nkey; /* key length, w/terminating null and padding */
void * end[];
/* if it_flags & ITEM_CAS we have 8 bytes CAS */
/* then null-terminated key */
/* then " flags length\r\n" (no terminating null) */
/* then data with terminating \r\n (no terminating null; it's binary!) */
} item;
/* warning: don't use these macros with a function, as it evals its arg twice */
#define ITEM_get_cas(i) ((uint64_t)(((i)->it_flags & ITEM_CAS) ? \
*(uint64_t*)&((i)->end[0]) : 0x0))
#define ITEM_set_cas(i,v) { if ((i)->it_flags & ITEM_CAS) { \
*(uint64_t*)&((i)->end[0]) = v; } }
#define ITEM_key(item) (((char*)&((item)->end[0])) \
+ (((item)->it_flags & ITEM_CAS) ? sizeof(uint64_t) : 0))
#define ITEM_suffix(item) ((char*) &((item)->end[0]) + (item)->nkey + 1 \
+ (((item)->it_flags & ITEM_CAS) ? sizeof(uint64_t) : 0))
#define ITEM_data(item) ((char*) &((item)->end[0]) + (item)->nkey + 1 \
+ (item)->nsuffix \
+ (((item)->it_flags & ITEM_CAS) ? sizeof(uint64_t) : 0))
#define ITEM_ntotal(item) (sizeof(struct _stritem) + (item)->nkey + 1 \
+ (item)->nsuffix + (item)->nbytes \
+ (((item)->it_flags & ITEM_CAS) ? sizeof(uint64_t) : 0))
* NOTE: If you modify this table you _MUST_ update the function state_text
enum conn_states {
conn_listening, /** the socket which listens for connections */
conn_new_cmd, /** Prepare connection for next command */
conn_waiting, /** waiting for a readable socket */
conn_read, /** reading in a command line */
conn_parse_cmd, /** try to parse a command from the input buffer */
conn_write, /** writing out a simple response */
conn_nread, /** reading in a fixed number of bytes */
conn_swallow, /** swallowing unnecessary bytes w/o storing */
conn_closing, /** closing this connection */
conn_mwrite, /** writing out many items sequentially */
conn_max_state /** Max state value (used for assertion) */
enum bin_substates {
enum protocol {
ascii_prot = 3, /* arbitrary value. */
negotiating_prot /* Discovering the protocol */
#define IS_UDP(x) (x == ascii_udp_prot)
#define NREAD_ADD 1
#define NREAD_SET 2
#define NREAD_APPEND 4
#define NREAD_CAS 6
enum store_item_type {
typedef struct {
pthread_t thread_id; /* unique ID of this thread */
struct event_base *base; /* libevent handle this thread uses */
struct event notify_event; /* listen event for notify pipe */
int notify_receive_fd; /* receiving end of notify pipe */
int notify_send_fd; /* sending end of notify pipe */
struct thread_stats stats; /* Stats generated by this thread */
struct conn_queue *new_conn_queue; /* queue of new connections to handle */
typedef struct conn conn;
struct conn {
int sfd;
enum conn_states state;
enum bin_substates substate;
struct event event;
short ev_flags;
short which; /** which events were just triggered */
char *rbuf; /** buffer to read commands into */
char *rcurr; /** but if we parsed some already, this is where we stopped */
int rsize; /** total allocated size of rbuf */
int rbytes; /** how much data, starting from rcur, do we have unparsed */
char *wbuf;
char *wcurr;
int wsize;
int wbytes;
/** which state to go into after finishing current write */
enum conn_states write_and_go;
void *write_and_free; /** free this memory after finishing writing */
char *ritem; /** when we read in an item's value, it goes here */
int rlbytes;
/* data for the nread state */
* item is used to hold an item structure created after reading the command
* line of set/add/replace commands, but before we finished reading the actual
* data. The data is read into ITEM_data(item) to avoid extra copying.
void *item; /* for commands set/add/replace */
/* data for the swallow state */
int sbytes; /* how many bytes to swallow */
/* data for the mwrite state */
struct iovec *iov;
int iovsize; /* number of elements allocated in iov[] */
int iovused; /* number of elements used in iov[] */
struct msghdr *msglist;
int msgsize; /* number of elements allocated in msglist[] */
int msgused; /* number of elements used in msglist[] */
int msgcurr; /* element in msglist[] being transmitted now */
int msgbytes; /* number of bytes in current msg */
item **ilist; /* list of items to write out */
int isize;
item **icurr;
int ileft;
char **suffixlist;
int suffixsize;
char **suffixcurr;
int suffixleft;
enum protocol protocol; /* which protocol this connection speaks */
/* data for UDP clients */
int request_id; /* Incoming UDP request ID, if this is a UDP "connection" */
struct sockaddr request_addr; /* Who sent the most recent request */
socklen_t request_addr_size;
unsigned char *hdrbuf; /* udp packet headers */
int hdrsize; /* number of headers' worth of space is allocated */
bool noreply; /* True if the reply should not be sent. */
/* Binary protocol stuff */
/* This is where the binary header goes */
protocol_binary_request_header binary_header;
uint64_t cas; /* the cas to return */
short cmd; /* current command being processed */
int opaque;
int keylen;
conn *next; /* Used for generating a list of conn structures */
LIBEVENT_THREAD *thread; /* Pointer to the thread object serving this connection */
/* current time of day (updated periodically) */
extern volatile rel_time_t current_time;
* Functions
char *do_add_delta(conn *c, item *item, const bool incr, const int64_t delta,
char *buf);
enum store_item_type do_store_item(item *item, int comm, conn* c);
conn *conn_new(const int sfd, const enum conn_states init_state, const int event_flags, const int read_buffer_size, enum protocol prot, struct event_base *base);
uint32_t append_bin_stats(char *buf, const char *key, const uint16_t klen,
const char *val, const uint32_t vlen, void *cookie);
uint32_t append_ascii_stats(char *buf, const char *key, const uint16_t klen,
const char *val, const uint32_t vlen, void *cookie);
extern int daemonize(int nochdir, int noclose);
#include "stats.h"
#include "slabs.h"
#include "assoc.h"
#include "items.h"
#include "trace.h"
#include "hash.h"
* Functions such as the libevent-related calls that need to do cross-thread
* communication in multithreaded mode (rather than actually doing the work
* in the current thread) are called via "dispatch_" frontends, which are
* also #define-d to directly call the underlying code in singlethreaded mode.
void thread_init(int nthreads, struct event_base *main_base);
int dispatch_event_add(int thread, conn *c);
void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags, int read_buffer_size, enum protocol prot);
/* Lock wrappers for cache functions that are called from main loop. */
char *add_delta(conn *c, item *item, const int incr, const int64_t delta,
char *buf);
conn *conn_from_freelist(void);
bool conn_add_to_freelist(conn *c);
char *suffix_from_freelist(void);
bool suffix_add_to_freelist(char *s);
int is_listen_thread(void);
item *item_alloc(char *key, size_t nkey, int flags, rel_time_t exptime, int nbytes);
char *item_cachedump(const unsigned int slabs_clsid, const unsigned int limit, unsigned int *bytes);
void item_flush_expired(void);
item *item_get(const char *key, const size_t nkey);
int item_link(item *it);
void item_remove(item *it);
int item_replace(item *it, item *new_it);
char *item_stats(uint32_t (*add_stats)(char *buf, const char *key,
const uint16_t klen, const char *val,
const uint32_t vlen, void *cookie), void *c, int *bytes);
char *item_stats_sizes(uint32_t (*add_stats)(char *buf,
const char *key, const uint16_t klen, const char *val,
const uint32_t vlen, void *cookie), void *c, int *bytes);
void item_unlink(item *it);
void item_update(item *it);
void STATS_LOCK(void);
void STATS_UNLOCK(void);
void threadlocal_stats_reset(void);
void threadlocal_stats_aggregate(struct thread_stats *stats);
void slab_stats_aggregate(struct thread_stats *stats, struct slab_stats *out);
enum store_item_type store_item(item *item, int comm, conn *c);
extern void drop_privileges();
#define drop_privileges()
/* If supported, give compiler hints for branch prediction. */
#if !defined(__GNUC__) || (__GNUC__ == 2 && __GNUC_MINOR__ < 96)
#define __builtin_expect(x, expected_value) (x)
#define likely(x) __builtin_expect((x),1)
#define unlikely(x) __builtin_expect((x),0)
Jump to Line
Something went wrong with that request. Please try again.