Skip to content

Commit

Permalink
add explicit type
Browse files Browse the repository at this point in the history
  • Loading branch information
xuwei-k committed Oct 16, 2023
1 parent 629cf79 commit 2aba9fe
Show file tree
Hide file tree
Showing 142 changed files with 315 additions and 212 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ class BaseTypeClassLawsForTaskWithCallbackSuite(implicit opts: Task.Options) ext
A: Eq[A],
ec: TestScheduler,
opts: Options
) = {
): Eq[Task[A]] = {

Eq.by { task =>
val p = Promise[A]()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package monix.reactive.observers.buffers

import monix.execution.Ack
import monix.execution.Ack.{ Continue, Stop }
import monix.execution.Scheduler
import monix.execution.internal.collection.JSArrayQueue
import monix.execution.internal.math.nextPowerOf2
import scala.util.control.NonFatal
Expand All @@ -37,7 +38,7 @@ private[observers] abstract class AbstractBackPressuredBufferedSubscriber[A, R](
private[this] val bufferSize = nextPowerOf2(_size)

private[this] val em = out.scheduler.executionModel
implicit final val scheduler = out.scheduler
implicit final val scheduler: Scheduler = out.scheduler

private[this] var upstreamIsComplete = false
private[this] var downstreamIsComplete = false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package monix.reactive.observers.buffers
import monix.eval.Coeval
import monix.execution.Ack
import monix.execution.Ack.{ Continue, Stop }
import monix.execution.Scheduler
import monix.execution.internal.collection.{ JSArrayQueue, _ }

import scala.util.control.NonFatal
Expand All @@ -38,7 +39,7 @@ private[observers] final class SyncBufferedSubscriber[-A] private (
onOverflow: Long => Coeval[Option[A]] /*| Null*/,
) extends BufferedSubscriber[A] with Subscriber.Sync[A] {

implicit val scheduler = out.scheduler
implicit val scheduler: Scheduler = out.scheduler
// to be modified only in onError, before upstreamIsComplete
private[this] var errorThrown: Throwable = _
// to be modified only in onError / onComplete
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import java.util.zip.Deflater

import monix.execution.Ack
import monix.execution.Ack.Continue
import monix.execution.Scheduler
import monix.reactive.Observable.Operator
import monix.reactive.compression.{ CompressionLevel, CompressionParameters, CompressionStrategy, FlushMode }
import monix.reactive.observers.Subscriber
Expand All @@ -36,7 +37,7 @@ private[compression] final class DeflateOperator(
) extends Operator[Array[Byte], Array[Byte]] {
override def apply(out: Subscriber[Array[Byte]]): Subscriber[Array[Byte]] = {
new Subscriber[Array[Byte]] {
implicit val scheduler = out.scheduler
implicit val scheduler: Scheduler = out.scheduler

private[this] var ack: Future[Ack] = Continue
private[this] val deflate =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import java.{ util => ju }

import monix.execution.Ack
import monix.execution.Ack.{ Continue, Stop }
import monix.execution.Scheduler
import monix.reactive.Observable.Operator
import monix.reactive.compression.internal.operators.Gunzipper._
import monix.reactive.compression.{
Expand All @@ -42,7 +43,7 @@ private[compression] final class GunzipOperator(bufferSize: Int) extends Operato

def apply(out: Subscriber[Array[Byte]]): Subscriber[Array[Byte]] =
new Subscriber[Array[Byte]] {
implicit val scheduler = out.scheduler
implicit val scheduler: Scheduler = out.scheduler

private[this] var isDone = false
private[this] var ack: Future[Ack] = _
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import java.util.zip.{ CRC32, Deflater }

import monix.execution.Ack
import monix.execution.Ack.Continue
import monix.execution.Scheduler
import monix.reactive.Observable.Operator
import monix.reactive.compression.internal.operators.Gzipper.gzipOperatingSystem
import monix.reactive.compression.{
Expand Down Expand Up @@ -52,7 +53,7 @@ private[compression] final class GzipOperator(
) extends Operator[Array[Byte], Array[Byte]] {
override def apply(out: Subscriber[Array[Byte]]): Subscriber[Array[Byte]] = {
new Subscriber[Array[Byte]] {
implicit val scheduler = out.scheduler
implicit val scheduler: Scheduler = out.scheduler

private[this] var ack: Future[Ack] = _
private[this] val gzipper =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import java.{ util => ju }

import monix.execution.Ack
import monix.execution.Ack.{ Continue, Stop }
import monix.execution.Scheduler
import monix.reactive.Observable.Operator
import monix.reactive.compression.CompressionException
import monix.reactive.observers.Subscriber
Expand All @@ -36,7 +37,7 @@ private[compression] final class InflateOperator(bufferSize: Int, noWrap: Boolea

def apply(out: Subscriber[Array[Byte]]): Subscriber[Array[Byte]] =
new Subscriber[Array[Byte]] {
implicit val scheduler = out.scheduler
implicit val scheduler: Scheduler = out.scheduler

private[this] var isDone = false
private[this] var ack: Future[Ack] = _
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import monix.execution.{ Ack, ChannelType }
import monix.execution.Ack.{ Continue, Stop }
import monix.execution.BufferCapacity.Unbounded
import monix.execution.ChannelType._
import monix.execution.Scheduler
import monix.execution.atomic.Atomic
import monix.execution.atomic.PaddingStrategy.LeftRight256
import monix.execution.internal.collection.LowLevelConcurrentQueue
Expand All @@ -46,7 +47,7 @@ private[observers] abstract class AbstractBackPressuredBufferedSubscriber[A, R](

private[this] val bufferSize = math.nextPowerOf2(_bufferSize)
private[this] val em = out.scheduler.executionModel
implicit final val scheduler = out.scheduler
implicit final val scheduler: Scheduler = out.scheduler

protected final val queue: LowLevelConcurrentQueue[A] =
LowLevelConcurrentQueue(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package monix.reactive.observers.buffers
import monix.eval.Coeval
import monix.execution.Ack
import monix.execution.Ack.{ Continue, Stop }
import monix.execution.Scheduler
import monix.execution.atomic.PaddingStrategy.{ LeftRight128, LeftRight256 }
import monix.execution.atomic.{ Atomic, AtomicInt }

Expand All @@ -43,7 +44,7 @@ private[observers] final class DropNewBufferedSubscriber[A] private (

require(bufferSize > 0, "bufferSize must be a strictly positive number")

implicit val scheduler = out.scheduler
implicit val scheduler: Scheduler = out.scheduler
private[this] val em = out.scheduler.executionModel

private[this] val itemsToPush =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package monix.reactive.observers.buffers
import monix.eval.Coeval
import monix.execution.Ack
import monix.execution.Ack.{ Continue, Stop }
import monix.execution.Scheduler
import monix.execution.atomic.PaddingStrategy.{ LeftRight128, LeftRight256 }
import monix.execution.atomic.{ Atomic, AtomicAny, AtomicInt }
import monix.execution.internal.math
Expand Down Expand Up @@ -114,7 +115,7 @@ private[observers] abstract class AbstractEvictingBufferedSubscriber[-A](

require(strategy.bufferSize > 0, "bufferSize must be a strictly positive number")

implicit val scheduler = out.scheduler
implicit val scheduler: Scheduler = out.scheduler
private[this] val em = out.scheduler.executionModel

private[this] val droppedCount: AtomicInt =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import monix.execution.{ Ack, ChannelType }
import monix.execution.Ack.{ Continue, Stop }
import monix.execution.BufferCapacity.{ Bounded, Unbounded }
import monix.execution.ChannelType.SingleConsumer
import monix.execution.Scheduler
import monix.execution.atomic.Atomic
import monix.execution.atomic.PaddingStrategy.LeftRight256
import monix.execution.exceptions.BufferOverflowException
Expand Down Expand Up @@ -62,7 +63,7 @@ private[observers] abstract class AbstractSimpleBufferedSubscriber[A] protected

private[this] val queue = _qRef
private[this] val em = out.scheduler.executionModel
implicit val scheduler = out.scheduler
implicit val scheduler: Scheduler = out.scheduler
private[this] val itemsToPush =
Atomic.withPadding(0, LeftRight256)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,7 @@ abstract class Observable[+A] extends Serializable { self =>
): Cancelable = {

subscribe(new Subscriber[A] {
implicit val scheduler = s
implicit val scheduler: Scheduler = s

def onNext(elem: A) = nextFn(elem)
def onComplete() = completedFn()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package monix.reactive.internal.builders

import monix.execution.{ Ack, Cancelable }
import monix.execution.Ack.{ Continue, Stop }
import monix.execution.Scheduler
import monix.execution.cancelables.CompositeCancelable
import scala.util.control.NonFatal
import monix.reactive.Observable
Expand Down Expand Up @@ -121,7 +122,7 @@ private[reactive] final class CombineLatest2Observable[A1, A2, +R](obsA1: Observ
val composite = CompositeCancelable()

composite += obsA1.unsafeSubscribeFn(new Subscriber[A1] {
implicit val scheduler = out.scheduler
implicit val scheduler: Scheduler = out.scheduler

def onNext(elem: A1): Future[Ack] = lock.synchronized {
if (isDone) Stop
Expand All @@ -143,7 +144,7 @@ private[reactive] final class CombineLatest2Observable[A1, A2, +R](obsA1: Observ
})

composite += obsA2.unsafeSubscribeFn(new Subscriber[A2] {
implicit val scheduler = out.scheduler
implicit val scheduler: Scheduler = out.scheduler

def onNext(elem: A2): Future[Ack] = lock.synchronized {
if (isDone) Stop
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package monix.reactive.internal.builders

import monix.execution.{ Ack, Cancelable }
import monix.execution.Ack.{ Continue, Stop }
import monix.execution.Scheduler
import monix.execution.cancelables.CompositeCancelable
import scala.util.control.NonFatal
import monix.reactive.Observable
Expand Down Expand Up @@ -128,7 +129,7 @@ private[reactive] final class CombineLatest3Observable[A1, A2, A3, +R](
val composite = CompositeCancelable()

composite += obsA1.unsafeSubscribeFn(new Subscriber[A1] {
implicit val scheduler = out.scheduler
implicit val scheduler: Scheduler = out.scheduler

def onNext(elem: A1): Future[Ack] = lock.synchronized {
if (isDone) Stop
Expand All @@ -150,7 +151,7 @@ private[reactive] final class CombineLatest3Observable[A1, A2, A3, +R](
})

composite += obsA2.unsafeSubscribeFn(new Subscriber[A2] {
implicit val scheduler = out.scheduler
implicit val scheduler: Scheduler = out.scheduler

def onNext(elem: A2): Future[Ack] = lock.synchronized {
if (isDone) Stop
Expand All @@ -172,7 +173,7 @@ private[reactive] final class CombineLatest3Observable[A1, A2, A3, +R](
})

composite += obsA3.unsafeSubscribeFn(new Subscriber[A3] {
implicit val scheduler = out.scheduler
implicit val scheduler: Scheduler = out.scheduler

def onNext(elem: A3): Future[Ack] = lock.synchronized {
if (isDone) Stop
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package monix.reactive.internal.builders
import monix.execution.cancelables.CompositeCancelable
import monix.execution.{ Ack, Cancelable }
import monix.execution.Ack.{ Continue, Stop }
import monix.execution.Scheduler
import scala.util.control.NonFatal
import monix.reactive.Observable
import monix.reactive.observers.Subscriber
Expand Down Expand Up @@ -133,7 +134,7 @@ private[reactive] final class CombineLatest4Observable[A1, A2, A3, A4, +R](
val composite = CompositeCancelable()

composite += obsA1.unsafeSubscribeFn(new Subscriber[A1] {
implicit val scheduler = out.scheduler
implicit val scheduler: Scheduler = out.scheduler

def onNext(elem: A1): Future[Ack] = lock.synchronized {
if (isDone) Stop
Expand All @@ -155,7 +156,7 @@ private[reactive] final class CombineLatest4Observable[A1, A2, A3, A4, +R](
})

composite += obsA2.unsafeSubscribeFn(new Subscriber[A2] {
implicit val scheduler = out.scheduler
implicit val scheduler: Scheduler = out.scheduler

def onNext(elem: A2): Future[Ack] = lock.synchronized {
if (isDone) Stop
Expand All @@ -177,7 +178,7 @@ private[reactive] final class CombineLatest4Observable[A1, A2, A3, A4, +R](
})

composite += obsA3.unsafeSubscribeFn(new Subscriber[A3] {
implicit val scheduler = out.scheduler
implicit val scheduler: Scheduler = out.scheduler

def onNext(elem: A3): Future[Ack] = lock.synchronized {
if (isDone) Stop
Expand All @@ -199,7 +200,7 @@ private[reactive] final class CombineLatest4Observable[A1, A2, A3, A4, +R](
})

composite += obsA4.unsafeSubscribeFn(new Subscriber[A4] {
implicit val scheduler = out.scheduler
implicit val scheduler: Scheduler = out.scheduler

def onNext(elem: A4): Future[Ack] = lock.synchronized {
if (isDone) Stop
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package monix.reactive.internal.builders

import monix.execution.{ Ack, Cancelable }
import monix.execution.Ack.{ Continue, Stop }
import monix.execution.Scheduler
import monix.execution.cancelables.CompositeCancelable
import scala.util.control.NonFatal
import monix.reactive.Observable
Expand Down Expand Up @@ -138,7 +139,7 @@ private[reactive] final class CombineLatest5Observable[A1, A2, A3, A4, A5, +R](
val composite = CompositeCancelable()

composite += obsA1.unsafeSubscribeFn(new Subscriber[A1] {
implicit val scheduler = out.scheduler
implicit val scheduler: Scheduler = out.scheduler

def onNext(elem: A1): Future[Ack] = lock.synchronized {
if (isDone) Stop
Expand All @@ -160,7 +161,7 @@ private[reactive] final class CombineLatest5Observable[A1, A2, A3, A4, A5, +R](
})

composite += obsA2.unsafeSubscribeFn(new Subscriber[A2] {
implicit val scheduler = out.scheduler
implicit val scheduler: Scheduler = out.scheduler

def onNext(elem: A2): Future[Ack] = lock.synchronized {
if (isDone) Stop
Expand All @@ -182,7 +183,7 @@ private[reactive] final class CombineLatest5Observable[A1, A2, A3, A4, A5, +R](
})

composite += obsA3.unsafeSubscribeFn(new Subscriber[A3] {
implicit val scheduler = out.scheduler
implicit val scheduler: Scheduler = out.scheduler

def onNext(elem: A3): Future[Ack] = lock.synchronized {
if (isDone) Stop
Expand All @@ -204,7 +205,7 @@ private[reactive] final class CombineLatest5Observable[A1, A2, A3, A4, A5, +R](
})

composite += obsA4.unsafeSubscribeFn(new Subscriber[A4] {
implicit val scheduler = out.scheduler
implicit val scheduler: Scheduler = out.scheduler

def onNext(elem: A4): Future[Ack] = lock.synchronized {
if (isDone) Stop
Expand All @@ -226,7 +227,7 @@ private[reactive] final class CombineLatest5Observable[A1, A2, A3, A4, A5, +R](
})

composite += obsA5.unsafeSubscribeFn(new Subscriber[A5] {
implicit val scheduler = out.scheduler
implicit val scheduler: Scheduler = out.scheduler

def onNext(elem: A5): Future[Ack] = lock.synchronized {
if (isDone) Stop
Expand Down

0 comments on commit 2aba9fe

Please sign in to comment.