Skip to content

Commit

Permalink
feat: setup pubsub service
Browse files Browse the repository at this point in the history
  • Loading branch information
Salaton committed Mar 21, 2022
1 parent 079c849 commit 23f5049
Show file tree
Hide file tree
Showing 12 changed files with 852 additions and 70 deletions.
1 change: 1 addition & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ env:
GOOGLE_CLOUD_PROJECT: ${{ secrets.GOOGLE_CLOUD_PROJECT }}
FIREBASE_WEB_API_KEY: ${{ secrets.FIREBASE_WEB_API_KEY }}
ENVIRONMENT: ${{ secrets.ENVIRONMENT }}
SERVICE_HOST: ${{ secrets.SERVICE_HOST }}
ROOT_COLLECTION_SUFFIX: ${{ secrets.ROOT_COLLECTION_SUFFIX }}
CLOUD_HEALTH_PUBSUB_TOPIC: ${{ secrets.CLOUD_HEALTH_PUBSUB_TOPIC }}
CLOUD_HEALTH_DATASET_ID: ${{ secrets.CLOUD_HEALTH_DATASET_ID }}
Expand Down
3 changes: 3 additions & 0 deletions .github/workflows/deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ jobs:
--set-env-vars "CLOUD_HEALTH_DATASET_ID=${{ secrets.CLOUD_HEALTH_DATASET_ID }}" \
--set-env-vars "CLOUD_HEALTH_FHIRSTORE_ID=${{ secrets.CLOUD_HEALTH_FHIRSTORE_ID }}" \
--set-env-vars "OPENCONCEPTLAB_TOKEN=${{ secrets.OPENCONCEPTLAB_TOKEN }}" \
--set-env-vars "SERVICE_HOST=${{ secrets.SERVICE_HOST }}" \
--set-env-vars "OPENCONCEPTLAB_API_URL=${{ secrets.OPENCONCEPTLAB_API_URL }}"
Expand Down Expand Up @@ -139,6 +140,7 @@ jobs:
--set-env-vars "CLOUD_HEALTH_DATASET_ID=${{ secrets.CLOUD_HEALTH_DATASET_ID }}" \
--set-env-vars "CLOUD_HEALTH_FHIRSTORE_ID=${{ secrets.CLOUD_HEALTH_FHIRSTORE_ID }}" \
--set-env-vars "OPENCONCEPTLAB_TOKEN=${{ secrets.OPENCONCEPTLAB_TOKEN }}" \
--set-env-vars "SERVICE_HOST=${{ secrets.SERVICE_HOST }}" \
--set-env-vars "OPENCONCEPTLAB_API_URL=${{ secrets.OPENCONCEPTLAB_API_URL }}"
deploy_to_prod:
Expand Down Expand Up @@ -194,6 +196,7 @@ jobs:
--set-env-vars "CLOUD_HEALTH_DATASET_ID=${{ secrets.CLOUD_HEALTH_DATASET_ID }}" \
--set-env-vars "CLOUD_HEALTH_FHIRSTORE_ID=${{ secrets.CLOUD_HEALTH_FHIRSTORE_ID }}" \
--set-env-vars "OPENCONCEPTLAB_TOKEN=${{ secrets.OPENCONCEPTLAB_TOKEN }}" \
--set-env-vars "SERVICE_HOST=${{ secrets.SERVICE_HOST }}" \
--set-env-vars "OPENCONCEPTLAB_API_URL=${{ secrets.OPENCONCEPTLAB_API_URL }}"
Expand Down
4 changes: 2 additions & 2 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ RUN apk add --no-cache ca-certificates
COPY --from=builder /app/server /server
COPY --from=builder /app/deps.yaml /deps.yaml

COPY --from=builder /app/pkg/clinical/pkg/clinical/application/authorization/rbac_model.conf /app/pkg/clinical/pkg/clinical/application/authorization/rbac_model.conf
COPY --from=builder /app/pkg/clinical/pkg/clinical/application/authorization/data/rbac_policy.csv /app/pkg/clinical/pkg/clinical/application/authorization/data/rbac_policy.csv
COPY --from=builder /app/pkg/clinical/application/authorization/rbac_model.conf /app/pkg/clinical/application/authorization/rbac_model.conf
COPY --from=builder /app/pkg/clinical/application/authorization/data/rbac_policy.csv /app/pkg/clinical/application/authorization/data/rbac_policy.csv

