Skip to content

Commit

Permalink
chore: remove pubsub code from extension file
Browse files Browse the repository at this point in the history
  • Loading branch information
Salaton committed Feb 17, 2023
1 parent 3c4bff1 commit e95f5da
Show file tree
Hide file tree
Showing 9 changed files with 19 additions and 165 deletions.
4 changes: 1 addition & 3 deletions pkg/clinical/application/common/testutils/testhelpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,12 @@ import (
dataset "github.com/savannahghi/clinical/pkg/clinical/infrastructure/datastore/cloudhealthcare/fhirdataset"
"github.com/savannahghi/clinical/pkg/clinical/infrastructure/services/openconceptlab"
"github.com/savannahghi/clinical/pkg/clinical/usecases"
"github.com/savannahghi/firebasetools"
)

// InitializeTestService sets up the structure that will be used by the usecase layer for
// integration tests
func InitializeTestService(ctx context.Context) (usecases.Interactor, error) {
fc := &firebasetools.FirebaseClient{}
baseExtension := extensions.NewBaseExtensionImpl(fc)
baseExtension := extensions.NewBaseExtensionImpl()
repo := dataset.NewFHIRRepository()
fhir := fhir.NewFHIRStoreImpl(repo)
ocl := openconceptlab.NewServiceOCL()
Expand Down
138 changes: 2 additions & 136 deletions pkg/clinical/application/extensions/extension.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,10 @@ import (
"context"
"net/http"

"cloud.google.com/go/pubsub"
"github.com/savannahghi/converterandformatter"
"github.com/savannahghi/firebasetools"
"github.com/savannahghi/interserviceclient"
"github.com/savannahghi/profileutils"
"github.com/savannahghi/pubsubtools"
"github.com/savannahghi/serverutils"
)

Expand Down Expand Up @@ -49,53 +47,15 @@ type BaseExtension interface {
source interface{},
status int,
)

// PubSub
EnsureTopicsExist(
ctx context.Context,
pubsubClient *pubsub.Client,
topicIDs []string,
) error
NamespacePubsubIdentifier(
serviceName string,
topicID string,
environment string,
version string,
) string
PublishToPubsub(
ctx context.Context,
pubsubClient *pubsub.Client,
topicID string,
environment string,
serviceName string,
version string,
payload []byte,
) error
EnsureSubscriptionsExist(
ctx context.Context,
pubsubClient *pubsub.Client,
topicSubscriptionMap map[string]string,
callbackURL string,
) error
SubscriptionIDs(topicIDs []string) map[string]string
PubSubHandlerPath() string
VerifyPubSubJWTAndDecodePayload(
w http.ResponseWriter,
r *http.Request,
) (*pubsubtools.PubSubPayload, error)
GetPubSubTopic(m *pubsubtools.PubSubPayload) (string, error)
}

// BaseExtensionImpl ...
type BaseExtensionImpl struct {
fc firebasetools.IFirebaseClient
}

