Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rx: fix Join operator #179

Open
Tracked by #122
plastikfan opened this issue Apr 11, 2024 · 1 comment
Open
Tracked by #122

rx: fix Join operator #179

plastikfan opened this issue Apr 11, 2024 · 1 comment
Assignees
Labels
bug Something isn't working

Comments

@plastikfan
Copy link
Contributor

plastikfan commented Apr 11, 2024

Join operator is a bit more complicated to port over, due to its use of int64 to represent a time value:

// Returns absolute value for int64
func abs(n int64) int64 {
	y := n >> 63
	return (n ^ y) - y
}

The join algorithm performs bit shifting on time values (window duration), which are sent through channel. So how do we shoehorn this into the new representation of Item[T]?

Introducing a new time value member on Item, causes rippling changes accross different types like Func. This problem needs to be fixed into its own issue.

@plastikfan plastikfan mentioned this issue Apr 11, 2024
59 tasks
@plastikfan plastikfan self-assigned this Apr 11, 2024
@plastikfan plastikfan added the bug Something isn't working label Apr 11, 2024
@plastikfan
Copy link
Contributor Author

plastikfan commented Apr 11, 2024

This was the first attempt of implementing Join, that was abandoned:

// Returns absolute value for int
func absN(n int) int {
	sz := unsafe.Sizeof(int(0)) / 8
	y := n >> sz

	return (n ^ y) - y
}

// Join combines items emitted by two Observables whenever an item from one
// Observable is emitted during a time window defined according to an item
// emitted by the other Observable. The time is extracted using a
// timeExtractor function.
func (o *ObservableImpl[T]) Join(joiner Func2[T], right Observable[T],
	timeExtractor TimeExtractorFunc[T], window Duration, opts ...Option[T],
) Observable[T] {
	f := func(ctx context.Context, next chan Item[T], option Option[T], opts ...Option[T]) {
		defer close(next)
		windowDuration := window.duration()
		rBuf := make([]Item[T], 0)

		lObserve := o.Observe()
		rObserve := right.Observe()
	lLoop:
		for {
			select {
			case <-ctx.Done():
				return
			case lItem, ok := <-lObserve:
				if lItem.V == nil && !ok {
					return
				}

				if lItem.IsError() {
					lItem.SendContext(ctx, next)
					if option.getErrorStrategy() == StopOnError {
						return
					}
					continue
				}

				lTime := timeExtractor(lItem.V).UnixNano()
				cutPoint := 0

				for i, rItem := range rBuf {
					rTime := timeExtractor(rItem.V).UnixNano()

					if abs(lTime-rTime) <= windowDuration {
						i, err := joiner(ctx, lItem.V, rItem.V)

						if err != nil {
							Error[T](err).SendContext(ctx, next)

							if option.getErrorStrategy() == StopOnError {
								return
							}
							continue
						}
						Of(i).SendContext(ctx, next)
					}

					if lTime > rTime+windowDuration {
						cutPoint = i + 1
					}
				}

				rBuf = rBuf[cutPoint:]

				for {
					select {
					case <-ctx.Done():
						return
					case rItem, ok := <-rObserve:
						if rItem.V == nil && !ok {
							continue lLoop
						}
						if rItem.IsError() {
							rItem.SendContext(ctx, next)

							if option.getErrorStrategy() == StopOnError {
								return
							}
							continue
						}

						rBuf = append(rBuf, rItem)
						rTime := timeExtractor(rItem.V).UnixNano()

						if abs(lTime-rTime) <= windowDuration {
							i, err := joiner(ctx, lItem.V, rItem.V)
							if err != nil {
								Error[T](err).SendContext(ctx, next)
								if option.getErrorStrategy() == StopOnError {
									return
								}
								continue
							}
							Of(i).SendContext(ctx, next)

							continue
						}
						continue lLoop
					}
				}
			}
		}
	}

	return customObservableOperator(o.parent, f, opts...)
}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
bug Something isn't working
Projects
None yet
Development

No branches or pull requests

1 participant