Skip to content

Commit

Permalink
feat: use etag to update data (#38)
Browse files Browse the repository at this point in the history
* feat: use etag to update data

* chore: add some logs message

* Update internal/database/sync.go

Co-authored-by: KaiserBh <41852205+kaiserbh@users.noreply.github.com>

---------

Co-authored-by: KaiserBh <41852205+kaiserbh@users.noreply.github.com>
  • Loading branch information
Cologler and kaiserbh committed Apr 10, 2024
1 parent c6e052c commit 310f07f
Show file tree
Hide file tree
Showing 6 changed files with 311 additions and 15 deletions.
29 changes: 29 additions & 0 deletions internal/database/postgres_migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,20 @@ CONSTRAINT sync_lock_pkey PRIMARY KEY (id)
ALTER TABLE manga_data ADD FOREIGN KEY (user_api_key) REFERENCES api_key (key) ON DELETE CASCADE ON UPDATE CASCADE;
ALTER TABLE manga_sync ADD FOREIGN KEY (user_api_key) REFERENCES api_key (key) ON DELETE CASCADE ON UPDATE CASCADE;
ALTER TABLE sync_lock ADD FOREIGN KEY (user_api_key) REFERENCES api_key (key) ON DELETE CASCADE ON UPDATE CASCADE;
CREATE TABLE sync_data
(
id INTEGER PRIMARY KEY,
user_api_key TEXT UNIQUE,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
data BLOB NOT NULL,
data_etag TEXT NOT NULL,
FOREIGN KEY (user_api_key) REFERENCES api_key (key) ON DELETE CASCADE
)
`

var postgresMigrations = []string{
Expand Down Expand Up @@ -143,4 +157,19 @@ var postgresMigrations = []string{
`,
`ALTER TABLE manga_sync ADD COLUMN "device_id" TEXT NOT NULL DEFAULT '';
`,
`
CREATE TABLE sync_data
(
id INTEGER PRIMARY KEY,
user_api_key TEXT UNIQUE,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
data BLOB NOT NULL,
data_etag TEXT NOT NULL,
FOREIGN KEY (user_api_key) REFERENCES api_key (key) ON DELETE CASCADE
)
`,
}
29 changes: 29 additions & 0 deletions internal/database/sqlite_migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,20 @@ CREATE TABLE sync_lock
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (user_api_key) REFERENCES api_key (key) ON DELETE CASCADE
);
CREATE TABLE sync_data
(
id INTEGER PRIMARY KEY,
user_api_key TEXT UNIQUE,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
data BLOB NOT NULL,
data_etag TEXT NOT NULL,
FOREIGN KEY (user_api_key) REFERENCES api_key (key) ON DELETE CASCADE
)
`

var sqliteMigrations = []string{
Expand Down Expand Up @@ -171,4 +185,19 @@ var sqliteMigrations = []string{
`ALTER TABLE manga_sync
ADD COLUMN device_id TEXT NOT NULL DEFAULT '';
`,
`
CREATE TABLE sync_data
(
id INTEGER PRIMARY KEY,
user_api_key TEXT UNIQUE,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
data BLOB NOT NULL,
data_etag TEXT NOT NULL,
FOREIGN KEY (user_api_key) REFERENCES api_key (key) ON DELETE CASCADE
)
`,
}
135 changes: 134 additions & 1 deletion internal/database/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,15 @@ package database
import (
"context"
"database/sql"
"time"

sq "github.com/Masterminds/squirrel"
"github.com/SyncYomi/SyncYomi/internal/domain"
"github.com/SyncYomi/SyncYomi/internal/logger"
"github.com/SyncYomi/SyncYomi/pkg/errors"
"github.com/google/uuid"
"github.com/lib/pq"
"github.com/rs/zerolog"
"time"
)

func NewSyncRepo(log logger.Logger, db *DB) domain.SyncRepo {
Expand Down Expand Up @@ -368,3 +370,134 @@ func (r SyncRepo) DeleteSyncLockFile(ctx context.Context, apiKey string) bool {

return true
}

// Get etag of sync data.
// For avoid memory usage, only the etag will be returned.
func (r SyncRepo) GetSyncDataETag(ctx context.Context, apiKey string) (*string, error) {
var etag string

err := r.db.squirrel.
Select("data_etag").
From("sync_data").
Where(sq.Eq{"user_api_key": apiKey}).
Limit(1).
RunWith(r.db.handler).
Scan(&etag)

if err != nil {
if err == sql.ErrNoRows {
return nil, nil
}
return nil, errors.Wrap(err, "error executing query")
}

return &etag, nil
}

// Get sync data and etag
func (r SyncRepo) GetSyncDataAndETag(ctx context.Context, apiKey string) ([]byte, *string, error) {
var etag string
var data []byte

err := r.db.squirrel.
Select("data", "data_etag").
From("sync_data").
Where(sq.Eq{"user_api_key": apiKey}).
Limit(1).
RunWith(r.db.handler).
Scan(&data, &etag)

if err != nil {
if err == sql.ErrNoRows {
return nil, nil, nil
}
return nil, nil, errors.Wrap(err, "error executing query")
}

return data, &etag, nil
}

// Create or replace sync data, returns the new etag.
func (r SyncRepo) SetSyncData(ctx context.Context, apiKey string, data []byte) (*string, error) {
now := time.Now()
// the better way is use hash like sha1
// but uuid is faster than sha1
newEtag := "uuid=" + uuid.NewString()

updateResult, err := r.db.squirrel.
Update("sync_data").
Set("updated_at", now).
Set("data", data).
Set("data_etag", newEtag).
Where(sq.Eq{"user_api_key": apiKey}).
RunWith(r.db.handler).ExecContext(ctx)

if err != nil {
return nil, errors.Wrap(err, "error executing query")
}

if rowsAffected, err := updateResult.RowsAffected(); err != nil {
return nil, errors.Wrap(err, "error executing query")
} else if rowsAffected == 0 {
// new item
insertResult, err := r.db.squirrel.
Insert("sync_data").
Columns(
"user_api_key",
"updated_at",
"data",
"data_etag",
).
Values(apiKey, now, data, newEtag).
RunWith(r.db.handler).ExecContext(ctx)

if err != nil {
return nil, errors.Wrap(err, "error executing query")
}

if rowsAffected, err := insertResult.RowsAffected(); err != nil {
return nil, errors.Wrap(err, "error executing query")

} else if rowsAffected == 0 {
// multi devices race condition
return nil, errors.New("no rows affected")
}
}

r.log.Debug().Msgf("Sync data upsert: api_key=\"%v\"", apiKey)
return &newEtag, nil
}

// Replace sync data only if the etag matches,
// returns the new etag if updated, or nil if not.
func (r SyncRepo) SetSyncDataIfMatch(ctx context.Context, apiKey string, etag string, data []byte) (*string, error) {
now := time.Now()
// the better way is use hash like sha1
// but uuid is faster than sha1
newEtag := "uuid=" + uuid.NewString()

result, err := r.db.squirrel.
Update("sync_data").
Set("updated_at", now).
Set("data", data).
Set("data_etag", newEtag).
Where(sq.Eq{"user_api_key": apiKey}).
Where(sq.Eq{"data_etag": etag}).
RunWith(r.db.handler).ExecContext(ctx)

if err != nil {
return nil, errors.Wrap(err, "error executing query")
}

if rowsAffected, err := result.RowsAffected(); err != nil {
return nil, errors.Wrap(err, "error executing query")

} else if rowsAffected == 0 {
r.log.Debug().Msgf("ETag mismatch for api_key=\"%v\". Concurrent modification detected, aborting sync to prevent data overwrite. ETag=\"%v\"", apiKey, etag)
return nil, nil

} else {
r.log.Debug().Msgf("Sync data replaced: api_key=\"%v\", etag=\"%v\"", apiKey, etag)
return &newEtag, nil
}
}
11 changes: 11 additions & 0 deletions internal/domain/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,17 @@ type SyncRepo interface {
GetSyncLockFile(ctx context.Context, apiKey string) (*SyncLockFile, error)
UpdateSyncLockFile(ctx context.Context, syncLockFile *SyncLockFile) (*SyncLockFile, error)
DeleteSyncLockFile(ctx context.Context, apiKey string) bool

// Get etag of sync data.
// For avoid memory usage, only the etag will be returned.
GetSyncDataETag(ctx context.Context, apiKey string) (*string, error)
// Get sync data and etag
GetSyncDataAndETag(ctx context.Context, apiKey string) ([]byte, *string, error)
// Create or replace sync data, returns the new etag.
SetSyncData(ctx context.Context, apiKey string, data []byte) (*string, error)
// Replace sync data only if the etag matches,
// returns the new etag if updated, or nil if not.
SetSyncDataIfMatch(ctx context.Context, apiKey string, etag string, data []byte) (*string, error)
}

type Sync struct {
Expand Down
89 changes: 75 additions & 14 deletions internal/http/sync.go
Original file line number Diff line number Diff line change
@@ -1,29 +1,17 @@
package http

import (
"context"
"encoding/json"
"io"
"net/http"
"strconv"

"github.com/SyncYomi/SyncYomi/internal/domain"
"github.com/SyncYomi/SyncYomi/internal/sync"
"github.com/go-chi/chi/v5"
)

type syncService interface {
Store(ctx context.Context, sync *domain.Sync) (*domain.Sync, error)
Delete(ctx context.Context, id int) error
Update(ctx context.Context, sync *domain.Sync) (*domain.Sync, error)
ListSyncs(ctx context.Context, apiKey string) ([]domain.Sync, error)
GetSyncByApiKey(ctx context.Context, apiKey string) (*domain.Sync, error)
GetSyncData(ctx context.Context, apiKey string) (*domain.SyncData, error)
SyncData(ctx context.Context, sync *domain.SyncData) (*domain.SyncData, error)
GetSyncLockFile(ctx context.Context, apiKey string) (*domain.SyncLockFile, error)
CreateSyncLockFile(ctx context.Context, apiKey string, acquiredBy string) (*domain.SyncLockFile, error)
UpdateSyncLockFile(ctx context.Context, syncLockFile *domain.SyncLockFile) (*domain.SyncLockFile, error)
DeleteSyncLockFile(ctx context.Context, apiKey string) bool
}
type syncService = sync.Service

type syncHandler struct {
encoder encoder
Expand All @@ -38,6 +26,8 @@ func newSyncHandler(encoder encoder, syncService syncService) *syncHandler {
}

func (h syncHandler) Routes(r chi.Router) {
r.Get("/content", h.getContent)
r.Put("/content", h.putContent)
r.Post("/", h.store)
r.Delete("/{id}", h.delete)
r.Get("/", h.listSyncs)
Expand Down Expand Up @@ -318,3 +308,74 @@ func (h syncHandler) updateSyncLockFile(w http.ResponseWriter, r *http.Request)

h.encoder.StatusResponse(ctx, w, lockFile, http.StatusOK)
}

func (h syncHandler) getContent(w http.ResponseWriter, r *http.Request) {
apiKey := r.Header.Get("X-API-Token")
etag := r.Header.Get("If-None-Match")

if etag != "" {
etagInDb, err := h.syncService.GetSyncDataETag(r.Context(), apiKey)
if err != nil {
h.encoder.StatusInternalError(w)
return
}

if etagInDb != nil && etag == *etagInDb {
// nothing changed after last request
// see: https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/If-None-Match
w.WriteHeader(http.StatusNotModified)
return
}
}

syncData, syncDataETag, err := h.syncService.GetSyncDataAndETag(r.Context(), apiKey)

if err != nil {
h.encoder.StatusInternalError(w)
return
}

if syncData == nil {
w.WriteHeader(http.StatusNotFound)
return
}

if syncDataETag != nil {
w.Header().Set("ETag", *syncDataETag)
}

w.Header().Set("Content-Type", "application/octet-stream")
w.Write(syncData)
w.WriteHeader(http.StatusOK)
}

func (h syncHandler) putContent(w http.ResponseWriter, r *http.Request) {
apiKey := r.Header.Get("X-API-Token")
etag := r.Header.Get("If-Match")

// Read data from request body
requestData, err := io.ReadAll(r.Body)
if err != nil {
h.encoder.StatusResponse(r.Context(), w, err.Error(), http.StatusBadRequest)
return
}

var newEtag *string
if etag != "" {
newEtag, err = h.syncService.SetSyncDataIfMatch(r.Context(), apiKey, etag, requestData)
} else {
newEtag, err = h.syncService.SetSyncData(r.Context(), apiKey, requestData)
}
if err != nil {
h.encoder.StatusInternalError(w)
}

if newEtag == nil {
// syncdata was changed from other clients
// see: https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/If-Match
w.WriteHeader(http.StatusPreconditionFailed)
} else {
w.Header().Set("ETag", *newEtag)
w.WriteHeader(http.StatusOK)
}
}
Loading

0 comments on commit 310f07f

Please sign in to comment.