Skip to content
This repository has been archived by the owner on Mar 14, 2022. It is now read-only.

Stream create file actions to Kinesis #255

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 23 additions & 3 deletions handlers/deposit_file.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package handlers

import (
"encoding/json"
"log"

"github.com/go-openapi/runtime"
Expand All @@ -10,6 +11,7 @@ import (
"github.com/sul-dlss-labs/taco/generated/restapi/operations"
"github.com/sul-dlss-labs/taco/identifier"
"github.com/sul-dlss-labs/taco/storage"
"github.com/sul-dlss-labs/taco/streaming"
"github.com/sul-dlss-labs/taco/uploaded"
"github.com/sul-dlss-labs/taco/validators"
)
Expand All @@ -18,15 +20,19 @@ const atContext = "http://sdr.sul.stanford.edu/contexts/taco-base.jsonld"
const fileType = "http://sdr.sul.stanford.edu/contexts/sdr3-file.jsonld"

// NewDepositFile -- Accepts requests to create a file and pushes it to s3.
func NewDepositFile(database db.Database, uploader storage.Storage, identifierService identifier.Service) operations.DepositFileHandler {
return &depositFileEntry{database: database,
func NewDepositFile(database db.Database, uploader storage.Storage, stream streaming.Stream, identifierService identifier.Service) operations.DepositFileHandler {
return &depositFileEntry{
database: database,
storage: uploader,
identifierService: identifierService}
stream: stream,
identifierService: identifierService,
}
}

type depositFileEntry struct {
database db.Database
storage storage.Storage
stream streaming.Stream
identifierService identifier.Service
}

Expand All @@ -52,6 +58,11 @@ func (d *depositFileEntry) Handle(params operations.DepositFileParams) middlewar
if err := d.createFileResource(id, params.Upload.Header.Filename); err != nil {
panic(err)
}

if err := d.addToStream(&id); err != nil {
panic(err)
}

// TODO: return file location: https://github.com/sul-dlss-labs/taco/issues/160
response := map[string]interface{}{"id": id}
return operations.NewDepositResourceCreated().WithPayload(response)
Expand All @@ -71,6 +82,15 @@ func (d *depositFileEntry) createFileResource(resourceID string, filename string
return d.database.Insert(resource)
}

func (d *depositFileEntry) addToStream(id *string) error {
message, err := json.Marshal(id)
if err != nil {
return err
}

return d.stream.SendMessage(string(message))
}

func (d *depositFileEntry) buildPersistableResource(resourceID string, filename string) datautils.Resource {
// TODO: Expand here if we need to set any default properties on the file
identification := map[string]interface{}{"filename": filename, "identifier": resourceID, "sdrUUID": resourceID}
Expand Down
5 changes: 4 additions & 1 deletion handlers/deposit_file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,20 +22,23 @@ func TestCreateFileHappyPath(t *testing.T) {
r := gofight.New()
storage := NewMockStorage()
repo := NewMockDatabase(nil)
stream := NewMockStream()

r.POST(filePath).
SetHeader(gofight.H{
"Content-Type": contentType,
}).
SetBody(body).
Run(handler(repo, nil, storage),
Run(handler(repo, stream, storage),
func(r gofight.HTTPResponse, rq gofight.HTTPRequest) {
assert.Equal(t, http.StatusCreated, r.Code)
assert.Equal(t, 1, len(storage.(*MockStorage).CreatedFiles))
assert.Equal(t, 1, len(repo.(*MockDatabase).CreatedResources))
fileResource := repo.(*MockDatabase).CreatedResources[0]
fileName := fileResource["identification"].(map[string]interface{})["filename"]
assert.Equal(t, fileName, "foo.txt")
assert.Equal(t, 1, len(stream.(*MockStream).Messages))

})
}

Expand Down
5 changes: 1 addition & 4 deletions handlers/deposit_resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package handlers

import (
"encoding/json"
"log"

"github.com/go-openapi/runtime/middleware"
"github.com/sul-dlss-labs/taco/datautils"
Expand Down Expand Up @@ -71,8 +70,6 @@ func (d *depositResource) addToStream(id *string) error {
if err != nil {
return err
}
if d.stream == nil {
log.Printf("Stream is nil")
}

return d.stream.SendMessage(string(message))
}
9 changes: 4 additions & 5 deletions handlers/deposit_resource_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,10 @@ func postData() map[string]interface{} {
func TestCreateResourceHappyPath(t *testing.T) {
r := gofight.New()
repo := NewMockDatabase(nil)
stream := NewMockStream("")

r.POST("/v1/resource").
SetJSON(postData()).
Run(handler(repo, stream, nil),
Run(handler(repo, nil, nil),
func(r gofight.HTTPResponse, rq gofight.HTTPRequest) {
assert.Equal(t, http.StatusCreated, r.Code)
assert.Equal(t, 1, len(repo.(*MockDatabase).CreatedResources))
Expand All @@ -45,7 +44,7 @@ func TestCreateResourceMissingSourceId(t *testing.T) {
"id": "oo000oo0001",
"title": "My work",
}).
Run(handler(NewMockDatabase(nil), NewMockStream(""), nil),
Run(handler(nil, nil, nil),
func(r gofight.HTTPResponse, rq gofight.HTTPRequest) {
assert.Equal(t, http.StatusUnprocessableEntity, r.Code)
})
Expand All @@ -63,7 +62,7 @@ func TestCreateResourceSemanticallyValid(t *testing.T) {
"preserve": true,
"publish": true,
"sourceId": "bib12345678"}).
Run(handler(NewMockDatabase(nil), NewMockStream(""), nil),
Run(handler(NewMockDatabase(nil), nil, nil),
func(r gofight.HTTPResponse, rq gofight.HTTPRequest) {
assert.Equal(t, http.StatusUnprocessableEntity, r.Code)
})
Expand All @@ -75,7 +74,7 @@ func TestCreateResourceFailure(t *testing.T) {
func() {
r.POST("/v1/resource").
SetJSON(postData()).
Run(handler(NewMockErrorDatabase(), NewMockStream(""), nil),
Run(handler(NewMockErrorDatabase(), nil, nil),
func(r gofight.HTTPResponse, rq gofight.HTTPRequest) {})
})
}
2 changes: 1 addition & 1 deletion handlers/health_check_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
func TestHealthCheck(t *testing.T) {
r := gofight.New()
r.GET("/v1/healthcheck").
Run(handler(NewMockDatabase(nil), NewMockStream(""), nil),
Run(handler(nil, nil, nil),
func(r gofight.HTTPResponse, rq gofight.HTTPRequest) {
assert.Equal(t, http.StatusOK, r.Code)
stat, _ := jsonparser.GetString(r.Body.Bytes(), "status")
Expand Down
2 changes: 1 addition & 1 deletion handlers/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func BuildAPI(database db.Database, stream streaming.Stream, storage storage.Sto
api.DepositResourceHandler = NewDepositResource(database, stream, depositValidator, identifierService)
updateValidator := validators.NewUpdateResourceValidator(database)
api.UpdateResourceHandler = NewUpdateResource(database, stream, updateValidator)
api.DepositFileHandler = NewDepositFile(database, storage, identifierService)
api.DepositFileHandler = NewDepositFile(database, storage, stream, identifierService)
api.HealthCheckHandler = NewHealthCheck()
return api
}
Expand Down
2 changes: 1 addition & 1 deletion handlers/stub_database_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ func handler(database db.Database, stream streaming.Stream, storage storage.Stor
database = NewMockDatabase(nil)
}
if stream == nil {
stream = NewMockStream("")
stream = NewMockStream()
}
if storage == nil {
storage = NewMockStorage()
Expand Down
9 changes: 5 additions & 4 deletions handlers/stub_stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,14 @@ package handlers
import "github.com/sul-dlss-labs/taco/streaming"

type MockStream struct {
message string
Messages []string
}

func NewMockStream(message string) streaming.Stream {
return &MockStream{message: message}
func NewMockStream() streaming.Stream {
return &MockStream{Messages: []string{}}
}

func (d *MockStream) SendMessage(message string) error {
func (s *MockStream) SendMessage(message string) error {
s.Messages = append(s.Messages, message)
return nil
}
6 changes: 3 additions & 3 deletions handlers/update_resource_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func TestUpdateResourceHappyPath(t *testing.T) {
r.PATCH("/v1/resource/99").
SetHeader(gofight.H{"Content-Type": "application/json"}).
SetJSON(updateMessage).
Run(handler(repo, NewMockStream(""), nil),
Run(handler(repo, nil, nil),
func(r gofight.HTTPResponse, rq gofight.HTTPRequest) {
assert.Equal(t, http.StatusOK, r.Code)
})
Expand All @@ -55,7 +55,7 @@ func TestUpdateResourceNotFound(t *testing.T) {
r.PATCH("/v1/resource/99").
SetHeader(gofight.H{"Content-Type": "application/json"}).
SetJSON(updateMessage).
Run(handler(NewMockDatabase(nil), NewMockStream(""), nil),
Run(handler(nil, nil, nil),
func(r gofight.HTTPResponse, rq gofight.HTTPRequest) {
assert.Equal(t, http.StatusNotFound, r.Code)
})
Expand All @@ -64,7 +64,7 @@ func TestUpdateResourceNotFound(t *testing.T) {
func TestUpdateResourceEmptyRequest(t *testing.T) {
r := gofight.New()
r.PATCH("/v1/resource/100").
Run(handler(NewMockDatabase(nil), NewMockStream(""), nil),
Run(handler(nil, nil, nil),
func(r gofight.HTTPResponse, rq gofight.HTTPRequest) {
assert.Equal(t, http.StatusUnprocessableEntity, r.Code)
})
Expand Down