Skip to content

Commit

Permalink
feat: support connection type (#15)
Browse files Browse the repository at this point in the history
* feat: support user type as identifier

* refactor: extract websocket connection logic into a package

* feat: add conn_type to metrics

* test: add connection type test

* refactor: change conn type to conn group

* refactor: optimize connection table

* refactor: rename ticker var

* refactor: use proper header naming convention

* refactor: remove identifier factory

* docs: add more details to conn group header

* refactor: use default value as default group name instead of empty string

* fix: typo in identifier and reword a comment

* feat: add conn_group tag to server_processing_latency_millisecond
  • Loading branch information
NNcrawler committed Oct 26, 2021
1 parent b381101 commit a562cd5
Show file tree
Hide file tree
Showing 30 changed files with 760 additions and 353 deletions.
2 changes: 1 addition & 1 deletion .env.sample
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ SERVER_WEBSOCKET_CHECK_ORIGIN="true"
SERVER_WEBSOCKET_MAX_CONN="30000"
SERVER_WEBSOCKET_READ_BUFFER_SIZE="10240"
SERVER_WEBSOCKET_WRITE_BUFFER_SIZE="10240"
SERVER_WEBSOCKET_CONN_UNIQ_ID_HEADER="x-user-id"
SERVER_WEBSOCKET_CONN_ID_HEADER="X-User-ID"
SERVER_WEBSOCKET_PING_INTERVAL_MS=30000
SERVER_WEBSOCKET_PONG_WAIT_INTERVAL_MS=60000
SERVER_WEBSOCKET_WRITE_WAIT_INTERVAL_MS=5000
Expand Down
9 changes: 5 additions & 4 deletions .env.test
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@ SERVER_WEBSOCKET_CHECK_ORIGIN="true"
SERVER_WEBSOCKET_MAX_CONN="30000"
SERVER_WEBSOCKET_READ_BUFFER_SIZE="10240"
SERVER_WEBSOCKET_WRITE_BUFFER_SIZE="10240"
SERVER_WEBSOCKET_CONN_UNIQ_ID_HEADER="x-user-id"
SERVER_WEBSOCKET_PING_INTERVAL_MS=30000
SERVER_WEBSOCKET_PONG_WAIT_INTERVAL_MS=60000
SERVER_WEBSOCKET_WRITE_WAIT_INTERVAL_MS=5000
SERVER_WEBSOCKET_CONN_ID_HEADER="X-User-ID"
SERVER_WEBSOCKET_CONN_GROUP_HEADER="X-User-Group"
SERVER_WEBSOCKET_PING_INTERVAL_MS=10000
SERVER_WEBSOCKET_PONG_WAIT_INTERVAL_MS=10000
SERVER_WEBSOCKET_WRITE_WAIT_INTERVAL_MS=1000
SERVER_WEBSOCKET_PINGER_SIZE=1

WORKER_BUFFER_CHANNEL_SIZE=5
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ $ docker pull odpf/raccoon
# Run the following docker command with minimal config.
$ docker run -p 8080:8080 \
-e SERVER_WEBSOCKET_PORT=8080 \
-e SERVER_WEBSOCKET_CONN_UNIQ_ID_HEADER=x-user-id \
-e SERVER_WEBSOCKET_CONN_ID_HEADER=X-User-ID \
-e PUBLISHER_KAFKA_CLIENT_BOOTSTRAP_SERVERS=host.docker.internal:9093 \
-e EVENT_DISTRIBUTION_PUBLISHER_PATTERN=clickstream-%s-log \
odpf/raccoon
Expand Down
2 changes: 1 addition & 1 deletion config/load_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func TestServerConfig(t *testing.T) {
os.Setenv("SERVER_WEBSOCKET_PING_INTERVAL_MS", "1")
os.Setenv("SERVER_WEBSOCKET_PONG_WAIT_INTERVAL_MS", "1")
os.Setenv("SERVER_WEBSOCKET_SERVER_SHUTDOWN_GRACE_PERIOD_MS", "3")
os.Setenv("SERVER_WEBSOCKET_CONN_UNIQ_ID_HEADER", "x-user-id")
os.Setenv("SERVER_WEBSOCKET_CONN_ID_HEADER", "X-User-ID")
serverWsConfigLoader()
assert.Equal(t, "8080", ServerWs.AppPort)
assert.Equal(t, time.Duration(1)*time.Millisecond, ServerWs.PingInterval)
Expand Down
46 changes: 26 additions & 20 deletions config/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,18 @@ import (
var ServerWs serverWs

type serverWs struct {
AppPort string
ServerMaxConn int
ReadBufferSize int
WriteBufferSize int
CheckOrigin bool
PingInterval time.Duration
PongWaitInterval time.Duration
WriteWaitInterval time.Duration
PingerSize int
UniqConnIDHeader string
AppPort string
ServerMaxConn int
ReadBufferSize int
WriteBufferSize int
CheckOrigin bool
PingInterval time.Duration
PongWaitInterval time.Duration
WriteWaitInterval time.Duration
PingerSize int
ConnIDHeader string
ConnGroupHeader string
ConnGroupDefault string
}

func serverWsConfigLoader() {
Expand All @@ -32,17 +34,21 @@ func serverWsConfigLoader() {
viper.SetDefault("SERVER_WEBSOCKET_PONG_WAIT_INTERVAL_MS", "60000") //should be more than the ping period
viper.SetDefault("SERVER_WEBSOCKET_WRITE_WAIT_INTERVAL_MS", "5000")
viper.SetDefault("SERVER_WEBSOCKET_PINGER_SIZE", 1)
viper.SetDefault("SERVER_WEBSOCKET_CONN_GROUP_HEADER", "")
viper.SetDefault("SERVER_WEBSOCKET_CONN_GROUP_DEFAULT", "--default--")

ServerWs = serverWs{
AppPort: util.MustGetString("SERVER_WEBSOCKET_PORT"),
ServerMaxConn: util.MustGetInt("SERVER_WEBSOCKET_MAX_CONN"),
ReadBufferSize: util.MustGetInt("SERVER_WEBSOCKET_READ_BUFFER_SIZE"),
WriteBufferSize: util.MustGetInt("SERVER_WEBSOCKET_WRITE_BUFFER_SIZE"),
CheckOrigin: util.MustGetBool("SERVER_WEBSOCKET_CHECK_ORIGIN"),
PingInterval: util.MustGetDuration("SERVER_WEBSOCKET_PING_INTERVAL_MS", time.Millisecond),
PongWaitInterval: util.MustGetDuration("SERVER_WEBSOCKET_PONG_WAIT_INTERVAL_MS", time.Millisecond),
WriteWaitInterval: util.MustGetDuration("SERVER_WEBSOCKET_WRITE_WAIT_INTERVAL_MS", time.Microsecond),
PingerSize: util.MustGetInt("SERVER_WEBSOCKET_PINGER_SIZE"),
UniqConnIDHeader: util.MustGetString("SERVER_WEBSOCKET_CONN_UNIQ_ID_HEADER"),
AppPort: util.MustGetString("SERVER_WEBSOCKET_PORT"),
ServerMaxConn: util.MustGetInt("SERVER_WEBSOCKET_MAX_CONN"),
ReadBufferSize: util.MustGetInt("SERVER_WEBSOCKET_READ_BUFFER_SIZE"),
WriteBufferSize: util.MustGetInt("SERVER_WEBSOCKET_WRITE_BUFFER_SIZE"),
CheckOrigin: util.MustGetBool("SERVER_WEBSOCKET_CHECK_ORIGIN"),
PingInterval: util.MustGetDuration("SERVER_WEBSOCKET_PING_INTERVAL_MS", time.Millisecond),
PongWaitInterval: util.MustGetDuration("SERVER_WEBSOCKET_PONG_WAIT_INTERVAL_MS", time.Millisecond),
WriteWaitInterval: util.MustGetDuration("SERVER_WEBSOCKET_WRITE_WAIT_INTERVAL_MS", time.Millisecond),
PingerSize: util.MustGetInt("SERVER_WEBSOCKET_PINGER_SIZE"),
ConnIDHeader: util.MustGetString("SERVER_WEBSOCKET_CONN_ID_HEADER"),
ConnGroupHeader: util.MustGetString("SERVER_WEBSOCKET_CONN_GROUP_HEADER"),
ConnGroupDefault: util.MustGetString("SERVER_WEBSOCKET_CONN_GROUP_DEFAULT"),
}
}
20 changes: 10 additions & 10 deletions docs/concepts/architecture.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,19 @@ Note: The internals of each of the components like channel size, buffer sizes, p

### Connections

Raccoon has long running persistent connections with the client. Once a client makes a http request with a websocket upgrade header, raccoon upgrades the http request to a websocket connection end of which a persistent connection is established with the client.
Raccoon has long-running persistent connections with the client. Once a client makes an HTTP request with a WebSocket upgrade header, Raccoon upgrades the HTTP request to a WebSocket connection, at the end of which a persistent connection is established with the client.

The following sequence outlines the connection handling by Raccoon.
The following sequence outlines the connection handling by Raccoon:

* Fetch connection id details from the initial request header based on the configured header name in `SERVER_WEBSOCKET_CONN_UNIQ_ID_HEADER`. The header name uniquely identifies a client. A client in this case can be the user in the app. There can be multiple connections from the same client. The no., of connections allowed per client is determined by `SERVER_WEBSOCKET_MAX_CONN`.
* Once the connection id is fetched, verify if the user has connection limit reached based on the configured `SERVER_WEBSOCKET_MAX_CONN`. For each client an internal map stores the `SERVER_WEBSOCKET_MAX_CONN` along with the connection objects. On reaching the max connections for the client, the connection is disconnected with an appropriate error message as a response proto.
* Upgrade the connection
* Add this user-id -> connection mapping
* Add ping/pong handlers on this connection, readtimeout deadline. More about these handlers in the following sections
* Handle the message and send it to the events-channel
* Remove connection/user when the client closes the connection
* Construct connection identifier from the request header. The identifier is constructed from the value of `SERVER_WEBSOCKET_CONN_ID_HEADER` header. For example, Raccoon is configured with `SERVER_WEBSOCKET_CONN_ID_HEADER=X-User-ID`. Raccoon will check the value of X-User-ID header and make it an identifier. Raccoon then uses this identifier to check if there is already an existing connection with the same identifier. If the same connection already exists, Raccoon will disconnect the connection with an appropriate error message as a response proto.
* Optionally, you can also configure `SERVER_WEBSOCKET_CONN_GROUP_HEADER` to support multi-tenancy. For example, you want to use an instance of Raccoon with multiple mobile clients. You can configure raccoon with `SERVER_WEBSOCKET_CONN_GROUP_HEADER=X-Mobile-Client`. Then, Raccoon will use the value of X-Mobile-Client along with X-User-ID as identifier. The uniqueness becomes the combination of X-User-ID value with X-Mobile-Client value. This way, Raccoon can maintain the same X-User-ID within different X-Mobile-Client.
* Verify if the total connections have reached the configured limit based on `SERVER_WEBSOCKET_MAX_CONN` configuration. On reaching the max connections, Raccoon disconnects the connection with an appropriate error message as a response proto.
* Upgrade the connection and persist the identifier.
* Add ping/pong handlers and a read timeout deadline on this connection. More about these handlers in the following sections.
* At this point, the connection is completely upgraded and Raccoon is ready to accept EventRequest. The handler handles each EventRequest by sending it to the events-channel to be asynchronously published by the publisher.
* When the connection is closed, Raccoon cleans up the connection along with the identifier. The same identifier can then be reused by an upcoming connection.

### Event Delivery gurantee \(at-least-once for most time\)
### Event Delivery Guarantee \(at-least-once most of the time\)

The server provides an at-least-once event delivery guarantee most of the time.

Expand Down
2 changes: 1 addition & 1 deletion docs/example/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
var (
url = "ws://localhost:8080/api/v1/events"
header = http.Header{
"x-user-id": []string{"1234"},
"X-User-ID": []string{"1234"},
}
pingInterval = 5 * time.Second
)
Expand Down
2 changes: 1 addition & 1 deletion docs/example/readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ You are free to use any websocket client as long as it supports passing header.
var (
url = "ws://localhost:8080/api/v1/events"
header = http.Header{
"x-user-id": []string{"1234"},
"X-User-ID": []string{"1234"},
}
)

Expand Down
4 changes: 2 additions & 2 deletions docs/guides/deployment.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ metadata:
data:
METRIC_STATSD_ADDRESS: "host.docker.internal:8125"
PUBLISHER_KAFKA_CLIENT_BOOTSTRAP_SERVERS: "host.docker.internal:9093"
SERVER_WEBSOCKET_CONN_UNIQ_ID_HEADER: "x-user-id"
SERVER_WEBSOCKET_CONN_ID_HEADER: "X-User-ID"
SERVER_WEBSOCKET_PORT: "8080"
```

Expand Down Expand Up @@ -193,7 +193,7 @@ Followings are main configurations closely related to deployment that you need t
* [`EVENT_DISTRIBUTION_PUBLISHER_PATTERN`](https://odpf.gitbook.io/raccoon/reference/configurations#event_distribution_publisher_pattern)
* [`PUBLISHER_KAFKA_CLIENT_BOOTSTRAP_SERVERS`](https://odpf.gitbook.io/raccoon/reference/configurations#publisher_kafka_client_bootstrap_servers)
* [`METRIC_STATSD_ADDRESS`](https://odpf.gitbook.io/raccoon/reference/configurations#metric_statsd_address)
* [`SERVER_WEBSOCKET_CONN_UNIQ_ID_HEADER`](https://odpf.gitbook.io/raccoon/reference/configurations#server_websocket_conn_uniq_id_header)
* [`SERVER_WEBSOCKET_CONN_ID_HEADER`](https://odpf.gitbook.io/raccoon/reference/configurations#server_websocket_conn_id_header)

**TLS/HTTPS**

Expand Down
4 changes: 2 additions & 2 deletions docs/guides/publishing.md
Original file line number Diff line number Diff line change
Expand Up @@ -93,15 +93,15 @@ The above response model is self-explanatory. Clients can choose to retry for er

## Headers

Raccoon service accepts headers to identify a user connection uniquely. The header name is made configurable as it enables clients to specify a header name that works for them. For, e.g. for a mobile app having a request header as `X-User-id` which identifies the user \(client\) connecting to Raccoon, can configure Raccoon service with the config set as below `SERVER_WEBSOCKET_CONN_UNIQ_ID_HEADER=X-User-id`
Raccoon service accepts headers to identify a user connection uniquely. The header name is configurable so that clients can specify a header name that works for them. For example, a mobile app whose request header `X-User-ID` identifies the user \(client\) connecting to Raccoon can configure the Raccoon service with `SERVER_WEBSOCKET_CONN_ID_HEADER=X-User-ID`. Optionally, `SERVER_WEBSOCKET_CONN_GROUP_HEADER` can also be configured to [support multitenancy](https://odpf.gitbook.io/raccoon/concepts/architecture#connections) such as multiple apps connecting to a single Raccoon instance.

Raccoon uses the config to fetch the header name and uses the value passed in the request header with this name, as the connection id. This header name uniquely identifies a client. A client, in this case, can be the user in the app.

The following header is a sample providing a user id: 654785432. Once the client initiates a WebSocket upgrade request over Raccoon, assuming the request is upgraded and the client connection is established, Raccoon accepts the header and extracts the user id to build a connection map. This map helps deduplicate connections for a user within the same Raccoon instance.

```text
{
"X-User-id": "654785432"
"X-User-ID": "654785432"
}
```

Expand Down
2 changes: 1 addition & 1 deletion docs/quickstart.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ Run the following command. Make sure to set `PUBLISHER_KAFKA_CLIENT_BOOTSTRAP_SE

```bash
$ docker run -p 8080:8080 \
-e SERVER_WEBSOCKET_CONN_UNIQ_ID_HEADER=x-user-id \
-e SERVER_WEBSOCKET_CONN_ID_HEADER=X-User-ID \
-e PUBLISHER_KAFKA_CLIENT_BOOTSTRAP_SERVERS=host.docker.internal:9092 \
-e EVENT_DISTRIBUTION_PUBLISHER_PATTERN=clickstream-log \
odpf/raccoon:latest
Expand Down
18 changes: 16 additions & 2 deletions docs/reference/configurations.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,27 @@ Specify I/O buffer sizes in bytes: [Refer gorilla websocket API](https://pkg.go.
* Type: `Optional`
* Default value: `10240`

### `SERVER_WEBSOCKET_CONN_UNIQ_ID_HEADER`
### `SERVER_WEBSOCKET_CONN_ID_HEADER`

Unique identifier for the server to maintain the connection. A single unique ID can only connect once in a session. If there is a subsequent connection with the same unique ID, the connection will be rejected.

* Example value: `x-user-id`
* Example value: `X-User-ID`
* Type: `Required`

### `SERVER_WEBSOCKET_CONN_GROUP_HEADER`

Additional identifier for the server to maintain the connection. The value of the conn group header combined with the user ID acts as the unique identifier instead of the user ID alone. You can use this if you want to differentiate between user groups or clients, e.g. mobile and web. The group name is used as the `conn_group` tag in some of the metrics.

* Example value: `X-User-Group`
* Type: `Optional`

### `SERVER_WEBSOCKET_CONN_GROUP_DEFAULT`

Default connection group name. The default is used as a fallback when `SERVER_WEBSOCKET_CONN_GROUP_HEADER` is not set or when the value of the group header is empty. If the default connection group name clashes with one of your actual group names, override this config.

* Default value: `--default--`
* Type: `Optional`

### `SERVER_WEBSOCKET_PING_INTERVAL_MS`

Interval of each ping to client. The interval is in milliseconds.
Expand Down
14 changes: 11 additions & 3 deletions docs/reference/metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,37 +16,42 @@ Raccoon uses Statsd protocol as way to report metrics. You can capture the metri
Total ping that server fails to send

* Type: `Counting`
* Tags: `conn_group=*`

### `server_pong_failure_total`

Total pong that server fails to send

* Type: `Counting`
* Tags: `conn_group=*`

### `connections_count_current`

Number of alive connections

* Type: `Gauge`
* Tags: `conn_group=*`

### `user_connection_success_total`

Number of successful connections established to the server

* Type: `Count`
* Tags: `conn_group=*`

### `user_connection_failure_total`

Number of failed connection attempts to the server

* Type: `Count`
* Tags: `reason=ugfailure` `reason=exists` `reason=serverlimit`
* Tags: `reason=ugfailure` `reason=exists` `reason=serverlimit` `conn_group=*`

### `user_session_duration_milliseconds`

Duration of alive connection per session per connection

* Type: `Timing`
* Tags: `conn_group=*`

## Kafka Publisher

Expand All @@ -55,7 +60,7 @@ Duration of alive connection per session per connection
Number of delivered events to Kafka

* Type: `Count`
* Tags: `success=false` `success=true`
* Tags: `success=false` `success=true` `conn_group=*`

### `kafka_unknown_topic_failure_total`

Expand Down Expand Up @@ -164,6 +169,7 @@ Following metrics are event delivery reports. Each metrics reported at a differe
Total bytes received in requests

* Type: `Count`
* Tags: `conn_group=*`

### `events_rx_total`

Expand All @@ -176,7 +182,7 @@ Number of events received in requests
Request count

* Type: `Count`
* Tags: `status=failed` `status=success` `reason=*`
* Tags: `status=failed` `status=success` `reason=*` `conn_group=*`

### `batch_idle_in_channel_milliseconds`

Expand All @@ -190,12 +196,14 @@ Duration from when the request is received to when the request is processed. Hig
Duration from the time request is sent to the time events are published. This metric is calculated per event by following formula `(PublishedTime - SentTime)/CountEvents`

* Type: `Timing`
* Tags: `conn_group=*`

### `server_processing_latency_milliseconds`

Duration from the time the request is received to the time events are published. This metric is calculated per event by the following formula: `(PublishedTime - ReceivedTime)/CountEvents`

* Type: `Timing`
* Tags: `conn_group=*`

### `worker_processing_duration_milliseconds`

Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,14 @@ go 1.14

require (
github.com/confluentinc/confluent-kafka-go v1.4.2 // indirect
github.com/golang/protobuf v1.4.1
github.com/golang/protobuf v1.5.0
github.com/gorilla/mux v1.7.4
github.com/gorilla/websocket v1.4.2
github.com/sirupsen/logrus v1.6.0
github.com/spf13/viper v1.7.0
github.com/stretchr/testify v1.6.0
golang.org/x/sys v0.0.0-20190626221950-04f50cda93cb // indirect
google.golang.org/protobuf v1.22.0
google.golang.org/protobuf v1.26.0
gopkg.in/alexcesaro/statsd.v2 v2.0.0
gopkg.in/confluentinc/confluent-kafka-go.v1 v1.4.2
)
Loading

0 comments on commit a562cd5

Please sign in to comment.