Skip to content
This repository has been archived by the owner on Jun 4, 2021. It is now read-only.

GCP PubSub Source -- Pull #39

Merged
merged 45 commits into from
Nov 2, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
d5c48fe
Initial work on GcpPubSubSource.
Harwayne Oct 25, 2018
2ffa444
Further work.
Harwayne Oct 26, 2018
d32238e
Add env vars.
Harwayne Oct 26, 2018
95d5f5f
Merge branch 'master' into pubsub
Harwayne Oct 26, 2018
4a58d7f
Missing auth
Harwayne Oct 26, 2018
a118767
Move to a pull model.
Harwayne Oct 26, 2018
9331ec3
Creating and deleting subscriptions and a non-working deployment.
Harwayne Oct 26, 2018
1e3ca40
It works!
Harwayne Oct 26, 2018
7ee1c01
Small clean up.
Harwayne Oct 26, 2018
0e08917
update-deps
Harwayne Oct 26, 2018
4ca01c1
Merge branch 'master' into pull-pubsub
Harwayne Oct 26, 2018
221470e
Merge branch 'master' into pull-pubsub
Harwayne Oct 29, 2018
9ce74b8
Add docs.
Harwayne Oct 29, 2018
57c4e31
update-deps
Harwayne Oct 29, 2018
2755d03
Move YAML changes to the proper location.
Harwayne Oct 29, 2018
f55e247
Respond to PR comments.
Harwayne Oct 29, 2018
6c60005
Merge branch 'master' into pull-pubsub
Harwayne Oct 29, 2018
a556d1a
update-manifests
Harwayne Oct 29, 2018
6e612ce
update-codegen
Harwayne Oct 29, 2018
d93d0fa
Move env vars to a patch file
Harwayne Oct 29, 2018
ca16e67
Make the GCP creds secret a field on each source, so that each source…
Harwayne Oct 29, 2018
3d21cd0
Change to using sets.String as the interface.
Harwayne Oct 29, 2018
f3b46be
Start the process of moving to a different kustomization.yaml.
Harwayne Oct 29, 2018
928049a
Automatically create and validate default-gcppubsub.yaml
Harwayne Oct 30, 2018
c9426a8
Respond to PR comments.
Harwayne Oct 30, 2018
235f06c
Switch to SecretKeySelector
Harwayne Oct 30, 2018
67d706d
labels.SelectorFromSet
Harwayne Oct 30, 2018
ea17d23
Merge branch 'master' into pull-pubsub
Harwayne Oct 30, 2018
451c04a
Begin adding unit tests.
Harwayne Oct 30, 2018
bebd636
Add more unit tests.
Harwayne Oct 30, 2018
82ef3ea
Add more test cases.
Harwayne Oct 30, 2018
127cef5
Merge branch 'master' of github.com:knative/eventing-sources into pul…
Harwayne Oct 31, 2018
b495965
Unit test for reusing existing receive adapter
Harwayne Oct 31, 2018
ec6e639
Add a sample and readme.
Harwayne Oct 31, 2018
ca83fb2
Add dumper to this repo.
Harwayne Oct 31, 2018
5df5d41
Update readme.
Harwayne Oct 31, 2018
97eee42
PR comments.
Harwayne Oct 31, 2018
bff371f
Move the receive adapter to cmd.
Harwayne Nov 1, 2018
3007bbe
PR comments
Harwayne Nov 1, 2018
c72ca24
Move makeReceiveAdapter into the resources subdirectory.
Harwayne Nov 1, 2018
cd05e79
Make the spec immutable.
Harwayne Nov 1, 2018
38a3f37
PR comments.
Harwayne Nov 1, 2018
384cb25
Each source can now specify its own receive adapter's service account…
Harwayne Nov 1, 2018
fd0c97c
update-manifests
Harwayne Nov 1, 2018
140e243
underscore
Harwayne Nov 1, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
129 changes: 128 additions & 1 deletion Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

