-
Notifications
You must be signed in to change notification settings - Fork 20
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
remove redisDB dependency #25
Conversation
d83aebc
to
595cd27
Compare
ytsync/manager.go
Outdated
@@ -149,7 +165,7 @@ func (s SyncManager) MarkVideoStatus(channelID string, videoID string, status st | |||
res, _ := http.PostForm(endpoint, vals) | |||
defer res.Body.Close() | |||
body, _ := ioutil.ReadAll(res.Body) | |||
var response apiSyncUpdateResponse | |||
var response apiVideoStatusResponse |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you don't have to create a named struct for this if its only used in one place. you can inline it as
var response struct {
Success bool `json:"success"`
Error null.String `json:"error"`
Data null.String `json:"data"`
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok, will look into that
ytsync/ytsync.go
Outdated
lbryChannelID string | ||
|
||
videosMapMux sync.Mutex | ||
mux sync.Mutex |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if you have multiple mutexes, you can't call one of them just mux
. its not clear what that's for. it could be something like walletSetupMux
. though once you name it that, it becomes clear that something might be wrong with wrapping the whole wallet setup in a single mutex. does the whole thing actually need to be locked?
the mutexes should be pointers, so new copies are not created if the Sync struct is copied
its clearer if you put the mutex right above the thing the mutex is locking, and leave newlines on either side. so in this example, upt videosMapMux
right above syncedVideos
. and then maybe rename it to syncedVideosMux
, like so:
...other vars...
syncedVideosMux *sync.Mutex
syncedVideos map[string]syncedVideo
...other vars...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you can use a read/write lock to lock the map for reading when you read it, and writing when you write to it. this lets multiple threads read the data at once, which is safe and blocks less. use *sync.RWMutex
instead, and call mux.RLock()
and mux.RUnlock()
when you're only reading from the variable. leave mux.Lock()
for writing
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will rename and move the mutexes.
The reason I didn't use read/write locks is that i don't want the application to read when it's being written to, plus the locks are held for a very very short time so there would be no noticeable improvement, only a higher risk of race conditions happening during wallet refills.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
a read/write lock does not allow reading during writing. thats the point of every lock. what it does allow is multiple concurrent reads. when you do RLock, others can RLock at the same time. when you Lock (for writing), no one's allowed to Lock or RLock at the same time.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh yes, you're right, not sure what was going in my mind. I'll swap that
ytsync/ytsync.go
Outdated
|
||
videosMapMux sync.Mutex | ||
mux sync.Mutex | ||
wg sync.WaitGroup |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this should also be a pointer. but more importantly, you don't need this if you already have a stop.Group. stop.Group works as a combo WaitGroup + channel that will be closed to indicate stopping (and can be closed safely multiple times). so use grp.Add()
and grp.Wait()
instead
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it won't let me comment below, so I'm commenting here:
you call Add()
and Done()
inside startWorker(workerNum int)
, but the correct pattern is to make those calls outside the function. startWorker() doesn't know if its being run asynchronously or not. you only need a waitgroup if it is. there are also subtle concurrency issues with calling it inside the function. so the right way to go is to remove Add and Done from inside startWorker, and do this:
for i := 0; i < s.ConcurrentVideos; i++ {
s.wg.Add(1)
go func() {
defer s.wg.Done()
s.startWorker(i)
}()
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I need to look into that, this is from your original code and I don't remember ever changing it. Thanks for the pointers there.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I looked more into why we need to lock the whole walletSetup function. The reason I'm doing that is to avoid multiple threads from refilling the wallet concurrently. I don't think I can easily break up the the function to lock fewer lines of code. I think it's fair to leave it like that.
I removed the wait group in favor of the stop group
ytsync/ytsync.go
Outdated
@@ -182,6 +195,9 @@ func (s *Sync) FullCycle() (e error) { | |||
} | |||
|
|||
s.db = redisdb.New() | |||
s.videosMapMux.Lock() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why is this code all the way down here, so far from line 125 where syncedVideos is first declared?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the only reason i put it there is because there were a bunch of assignments there and i wanted to keep them grouped, but I agree it's not very read-able this way. I'll move them up
ytsync/setup.go
Outdated
} | ||
s.videosMapMux.Lock() | ||
numPublished := uint64(len(s.syncedVideos)) //should we only count published videos? Credits are allocated even for failed ones... | ||
s.videosMapMux.Unlock() | ||
log.Debugf("We already published %d videos", numPublished) | ||
|
||
if float64(numOnSource)-float64(numPublished) > float64(s.Manager.VideosLimit) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why are all these cast to floats? they can be ints here
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
no clue. but you're completely right. Will fix
"the video is too big to sync, skipping for now", | ||
} | ||
if ok && !sv.Published && util.SubstringInSlice(sv.FailureReason, neverRetryFailures) { | ||
log.Println(v.ID() + " can't ever be published") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
how often does this happen, and do you know why? is it within our control?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the first happens due to youtube restrictions (either region, age or deleted videos)
the second happens when we hit the defined size limit (it's in our control).
While we can't do anything about the first, we can raise the limits for the second, but as per several discussions we'll keep it at 2GB or less.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
out of 100k published videos we have about 1% that failed for those two (leading) reasons: https://scrn.storni.info/2018-08-09_12-24-24-339707393.png
use video statuses
reinstate redis db to detect inconsistencies
add locking fix bugs
should make startup faster bug fixes
bad bug fixes remove half broken code used to wait for UTXOs use channel IDs rather than names (skip lbryum) port changes to UCB uploader
798e7cc
to
37c1595
Compare
101bda3
to
58c8e7f
Compare
refactoring address reviews
addressed reviews |
@@ -149,7 +159,11 @@ func (s SyncManager) MarkVideoStatus(channelID string, videoID string, status st | |||
res, _ := http.PostForm(endpoint, vals) | |||
defer res.Body.Close() | |||
body, _ := ioutil.ReadAll(res.Body) | |||
var response apiSyncUpdateResponse | |||
var response struct { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why are we not using the aligned code in the api package? I would make sure we are making things DRY. api.Response
s.mux.Lock() | ||
defer s.mux.Unlock() | ||
s.walletMux.Lock() | ||
defer s.walletMux.Unlock() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why are we using a mutex lock? What is wrong with concurrent execution? A mutex is not the standard way to handle synchronization. It can cause deadlocks. Ideally if you want things to run synchronously from different go routines, you use channels in golang.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is the only place the lock is used and no multiple locks are being held that can cause a deadlock here.
However I would like to discuss channels with you to understand how they could be used here (not sure they can)
@@ -30,31 +30,32 @@ func (s *Sync) walletSetup() error { | |||
balance := decimal.Decimal(*balanceResp) | |||
log.Debugf("Starting balance is %s", balance.String()) | |||
|
|||
var numOnSource uint64 | |||
var numOnSource int |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why change this to an int? It better to be more specific than more general.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Grins previous review outlined a mess with casts here and there to make simple math.
I changed everything to int as it's reasonable for the values they represent.
@@ -30,31 +30,32 @@ func (s *Sync) walletSetup() error { | |||
balance := decimal.Decimal(*balanceResp) | |||
log.Debugf("Starting balance is %s", balance.String()) | |||
|
|||
var numOnSource uint64 | |||
var numOnSource int | |||
if s.LbryChannelName == "@UCBerkeley" { | |||
numOnSource = 10104 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what is this?! not part of the PR but this is not good to have in the code base.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I can explain the berkeley stuff to you via DM, it's all good as it will be eventually removed from here. Not worth changing now
return err | ||
} | ||
|
||
for status.Wallet.Blocks == 0 || status.Wallet.BlocksBehind != 0 { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
so is this ok? If there is a problem it would be an infinite loop no? Maybe have a maxit
?
if channelName != "" { | ||
options.ChannelName = &channelName | ||
Title: &v.title, | ||
Author: strPtr("UC Berkeley"), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why is the author UC Berkley? should this function be called publishUCBerkley?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ahhh, ucbVideo
Seems odd to have a struct type for a specific author.
queue chan video | ||
func (s *Sync) AppendSyncedVideo(videoID string, published bool, failureReason string) { | ||
s.syncedVideosMux.Lock() | ||
defer s.syncedVideosMux.Unlock() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't understand why we have a mutex lock here. Why would this get called twice in different go routines on the same sync instance?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
videos end concurrently, that's why
interruptChan := make(chan os.Signal, 1) | ||
signal.Notify(interruptChan, os.Interrupt, syscall.SIGTERM) | ||
go func() { | ||
<-interruptChan |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we should not do this here. Please use the stopper pattern grin built in the stop package.
if err != nil { | ||
return err | ||
} | ||
s.syncedVideosMux.Lock() | ||
s.syncedVideos = syncedVideos | ||
s.syncedVideosMux.Unlock() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Last comment on mutex locks. We should discuss the pattern you are using here, as I am almost positive using channels is a better and more robust solution design.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i agree with most of beamer's comments, and none are major blockers, so im fine with merging this now and cleaning up as you continue to work with the code.
in general im trying to be less strict on PRs when the code works but has little nits in it.
This PR aims at removing the local redisdb dependency that makes it impossible to redistribute work across different servers.
Further work has been put in, here's a summary: