-
Notifications
You must be signed in to change notification settings - Fork 7
/
Gopher.scala
122 lines (96 loc) · 3.79 KB
/
Gopher.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
package gopher
import cps._
import scala.concurrent.ExecutionContext
import scala.concurrent.duration.Duration
import scala.util._
import java.util.logging.{Level => LogLevel}
import java.util.concurrent.Executor
/**
* core of Gopher API. Given instance of Gopher[F] need for using most of Gopher operations.
*
* Gopher is a framework, which implements CSP (Communication Sequence Process).
* Process here - scala units of execution (i.e. functions, blok of code, etc).
* Communication channels represented by [gopher.Channel]
*
* @see [[gopher.Channel]]
* @see [[gopher#select]]
**/
trait Gopher[F[_]:CpsSchedulingMonad]:
type Monad[X] = F[X]
/**
* Monad which control asynchronic execution.
* The main is scheduling: i.e. ability to submit monadic expression to scheduler
* and know that this monadic expression will be evaluated.
**/
def asyncMonad: CpsSchedulingMonad[F] = summon[CpsSchedulingMonad[F]]
/**
* Create Read/Write channel.
* @param bufSize - size of buffer. If it is zero, the channel is unbuffered. (i.e. writer is blocked until reader start processing).
* @param autoClose - close after first message was written to channel.
* @see [gopher.Channel]
**/
def makeChannel[A](bufSize:Int = 0,
autoClose: Boolean = false): Channel[F,A,A]
/**
* Create channel where you can write only one element.
* @see [gopher.Channel]
**/
def makeOnceChannel[A](): Channel[F,A,A] =
makeChannel[A](1,true)
/***
*Create a select statement, which used for choosing one action from a set of potentially concurrent asynchronics events.
*[@see [[gopher.Select#apply]]
**/
def select: Select[F] =
new Select[F](this)
/**
* get an object with time operations.
* @see [[gopher.Time]]
**/
def time: Time[F]
/**
* set logging function, which output internal diagnostics and errors from spawned processes.
**/
def setLogFun(logFun:(LogLevel, String, Throwable|Null) => Unit): ((LogLevel, String, Throwable|Null) => Unit)
def log(level: LogLevel, message: String, ex: Throwable| Null): Unit
def log(level: LogLevel, message: String): Unit =
log(level,message, null)
def taskExecutionContext: ExecutionContext
protected[gopher] def logImpossible(ex: Throwable): Unit =
log(LogLevel.WARNING, "impossible", ex)
protected[gopher] def spawnAndLogFail[T](op: =>F[T]): F[Unit] =
asyncMonad.mapTry(asyncMonad.spawn(op)){
case Success(_) => ()
case Failure(ex) =>
log(LogLevel.WARNING, "exception in spawned process", ex)
()
}
end Gopher
/**
* Create Read/Write channel.
* @param bufSize - size of buffer. If it is zero, the channel is unbuffered. (i.e. writer is blocked until reader start processing).
* @param autoClose - close after first message was written to channel.
* @see [gopher.Channel]
**/
def makeChannel[A](bufSize:Int = 0,
autoClose: Boolean = false)(using g:Gopher[?]):Channel[g.Monad,A,A] =
g.makeChannel(bufSize, autoClose)
def makeOnceChannel[A]()(using g:Gopher[?]): Channel[g.Monad,A,A] =
g.makeOnceChannel[A]()
def select(using g:Gopher[?]):Select[g.Monad] =
g.select
/**
* represent `F[_]` as read channel.
**/
def futureInput[F[_],A](f: F[A])(using g: Gopher[F]): ReadChannel[F,A] =
val ch = g.makeOnceChannel[Try[A]]()
g.spawnAndLogFail{
g.asyncMonad.flatMapTry(f)(r => ch.awrite(r))
}
ch.map(_.get)
extension [F[_],A](fa: F[A])(using g: Gopher[F])
def asChannel : ReadChannel[F,A] =
futureInput(fa)
extension [F[_],A](c: IterableOnce[A])(using g: Gopher[F])
def asReadChannel: ReadChannel[F,A] =
ReadChannel.fromIterable(c)