// NewBaseExtensionImpl ...
func NewBaseExtensionImpl(fc firebasetools.IFirebaseClient) BaseExtension {
return &BaseExtensionImpl{
fc: fc,
}
func NewBaseExtensionImpl() BaseExtension {
return &BaseExtensionImpl{}
}

// GetLoggedInUser retrieves logged in user information
Expand Down Expand Up @@ -143,97 +103,3 @@ func (b *BaseExtensionImpl) WriteJSONResponse(
func (b *BaseExtensionImpl) ErrorMap(err error) map[string]string {
return serverutils.ErrorMap(err)
}

// EnsureTopicsExist creates the topic(s) in the suppplied list if they do not
// already exist.
func (b *BaseExtensionImpl) EnsureTopicsExist(
ctx context.Context,
pubsubClient *pubsub.Client,
topicIDs []string,
) error {
return pubsubtools.EnsureTopicsExist(ctx, pubsubClient, topicIDs)
}

// NamespacePubsubIdentifier uses the service name, environment and version to
// create a "namespaced" pubsub identifier. This could be a topicID or
// subscriptionID.
func (b *BaseExtensionImpl) NamespacePubsubIdentifier(
serviceName string,
topicID string,
environment string,
version string,
) string {
return pubsubtools.NamespacePubsubIdentifier(
serviceName,
topicID,
environment,
version,
)
}

// PublishToPubsub sends the supplied payload to the indicated topic
func (b *BaseExtensionImpl) PublishToPubsub(
ctx context.Context,
pubsubClient *pubsub.Client,
topicID string,
environment string,
serviceName string,
version string,
payload []byte,
) error {
return pubsubtools.PublishToPubsub(
ctx,
pubsubClient,
topicID,
environment,
serviceName,
version,
payload,
)
}

// EnsureSubscriptionsExist ensures that the subscriptions named in the supplied
// topic:subscription map exist. If any does not exist, it is created.
func (b *BaseExtensionImpl) EnsureSubscriptionsExist(
ctx context.Context,
pubsubClient *pubsub.Client,
topicSubscriptionMap map[string]string,
callbackURL string,
) error {
return pubsubtools.EnsureSubscriptionsExist(
ctx,
pubsubClient,
topicSubscriptionMap,
callbackURL,
)
}

// SubscriptionIDs returns a map of topic IDs to subscription IDs
func (b *BaseExtensionImpl) SubscriptionIDs(topicIDs []string) map[string]string {
return pubsubtools.SubscriptionIDs(topicIDs)
}

// PubSubHandlerPath returns pubsub hander path `/pubsub`
func (b *BaseExtensionImpl) PubSubHandlerPath() string {
return pubsubtools.PubSubHandlerPath
}

// VerifyPubSubJWTAndDecodePayload confirms that there is a valid Google signed
// JWT and decodes the pubsub message payload into a struct.
//
// It's use will simplify & shorten the handler funcs that process Cloud Pubsub
// push notifications.
func (b *BaseExtensionImpl) VerifyPubSubJWTAndDecodePayload(
w http.ResponseWriter,
r *http.Request,
) (*pubsubtools.PubSubPayload, error) {
return pubsubtools.VerifyPubSubJWTAndDecodePayload(
w,
r,
)
}

// GetPubSubTopic retrieves a pubsub topic from a pubsub payload.
func (b *BaseExtensionImpl) GetPubSubTopic(m *pubsubtools.PubSubPayload) (string, error) {
return pubsubtools.GetPubSubTopic(m)
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"github.com/savannahghi/clinical/pkg/clinical/presentation/interactor"
"github.com/savannahghi/converterandformatter"
"github.com/savannahghi/enumutils"
"github.com/savannahghi/firebasetools"
"github.com/savannahghi/interserviceclient"
"github.com/savannahghi/scalarutils"
"github.com/segmentio/ksuid"
Expand Down Expand Up @@ -76,8 +75,7 @@ func InitializeTestService(ctx context.Context, infra infrastructure.Infrastruct
}

func InitializeTestInfrastructure(ctx context.Context) (infrastructure.Infrastructure, error) {
fc := &firebasetools.FirebaseClient{}
baseExtension := extensions.NewBaseExtensionImpl(fc)
baseExtension := extensions.NewBaseExtensionImpl()
repo := dataset.NewFHIRRepository()
fhir := fhir.NewFHIRStoreImpl(repo)

Expand Down
13 changes: 7 additions & 6 deletions pkg/clinical/infrastructure/services/pubsub/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/savannahghi/clinical/pkg/clinical/application/extensions"
"github.com/savannahghi/clinical/pkg/clinical/infrastructure"
"github.com/savannahghi/clinical/pkg/clinical/usecases/clinical"
"github.com/savannahghi/pubsubtools"
"github.com/savannahghi/serverutils"
)

Expand Down Expand Up @@ -93,7 +94,7 @@ func NewServicePubSubMessaging(
// <service name>-<topicName>-<environment>-v1
func (ps ServicePubSubMessaging) AddPubSubNamespace(topicName, ServiceName string) string {
environment := serverutils.GetRunningEnvironment()
return ps.baseExt.NamespacePubsubIdentifier(
return pubsubtools.NamespacePubsubIdentifier(
ServiceName,
topicName,
environment,
Expand Down Expand Up @@ -125,7 +126,7 @@ func (ps ServicePubSubMessaging) PublishToPubsub(
if err != nil {
return err
}
return ps.baseExt.PublishToPubsub(
return pubsubtools.PublishToPubsub(
ctx,
ps.client,
topicID,
Expand All @@ -142,7 +143,7 @@ func (ps ServicePubSubMessaging) EnsureTopicsExist(
ctx context.Context,
topicIDs []string,
) error {
return ps.baseExt.EnsureTopicsExist(
return pubsubtools.EnsureTopicsExist(
ctx,
ps.client,
topicIDs,
Expand All @@ -162,10 +163,10 @@ func (ps ServicePubSubMessaging) EnsureSubscriptionsExist(
callbackURL := fmt.Sprintf(
"%s%s",
hostName,
ps.baseExt.PubSubHandlerPath(),
pubsubtools.PubSubHandlerPath,
)

return ps.baseExt.EnsureSubscriptionsExist(
return pubsubtools.EnsureSubscriptionsExist(
ctx,
ps.client,
ps.SubscriptionIDs(),
Expand All @@ -175,5 +176,5 @@ func (ps ServicePubSubMessaging) EnsureSubscriptionsExist(

// SubscriptionIDs returns a map of topic IDs to subscription IDs
func (ps ServicePubSubMessaging) SubscriptionIDs() map[string]string {
return ps.baseExt.SubscriptionIDs(ps.TopicIDs())
return pubsubtools.SubscriptionIDs(ps.TopicIDs())
}
4 changes: 1 addition & 3 deletions pkg/clinical/infrastructure/services/pubsub/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,10 @@ import (
"github.com/savannahghi/clinical/pkg/clinical/infrastructure/services/openconceptlab"
pubsubmessaging "github.com/savannahghi/clinical/pkg/clinical/infrastructure/services/pubsub"
"github.com/savannahghi/clinical/pkg/clinical/usecases"
"github.com/savannahghi/firebasetools"
"github.com/savannahghi/serverutils"
)

func InitializeTestPubSub(t *testing.T) (*pubsubmessaging.ServicePubSubMessaging, error) {
fc := &firebasetools.FirebaseClient{}
ctx := context.Background()
projectID, err := serverutils.GetEnvVar(serverutils.GoogleCloudProjectIDEnvVarName)
if err != nil {
Expand All @@ -36,7 +34,7 @@ func InitializeTestPubSub(t *testing.T) (*pubsubmessaging.ServicePubSubMessaging
}

// Initialize base (common) extension
baseExtension := extensions.NewBaseExtensionImpl(fc)
baseExtension := extensions.NewBaseExtensionImpl()
repo := dataset.NewFHIRRepository()
fhir := fhir.NewFHIRStoreImpl(repo)
ocl := openconceptlab.NewServiceOCL()
Expand Down
5 changes: 3 additions & 2 deletions pkg/clinical/infrastructure/services/pubsub/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/savannahghi/clinical/pkg/clinical/domain"
"github.com/savannahghi/clinical/pkg/clinical/infrastructure/services/openconceptlab"
"github.com/savannahghi/errorcodeutil"
"github.com/savannahghi/pubsubtools"
"github.com/savannahghi/scalarutils"
"github.com/savannahghi/serverutils"
)
Expand All @@ -31,7 +32,7 @@ func (ps ServicePubSubMessaging) ReceivePubSubPushMessages(
r *http.Request,
) {
ctx := r.Context()
message, err := ps.baseExt.VerifyPubSubJWTAndDecodePayload(w, r)
message, err := pubsubtools.VerifyPubSubJWTAndDecodePayload(w, r)
if err != nil {
serverutils.WriteJSONResponse(w, errorcodeutil.CustomError{
Err: err,
Expand All @@ -40,7 +41,7 @@ func (ps ServicePubSubMessaging) ReceivePubSubPushMessages(
return
}

topicID, err := ps.baseExt.GetPubSubTopic(message)
topicID, err := pubsubtools.GetPubSubTopic(message)
if err != nil {
serverutils.WriteJSONResponse(w, errorcodeutil.CustomError{
Err: err,
Expand Down
5 changes: 1 addition & 4 deletions pkg/clinical/presentation/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"github.com/savannahghi/clinical/pkg/clinical/presentation/graph/generated"
"github.com/savannahghi/clinical/pkg/clinical/presentation/rest"
"github.com/savannahghi/clinical/pkg/clinical/usecases"
"github.com/savannahghi/firebasetools"
"github.com/savannahghi/serverutils"
log "github.com/sirupsen/logrus"
)
Expand Down Expand Up @@ -106,9 +105,7 @@ func PrepareServer(

// Router sets up the ginContext router
func Router(ctx context.Context) (*mux.Router, error) {
fc := &firebasetools.FirebaseClient{}

baseExtension := extensions.NewBaseExtensionImpl(fc)
baseExtension := extensions.NewBaseExtensionImpl()

projectID := serverutils.MustGetEnvVar(serverutils.GoogleCloudProjectIDEnvVarName)
pubSubClient, err := pubsub.NewClient(ctx, projectID)
Expand Down
4 changes: 1 addition & 3 deletions pkg/clinical/usecases/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
"github.com/savannahghi/clinical/pkg/clinical/usecases"
"github.com/savannahghi/clinical/pkg/clinical/usecases/ocl"
oclMock "github.com/savannahghi/clinical/pkg/clinical/usecases/ocl/mock"
"github.com/savannahghi/firebasetools"
log "github.com/sirupsen/logrus"
)

Expand Down Expand Up @@ -78,8 +77,7 @@ func InitializeTestService(ctx context.Context, infra infrastructure.Infrastruct
}

func InitializeTestInfrastructure(ctx context.Context) (infrastructure.Infrastructure, error) {
fc := &firebasetools.FirebaseClient{}
baseExtension := extensions.NewBaseExtensionImpl(fc)
baseExtension := extensions.NewBaseExtensionImpl()
repo := dataset.NewFHIRRepository()
fhir := fhir.NewFHIRStoreImpl(repo)
ocl := openconceptlab.NewServiceOCL()
Expand Down
7 changes: 2 additions & 5 deletions pkg/clinical/usecases/ocl/ocl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
dataset "github.com/savannahghi/clinical/pkg/clinical/infrastructure/datastore/cloudhealthcare/fhirdataset"
"github.com/savannahghi/clinical/pkg/clinical/infrastructure/services/openconceptlab"
"github.com/savannahghi/clinical/pkg/clinical/presentation/interactor"
"github.com/savannahghi/firebasetools"
"github.com/stretchr/testify/assert"
)

Expand Down Expand Up @@ -49,8 +48,7 @@ func TestMain(m *testing.M) {
}

func InitializeTestService(ctx context.Context) (interactor.Usecases, error) {
fc := &firebasetools.FirebaseClient{}
baseExtension := extensions.NewBaseExtensionImpl(fc)
baseExtension := extensions.NewBaseExtensionImpl()
repo := dataset.NewFHIRRepository()
fhir := fhir.NewFHIRStoreImpl(repo)
ocl := openconceptlab.NewServiceOCL()
Expand All @@ -64,8 +62,7 @@ func InitializeTestService(ctx context.Context) (interactor.Usecases, error) {
}

func InitializeTestInfrastructure(ctx context.Context) (infrastructure.Infrastructure, error) {
fc := &firebasetools.FirebaseClient{}
baseExtension := extensions.NewBaseExtensionImpl(fc)
baseExtension := extensions.NewBaseExtensionImpl()
repo := dataset.NewFHIRRepository()
fhir := fhir.NewFHIRStoreImpl(repo)

Expand Down

0 comments on commit e95f5da

Please sign in to comment.