Skip to content

Commit

Permalink
Merge ccc3368 into b256fb3
Browse files Browse the repository at this point in the history
  • Loading branch information
kozlovic committed Dec 22, 2018
2 parents b256fb3 + ccc3368 commit 8869f40
Show file tree
Hide file tree
Showing 62 changed files with 8,161 additions and 74 deletions.
2 changes: 1 addition & 1 deletion Dockerfile
@@ -1,4 +1,4 @@
FROM golang:1.11.1
FROM golang:1.11.4

MAINTAINER Ivan Kozlovic <ivan@synadia.com>

Expand Down
85 changes: 69 additions & 16 deletions README.md
Expand Up @@ -25,6 +25,7 @@ NATS Streaming provides the following high-level feature set.
* [Queue Group](#queue-group)
* [Redelivery](#redelivery)
* [Store Interface](#store-interface)
* [Store Encryption](#store-encryption)
* [Clustering](#clustering)
* [Supported Stores](#supported-stores)
* [Clustering Configuration](#clustering-configuration)
Expand Down Expand Up @@ -317,6 +318,52 @@ The memory and the provided file store implementations both use a generic store
When writing your own store implementation, you can do the same for APIs that don't need to do more than what the generic implementation provides.
You can check [MemStore](https://github.com/nats-io/nats-streaming-server/blob/master/stores/memstore.go) and [FileStore](https://github.com/nats-io/nats-streaming-server/blob/master/stores/filestore.go) implementations for more details.

## Store Encryption

The server can be configured to encrypt a message's payload when storing them, providing encryption at rest.
This can be done from the command line or from the configuration file. Check `encrypt` and `encryption_key`
in the [Configuring](#configuring) section.

It is recommended to provide the encryption key through the environment variable `NATS_STREAMING_ENCRYPTION_KEY`
instead of `encryption_key`. If encryption is enabled and `NATS_STREAMING_ENCRYPTION_KEY` is found, this
will take precedence over `encryption_key` value.

You can pass this from the command line this way:
```
$ env NATS_STREAMING_ENCRYPTION_KEY="mykey" nats-streaming-server -store file -dir datastore -encrypt
```

We currently support two ciphers for encryption: [AES](https://godoc.org/crypto/aes) and [CHACHA](https://godoc.org/golang.org/x/crypto/chacha20poly1305).
The default selected cipher depends on the platform. For ARM, we use `CHACHA`, otherwise
we default to `AES`. You can always override that decision by explicitly specifying the cipher like this:
```
$ env NATS_STREAMING_ENCRYPTION_KEY="mykey" nats-streaming-server -store file -dir datastore -encrypt -encryption_cipher "CHACHA"
```
or, to select `AES`:
```
$ env NATS_STREAMING_ENCRYPTION_KEY="mykey" nats-streaming-server -store file -dir datastore -encrypt -encryption_cipher "AES"
```

Note that only message payload is encrypted, all other data stored by NATS Streaming server is not.

When running in clustering mode (see below), the server uses RAFT, which uses its own log files.
Those will be encrypted too.

Starting a server with `encrypt` against a datastore that was not encrypted may result in failures
when it comes to decrypt a message, which may not happen immediately upon startup. Instead,
it will happen when attempting to deliver messages to consumers. However, when possible, the
server will detect if the data was not encrypted and return the data without attempting to decrypt it.
The server will also detect which cipher was used to encrypt the data and use the proper cipher
to decrypt, even if this is not the currently selected cipher.

If the data is encrypted with a key and the server is restarted with a different key, the
server will fail to decrypt messages when attempting to load them from the store.

Performance considerations: As expected, encryption is likely to decrease performance, but by how much is hard
to define. In some performance tests on a MacbookPro 2.8 GHz Intel Core i7 with SSD, we have
observed as little as 1% decrease to more than 30%. In addition to CPU cycles required for encryption,
the encrypted payload is bigger, which result in more data being stored or read.

## Clustering

NATS Streaming Server supports clustering and data replication, implemented
Expand Down Expand Up @@ -1320,22 +1367,25 @@ The NATS Streaming Server accepts command line arguments to control its behavior
Usage: nats-streaming-server [options]
Streaming Server Options:
-cid, --cluster_id <string> Cluster ID (default: test-cluster)
-st, --store <string> Store type: MEMORY|FILE|SQL (default: MEMORY)
--dir <string> For FILE store type, this is the root directory
-mc, --max_channels <int> Max number of channels (0 for unlimited)
-msu, --max_subs <int> Max number of subscriptions per channel (0 for unlimited)
-mm, --max_msgs <int> Max number of messages per channel (0 for unlimited)
-mb, --max_bytes <size> Max messages total size per channel (0 for unlimited)
-ma, --max_age <duration> Max duration a message can be stored ("0s" for unlimited)
-mi, --max_inactivity <duration> Max inactivity (no new message, no subscription) after which a channel can be garbage collected (0 for unlimited)
-ns, --nats_server <string> Connect to this external NATS Server URL (embedded otherwise)
-sc, --stan_config <string> Streaming server configuration file
-hbi, --hb_interval <duration> Interval at which server sends heartbeat to a client
-hbt, --hb_timeout <duration> How long server waits for a heartbeat response
-hbf, --hb_fail_count <int> Number of failed heartbeats before server closes the client connection
--ft_group <string> Name of the FT Group. A group can be 2 or more servers with a single active server and all sharing the same datastore.
-sl, --signal <signal>[=<pid>] Send signal to nats-streaming-server process (stop, quit, reopen)
-cid, --cluster_id <string> Cluster ID (default: test-cluster)
-st, --store <string> Store type: MEMORY|FILE|SQL (default: MEMORY)
--dir <string> For FILE store type, this is the root directory
-mc, --max_channels <int> Max number of channels (0 for unlimited)
-msu, --max_subs <int> Max number of subscriptions per channel (0 for unlimited)
-mm, --max_msgs <int> Max number of messages per channel (0 for unlimited)
-mb, --max_bytes <size> Max messages total size per channel (0 for unlimited)
-ma, --max_age <duration> Max duration a message can be stored ("0s" for unlimited)
-mi, --max_inactivity <duration> Max inactivity (no new message, no subscription) after which a channel can be garbage collected (0 for unlimited)
-ns, --nats_server <string> Connect to this external NATS Server URL (embedded otherwise)
-sc, --stan_config <string> Streaming server configuration file
-hbi, --hb_interval <duration> Interval at which server sends heartbeat to a client
-hbt, --hb_timeout <duration> How long server waits for a heartbeat response
-hbf, --hb_fail_count <int> Number of failed heartbeats before server closes the client connection
--ft_group <string> Name of the FT Group. A group can be 2 or more servers with a single active server and all sharing the same datastore
-sl, --signal <signal>[=<pid>] Send signal to nats-streaming-server process (stop, quit, reopen)
--encrypt <bool> Specify if server should use encryption at rest
--encryption_cipher <string> Cipher to use for encryption. Currently support AES and CHAHA (ChaChaPoly). Defaults to AES
--encryption_key <sting> Encryption Key. It is recommended to specify it through the NATS_STREAMING_ENCRYPTION_KEY environment variable instead
Streaming Server Clustering Options:
--clustered <bool> Run the server in a clustered configuration (default: false)
Expand Down Expand Up @@ -1494,6 +1544,9 @@ In general the configuration parameters are the same as the command line argumen
| ft_group | In Fault Tolerance mode, you can start a group of streaming servers with only one server being active while others are running in standby mode. This is the name of this FT group | String | `ft_group: "my_ft_group"` |
| partitioning | If set to true, a list of channels must be defined in store_limits/channels section. This section then serves two purposes, overriding limits for a given channel or adding it to the partition | `true` or `false` | `partitioning: true` |
| cluster | Cluster Configuration | Map: `cluster: { ... }` | **See details below** |
| encrypt | Specify if server should encrypt messages (only the payload) when storing them | `true` or `false` | `encrypt: true` |
| encryption_cipher | Cipher to use for encryption. Currently support AES and CHAHA (ChaChaPoly). Defaults to AES | `AES` or `CHACHA` | `encryption_cipher: "AES"` |
| encryption_key | Encryption key. It is recommended to specify the key through the `NATS_STREAMING_ENCRYPTION_KEY` environment variable instead | String | `encryption_key: "mykey"` |

TLS Configuration:

Expand Down
37 changes: 20 additions & 17 deletions nats-streaming-server.go
Expand Up @@ -32,23 +32,26 @@ var usageStr = `
Usage: nats-streaming-server [options]
Streaming Server Options:
-cid, --cluster_id <string> Cluster ID (default: test-cluster)
-st, --store <string> Store type: MEMORY|FILE|SQL (default: MEMORY)
--dir <string> For FILE store type, this is the root directory
-mc, --max_channels <int> Max number of channels (0 for unlimited)
-msu, --max_subs <int> Max number of subscriptions per channel (0 for unlimited)
-mm, --max_msgs <int> Max number of messages per channel (0 for unlimited)
-mb, --max_bytes <size> Max messages total size per channel (0 for unlimited)
-ma, --max_age <duration> Max duration a message can be stored ("0s" for unlimited)
-mi, --max_inactivity <duration> Max inactivity (no new message, no subscription) after which a channel can be garbage collected (0 for unlimited)
-ns, --nats_server <string> Connect to this external NATS Server URL (embedded otherwise)
-sc, --stan_config <string> Streaming server configuration file
-hbi, --hb_interval <duration> Interval at which server sends heartbeat to a client
-hbt, --hb_timeout <duration> How long server waits for a heartbeat response
-hbf, --hb_fail_count <int> Number of failed heartbeats before server closes the client connection
--ft_group <string> Name of the FT Group. A group can be 2 or more servers with a single active server and all sharing the same datastore.
-sl, --signal <signal>[=<pid>] Send signal to nats-streaming-server process (stop, quit, reopen)
-cid, --cluster_id <string> Cluster ID (default: test-cluster)
-st, --store <string> Store type: MEMORY|FILE|SQL (default: MEMORY)
--dir <string> For FILE store type, this is the root directory
-mc, --max_channels <int> Max number of channels (0 for unlimited)
-msu, --max_subs <int> Max number of subscriptions per channel (0 for unlimited)
-mm, --max_msgs <int> Max number of messages per channel (0 for unlimited)
-mb, --max_bytes <size> Max messages total size per channel (0 for unlimited)
-ma, --max_age <duration> Max duration a message can be stored ("0s" for unlimited)
-mi, --max_inactivity <duration> Max inactivity (no new message, no subscription) after which a channel can be garbage collected (0 for unlimited)
-ns, --nats_server <string> Connect to this external NATS Server URL (embedded otherwise)
-sc, --stan_config <string> Streaming server configuration file
-hbi, --hb_interval <duration> Interval at which server sends heartbeat to a client
-hbt, --hb_timeout <duration> How long server waits for a heartbeat response
-hbf, --hb_fail_count <int> Number of failed heartbeats before server closes the client connection
--ft_group <string> Name of the FT Group. A group can be 2 or more servers with a single active server and all sharing the same datastore
-sl, --signal <signal>[=<pid>] Send signal to nats-streaming-server process (stop, quit, reopen)
--encrypt <bool> Specify if server should use encryption at rest
--encryption_cipher <string> Cipher to use for encryption. Currently support AES and CHAHA (ChaChaPoly). Defaults to AES
--encryption_key <sting> Encryption Key. It is recommended to specify it through the NATS_STREAMING_ENCRYPTION_KEY environment variable instead
Streaming Server Clustering Options:
--clustered <bool> Run the server in a clustered configuration (default: false)
--cluster_node_id <string> ID of the node within the cluster if there is no stored ID (default: random UUID)
Expand Down
2 changes: 2 additions & 0 deletions scripts/cov.sh
Expand Up @@ -4,6 +4,7 @@
rm -rf ./cov
mkdir cov
go test -v -covermode=atomic -coverprofile=./cov/server.out -coverpkg=./server,./stores,./util ./server
go test -v -covermode=atomic -coverprofile=./cov/server2.out -coverpkg=./server,./stores,./util -run=TestPersistent ./server -encrypt
go test -v -covermode=atomic -coverprofile=./cov/logger.out ./logger
go test -v -covermode=atomic -coverprofile=./cov/stores1.out ./stores
go test -v -covermode=atomic -coverprofile=./cov/stores2.out -run=TestCS/FILE ./stores -fs_no_buffer
Expand All @@ -12,6 +13,7 @@ go test -v -covermode=atomic -coverprofile=./cov/stores4.out -run=TestCS/FILE ./
go test -v -covermode=atomic -coverprofile=./cov/stores5.out -run=TestFS ./stores -fs_no_buffer
go test -v -covermode=atomic -coverprofile=./cov/stores6.out -run=TestFS ./stores -fs_set_fds_limit
go test -v -covermode=atomic -coverprofile=./cov/stores7.out -run=TestFS ./stores -fs_no_buffer -fs_set_fds_limit
go test -v -covermode=atomic -coverprofile=./cov/stores8.out -run=TestCS ./stores -encrypt
go test -v -covermode=atomic -coverprofile=./cov/util.out ./util
gocovmerge ./cov/*.out > acc.out
rm -rf ./cov
Expand Down
3 changes: 2 additions & 1 deletion server/clustering.go
Expand Up @@ -273,7 +273,8 @@ func (s *StanServer) createRaftNode(name string) (bool, error) {
s.raft = &raftNode{}

raftLogFileName := filepath.Join(path, raftLogFile)
store, err := newRaftLog(s.log, raftLogFileName, s.opts.Clustering.Sync, int(s.opts.Clustering.TrailingLogs))
store, err := newRaftLog(s.log, raftLogFileName, s.opts.Clustering.Sync, int(s.opts.Clustering.TrailingLogs),
s.opts.Encrypt, s.opts.EncryptionCipher, s.opts.EncryptionKey)
if err != nil {
return false, err
}
Expand Down
72 changes: 72 additions & 0 deletions server/clustering_test.go
Expand Up @@ -3977,3 +3977,75 @@ func TestClusteringRaftDefaultTimeouts(t *testing.T) {
t.Fatalf("Expected commit timeout to be %v, got %v", defaultRaftCommitTimeout, commitTimeout)
}
}

func TestClusteringWithCryptoStore(t *testing.T) {
cleanupDatastore(t)
defer cleanupDatastore(t)
cleanupRaftLog(t)
defer cleanupRaftLog(t)

ns := natsdTest.RunDefaultServer()
defer ns.Shutdown()

// Configure first server
s1sOpts := getTestDefaultOptsForClustering("a", true)
s1sOpts.Encrypt = true
s1sOpts.EncryptionKey = []byte("key1")
s1 := runServerWithOpts(t, s1sOpts, nil)
defer s1.Shutdown()

// Configure second server.
s2sOpts := getTestDefaultOptsForClustering("b", false)
s2sOpts.Encrypt = true
s2sOpts.EncryptionKey = []byte("key2")
s2 := runServerWithOpts(t, s2sOpts, nil)
defer s2.Shutdown()

servers := []*StanServer{s1, s2}
// Wait for leader to be elected.
getLeader(t, 10*time.Second, servers...)

sc := NewDefaultConnection(t)
defer sc.Close()

payload := []byte("this is the content of the message")
sc.Publish("foo", payload)

ch := make(chan pb.MsgProto, 1)
sc.Subscribe("foo", func(m *stan.Msg) {
ch <- m.MsgProto
}, stan.DeliverAllAvailable())

select {
case m := <-ch:
assertMsg(t, m, payload, uint64(1))
case <-time.After(2 * time.Second):
t.Fatalf("Did not get our message")
}

// Now check that raft logs do not contain the payload in plain text
s1.raft.Lock()
fname1 := s1.raft.store.fileName
s1.raft.Unlock()

s2.raft.Lock()
fname2 := s2.raft.store.fileName
s2.raft.Unlock()

sc.Close()
s2.Shutdown()
s1.Shutdown()

check := func(t *testing.T, name, fname string) {
t.Helper()
content, err := ioutil.ReadFile(fname)
if err != nil {
t.Fatalf("Error reading file %q: %v", fname1, err)
}
if bytes.Contains(content, payload) {
t.Fatalf("Expected raft log of %q to not contain payload in plain text", name)
}
}
check(t, "s1", fname1)
check(t, "s2", fname2)
}
25 changes: 25 additions & 0 deletions server/conf.go
Expand Up @@ -154,6 +154,22 @@ func ProcessConfigFile(configFile string, opts *Options) error {
return err
}
opts.SyslogName = v.(string)
case "encrypt":
if err := checkType(k, reflect.Bool, v); err != nil {
return err
}
opts.Encrypt = v.(bool)
case "encryption_cipher":
if err := checkType(k, reflect.String, v); err != nil {
return err
}
opts.EncryptionCipher = v.(string)
case "encryption_key":
if err := checkType(k, reflect.String, v); err != nil {
return err
}
opts.Encrypt = true
opts.EncryptionKey = []byte(v.(string))
}
}
return nil
Expand Down Expand Up @@ -528,6 +544,7 @@ func ConfigureOptions(fs *flag.FlagSet, args []string, printVersion, printHelp,
stanConfigFile string
natsConfigFile string
clusterPeers string
encryptionKey string
)

fs.StringVar(&sopts.ID, "cluster_id", DefaultClusterID, "stan.ID")
Expand Down Expand Up @@ -600,6 +617,9 @@ func ConfigureOptions(fs *flag.FlagSet, args []string, printVersion, printHelp,
fs.BoolVar(&sopts.SQLStoreOpts.NoCaching, "sql_no_caching", defSQLOpts.NoCaching, "Enable/Disable caching")
fs.IntVar(&sopts.SQLStoreOpts.MaxOpenConns, "sql_max_open_conns", defSQLOpts.MaxOpenConns, "Max opened connections to the database")
fs.StringVar(&sopts.SyslogName, "syslog_name", "", "Syslog Name")
fs.BoolVar(&sopts.Encrypt, "encrypt", false, "Specify if server should use encryption at rest")
fs.StringVar(&sopts.EncryptionCipher, "encryption_cipher", stores.CryptoCipherAutoSelect, "Encryption cipher. Supported are AES and CHACHA (default is AES)")
fs.StringVar(&encryptionKey, "encryption_key", "", "Encryption Key. It is recommended to specify it through the NATS_STREAMING_ENCRYPTION_KEY environment variable instead")

// First, we need to call NATS's ConfigureOptions() with above flag set.
// It will be augmented with NATS specific flags and call fs.Parse(args) for us.
Expand All @@ -620,6 +640,11 @@ func ConfigureOptions(fs *flag.FlagSet, args []string, printVersion, printHelp,
}
}

if encryptionKey != "" {
sopts.Encrypt = true
sopts.EncryptionKey = []byte(encryptionKey)
}

// If both nats and streaming configuration files are used, then
// we only use the config file for the corresponding module.
// However, if only one command line parameter was specified,
Expand Down
12 changes: 12 additions & 0 deletions server/conf_test.go
Expand Up @@ -254,6 +254,15 @@ func TestParseConfig(t *testing.T) {
if opts.SQLStoreOpts.MaxOpenConns != 5 {
t.Fatalf("Expected SQL MaxOpenConns to be 5, got %v", opts.SQLStoreOpts.MaxOpenConns)
}
if !opts.Encrypt {
t.Fatal("Expected Encrypt to be true")
}
if string(opts.EncryptionCipher) != "AES" {
t.Fatalf("Expected EncryptionCipher to be %q, got %q", "AES", opts.EncryptionCipher)
}
if string(opts.EncryptionKey) != "key" {
t.Fatalf("Expected EncryptionKey to be %q, got %q", "key", opts.EncryptionKey)
}
}

func TestParsePermError(t *testing.T) {
Expand Down Expand Up @@ -456,6 +465,9 @@ func TestParseWrongTypes(t *testing.T) {
expectFailureFor(t, "sql:{source:false}", wrongTypeErr)
expectFailureFor(t, "sql:{no_caching:123}", wrongTypeErr)
expectFailureFor(t, "sql:{max_open_conns:false}", wrongTypeErr)
expectFailureFor(t, "encrypt: 123", wrongTypeErr)
expectFailureFor(t, "encryption_cipher: 123", wrongTypeErr)
expectFailureFor(t, "encryption_key: 123", wrongTypeErr)
}

func expectFailureFor(t *testing.T, content, errorMatch string) {
Expand Down

0 comments on commit 8869f40

Please sign in to comment.