Skip to content
This repository has been archived by the owner on Nov 8, 2022. It is now read-only.

Commit

Permalink
Fixed #1675: Updated exemplary streaming collector plugin
Browse files Browse the repository at this point in the history
  • Loading branch information
IzabellaRaulin committed Jul 10, 2017
1 parent a492827 commit f4df5b4
Show file tree
Hide file tree
Showing 10 changed files with 75 additions and 13 deletions.
2 changes: 1 addition & 1 deletion control/control_security_test.go
Expand Up @@ -217,7 +217,7 @@ func TestSecureStreamingCollector(t *testing.T) {
SetCertPath(tlsTestSrv+fixtures.TestCrtFileExt).
SetKeyPath(tlsTestSrv+fixtures.TestKeyFileExt).
SetCACertPaths(tlsTestCA+fixtures.TestCrtFileExt).
SetTLSEnabled(true), helper.PluginFilePath("snap-plugin-stream-collector-rand1"),
SetTLSEnabled(true), helper.PluginFilePath("snap-plugin-streaming-collector-rand1"),
security)
So(err, ShouldBeNil)
Convey("and valid plugin client should be obtained", func() {
Expand Down
8 changes: 4 additions & 4 deletions control/control_test.go
Expand Up @@ -1367,7 +1367,7 @@ func TestStreamMetrics(t *testing.T) {
c.eventManager.RegisterHandler("Control.PluginLoaded", lpe)

// Load plugin
_, e := load(c, fixtures.PluginPathStreamRand1)
_, e := load(c, fixtures.PluginPathStreamingRand1)
So(e, ShouldBeNil)
<-lpe.done
mts, err := c.MetricCatalog()
Expand All @@ -1392,7 +1392,7 @@ func TestStreamMetrics(t *testing.T) {
}

// retrieve loaded plugin
lp, err := c.pluginManager.get("collector" + core.Separator + "test-rand-streamer" + core.Separator + "1")
lp, err := c.pluginManager.get("streaming-collector" + core.Separator + "test-rand-streamer" + core.Separator + "1")
So(err, ShouldBeNil)
So(lp, ShouldNotBeNil)

Expand All @@ -1406,10 +1406,10 @@ func TestStreamMetrics(t *testing.T) {
taskHit := "hitting"

Convey("create a pool, add subscriptions and start plugins", func() {
serrs := c.SubscribeDeps(taskHit, r, []core.SubscribedPlugin{subscribedPlugin{typeName: "collector", name: "test-rand-streamer", version: 1}}, cdt)
serrs := c.SubscribeDeps(taskHit, r, []core.SubscribedPlugin{subscribedPlugin{typeName: "streaming-collector", name: "test-rand-streamer", version: 1}}, cdt)
So(serrs, ShouldBeNil)

pool, errp := c.pluginRunner.AvailablePlugins().getOrCreatePool("collector" + core.Separator + "test-rand-streamer" + core.Separator + "1")
pool, errp := c.pluginRunner.AvailablePlugins().getOrCreatePool("streaming-collector" + core.Separator + "test-rand-streamer" + core.Separator + "1")
So(errp, ShouldBeNil)
So(pool, ShouldNotBeNil)

Expand Down
4 changes: 2 additions & 2 deletions control/fixtures/fixtures.go
Expand Up @@ -39,8 +39,8 @@ var (
PluginNameMock2 = "snap-plugin-collector-mock2"
PluginPathMock2 = helper.PluginFilePath(PluginNameMock2)

PluginNameStreamRand1 = "snap-plugin-stream-collector-rand1"
PluginPathStreamRand1 = helper.PluginFilePath(PluginNameStreamRand1)
PluginNameStreamingRand1 = "snap-plugin-streaming-collector-rand1"
PluginPathStreamingRand1 = helper.PluginFilePath(PluginNameStreamingRand1)

PluginNameMock2Grpc = "snap-plugin-collector-mock2-grpc"
PluginPathMock2Grpc = helper.PluginFilePath(PluginNameMock2Grpc)
Expand Down
2 changes: 1 addition & 1 deletion glide.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

57 changes: 57 additions & 0 deletions mgmt/rest/client/client_func_test.go
Expand Up @@ -51,6 +51,8 @@ var (
MOCK_PLUGIN_PATH2 = []string{helper.PluginFilePath("snap-plugin-collector-mock2")}
ANOTHERMOCK_PLUGIN_PATH = []string{helper.PluginFilePath("snap-plugin-collector-anothermock1")}
FILE_PLUGIN_PATH = []string{helper.PluginFilePath("snap-plugin-publisher-mock-file")}
RAND_PLUGIN_PATH = []string{helper.PluginFilePath("snap-plugin-streaming-collector-rand1")}
PASSTHRU_PLUGIN_PATH = []string{helper.PluginFilePath("snap-plugin-processor-passthru")}
DIRECTORY_PATH = []string{helper.PluginPath()}

NextPort = 45000
Expand Down Expand Up @@ -640,6 +642,61 @@ func TestSnapClient(t *testing.T) {
})
}

func TestClient_SimpleLoadAndUnloadPlugin(t *testing.T) {
CompressUpload = false

Convey("Client should exist", t, func() {
uri := startAPI()
c, cerr := New(uri, "v1", true)
So(cerr, ShouldBeNil)

Convey("load collector plugin", func() {
p := c.LoadPlugin(MOCK_PLUGIN_PATH1)
So(p.Err, ShouldBeNil)
Convey("unload loaded plugin", func() {
p := c.UnloadPlugin("collector", "mock", 1)
So(p.Err, ShouldBeNil)
So(p.Name, ShouldEqual, "mock")
So(p.Version, ShouldEqual, 1)
So(p.Type, ShouldEqual, "collector")
})
})
Convey("load publisher plugin", func() {
p := c.LoadPlugin(FILE_PLUGIN_PATH)
So(p.Err, ShouldBeNil)
Convey("unload loaded plugin", func() {
p := c.UnloadPlugin("publisher", "mock-file", 3)
So(p.Err, ShouldBeNil)
So(p.Name, ShouldEqual, "mock-file")
So(p.Version, ShouldEqual, 3)
So(p.Type, ShouldEqual, "publisher")
})
})
Convey("load processor plugin", func() {
p := c.LoadPlugin(PASSTHRU_PLUGIN_PATH)
So(p.Err, ShouldBeNil)
Convey("unload loaded plugin", func() {
p := c.UnloadPlugin("processor", "passthru", 1)
So(p.Err, ShouldBeNil)
So(p.Name, ShouldEqual, "passthru")
So(p.Version, ShouldEqual, 1)
So(p.Type, ShouldEqual, "processor")
})
})
Convey("load streaming collector plugin", func() {
p := c.LoadPlugin(RAND_PLUGIN_PATH)
So(p.Err, ShouldBeNil)
Convey("unload loaded plugin", func() {
p := c.UnloadPlugin("streaming-collector", "test-rand-streamer", 1)
So(p.Err, ShouldBeNil)
So(p.Name, ShouldEqual, "test-rand-streamer")
So(p.Version, ShouldEqual, 1)
So(p.Type, ShouldEqual, "streaming-collector")
})
})
})
}

func TestClient_UnloadPlugin(t *testing.T) {
CompressUpload = false

Expand Down
Binary file not shown.
Binary file not shown.
@@ -1,3 +1,8 @@
## Notice
This exemplary streaming collector plugin comes from snap-plugin-lib-go repo. It is highly recommended to use up-to-date source code from
the original location: https://github.com/intelsdi-x/snap-plugin-lib-go/tree/master/examples



## Snap Plugin Go Library: Stream Collector Plugin Example
Here you will find an example plugin that covers the basics for writing a stream collector plugin.
Expand Down
Expand Up @@ -21,7 +21,7 @@ package main

import (
"github.com/intelsdi-x/snap-plugin-lib-go/v1/plugin"
"github.com/intelsdi-x/snap/plugin/collector/snap-plugin-stream-collector-rand1/rand"
"github.com/intelsdi-x/snap/plugin/collector/snap-plugin-streaming-collector-rand1/rand"
)

const (
Expand Down
Expand Up @@ -20,6 +20,7 @@ limitations under the License.
package rand

import (
"context"
"fmt"
"math/rand"
"time"
Expand Down Expand Up @@ -61,21 +62,21 @@ type RandCollector struct {
metrics []plugin.Metric
}

// StreamMetrics taks both an in and out channel of []plugin.Metric
// StreamMetrics takes both an in and out channel of []plugin.Metric
//
// The metrics_in channel is used to set/update the metrics that Snap is
// currently requesting to be collected by the plugin.
//
// The metrics_out channel is used by the plugin to send the collected metrics
// to Snap.
func (r *RandCollector) StreamMetrics(
ctx context.Context,
metrics_in chan []plugin.Metric,
metrics_out chan []plugin.Metric,
err chan string) error {

go r.streamIt(metrics_out, err)
go r.drainMetrics(metrics_in)

r.drainMetrics(metrics_in)
return nil
}

Expand Down Expand Up @@ -145,7 +146,6 @@ func (RandCollector) GetMetricTypes(cfg plugin.Config) ([]plugin.Metric, error)
for _, val := range vals {
metric := plugin.Metric{
Namespace: plugin.NewNamespace("random", val),
Version: 1,
}
metrics = append(metrics, metric)
}
Expand Down

0 comments on commit f4df5b4

Please sign in to comment.