# Run the web service on container startup.
CMD ["/server"]
15 changes: 10 additions & 5 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,12 @@ module github.com/savannahghi/clinical
go 1.16

require (
cloud.google.com/go v0.87.0 // indirect
cloud.google.com/go/errorreporting v0.2.0 // indirect
cloud.google.com/go/firestore v1.5.0
cloud.google.com/go/monitoring v1.4.0 // indirect
cloud.google.com/go/profiler v0.2.0 // indirect
cloud.google.com/go/pubsub v1.19.0
cloud.google.com/go/trace v1.2.0 // indirect
firebase.google.com/go v3.13.0+incompatible
github.com/99designs/gqlgen v0.13.0
github.com/asaskevich/govalidator v0.0.0-20210307081110-f21760c49a8d
Expand All @@ -21,21 +25,22 @@ require (
github.com/rs/xid v1.3.0
github.com/savannahghi/converterandformatter v0.0.11
github.com/savannahghi/enumutils v0.0.3
github.com/savannahghi/errorcodeutil v0.0.5 // indirect
github.com/savannahghi/feedlib v0.0.6 // indirect
github.com/savannahghi/firebasetools v0.0.15
github.com/savannahghi/interserviceclient v0.0.18
github.com/savannahghi/onboarding v0.0.22
github.com/savannahghi/profileutils v0.0.17
github.com/savannahghi/pubsubtools v0.0.2
github.com/savannahghi/scalarutils v0.0.4
github.com/savannahghi/serverutils v0.0.4
github.com/segmentio/ksuid v1.0.4
github.com/sirupsen/logrus v1.8.1
github.com/stretchr/testify v1.7.0
github.com/tj/assert v0.0.3
github.com/vektah/gqlparser/v2 v2.1.0
golang.org/x/oauth2 v0.0.0-20210628180205-a41e5a781914
golang.org/x/sys v0.0.0-20211004093028-2c5d950f24ef // indirect
golang.org/x/text v0.3.7 // indirect
google.golang.org/api v0.50.0
golang.org/x/crypto v0.0.0-20220315160706-3147a52a75dd // indirect
golang.org/x/oauth2 v0.0.0-20211104180415-d3ed0bb246c8
google.golang.org/api v0.70.0
gopkg.in/yaml.v2 v2.4.0
)
132 changes: 116 additions & 16 deletions go.sum

Large diffs are not rendered by default.

131 changes: 131 additions & 0 deletions pkg/clinical/application/extensions/extension.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@ import (
"context"
"net/http"

"cloud.google.com/go/pubsub"
"github.com/savannahghi/converterandformatter"
"github.com/savannahghi/firebasetools"
"github.com/savannahghi/interserviceclient"
"github.com/savannahghi/profileutils"
"github.com/savannahghi/pubsubtools"
"github.com/savannahghi/serverutils"
)

Expand Down Expand Up @@ -47,6 +49,41 @@ type BaseExtension interface {
source interface{},
status int,
)

// PubSub
EnsureTopicsExist(
ctx context.Context,
pubsubClient *pubsub.Client,
topicIDs []string,
) error
NamespacePubsubIdentifier(
serviceName string,
topicID string,
environment string,
version string,
) string
PublishToPubsub(
ctx context.Context,
pubsubClient *pubsub.Client,
topicID string,
environment string,
serviceName string,
version string,
payload []byte,
) error
EnsureSubscriptionsExist(
ctx context.Context,
pubsubClient *pubsub.Client,
topicSubscriptionMap map[string]string,
callbackURL string,
) error
SubscriptionIDs(topicIDs []string) map[string]string
PubSubHandlerPath() string
VerifyPubSubJWTAndDecodePayload(
w http.ResponseWriter,
r *http.Request,
) (*pubsubtools.PubSubPayload, error)
GetPubSubTopic(m *pubsubtools.PubSubPayload) (string, error)
}

