diff --git a/api/handlers/asset_handler.go b/api/handlers/asset_handler.go index 511a9d9a..846d8de2 100644 --- a/api/handlers/asset_handler.go +++ b/api/handlers/asset_handler.go @@ -14,6 +14,7 @@ import ( "github.com/odpf/columbus/asset" "github.com/odpf/columbus/discovery" "github.com/odpf/columbus/star" + "github.com/odpf/columbus/user" ) // AssetHandler exposes a REST interface to types @@ -39,9 +40,9 @@ func NewAssetHandler( return handler } -func (h *AssetHandler) Get(w http.ResponseWriter, r *http.Request) { +func (h *AssetHandler) GetAll(w http.ResponseWriter, r *http.Request) { config := h.buildAssetConfig(r.URL.Query()) - assets, err := h.assetRepository.Get(r.Context(), config) + assets, err := h.assetRepository.GetAll(r.Context(), config) if err != nil { internalServerError(w, h.logger, err.Error()) return @@ -90,6 +91,13 @@ func (h *AssetHandler) GetByID(w http.ResponseWriter, r *http.Request) { } func (h *AssetHandler) Upsert(w http.ResponseWriter, r *http.Request) { + userID := user.FromContext(r.Context()) + if userID == "" { + h.logger.Warn(errMissingUserInfo.Error()) + WriteJSONError(w, http.StatusBadRequest, errMissingUserInfo.Error()) + return + } + var ast asset.Asset err := json.NewDecoder(r.Body).Decode(&ast) if err != nil { @@ -101,6 +109,7 @@ func (h *AssetHandler) Upsert(w http.ResponseWriter, r *http.Request) { return } + ast.UpdatedBy.ID = userID assetID, err := h.assetRepository.Upsert(r.Context(), &ast) if errors.As(err, new(asset.InvalidError)) { WriteJSONError(w, http.StatusBadRequest, err.Error()) @@ -170,6 +179,57 @@ func (h *AssetHandler) GetStargazers(w http.ResponseWriter, r *http.Request) { writeJSON(w, http.StatusOK, users) } +func (h *AssetHandler) GetVersionHistory(w http.ResponseWriter, r *http.Request) { + config := h.buildAssetConfig(r.URL.Query()) + + pathParams := mux.Vars(r) + assetID := pathParams["id"] + + assetVersions, err := h.assetRepository.GetVersionHistory(r.Context(), config, assetID) + if err != nil { + if errors.As(err, new(asset.InvalidError)) { + WriteJSONError(w, http.StatusBadRequest, err.Error()) + return + } + if errors.As(err, new(asset.NotFoundError)) { + WriteJSONError(w, http.StatusNotFound, err.Error()) + return + } + internalServerError(w, h.logger, err.Error()) + return + } + + writeJSON(w, http.StatusOK, assetVersions) +} + +func (h *AssetHandler) GetByVersion(w http.ResponseWriter, r *http.Request) { + + pathParams := mux.Vars(r) + assetID := pathParams["id"] + version := pathParams["version"] + + if _, err := asset.ParseVersion(version); err != nil { + WriteJSONError(w, http.StatusNotFound, err.Error()) + return + } + + ast, err := h.assetRepository.GetByVersion(r.Context(), assetID, version) + if err != nil { + if errors.As(err, new(asset.InvalidError)) { + WriteJSONError(w, http.StatusBadRequest, err.Error()) + return + } + if errors.As(err, new(asset.NotFoundError)) { + WriteJSONError(w, http.StatusNotFound, err.Error()) + return + } + internalServerError(w, h.logger, err.Error()) + return + } + + writeJSON(w, http.StatusOK, ast) +} + func (h *AssetHandler) validateAsset(ast asset.Asset) error { if ast.URN == "" { return fmt.Errorf("urn is required") diff --git a/api/handlers/asset_handler_test.go b/api/handlers/asset_handler_test.go index 38da9162..1787e3c4 100644 --- a/api/handlers/asset_handler_test.go +++ b/api/handlers/asset_handler_test.go @@ -32,6 +32,7 @@ var ( ) func TestAssetHandlerUpsert(t *testing.T) { + var userID = uuid.NewString() var validPayload = `{"urn": "test dagger", "type": "table", "name": "de-dagger-test", "service": "kafka", "data": {}}` t.Run("should return HTTP 400 for invalid payload", func(t *testing.T) { @@ -72,7 +73,10 @@ func TestAssetHandlerUpsert(t *testing.T) { for _, testCase := range testCases { t.Run(testCase.description, func(t *testing.T) { rw := httptest.NewRecorder() + rr := httptest.NewRequest("PUT", "/", strings.NewReader(testCase.payload)) + ctx := user.NewContext(rr.Context(), userID) + rr = rr.WithContext(ctx) handler := handlers.NewAssetHandler(logger, nil, nil, nil) handler.Upsert(rw, rr) @@ -85,9 +89,12 @@ func TestAssetHandlerUpsert(t *testing.T) { }) } }) + t.Run("should return HTTP 500 if the asset creation/update fails", func(t *testing.T) { t.Run("AssetRepository fails", func(t *testing.T) { rr := httptest.NewRequest("PUT", "/", strings.NewReader(validPayload)) + ctx := user.NewContext(rr.Context(), userID) + rr = rr.WithContext(ctx) rw := httptest.NewRecorder() expectedErr := errors.New("unknown error") @@ -108,6 +115,8 @@ func TestAssetHandlerUpsert(t *testing.T) { }) t.Run("DiscoveryRepository fails", func(t *testing.T) { rr := httptest.NewRequest("PUT", "/", strings.NewReader(validPayload)) + ctx := user.NewContext(rr.Context(), userID) + rr = rr.WithContext(ctx) rw := httptest.NewRecorder() expectedErr := errors.New("unknown error") @@ -131,18 +140,22 @@ func TestAssetHandlerUpsert(t *testing.T) { assert.Contains(t, response.Reason, "Internal Server Error") }) }) + t.Run("should return HTTP 200 and asset's ID if the asset is successfully created/updated", func(t *testing.T) { ast := asset.Asset{ - URN: "test dagger", - Type: asset.TypeTable, - Name: "de-dagger-test", - Service: "kafka", - Data: map[string]interface{}{}, + URN: "test dagger", + Type: asset.TypeTable, + Name: "de-dagger-test", + Service: "kafka", + UpdatedBy: user.User{ID: userID}, + Data: map[string]interface{}{}, } assetWithID := ast assetWithID.ID = uuid.New().String() rr := httptest.NewRequest("PUT", "/", strings.NewReader(validPayload)) + ctx := user.NewContext(rr.Context(), userID) + rr = rr.WithContext(ctx) rw := httptest.NewRecorder() ar := new(mocks.AssetRepository) @@ -344,7 +357,7 @@ func TestAssetHandlerGet(t *testing.T) { Description: `should return http 500 if fetching fails`, ExpectStatus: http.StatusInternalServerError, Setup: func(ctx context.Context, ar *mocks.AssetRepository) { - ar.On("Get", ctx, asset.Config{}).Return([]asset.Asset{}, errors.New("unknown error")) + ar.On("GetAll", ctx, asset.Config{}).Return([]asset.Asset{}, errors.New("unknown error")) }, }, { @@ -352,7 +365,7 @@ func TestAssetHandlerGet(t *testing.T) { Querystring: "?with_total=1", ExpectStatus: http.StatusInternalServerError, Setup: func(ctx context.Context, ar *mocks.AssetRepository) { - ar.On("Get", ctx, asset.Config{}).Return([]asset.Asset{}, nil) + ar.On("GetAll", ctx, asset.Config{}).Return([]asset.Asset{}, nil) ar.On("GetCount", ctx, asset.Config{}).Return(0, errors.New("unknown error")) }, }, @@ -361,7 +374,7 @@ func TestAssetHandlerGet(t *testing.T) { Querystring: "?text=asd&type=table&service=bigquery&size=30&offset=50", ExpectStatus: http.StatusOK, Setup: func(ctx context.Context, ar *mocks.AssetRepository) { - ar.On("Get", ctx, asset.Config{ + ar.On("GetAll", ctx, asset.Config{ Text: "asd", Type: "table", Service: "bigquery", @@ -374,7 +387,7 @@ func TestAssetHandlerGet(t *testing.T) { Description: "should return http 200 status along with list of assets", ExpectStatus: http.StatusOK, Setup: func(ctx context.Context, ar *mocks.AssetRepository) { - ar.On("Get", ctx, asset.Config{}).Return([]asset.Asset{ + ar.On("GetAll", ctx, asset.Config{}).Return([]asset.Asset{ {ID: "testid-1"}, {ID: "testid-2"}, }, nil) @@ -406,7 +419,7 @@ func TestAssetHandlerGet(t *testing.T) { ExpectStatus: http.StatusOK, Querystring: "?with_total=true&text=dsa&type=job&service=kafka&size=10&offset=5", Setup: func(ctx context.Context, ar *mocks.AssetRepository) { - ar.On("Get", ctx, asset.Config{ + ar.On("GetAll", ctx, asset.Config{ Text: "dsa", Type: "job", Service: "kafka", @@ -458,7 +471,7 @@ func TestAssetHandlerGet(t *testing.T) { tc.Setup(rr.Context(), ar) handler := handlers.NewAssetHandler(logger, ar, nil, nil) - handler.Get(rw, rr) + handler.GetAll(rw, rr) if rw.Code != tc.ExpectStatus { t.Errorf("expected handler to return http %d, returned %d instead", tc.ExpectStatus, rw.Code) @@ -485,7 +498,7 @@ func TestAssetHandlerGetStargazers(t *testing.T) { offset := 10 size := 20 defaultStarCfg := star.Config{Offset: offset, Size: size} - assetID := "dummy-asset-id" + var assetID = uuid.NewString() var testCases = []testCase{ { @@ -541,17 +554,195 @@ func TestAssetHandlerGetStargazers(t *testing.T) { defer sr.AssertExpectations(t) tc.Setup(&tc, sr) - handler := handlers.NewAssetHandler(logger, nil, nil, sr) - router := mux.NewRouter() - router.Path("/assets/{id}/stargazers").Methods("GET").HandlerFunc(handler.GetStargazers) - rr := httptest.NewRequest("GET", "/assets", nil) + rr := httptest.NewRequest("GET", "/", nil) rw := httptest.NewRecorder() - + rr = mux.SetURLVars(rr, map[string]string{ + "id": assetID, + }) if tc.MutateRequest != nil { rr = tc.MutateRequest(rr) } - router.ServeHTTP(rw, rr) + handler := handlers.NewAssetHandler(logger, nil, nil, sr) + handler.GetStargazers(rw, rr) + + }) + } +} + +func TestAssetHandlerGetVersionHistory(t *testing.T) { + var assetID = uuid.NewString() + + type testCase struct { + Description string + Querystring string + ExpectStatus int + Setup func(context.Context, *mocks.AssetRepository) + PostCheck func(resp *http.Response) error + } + + var testCases = []testCase{ + { + Description: `should return http 400 if asset id is not uuid`, + ExpectStatus: http.StatusBadRequest, + Setup: func(ctx context.Context, ar *mocks.AssetRepository) { + ar.On("GetVersionHistory", ctx, asset.Config{}, assetID).Return([]asset.AssetVersion{}, asset.InvalidError{AssetID: assetID}) + }, + }, + { + Description: `should return http 500 if fetching fails`, + ExpectStatus: http.StatusInternalServerError, + Setup: func(ctx context.Context, ar *mocks.AssetRepository) { + ar.On("GetVersionHistory", ctx, asset.Config{}, assetID).Return([]asset.AssetVersion{}, errors.New("unknown error")) + }, + }, + { + Description: `should parse querystring to get config`, + Querystring: "?size=30&offset=50", + ExpectStatus: http.StatusOK, + Setup: func(ctx context.Context, ar *mocks.AssetRepository) { + ar.On("GetVersionHistory", ctx, asset.Config{ + Size: 30, + Offset: 50, + }, assetID).Return([]asset.AssetVersion{}, nil) + }, + }, + { + Description: "should return http 200 status along with list of asset versions", + ExpectStatus: http.StatusOK, + Setup: func(ctx context.Context, ar *mocks.AssetRepository) { + ar.On("GetVersionHistory", ctx, asset.Config{}, assetID).Return([]asset.AssetVersion{ + {ID: "testid-1"}, + {ID: "testid-2"}, + }, nil) + }, + PostCheck: func(r *http.Response) error { + expected := []asset.AssetVersion{ + {ID: "testid-1"}, + {ID: "testid-2"}, + } + var actual []asset.AssetVersion + err := json.NewDecoder(r.Body).Decode(&actual) + if err != nil { + return fmt.Errorf("error reading response body: %w", err) + } + if reflect.DeepEqual(actual, expected) == false { + return fmt.Errorf("expected payload to be to be %+v, was %+v", expected, actual) + } + return nil + }, + }, + } + for _, tc := range testCases { + t.Run(tc.Description, func(t *testing.T) { + rr := httptest.NewRequest("GET", "/"+tc.Querystring, nil) + rr = mux.SetURLVars(rr, map[string]string{ + "id": assetID, + }) + rw := httptest.NewRecorder() + + ar := new(mocks.AssetRepository) + tc.Setup(rr.Context(), ar) + + handler := handlers.NewAssetHandler(logger, ar, nil, nil) + handler.GetVersionHistory(rw, rr) + + if rw.Code != tc.ExpectStatus { + t.Errorf("expected handler to return http %d, returned %d instead", tc.ExpectStatus, rw.Code) + return + } + if tc.PostCheck != nil { + if err := tc.PostCheck(rw.Result()); err != nil { + t.Error(err) + return + } + } + }) + } +} + +func TestAssetHandlerGetByVersion(t *testing.T) { + var ( + assetID = uuid.NewString() + version = "0.2" + ast = asset.Asset{ + ID: assetID, + Version: version, + } + ) + + type testCase struct { + Description string + ExpectStatus int + Setup func(context.Context, *mocks.AssetRepository) + PostCheck func(resp *http.Response) error + } + + var testCases = []testCase{ + { + Description: `should return http 400 if asset id is not uuid`, + ExpectStatus: http.StatusBadRequest, + Setup: func(ctx context.Context, ar *mocks.AssetRepository) { + ar.On("GetByVersion", ctx, assetID, version).Return(asset.Asset{}, asset.InvalidError{AssetID: assetID}) + }, + }, + { + Description: `should return http 404 if asset doesn't exist`, + ExpectStatus: http.StatusNotFound, + Setup: func(ctx context.Context, ar *mocks.AssetRepository) { + ar.On("GetByVersion", ctx, assetID, version).Return(asset.Asset{}, asset.NotFoundError{AssetID: assetID}) + }, + }, + { + Description: `should return http 500 if fetching fails`, + ExpectStatus: http.StatusInternalServerError, + Setup: func(ctx context.Context, ar *mocks.AssetRepository) { + ar.On("GetByVersion", ctx, assetID, version).Return(asset.Asset{}, errors.New("unknown error")) + }, + }, + { + Description: "should return http 200 status along with the asset, if found", + ExpectStatus: http.StatusOK, + Setup: func(ctx context.Context, ar *mocks.AssetRepository) { + ar.On("GetByVersion", ctx, assetID, version).Return(ast, nil) + }, + PostCheck: func(r *http.Response) error { + var responsePayload asset.Asset + err := json.NewDecoder(r.Body).Decode(&responsePayload) + if err != nil { + return fmt.Errorf("error reading response body: %w", err) + } + if reflect.DeepEqual(responsePayload, ast) == false { + return fmt.Errorf("expected returned asset to be to be %+v, was %+v", ast, responsePayload) + } + return nil + }, + }, + } + for _, tc := range testCases { + t.Run(tc.Description, func(t *testing.T) { + rr := httptest.NewRequest("GET", "/", nil) + rw := httptest.NewRecorder() + rr = mux.SetURLVars(rr, map[string]string{ + "id": assetID, + "version": version, + }) + ar := new(mocks.AssetRepository) + tc.Setup(rr.Context(), ar) + + handler := handlers.NewAssetHandler(logger, ar, nil, nil) + handler.GetByVersion(rw, rr) + + if rw.Code != tc.ExpectStatus { + t.Errorf("expected handler to return http %d, returned %d instead", tc.ExpectStatus, rw.Code) + return + } + if tc.PostCheck != nil { + if err := tc.PostCheck(rw.Result()); err != nil { + t.Error(err) + return + } + } }) } } diff --git a/api/handlers/errors.go b/api/handlers/errors.go index c4b3d3ce..e72badc9 100644 --- a/api/handlers/errors.go +++ b/api/handlers/errors.go @@ -3,5 +3,5 @@ package handlers import "errors" var ( - errMissingUserID = errors.New("missing user id") + errMissingUserInfo = errors.New("missing user information") ) diff --git a/api/handlers/user_handler.go b/api/handlers/user_handler.go index 70478c83..6ae00bb5 100644 --- a/api/handlers/user_handler.go +++ b/api/handlers/user_handler.go @@ -19,8 +19,8 @@ type UserHandler struct { func (h *UserHandler) GetStarredAssetsWithHeader(w http.ResponseWriter, r *http.Request) { userID := user.FromContext(r.Context()) if userID == "" { - h.logger.Warn(errMissingUserID.Error()) - WriteJSONError(w, http.StatusBadRequest, errMissingUserID.Error()) + h.logger.Warn(errMissingUserInfo.Error()) + WriteJSONError(w, http.StatusBadRequest, errMissingUserInfo.Error()) return } @@ -46,7 +46,7 @@ func (h *UserHandler) GetStarredAssetsWithHeader(w http.ResponseWriter, r *http. func (h *UserHandler) GetStarredAssetsWithPath(w http.ResponseWriter, r *http.Request) { targetUserID := mux.Vars(r)["user_id"] if targetUserID == "" { - WriteJSONError(w, http.StatusBadRequest, errMissingUserID.Error()) + WriteJSONError(w, http.StatusBadRequest, errMissingUserInfo.Error()) return } @@ -72,8 +72,8 @@ func (h *UserHandler) GetStarredAssetsWithPath(w http.ResponseWriter, r *http.Re func (h *UserHandler) StarAsset(w http.ResponseWriter, r *http.Request) { userID := user.FromContext(r.Context()) if userID == "" { - h.logger.Warn(errMissingUserID.Error()) - WriteJSONError(w, http.StatusBadRequest, errMissingUserID.Error()) + h.logger.Warn(errMissingUserInfo.Error()) + WriteJSONError(w, http.StatusBadRequest, errMissingUserInfo.Error()) return } @@ -105,8 +105,8 @@ func (h *UserHandler) StarAsset(w http.ResponseWriter, r *http.Request) { func (h *UserHandler) GetStarredAsset(w http.ResponseWriter, r *http.Request) { userID := user.FromContext(r.Context()) if userID == "" { - h.logger.Warn(errMissingUserID.Error()) - WriteJSONError(w, http.StatusBadRequest, errMissingUserID.Error()) + h.logger.Warn(errMissingUserInfo.Error()) + WriteJSONError(w, http.StatusBadRequest, errMissingUserInfo.Error()) return } @@ -133,8 +133,8 @@ func (h *UserHandler) GetStarredAsset(w http.ResponseWriter, r *http.Request) { func (h *UserHandler) UnstarAsset(w http.ResponseWriter, r *http.Request) { userID := user.FromContext(r.Context()) if userID == "" { - h.logger.Warn(errMissingUserID.Error()) - WriteJSONError(w, http.StatusBadRequest, errMissingUserID.Error()) + h.logger.Warn(errMissingUserInfo.Error()) + WriteJSONError(w, http.StatusBadRequest, errMissingUserInfo.Error()) return } diff --git a/api/handlers/user_handler_test.go b/api/handlers/user_handler_test.go index e866a75a..645638b1 100644 --- a/api/handlers/user_handler_test.go +++ b/api/handlers/user_handler_test.go @@ -3,7 +3,6 @@ package handlers_test import ( "encoding/json" "errors" - "fmt" "io/ioutil" "net/http" "net/http/httptest" @@ -11,6 +10,7 @@ import ( "strconv" "testing" + "github.com/google/uuid" "github.com/gorilla/mux" "github.com/odpf/columbus/api/handlers" "github.com/odpf/columbus/asset" @@ -162,7 +162,7 @@ func TestGetStarredWithPath(t *testing.T) { PostCheck func(t *testing.T, tc *testCase, resp *http.Response) error } - pathUserID := "a-path-user-id" + pathUserID := uuid.NewString() offset := 10 size := 20 @@ -171,7 +171,6 @@ func TestGetStarredWithPath(t *testing.T) { Description: "should return 500 status code if failed to fetch starred", ExpectStatus: http.StatusInternalServerError, MutateRequest: func(req *http.Request) *http.Request { - req.URL.Path += fmt.Sprintf("/%s/starred", pathUserID) params := url.Values{} params.Add("offset", strconv.Itoa(offset)) params.Add("size", strconv.Itoa(size)) @@ -186,7 +185,6 @@ func TestGetStarredWithPath(t *testing.T) { Description: "should return 400 status code if star repository return invalid error", ExpectStatus: http.StatusBadRequest, MutateRequest: func(req *http.Request) *http.Request { - req.URL.Path += fmt.Sprintf("/%s/starred", pathUserID) params := url.Values{} params.Add("offset", strconv.Itoa(offset)) params.Add("size", strconv.Itoa(size)) @@ -201,7 +199,6 @@ func TestGetStarredWithPath(t *testing.T) { Description: "should return 404 status code if starred not found", ExpectStatus: http.StatusNotFound, MutateRequest: func(req *http.Request) *http.Request { - req.URL.Path += fmt.Sprintf("/%s/starred", pathUserID) params := url.Values{} params.Add("offset", strconv.Itoa(offset)) params.Add("size", strconv.Itoa(size)) @@ -216,7 +213,6 @@ func TestGetStarredWithPath(t *testing.T) { Description: "should return 200 starred assets of a user if no error", ExpectStatus: http.StatusOK, MutateRequest: func(req *http.Request) *http.Request { - req.URL.Path += fmt.Sprintf("/%s/starred", pathUserID) params := url.Values{} params.Add("offset", strconv.Itoa(offset)) params.Add("size", strconv.Itoa(size)) @@ -256,16 +252,17 @@ func TestGetStarredWithPath(t *testing.T) { tc.Setup(&tc, sr) handler := handlers.NewUserHandler(logger, sr) - router := mux.NewRouter() - router.Path("/v1beta1/{user_id}/starred").Methods("GET").HandlerFunc(handler.GetStarredAssetsWithPath) - rr := httptest.NewRequest("GET", "/v1beta1", nil) + rr := httptest.NewRequest("GET", "/", nil) rw := httptest.NewRecorder() + rr = mux.SetURLVars(rr, map[string]string{ + "user_id": pathUserID, + }) if tc.MutateRequest != nil { rr = tc.MutateRequest(rr) } - router.ServeHTTP(rw, rr) + handler.GetStarredAssetsWithPath(rw, rr) if rw.Code != tc.ExpectStatus { t.Errorf("expected handler to return %d status, was %d instead", tc.ExpectStatus, rw.Code) return @@ -295,17 +292,12 @@ func TestStarAsset(t *testing.T) { { Description: "should return 400 status code if user id not found in context", ExpectStatus: http.StatusBadRequest, - MutateRequest: func(req *http.Request) *http.Request { - req.URL.Path += fmt.Sprintf("/%s", assetID) - return req - }, - Setup: func(tc *testCase, sr *mocks.StarRepository) {}, + Setup: func(tc *testCase, sr *mocks.StarRepository) {}, }, { Description: "should return 400 status code if asset id in param is invalid", ExpectStatus: http.StatusBadRequest, MutateRequest: func(req *http.Request) *http.Request { - req.URL.Path += fmt.Sprintf("/%s", assetID) ctx := user.NewContext(req.Context(), userID) return req.WithContext(ctx) }, @@ -317,7 +309,6 @@ func TestStarAsset(t *testing.T) { Description: "should return 400 status code if star repository return invalid error", ExpectStatus: http.StatusBadRequest, MutateRequest: func(req *http.Request) *http.Request { - req.URL.Path += fmt.Sprintf("/%s", assetID) ctx := user.NewContext(req.Context(), userID) return req.WithContext(ctx) }, @@ -329,7 +320,6 @@ func TestStarAsset(t *testing.T) { Description: "should return 404 status code if user not found", ExpectStatus: http.StatusNotFound, MutateRequest: func(req *http.Request) *http.Request { - req.URL.Path += fmt.Sprintf("/%s", assetID) ctx := user.NewContext(req.Context(), userID) return req.WithContext(ctx) }, @@ -341,7 +331,6 @@ func TestStarAsset(t *testing.T) { Description: "should return 500 status code if failed to star an asset", ExpectStatus: http.StatusInternalServerError, MutateRequest: func(req *http.Request) *http.Request { - req.URL.Path += fmt.Sprintf("/%s", assetID) ctx := user.NewContext(req.Context(), userID) return req.WithContext(ctx) }, @@ -353,7 +342,6 @@ func TestStarAsset(t *testing.T) { Description: "should return 204 if starring success", ExpectStatus: http.StatusNoContent, MutateRequest: func(req *http.Request) *http.Request { - req.URL.Path += fmt.Sprintf("/%s", assetID) ctx := user.NewContext(req.Context(), userID) return req.WithContext(ctx) }, @@ -365,7 +353,6 @@ func TestStarAsset(t *testing.T) { Description: "should return 204 if asset is already starred", ExpectStatus: http.StatusNoContent, MutateRequest: func(req *http.Request) *http.Request { - req.URL.Path += fmt.Sprintf("/%s", assetID) ctx := user.NewContext(req.Context(), userID) return req.WithContext(ctx) }, @@ -382,16 +369,17 @@ func TestStarAsset(t *testing.T) { tc.Setup(&tc, sr) handler := handlers.NewUserHandler(logger, sr) - router := mux.NewRouter() - router.Path("/user/starred/{asset_id}").Methods("PUT").HandlerFunc(handler.StarAsset) - rr := httptest.NewRequest("PUT", "/user/starred", nil) + rr := httptest.NewRequest("PUT", "/", nil) rw := httptest.NewRecorder() + rr = mux.SetURLVars(rr, map[string]string{ + "asset_id": assetID, + }) if tc.MutateRequest != nil { rr = tc.MutateRequest(rr) } - router.ServeHTTP(rw, rr) + handler.StarAsset(rw, rr) if rw.Code != tc.ExpectStatus { t.Errorf("expected handler to return %d status, was %d instead", tc.ExpectStatus, rw.Code) return @@ -418,17 +406,12 @@ func TestGetStarredAsset(t *testing.T) { { Description: "should return 400 status code if user id not found in context", ExpectStatus: http.StatusBadRequest, - MutateRequest: func(req *http.Request) *http.Request { - req.URL.Path += fmt.Sprintf("/%s", assetID) - return req - }, - Setup: func(tc *testCase, sr *mocks.StarRepository) {}, + Setup: func(tc *testCase, sr *mocks.StarRepository) {}, }, { Description: "should return 400 status code if asset id in param is invalid", ExpectStatus: http.StatusBadRequest, MutateRequest: func(req *http.Request) *http.Request { - req.URL.Path += fmt.Sprintf("/%s", assetID) ctx := user.NewContext(req.Context(), userID) return req.WithContext(ctx) }, @@ -440,7 +423,6 @@ func TestGetStarredAsset(t *testing.T) { Description: "should return 400 status code if star repository return invalid error", ExpectStatus: http.StatusBadRequest, MutateRequest: func(req *http.Request) *http.Request { - req.URL.Path += fmt.Sprintf("/%s", assetID) ctx := user.NewContext(req.Context(), userID) return req.WithContext(ctx) }, @@ -452,7 +434,6 @@ func TestGetStarredAsset(t *testing.T) { Description: "should return 404 status code if a star not found", ExpectStatus: http.StatusNotFound, MutateRequest: func(req *http.Request) *http.Request { - req.URL.Path += fmt.Sprintf("/%s", assetID) ctx := user.NewContext(req.Context(), userID) return req.WithContext(ctx) }, @@ -464,7 +445,6 @@ func TestGetStarredAsset(t *testing.T) { Description: "should return 500 status code if failed to fetch a starred asset", ExpectStatus: http.StatusInternalServerError, MutateRequest: func(req *http.Request) *http.Request { - req.URL.Path += fmt.Sprintf("/%s", assetID) ctx := user.NewContext(req.Context(), userID) return req.WithContext(ctx) }, @@ -476,7 +456,6 @@ func TestGetStarredAsset(t *testing.T) { Description: "should return 200 starred assets of a user if no error", ExpectStatus: http.StatusOK, MutateRequest: func(req *http.Request) *http.Request { - req.URL.Path += fmt.Sprintf("/%s", assetID) ctx := user.NewContext(req.Context(), userID) return req.WithContext(ctx) }, @@ -504,16 +483,17 @@ func TestGetStarredAsset(t *testing.T) { tc.Setup(&tc, sr) handler := handlers.NewUserHandler(logger, sr) - router := mux.NewRouter() - router.Path("/user/starred/{asset_id}").Methods("GET").HandlerFunc(handler.GetStarredAsset) - rr := httptest.NewRequest("GET", "/user/starred", nil) + rr := httptest.NewRequest("GET", "/", nil) rw := httptest.NewRecorder() + rr = mux.SetURLVars(rr, map[string]string{ + "asset_id": assetID, + }) if tc.MutateRequest != nil { rr = tc.MutateRequest(rr) } - router.ServeHTTP(rw, rr) + handler.GetStarredAsset(rw, rr) if rw.Code != tc.ExpectStatus { t.Errorf("expected handler to return %d status, was %d instead", tc.ExpectStatus, rw.Code) return @@ -543,17 +523,12 @@ func TestUnstarAsset(t *testing.T) { { Description: "should return 400 status code if user id not found in context", ExpectStatus: http.StatusBadRequest, - MutateRequest: func(req *http.Request) *http.Request { - req.URL.Path += fmt.Sprintf("/%s", assetID) - return req - }, - Setup: func(tc *testCase, sr *mocks.StarRepository) {}, + Setup: func(tc *testCase, sr *mocks.StarRepository) {}, }, { Description: "should return 400 status code if asset id is empty", ExpectStatus: http.StatusBadRequest, MutateRequest: func(req *http.Request) *http.Request { - req.URL.Path += fmt.Sprintf("/%s", assetID) ctx := user.NewContext(req.Context(), userID) return req.WithContext(ctx) }, @@ -565,7 +540,6 @@ func TestUnstarAsset(t *testing.T) { Description: "should return 400 status code if star repository return invalid error", ExpectStatus: http.StatusBadRequest, MutateRequest: func(req *http.Request) *http.Request { - req.URL.Path += fmt.Sprintf("/%s", assetID) ctx := user.NewContext(req.Context(), userID) return req.WithContext(ctx) }, @@ -577,7 +551,6 @@ func TestUnstarAsset(t *testing.T) { Description: "should return 500 status code if failed to unstar an asset", ExpectStatus: http.StatusInternalServerError, MutateRequest: func(req *http.Request) *http.Request { - req.URL.Path += fmt.Sprintf("/%s", assetID) ctx := user.NewContext(req.Context(), userID) return req.WithContext(ctx) }, @@ -589,7 +562,6 @@ func TestUnstarAsset(t *testing.T) { Description: "should return 204 if unstarring success", ExpectStatus: http.StatusNoContent, MutateRequest: func(req *http.Request) *http.Request { - req.URL.Path += fmt.Sprintf("/%s", assetID) ctx := user.NewContext(req.Context(), userID) return req.WithContext(ctx) }, @@ -606,16 +578,17 @@ func TestUnstarAsset(t *testing.T) { tc.Setup(&tc, sr) handler := handlers.NewUserHandler(logger, sr) - router := mux.NewRouter() - router.Path("/user/starred/{asset_id}").Methods("DELETE").HandlerFunc(handler.UnstarAsset) - rr := httptest.NewRequest("DELETE", "/user/starred", nil) + rr := httptest.NewRequest("DELETE", "/", nil) rw := httptest.NewRecorder() + rr = mux.SetURLVars(rr, map[string]string{ + "asset_id": assetID, + }) if tc.MutateRequest != nil { rr = tc.MutateRequest(rr) } - router.ServeHTTP(rw, rr) + handler.UnstarAsset(rw, rr) if rw.Code != tc.ExpectStatus { t.Errorf("expected handler to return %d status, was %d instead", tc.ExpectStatus, rw.Code) return diff --git a/api/middleware/user_test.go b/api/middleware/user_test.go index 5bb4ecab..660c2888 100644 --- a/api/middleware/user_test.go +++ b/api/middleware/user_test.go @@ -22,16 +22,16 @@ const ( identityProviderHeader = "Columbus-User-Provider" ) +var userCfg = user.Config{IdentityProviderDefaultName: "shield"} + func TestValidateUser(t *testing.T) { middlewareCfg := Config{ Logger: log.NewNoop(), IdentityHeader: identityHeader, } - userCfg := user.Config{ - IdentityProviderDefaultName: "shield", - } + t.Run("should return HTTP 400 when identity header not present", func(t *testing.T) { - userSvc := user.NewService(userCfg, nil) + userSvc := user.NewService(nil, userCfg) r := mux.NewRouter() r.Use(ValidateUser(middlewareCfg, userSvc)) r.Path(dummyRoute).Methods(http.MethodGet) @@ -58,7 +58,7 @@ func TestValidateUser(t *testing.T) { mockUserRepository.On("GetID", mock.Anything, mock.Anything).Return("", customError) mockUserRepository.On("Create", mock.Anything, mock.Anything).Return("", customError) - userSvc := user.NewService(userCfg, mockUserRepository) + userSvc := user.NewService(mockUserRepository, userCfg) r := mux.NewRouter() r.Use(ValidateUser(middlewareCfg, userSvc)) r.Path(dummyRoute).Methods(http.MethodGet) @@ -83,11 +83,12 @@ func TestValidateUser(t *testing.T) { t.Run("should return HTTP 200 with propagated user ID when user validation success", func(t *testing.T) { userID := "user-id" + userEmail := "some-email" mockUserRepository := &mocks.UserRepository{} mockUserRepository.On("GetID", mock.Anything, mock.Anything).Return(userID, nil) mockUserRepository.On("Create", mock.Anything, mock.Anything).Return(userID, nil) - userSvc := user.NewService(userCfg, mockUserRepository) + userSvc := user.NewService(mockUserRepository, userCfg) r := mux.NewRouter() r.Use(ValidateUser(middlewareCfg, userSvc)) r.Path(dummyRoute).Methods(http.MethodGet).HandlerFunc(func(rw http.ResponseWriter, r *http.Request) { @@ -100,7 +101,7 @@ func TestValidateUser(t *testing.T) { }) req, _ := http.NewRequest("GET", dummyRoute, nil) - req.Header.Set(identityHeader, "some-email") + req.Header.Set(identityHeader, userEmail) req.Header.Set(identityProviderHeader, "some-provider") rr := httptest.NewRecorder() diff --git a/api/v1beta1.go b/api/v1beta1.go index 914f99ba..4bd1abad 100644 --- a/api/v1beta1.go +++ b/api/v1beta1.go @@ -45,7 +45,7 @@ func setupV1Beta1AssetRoutes(router *mux.Router, ah *handlers.AssetHandler) { router.Path(url). Methods(http.MethodGet, http.MethodHead). - HandlerFunc(ah.Get) + HandlerFunc(ah.GetAll) router.Path(url). Methods(http.MethodPut, http.MethodHead). @@ -62,6 +62,14 @@ func setupV1Beta1AssetRoutes(router *mux.Router, ah *handlers.AssetHandler) { router.Path(url+"/{id}/stargazers"). Methods(http.MethodGet, http.MethodHead). HandlerFunc(ah.GetStargazers) + + router.Path(url+"/{id}/versions"). + Methods(http.MethodGet, http.MethodHead). + HandlerFunc(ah.GetVersionHistory) + + router.Path(url+"/{id}/versions/{version}"). + Methods(http.MethodGet, http.MethodHead). + HandlerFunc(ah.GetByVersion) } func setupV1Beta1TypeRoutes(router *mux.Router, th *handlers.TypeHandler, rh *handlers.RecordHandler) { diff --git a/asset/asset.go b/asset/asset.go index 35121385..292e4aa7 100644 --- a/asset/asset.go +++ b/asset/asset.go @@ -6,35 +6,48 @@ import ( "time" "github.com/odpf/columbus/user" + + "github.com/r3labs/diff/v2" ) -// Asset is a model that wraps arbitrary data with Columbus' context -type Asset struct { - ID string `json:"id"` - URN string `json:"urn"` - Type Type `json:"type"` - Name string `json:"name"` - Service string `json:"service"` - Description string `json:"description"` - Data map[string]interface{} `json:"data"` - Labels map[string]string `json:"labels"` - Owners []user.User `json:"owners,omitempty"` - CreatedAt time.Time `json:"created_at"` - UpdatedAt time.Time `json:"updated_at"` +type Config struct { + Text string `json:"text"` + Type Type `json:"type"` + Service string `json:"service"` + Size int `json:"size"` + Offset int `json:"offset"` } type Repository interface { - Get(context.Context, Config) ([]Asset, error) + GetAll(context.Context, Config) ([]Asset, error) GetCount(context.Context, Config) (int, error) GetByID(ctx context.Context, id string) (Asset, error) - Upsert(context.Context, *Asset) (string, error) + GetVersionHistory(ctx context.Context, cfg Config, id string) ([]AssetVersion, error) + GetByVersion(ctx context.Context, id string, version string) (Asset, error) + Upsert(ctx context.Context, ast *Asset) (string, error) Delete(ctx context.Context, id string) error } -type Config struct { - Text string `json:"text"` - Type Type `json:"type"` - Service string `json:"service"` - Size int `json:"size"` - Offset int `json:"offset"` +// Asset is a model that wraps arbitrary data with Columbus' context +type Asset struct { + ID string `json:"id" diff:"-"` + URN string `json:"urn" diff:"-"` + Type Type `json:"type" diff:"-"` + Service string `json:"service" diff:"-"` + Name string `json:"name" diff:"name"` + Description string `json:"description" diff:"description"` + Data map[string]interface{} `json:"data" diff:"data"` + Labels map[string]string `json:"labels" diff:"labels"` + Owners []user.User `json:"owners,omitempty" diff:"owners"` + CreatedAt time.Time `json:"created_at" diff:"-"` + UpdatedAt time.Time `json:"updated_at" diff:"-"` + Version string `json:"version" diff:"-"` + UpdatedBy user.User `json:"updated_by" diff:"-"` + Changelog diff.Changelog `json:"changelog,omitempty" diff:"-"` +} + +// Diff returns nil changelog with nil error if equal +// returns wrapped r3labs/diff Changelog struct with nil error if not equal +func (a *Asset) Diff(otherAsset *Asset) (diff.Changelog, error) { + return diff.Diff(a, otherAsset, diff.DiscardComplexOrigin(), diff.AllowTypeMismatch(true)) } diff --git a/asset/asset_test.go b/asset/asset_test.go new file mode 100644 index 00000000..6ee7b92a --- /dev/null +++ b/asset/asset_test.go @@ -0,0 +1,264 @@ +package asset_test + +import ( + "encoding/json" + "testing" + + "github.com/odpf/columbus/asset" + "github.com/r3labs/diff/v2" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestDiffTopLevel(t *testing.T) { + cases := []struct { + Name string + Source, Target string + Changelog diff.Changelog + Error error + }{ + { + "ignored field won't be compared", + `{ + "id": "1234", + "urn": "urn1234", + "type": "dashboard", + "service": "service1234" + }`, + `{ + "id": "5678", + "urn": "urn5678", + "type": "job", + "service": "service5678" + }`, + nil, + nil, + }, + { + "updated top level field should be reflected", + `{ + "name": "old-name" + }`, + `{ + "name": "updated-name", + "description": "updated-decsription" + }`, + diff.Changelog{ + diff.Change{Type: diff.UPDATE, Path: []string{"name"}, From: "old-name", To: "updated-name"}, + diff.Change{Type: diff.UPDATE, Path: []string{"description"}, From: "", To: "updated-decsription"}, + }, + nil, + }, + { + "created owners should be reflected", + `{ + "name": "old-name" + }`, + `{ + "name": "old-name", + "owners": [ + { + "email": "email@odpf.io" + } + ] + }`, + diff.Changelog{ + diff.Change{Type: diff.CREATE, Path: []string{"owners", "0", "email"}, To: "email@odpf.io"}, + }, + nil, + }, + } + + for _, tc := range cases { + t.Run(tc.Name, func(t *testing.T) { + + var sourceAsset asset.Asset + err := json.Unmarshal([]byte(tc.Source), &sourceAsset) + if err != nil { + t.Fatal(err) + } + var targetAsset asset.Asset + err = json.Unmarshal([]byte(tc.Target), &targetAsset) + if err != nil { + t.Fatal(err) + } + + cl, err := sourceAsset.Diff(&targetAsset) + + assert.Equal(t, tc.Error, err) + require.Equal(t, len(tc.Changelog), len(cl)) + + for i, c := range cl { + assert.Equal(t, tc.Changelog[i].Type, c.Type) + assert.Equal(t, tc.Changelog[i].Path, c.Path) + assert.Equal(t, tc.Changelog[i].From, c.From) + assert.Equal(t, tc.Changelog[i].To, c.To) + } + }) + } +} + +func TestDiffData(t *testing.T) { + cases := []struct { + Name string + Source, Target string + Changelog diff.Changelog + Error error + }{ + { + "updated data value string should be reflected", + `{ + "name": "jane-kafka-1a", + "description": "", + "data": { + "title": "jane-kafka-1a", + "entity": "odpf", + "country": "vn" + } + }`, + `{ + "name": "jane-kafka-1a", + "service": "kafka", + "description": "", + "data": { + "title": "jane-kafka-1a", + "description": "a new description inside", + "entity": "odpf", + "country": "id" + } + }`, + diff.Changelog{ + diff.Change{Type: diff.UPDATE, Path: []string{"data", "country"}, From: "vn", To: "id"}, + diff.Change{Type: diff.CREATE, Path: []string{"data", "description"}, To: "a new description inside"}, + }, + nil, + }, + { + "updated data value array should be reflected", + `{ + "name": "jane-kafka-1a", + "data": { + "some_array": [ + { + "id": "element1id" + } + ], + "entity": "odpf", + "country": "vn" + } + }`, + `{ + "name": "jane-kafka-1a", + "data": { + "some_array": [ + { + "id": "element2id" + } + ], + "entity": "odpf", + "country": "vn" + } + }`, + diff.Changelog{ + diff.Change{Type: diff.UPDATE, Path: []string{"data", "some_array", "0", "id"}, From: "element1id", To: "element2id"}, + }, + nil, + }, + { + "created data value array should be reflected", + `{ + "name": "jane-kafka-1a", + "data": { + "some_array": [ + { + "id": "element1id" + } + ], + "entity": "odpf", + "country": "vn" + } + }`, + `{ + "name": "jane-kafka-1a", + "data": { + "some_array": [ + { + "id": "element1id" + }, + { + "id": "element2id" + } + ], + "entity": "odpf", + "country": "vn" + } + }`, + diff.Changelog{ + diff.Change{Type: diff.CREATE, Path: []string{"data", "some_array", "1"}, To: map[string]interface{}(map[string]interface{}{"id": "element2id"})}, + }, + nil, + }, + { + "deleted data value array should be reflected", + `{ + "name": "jane-kafka-1a", + "data": { + "some_array": [ + { + "id": "element1id" + }, + { + "id": "element2id" + } + ], + "entity": "odpf", + "country": "vn" + } + }`, + `{ + "name": "jane-kafka-1a", + "data": { + "some_array": [ + { + "id": "element1id" + } + ], + "entity": "odpf", + "country": "vn" + } + }`, + diff.Changelog{ + diff.Change{Type: diff.DELETE, Path: []string{"data", "some_array", "1"}, From: map[string]interface{}(map[string]interface{}{"id": "element2id"})}, + }, + nil, + }, + } + + for _, tc := range cases { + t.Run(tc.Name, func(t *testing.T) { + + var sourceAsset asset.Asset + err := json.Unmarshal([]byte(tc.Source), &sourceAsset) + if err != nil { + t.Fatal(err) + } + var targetAsset asset.Asset + err = json.Unmarshal([]byte(tc.Target), &targetAsset) + if err != nil { + t.Fatal(err) + } + + cl, err := sourceAsset.Diff(&targetAsset) + + assert.Equal(t, tc.Error, err) + require.Equal(t, len(tc.Changelog), len(cl)) + + for i, c := range cl { + assert.Equal(t, tc.Changelog[i].Type, c.Type) + assert.Equal(t, tc.Changelog[i].Path, c.Path) + assert.Equal(t, tc.Changelog[i].From, c.From) + assert.Equal(t, tc.Changelog[i].To, c.To) + } + }) + } +} diff --git a/asset/errors.go b/asset/errors.go index 193e71ba..57723803 100644 --- a/asset/errors.go +++ b/asset/errors.go @@ -8,6 +8,7 @@ import ( var ( ErrEmptyID = errors.New("asset does not have ID") ErrUnknownType = errors.New("unknown type") + ErrNilAsset = errors.New("nil asset") ) type NotFoundError struct { diff --git a/asset/version.go b/asset/version.go new file mode 100644 index 00000000..ec29d212 --- /dev/null +++ b/asset/version.go @@ -0,0 +1,44 @@ +package asset + +import ( + "fmt" + "time" + + "github.com/Masterminds/semver/v3" + "github.com/odpf/columbus/user" + "github.com/r3labs/diff/v2" +) + +const BaseVersion = "0.1" + +// AssetVersion is the changes summary of asset versions +type AssetVersion struct { + ID string `json:"id" db:"id"` + URN string `json:"urn" db:"urn"` + Type string `json:"type" db:"type"` + Service string `json:"service" db:"service"` + Version string `json:"version" db:"version"` + Changelog diff.Changelog `json:"changelog" db:"changelog"` + UpdatedBy user.User `json:"updated_by" db:"updated_by"` + CreatedAt time.Time `json:"created_at" db:"created_at"` + UpdatedAt time.Time `json:"updated_at" db:"updated_at"` +} + +// ParseVersion returns error if version string is not in MAJOR.MINOR format +func ParseVersion(v string) (*semver.Version, error) { + semverVersion, err := semver.NewVersion(v) + if err != nil { + return nil, fmt.Errorf("invalid version \"%s\"", v) + } + return semverVersion, nil +} + +// IncreaseMinorVersion bumps up the minor version +0.1 +func IncreaseMinorVersion(v string) (string, error) { + oldVersion, err := ParseVersion(v) + if err != nil { + return "", err + } + newVersion := oldVersion.IncMinor() + return fmt.Sprintf("%d.%d", newVersion.Major(), newVersion.Minor()), nil +} diff --git a/cmd/serve.go b/cmd/serve.go index a43ccbb8..c8340cb4 100644 --- a/cmd/serve.go +++ b/cmd/serve.go @@ -84,11 +84,11 @@ func initRouter( if err != nil { logger.Fatal("failed to create new user repository", "error", err) } - userService := user.NewService(user.Config{ + userService := user.NewService(userRepository, user.Config{ IdentityProviderDefaultName: config.IdentityProviderDefaultName, - }, userRepository) + }) - assetRepository, err := postgres.NewAssetRepository(pgClient, userRepository, 0) + assetRepository, err := postgres.NewAssetRepository(pgClient, userRepository, 0, config.IdentityProviderDefaultName) if err != nil { logger.Fatal("failed to create new asset repository", "error", err) } @@ -167,9 +167,9 @@ func initElasticsearch(config Config, logger log.Logger) *elasticsearch.Client { Transport: nrelasticsearch.NewRoundTripper(nil), // uncomment below code to debug request and response to elasticsearch // Logger: &estransport.ColorLogger{ - // Output: os.Stdout, - // EnableRequestBody: true, - // EnableResponseBody: true, + // Output: os.Stdout, + // EnableRequestBody: true, + // EnableResponseBody: true, // }, }) if err != nil { diff --git a/go.mod b/go.mod index 5e33cf3e..a8b794ec 100644 --- a/go.mod +++ b/go.mod @@ -4,6 +4,7 @@ go 1.16 require ( github.com/MakeNowJust/heredoc v1.0.0 + github.com/Masterminds/semver/v3 v3.1.1 github.com/Masterminds/squirrel v1.5.2 github.com/elastic/go-elasticsearch v0.0.0 github.com/elastic/go-elasticsearch/v7 v7.16.0 @@ -30,6 +31,7 @@ require ( github.com/odpf/salt v0.0.0-20220106155451-62e8c849ae81 github.com/olivere/elastic/v7 v7.0.31 github.com/ory/dockertest/v3 v3.8.1 + github.com/r3labs/diff/v2 v2.15.0 github.com/spf13/cobra v1.3.0 github.com/spf13/viper v1.10.1 github.com/stretchr/testify v1.7.0 diff --git a/go.sum b/go.sum index 6c61aee8..5cc78817 100644 --- a/go.sum +++ b/go.sum @@ -76,6 +76,8 @@ github.com/ClickHouse/clickhouse-go v1.4.3/go.mod h1:EaI/sW7Azgz9UATzd5ZdZHRUhHg github.com/DataDog/datadog-go v3.2.0+incompatible/go.mod h1:LButxg5PwREeZtORoXG3tL4fMGNddJ+vMq1mwgfaqoQ= github.com/MakeNowJust/heredoc v1.0.0 h1:cXCdzVdstXyiTqTvfqk9SDHpKNjxuom+DOlyEeQ4pzQ= github.com/MakeNowJust/heredoc v1.0.0/go.mod h1:mG5amYoWBHf8vpLOuehzbGGw0EHxpZZ6lCpQ4fNJ8LE= +github.com/Masterminds/semver/v3 v3.1.1 h1:hLg3sBzpNErnxhQtUy/mmLR2I9foDujNK030IGemrRc= +github.com/Masterminds/semver/v3 v3.1.1/go.mod h1:VPu/7SZ7ePZ3QOrcuXROw5FAcLl4a0cBrbBpGY/8hQs= github.com/Masterminds/squirrel v1.5.2 h1:UiOEi2ZX4RCSkpiNDQN5kro/XIBpSRk9iTqdIRPzUXE= github.com/Masterminds/squirrel v1.5.2/go.mod h1:NNaOrjSoIDfDA40n7sr2tPNZRfjzjA400rg+riTZj10= github.com/Microsoft/go-winio v0.4.11/go.mod h1:VhR8bwka0BXejwEJY73c50VrPtXAaKcyvVC4A4RozmA= @@ -1035,6 +1037,8 @@ github.com/prometheus/procfs v0.1.3/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4O github.com/prometheus/procfs v0.2.0/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4OA4YeYWdaU= github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA= github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU= +github.com/r3labs/diff/v2 v2.15.0 h1:3TEoJ6dBqESl1YgL+7curys5PvuEnwrtjkFNskgUvfg= +github.com/r3labs/diff/v2 v2.15.0/go.mod h1:I8noH9Fc2fjSaMxqF3G2lhDdC0b+JXCfyx85tWFM9kc= github.com/remyoudompheng/bigfft v0.0.0-20190728182440-6a916e37a237/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= github.com/rivo/uniseg v0.1.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= @@ -1152,6 +1156,8 @@ github.com/vishvananda/netlink v1.1.1-0.20201029203352-d40f9887b852/go.mod h1:tw github.com/vishvananda/netns v0.0.0-20180720170159-13995c7128cc/go.mod h1:ZjcWmFBXmLKZu9Nxj3WKYEafiSqer2rnvPr0en9UNpI= github.com/vishvananda/netns v0.0.0-20191106174202-0a2b9b5464df/go.mod h1:JP3t17pCcGlemwknint6hfoeCVQrEMVwxRLRjXpq+BU= github.com/vishvananda/netns v0.0.0-20200728191858-db3c7e526aae/go.mod h1:DD4vA1DwXk04H54A1oHXtwZmA0grkVMdPxx/VGLCah0= +github.com/vmihailenco/msgpack v4.0.4+incompatible h1:dSLoQfGFAo3F6OoNhwUmLwVgaUXK79GlxNBwueZn0xI= +github.com/vmihailenco/msgpack v4.0.4+incompatible/go.mod h1:fy3FlTQTDXWkZ7Bh6AcGMlsjHatGryHQYUTf1ShIgkk= github.com/willf/bitset v1.1.11-0.20200630133818-d5bec3311243/go.mod h1:RjeCKbqT1RxIR/KWY6phxZiaY1IyutSBfGjNPySAYV4= github.com/willf/bitset v1.1.11/go.mod h1:83CECat5yLh5zVOf4P1ErAgKA5UDvKtgyUABdr3+MjI= github.com/xanzy/go-gitlab v0.15.0/go.mod h1:8zdQa/ri1dfn8eS3Ir1SyfvOKlw7WBJ8DVThkpGiXrs= @@ -1654,6 +1660,7 @@ google.golang.org/appengine v1.5.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7 google.golang.org/appengine v1.6.1/go.mod h1:i06prIuMbXzDqacNJfV5OdTW448YApPu5ww/cMBSeb0= google.golang.org/appengine v1.6.5/go.mod h1:8WjMMxjGQR8xUklV/ARdw2HLXBOI7O7uCIDZVag1xfc= google.golang.org/appengine v1.6.6/go.mod h1:8WjMMxjGQR8xUklV/ARdw2HLXBOI7O7uCIDZVag1xfc= +google.golang.org/appengine v1.6.7 h1:FZR1q0exgwxzPzp/aF+VccGrSfxfPpkBqjIIEq3ru6c= google.golang.org/appengine v1.6.7/go.mod h1:8WjMMxjGQR8xUklV/ARdw2HLXBOI7O7uCIDZVag1xfc= google.golang.org/cloud v0.0.0-20151119220103-975617b05ea8/go.mod h1:0H1ncTHf11KCFhTc/+EFRbzSCOZx+VUbRMk55Yv5MYk= google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= diff --git a/lib/mocks/asset_repository.go b/lib/mocks/asset_repository.go index 3e4fb41d..c632ff2b 100644 --- a/lib/mocks/asset_repository.go +++ b/lib/mocks/asset_repository.go @@ -29,8 +29,8 @@ func (_m *AssetRepository) Delete(ctx context.Context, id string) error { return r0 } -// Get provides a mock function with given fields: _a0, _a1 -func (_m *AssetRepository) Get(_a0 context.Context, _a1 asset.Config) ([]asset.Asset, error) { +// GetAll provides a mock function with given fields: _a0, _a1 +func (_m *AssetRepository) GetAll(_a0 context.Context, _a1 asset.Config) ([]asset.Asset, error) { ret := _m.Called(_a0, _a1) var r0 []asset.Asset @@ -73,6 +73,27 @@ func (_m *AssetRepository) GetByID(ctx context.Context, id string) (asset.Asset, return r0, r1 } +// GetByVersion provides a mock function with given fields: ctx, id, version +func (_m *AssetRepository) GetByVersion(ctx context.Context, id string, version string) (asset.Asset, error) { + ret := _m.Called(ctx, id, version) + + var r0 asset.Asset + if rf, ok := ret.Get(0).(func(context.Context, string, string) asset.Asset); ok { + r0 = rf(ctx, id, version) + } else { + r0 = ret.Get(0).(asset.Asset) + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context, string, string) error); ok { + r1 = rf(ctx, id, version) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + // GetCount provides a mock function with given fields: _a0, _a1 func (_m *AssetRepository) GetCount(_a0 context.Context, _a1 asset.Config) (int, error) { ret := _m.Called(_a0, _a1) @@ -94,20 +115,43 @@ func (_m *AssetRepository) GetCount(_a0 context.Context, _a1 asset.Config) (int, return r0, r1 } -// Upsert provides a mock function with given fields: _a0, _a1 -func (_m *AssetRepository) Upsert(_a0 context.Context, _a1 *asset.Asset) (string, error) { - ret := _m.Called(_a0, _a1) +// GetVersionHistory provides a mock function with given fields: ctx, cfg, id +func (_m *AssetRepository) GetVersionHistory(ctx context.Context, cfg asset.Config, id string) ([]asset.AssetVersion, error) { + ret := _m.Called(ctx, cfg, id) + + var r0 []asset.AssetVersion + if rf, ok := ret.Get(0).(func(context.Context, asset.Config, string) []asset.AssetVersion); ok { + r0 = rf(ctx, cfg, id) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]asset.AssetVersion) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context, asset.Config, string) error); ok { + r1 = rf(ctx, cfg, id) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// Upsert provides a mock function with given fields: ctx, ast +func (_m *AssetRepository) Upsert(ctx context.Context, ast *asset.Asset) (string, error) { + ret := _m.Called(ctx, ast) var r0 string if rf, ok := ret.Get(0).(func(context.Context, *asset.Asset) string); ok { - r0 = rf(_a0, _a1) + r0 = rf(ctx, ast) } else { r0 = ret.Get(0).(string) } var r1 error if rf, ok := ret.Get(1).(func(context.Context, *asset.Asset) error); ok { - r1 = rf(_a0, _a1) + r1 = rf(ctx, ast) } else { r1 = ret.Error(1) } diff --git a/star/star.go b/star/star.go index 4373edec..64675e6f 100644 --- a/star/star.go +++ b/star/star.go @@ -4,19 +4,11 @@ package star import ( "context" - "time" "github.com/odpf/columbus/asset" "github.com/odpf/columbus/user" ) -type Star struct { - ID string `json:"id"` - Asset asset.Asset `json:"asset"` - CreatedAt time.Time `json:"created_at"` - UpdatedAt time.Time `json:"updated_at"` -} - type Repository interface { Create(ctx context.Context, userID string, assetID string) (string, error) GetStargazers(ctx context.Context, cfg Config, assetID string) ([]user.User, error) diff --git a/store/postgres/asset_model.go b/store/postgres/asset_model.go index 1b6d02dd..d7ed9eea 100644 --- a/store/postgres/asset_model.go +++ b/store/postgres/asset_model.go @@ -7,7 +7,10 @@ import ( "fmt" "time" + "github.com/jmoiron/sqlx/types" "github.com/odpf/columbus/asset" + "github.com/odpf/columbus/user" + "github.com/r3labs/diff/v2" ) type AssetModel struct { @@ -19,11 +22,17 @@ type AssetModel struct { Description string `db:"description"` Data JSONMap `db:"data"` Labels JSONMap `db:"labels"` + Version string `db:"version"` + UpdatedBy UserModel `db:"updated_by"` CreatedAt time.Time `db:"created_at"` UpdatedAt time.Time `db:"updated_at"` + // version specific information + Changelog types.JSONText `db:"changelog"` + Owners types.JSONText `db:"owners"` } -func (a *AssetModel) toAsset() asset.Asset { +func (a *AssetModel) toAsset(owners []user.User) asset.Asset { + return asset.Asset{ ID: a.ID, URN: a.URN, @@ -33,11 +42,65 @@ func (a *AssetModel) toAsset() asset.Asset { Description: a.Description, Data: a.Data, Labels: a.buildLabels(), + Owners: owners, + Version: a.Version, + UpdatedBy: a.UpdatedBy.toUser(), CreatedAt: a.CreatedAt, UpdatedAt: a.UpdatedAt, } } +func (a *AssetModel) toAssetVersion() (asset.AssetVersion, error) { + + var clog diff.Changelog + err := a.Changelog.Unmarshal(&clog) + if err != nil { + return asset.AssetVersion{}, err + } + + return asset.AssetVersion{ + ID: a.ID, + URN: a.URN, + Type: a.Type, + Service: a.Service, + Version: a.Version, + Changelog: clog, + UpdatedBy: a.UpdatedBy.toUser(), + CreatedAt: a.CreatedAt, + }, nil +} + +func (a *AssetModel) toVersionedAsset(latestAssetVersion asset.Asset) (asset.Asset, error) { + var owners []user.User + err := a.Owners.Unmarshal(&owners) + if err != nil { + return asset.Asset{}, err + } + + var clog diff.Changelog + err = a.Changelog.Unmarshal(&clog) + if err != nil { + return asset.Asset{}, err + } + + return asset.Asset{ + ID: latestAssetVersion.ID, + URN: latestAssetVersion.URN, + Type: asset.Type(latestAssetVersion.Type), + Name: a.Name, + Service: latestAssetVersion.Service, + Description: a.Description, + Data: a.Data, + Labels: a.buildLabels(), + Owners: owners, + Version: a.Version, + UpdatedBy: a.UpdatedBy.toUser(), + CreatedAt: a.CreatedAt, + UpdatedAt: a.UpdatedAt, + Changelog: clog, + }, nil +} + func (a *AssetModel) buildLabels() map[string]string { if a.Labels == nil { return nil diff --git a/store/postgres/asset_repository.go b/store/postgres/asset_repository.go index 3fc9df96..1610843d 100644 --- a/store/postgres/asset_repository.go +++ b/store/postgres/asset_repository.go @@ -3,31 +3,35 @@ package postgres import ( "context" "database/sql" + "encoding/json" "errors" "fmt" "strings" + "time" sq "github.com/Masterminds/squirrel" "github.com/jmoiron/sqlx" "github.com/odpf/columbus/asset" "github.com/odpf/columbus/user" + "github.com/r3labs/diff/v2" ) // AssetRepository is a type that manages user operation to the primary database type AssetRepository struct { - client *Client - userRepo *UserRepository - defaultGetMaxSize int + client *Client + userRepo *UserRepository + defaultGetMaxSize int + defaultUserProvider string } -// Get retrieves list of assets with filters via config -func (r *AssetRepository) Get(ctx context.Context, config asset.Config) (assets []asset.Asset, err error) { +// GetAll retrieves list of assets with filters via config +func (r *AssetRepository) GetAll(ctx context.Context, config asset.Config) (assets []asset.Asset, err error) { size := config.Size if size == 0 { size = r.defaultGetMaxSize } - builder := sq.Select("*").From("assets"). + builder := r.getAssetSQL(). Limit(uint64(size)). Offset(uint64(config.Offset)) builder = r.buildFilterQuery(builder, config) @@ -36,7 +40,6 @@ func (r *AssetRepository) Get(ctx context.Context, config asset.Config) (assets err = fmt.Errorf("error building query: %w", err) return } - ams := []*AssetModel{} err = r.client.db.SelectContext(ctx, &ams, query, args...) if err != nil { @@ -46,13 +49,13 @@ func (r *AssetRepository) Get(ctx context.Context, config asset.Config) (assets assets = []asset.Asset{} for _, am := range ams { - assets = append(assets, am.toAsset()) + assets = append(assets, am.toAsset(nil)) } return } -// Get retrieves list of assets with filters via config +// GetCount retrieves number of assets for every type func (r *AssetRepository) GetCount(ctx context.Context, config asset.Config) (total int, err error) { builder := sq.Select("count(1)").From("assets") builder = r.buildFilterQuery(builder, config) @@ -71,31 +74,133 @@ func (r *AssetRepository) GetCount(ctx context.Context, config asset.Config) (to // GetByID retrieves asset by its ID func (r *AssetRepository) GetByID(ctx context.Context, id string) (ast asset.Asset, err error) { - if !isValidUUID(id) { - return asset.Asset{}, asset.InvalidError{AssetID: id} + err = asset.InvalidError{AssetID: id} + return } - query := `SELECT * FROM assets WHERE id = $1 LIMIT 1;` + builder := r.getAssetSQL(). + Where(sq.Eq{"a.id": id}). + Limit(1) + query, args, err := r.buildSQL(builder) + if err != nil { + err = fmt.Errorf("error building query: %w", err) + return + } am := &AssetModel{} - err = r.client.db.GetContext(ctx, am, query, id) + err = r.client.db.GetContext(ctx, am, query, args...) if errors.Is(err, sql.ErrNoRows) { err = asset.NotFoundError{AssetID: id} return } + if err != nil { err = fmt.Errorf("error getting asset with ID = \"%s\": %w", id, err) return } - ast = am.toAsset() owners, err := r.getOwners(ctx, id) if err != nil { - err = fmt.Errorf("error getting asset's owners with ID = \"%s\": %w", id, err) + err = fmt.Errorf("error getting asset with ID = \"%s\": %w", id, err) + return + } + + ast = am.toAsset(owners) + + return +} + +// GetVersionHistory retrieves the previous versions of an asset +func (r *AssetRepository) GetVersionHistory(ctx context.Context, cfg asset.Config, id string) (avs []asset.AssetVersion, err error) { + if !isValidUUID(id) { + err = asset.InvalidError{AssetID: id} + return + } + + size := cfg.Size + if size == 0 { + size = r.defaultGetMaxSize + } + + builder := r.getAssetVersionSQL(). + Where(sq.Eq{"a.asset_id": id}). + OrderBy("version DESC"). + Limit(uint64(size)). + Offset(uint64(cfg.Offset)) + query, args, err := r.buildSQL(builder) + if err != nil { + err = fmt.Errorf("error building query: %w", err) + return + } + + var assetModels []AssetModel + err = r.client.db.SelectContext(ctx, &assetModels, query, args...) + if err != nil { + err = fmt.Errorf("failed fetching last versions: %w", err) + return + } + + if len(assetModels) == 0 { + err = asset.NotFoundError{AssetID: id} + return + } + + for _, am := range assetModels { + av, ferr := am.toAssetVersion() + if ferr != nil { + err = fmt.Errorf("failed converting asset model to asset version: %w", ferr) + return + } + avs = append(avs, av) + } + + return avs, nil +} + +// GetByVersion retrieves the specific asset version +func (r *AssetRepository) GetByVersion(ctx context.Context, id string, version string) (ast asset.Asset, err error) { + if !isValidUUID(id) { + err = asset.InvalidError{AssetID: id} + return + } + + latestAsset, err := r.GetByID(ctx, id) + if errors.Is(err, sql.ErrNoRows) { + err = asset.NotFoundError{AssetID: id} + return + } + + if err != nil { + return + } + + if latestAsset.Version == version { + ast = latestAsset + return + } + + var assetModel AssetModel + builder := r.getAssetVersionSQL(). + Where(sq.Eq{"a.asset_id": id, "a.version": version}) + query, args, err := r.buildSQL(builder) + if err != nil { + err = fmt.Errorf("error building query: %w", err) return } - ast.Owners = owners + + err = r.client.db.GetContext(ctx, &assetModel, query, args...) + if errors.Is(err, sql.ErrNoRows) { + err = asset.NotFoundError{AssetID: id} + return + } + + if err != nil { + err = fmt.Errorf("failed fetching asset version: %w", err) + return + } + + ast, err = assetModel.toVersionedAsset(latestAsset) return } @@ -104,26 +209,34 @@ func (r *AssetRepository) GetByID(ctx context.Context, id string) (ast asset.Ass // It updates if asset does exist. // Checking existance is done using "urn", "type", and "service" fields. func (r *AssetRepository) Upsert(ctx context.Context, ast *asset.Asset) (string, error) { - assetID, err := r.getID(ctx, ast) + fetchedAsset, err := r.getAssetByURN(ctx, ast.URN, ast.Type, ast.Service) if errors.As(err, new(asset.NotFoundError)) { err = nil } if err != nil { - return "", fmt.Errorf("error getting asset ID: %w", err) + return "", fmt.Errorf("error getting asset by URN: %w", err) } - if assetID == "" { - assetID, err = r.insert(ctx, ast) + + if fetchedAsset.ID == "" { + // insert flow + fetchedAsset.ID, err = r.insert(ctx, ast) if err != nil { - return assetID, fmt.Errorf("error inserting asset to DB: %w", err) + return fetchedAsset.ID, fmt.Errorf("error inserting asset to DB: %w", err) } } else { - err = r.update(ctx, assetID, ast) + // update flow + changelog, err := fetchedAsset.Diff(ast) + if err != nil { + return "", fmt.Errorf("error diffing two assets: %w", err) + } + + err = r.update(ctx, fetchedAsset.ID, ast, &fetchedAsset, changelog) if err != nil { return "", fmt.Errorf("error updating asset to DB: %w", err) } } - return assetID, nil + return fetchedAsset.ID, nil } // Delete removes asset using its ID @@ -166,23 +279,27 @@ func (r *AssetRepository) buildFilterQuery(builder sq.SelectBuilder, config asse func (r *AssetRepository) insert(ctx context.Context, ast *asset.Asset) (id string, err error) { err = r.client.RunWithinTx(ctx, func(tx *sqlx.Tx) error { - err := tx.QueryRowxContext(ctx, - `INSERT INTO assets - (urn, type, service, name, description, data, labels, created_at, updated_at) - VALUES - ($1, $2, $3, $4, $5, $6, $7, $8, $9) - RETURNING id`, - ast.URN, ast.Type, ast.Service, ast.Name, ast.Description, ast.Data, ast.Labels, ast.CreatedAt, ast.UpdatedAt).Scan(&id) + query, args, err := sq.Insert("assets"). + Columns("urn", "type", "service", "name", "description", "data", "labels", "updated_by", "version"). + Values(ast.URN, ast.Type, ast.Service, ast.Name, ast.Description, ast.Data, ast.Labels, ast.UpdatedBy.ID, asset.BaseVersion). + Suffix("RETURNING \"id\""). + PlaceholderFormat(sq.Dollar). + ToSql() + if err != nil { + return fmt.Errorf("error building insert query: %w", err) + } + + err = tx.QueryRowContext(ctx, query, args...).Scan(&id) if err != nil { return fmt.Errorf("error running insert query: %w", err) } - ast.Owners, err = r.createOrFetchOwnersID(ctx, tx, ast.Owners) + users, err := r.createOrFetchUserIDs(ctx, tx, ast.Owners) if err != nil { return fmt.Errorf("error creating and fetching owners: %w", err) } - err = r.insertOwners(ctx, tx, id, ast.Owners) + err = r.insertOwners(ctx, tx, id, users) if err != nil { return fmt.Errorf("error running insert owners query: %w", err) } @@ -193,14 +310,24 @@ func (r *AssetRepository) insert(ctx context.Context, ast *asset.Asset) (id stri return } -func (r *AssetRepository) update(ctx context.Context, id string, ast *asset.Asset) error { +func (r *AssetRepository) update(ctx context.Context, assetID string, newAsset *asset.Asset, oldAsset *asset.Asset, clog diff.Changelog) error { - if !isValidUUID(id) { - return asset.InvalidError{AssetID: id} + if !isValidUUID(assetID) { + return asset.InvalidError{AssetID: assetID} + } + + if len(clog) == 0 { + return nil } return r.client.RunWithinTx(ctx, func(tx *sqlx.Tx) error { - err := r.execContext(ctx, tx, + // update assets + newVersion, err := asset.IncreaseMinorVersion(oldAsset.Version) + if err != nil { + return err + } + + err = r.execContext(ctx, tx, `UPDATE assets SET urn = $1, type = $2, @@ -209,27 +336,31 @@ func (r *AssetRepository) update(ctx context.Context, id string, ast *asset.Asse description = $5, data = $6, labels = $7, - updated_at = $8 - WHERE id = $9; + updated_at = $8, + updated_by = $9, + version = $10 + WHERE id = $11; `, - ast.URN, ast.Type, ast.Service, ast.Name, ast.Description, ast.Data, ast.Labels, ast.UpdatedAt, id) + newAsset.URN, newAsset.Type, newAsset.Service, newAsset.Name, newAsset.Description, newAsset.Data, newAsset.Labels, time.Now(), newAsset.UpdatedBy.ID, newVersion, assetID) if err != nil { return fmt.Errorf("error running update asset query: %w", err) } - ast.Owners, err = r.createOrFetchOwnersID(ctx, tx, ast.Owners) - if err != nil { - return fmt.Errorf("error creating and fetching owners: %w", err) + // insert versions + if err = r.insertAssetVersion(ctx, tx, oldAsset, clog); err != nil { + return err } - currentOwners, err := r.getOwners(ctx, id) + + // managing owners + newAssetOwners, err := r.createOrFetchUserIDs(ctx, tx, newAsset.Owners) if err != nil { - return fmt.Errorf("error getting asset's current owners: %w", err) + return fmt.Errorf("error creating and fetching owners: %w", err) } - toInserts, toRemoves := r.compareOwners(currentOwners, ast.Owners) - if err := r.insertOwners(ctx, tx, id, toInserts); err != nil { + toInserts, toRemoves := r.compareOwners(oldAsset.Owners, newAssetOwners) + if err := r.insertOwners(ctx, tx, assetID, toInserts); err != nil { return fmt.Errorf("error inserting asset's new owners: %w", err) } - if err := r.removeOwners(ctx, tx, id, toRemoves); err != nil { + if err := r.removeOwners(ctx, tx, assetID, toRemoves); err != nil { return fmt.Errorf("error removing asset's old owners: %w", err) } @@ -237,7 +368,38 @@ func (r *AssetRepository) update(ctx context.Context, id string, ast *asset.Asse }) } -// getOwners retrieves asset's owners by its ID +func (r *AssetRepository) insertAssetVersion(ctx context.Context, execer sqlx.ExecerContext, oldAsset *asset.Asset, clog diff.Changelog) (err error) { + if oldAsset == nil { + err = asset.ErrNilAsset + return + } + + if clog == nil { + err = fmt.Errorf("changelog is nil when insert to asset version") + return + } + + jsonChangelog, err := json.Marshal(clog) + if err != nil { + return err + } + query, args, err := sq.Insert("assets_versions"). + Columns("asset_id", "urn", "type", "service", "name", "description", "data", "labels", "created_at", "updated_at", "updated_by", "version", "owners", "changelog"). + Values(oldAsset.ID, oldAsset.URN, oldAsset.Type, oldAsset.Service, oldAsset.Name, oldAsset.Description, oldAsset.Data, oldAsset.Labels, + oldAsset.CreatedAt, oldAsset.UpdatedAt, oldAsset.UpdatedBy.ID, oldAsset.Version, oldAsset.Owners, jsonChangelog). + PlaceholderFormat(sq.Dollar). + ToSql() + if err != nil { + return fmt.Errorf("error building insert query: %w", err) + } + + if err = r.execContext(ctx, execer, query, args...); err != nil { + return fmt.Errorf("error running insert asset version query: %w", err) + } + + return +} + func (r *AssetRepository) getOwners(ctx context.Context, assetID string) (owners []user.User, err error) { if !isValidUUID(assetID) { @@ -245,18 +407,18 @@ func (r *AssetRepository) getOwners(ctx context.Context, assetID string) (owners } query := ` - SELECT u.id,u.email,u.provider + SELECT + u.id as "id", + u.email as "email", + u.provider as "provider" FROM asset_owners ao JOIN users u on ao.user_id = u.id WHERE asset_id = $1` - ums := []UserModel{} - err = r.client.db.SelectContext(ctx, &ums, query, assetID) + + err = r.client.db.SelectContext(ctx, &owners, query, assetID) if err != nil { err = fmt.Errorf("error getting asset's owners: %w", err) } - for _, um := range ums { - owners = append(owners, *um.toUser()) - } return } @@ -315,14 +477,46 @@ func (r *AssetRepository) removeOwners(ctx context.Context, execer sqlx.ExecerCo return } -func (r *AssetRepository) createOrFetchOwnersID(ctx context.Context, tx *sqlx.Tx, users []user.User) (results []user.User, err error) { +func (r *AssetRepository) compareOwners(current, newOwners []user.User) (toInserts, toRemove []user.User) { + if len(current) == 0 && len(newOwners) == 0 { + return + } + + currMap := map[string]int{} + for _, curr := range current { + currMap[curr.ID] = 1 + } + + for _, n := range newOwners { + _, exists := currMap[n.ID] + if exists { + // if exists, it means that both new and current have it. + // we remove it from the map, + // so that what's left in the map is the that only exists in current + // and have to be removed + delete(currMap, n.ID) + } else { + toInserts = append(toInserts, user.User{ID: n.ID}) + } + } + + for id := range currMap { + toRemove = append(toRemove, user.User{ID: id}) + } + + return +} + +func (r *AssetRepository) createOrFetchUserIDs(ctx context.Context, tx *sqlx.Tx, users []user.User) (results []user.User, err error) { for _, u := range users { if u.ID != "" { + results = append(results, u) continue } var userID string userID, err = r.userRepo.GetID(ctx, u.Email) if errors.As(err, &user.NotFoundError{}) { + u.Provider = r.defaultUserProvider userID, err = r.userRepo.CreateWithTx(ctx, tx, &u) if err != nil { err = fmt.Errorf("error creating owner: %w", err) @@ -341,18 +535,39 @@ func (r *AssetRepository) createOrFetchOwnersID(ctx context.Context, tx *sqlx.Tx return } -func (r *AssetRepository) getID(ctx context.Context, ast *asset.Asset) (id string, err error) { - query := `SELECT id FROM assets WHERE urn = $1 AND type = $2 AND service = $3;` - err = r.client.db.GetContext(ctx, &id, query, ast.URN, ast.Type, ast.Service) - if errors.Is(err, sql.ErrNoRows) { - err = nil - } +func (r *AssetRepository) getAssetByURN(ctx context.Context, assetURN string, assetType asset.Type, assetService string) (ast asset.Asset, err error) { + builder := r.getAssetSQL(). + Where(sq.Eq{ + "a.urn": assetURN, + "a.type": assetType, + "a.service": assetService, + }) + query, args, err := r.buildSQL(builder) if err != nil { + err = fmt.Errorf("error building query: %w", err) + return + } + + var assetModel AssetModel + if err = r.client.db.GetContext(ctx, &assetModel, query, args...); err != nil { + if errors.Is(err, sql.ErrNoRows) { + err = asset.NotFoundError{} + return + } err = fmt.Errorf( - "error getting asset's ID with urn = \"%s\", type = \"%s\", service = \"%s\": %w", - ast.URN, ast.Type, ast.Service, err) + "error getting asset with urn = \"%s\", type = \"%s\", service = \"%s\": %w", + assetURN, assetType, assetService, err) + return + } + + owners, err := r.getOwners(ctx, assetModel.ID) + if err != nil { + err = fmt.Errorf("error getting asset's current owners: %w", err) + return } + ast = assetModel.toAsset(owners) + return } @@ -373,36 +588,6 @@ func (r *AssetRepository) execContext(ctx context.Context, execer sqlx.ExecerCon return nil } -func (r *AssetRepository) compareOwners(current, new []user.User) (toInserts, toRemove []user.User) { - if len(current) == 0 && len(new) == 0 { - return - } - - currMap := map[string]int{} - for _, curr := range current { - currMap[curr.ID] = 1 - } - - for _, n := range new { - _, exists := currMap[n.ID] - if exists { - // if exists, it means that both new and current have it. - // we remove it from the map, - // so that what's left in the map is the that only exists in current - // and have to be removed - delete(currMap, n.ID) - } else { - toInserts = append(toInserts, user.User{ID: n.ID}) - } - } - - for id := range currMap { - toRemove = append(toRemove, user.User{ID: id}) - } - - return -} - func (r *AssetRepository) buildSQL(builder sq.SelectBuilder) (query string, args []interface{}, err error) { query, args, err = builder.ToSql() if err != nil { @@ -418,18 +603,70 @@ func (r *AssetRepository) buildSQL(builder sq.SelectBuilder) (query string, args return } +func (r *AssetRepository) getAssetSQL() sq.SelectBuilder { + return sq.Select(` + a.id as "id", + a.urn as "urn", + a.type as "type", + a.name as "name", + a.service as"service", + a.description as "description", + a.data as "data", + a.labels as "labels", + a.version as "version", + a.created_at as "created_at", + a.updated_at as "updated_at", + u.id as "updated_by.id", + u.email as "updated_by.email", + u.provider as "updated_by.provider", + u.created_at as "updated_by.created_at", + u.updated_at as "updated_by.updated_at" + `). + From("assets a"). + LeftJoin("users u ON a.updated_by = u.id") +} + +func (r *AssetRepository) getAssetVersionSQL() sq.SelectBuilder { + return sq.Select(` + a.asset_id as "id", + a.urn as "urn", + a.type as "type", + a.name as "name", + a.service as"service", + a.description as "description", + a.data as "data", + a.labels as "labels", + a.version as "version", + a.created_at as "created_at", + a.updated_at as "updated_at", + a.changelog as "changelog", + a.owners as "owners", + u.id as "updated_by.id", + u.email as "updated_by.email", + u.provider as "updated_by.provider", + u.created_at as "updated_by.created_at", + u.updated_at as "updated_by.updated_at" + `). + From("assets_versions a"). + LeftJoin("users u ON a.updated_by = u.id") +} + // NewAssetRepository initializes user repository clients -func NewAssetRepository(c *Client, userRepo *UserRepository, defaultGetMaxSize int) (*AssetRepository, error) { +func NewAssetRepository(c *Client, userRepo *UserRepository, defaultGetMaxSize int, defaultUserProvider string) (*AssetRepository, error) { if c == nil { return nil, errors.New("postgres client is nil") } if defaultGetMaxSize == 0 { defaultGetMaxSize = DEFAULT_MAX_RESULT_SIZE } + if defaultUserProvider == "" { + defaultUserProvider = "unknown" + } return &AssetRepository{ - client: c, - defaultGetMaxSize: defaultGetMaxSize, - userRepo: userRepo, + client: c, + defaultGetMaxSize: defaultGetMaxSize, + defaultUserProvider: defaultUserProvider, + userRepo: userRepo, }, nil } diff --git a/store/postgres/asset_repository_test.go b/store/postgres/asset_repository_test.go index a7c4521c..bdbe048e 100644 --- a/store/postgres/asset_repository_test.go +++ b/store/postgres/asset_repository_test.go @@ -4,18 +4,19 @@ import ( "context" "fmt" "testing" + "time" + "github.com/google/uuid" "github.com/odpf/columbus/asset" "github.com/odpf/columbus/store/postgres" "github.com/odpf/columbus/user" "github.com/odpf/salt/log" "github.com/ory/dockertest/v3" + "github.com/r3labs/diff/v2" "github.com/stretchr/testify/suite" ) -const ( - defaultGetMaxSize = 7 -) +var defaultAssetUpdaterUserID = uuid.NewString() type AssetRepositoryTestSuite struct { suite.Suite @@ -24,6 +25,8 @@ type AssetRepositoryTestSuite struct { pool *dockertest.Pool resource *dockertest.Resource repository *postgres.AssetRepository + userRepo *postgres.UserRepository + users []user.User } func (r *AssetRepositoryTestSuite) SetupSuite() { @@ -36,14 +39,44 @@ func (r *AssetRepositoryTestSuite) SetupSuite() { } r.ctx = context.TODO() - userRepo, err := postgres.NewUserRepository(r.client) + r.userRepo, err = postgres.NewUserRepository(r.client) if err != nil { r.T().Fatal(err) } - r.repository, err = postgres.NewAssetRepository(r.client, userRepo, defaultGetMaxSize) + + r.repository, err = postgres.NewAssetRepository(r.client, r.userRepo, defaultGetMaxSize, defaultProviderName) if err != nil { r.T().Fatal(err) } + + r.users = r.createUsers(r.userRepo) +} + +func (r *AssetRepositoryTestSuite) createUsers(userRepo user.Repository) []user.User { + var err error + users := []user.User{} + + user1 := user.User{Email: "user-test-1@odpf.io", Provider: defaultProviderName} + user1.ID, err = userRepo.Create(r.ctx, &user1) + r.Require().NoError(err) + users = append(users, user1) + + user2 := user.User{Email: "user-test-2@odpf.io", Provider: defaultProviderName} + user2.ID, err = userRepo.Create(r.ctx, &user2) + r.Require().NoError(err) + users = append(users, user2) + + user3 := user.User{Email: "user-test-3@odpf.io", Provider: defaultProviderName} + user3.ID, err = userRepo.Create(r.ctx, &user3) + r.Require().NoError(err) + users = append(users, user3) + + user4 := user.User{Email: "user-test-4@odpf.io", Provider: defaultProviderName} + user4.ID, err = userRepo.Create(r.ctx, &user4) + r.Require().NoError(err) + users = append(users, user4) + + return users } func (r *AssetRepositoryTestSuite) TearDownSuite() { @@ -58,7 +91,7 @@ func (r *AssetRepositoryTestSuite) TearDownSuite() { } } -func (r *AssetRepositoryTestSuite) TestGet() { +func (r *AssetRepositoryTestSuite) TestGetAll() { // populate assets total := 12 assets := []asset.Asset{} @@ -74,9 +107,11 @@ func (r *AssetRepositoryTestSuite) TestGet() { } ast := asset.Asset{ - URN: fmt.Sprintf("urn-get-%d", i), - Type: typ, - Service: service, + URN: fmt.Sprintf("urn-get-%d", i), + Type: typ, + Service: service, + Version: asset.BaseVersion, + UpdatedBy: r.users[0], } id, err := r.repository.Upsert(r.ctx, &ast) r.Require().NoError(err) @@ -86,33 +121,40 @@ func (r *AssetRepositoryTestSuite) TestGet() { } r.Run("should return all assets limited by default size", func() { - results, err := r.repository.Get(r.ctx, asset.Config{}) + results, err := r.repository.GetAll(r.ctx, asset.Config{}) r.Require().NoError(err) r.Require().Len(results, defaultGetMaxSize) - r.Equal(assets[:defaultGetMaxSize], results) + for i := 0; i < defaultGetMaxSize; i++ { + r.assertAsset(&assets[i], &results[i]) + } }) r.Run("should override default size using GetConfig.Size", func() { size := 8 - results, err := r.repository.Get(r.ctx, asset.Config{ + results, err := r.repository.GetAll(r.ctx, asset.Config{ Size: size, }) r.Require().NoError(err) r.Require().Len(results, size) - r.Equal(assets[:size], results) + for i := 0; i < size; i++ { + r.assertAsset(&assets[i], &results[i]) + } }) r.Run("should fetch assets by offset defined in GetConfig.Offset", func() { offset := 2 - results, err := r.repository.Get(r.ctx, asset.Config{ + results, err := r.repository.GetAll(r.ctx, asset.Config{ Offset: offset, }) r.Require().NoError(err) - r.Equal(assets[offset:defaultGetMaxSize+offset], results) + for i := offset; i < defaultGetMaxSize+offset; i++ { + r.assertAsset(&assets[i], &results[i-offset]) + + } }) r.Run("should filter using type", func() { - results, err := r.repository.Get(r.ctx, asset.Config{ + results, err := r.repository.GetAll(r.ctx, asset.Config{ Type: asset.TypeDashboard, Size: total, }) @@ -124,7 +166,7 @@ func (r *AssetRepositoryTestSuite) TestGet() { }) r.Run("should filter using service", func() { - results, err := r.repository.Get(r.ctx, asset.Config{ + results, err := r.repository.GetAll(r.ctx, asset.Config{ Service: "postgres", Size: total, }) @@ -143,9 +185,10 @@ func (r *AssetRepositoryTestSuite) TestGetCount() { service := "service-getcount" for i := 0; i < total; i++ { ast := asset.Asset{ - URN: fmt.Sprintf("urn-getcount-%d", i), - Type: typ, - Service: service, + URN: fmt.Sprintf("urn-getcount-%d", i), + Type: typ, + Service: service, + UpdatedBy: r.users[0], } id, err := r.repository.Upsert(r.ctx, &ast) r.Require().NoError(err) @@ -178,14 +221,18 @@ func (r *AssetRepositoryTestSuite) TestGetByID() { r.Run("return correct asset from db", func() { asset1 := asset.Asset{ - URN: "urn-gbi-1", - Type: "table", - Service: "bigquery", + URN: "urn-gbi-1", + Type: "table", + Service: "bigquery", + Version: asset.BaseVersion, + UpdatedBy: r.users[1], } asset2 := asset.Asset{ - URN: "urn-gbi-2", - Type: "topic", - Service: "kafka", + URN: "urn-gbi-2", + Type: "topic", + Service: "kafka", + Version: asset.BaseVersion, + UpdatedBy: r.users[1], } var err error @@ -201,33 +248,21 @@ func (r *AssetRepositoryTestSuite) TestGetByID() { result, err := r.repository.GetByID(r.ctx, asset2.ID) r.NoError(err) - r.Equal(asset2, result) + asset2.UpdatedBy = r.users[1] + r.assertAsset(&asset2, &result) }) r.Run("return owners if any", func() { - // create users - user1 := user.User{Email: "johndoe@example.com", Provider: "shield"} - user2 := user.User{Email: "janedoe@example.com", Provider: "shield"} - userRepo, err := postgres.NewUserRepository(r.client) - r.Require().NoError(err) - user1.ID, err = userRepo.Create(r.ctx, &user1) - r.Require().NoError(err) - user2.ID, err = userRepo.Create(r.ctx, &user2) - r.Require().NoError(err) - // clean up - defer func() { - err = r.client.ExecQueries(r.ctx, []string{"DELETE from users"}) - r.Require().NoError(err) - }() ast := asset.Asset{ URN: "urn-gbi-3", Type: "table", Service: "bigquery", Owners: []user.User{ - user1, - user2, + r.users[1], + r.users[2], }, + UpdatedBy: r.users[1], } id, err := r.repository.Upsert(r.ctx, &ast) @@ -244,28 +279,217 @@ func (r *AssetRepositoryTestSuite) TestGetByID() { }) } -func (r *AssetRepositoryTestSuite) TestUpsert() { - // create users - user1 := user.User{Email: "johndoe@example.com", Provider: "shield"} - user2 := user.User{Email: "janedoe@example.com", Provider: "shield"} - userRepo, err := postgres.NewUserRepository(r.client) +func (r *AssetRepositoryTestSuite) TestVersions() { + assetURN := "urn-u-2-version" + // v0.1 + astVersioning := asset.Asset{ + URN: assetURN, + Type: "table", + Service: "bigquery", + UpdatedBy: r.users[1], + } + + id, err := r.repository.Upsert(r.ctx, &astVersioning) r.Require().NoError(err) - user1.ID, err = userRepo.Create(r.ctx, &user1) + r.Require().Equal(r.lengthOfString(id), 36) + astVersioning.ID = id + + // v0.2 + astVersioning.Description = "new description in v0.2" + id, err = r.repository.Upsert(r.ctx, &astVersioning) r.Require().NoError(err) - user2.ID, err = userRepo.Create(r.ctx, &user2) + r.Require().Equal(id, astVersioning.ID) + + // v0.3 + astVersioning.Owners = []user.User{ + { + Email: "user@odpf.io", + }, + { + Email: "meteor@odpf.io", + Provider: "meteor", + }, + } + id, err = r.repository.Upsert(r.ctx, &astVersioning) r.Require().NoError(err) - // clean up - defer func() { - err = r.client.ExecQueries(r.ctx, []string{"DELETE from users"}) - r.Require().NoError(err) - }() + r.Require().Equal(id, astVersioning.ID) - r.Run("on insert", func() { - r.Run("set ID to asset", func() { - ast := asset.Asset{ - URN: "urn-u-1", + // v0.4 + astVersioning.Data = map[string]interface{}{ + "data1": float64(12345), + } + id, err = r.repository.Upsert(r.ctx, &astVersioning) + r.Require().NoError(err) + r.Require().Equal(id, astVersioning.ID) + + // v0.5 + astVersioning.Labels = map[string]string{ + "key1": "value1", + } + + id, err = r.repository.Upsert(r.ctx, &astVersioning) + r.Require().NoError(err) + r.Require().Equal(id, astVersioning.ID) + + r.Run("should return 3 last versions of an assets if there are exist", func() { + + expectedAssetVersions := []asset.AssetVersion{ + { + ID: astVersioning.ID, + URN: assetURN, Type: "table", Service: "bigquery", + Version: "0.4", + Changelog: diff.Changelog{ + diff.Change{Type: "create", Path: []string{"labels", "key1"}, From: interface{}(nil), To: "value1"}, + }, + UpdatedBy: r.users[1], + }, + { + ID: astVersioning.ID, + URN: assetURN, + Type: "table", + Service: "bigquery", + Version: "0.3", + Changelog: diff.Changelog{ + diff.Change{Type: "create", Path: []string{"data", "data1"}, From: interface{}(nil), To: float64(12345)}, + }, + UpdatedBy: r.users[1], + }, + { + ID: astVersioning.ID, + URN: assetURN, + Type: "table", + Service: "bigquery", + Version: "0.2", + Changelog: diff.Changelog{ + diff.Change{Type: "create", Path: []string{"owners", "0", "email"}, From: interface{}(nil), To: "user@odpf.io"}, + diff.Change{Type: "create", Path: []string{"owners", "1", "email"}, From: interface{}(nil), To: "meteor@odpf.io"}, + }, + UpdatedBy: r.users[1], + }, + } + + assetVersions, err := r.repository.GetVersionHistory(r.ctx, asset.Config{Size: 3}, astVersioning.ID) + r.NoError(err) + // making updatedby user time empty to make ast comparable + for i := 0; i < len(assetVersions); i++ { + assetVersions[i].UpdatedBy.CreatedAt = time.Time{} + assetVersions[i].UpdatedBy.UpdatedAt = time.Time{} + assetVersions[i].CreatedAt = time.Time{} + assetVersions[i].UpdatedAt = time.Time{} + } + r.Equal(expectedAssetVersions, assetVersions) + }) + + r.Run("should return current version of an assets", func() { + expectedLatestVersion := asset.Asset{ + ID: astVersioning.ID, + URN: assetURN, + Type: "table", + Service: "bigquery", + Description: "new description in v0.2", + Data: map[string]interface{}{"data1": float64(12345)}, + Labels: map[string]string{"key1": "value1"}, + Version: "0.5", + UpdatedBy: r.users[1], + } + + ast, err := r.repository.GetByID(r.ctx, astVersioning.ID) + // hard to get the internally generated user id, we exclude the owners from the assertion + astOwners := ast.Owners + ast.Owners = nil + r.NoError(err) + // making updatedby user time empty to make ast comparable + ast.UpdatedBy.CreatedAt = time.Time{} + ast.UpdatedBy.UpdatedAt = time.Time{} + ast.CreatedAt = time.Time{} + ast.UpdatedAt = time.Time{} + r.Equal(expectedLatestVersion, ast) + + r.Len(astOwners, 2) + }) + + r.Run("should return current version of an assets with by version", func() { + expectedLatestVersion := asset.Asset{ + ID: astVersioning.ID, + URN: assetURN, + Type: "table", + Service: "bigquery", + Description: "new description in v0.2", + Data: map[string]interface{}{"data1": float64(12345)}, + Labels: map[string]string{"key1": "value1"}, + Version: "0.5", + UpdatedBy: r.users[1], + } + + ast, err := r.repository.GetByVersion(r.ctx, astVersioning.ID, "0.5") + // hard to get the internally generated user id, we exclude the owners from the assertion + astOwners := ast.Owners + ast.Owners = nil + r.NoError(err) + // making updatedby user time empty to make ast comparable + ast.UpdatedBy.CreatedAt = time.Time{} + ast.UpdatedBy.UpdatedAt = time.Time{} + ast.CreatedAt = time.Time{} + ast.UpdatedAt = time.Time{} + r.Equal(expectedLatestVersion, ast) + + r.Len(astOwners, 2) + }) + + r.Run("should return a specific version of an asset", func() { + selectedVersion := "0.3" + expectedAsset := asset.Asset{ + ID: astVersioning.ID, + URN: assetURN, + Type: "table", + Service: "bigquery", + Description: "new description in v0.2", + Version: "0.3", + Changelog: diff.Changelog{ + diff.Change{Type: "create", Path: []string{"data", "data1"}, From: interface{}(nil), To: float64(12345)}, + }, + UpdatedBy: r.users[1], + } + expectedOwners := []user.User{ + { + Email: "user@odpf.io", + Provider: defaultProviderName, + }, + { + Email: "meteor@odpf.io", + Provider: defaultProviderName, + }, + } + ast, err := r.repository.GetByVersion(r.ctx, astVersioning.ID, selectedVersion) + // hard to get the internally generated user id, we exclude the owners from the assertion + astOwners := ast.Owners + ast.Owners = nil + r.Assert().NoError(err) + // making updatedby user time empty to make ast comparable + ast.UpdatedBy.CreatedAt = time.Time{} + ast.UpdatedBy.UpdatedAt = time.Time{} + ast.CreatedAt = time.Time{} + ast.UpdatedAt = time.Time{} + r.Assert().Equal(expectedAsset, ast) + + for i := 0; i < len(astOwners); i++ { + astOwners[i].ID = "" + } + r.Assert().Equal(expectedOwners, astOwners) + }) +} + +func (r *AssetRepositoryTestSuite) TestUpsert() { + r.Run("on insert", func() { + r.Run("set ID to asset and version to base version", func() { + ast := asset.Asset{ + URN: "urn-u-1", + Type: "table", + Service: "bigquery", + Version: "0.1", + UpdatedBy: r.users[0], } id, err := r.repository.Upsert(r.ctx, &ast) r.NoError(err) @@ -276,7 +500,9 @@ func (r *AssetRepositoryTestSuite) TestUpsert() { assetInDB, err := r.repository.GetByID(r.ctx, ast.ID) r.Require().NoError(err) - r.Equal(ast, assetInDB) + r.NotEqual(time.Time{}, assetInDB.CreatedAt) + r.NotEqual(time.Time{}, assetInDB.UpdatedAt) + r.assertAsset(&ast, &assetInDB) }) r.Run("should store owners if any", func() { @@ -285,9 +511,10 @@ func (r *AssetRepositoryTestSuite) TestUpsert() { Type: "table", Service: "bigquery", Owners: []user.User{ - user1, - user2, + r.users[1], + r.users[2], }, + UpdatedBy: r.users[0], } id, err := r.repository.Upsert(r.ctx, &ast) @@ -310,16 +537,16 @@ func (r *AssetRepositoryTestSuite) TestUpsert() { Type: "table", Service: "bigquery", Owners: []user.User{ - {Email: "newuser@example.com", Provider: "shield"}, + {Email: "newuser@example.com", Provider: defaultProviderName}, }, + UpdatedBy: r.users[0], } id, err := r.repository.Upsert(r.ctx, &ast) r.Require().NoError(err) r.Require().Equal(r.lengthOfString(id), 36) - ast.ID = id - actual, err := r.repository.GetByID(r.ctx, ast.ID) + actual, err := r.repository.GetByID(r.ctx, id) r.NoError(err) r.Len(actual.Owners, len(ast.Owners)) @@ -331,11 +558,12 @@ func (r *AssetRepositoryTestSuite) TestUpsert() { }) r.Run("on update", func() { - r.Run("should not create but update existing asset if urn, type and service match", func() { + r.Run("should not create nor updating the asset if asset is identical", func() { ast := asset.Asset{ - URN: "urn-u-2", - Type: "table", - Service: "bigquery", + URN: "urn-u-2", + Type: "table", + Service: "bigquery", + UpdatedBy: r.users[0], } identicalAsset := ast identicalAsset.Name = "some-name" @@ -359,13 +587,14 @@ func (r *AssetRepositoryTestSuite) TestUpsert() { Type: "table", Service: "bigquery", Owners: []user.User{ - user1, - user2, + r.users[1], + r.users[2], }, + UpdatedBy: r.users[0], } newAsset := ast newAsset.Owners = []user.User{ - user2, + r.users[2], } id, err := r.repository.Upsert(r.ctx, &ast) @@ -392,13 +621,14 @@ func (r *AssetRepositoryTestSuite) TestUpsert() { Type: "table", Service: "bigquery", Owners: []user.User{ - user1, + r.users[1], }, + UpdatedBy: r.users[0], } newAsset := ast newAsset.Owners = []user.User{ - user1, - user2, + r.users[1], + r.users[2], } id, err := r.repository.Upsert(r.ctx, &ast) @@ -425,13 +655,14 @@ func (r *AssetRepositoryTestSuite) TestUpsert() { Type: "table", Service: "bigquery", Owners: []user.User{ - user1, + r.users[1], }, + UpdatedBy: r.users[0], } newAsset := ast newAsset.Owners = []user.User{ - user1, - {Email: "newuser@example.com", Provider: "shield"}, + r.users[1], + {Email: "newuser@example.com", Provider: defaultProviderName}, } id, err := r.repository.Upsert(r.ctx, &ast) @@ -470,14 +701,17 @@ func (r *AssetRepositoryTestSuite) TestDelete() { r.Run("should delete correct asset", func() { asset1 := asset.Asset{ - URN: "urn-del-1", - Type: "table", - Service: "bigquery", + URN: "urn-del-1", + Type: "table", + Service: "bigquery", + UpdatedBy: user.User{ID: defaultAssetUpdaterUserID}, } asset2 := asset.Asset{ - URN: "urn-del-2", - Type: "topic", - Service: "kafka", + URN: "urn-del-2", + Type: "topic", + Service: "kafka", + Version: asset.BaseVersion, + UpdatedBy: user.User{ID: defaultAssetUpdaterUserID}, } var err error @@ -499,7 +733,7 @@ func (r *AssetRepositoryTestSuite) TestDelete() { asset2FromDB, err := r.repository.GetByID(r.ctx, asset2.ID) r.NoError(err) - r.Equal(asset2, asset2FromDB) + r.Equal(asset2.ID, asset2FromDB.ID) // cleanup err = r.repository.Delete(r.ctx, asset2.ID) @@ -507,6 +741,21 @@ func (r *AssetRepositoryTestSuite) TestDelete() { }) } +func (r *AssetRepositoryTestSuite) assertAsset(expectedAsset *asset.Asset, actualAsset *asset.Asset) bool { + // sanitize time to make the assets comparable + expectedAsset.CreatedAt = time.Time{} + expectedAsset.UpdatedAt = time.Time{} + expectedAsset.UpdatedBy.CreatedAt = time.Time{} + expectedAsset.UpdatedBy.UpdatedAt = time.Time{} + + actualAsset.CreatedAt = time.Time{} + actualAsset.UpdatedAt = time.Time{} + actualAsset.UpdatedBy.CreatedAt = time.Time{} + actualAsset.UpdatedBy.UpdatedAt = time.Time{} + + return r.Equal(expectedAsset, actualAsset) +} + func (r *AssetRepositoryTestSuite) lengthOfString(s string) int { return len([]rune(s)) } diff --git a/store/postgres/migrations/000003_create_assets_table.up.sql b/store/postgres/migrations/000003_create_assets_table.up.sql index 49728b0f..a1ab9ffa 100644 --- a/store/postgres/migrations/000003_create_assets_table.up.sql +++ b/store/postgres/migrations/000003_create_assets_table.up.sql @@ -7,6 +7,8 @@ CREATE TABLE assets ( description text, data jsonb, labels jsonb, + version text NOT NULL, + updated_by uuid NOT NULL, created_at timestamp DEFAULT NOW(), updated_at timestamp DEFAULT NOW() ); diff --git a/store/postgres/migrations/000004_create_stars_table.up.sql b/store/postgres/migrations/000004_create_stars_table.up.sql index 0019e6d4..473ae51e 100644 --- a/store/postgres/migrations/000004_create_stars_table.up.sql +++ b/store/postgres/migrations/000004_create_stars_table.up.sql @@ -1,6 +1,6 @@ CREATE TABLE stars ( id uuid DEFAULT gen_random_uuid() PRIMARY KEY, - user_id UUID NOT NULL REFERENCES users(id) ON DELETE CASCADE, + user_id uuid NOT NULL REFERENCES users(id) ON DELETE CASCADE, asset_id uuid NOT NULL REFERENCES assets(id) ON DELETE CASCADE, created_at timestamp DEFAULT NOW(), updated_at timestamp DEFAULT NOW() diff --git a/store/postgres/migrations/000005_create_assets_versions_table.down.sql b/store/postgres/migrations/000005_create_assets_versions_table.down.sql new file mode 100644 index 00000000..d9ae3a70 --- /dev/null +++ b/store/postgres/migrations/000005_create_assets_versions_table.down.sql @@ -0,0 +1 @@ +DROP TABLE IF EXISTS assets_versions; \ No newline at end of file diff --git a/store/postgres/migrations/000005_create_assets_versions_table.up.sql b/store/postgres/migrations/000005_create_assets_versions_table.up.sql new file mode 100644 index 00000000..8859bbd0 --- /dev/null +++ b/store/postgres/migrations/000005_create_assets_versions_table.up.sql @@ -0,0 +1,20 @@ +CREATE TABLE assets_versions ( + id serial PRIMARY KEY, + asset_id uuid NOT NULL REFERENCES assets(id) ON DELETE CASCADE, + urn text NOT NULL, + type text NOT NULL, + service text NOT NULL, + name text NOT NULL, + description text, + data jsonb, + labels jsonb, + version text NOT NULL, + updated_by uuid NOT NULL, + created_at timestamp, + updated_at timestamp, + owners jsonb, + changelog jsonb NOT NULL +); + +CREATE UNIQUE INDEX assets_versions_idx_asset_id_version ON assets_versions(asset_id,version); +CREATE UNIQUE INDEX assets_versions_idx_urn_type_service_version ON assets_versions(urn,type,service,version); \ No newline at end of file diff --git a/store/postgres/postgres_test.go b/store/postgres/postgres_test.go index fbeab846..8b49baa7 100644 --- a/store/postgres/postgres_test.go +++ b/store/postgres/postgres_test.go @@ -16,7 +16,11 @@ import ( "github.com/ory/dockertest/v3/docker" ) -const logLevelDebug = "debug" +const ( + logLevelDebug = "debug" + defaultProviderName = "shield" + defaultGetMaxSize = 7 +) var ( pgConfig = postgres.Config{ @@ -145,8 +149,9 @@ func createUser(userRepo user.Repository, email string) (string, error) { return id, nil } -func createAsset(assetRepo asset.Repository, userID, assetURN, assetType string) (string, error) { - ast := getAsset(userID, assetURN, assetType) +func createAsset(assetRepo asset.Repository, updaterID string, ownerEmail, assetURN, assetType string) (string, error) { + ast := getAsset(ownerEmail, assetURN, assetType) + ast.UpdatedBy.ID = updaterID id, err := assetRepo.Upsert(context.Background(), ast) if err != nil { return "", err @@ -154,14 +159,14 @@ func createAsset(assetRepo asset.Repository, userID, assetURN, assetType string) return id, nil } -func getAsset(userID, assetURN, assetType string) *asset.Asset { +func getAsset(ownerEmail, assetURN, assetType string) *asset.Asset { return &asset.Asset{ URN: assetURN, Type: asset.Type(assetType), Service: "bigquery", Owners: []user.User{ { - ID: userID, + Email: ownerEmail, }, }, } diff --git a/store/postgres/star_model.go b/store/postgres/star_model.go deleted file mode 100644 index 6e16832a..00000000 --- a/store/postgres/star_model.go +++ /dev/null @@ -1,32 +0,0 @@ -package postgres - -import ( - "time" - - "github.com/odpf/columbus/star" -) - -type StarConfig struct { - Limit int - Offset int - SortKey string - SortDirectionKey string -} - -type StarModel struct { - ID string `db:"id"` - UserID string `db:"user_id"` - AssetType string `db:"asset_type"` - AssetURN string `db:"asset_urn"` - CreatedAt time.Time `db:"created_at"` - UpdatedAt time.Time `db:"updated_at"` -} - -func (s *StarModel) toStar(assetModel *AssetModel) *star.Star { - return &star.Star{ - ID: s.ID, - Asset: assetModel.toAsset(), - CreatedAt: s.CreatedAt, - UpdatedAt: s.UpdatedAt, - } -} diff --git a/store/postgres/star_model_test.go b/store/postgres/star_model_test.go deleted file mode 100644 index 6ef5328f..00000000 --- a/store/postgres/star_model_test.go +++ /dev/null @@ -1,33 +0,0 @@ -package postgres - -import ( - "testing" - "time" - - "github.com/stretchr/testify/assert" -) - -func TestStarModel(t *testing.T) { - t.Run("successfully build star from star model", func(t *testing.T) { - assetModel := &AssetModel{ - URN: "asseturn", - Type: "assettype", - } - sm := &StarModel{ - ID: "id", - UserID: "userid", - AssetURN: "asseturn", - AssetType: "assettype", - CreatedAt: time.Now(), - UpdatedAt: time.Now(), - } - - s := sm.toStar(assetModel) - - assert.Equal(t, s.ID, sm.ID) - assert.Equal(t, s.Asset.URN, sm.AssetURN) - assert.Equal(t, s.Asset.Type.String(), sm.AssetType) - assert.True(t, s.CreatedAt.Equal(sm.CreatedAt)) - assert.True(t, s.UpdatedAt.Equal(sm.UpdatedAt)) - }) -} diff --git a/store/postgres/star_repository.go b/store/postgres/star_repository.go index 061419a1..824b2242 100644 --- a/store/postgres/star_repository.go +++ b/store/postgres/star_repository.go @@ -11,6 +11,13 @@ import ( "github.com/odpf/columbus/user" ) +type StarClauses struct { + Limit int + Offset int + SortKey string + SortDirectionKey string +} + // StarRepository is a type that manages star operation to the primary database type StarRepository struct { client *Client @@ -66,7 +73,7 @@ func (r *StarRepository) GetStargazers(ctx context.Context, cfg star.Config, ass return nil, star.InvalidError{AssetID: assetID} } - starCfg := r.buildConfig(cfg) + starClausesValue := r.buildClausesValue(cfg) var userModels UserModels if err := r.client.db.SelectContext(ctx, &userModels, ` @@ -82,11 +89,9 @@ func (r *StarRepository) GetStargazers(ctx context.Context, cfg star.Config, ass users u ON s.user_id = u.id WHERE s.asset_id = $1 - LIMIT - $2 - OFFSET - $3 - `, assetID, starCfg.Limit, starCfg.Offset); err != nil { + LIMIT $2 + OFFSET $3 + `, assetID, starClausesValue.Limit, starClausesValue.Offset); err != nil { return nil, fmt.Errorf("failed fetching users of star: %w", err) } @@ -107,7 +112,7 @@ func (r *StarRepository) GetAllAssetsByUserID(ctx context.Context, cfg star.Conf return nil, star.InvalidError{UserID: userID} } - starCfg := r.buildConfig(cfg) + starClausesValue := r.buildClausesValue(cfg) var assetModels []AssetModel if err := r.client.db.SelectContext(ctx, &assetModels, fmt.Sprintf(` @@ -134,7 +139,7 @@ func (r *StarRepository) GetAllAssetsByUserID(ctx context.Context, cfg star.Conf $3 OFFSET $4 - `, starCfg.SortDirectionKey), userID, starCfg.SortKey, starCfg.Limit, starCfg.Offset); err != nil { + `, starClausesValue.SortDirectionKey), userID, starClausesValue.SortKey, starClausesValue.Limit, starClausesValue.Offset); err != nil { return nil, fmt.Errorf("failed fetching stars by user: %w", err) } @@ -144,7 +149,7 @@ func (r *StarRepository) GetAllAssetsByUserID(ctx context.Context, cfg star.Conf assets := []asset.Asset{} for _, am := range assetModels { - assets = append(assets, am.toAsset()) + assets = append(assets, am.toAsset(nil)) } return assets, nil } @@ -193,7 +198,7 @@ func (r *StarRepository) GetAssetByUserID(ctx context.Context, userID string, as return nil, fmt.Errorf("failed fetching star by user: %w", err) } - asset := asetModel.toAsset() + asset := asetModel.toAsset(nil) return &asset, nil } @@ -234,8 +239,8 @@ func (r *StarRepository) Delete(ctx context.Context, userID string, assetID stri return nil } -func (r *StarRepository) buildConfig(cfg star.Config) StarConfig { - sCfg := StarConfig{ +func (r *StarRepository) buildClausesValue(cfg star.Config) StarClauses { + sCfg := StarClauses{ Offset: 0, Limit: DEFAULT_MAX_RESULT_SIZE, SortKey: columnNameCreatedAt, diff --git a/store/postgres/star_repository_test.go b/store/postgres/star_repository_test.go index 4127c428..86fd9072 100644 --- a/store/postgres/star_repository_test.go +++ b/store/postgres/star_repository_test.go @@ -42,7 +42,7 @@ func (r *StarRepositoryTestSuite) SetupSuite() { if err != nil { r.T().Fatal(err) } - r.assetRepository, err = postgres.NewAssetRepository(r.client, r.userRepository, postgres.DEFAULT_MAX_RESULT_SIZE) + r.assetRepository, err = postgres.NewAssetRepository(r.client, r.userRepository, postgres.DEFAULT_MAX_RESULT_SIZE, "") if err != nil { r.T().Fatal(err) } @@ -61,6 +61,8 @@ func (r *StarRepositoryTestSuite) TearDownSuite() { } func (r *StarRepositoryTestSuite) TestCreate() { + ownerEmail := "test-create@odpf.io" + r.Run("return no error if succesfully create star", func() { err := setup(r.ctx, r.client) r.NoError(err) @@ -68,7 +70,7 @@ func (r *StarRepositoryTestSuite) TestCreate() { userID, err := createUser(r.userRepository, "user@odpf.io") r.NoError(err) - assetID, err := createAsset(r.assetRepository, userID, "asset-urn-1", "table") + assetID, err := createAsset(r.assetRepository, userID, ownerEmail, "asset-urn-1", "table") r.NoError(err) id, err := r.repository.Create(r.ctx, userID, assetID) @@ -108,7 +110,7 @@ func (r *StarRepositoryTestSuite) TestCreate() { userID, err := createUser(r.userRepository, "user@odpf.io") r.NoError(err) - assetID, err := createAsset(r.assetRepository, userID, "asset-urn-1", "table") + assetID, err := createAsset(r.assetRepository, userID, ownerEmail, "asset-urn-1", "table") r.NoError(err) id, err := r.repository.Create(r.ctx, userID, assetID) @@ -125,7 +127,7 @@ func (r *StarRepositoryTestSuite) TestCreate() { r.NoError(err) uid := uuid.NewString() - assetID, err := createAsset(r.assetRepository, uid, "asset-urn-1", "table") + assetID, err := createAsset(r.assetRepository, uid, ownerEmail, "asset-urn-1", "table") r.NoError(err) id, err := r.repository.Create(r.ctx, uid, assetID) @@ -135,6 +137,8 @@ func (r *StarRepositoryTestSuite) TestCreate() { } func (r *StarRepositoryTestSuite) TestGetStargazers() { + ownerEmail := "test-getstargazers@odpf.io" + defaultCfg := star.Config{} r.Run("return ErrEmptyAssetID if asset id is empty", func() { users, err := r.repository.GetStargazers(r.ctx, defaultCfg, "") @@ -161,7 +165,7 @@ func (r *StarRepositoryTestSuite) TestGetStargazers() { userID1, err := createUser(r.userRepository, "user@odpf.io") r.NoError(err) - assetID1, err := createAsset(r.assetRepository, userID1, "asset-urn-1", "table") + assetID1, err := createAsset(r.assetRepository, userID1, ownerEmail, "asset-urn-1", "table") r.NoError(err) id, err := r.repository.Create(r.ctx, userID1, assetID1) @@ -198,8 +202,7 @@ func (r *StarRepositoryTestSuite) TestGetStargazers() { userEmail := fmt.Sprintf("user%d@odpf.io", i) userID, err := createUser(r.userRepository, userEmail) r.NoError(err) - - assetID, err = createAsset(r.assetRepository, userID, "asset-urn-1", "table") + assetID, err = createAsset(r.assetRepository, userID, ownerEmail, "asset-urn-1", "table") r.NoError(err) id, err := r.repository.Create(r.ctx, userID, assetID) @@ -216,6 +219,8 @@ func (r *StarRepositoryTestSuite) TestGetStargazers() { } func (r *StarRepositoryTestSuite) TestGetAllAssetsByUserID() { + ownerEmail := "test-getallbyuserid@odpf.io" + defaultCfg := star.Config{} r.Run("return invalid error if user id is empty", func() { assets, err := r.repository.GetAllAssetsByUserID(r.ctx, defaultCfg, "") @@ -243,19 +248,19 @@ func (r *StarRepositoryTestSuite) TestGetAllAssetsByUserID() { userID1, err := createUser(r.userRepository, "user@odpf.io") r.NoError(err) - assetID1, err := createAsset(r.assetRepository, userID1, "asset-urn-1", "table") + assetID1, err := createAsset(r.assetRepository, userID1, ownerEmail, "asset-urn-1", "table") r.NoError(err) id, err := r.repository.Create(r.ctx, userID1, assetID1) r.NoError(err) r.NotEmpty(id) - assetID2, err := createAsset(r.assetRepository, userID1, "asset-urn-2", "table") + assetID2, err := createAsset(r.assetRepository, userID1, ownerEmail, "asset-urn-2", "table") r.NoError(err) id, err = r.repository.Create(r.ctx, userID1, assetID2) r.NoError(err) r.NotEmpty(id) - assetID3, err := createAsset(r.assetRepository, userID1, "asset-urn-3", "table") + assetID3, err := createAsset(r.assetRepository, userID1, ownerEmail, "asset-urn-3", "table") r.NoError(err) id, err = r.repository.Create(r.ctx, userID1, assetID3) r.NoError(err) @@ -284,7 +289,7 @@ func (r *StarRepositoryTestSuite) TestGetAllAssetsByUserID() { for i := 1; i < 20; i++ { starURN := fmt.Sprintf("asset-urn-%d", i) - assetID, err := createAsset(r.assetRepository, userID, starURN, "table") + assetID, err := createAsset(r.assetRepository, userID, ownerEmail, starURN, "table") r.NoError(err) id, err := r.repository.Create(r.ctx, userID, assetID) r.NoError(err) @@ -301,6 +306,8 @@ func (r *StarRepositoryTestSuite) TestGetAllAssetsByUserID() { } func (r *StarRepositoryTestSuite) TestGetAssetByUserID() { + ownerEmail := "test-getbyuserid@odpf.io" + r.Run("return ErrEmptyUserID if user id is empty", func() { ast, err := r.repository.GetAssetByUserID(r.ctx, "", "") r.ErrorIs(err, star.ErrEmptyUserID) @@ -341,19 +348,19 @@ func (r *StarRepositoryTestSuite) TestGetAssetByUserID() { userID1, err := createUser(r.userRepository, "user@odpf.io") r.NoError(err) - assetID1, err := createAsset(r.assetRepository, userID1, "asset-urn-1", "table") + assetID1, err := createAsset(r.assetRepository, userID1, ownerEmail, "asset-urn-1", "table") r.NoError(err) id, err := r.repository.Create(r.ctx, userID1, assetID1) r.NoError(err) r.NotEmpty(id) - assetID2, err := createAsset(r.assetRepository, userID1, "asset-urn-2", "table") + assetID2, err := createAsset(r.assetRepository, userID1, ownerEmail, "asset-urn-2", "table") r.NoError(err) id, err = r.repository.Create(r.ctx, userID1, assetID2) r.NoError(err) r.NotEmpty(id) - assetID3, err := createAsset(r.assetRepository, userID1, "asset-urn-3", "table") + assetID3, err := createAsset(r.assetRepository, userID1, ownerEmail, "asset-urn-3", "table") r.NoError(err) id, err = r.repository.Create(r.ctx, userID1, assetID3) r.NoError(err) @@ -367,6 +374,8 @@ func (r *StarRepositoryTestSuite) TestGetAssetByUserID() { } func (r *StarRepositoryTestSuite) TestDelete() { + ownerEmail := "test-delete@odpf.io" + r.Run("return invalid error if user id is empty", func() { err := r.repository.Delete(r.ctx, "", "") r.ErrorIs(err, star.ErrEmptyUserID) @@ -402,19 +411,19 @@ func (r *StarRepositoryTestSuite) TestDelete() { userID1, err := createUser(r.userRepository, "user@odpf.io") r.NoError(err) - assetID1, err := createAsset(r.assetRepository, userID1, "asset-urn-1", "table") + assetID1, err := createAsset(r.assetRepository, userID1, ownerEmail, "asset-urn-1", "table") r.NoError(err) id, err := r.repository.Create(r.ctx, userID1, assetID1) r.NoError(err) r.NotEmpty(id) - assetID2, err := createAsset(r.assetRepository, userID1, "asset-urn-2", "table") + assetID2, err := createAsset(r.assetRepository, userID1, ownerEmail, "asset-urn-2", "table") r.NoError(err) id, err = r.repository.Create(r.ctx, userID1, assetID2) r.NoError(err) r.NotEmpty(id) - assetID3, err := createAsset(r.assetRepository, userID1, "asset-urn-3", "table") + assetID3, err := createAsset(r.assetRepository, userID1, ownerEmail, "asset-urn-3", "table") r.NoError(err) id, err = r.repository.Create(r.ctx, userID1, assetID3) r.NoError(err) diff --git a/store/postgres/user_model.go b/store/postgres/user_model.go index 0e0ca4a6..f4f1892b 100644 --- a/store/postgres/user_model.go +++ b/store/postgres/user_model.go @@ -1,36 +1,36 @@ package postgres import ( - "time" + "database/sql" "github.com/odpf/columbus/user" ) type UserModel struct { - ID string `db:"id"` - Email string `db:"email"` - Provider string `db:"provider"` - CreatedAt time.Time `db:"created_at"` - UpdatedAt time.Time `db:"updated_at"` + ID sql.NullString `db:"id"` + Email sql.NullString `db:"email"` + Provider sql.NullString `db:"provider"` + CreatedAt sql.NullTime `db:"created_at"` + UpdatedAt sql.NullTime `db:"updated_at"` } -func (u *UserModel) toUser() *user.User { - return &user.User{ - ID: u.ID, - Email: u.Email, - Provider: u.Provider, - CreatedAt: u.CreatedAt, - UpdatedAt: u.UpdatedAt, +func (u *UserModel) toUser() user.User { + return user.User{ + ID: u.ID.String, + Email: u.Email.String, + Provider: u.Provider.String, + CreatedAt: u.CreatedAt.Time, + UpdatedAt: u.UpdatedAt.Time, } } -func newUserModel(u *user.User) *UserModel { - return &UserModel{ - ID: u.ID, - Email: u.Email, - Provider: u.Provider, - CreatedAt: u.CreatedAt, - UpdatedAt: u.UpdatedAt, +func newUserModel(u *user.User) UserModel { + return UserModel{ + ID: sql.NullString{String: u.ID, Valid: true}, + Email: sql.NullString{String: u.Email, Valid: true}, + Provider: sql.NullString{String: u.Provider, Valid: true}, + CreatedAt: sql.NullTime{Time: u.CreatedAt, Valid: true}, + UpdatedAt: sql.NullTime{Time: u.UpdatedAt, Valid: true}, } } @@ -39,13 +39,7 @@ type UserModels []UserModel func (us UserModels) toUsers() []user.User { users := []user.User{} for _, u := range us { - users = append(users, user.User{ - ID: u.ID, - Email: u.Email, - Provider: u.Provider, - CreatedAt: u.CreatedAt, - UpdatedAt: u.UpdatedAt, - }) + users = append(users, u.toUser()) } return users } diff --git a/store/postgres/user_model_test.go b/store/postgres/user_model_test.go index 8e660fcc..d1b1c7f8 100644 --- a/store/postgres/user_model_test.go +++ b/store/postgres/user_model_test.go @@ -1,6 +1,7 @@ package postgres import ( + "database/sql" "testing" "time" @@ -15,20 +16,20 @@ func TestUserModel(t *testing.T) { id := uuid.New() timestamp := time.Now().UTC() um := UserModel{ - ID: id.String(), - Email: "user@odpf.io", - Provider: "columbus", - CreatedAt: timestamp, - UpdatedAt: timestamp, + ID: sql.NullString{String: id.String(), Valid: true}, + Email: sql.NullString{String: "user@odpf.io", Valid: true}, + Provider: sql.NullString{String: "columbus", Valid: true}, + CreatedAt: sql.NullTime{Time: timestamp, Valid: true}, + UpdatedAt: sql.NullTime{Time: timestamp, Valid: true}, } ud := um.toUser() - assert.Equal(t, um.ID, ud.ID) - assert.Equal(t, um.Email, ud.Email) - assert.Equal(t, um.Provider, ud.Provider) - assert.True(t, um.CreatedAt.Equal(ud.CreatedAt)) - assert.True(t, um.UpdatedAt.Equal(ud.UpdatedAt)) + assert.Equal(t, um.ID.String, ud.ID) + assert.Equal(t, um.Email.String, ud.Email) + assert.Equal(t, um.Provider.String, ud.Provider) + assert.True(t, um.CreatedAt.Time.Equal(ud.CreatedAt)) + assert.True(t, um.UpdatedAt.Time.Equal(ud.UpdatedAt)) }) t.Run("should properly create user model from user", func(t *testing.T) { @@ -45,10 +46,10 @@ func TestUserModel(t *testing.T) { um := newUserModel(ud) - assert.Equal(t, um.ID, ud.ID) - assert.Equal(t, um.Email, ud.Email) - assert.Equal(t, um.Provider, ud.Provider) - assert.True(t, um.CreatedAt.Equal(ud.CreatedAt)) - assert.True(t, um.UpdatedAt.Equal(ud.UpdatedAt)) + assert.Equal(t, um.ID.String, ud.ID) + assert.Equal(t, um.Email.String, ud.Email) + assert.Equal(t, um.Provider.String, ud.Provider) + assert.True(t, um.CreatedAt.Time.Equal(ud.CreatedAt)) + assert.True(t, um.UpdatedAt.Time.Equal(ud.UpdatedAt)) }) } diff --git a/store/postgres/user_repository.go b/store/postgres/user_repository.go index 5414c3c6..86dc6e92 100644 --- a/store/postgres/user_repository.go +++ b/store/postgres/user_repository.go @@ -33,10 +33,10 @@ func (r *UserRepository) create(ctx context.Context, querier sqlx.QueryerContext } if err := r.client.db.QueryRowxContext(ctx, ` - INSERT INTO - users + INSERT INTO + users (email, provider) - VALUES + VALUES ($1, $2) RETURNING id `, ud.Email, ud.Provider).Scan(&userID); err != nil { @@ -56,7 +56,7 @@ func (r *UserRepository) create(ctx context.Context, querier sqlx.QueryerContext func (r *UserRepository) GetID(ctx context.Context, email string) (string, error) { var userID string if err := r.client.db.GetContext(ctx, &userID, ` - SELECT + SELECT id FROM users diff --git a/swagger.yaml b/swagger.yaml index 5dcf8014..aaf20852 100644 --- a/swagger.yaml +++ b/swagger.yaml @@ -2,7 +2,7 @@ swagger: "2.0" info: title: "Data Discovery and Lineage Service" description: "Data Discovery and Lineage Service" - version: 0.1.0 + version: 0.1.9 paths: "/v1beta1/assets": get: @@ -15,16 +15,14 @@ paths: parameters: - in: header name: Columbus-User-Email - schema: - type: string - format: email - example: user@odpf.io + type: string + format: email required: true - in: query name: type description: "filter by type" - schema: - $ref: "#/definitions/AssetType" + type: string + format: asset_type - in: query name: service description: "filter by service" @@ -66,6 +64,11 @@ paths: produces: - "application/json" parameters: + - in: header + name: Columbus-User-Email + type: string + format: email + required: true - in: body name: "" required: true @@ -99,8 +102,9 @@ paths: - in: path name: id description: "asset's ID" - schema: - $ref: "#/definitions/UUID" + type: string + format: uuid + required: true responses: 200: description: OK @@ -125,8 +129,9 @@ paths: - in: path name: id description: "asset's ID" - schema: - $ref: "#/definitions/UUID" + type: string + format: uuid + required: true responses: 204: description: OK @@ -153,8 +158,9 @@ paths: - in: path name: id description: "asset's ID" - schema: - $ref: "#/definitions/UUID" + type: string + format: uuid + required: true responses: 200: description: OK @@ -174,6 +180,78 @@ paths: description: "internal server error" schema: $ref: "#/definitions/InternalServerError" + "/v1beta1/assets/{id}/versions": + get: + tags: + - Assets + summary: Get version history of an asset + description: "Returns a list of asset version history" + produces: + - "application/json" + parameters: + - in: path + name: id + description: "asset's ID" + type: string + format: uuid + required: true + responses: + 200: + description: OK + schema: + type: array + items: + $ref: "#/definitions/AssetVersion" + 400: + description: "bad request error" + schema: + $ref: "#/definitions/Error" + 404: + description: "not found error" + schema: + $ref: "#/definitions/Error" + 500: + description: "internal server error" + schema: + $ref: "#/definitions/InternalServerError" + "/v1beta1/assets/{id}/versions/{version}": + get: + tags: + - Assets + summary: Get asset's prev version + description: "Returns a specific version of an asset" + produces: + - "application/json" + parameters: + - in: path + name: id + description: "asset's ID" + type: string + format: uuid + required: true + - in: path + name: version + description: "asset's version" + type: string + format: semver + required: true + responses: + 200: + description: OK + schema: + $ref: "#/definitions/Asset" + 400: + description: "bad request error" + schema: + $ref: "#/definitions/Error" + 404: + description: "not found error" + schema: + $ref: "#/definitions/Error" + 500: + description: "internal server error" + schema: + $ref: "#/definitions/InternalServerError" "/v1beta1/lineage/{type}/{record}": get: tags: @@ -185,14 +263,23 @@ paths: parameters: - in: header name: Columbus-User-Email - schema: - type: string - format: email - example: user@odpf.io + type: string + format: email required: true - in: query name: collapse type: boolean + - in: path + name: type + description: "asset's type" + type: string + format: asset_type + required: true + - in: path + name: record + description: "record's id" + type: string + required: true responses: 200: description: OK @@ -213,10 +300,8 @@ paths: parameters: - in: header name: Columbus-User-Email - schema: - type: string - format: email - example: user@odpf.io + type: string + format: email required: true responses: 200: @@ -242,10 +327,8 @@ paths: parameters: - in: header name: Columbus-User-Email - schema: - type: string - format: email - example: user@odpf.io + type: string + format: email required: true - in: path name: name @@ -275,10 +358,8 @@ paths: parameters: - in: header name: Columbus-User-Email - schema: - type: string - format: email - example: user@odpf.io + type: string + format: email required: true - in: path name: name @@ -317,10 +398,8 @@ paths: parameters: - in: header name: Columbus-User-Email - schema: - type: string - format: email - example: user@odpf.io + type: string + format: email required: true - in: path name: name @@ -348,10 +427,8 @@ paths: parameters: - in: header name: Columbus-User-Email - schema: - type: string - format: email - example: user@odpf.io + type: string + format: email required: true - in: path name: name @@ -383,10 +460,8 @@ paths: parameters: - in: header name: Columbus-User-Email - schema: - type: string - format: email - example: user@odpf.io + type: string + format: email required: true - in: query name: "text" @@ -443,10 +518,8 @@ paths: parameters: - in: header name: Columbus-User-Email - schema: - type: string - format: email - example: user@odpf.io + type: string + format: email required: true - in: body name: "" @@ -487,10 +560,8 @@ paths: parameters: - in: header name: Columbus-User-Email - schema: - type: string - format: email - example: user@odpf.io + type: string + format: email required: true responses: 200: @@ -513,10 +584,8 @@ paths: parameters: - in: header name: Columbus-User-Email - schema: - type: string - format: email - example: user@odpf.io + type: string + format: email required: true - in: body name: "" @@ -556,10 +625,8 @@ paths: parameters: - in: header name: Columbus-User-Email - schema: - type: string - format: email - example: user@odpf.io + type: string + format: email required: true - in: path name: template_urn @@ -588,10 +655,8 @@ paths: parameters: - in: header name: Columbus-User-Email - schema: - type: string - format: email - example: user@odpf.io + type: string + format: email required: true - in: path name: template_urn @@ -633,10 +698,8 @@ paths: parameters: - in: header name: Columbus-User-Email - schema: - type: string - format: email - example: user@odpf.io + type: string + format: email required: true - in: path name: template_urn @@ -667,10 +730,8 @@ paths: parameters: - in: header name: Columbus-User-Email - schema: - type: string - format: email - example: user@odpf.io + type: string + format: email required: true - in: path name: type @@ -702,10 +763,8 @@ paths: parameters: - in: header name: Columbus-User-Email - schema: - type: string - format: email - example: user@odpf.io + type: string + format: email required: true - in: path name: type @@ -742,10 +801,8 @@ paths: parameters: - in: header name: Columbus-User-Email - schema: - type: string - format: email - example: user@odpf.io + type: string + format: email required: true - in: path name: type @@ -794,10 +851,8 @@ paths: parameters: - in: header name: Columbus-User-Email - schema: - type: string - format: email - example: user@odpf.io + type: string + format: email required: true - in: path name: type @@ -838,10 +893,8 @@ paths: parameters: - in: header name: Columbus-User-Email - schema: - type: string - format: email - example: user@odpf.io + type: string + format: email required: true responses: 200: @@ -874,10 +927,8 @@ paths: parameters: - in: header name: Columbus-User-Email - schema: - type: string - format: email - example: user@odpf.io + type: string + format: email required: true - in: path name: asset_id @@ -914,10 +965,8 @@ paths: parameters: - in: header name: Columbus-User-Email - schema: - type: string - format: email - example: user@odpf.io + type: string + format: email required: true - in: path name: asset_id @@ -952,10 +1001,8 @@ paths: parameters: - in: header name: Columbus-User-Email - schema: - type: string - format: email - example: user@odpf.io + type: string + format: email required: true - in: path name: asset_id @@ -965,7 +1012,6 @@ paths: 200: description: OK schema: - type: object $ref: "#/definitions/Star" 400: description: validation error @@ -991,10 +1037,8 @@ paths: parameters: - in: header name: Columbus-User-Email - schema: - type: string - format: email - example: user@odpf.io + type: string + format: email required: true - in: path name: user_id @@ -1060,6 +1104,75 @@ definitions: - job - topic - dashboard + AssetVersion: + type: object + properties: + id: + type: object + $ref: "#/definitions/UUID" + urn: + type: string + example: "sample-urn" + type: + type: object + $ref: "#/definitions/AssetType" + service: + type: string + example: "bigquery" + version: + type: string + example: "0.1" + Changelog: + type: object + $ref: "#/definitions/Changelog" + updated_by: + type: string + example: "user@odpf.io" + created_at: + type: string + updated_at: + type: string + Changelog: + type: array + items: + type: object + properties: + type: + type: string + path: + type: array + items: + type: string + from: + type: string + to: + type: string + User: + type: object + properties: + id: + type: string + description: id of a user + format: uuid + email: + type: string + description: email of a user + format: email + provider: + type: string + description: provider of user identity + Star: + type: object + properties: + asset_type: + type: string + description: name of the type (for e.g. dagger, firehose) + asset_urn: + type: string + created_at: + type: string + updated_at: + type: string Record: type: object properties: @@ -1275,15 +1388,3 @@ definitions: details: type: object description: error details. the keys are integer indices for the records that failed validation, and the value is a string describing the reason why that record fails validation - Star: - type: object - properties: - asset_type: - type: string - description: name of the type (for e.g. dagger, firehose) - asset_urn: - type: string - created_at: - type: string - updated_at: - type: string diff --git a/user/service.go b/user/service.go index 236360d7..bd497c66 100644 --- a/user/service.go +++ b/user/service.go @@ -8,15 +8,15 @@ import ( type contextKeyType struct{} var ( - // userIDContextKey is the key used for user.FromContext and + // userContextKey is the key used for user.FromContext and // user.NewContext. - userIDContextKey = contextKeyType(struct{}{}) + userContextKey = contextKeyType(struct{}{}) ) // Service is a type of service that manages business process type Service struct { repository Repository - config Config + cfg Config } // ValidateUser checks if user information is already in DB @@ -35,7 +35,7 @@ func (s *Service) ValidateUser(ctx context.Context, email string) (string, error } user := &User{ Email: email, - Provider: s.config.IdentityProviderDefaultName, + Provider: s.cfg.IdentityProviderDefaultName, } if userID, err = s.repository.Create(ctx, user); err != nil { return "", err @@ -46,7 +46,7 @@ func (s *Service) ValidateUser(ctx context.Context, email string) (string, error // NewContext returns a new context.Context that carries the provided // user ID. func NewContext(ctx context.Context, userID string) context.Context { - return context.WithValue(ctx, userIDContextKey, userID) + return context.WithValue(ctx, userContextKey, userID) } // FromContext returns the user ID from the context if present, and empty @@ -55,7 +55,7 @@ func FromContext(ctx context.Context) string { if ctx == nil { return "" } - h, _ := ctx.Value(userIDContextKey).(string) + h, _ := ctx.Value(userContextKey).(string) if h != "" { return h } @@ -63,9 +63,9 @@ func FromContext(ctx context.Context) string { } // NewService initializes user service -func NewService(cfg Config, repository Repository) *Service { +func NewService(repository Repository, cfg Config) *Service { return &Service{ - config: cfg, repository: repository, + cfg: cfg, } } diff --git a/user/service_test.go b/user/service_test.go index 45322d9c..9b44b784 100644 --- a/user/service_test.go +++ b/user/service_test.go @@ -12,13 +12,12 @@ import ( "github.com/stretchr/testify/mock" ) +var userCfg = user.Config{IdentityProviderDefaultName: "shield"} + func TestValidateWithHeader(t *testing.T) { ctx := context.TODO() - userCfg := user.Config{ - IdentityProviderDefaultName: "shield", - } t.Run("should return no user error when param is empty", func(t *testing.T) { - userSvc := user.NewService(userCfg, nil) + userSvc := user.NewService(nil, userCfg) id, err := userSvc.ValidateUser(ctx, "") @@ -30,7 +29,7 @@ func TestValidateWithHeader(t *testing.T) { mockUserRepository := &mocks.UserRepository{} mockUserRepository.On("GetID", mock.Anything, mock.Anything).Return("", nil) - userSvc := user.NewService(userCfg, mockUserRepository) + userSvc := user.NewService(mockUserRepository, userCfg) id, err := userSvc.ValidateUser(ctx, "an-email") @@ -43,7 +42,7 @@ func TestValidateWithHeader(t *testing.T) { mockUserRepository := &mocks.UserRepository{} mockUserRepository.On("GetID", mock.Anything, mock.Anything).Return(userID, nil) - userSvc := user.NewService(userCfg, mockUserRepository) + userSvc := user.NewService(mockUserRepository, userCfg) id, err := userSvc.ValidateUser(ctx, "an-email") @@ -57,7 +56,7 @@ func TestValidateWithHeader(t *testing.T) { mockUserRepository.On("GetID", mock.Anything, mock.Anything).Return("", nil) mockUserRepository.On("Create", mock.Anything, mock.Anything).Return(userID, nil) - userSvc := user.NewService(userCfg, mockUserRepository) + userSvc := user.NewService(mockUserRepository, userCfg) id, err := userSvc.ValidateUser(ctx, "an-email") @@ -71,7 +70,7 @@ func TestValidateWithHeader(t *testing.T) { mockUserRepository.On("GetID", mock.Anything, mock.Anything).Return("", mockErr) mockUserRepository.On("Create", mock.Anything, mock.Anything).Return("", mockErr) - userSvc := user.NewService(userCfg, mockUserRepository) + userSvc := user.NewService(mockUserRepository, userCfg) id, err := userSvc.ValidateUser(ctx, "an-email") diff --git a/user/user.go b/user/user.go index a49482d5..054d278e 100644 --- a/user/user.go +++ b/user/user.go @@ -8,11 +8,11 @@ import ( // User is a basic entity of a user type User struct { - ID string `json:"id"` - Email string `json:"email"` - Provider string `json:"provider"` - CreatedAt time.Time `json:"created_at"` - UpdatedAt time.Time `json:"updated_at"` + ID string `json:"id,omitempty" diff:"-"` + Email string `json:"email" diff:"email"` + Provider string `json:"provider" diff:"-"` + CreatedAt time.Time `json:"-" diff:"-"` + UpdatedAt time.Time `json:"-" diff:"-"` } // Validate validates a user is valid or not diff --git a/user/user_test.go b/user/user_test.go index 5f6f912c..43684430 100644 --- a/user/user_test.go +++ b/user/user_test.go @@ -24,11 +24,6 @@ func TestValidate(t *testing.T) { User: &User{Provider: "provider"}, ExpectError: InvalidError{Provider: "provider"}, }, - { - Title: "should return error invalid if provider is empty", - User: &User{Email: "email"}, - ExpectError: InvalidError{Email: "email"}, - }, { Title: "should return nil if user is valid", User: &User{Email: "email", Provider: "provider"},