layout | |||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
Distribution lightly queue stream collect service.
- Nats cluster needs to enable JetStream
- MongoDB recommends version >= 5.0 so that time series collections can be used
- Services and applications should work together the same nats tenant
A collector service that subscribes to stream queues and then writes to data.
{% hint style="info" %}
If you use the time series collection, you need to manually create a database and then add a data stream. Set the time series collection time field to timestamp
and metadata field to metaField
. Nats Stream naming COLLECT_${key}
is consistent with database name ${key}
.
{% endhint %}
The main container image is:
- ghcr.io/weplanx/collector:latest
- registry.cn-shenzhen.aliyuncs.com/weplanx/collector:latest
The case will use Kubernetes deployment orchestration, replicate deployment (modify as needed).
apiVersion: apps/v1
kind: Deployment
metadata:
name: collector
spec:
selector:
matchLabels:
app: collector
template:
metadata:
labels:
app: collector
spec:
containers:
- image: registry.cn-shenzhen.aliyuncs.com/weplanx/collector:latest
imagePullPolicy: Always
name: collector
env:
- name: MODE
value: release
- name: NATS_HOSTS
value: <*** your nats hosts ***>
- name: NATS_NKEY
value: <*** your nats nkey***>
- name: DATABASE_URL
value: <*** your mongodb url ***>
- name: DATABASE_NAME
value: <*** your mongodb name ***>
- Working mode, default
debug
- Nats connection host, use
,
split
- Nats NKEY authentication
- MongoDB connection address
- MongoDB database name
The client for managing collector configuration, data transmission, and dispatching, installed in the application:
go get github.com/weplanx/collector
// Create the nats client and then create the jetstream context
if js, err = nc.JetStream(nats.PublishAsyncMaxPending(256)); err != nil {
panic(err)
}
// Create the transfer client
if x, err = client.New(js); err != nil {
panic(err)
}
err := x.Set(context.TODO(), client.StreamOption{
Key: "system",
Description: "system example",
})
err := x.Update(context.TODO(), client.StreamOption{
Key: "system",
Description: "system example 123",
})
result, err := client.Get("system")
err := x.Publish(context.TODO(), "system", client.Payload{
Timestamp: time.Now(),
Data: map[string]interface{}{
"metadata": map[string]interface{}{
"method": method,
"path": string(c.Request.Path()),
"user_id": userId,
"client_ip": c.ClientIP(),
},
"params": string(c.Request.QueryString()),
"body": c.Request.Body(),
"status": c.Response.StatusCode(),
"user_agent": string(c.Request.Header.UserAgent()),
},
XData: map[string]interface{}{},
})
err := x.Remove("system")