Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
202 changes: 160 additions & 42 deletions api/channel_conversation_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,19 +27,20 @@ const (
)

type channelConversationUpsertRequest struct {
RequestID string `json:"requestId,omitempty"`
Namespace string `json:"namespace,omitempty"`
ConversationID string `json:"conversationId,omitempty"`
PrincipalID string `json:"principalId"`
InstanceID string `json:"instanceId"`
OwnerID string `json:"ownerId"`
Provider string `json:"provider"`
ExternalScopeType string `json:"externalScopeType"`
ExternalTenantID string `json:"externalTenantId"`
ExternalChannelID string `json:"externalChannelId"`
ExternalConversationID string `json:"externalConversationId"`
Title string `json:"title,omitempty"`
CWD string `json:"cwd,omitempty"`
RequestID string `json:"requestId,omitempty"`
Namespace string `json:"namespace,omitempty"`
ConversationID string `json:"conversationId,omitempty"`
PrincipalID string `json:"principalId"`
InstanceID string `json:"instanceId"`
OwnerID string `json:"ownerId"`
Provider string `json:"provider"`
ExternalScopeType string `json:"externalScopeType"`
ExternalTenantID string `json:"externalTenantId"`
ExternalChannelID string `json:"externalChannelId"`
ExternalConversationID string `json:"externalConversationId"`
LookupExternalConversationIDs []string `json:"lookupExternalConversationIds,omitempty"`
Title string `json:"title,omitempty"`
CWD string `json:"cwd,omitempty"`
}

