Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add explicit type for implicits #1783

Merged
merged 1 commit into from
Oct 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ class BaseTypeClassLawsForTaskWithCallbackSuite(implicit opts: Task.Options) ext
A: Eq[A],
ec: TestScheduler,
opts: Options
) = {
): Eq[Task[A]] = {

Eq.by { task =>
val p = Promise[A]()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package monix.reactive.observers.buffers

import monix.execution.Ack
import monix.execution.Ack.{ Continue, Stop }
import monix.execution.Scheduler
import monix.execution.internal.collection.JSArrayQueue
import monix.execution.internal.math.nextPowerOf2
import scala.util.control.NonFatal
Expand All @@ -37,7 +38,7 @@ private[observers] abstract class AbstractBackPressuredBufferedSubscriber[A, R](
private[this] val bufferSize = nextPowerOf2(_size)

private[this] val em = out.scheduler.executionModel
implicit final val scheduler = out.scheduler
implicit final val scheduler: Scheduler = out.scheduler

private[this] var upstreamIsComplete = false
private[this] var downstreamIsComplete = false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package monix.reactive.observers.buffers
import monix.eval.Coeval
import monix.execution.Ack
import monix.execution.Ack.{ Continue, Stop }
import monix.execution.Scheduler
import monix.execution.internal.collection.{ JSArrayQueue, _ }

import scala.util.control.NonFatal
Expand All @@ -38,7 +39,7 @@ private[observers] final class SyncBufferedSubscriber[-A] private (
onOverflow: Long => Coeval[Option[A]] /*| Null*/,
) extends BufferedSubscriber[A] with Subscriber.Sync[A] {

implicit val scheduler = out.scheduler
implicit val scheduler: Scheduler = out.scheduler
// to be modified only in onError, before upstreamIsComplete
private[this] var errorThrown: Throwable = _
// to be modified only in onError / onComplete
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import java.util.zip.Deflater

import monix.execution.Ack
import monix.execution.Ack.Continue
import monix.execution.Scheduler
import monix.reactive.Observable.Operator
import monix.reactive.compression.{ CompressionLevel, CompressionParameters, CompressionStrategy, FlushMode }
import monix.reactive.observers.Subscriber
Expand All @@ -36,7 +37,7 @@ private[compression] final class DeflateOperator(
) extends Operator[Array[Byte], Array[Byte]] {
override def apply(out: Subscriber[Array[Byte]]): Subscriber[Array[Byte]] = {
new Subscriber[Array[Byte]] {
implicit val scheduler = out.scheduler
implicit val scheduler: Scheduler = out.scheduler

private[this] var ack: Future[Ack] = Continue
private[this] val deflate =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import java.{ util => ju }

import monix.execution.Ack
import monix.execution.Ack.{ Continue, Stop }
import monix.execution.Scheduler
import monix.reactive.Observable.Operator
import monix.reactive.compression.internal.operators.Gunzipper._
import monix.reactive.compression.{
Expand All @@ -42,7 +43,7 @@ private[compression] final class GunzipOperator(bufferSize: Int) extends Operato

def apply(out: Subscriber[Array[Byte]]): Subscriber[Array[Byte]] =
new Subscriber[Array[Byte]] {
implicit val scheduler = out.scheduler
implicit val scheduler: Scheduler = out.scheduler

private[this] var isDone = false
private[this] var ack: Future[Ack] = _
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import java.util.zip.{ CRC32, Deflater }

import monix.execution.Ack
import monix.execution.Ack.Continue
import monix.execution.Scheduler
import monix.reactive.Observable.Operator
import monix.reactive.compression.internal.operators.Gzipper.gzipOperatingSystem
import monix.reactive.compression.{
Expand Down Expand Up @@ -52,7 +53,7 @@ private[compression] final class GzipOperator(
) extends Operator[Array[Byte], Array[Byte]] {
override def apply(out: Subscriber[Array[Byte]]): Subscriber[Array[Byte]] = {
new Subscriber[Array[Byte]] {
implicit val scheduler = out.scheduler
implicit val scheduler: Scheduler = out.scheduler

private[this] var ack: Future[Ack] = _
private[this] val gzipper =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import java.{ util => ju }

import monix.execution.Ack
import monix.execution.Ack.{ Continue, Stop }
import monix.execution.Scheduler
import monix.reactive.Observable.Operator
import monix.reactive.compression.CompressionException
import monix.reactive.observers.Subscriber
Expand All @@ -36,7 +37,7 @@ private[compression] final class InflateOperator(bufferSize: Int, noWrap: Boolea

def apply(out: Subscriber[Array[Byte]]): Subscriber[Array[Byte]] =
new Subscriber[Array[Byte]] {
implicit val scheduler = out.scheduler
implicit val scheduler: Scheduler = out.scheduler

private[this] var isDone = false
private[this] var ack: Future[Ack] = _
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import monix.execution.{ Ack, ChannelType }
import monix.execution.Ack.{ Continue, Stop }
import monix.execution.BufferCapacity.Unbounded
import monix.execution.ChannelType._
import monix.execution.Scheduler
import monix.execution.atomic.Atomic
import monix.execution.atomic.PaddingStrategy.LeftRight256
import monix.execution.internal.collection.LowLevelConcurrentQueue
Expand All @@ -46,7 +47,7 @@ private[observers] abstract class AbstractBackPressuredBufferedSubscriber[A, R](

private[this] val bufferSize = math.nextPowerOf2(_bufferSize)
private[this] val em = out.scheduler.executionModel
implicit final val scheduler = out.scheduler
implicit final val scheduler: Scheduler = out.scheduler

protected final val queue: LowLevelConcurrentQueue[A] =
LowLevelConcurrentQueue(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package monix.reactive.observers.buffers
import monix.eval.Coeval
import monix.execution.Ack
import monix.execution.Ack.{ Continue, Stop }
import monix.execution.Scheduler
import monix.execution.atomic.PaddingStrategy.{ LeftRight128, LeftRight256 }
import monix.execution.atomic.{ Atomic, AtomicInt }

Expand All @@ -43,7 +44,7 @@ private[observers] final class DropNewBufferedSubscriber[A] private (

require(bufferSize > 0, "bufferSize must be a strictly positive number")

implicit val scheduler = out.scheduler
implicit val scheduler: Scheduler = out.scheduler
private[this] val em = out.scheduler.executionModel

private[this] val itemsToPush =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package monix.reactive.observers.buffers
import monix.eval.Coeval
import monix.execution.Ack
import monix.execution.Ack.{ Continue, Stop }
import monix.execution.Scheduler
import monix.execution.atomic.PaddingStrategy.{ LeftRight128, LeftRight256 }
import monix.execution.atomic.{ Atomic, AtomicAny, AtomicInt }
import monix.execution.internal.math
Expand Down Expand Up @@ -114,7 +115,7 @@ private[observers] abstract class AbstractEvictingBufferedSubscriber[-A](

require(strategy.bufferSize > 0, "bufferSize must be a strictly positive number")

implicit val scheduler = out.scheduler
implicit val scheduler: Scheduler = out.scheduler
private[this] val em = out.scheduler.executionModel

private[this] val droppedCount: AtomicInt =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import monix.execution.{ Ack, ChannelType }
import monix.execution.Ack.{ Continue, Stop }
import monix.execution.BufferCapacity.{ Bounded, Unbounded }
import monix.execution.ChannelType.SingleConsumer
import monix.execution.Scheduler
import monix.execution.atomic.Atomic
import monix.execution.atomic.PaddingStrategy.LeftRight256
import monix.execution.exceptions.BufferOverflowException
Expand Down Expand Up @@ -62,7 +63,7 @@ private[observers] abstract class AbstractSimpleBufferedSubscriber[A] protected

private[this] val queue = _qRef
private[this] val em = out.scheduler.executionModel
implicit val scheduler = out.scheduler
implicit val scheduler: Scheduler = out.scheduler
private[this] val itemsToPush =
Atomic.withPadding(0, LeftRight256)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,7 @@ abstract class Observable[+A] extends Serializable { self =>
): Cancelable = {

subscribe(new Subscriber[A] {
implicit val scheduler = s
implicit val scheduler: Scheduler = s

def onNext(elem: A) = nextFn(elem)
def onComplete() = completedFn()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package monix.reactive.internal.builders

import monix.execution.{ Ack, Cancelable }
import monix.execution.Ack.{ Continue, Stop }
import monix.execution.Scheduler
import monix.execution.cancelables.CompositeCancelable
import scala.util.control.NonFatal
import monix.reactive.Observable
Expand Down Expand Up @@ -121,7 +122,7 @@ private[reactive] final class CombineLatest2Observable[A1, A2, +R](obsA1: Observ
val composite = CompositeCancelable()

composite += obsA1.unsafeSubscribeFn(new Subscriber[A1] {
implicit val scheduler = out.scheduler
implicit val scheduler: Scheduler = out.scheduler

def onNext(elem: A1): Future[Ack] = lock.synchronized {
if (isDone) Stop
Expand All @@ -143,7 +144,7 @@ private[reactive] final class CombineLatest2Observable[A1, A2, +R](obsA1: Observ
})

composite += obsA2.unsafeSubscribeFn(new Subscriber[A2] {
implicit val scheduler = out.scheduler
implicit val scheduler: Scheduler = out.scheduler

def onNext(elem: A2): Future[Ack] = lock.synchronized {
if (isDone) Stop
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package monix.reactive.internal.builders

import monix.execution.{ Ack, Cancelable }
import monix.execution.Ack.{ Continue, Stop }
import monix.execution.Scheduler
import monix.execution.cancelables.CompositeCancelable
import scala.util.control.NonFatal
import monix.reactive.Observable
Expand Down Expand Up @@ -128,7 +129,7 @@ private[reactive] final class CombineLatest3Observable[A1, A2, A3, +R](
val composite = CompositeCancelable()

composite += obsA1.unsafeSubscribeFn(new Subscriber[A1] {
implicit val scheduler = out.scheduler
implicit val scheduler: Scheduler = out.scheduler

def onNext(elem: A1): Future[Ack] = lock.synchronized {
if (isDone) Stop
Expand All @@ -150,7 +151,7 @@ private[reactive] final class CombineLatest3Observable[A1, A2, A3, +R](
})

composite += obsA2.unsafeSubscribeFn(new Subscriber[A2] {
implicit val scheduler = out.scheduler
implicit val scheduler: Scheduler = out.scheduler

def onNext(elem: A2): Future[Ack] = lock.synchronized {
if (isDone) Stop
Expand All @@ -172,7 +173,7 @@ private[reactive] final class CombineLatest3Observable[A1, A2, A3, +R](
})

composite += obsA3.unsafeSubscribeFn(new Subscriber[A3] {
implicit val scheduler = out.scheduler
implicit val scheduler: Scheduler = out.scheduler

def onNext(elem: A3): Future[Ack] = lock.synchronized {
if (isDone) Stop
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package monix.reactive.internal.builders
import monix.execution.cancelables.CompositeCancelable
import monix.execution.{ Ack, Cancelable }
import monix.execution.Ack.{ Continue, Stop }
import monix.execution.Scheduler
import scala.util.control.NonFatal
import monix.reactive.Observable
import monix.reactive.observers.Subscriber
Expand Down Expand Up @@ -133,7 +134,7 @@ private[reactive] final class CombineLatest4Observable[A1, A2, A3, A4, +R](
val composite = CompositeCancelable()

composite += obsA1.unsafeSubscribeFn(new Subscriber[A1] {
implicit val scheduler = out.scheduler
implicit val scheduler: Scheduler = out.scheduler

def onNext(elem: A1): Future[Ack] = lock.synchronized {
if (isDone) Stop
Expand All @@ -155,7 +156,7 @@ private[reactive] final class CombineLatest4Observable[A1, A2, A3, A4, +R](
})

composite += obsA2.unsafeSubscribeFn(new Subscriber[A2] {
implicit val scheduler = out.scheduler
implicit val scheduler: Scheduler = out.scheduler

def onNext(elem: A2): Future[Ack] = lock.synchronized {
if (isDone) Stop
Expand All @@ -177,7 +178,7 @@ private[reactive] final class CombineLatest4Observable[A1, A2, A3, A4, +R](
})

composite += obsA3.unsafeSubscribeFn(new Subscriber[A3] {
implicit val scheduler = out.scheduler
implicit val scheduler: Scheduler = out.scheduler

def onNext(elem: A3): Future[Ack] = lock.synchronized {
if (isDone) Stop
Expand All @@ -199,7 +200,7 @@ private[reactive] final class CombineLatest4Observable[A1, A2, A3, A4, +R](
})

composite += obsA4.unsafeSubscribeFn(new Subscriber[A4] {
implicit val scheduler = out.scheduler
implicit val scheduler: Scheduler = out.scheduler

def onNext(elem: A4): Future[Ack] = lock.synchronized {
if (isDone) Stop
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package monix.reactive.internal.builders

import monix.execution.{ Ack, Cancelable }
import monix.execution.Ack.{ Continue, Stop }
import monix.execution.Scheduler
import monix.execution.cancelables.CompositeCancelable
import scala.util.control.NonFatal
import monix.reactive.Observable
Expand Down Expand Up @@ -138,7 +139,7 @@ private[reactive] final class CombineLatest5Observable[A1, A2, A3, A4, A5, +R](
val composite = CompositeCancelable()

composite += obsA1.unsafeSubscribeFn(new Subscriber[A1] {
implicit val scheduler = out.scheduler
implicit val scheduler: Scheduler = out.scheduler

def onNext(elem: A1): Future[Ack] = lock.synchronized {
if (isDone) Stop
Expand All @@ -160,7 +161,7 @@ private[reactive] final class CombineLatest5Observable[A1, A2, A3, A4, A5, +R](
})

composite += obsA2.unsafeSubscribeFn(new Subscriber[A2] {
implicit val scheduler = out.scheduler
implicit val scheduler: Scheduler = out.scheduler

def onNext(elem: A2): Future[Ack] = lock.synchronized {
if (isDone) Stop
Expand All @@ -182,7 +183,7 @@ private[reactive] final class CombineLatest5Observable[A1, A2, A3, A4, A5, +R](
})

composite += obsA3.unsafeSubscribeFn(new Subscriber[A3] {
implicit val scheduler = out.scheduler
implicit val scheduler: Scheduler = out.scheduler

def onNext(elem: A3): Future[Ack] = lock.synchronized {
if (isDone) Stop
Expand All @@ -204,7 +205,7 @@ private[reactive] final class CombineLatest5Observable[A1, A2, A3, A4, A5, +R](
})

composite += obsA4.unsafeSubscribeFn(new Subscriber[A4] {
implicit val scheduler = out.scheduler
implicit val scheduler: Scheduler = out.scheduler

def onNext(elem: A4): Future[Ack] = lock.synchronized {
if (isDone) Stop
Expand All @@ -226,7 +227,7 @@ private[reactive] final class CombineLatest5Observable[A1, A2, A3, A4, A5, +R](
})

composite += obsA5.unsafeSubscribeFn(new Subscriber[A5] {
implicit val scheduler = out.scheduler
implicit val scheduler: Scheduler = out.scheduler

def onNext(elem: A5): Future[Ack] = lock.synchronized {
if (isDone) Stop
Expand Down