// BaseExtensionImpl ...
Expand Down Expand Up @@ -106,3 +143,97 @@ func (b *BaseExtensionImpl) WriteJSONResponse(
func (b *BaseExtensionImpl) ErrorMap(err error) map[string]string {
return serverutils.ErrorMap(err)
}

// EnsureTopicsExist creates the topic(s) in the suppplied list if they do not
// already exist.
func (b *BaseExtensionImpl) EnsureTopicsExist(
ctx context.Context,
pubsubClient *pubsub.Client,
topicIDs []string,
) error {
return pubsubtools.EnsureTopicsExist(ctx, pubsubClient, topicIDs)
}

// NamespacePubsubIdentifier uses the service name, environment and version to
// create a "namespaced" pubsub identifier. This could be a topicID or
// subscriptionID.
func (b *BaseExtensionImpl) NamespacePubsubIdentifier(
serviceName string,
topicID string,
environment string,
version string,
) string {
return pubsubtools.NamespacePubsubIdentifier(
serviceName,
topicID,
environment,
version,
)
}

// PublishToPubsub sends the supplied payload to the indicated topic
func (b *BaseExtensionImpl) PublishToPubsub(
ctx context.Context,
pubsubClient *pubsub.Client,
topicID string,
environment string,
serviceName string,
version string,
payload []byte,
) error {
return pubsubtools.PublishToPubsub(
ctx,
pubsubClient,
topicID,
environment,
serviceName,
version,
payload,
)
}

// EnsureSubscriptionsExist ensures that the subscriptions named in the supplied
// topic:subscription map exist. If any does not exist, it is created.
func (b *BaseExtensionImpl) EnsureSubscriptionsExist(
ctx context.Context,
pubsubClient *pubsub.Client,
topicSubscriptionMap map[string]string,
callbackURL string,
) error {
return pubsubtools.EnsureSubscriptionsExist(
ctx,
pubsubClient,
topicSubscriptionMap,
callbackURL,
)
}

// SubscriptionIDs returns a map of topic IDs to subscription IDs
func (b *BaseExtensionImpl) SubscriptionIDs(topicIDs []string) map[string]string {
return pubsubtools.SubscriptionIDs(topicIDs)
}

// PubSubHandlerPath returns pubsub hander path `/pubsub`
func (b *BaseExtensionImpl) PubSubHandlerPath() string {
return pubsubtools.PubSubHandlerPath
}

// VerifyPubSubJWTAndDecodePayload confirms that there is a valid Google signed
// JWT and decodes the pubsub message payload into a struct.
//
// It's use will simplify & shorten the handler funcs that process Cloud Pubsub
// push notifications.
func (b *BaseExtensionImpl) VerifyPubSubJWTAndDecodePayload(
w http.ResponseWriter,
r *http.Request,
) (*pubsubtools.PubSubPayload, error) {
return pubsubtools.VerifyPubSubJWTAndDecodePayload(
w,
r,
)
}

// GetPubSubTopic retrieves a pubsub topic from a pubsub payload.
func (b *BaseExtensionImpl) GetPubSubTopic(m *pubsubtools.PubSubPayload) (string, error) {
return pubsubtools.GetPubSubTopic(m)
}
122 changes: 122 additions & 0 deletions pkg/clinical/application/extensions/mock/extension_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@ import (
"context"
"net/http"

"cloud.google.com/go/pubsub"
"github.com/savannahghi/interserviceclient"
"github.com/savannahghi/profileutils"
"github.com/savannahghi/pubsubtools"
)

// FakeBaseExtension is an mock of the BaseExtension
Expand All @@ -22,6 +24,40 @@ type FakeBaseExtension struct {
source interface{},
status int,
)

MockEnsureTopicsExistFn func(
ctx context.Context,
pubsubClient *pubsub.Client,
topicIDs []string,
) error
MockNamespacePubsubIdentifierFn func(
serviceName string,
topicID string,
environment string,
version string,
) string
MockPublishToPubsubFn func(
ctx context.Context,
pubsubClient *pubsub.Client,
topicID string,
environment string,
serviceName string,
version string,
payload []byte,
) error
MockEnsureSubscriptionsExistFn func(
ctx context.Context,
pubsubClient *pubsub.Client,
topicSubscriptionMap map[string]string,
callbackURL string,
) error
MockSubscriptionIDsFn func(topicIDs []string) map[string]string
MockPubSubHandlerPathFn func() string
MockVerifyPubSubJWTAndDecodePayloadFn func(
w http.ResponseWriter,
r *http.Request,
) (*pubsubtools.PubSubPayload, error)
MockGetPubSubTopicFn func(m *pubsubtools.PubSubPayload) (string, error)
}