type normalizedChannelConversationIdentity struct {
Expand Down Expand Up @@ -93,6 +94,10 @@ func normalizeChannelConversationUpsertRequest(body channelConversationUpsertReq
if body.ExternalConversationID == "" {
return channelConversationUpsertRequest{}, normalizedChannelConversationIdentity{}, echo.NewHTTPError(http.StatusBadRequest, "externalConversationId is required")
}
body.LookupExternalConversationIDs = normalizeLookupExternalConversationIDs(
body.ExternalConversationID,
body.LookupExternalConversationIDs,
)

return body, normalizedChannelConversationIdentity{
principalID: body.PrincipalID,
Expand All @@ -104,6 +109,30 @@ func normalizeChannelConversationUpsertRequest(body channelConversationUpsertReq
}, nil
}

func normalizeLookupExternalConversationIDs(primaryID string, lookupIDs []string) []string {
primaryID = strings.TrimSpace(primaryID)
if len(lookupIDs) == 0 {
return nil
}
normalized := make([]string, 0, len(lookupIDs))
seen := map[string]struct{}{}
for _, lookupID := range lookupIDs {
lookupID = strings.TrimSpace(lookupID)
if lookupID == "" || lookupID == primaryID {
continue
}
if _, ok := seen[lookupID]; ok {
continue
}
seen[lookupID] = struct{}{}
normalized = append(normalized, lookupID)
}
if len(normalized) == 0 {
return nil
}
return normalized
}

func channelConversationRouteHash(identity normalizedChannelConversationIdentity, ownerID, instanceID string) string {
sum := sha256.Sum256([]byte(strings.Join([]string{
identity.principalID,
Expand Down Expand Up @@ -211,6 +240,15 @@ func channelConversationHasExternalConversationID(conversation *spritzv1.SpritzC
return false
}

func channelConversationHasAnyExternalConversationID(conversation *spritzv1.SpritzConversation, externalConversationIDs []string) bool {
for _, externalConversationID := range externalConversationIDs {
if channelConversationHasExternalConversationID(conversation, externalConversationID) {
return true
}
}
return false
}

func channelConversationMatchesIdentity(conversation *spritzv1.SpritzConversation, identity normalizedChannelConversationIdentity) bool {
return channelConversationMatchesBaseIdentity(conversation, identity) &&
channelConversationHasExternalConversationID(conversation, identity.externalConversationID)
Expand Down Expand Up @@ -275,31 +313,42 @@ func (s *server) getAdminScopedACPReadySpritz(c echo.Context, namespace, instanc
return spritz, nil
}

func (s *server) findChannelConversation(c echo.Context, namespace string, spritz *spritzv1.Spritz, identity normalizedChannelConversationIdentity) (*spritzv1.SpritzConversation, bool, error) {
exactList := &spritzv1.SpritzConversationList{}
if err := s.client.List(
c.Request().Context(),
exactList,
client.InNamespace(namespace),
client.MatchingLabels{
acpConversationLabelKey: acpConversationLabelValue,
acpConversationOwnerLabelKey: ownerLabelValue(spritz.Spec.Owner.ID),
acpConversationSpritzLabelKey: spritz.Name,
channelConversationRouteLabelKey: channelConversationRouteHash(identity, spritz.Spec.Owner.ID, spritz.Name),
},
); err != nil {
return nil, false, err
}
func (s *server) findChannelConversation(c echo.Context, namespace string, spritz *spritzv1.Spritz, identity normalizedChannelConversationIdentity, lookupExternalConversationIDs []string) (*spritzv1.SpritzConversation, bool, error) {
matchExternalConversationIDs := append(
[]string{identity.externalConversationID},
lookupExternalConversationIDs...,
)
var match *spritzv1.SpritzConversation
for i := range exactList.Items {
item := &exactList.Items[i]
if !channelConversationMatchesIdentity(item, identity) {
continue
for _, externalConversationID := range matchExternalConversationIDs {
candidateIdentity := identity
candidateIdentity.externalConversationID = externalConversationID
exactList := &spritzv1.SpritzConversationList{}
if err := s.client.List(
c.Request().Context(),
exactList,
client.InNamespace(namespace),
client.MatchingLabels{
acpConversationLabelKey: acpConversationLabelValue,
acpConversationOwnerLabelKey: ownerLabelValue(spritz.Spec.Owner.ID),
acpConversationSpritzLabelKey: spritz.Name,
channelConversationRouteLabelKey: channelConversationRouteHash(candidateIdentity, spritz.Spec.Owner.ID, spritz.Name),
},
); err != nil {
return nil, false, err
}
if match != nil {
return nil, true, echo.NewHTTPError(http.StatusConflict, "channel conversation is ambiguous")
for i := range exactList.Items {
item := &exactList.Items[i]
if !channelConversationMatchesBaseIdentity(item, identity) || !channelConversationHasAnyExternalConversationID(item, matchExternalConversationIDs) {
continue
}
if match != nil && item.Name == match.Name {
continue
}
if match != nil {
return nil, true, echo.NewHTTPError(http.StatusConflict, "channel conversation is ambiguous")
}
match = item.DeepCopy()
}
match = item.DeepCopy()
}

baseList := &spritzv1.SpritzConversationList{}
Expand All @@ -320,23 +369,92 @@ func (s *server) findChannelConversation(c echo.Context, namespace string, sprit
); err != nil {
return nil, false, err
}
var fallbackMatch *spritzv1.SpritzConversation
fallbackMatchCount := 0
for i := range baseList.Items {
item := &baseList.Items[i]
if !channelConversationMatchesIdentity(item, identity) {
if !channelConversationMatchesBaseIdentity(item, identity) {
continue
}
if match != nil && item.Name == match.Name {
if channelConversationHasAnyExternalConversationID(item, matchExternalConversationIDs) {
if match != nil && item.Name == match.Name {
continue
}
if match != nil {
return nil, true, echo.NewHTTPError(http.StatusConflict, "channel conversation is ambiguous")
}
match = item.DeepCopy()
continue
}

// During the Slack cutover, a previously used channel may only have an
// older per-thread/per-message identity. Reuse that lone base-route match
// instead of forking a fresh channel-scoped conversation.
if match != nil {
return nil, true, echo.NewHTTPError(http.StatusConflict, "channel conversation is ambiguous")
continue
}
if fallbackMatch != nil && item.Name == fallbackMatch.Name {
continue
}
if fallbackMatch == nil {
fallbackMatch = item.DeepCopy()
}
fallbackMatchCount++
}
if match != nil {
return match, true, nil
}

legacyList := &spritzv1.SpritzConversationList{}
if err := s.client.List(
c.Request().Context(),
legacyList,
client.InNamespace(namespace),
client.MatchingLabels{
acpConversationLabelKey: acpConversationLabelValue,
acpConversationOwnerLabelKey: ownerLabelValue(spritz.Spec.Owner.ID),
acpConversationSpritzLabelKey: spritz.Name,
},
); err != nil {
return nil, false, err
}
for i := range legacyList.Items {
item := &legacyList.Items[i]
if strings.TrimSpace(item.Labels[channelConversationBaseRouteLabelKey]) != "" {
continue
}
if !channelConversationMatchesBaseIdentity(item, identity) {
continue
}
if channelConversationHasAnyExternalConversationID(item, matchExternalConversationIDs) {
if match != nil && item.Name == match.Name {
continue
}
if match != nil {
return nil, true, echo.NewHTTPError(http.StatusConflict, "channel conversation is ambiguous")
}
match = item.DeepCopy()
continue
}

// Some pre-cutover conversations predate the base-route label entirely.
// Reuse that lone legacy match instead of forking a new channel-scoped
// conversation on the first post-deploy top-level message.
if fallbackMatch != nil && item.Name == fallbackMatch.Name {
continue
}
match = item.DeepCopy()
if fallbackMatch == nil {
fallbackMatch = item.DeepCopy()
}
fallbackMatchCount++
}
if match != nil {
return match, true, nil
}
if match == nil {
return nil, false, nil
if fallbackMatchCount == 1 {
return fallbackMatch, true, nil
}
return match, true, nil
return nil, false, nil
}

func applyChannelConversationMetadata(conversation *spritzv1.SpritzConversation, identity normalizedChannelConversationIdentity, requestID string, spritz *spritzv1.Spritz) {
Expand Down
87 changes: 67 additions & 20 deletions api/channel_conversations.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,52 @@ import (

"github.com/labstack/echo/v4"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/client-go/util/retry"
spritzv1 "spritz.sh/operator/api/v1"
)

func (s *server) backfillFoundChannelConversation(
c echo.Context,
namespace string,
conversationName string,
identity normalizedChannelConversationIdentity,
spritz *spritzv1.Spritz,
requestID string,
) (*spritzv1.SpritzConversation, error) {
var updated *spritzv1.SpritzConversation
err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
current := &spritzv1.SpritzConversation{}
if err := s.client.Get(c.Request().Context(), clientKey(namespace, conversationName), current); err != nil {
return err
}
changed := ensureChannelConversationBaseRouteLabel(current, identity, spritz)
if !channelConversationHasExternalConversationID(current, identity.externalConversationID) {
aliasChanged, err := appendChannelConversationAlias(current, identity.externalConversationID)
if err != nil {
return err
}
changed = changed || aliasChanged
}
if requestID != "" {
if current.Annotations == nil {
current.Annotations = map[string]string{}
}
current.Annotations[requestIDAnnotationKey] = requestID
changed = true
}
if !changed {
updated = current.DeepCopy()
return nil
}
if err := s.client.Update(c.Request().Context(), current); err != nil {
return err
}
updated = current.DeepCopy()
return nil
})
return updated, err
}

func (s *server) upsertChannelConversation(c echo.Context) error {
if !s.acp.enabled {
return writeError(c, http.StatusNotFound, "acp disabled")
Expand Down Expand Up @@ -61,7 +104,7 @@ func (s *server) upsertChannelConversation(c echo.Context) error {
if !channelConversationMatchesBaseIdentity(existing, identity) || !channelConversationBelongsToSpritz(existing, spritz) {
return writeError(c, http.StatusConflict, "channel conversation is ambiguous")
}
conversation, found, err := s.findChannelConversation(c, namespace, spritz, identity)
conversation, found, err := s.findChannelConversation(c, namespace, spritz, identity, normalizedBody.LookupExternalConversationIDs)
if err != nil {
if httpErr, ok := err.(*echo.HTTPError); ok {
return writeError(c, httpErr.Code, httpErr.Message.(string))
Expand All @@ -71,35 +114,39 @@ func (s *server) upsertChannelConversation(c echo.Context) error {
if found && conversation.Name != existing.Name {
return writeError(c, http.StatusConflict, "channel conversation is ambiguous")
}
changed := ensureChannelConversationBaseRouteLabel(existing, identity, spritz)
aliasChanged, err := appendChannelConversationAlias(existing, identity.externalConversationID)
updatedConversation, err := s.backfillFoundChannelConversation(
c,
namespace,
existing.Name,
identity,
spritz,
normalizedBody.RequestID,
)
if err != nil {
return writeError(c, http.StatusInternalServerError, err.Error())
}
changed = changed || aliasChanged
if normalizedBody.RequestID != "" {
if existing.Annotations == nil {
existing.Annotations = map[string]string{}
}
existing.Annotations[requestIDAnnotationKey] = normalizedBody.RequestID
changed = true
}
if changed {
if err := s.client.Update(c.Request().Context(), existing); err != nil {
return s.writeACPResourceError(c, err)
}
return s.writeACPResourceError(c, err)
}
return writeJSON(c, http.StatusOK, map[string]any{"created": false, "conversation": existing})
return writeJSON(c, http.StatusOK, map[string]any{"created": false, "conversation": updatedConversation})
}
conversation, found, err := s.findChannelConversation(c, namespace, spritz, identity)
conversation, found, err := s.findChannelConversation(c, namespace, spritz, identity, normalizedBody.LookupExternalConversationIDs)
if err != nil {
if httpErr, ok := err.(*echo.HTTPError); ok {
return writeError(c, httpErr.Code, httpErr.Message.(string))
}
return writeError(c, http.StatusInternalServerError, err.Error())
}
if found {
return writeJSON(c, http.StatusOK, map[string]any{"created": false, "conversation": conversation})
updatedConversation, err := s.backfillFoundChannelConversation(
c,
namespace,
conversation.Name,
identity,
spritz,
normalizedBody.RequestID,
)
if err != nil {
return s.writeACPResourceError(c, err)
}
return writeJSON(c, http.StatusOK, map[string]any{"created": false, "conversation": updatedConversation})
}

conversation, err = buildACPConversationResource(spritz, normalizedBody.Title, normalizedBody.CWD)
Expand Down
Loading
Loading