Skip to content

Commit

Permalink
Refactored Notifier, Integration and Credential handling in server.
Browse files Browse the repository at this point in the history
  • Loading branch information
vhadianto committed Feb 16, 2024
1 parent f9dbd93 commit b6beb2c
Show file tree
Hide file tree
Showing 10 changed files with 295 additions and 33 deletions.
37 changes: 37 additions & 0 deletions internal/es/db/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,43 @@ func ListAllPipelines() ([]*modconfig.Pipeline, error) {
return pipelines, nil
}

func GetIntegration(name string) (modconfig.Integration, error) {

integrationCached, found := cache.GetCache().Get(name)
if !found {
return nil, perr.NotFoundWithMessage("integration definition not found: " + name)
}

integration, ok := integrationCached.(modconfig.Integration)
if !ok {
return nil, perr.InternalWithMessage("invalid integration")
}
return integration, nil
}

func ListAllIntegrations() ([]modconfig.Integration, error) {
integrationNamesCached, found := cache.GetCache().Get("#integration.names")
if !found {
return nil, perr.NotFoundWithMessage("integration names not found")
}

integrationNames, ok := integrationNamesCached.([]string)
if !ok {
return nil, perr.InternalWithMessage("integration name cached is not a list of string")
}

var integrations []modconfig.Integration
for _, name := range integrationNames {
integration, err := GetIntegration(name)
if err != nil {
return nil, err
}
integrations = append(integrations, integration)
}

return integrations, nil
}

