@@ -2,6 +2,7 @@ package redis_kotlin
2
2
3
3
import io.reactivex.rxjava3.core.Completable
4
4
import io.reactivex.rxjava3.core.Single
5
+ import kotlinx.coroutines.*
5
6
import org.redisson.Redisson
6
7
import org.redisson.RedissonMultiLock
7
8
import org.redisson.api.*
@@ -10,8 +11,6 @@ import org.redisson.config.Config
10
11
import reactor.core.publisher.Mono
11
12
import java.io.Serializable
12
13
import java.time.Duration;
13
- import java.util.concurrent.TimeUnit
14
-
15
14
16
15
data class Book (val pages : Int , val chapter : Int , val author : String ) : Serializable
17
16
@@ -90,34 +89,6 @@ class App {
90
89
myAtomicLong.unlink() // clean up, happens async
91
90
println (" myAtomicLong after unlink/cleanup: $myAtomicLong " )
92
91
}
93
-
94
- private fun atomicLongCoroutines (redisson : RedissonClient , newValue : Long ) {
95
- printHelper(" atomicLong corountines" )
96
- val myAtomicLong: RAtomicLong = redisson.getAtomicLong(" myAtomicLongCoRoutine" )
97
- println (" initial myAtomicLong: $myAtomicLong " )
98
-
99
- val job = GlobalScope .launch {
100
- for (i in 1 .. 7 ) {
101
- myAtomicLong.incrementAndGet()
102
- println (" myAtomicLong after increment and set: $myAtomicLong (loop run # $i )" )
103
- }
104
- }
105
- // Waiting for the increments to finish
106
- job.join()
107
-
108
- val jobs = mutableListOf<Job >()
109
- repeat(1 ) {
110
- jobs.add(GlobalScope .launch {
111
- myAtomicLong.get()
112
- println (" myAtomicLong after get: $myAtomicLong " )
113
- })
114
- }
115
-
116
- // Waiting for the jobs to finish
117
- jobs.joinAll()
118
- myAtomicLong.unlink() // clean up, happens async
119
- println (" myAtomicLong after unlink/cleanup: $myAtomicLong " )
120
- }
121
92
122
93
private fun atomicLongReactive (redisson : RedissonClient , newValue : Long ) {
123
94
printHelper(" atomicLong reactive interface" )
@@ -162,6 +133,33 @@ class App {
162
133
)
163
134
}
164
135
136
+ private suspend fun atomicLongCoroutines (redisson : RedissonClient , newValue : Long ) {
137
+ printHelper(" atomicLong corountines" )
138
+ val myAtomicLong: RAtomicLong = redisson.getAtomicLong(" myAtomicLongCoRoutine" )
139
+ println (" initial myAtomicLong: $myAtomicLong " )
140
+
141
+ val scope = CoroutineScope (Dispatchers .IO + SupervisorJob ())
142
+ val incAndGetJob = scope.launch {
143
+ for (i in 1 .. 7 ) {
144
+ myAtomicLong.incrementAndGet()
145
+ println (" myAtomicLong after increment and set: $myAtomicLong (loop run # $i )" )
146
+ }
147
+ }
148
+ // Waiting for the increments & gets to finish
149
+ incAndGetJob.join()
150
+
151
+ val getJob = scope.launch {
152
+ myAtomicLong.get()
153
+ println (" myAtomicLong after get: $myAtomicLong " )
154
+ }
155
+
156
+ // Waiting for the get job to finish
157
+ getJob.join()
158
+
159
+ myAtomicLong.unlink() // clean up, happens async
160
+ println (" myAtomicLong after unlink/cleanup: $myAtomicLong " )
161
+ }
162
+
165
163
private fun bucket (redissonClient : RedissonClient , bucketName : String , value : String ) {
166
164
printHelper(" bucket" )
167
165
val bucket = redissonClient.getBucket<String >(bucketName)
@@ -349,6 +347,9 @@ class App {
349
347
atomicLongReactive(redisson.redissonClient, 3L )
350
348
atomicLongRXJava3(redisson.redissonClient, 3L )
351
349
Thread .sleep(1000 ) // wait for 1 second to complete RX operations
350
+ runBlocking {
351
+ atomicLongCoroutines(redisson.redissonClient, 3L )
352
+ }
352
353
bucket(redisson.redissonClient, " foo" , " bar" ) // buckets
353
354
`object `(redisson.redissonClient, 100 , 10 , " some author" )
354
355
topic(redisson.redissonClient, " new message" )
0 commit comments