Skip to content

Commit

Permalink
Merge pull request #448 from streamdal/dselans/wasm-dedupe-debug
Browse files Browse the repository at this point in the history
Dselans/wasm dedupe debug
  • Loading branch information
dselans committed May 19, 2024
2 parents d49f254 + b540f24 commit 8d7cd06
Show file tree
Hide file tree
Showing 9 changed files with 541 additions and 206 deletions.
45 changes: 23 additions & 22 deletions apps/server/services/bus/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (

"github.com/pkg/errors"
"github.com/sirupsen/logrus"

"github.com/streamdal/streamdal/libs/protos/build/go/protos"

"github.com/streamdal/streamdal/apps/server/services/store"
Expand Down Expand Up @@ -420,31 +419,33 @@ func (b *Bus) handleNewAudienceRequest(ctx context.Context, req *protos.NewAudie
return errors.Wrap(err, "error getting config by audience")
}

// If this is NOT a brand new audience, nothing to do
// If this is an existing audience with a pipeline config - nothing to do
if len(existingPipelines.Configs) > 0 {
llog.Debugf("audience '%s' already has pipeline configuration - nothing to do", util.AudienceToStr(req.Audience))
return nil
}

// Get all session IDs on this node
sessionIDs, err := b.options.Store.GetSessionIDsByAudience(ctx, req.Audience, b.options.NodeName)
// No point in doing anything about this session id if it's not on our node

// Ensure that this session id is on this node
exists, err := b.options.Store.SessionIDExistsOnNode(ctx, req.SessionId, b.options.NodeName)
if err != nil {
llog.Errorf("error getting session ids by audience '%s' from store: %v", req.Audience, err)
return errors.Wrapf(err, "error getting session ids by audience '%s' from store", req.Audience)
llog.Errorf("error checking if session id '%s' exists on node '%s': %v", req.SessionId, b.options.NodeName, err)
return errors.Wrapf(err, "error checking if session id '%s' exists on node '%s'", req.SessionId, b.options.NodeName)
}

if len(sessionIDs) == 0 {
llog.Debugf("no active sessions found for audience '%s' on node '%s' - skipping", req.Audience, b.options.NodeName)
if !exists {
llog.Debugf("session id '%s' not found on node '%s' - nothing to do in handleNewAudienceRequest()", req.SessionId, b.options.NodeName)
return nil
}

// Send SetPipelines command to each session ID
if _, err := b.sendSetPipelinesCommand(ctx, req.Audience, make([]*protos.Pipeline, 0), sessionIDs...); err != nil {
// Session ID is on node - send SetPipelines command
if _, err := b.sendSetPipelinesCommand(ctx, req.Audience, make([]*protos.Pipeline, 0), req.SessionId); err != nil {
llog.Errorf("unable to send SetPipelines command: %v", err)
return errors.Wrap(err, "error sending SetPipelines command")
}

llog.Debugf("sent SetPipelineCommands for '%d' sessions", len(sessionIDs))
llog.Debugf("sent SetPipelineCommands for session id '%s'", req.SessionId)

return nil
}
Expand All @@ -471,6 +472,17 @@ func (b *Bus) sendSetPipelinesCommand(

var sent int

cmd := &protos.Command{
Audience: aud,
Command: &protos.Command_SetPipelines{
SetPipelines: &protos.SetPipelinesCommand{
Pipelines: updatedPipelines,
},
},
}

cmd.GetSetPipelines().WasmModules = util.GenerateWasmMapping(cmd)

for _, sessionID := range sessionIDs {
ch := b.options.Cmd.GetChannel(sessionID)
if ch == nil {
Expand All @@ -483,17 +495,6 @@ func (b *Bus) sendSetPipelinesCommand(
// TODO: Need to investigate and determine if this could panic as a
// result an SDK disconnecting and having the channel get closed. If so,
// need to implement a more reliable way to push the command.
cmd := &protos.Command{
Audience: aud,
Command: &protos.Command_SetPipelines{
SetPipelines: &protos.SetPipelinesCommand{
Pipelines: updatedPipelines,
},
},
}

cmd.GetSetPipelines().WasmModules = util.GenerateWasmMapping(cmd)

ch <- cmd

sent += 1
Expand Down
32 changes: 31 additions & 1 deletion apps/server/services/store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,10 @@ type IStore interface {

// GetPipelinesByWasmID returns a slice of pipelines that use a given Wasm ID
GetPipelinesByWasmID(ctx context.Context, wasmID string) ([]*protos.Pipeline, error)

// SessionIDExistsOnNode checks given session ID and node name against live
// entries in Redis and returns true if found. This is used by
SessionIDExistsOnNode(ctx context.Context, sessionID, node string) (bool, error)
}

// Option contains settings that can influence read, write or delete operations.
Expand Down Expand Up @@ -397,7 +401,33 @@ func (s *Store) GetAudiencesByPipelineID(ctx context.Context, pipelineID string)
return audiences, nil
}

// TODO: Needs tests
func (s *Store) SessionIDExistsOnNode(ctx context.Context, sessionID, node string) (bool, error) {
llog := s.log.WithField("method", "GetSessionIDsByAudience")
llog.Debug("received request to get session IDs by audience")

if sessionID == "" {
return false, errors.New("session ID is required")
}

if node == "" {
return false, errors.New("node name is required")
}

liveEntries, err := s.GetLive(ctx)
if err != nil {
llog.Errorf("unable to fetch live entries: %s", err)
return false, errors.Wrap(err, "error fetching live entries")
}

for _, l := range liveEntries {
if l.SessionID == sessionID && l.NodeName == node {
return true, nil
}
}

return false, nil
}

func (s *Store) GetSessionIDsByAudience(ctx context.Context, aud *protos.Audience, nodeName ...string) ([]string, error) {
llog := s.log.WithField("method", "GetSessionIDsByAudience")
llog.Debug("received request to get session IDs by audience")
Expand Down
1 change: 1 addition & 0 deletions apps/server/test-utils/demo-client/.gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
build
vendor
.idea
33 changes: 26 additions & 7 deletions apps/server/test-utils/demo-client/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ run2:
run3: description = Run billing-service
run3:
go run *.go -d \
--quiet \
--output-level 1 \
--output-type plaintext \
--message-rate 1,4 \
--service-name "billing-service" \
--operation-type 1 \
Expand All @@ -72,7 +73,8 @@ run3:
run4: description = Run billing-service
run4:
go run *.go -d \
--quiet \
--output-level 1 \
--output-type plaintext \
--message-rate 3,5 \
--service-name "billing-service" \
--operation-type 2 \
Expand All @@ -85,7 +87,7 @@ run4:
run5: description = Run welcome-service
run5:
go run *.go -d \
--quiet \
--output-level 1 \
--message-rate 1,5 \
--service-name "welcome-service" \
--operation-type 1 \
Expand All @@ -98,7 +100,7 @@ run5:
run6: description = Run welcome-service
run6:
go run *.go -d \
--quiet \
--output-level 1 \
--message-rate 1,3 \
--service-name "welcome-service" \
--operation-type 2 \
Expand All @@ -124,7 +126,7 @@ run7:
run/async: description = Run signup-service-async service configured with async
run/async:
go run *.go -d \
--quiet \
--output-level 1 \
--async \
--display-exec-time \
--message-rate 10,20 \
Expand All @@ -139,7 +141,7 @@ run/async:
run/sampling: description = Run signup-service-async service configured with sampling rate of 1 message/sec
run/sampling:
go run *.go -d \
--quiet \
--output-level 1 \
--sampling-rate 1 \
--display-exec-time \
--message-rate 10,100 \
Expand All @@ -154,7 +156,7 @@ run/sampling:
run/async-sampling: description = Run signup-service-async service configured with async and sampling
run/async-sampling:
go run *.go -d \
--quiet \
--output-level 1 \
--async \
--sampling-rate 1 \
--display-exec-time \
Expand Down Expand Up @@ -192,6 +194,23 @@ run/plaintext2:
--data-source-type file \
--data-source-file ./assets/plaintext-logs.real.txt

.PHONY: run/wasm-dedupe-debug
run/wasm-dedupe-debug: description = Used for troubleshooting wasm dedupe issues
run/wasm-dedupe-debug:
go run *.go -d \
--message-rate 1 \
--num-instances 10 \
--inject-logger \
--service-name "dedupe-svc" \
--operation-type 2 \
--operation-name "processor" \
--component-name "logs" \
--display-exec-time \
--data-source-type file \
--output-level 3 \
--output-type plaintext \
--data-source-file ./assets/plaintext-logs.txt

### Build

.PHONY: build
Expand Down
9 changes: 9 additions & 0 deletions apps/server/test-utils/demo-client/assets/plaintext-logs.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,12 @@
2024-04-29T15:59:41.60221514Z stdout Found user with email test@streamdal.com
2024-04-29T15:59:41.60221515Z stdout Exporting data {"user": {"ccnum": "4111111111111111"}} to billing service
2024-04-29T15:59:41.60221516Z stdout Run Completed
2024-05-16 13:45:22 - User logged in with email: example@email.com
2024-05-16 13:45:22 - User logged in with AWS key ID: AKIA1234567890
2024-05-16 13:45:22 - User logged in with JWT: eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyfQ.SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c
2024-04-23T14:13:34.201893177Z stdout - String with PII SSN: 123-45-6789 <- this should get masked
2024-04-23T14:13:34.201893177Z stdout - String with PII UK NINO: AB123456C <- this should get masked
2024-04-23T14:13:34.201893177Z stdout - String with PII VIN number: 1G1YY26U055136480 <- this should get masked
2024-04-23T14:13:34.201893177Z stdout - String with PII Canada SIN: 123-456-789 <- this should get masked
2024-04-23T14:13:34.201893177Z stdout - String with PII credit card: 1234 5678 9012 3456 <- this should get masked
2024-04-23T14:13:34.201893177Z stdout - String with PII SSN: 123-45-6789 AND AGAIN 123-45-6789 <- this should get masked
40 changes: 21 additions & 19 deletions apps/server/test-utils/demo-client/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,25 +12,27 @@ const (
)

type Config struct {
ServiceName string `kong:"help='Service name',required,default='demo-client'"`
OperationType int64 `kong:"help='Audience component name (1 = Consumer, 2 = Producer)',enum='1,2',default='1',required"`
OperationName string `kong:"help='Audience operation name',required,default='demo-operation'"`
ComponentName string `kong:"help='Audience component name',required,default='demo-component'"`
NumInstances int `kong:"help='Number of instances of SDK to register',required,default='1'"`
ReconnectRandom bool `kong:"help='Randomly disconnects and reconnects to server (useful for testing concurrency in server)',default='false'"`
ReconnectInterval int `kong:"help='Seconds between reconnects (rand(0..ReconnectInterval) if ReconnectRandom is true)',default='0'"`
MessageRate []int `kong:"help='Messages to send per second (can specify range as X,Y)',required,default='1'"`
DataSourceType string `kong:"help='Type of data source this client will use', enum='none,file',default='none'"`
DataSourceFile *os.File `kong:"help='File that contains sample data - used only when DataSourceType=file'"`
DisplayExecTime bool `kong:"help='Display execution time for each message',default='false'"`
Async bool `kong:"help='Use async mode in go-sdk',default='false'"`
SamplingRate int `kong:"help='Enable sampling and sample rate in go-sdk',default='0'"`

ServerAddress string `kong:"help='Streamdal gRPC server address',default='localhost:8082',required"`
ServerToken string `kong:"help='Streamdal server token',default='1234',required"`
Debug bool `kong:"help='Enable debug output',short='d'"`
Quiet bool `kong:"help='Disable showing pre/post output',short='q'"`
InjectLogger bool `kong:"help='Inject logger into SDK',default='false'"`
ServerAddress string `kong:"help='Streamdal gRPC server address',default='localhost:8082',required"`
ServerToken string `kong:"help='Streamdal server token',default='1234',required"`
ServiceName string `kong:"help='Service name',required,default='demo-client'"`
OperationType int64 `kong:"help='Audience component name (1 = Consumer, 2 = Producer)',enum='1,2',default='1',required"`
OperationName string `kong:"help='Audience operation name',required,default='demo-operation'"`
ComponentName string `kong:"help='Audience component name',required,default='demo-component'"`
NumInstances int `kong:"help='Number of instances of SDK to register',required,default='1'"`
ReconnectRandom bool `kong:"help='Randomly disconnects and reconnects to server (useful for testing concurrency in server)',default='false'"`
ReconnectInterval int `kong:"help='Seconds between reconnects (rand(0..ReconnectInterval) if ReconnectRandom is true)',default='0'"`
MessageRate []int `kong:"help='Messages to send per second (can specify range as X,Y)',required,default='1'"`
DataSourceType string `kong:"help='Type of data source this client will use', enum='none,file',default='none'"`
DataSourceFile *os.File `kong:"help='File that contains sample data - used only when DataSourceType=file'"`
DisplayExecTime bool `kong:"help='Display execution time for each message',default='false'"`
Async bool `kong:"help='Use async mode in go-sdk',default='false'"`
SamplingRate int `kong:"help='Enable sampling and sample rate in go-sdk',default='0'"`
OutputLevel int `kong:"help='Amount of output displayed by demo client',enum='0,1,2,3',default='2'"`
OutputType string `kong:"help='Type of output to display',enum='plaintext,tabular,json',default='tabular'"`
DisableColor bool `kong:"help='Disable color output',default='false'"`
InjectLogger bool `kong:"help='Inject logger into SDK',default='false'"`
UniqueOperationName bool `kong:"help='Use unique operation name for each instance (adds -$i suffix)',default='false'"`
Debug bool `kong:"help='Enable debug log output',short='d'"`

// Internal bits
Ctx *kong.Context `kong:"-"`
Expand Down

0 comments on commit 8d7cd06

Please sign in to comment.