func GetTrigger(name string) (*modconfig.Trigger, error) {
triggerCached, found := cache.GetCache().Get(name)
if !found {
Expand Down
7 changes: 7 additions & 0 deletions internal/es/estest/test_suite_mod/integrations.fpc
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,11 @@ notifier "my_notifier" {
integration = integration.webform.default
to = ["just some things here"]
}
}

integration "slack" "my_other_slack_app" {
token = "xoxp-111111"

# optional - if you want to verify the source
signing_secret = "Q#$$#@#$$#W"
}
18 changes: 13 additions & 5 deletions internal/es/execution/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,10 +273,15 @@ func (ex *Execution) buildIntegrationMapForEvalContext(pipelineDefn *modconfig.P
slackIntegrationMap := map[string]cty.Value{}
emailIntegrationMap := map[string]cty.Value{}

for _, p := range pipelineDefn.GetMod().ResourceMaps.Integrations {
fpConfig, err := db.GetFlowpipeConfig()
if err != nil {
return nil, err
}

for _, p := range fpConfig.Integrations {

parts := strings.Split(p.Name(), ".")
if len(parts) != 4 {
if len(parts) != 2 {
return nil, perr.BadRequestWithMessage("invalid integration name: " + p.Name())
}

Expand All @@ -285,14 +290,17 @@ func (ex *Execution) buildIntegrationMapForEvalContext(pipelineDefn *modconfig.P
return nil, err
}

integrationType := parts[2]
integrationType := parts[0]

switch integrationType {
case string(schema.IntegrationTypeSlack):
slackIntegrationMap[parts[3]] = pCty
slackIntegrationMap[parts[1]] = pCty

case string(schema.IntegrationTypeEmail):
emailIntegrationMap[parts[3]] = pCty
emailIntegrationMap[parts[1]] = pCty

case string(schema.IntegrationTypeWebform):
// do nothing

default:
return nil, perr.BadRequestWithMessage("invalid integration type: " + integrationType)
Expand Down
20 changes: 15 additions & 5 deletions internal/es/execution/execution_in_memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,15 +262,22 @@ func (ex *ExecutionInMemory) buildPipelineMapForEvalContext() (map[string]cty.Va
return pipelineMap, nil
}

// TODO: this function is very similar to execution's buildIntegrationMapForEvalContext. Need to refactor
func (ex *ExecutionInMemory) buildIntegrationMapForEvalContext(pipelineDefn *modconfig.Pipeline) (map[string]cty.Value, error) {
integrationMap := map[string]cty.Value{}
slackIntegrationMap := map[string]cty.Value{}
emailIntegrationMap := map[string]cty.Value{}

for _, p := range pipelineDefn.GetMod().ResourceMaps.Integrations {
fpConfig, err := db.GetFlowpipeConfig()
if err != nil {
return nil, err
}

for _, p := range fpConfig.Integrations {

parts := strings.Split(p.Name(), ".")
if len(parts) != 4 {

if len(parts) != 2 {
return nil, perr.BadRequestWithMessage("invalid integration name: " + p.Name())
}

Expand All @@ -279,14 +286,17 @@ func (ex *ExecutionInMemory) buildIntegrationMapForEvalContext(pipelineDefn *mod
return nil, err
}

integrationType := parts[2]
integrationType := parts[0]

switch integrationType {
case string(schema.IntegrationTypeSlack):
slackIntegrationMap[parts[3]] = pCty
slackIntegrationMap[parts[1]] = pCty

case string(schema.IntegrationTypeEmail):
emailIntegrationMap[parts[3]] = pCty
emailIntegrationMap[parts[1]] = pCty

case string(schema.IntegrationTypeWebform):
// do nothing

default:
return nil, perr.BadRequestWithMessage("invalid integration type: " + integrationType)
Expand Down
1 change: 1 addition & 0 deletions internal/service/api/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,7 @@ func (api *APIService) Start() error {
api.WebhookRegisterAPI(apiPrefixGroup)
api.InputRegisterAPI(apiPrefixGroup)
api.ModRegisterAPI(apiPrefixGroup)
api.IntegrationRegisterAPI(apiPrefixGroup)

api.apiPrefixGroup = apiPrefixGroup
api.router = router
Expand Down
118 changes: 118 additions & 0 deletions internal/service/api/integration.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
package api

import (
"log/slog"
"net/http"
"sort"

"github.com/gin-gonic/gin"
"github.com/turbot/flowpipe/internal/es/db"
"github.com/turbot/flowpipe/internal/service/api/common"
"github.com/turbot/flowpipe/internal/types"
)

func (api *APIService) IntegrationRegisterAPI(router *gin.RouterGroup) {
router.GET("/integration", api.listIntegrations)
router.GET("/integration/:integration_name", api.getIntegration)
}

// @Summary List integrations
// @Description Lists integrations
// @ID integration_list
// @Tags Integration
// @Accept json
// @Produce json
// / ...
// @Param limit query int false "The max number of items to fetch per page of data, subject to a min and max of 1 and 100 respectively. If not specified will default to 25." default(25) minimum(1) maximum(100)
// @Param next_token query string false "When list results are truncated, next_token will be returned, which is a cursor to fetch the next page of data. Pass next_token to the subsequent list request to fetch the next page of data."
// ...
// @Success 200 {object} types.ListIntegrationResponse
// @Failure 400 {object} perr.ErrorModel
// @Failure 401 {object} perr.ErrorModel
// @Failure 403 {object} perr.ErrorModel
// @Failure 429 {object} perr.ErrorModel
// @Failure 500 {object} perr.ErrorModel
// @Router /integration [get]
func (api *APIService) listIntegrations(c *gin.Context) {
// Get paging parameters
nextToken, limit, err := common.ListPagingRequest(c)
if err != nil {
common.AbortWithError(c, err)
return
}

slog.Info("received list integrations request", "next_token", nextToken, "limit", limit)

result, err := ListIntegrations(api.EsService.RootMod.Name())
if err != nil {
common.AbortWithError(c, err)
return
}
c.JSON(http.StatusOK, result)
}

func ListIntegrations(rootMod string) (*types.ListIntegrationResponse, error) {
integrations, err := db.ListAllIntegrations()
if err != nil {
return nil, err
}

var listIntegrationResponseItems []types.FpIntegration

for _, integration := range integrations {
item, err := types.FpIntegrationFromModIntegration(integration, rootMod)
if err != nil {
return nil, err
}
listIntegrationResponseItems = append(listIntegrationResponseItems, *item)
}

sort.Slice(listIntegrationResponseItems, func(i, j int) bool {
return listIntegrationResponseItems[i].Name < listIntegrationResponseItems[j].Name
})

// TODO: paging, filter, sorting
result := &types.ListIntegrationResponse{
Items: listIntegrationResponseItems,
}
return result, nil
}

// @Summary Get integration
// @Description Get integration
// @ID integration_get
// @Tags Integration
// @Accept json
// @Produce json
// / ...
// @Param integration_name path string true "The name of the integration" format(^[a-z_]{0,32}$)
// / ...
// @Success 200 {object} types.Integration
// @Failure 400 {object} perr.ErrorModel
// @Failure 401 {object} perr.ErrorModel
// @Failure 403 {object} perr.ErrorModel
// @Failure 404 {object} perr.ErrorModel
// @Failure 429 {object} perr.ErrorModel
// @Failure 500 {object} perr.ErrorModel
// @Router /integration/{integration_name} [get]
func (api *APIService) getIntegration(c *gin.Context) {
var uri types.IntegrationRequestURI
if err := c.ShouldBindUri(&uri); err != nil {
common.AbortWithError(c, err)
return
}

integration, err := db.GetIntegration(uri.IntegrationName)
if err != nil {
common.AbortWithError(c, err)
return
}

integrationResponse, err := types.FpIntegrationFromModIntegration(integration, api.EsService.RootMod.Name())
if err != nil {
common.AbortWithError(c, err)
return
}

c.JSON(http.StatusOK, integrationResponse)
}
Loading

0 comments on commit b6beb2c

Please sign in to comment.