Skip to content

loom talks qcon london 2019

landon edited this page Apr 21, 2020 · 2 revisions

翻译笔记-Why Continuations are Coming to Java

  • talks说明

    • 讨论并比较了处理和并发的各种技术如纯函数式(monads, affine types)和命令式编程(threads, continuations, monads, async/await)
    • 说明了为什么continuations是非常适合命令式风格
  • talks笔记

    • Why Java is Getting Continuations

    • 关于计算的观点

      • 确定性的、顺序的
      • 不确定性的、交互的/并发的
    • 纯函数式编程

      • 确定性、顺序的

      • Linear Types: nondeterminism as a function of unknown values

        • 线性类型-未知函数的不确定性
      • IO Type: Move nondeterminism outside the program

        • io类型-将不确定性转移到程序之外
      • landon: 这里主要是函数式编程的一些知识,如haskell、monad等

    • 经典的命令式编程

      • 不确定性的、并发的/交互的
    • 反应式编程的问题

      // 一个阻塞方法
      double calcImportantFinance(double x) {
          try { 
              double result; 
              result = compute("USD->euro", x); 
              result = compute("subtractTax", result * 1.3); 
              result = compute("addInterest", result); 
              return result; 
          } catch(Exception ex) { 
              log(ex);  
              throw ex;
          }
      }
      
      double compute(String op, double x);
      
      // 用CompletableFuture异步实现
      CompletableFuture<Double> calcImportantFinance (double x) {
          return
              compute("USD->euro", 100.0)
              .thenCompose(result -> compute("subtractTax", result * 1.3))
              .thenCompose(result -> compute("addInterest", result))
              .handle((result,ex) -> {
                  if(ex != null) {
                      log(ex); 
                      throw new RuntimeException(ex);
                  }else{
                      return result
                  }
              });
      }
      
      • Lost Control Flow
      double result;
      result = compute("USD->euro", x);
      if(result > 1000) {
           result = compute("subtractTax", result * 1.3); 
      }
      while(!sufficient(result)) {
          result = compute("addInterest", result);
      }
      return result;
      
      • CompletableFuture和控制流在某些方面不匹配

      • Lost Context

        • 上面每一步都是在不同的线程执行
        • 当出现异常时,上下文丢失,则后面调试代码非常困难。另外性能分析也会变的困难,因为性能分析是通过对堆栈跟踪采样来进行的。
      • Viral 病毒式的

        • 最大的问题是返回类型,返回值是CompletableFuture
        • 除非调用阻塞的get方法,否则无法获取值
        • 另外调用链上也必须异步方式使用这些CompletableFuture,这是病毒式的
        • 这意味着我们的这个调用堆栈要么是阻塞同步,要么是异步,两种几乎不能互操作
    • Async/Await

      async Task<double> CalImportantFinace()
      {
          try
          {
              double result; 
              result = await compute("USD->euro", x); 
              result = await compute("subtractTax", result * 1.3); 
              result = await compute("addInterest", result); 
              return result; 
          }
          catch(Exception ex)
          {
              log(ex);
              throw ex;
          }
      }
      
      • C#引入一个叫async and await的东西,功能类似于Java的CompletableFuture,但是可以用await关键字作为异步方法的前缀,看着是比较漂亮的命令式代码
      • 解决了提到的第一个lost control flow的问题。但是对于第二个问题,因为上面的每一行仍然在不同的线程上执行,如果出现异常的话会是一个比较奇怪的异常堆栈
      • .Net Core 2.1尝试修复此问题,它们人为地生成了一个可以捕获实际上下文的堆栈跟踪,但是与您编写普通的阻塞代码时所得到的堆栈跟踪相比,它仍然是非常不同的堆栈跟踪
      • 所以最大的问题还是隔离了同步阻塞世界和异步世界
    • Why give up a good (core!) abstraction just because of an inadequate implementation?

      • 命令式语言不仅处理输入和非确定性语言,还具有线程,进程和阻塞的内置概念,从编程角度来看,它是一个很好的抽象。我们有时想要避免的唯一原因是内核线程的实现负担太重
      • 显而易见的解决方案是更改实施,而不是更改我们的编程方式。这就是我们作为Project Loom的一部分正在尝试做的事情
      • First, we need the ability to stop the code that's running on the CPU and say, "I'm not using the CPU anymore," and later on have the ability to resume it.
        • Continuation,yield and resume
      • we need some mechanism to schedule those pieces that want to run the CPU
        • Scheduler
      • thread/process = yield control and resume + execution scheduling
      • thread/process = continuation + scheduler
    • Continuations

      • 当您调用run时,continuation的主体将运行到完成或直到下一次调用yield为止
      • 作用域使我们可以在另一个内部嵌套不同的continuation,并能够挂起多个continuation并跳回多个调用方,这与我们在堆栈中抛出异常的方式类似
      • 限定的意思是我们要暂停和恢复的代码部分不是整个程序,而只是body内的代码
      • 每次运行它都会一直运行到下一个挂起点,然后状态改变,然后当您再次运行它时,它将从第一个挂起点运行到第二个挂起点,我们永远都无法回到过去
      • 克隆和序列化
    • Fiber

      • 今天,编写应用程序的人可以选择编写简单的同步阻塞代码,但是他们依靠内核为他们提供线程,并且内核只能处理这么多线程。该应用程序将非常清晰,易于维护和调试,但将不可扩展。另一种选择是编写异步代码,正如我向您展示的那样,这要复杂得多,并且很难将现有代码放入其中,但至少它是可伸缩的

      • 使用fiber,如果我们确实可以使它们比内核提供的线程轻得多,则可以解决此问题。您可以根据需要拥有任意数量的这些用户模式轻量级线程,并且可以进行阻塞,阻塞实际上是免费的

      • fiber = continuation + scheduler

      • codes like sync,works like async

        • concurrency made simple
      • 这使编写并发应用程序变得更加容易,因为它可以帮助您匹配业务的并发单元。这是重量级内核线程无法实现的,因为您可能有100,000个并发用户,但是当前线程却无法达到100,000

      • io,java.util.concurrent

        • 目前已经是fiber-blocking了
      • 如果你喜欢Async/Await,那么下面这个就是整个实现

        class Async<T> extends CompletableFuture<T> { 
            public static <T> T await(Async<T> async) throwsInterruptedException, ExecutionException { 
                return async.get(); 
            } 
        }
        
        • 即可以直接在fiber中调用这个await
      • JDK中旧的重量级线程的数据。 它们每个都有大约2 KB的元数据,默认情况下还有1 MB的堆栈。 当涉及到fiber时,它们目前在原型中,它们目前只有2到300字节的元数据,而且可增长和收缩

      • 对于内核管理的重量级线程的任务切换成本,切换任务的时间在1到10微秒之间。 对于fiber,我们不知道它到底有多少,而性能仍然是我们仍在努力的方面,但我们希望它会比现在低得多

    • Rethink threads

      • 结构化并发

        • 线程被限制在一个已知的生命周期内,该生命周期可以扩展到给定的代码块
        • 我们有一个定义fiber作用域的块,保证在退出作用域时终止在作用域内创建的所有fiber
        • fiber作用域可取消,也可以给它一个截止日期
      • 结构化并发

        try(var scope = FiberScope.cancellable()) {
            Fiber<?> fiber = scope.schedule(task)
        }
        
        • 作用域内调度所有fiber都终止之前,无法退出作用域
        • fiber作用域可以嵌套
        • FiberScope具有一个终止队列,用于收集作用域内fiber的结果
        • 取消一个在可取消作用域的fiber会取消所有已经调度的fiber。在这个作用域内,通过嵌套,可以取消一颗fiber树
        • 取消一个park在阻塞io操作的fiber会引起unpark和检查是否取消
      • 结构化并发

        • 返回第一个完整任务的结果,取消并等待所有未完成的fiber终止后再返回

        • fiber作用域为我们提供了终止队列

          <V> V anyOf(Callable<? extends V>[] tasks) throws Throwable { 
              try (var scope = FiberScope.cancellable()) { 
                  var queue = new FiberScope.TerminationQueue<V>(); 
                  Arrays.stream(tasks).forEach(task -> scope.schedule(task, queue)); 
                  
              try { 
                  return queue.take().join(); 
               } catch (CompletionException e) { 
                  throw e.getCause(); 
               } finally { 
                  scope.fibers().forEach(Fiber::cancel); // cancel remaining fibers
               } 
             } 
          }
          
        • 结构化并发

          • 同样的有截止日期,如果过期则已调度的在作用域的fiber都将被取消

            <V> V anyOf(Callable<? extends V>[] tasks, Instant deadline) throws Throwable { 
                try (var scope = FiberScope.withDeadline(deadline)) { 
                    var queue = new FiberScope.TerminationQueue<V>(); 
                    Arrays.stream(tasks).forEach(task -> scope.schedule(task, queue)); 
                    
                try { 
                    return queue.take().join(); 
                 } catch (CompletionException e) { 
                    throw e.getCause(); 
                 } finally { 
                    scope.fibers().forEach(Fiber::cancel); // cancel remaining fibers
                 } 
               } 
            }
            
    • Generators

      • 我们希望人们使用fiber,而不是直接使用continuation
      • 但有趣的是,continuation的一个用途是类似python的生成器(run,yield,run...)
      class Generator<T> implements Iterable<T> { 
          private static final ContinuationScope GENERATOR = new ContinuationScope(); 
          private static class GenContinuation<T> extends Continuation { 
              public GenContinuation() { super(GENERATOR); } 
              private T value; 
           } 
           
          public static void yield(T value) { 
               ((GenContinuation)currentContinuation(GENERATOR)).val = value; 
              Continuation.yield(GENERATOR); 
          } 
      
          private final GenContinuation<T> cont; 
          public Generator(Runnable body) { 
          cont = new GenContinuation<T>(body);
          } 
          
          public Iterator<T> iterator() { 
              return new Iterator<T>() { 
              public T next() { cont.run(); return cont.val; } 
              public boolean hasNext() { return !cont.isDone(); } 
              } 
         } 
      }
      
      var fibonacci = new Generator<Integer>(() -> { 
          Generator.yield(0); 
          int a = 0; 
          int b = 1; 
          while(true) { 
              Generator.yield(b); 
              var sum = a + b; 
              a = b; 
              b = sum; 
          } 
      });
      
      // 调用迭代器,next和hasNext方法通过continuation的run和yield驱动
      for (var num : fibonacci) { 
          System.out.println(num); 
          if (num > 10_000) break; 
      }
      
      // 另外一个例子
      var greetedPrimes = new Generator<String>(() -> { 
          for (int n = 0; ; n++) 
          if (isPrime(n)) { 
              // 阻塞
              var greeting = Console.readLine(); 
              Generator.yield(greeting + ": " + n); 
          } 
      }); 
      
      Fiber.schedule(()-> { 
          for (var x : greetedPrimes) 
          System.out.println(x); 
      }
      
    • right now mostly interested input on the structure currency API

      • 现在最感兴趣的是结构化并发api的输入
Clone this wiki locally