// GetLoggedInUser retrieves logged in user information
Expand Down Expand Up @@ -66,3 +102,89 @@ func (b *FakeBaseExtension) WriteJSONResponse(
status int,
) {
}

// EnsureTopicsExist creates the topic(s) in the suppplied list if they do not
// already exist.
func (b *FakeBaseExtension) EnsureTopicsExist(
ctx context.Context,
pubsubClient *pubsub.Client,
topicIDs []string,
) error {
return b.MockEnsureTopicsExistFn(ctx, pubsubClient, topicIDs)
}

// NamespacePubsubIdentifier uses the service name, environment and version to
// create a "namespaced" pubsub identifier. This could be a topicID or
// subscriptionID.
func (b *FakeBaseExtension) NamespacePubsubIdentifier(
serviceName string,
topicID string,
environment string,
version string,
) string {
return b.MockNamespacePubsubIdentifierFn(serviceName, topicID, environment, version)
}

// PublishToPubsub sends the supplied payload to the indicated topic
func (b *FakeBaseExtension) PublishToPubsub(
ctx context.Context,
pubsubClient *pubsub.Client,
topicID string,
environment string,
serviceName string,
version string,
payload []byte,
) error {
return b.MockPublishToPubsubFn(
ctx,
pubsubClient,
topicID,
environment,
serviceName,
version,
payload,
)
}

// EnsureSubscriptionsExist ensures that the subscriptions named in the supplied
// topic:subscription map exist. If any does not exist, it is created.
func (b *FakeBaseExtension) EnsureSubscriptionsExist(
ctx context.Context,
pubsubClient *pubsub.Client,
topicSubscriptionMap map[string]string,
callbackURL string,
) error {
return b.MockEnsureSubscriptionsExistFn(
ctx,
pubsubClient,
topicSubscriptionMap,
callbackURL,
)
}

// SubscriptionIDs returns a map of topic IDs to subscription IDs
func (b *FakeBaseExtension) SubscriptionIDs(topicIDs []string) map[string]string {
return b.MockSubscriptionIDsFn(topicIDs)
}

// PubSubHandlerPath returns pubsub hander path `/pubsub`
func (b *FakeBaseExtension) PubSubHandlerPath() string {
return b.MockPubSubHandlerPathFn()
}

// VerifyPubSubJWTAndDecodePayload confirms that there is a valid Google signed
// JWT and decodes the pubsub message payload into a struct.
//
// It's use will simplify & shorten the handler funcs that process Cloud Pubsub
// push notifications.
func (b *FakeBaseExtension) VerifyPubSubJWTAndDecodePayload(
w http.ResponseWriter,
r *http.Request,
) (*pubsubtools.PubSubPayload, error) {
return b.MockVerifyPubSubJWTAndDecodePayloadFn(w, r)
}

// GetPubSubTopic retrieves a pubsub topic from a pubsub payload.
func (b *FakeBaseExtension) GetPubSubTopic(m *pubsubtools.PubSubPayload) (string, error) {
return b.MockGetPubSubTopicFn(m)
}
1 change: 1 addition & 0 deletions pkg/clinical/infrastructure/services/pubsub/publisher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
package pubsubmessaging
Loading

0 comments on commit 23f5049

Please sign in to comment.