# Intro
If you want to build a better understanding of `kotlinx.coroutines`, try re-implementing it.
It's quite fascinating that "coroutines" in the Kotlin ecosystem is almost synonymous with `kotlinx.coroutines`. However, `kotlinx.coroutines` is "just a library" maintained by the "Kotlin Libraries" team. All the tools necessary for re-implementing it are available in the standard library and the Kotlin compiler. Let's try.



# What we want to implement
Here we have a very plain and simple Kotlin `main` function which we can run. There are no surprises: it will greet us with a "Hello".
However, there is also this `async` function, which would print "Async" if anything called it. With kotlinx.coroutines, calling it would be as simple as launching a coroutine in some scope and invoking `async`.
In this video I'll show how to implement the basics of kotlinx.coroutines: invoking this `async` function, Dispatchers, and switching context by providing our own `withContext` function.

# Creating the first coroutine
We can create the first coroutine by taking a reference to any `suspend` fun and calling the `createCoroutine` function on it.
Note that `createCoroutine` comes straight from the Kotlin stdlib.
This function requires us to provide a `Continuation`. This object takes care of two things:
1. It carries the `coroutine context` of the coroutine. We'll pass an `EmptyCoroutineContext` for now.
2. It provides the `resumeWith` function, which gets invoked once the coroutine has finished. In this case we just get the value, or throw on any error.

The coroutine does not start on its own: `createCoroutine` returns yet another `Continuation`, and calling `resume(Unit)` on it is what starts the coroutine.
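
To make the two responsibilities listed above more tangible, here is a minimal sketch that spells out the `Continuation` as an explicit object instead of using the `Continuation(...) { }` helper. The names `answer` and `runAnswer` are made up for this illustration and do not appear elsewhere in the notebook.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.createCoroutine
import kotlin.coroutines.resume

// Hypothetical suspend function that returns a value instead of printing.
suspend fun answer(): Int = 42

// Hypothetical helper: runs `answer` as a coroutine and returns what the completion received.
fun runAnswer(): Int {
    var received: Result<Int>? = null

    val completion = object : Continuation<Int> {
        // 1. The context the coroutine runs with.
        override val context: CoroutineContext = EmptyCoroutineContext

        // 2. Invoked once the coroutine has finished, with a value or a failure.
        override fun resumeWith(result: Result<Int>) {
            received = result
        }
    }

    // Nothing runs yet: createCoroutine only builds the coroutine.
    val coroutine = ::answer.createCoroutine(completion)
    // Resuming starts it. `answer` never suspends, so it completes during this call.
    coroutine.resume(Unit)

    return checkNotNull(received) { "coroutine did not complete" }.getOrThrow()
}

fun main() {
    println(runAnswer()) // prints 42
}
```

Since `answer` never suspends, the completion is invoked before `resume(Unit)` returns. A coroutine that suspends along the way would invoke it later, from whoever resumes it last.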

In [1]:
fun main() {
    println("Hello")
}

suspend fun async() {
    println("Async")
}

main()

Hello


In [1]:
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.createCoroutine
import kotlin.coroutines.resume

fun main() {
    val coroutine = ::async.createCoroutine(Continuation(EmptyCoroutineContext) { result -> result.getOrThrow() })
    coroutine.resume(Unit)
}

suspend fun async() {
    println("Async")
}

main()

Async


# Re-Implementing a dispatcher
Yay! We called our first `suspend` function and we can see it being executed. Let's define a `Dispatcher` interface as an abstraction over actual threads that can execute code.
The interface shall be very simple: we just want to 'dispatch' any arbitrary piece of work to some kind of thread or queue, or whatever!
Implementing `CoroutineContext.Element` allows a given `Dispatcher` implementation to be provided as part of the current `coroutine context`.

It's quite a common convention to use the `companion object` as the key for the `Context Element`.


In [2]:
import kotlin.coroutines.CoroutineContext

interface Dispatcher : CoroutineContext.Element {
    fun dispatch(block: () -> Unit)
    override val key: CoroutineContext.Key<*> get() = Dispatcher.Key
    companion object Key : CoroutineContext.Key<Dispatcher>
}
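
The companion-object-as-key convention is what makes lookups like `context[Dispatcher]` read so naturally. The following sketch shows the same pattern on a tiny element of its own; `Label` and `describe` are hypothetical names chosen for this illustration only.

```kotlin
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext

// Hypothetical element, for illustration only: carries a text label in the context.
class Label(val text: String) : CoroutineContext.Element {
    override val key: CoroutineContext.Key<*> get() = Label.Key

    // Used as an expression, `Label` refers to this companion object, i.e. to the key.
    companion object Key : CoroutineContext.Key<Label>
}

// Looks the element up by its key; the result is null if the context has no Label.
fun describe(context: CoroutineContext): String =
    context[Label]?.text ?: "no label"

fun main() {
    println(describe(EmptyCoroutineContext))             // no label
    println(describe(Label("first")))                    // first
    // When two elements share a key, the right-hand side of `+` wins.
    println(describe(Label("first") + Label("second")))  // second
}
```

The last line is the behaviour a context-switching function can rely on: adding a new element to an existing context replaces whatever was stored under the same key.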

# Re-Implementing a 'MainDispatcher' for the 'main thread'
Now that we have defined the `Dispatcher` interface, it's time to re-implement the "Main Dispatcher", which will allow scheduling tasks on the 'main' thread.
Typically, such a main thread is implemented by looping over a blocking queue.

In [3]:
import java.util.concurrent.LinkedBlockingQueue
import kotlin.coroutines.CoroutineContext

interface Dispatcher : CoroutineContext.Element {
    fun dispatch(block: () -> Unit)
    override val key: CoroutineContext.Key<*> get() = Dispatcher.Key
    companion object Key : CoroutineContext.Key<Dispatcher>
}

object MainDispatcher : Dispatcher {
    private val queue = LinkedBlockingQueue<() -> Unit>()

    override fun dispatch(block: () -> Unit) {
        queue.offer(block)
    }

    fun loop(): Nothing {
        while (true) {
            queue.take().invoke()
        }
    }
}

# Using the MainDispatcher to start the coroutine
Let's provide this `MainDispatcher` as the coroutine context for our initial coroutine. Note: to keep it simple, we're not going to implement the `Dispatcher` as a proper `ContinuationInterceptor`. Instead, we dispatch the initial `resume` call to the `MainDispatcher` by hand.
In order to let the program stop, we'll simply use a timeout for receiving new work on the `MainDispatcher`.


In [3]:
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.TimeUnit
import kotlin.coroutines.Continuation
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.createCoroutine
import kotlin.coroutines.resume

interface Dispatcher : CoroutineContext.Element {
    fun dispatch(block: () -> Unit)
    override val key: CoroutineContext.Key<*> get() = Dispatcher.Key
    companion object Key : CoroutineContext.Key<Dispatcher>
}

object MainDispatcher : Dispatcher {
    private val queue = LinkedBlockingQueue<() -> Unit>()

    override fun dispatch(block: () -> Unit) {
        queue.offer(block)
    }

    fun loop() {
        while(true) {
            // Stop program after no more work was received for the main queue after one second!
            queue.poll(1, TimeUnit.SECONDS)?.invoke() ?: return
        }
    }
}

fun main() {
    Thread.currentThread().name = "main"
    val coroutine = ::async.createCoroutine(Continuation(MainDispatcher) { result -> result.getOrThrow() })
    MainDispatcher.dispatch { coroutine.resume(Unit) }
    MainDispatcher.loop()
}

suspend fun async() {
    println("Async: ${Thread.currentThread().name}")
}

main()


Async: main


# Implementing a BackgroundDispatcher
Right now, we have not really gained a lot: yes, we have created a main loop for our main thread, but where is the fun without dispatching work to some kind of background threads?
We can implement a `BackgroundDispatcher` which is backed by a thread pool. For the sake of this video, let's keep it simple and just use a thread pool with a fixed number of background threads.
Of course, in the real world one is required to carefully think about the requirements of such pools and make sure to properly close them when they are no longer needed, but this is YouTube.


In [None]:
import java.util.concurrent.Executors
import kotlin.coroutines.CoroutineContext

interface Dispatcher : CoroutineContext.Element {
    fun dispatch(block: () -> Unit)
    override val key: CoroutineContext.Key<*> get() = Dispatcher.Key

    companion object Key : CoroutineContext.Key<Dispatcher>
}

object BackgroundDispatcher : Dispatcher {
    private val executors = Executors.newFixedThreadPool(4)
    
    override fun dispatch(block: () -> Unit) {
        executors.execute { block() }
    }
}
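
For completeness, here is one way the "properly close them" part could look. This is only a sketch under assumptions: `ClosableBackgroundDispatcher`, its `close` function, the five-second grace period and the `runOnPool` helper are all invented for this example and are not used in the rest of the notebook.

```kotlin
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
import kotlin.coroutines.CoroutineContext

interface Dispatcher : CoroutineContext.Element {
    fun dispatch(block: () -> Unit)
    override val key: CoroutineContext.Key<*> get() = Dispatcher.Key
    companion object Key : CoroutineContext.Key<Dispatcher>
}

// Hypothetical variant of the BackgroundDispatcher that can be shut down.
class ClosableBackgroundDispatcher(threads: Int = 4) : Dispatcher, AutoCloseable {
    private val executor = Executors.newFixedThreadPool(threads)

    override fun dispatch(block: () -> Unit) {
        executor.execute { block() }
    }

    // Stops accepting new work, then waits (an arbitrary 5 seconds) for queued work.
    override fun close() {
        executor.shutdown()
        if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
            executor.shutdownNow() // give up and interrupt whatever is still running
        }
    }
}

// Hypothetical helper: runs one task on the pool and reports the thread it ran on.
fun runOnPool(): String {
    val dispatcher = ClosableBackgroundDispatcher(threads = 1)
    var threadName = ""
    try {
        dispatcher.dispatch { threadName = Thread.currentThread().name }
    } finally {
        dispatcher.close() // work submitted before shutdown still gets executed
    }
    return threadName
}

fun main() {
    println(runOnPool()) // the name of a pool thread, not of the calling thread
}
```

The pool threads created by `Executors.newFixedThreadPool` are not daemon threads, so outside of a notebook a pool that is never shut down would keep the JVM alive after `main` returns.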


# Re-Implementing `withContext`
Yay, this was simple: we now have a `MainDispatcher` and a `BackgroundDispatcher` implementation. Let's implement a `withContext` function to switch the 'coroutine context', which effectively allows us to switch threads.
Note, again, that this implementation would look different if our Dispatchers implemented `ContinuationInterceptor`. Let me know if you're interested in learning more about this case!

This `withContext` function itself should be a suspend fun that accepts additions to the `Coroutine Context` and another `suspend fun` which shall be executed in this new context.
Using the `suspendCoroutine` function allows us to *suspend* the currently running coroutine and provides us with the `outerContinuation` as a handle to resume it later.

Once the current coroutine is suspended, we can create a new context and the new coroutine using, again, the `createCoroutine` function.
Creating the new coroutine requires creating a new continuation which, of course, gets passed the new context. However, its `resumeWith` handler is the important part: it has to connect the new coroutine with the currently suspended one by resuming the latter on its own `Dispatcher`.

Once the new coroutine has been created, we can execute it by resuming it on the Dispatcher found in the new context.
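
Before looking at the full function, it may help to see `suspendCoroutine` on its own. The sketch below, with the made-up name `suspendAndResumeDemo`, parks a coroutine at a suspension point and resumes it by hand, recording the order in which things happen.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.createCoroutine
import kotlin.coroutines.resume
import kotlin.coroutines.suspendCoroutine

// Hypothetical demo: returns a log of the order in which the steps ran.
fun suspendAndResumeDemo(): List<String> {
    val log = mutableListOf<String>()
    var parked: Continuation<String>? = null

    val body: suspend () -> Unit = {
        log += "before suspension"
        // Suspends here; the continuation is our handle to resume the coroutine later.
        val value = suspendCoroutine<String> { continuation -> parked = continuation }
        log += "resumed with $value"
    }

    val coroutine = body.createCoroutine(Continuation(EmptyCoroutineContext) { it.getOrThrow() })
    coroutine.resume(Unit)      // runs the body up to the suspension point, then returns
    log += "coroutine is suspended"
    parked?.resume("hello")     // continues the body right after the suspension point
    return log
}

fun main() {
    suspendAndResumeDemo().forEach(::println)
    // before suspension
    // coroutine is suspended
    // resumed with hello
}
```

A `withContext` implementation can use exactly this handle: instead of resuming it immediately, it passes the resumption to a dispatcher once the inner work is done.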


In [None]:
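import kotlin.coroutines.Continuation
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.createCoroutine
import kotlin.coroutines.resume
import kotlin.coroutines.suspendCoroutine

suspend fun <T> withContext(context: CoroutineContext, action: suspend () -> T): T {
    // Suspend the calling coroutine; 'outerContinuation' is the handle to resume it later.
    return suspendCoroutine { outerContinuation ->
        // Elements of 'context' replace elements with the same key in the outer context.
        val newContext = outerContinuation.context + context

        val newCoroutine = action.createCoroutine(Continuation(newContext) { result ->
            // 'action' has finished: hand its result back on the outer coroutine's dispatcher.
            val dispatcher = outerContinuation.context[Dispatcher] ?: error("No dispatcher found")
            dispatcher.dispatch {
                outerContinuation.resumeWith(result)
            }
        })

        // Start 'action' on the dispatcher found in the new context.
        val dispatcher = newContext[Dispatcher] ?: error("No dispatcher found")
        dispatcher.dispatch {
            newCoroutine.resume(Unit)
        }
    }
}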
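
As a pointer for the `ContinuationInterceptor` note above, here is a rough sketch of what such a dispatcher could look like. It is not how the notebook's `Dispatcher` works, and the names `InterceptingDispatcher` and `interceptDemo` are assumptions made for this example. The idea is that the element wraps every continuation it is asked to intercept, so that resumptions go through `dispatch` without us calling it by hand.

```kotlin
import java.util.concurrent.CountDownLatch
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
import kotlin.coroutines.AbstractCoroutineContextElement
import kotlin.coroutines.Continuation
import kotlin.coroutines.ContinuationInterceptor
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.startCoroutine

// Hypothetical dispatcher registered under the ContinuationInterceptor key.
class InterceptingDispatcher(private val dispatch: (() -> Unit) -> Unit) :
    AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {

    // Wraps the continuation so that resuming it is re-routed through `dispatch`.
    override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
        object : Continuation<T> {
            override val context: CoroutineContext get() = continuation.context
            override fun resumeWith(result: Result<T>) {
                dispatch { continuation.resumeWith(result) }
            }
        }
}

// Hypothetical demo: starts a coroutine and reports the thread its body ran on.
fun interceptDemo(): String {
    val executor = Executors.newSingleThreadExecutor { runnable -> Thread(runnable, "worker") }
    val dispatcher = InterceptingDispatcher { block -> executor.execute { block() } }
    val done = CountDownLatch(1)
    var seenOn = ""

    val body: suspend () -> Unit = { seenOn = Thread.currentThread().name }
    // No manual dispatch call: startCoroutine intercepts the initial resumption for us.
    body.startCoroutine(Continuation(dispatcher) { done.countDown() })

    done.await(5, TimeUnit.SECONDS)
    executor.shutdown()
    return seenOn
}

fun main() {
    println(interceptDemo()) // prints worker
}
```

With this approach the stdlib builders (`createCoroutine`, `startCoroutine`, `suspendCoroutine`) take care of the interception, which is why a `withContext` built on top of it would look different from the hand-dispatching version above.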

# Bringing it all together


In [7]:
import java.util.concurrent.Executors
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.TimeUnit
import kotlin.coroutines.*

interface Dispatcher : CoroutineContext.Element {
    fun dispatch(block: () -> Unit)
    override val key: CoroutineContext.Key<*> get() = Dispatcher.Key

    companion object Key : CoroutineContext.Key<Dispatcher>
}

object MainDispatcher : Dispatcher {
    private val queue = LinkedBlockingQueue<() -> Unit>()

    override fun dispatch(block: () -> Unit) {
        queue.offer(block)
    }

    fun loop() {
        while (true) {
            // Stop program after no more work was received for the main queue after one second!
            queue.poll(1, TimeUnit.SECONDS)?.invoke() ?: return
        }
    }
}

object BackgroundDispatcher : Dispatcher {
    private val executors = Executors.newFixedThreadPool(4)

    override fun dispatch(block: () -> Unit) {
        executors.execute { block() }
    }
}

suspend fun <T> withContext(context: CoroutineContext, action: suspend () -> T): T {
    return suspendCoroutine { outerContinuation ->
        val newContext = outerContinuation.context + context

        val newCoroutine = action.createCoroutine(Continuation(newContext) { result ->
            val dispatcher = outerContinuation.context[Dispatcher] ?: error("No dispatcher found")
            dispatcher.dispatch {
                outerContinuation.resumeWith(result)
            }
        })

        val dispatcher = newContext[Dispatcher] ?: error("No dispatcher found")
        dispatcher.dispatch {
            newCoroutine.resume(Unit)
        }
    }
}

fun main() {
    Thread.currentThread().name = "main"
    val coroutine = ::async.createCoroutine(Continuation(MainDispatcher) { result -> result.getOrThrow() })
    MainDispatcher.dispatch { coroutine.resume(Unit) }
    MainDispatcher.loop()
}

suspend fun async() {
    println("Async: ${Thread.currentThread().name}")
    withContext(BackgroundDispatcher) {
        println("Changed to ${Thread.currentThread().name}")
    }

    println("Returned in ${Thread.currentThread().name}")
}

main()

Async: main
Changed to pool-3-thread-1
Returned in main