126 changes: 126 additions & 0 deletions cmd/gcppubsub_receive_adapter/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
/*
Copyright 2018 The Knative Authors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package main
adamharwayne marked this conversation as resolved.
Show resolved Hide resolved

import (
"flag"
"fmt"
"io/ioutil"
"log"
"net/http"
"os"

"go.uber.org/zap"

// Imports the Google Cloud Pub/Sub client package.
"cloud.google.com/go/pubsub"
"github.com/knative/eventing/pkg/event"
"golang.org/x/net/context"
)

const (
// Environment variable containing project id
envProject = "GCPPUBSUB_PROJECT"

// Sink for messages.
envSinkURI = "SINK_URI"

// envTopic is the name of the environment variable that contains the GCP PubSub Topic being
// subscribed to's name. In the form that is unique within the project. E.g. 'laconia', not
// 'projects/my-gcp-project/topics/laconia'.
envTopic = "GCPPUBSUB_TOPIC"

// Name of the subscription to use
envSubscription = "GCPPUBSUB_SUBSCRIPTION_ID"
)

func main() {
flag.Parse()

logger, err := zap.NewProduction()
if err != nil {
log.Fatalf("Unable to create logger: %v", err)
}

projectID := getRequiredEnv(envProject)
topicID := getRequiredEnv(envTopic)
sinkURI := getRequiredEnv(envSinkURI)
subscriptionID := getRequiredEnv(envSubscription)

logger.Info("Starting.", zap.String("projectID", projectID), zap.String("topicID", topicID), zap.String("subscriptionID", subscriptionID), zap.String("sinkURI", sinkURI))

ctx := context.Background()
source := fmt.Sprintf("//pubsub.googleapis.com/%s/topics/%s", projectID, topicID)

// Creates a client.
// TODO: Support additional ways of specifying the credentials for creating.
client, err := pubsub.NewClient(ctx, projectID)
if err != nil {
logger.Fatal("Failed to create client: %v", zap.Error(err))
}

sub := client.Subscription(subscriptionID)

err = sub.Receive(ctx, func(ctx context.Context, m *pubsub.Message) {
logger.Debug("Received message", zap.Any("messageData", m.Data))
err = postMessage(sinkURI, source, m, logger)
if err != nil {
logger.Error("Failed to post message", zap.Error(err))
m.Nack()
} else {
m.Ack()
}
})
if err != nil {
logger.Fatal("Failed to create receive function", zap.Error(err))
}
}

func getRequiredEnv(envKey string) string {
if val, defined := os.LookupEnv(envKey); defined {
return val
}
log.Fatalf("required environment variable not defined '%s'", envKey)
// Unreachable.
return ""
}

func postMessage(sinkURI, source string, m *pubsub.Message, logger *zap.Logger) error {
ctx := event.EventContext{
CloudEventsVersion: event.CloudEventsVersion,
EventType: "google.pubsub.topic.publish",
EventID: m.ID,
EventTime: m.PublishTime,
Source: source,
}
req, err := event.Binary.NewRequest(sinkURI, m, ctx)
if err != nil {
logger.Error("Failed to marshal the message.", zap.Error(err), zap.Any("message", m))
return err
}

logger.Debug("Posting message", zap.String("sinkURI", sinkURI))
client := &http.Client{}
resp, err := client.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
body, _ := ioutil.ReadAll(resp.Body)
logger.Debug("Response", zap.String("status", resp.Status), zap.ByteString("body", body))
return nil
}
39 changes: 39 additions & 0 deletions cmd/message_dumper/dumper.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
Copyright 2018 The Knative Authors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package main

import (
"log"
"net/http"
"net/http/httputil"
)

type MessageDumper struct{}

func (md *MessageDumper) ServeHTTP(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
if reqBytes, err := httputil.DumpRequest(r, true); err == nil {
log.Printf("Message Dumper received a message: %+v", string(reqBytes))
w.Write(reqBytes)
} else {
log.Printf("Error dumping the request: %+v :: %+v", err, r)
}
}

func main() {
http.ListenAndServe(":8080", &MessageDumper{})
}
Loading