Skip to content

Commit

Permalink
Merge branch 'next' into develop
Browse files Browse the repository at this point in the history
* next: (30 commits)
  add extra notes
  typo
  add missing value
  add more types in the example
  update changelog
  add documentation
  flag Task methods as private
  remove dead code
  run http requests asynchronously
  remove wip tag
  simplify code
  fix code
  fix reading files asynchronously
  add proofs
  remove phpunit tests
  simplify suspend actions namespace
  remove old throttle source
  simplify the task interface
  dont keep the terminated task in memory as it has no use
  keep track of the types returned by tasks
  ...
  • Loading branch information
Baptouuuu committed Nov 5, 2023
2 parents 067e4a9 + 165b06d commit 0fe6983
Show file tree
Hide file tree
Showing 36 changed files with 2,038 additions and 605 deletions.
24 changes: 11 additions & 13 deletions .github/workflows/ci.yml
Expand Up @@ -3,17 +3,17 @@ name: CI
on: [push, pull_request]

jobs:
phpunit:
blackbox:
runs-on: ${{ matrix.os }}
strategy:
matrix:
os: [ubuntu-latest, macOS-latest]
php-version: ['8.2', '8.3']
dependencies: ['lowest', 'highest']
name: 'PHPUnit'
dependency-versions: ['lowest', 'highest']
name: 'BlackBox'
steps:
- name: Checkout
uses: actions/checkout@v3
uses: actions/checkout@v2
- name: Setup PHP
uses: shivammathur/setup-php@v2
with:
Expand All @@ -24,21 +24,19 @@ jobs:
uses: "ramsey/composer-install@v2"
with:
dependency-versions: ${{ matrix.dependencies }}
- name: PHPUnit
run: vendor/bin/phpunit
env:
BLACKBOX_DETAILED_PROPERTIES: 1
- name: BlackBox
run: php blackbox.php
coverage:
runs-on: ${{ matrix.os }}
strategy:
matrix:
os: [ubuntu-latest, macOS-latest]
php-version: ['8.2', '8.3']
dependencies: ['lowest', 'highest']
dependency-versions: ['lowest', 'highest']
name: 'Coverage'
steps:
- name: Checkout
uses: actions/checkout@v3
uses: actions/checkout@v2
- name: Setup PHP
uses: shivammathur/setup-php@v2
with:
Expand All @@ -49,10 +47,10 @@ jobs:
uses: "ramsey/composer-install@v2"
with:
dependency-versions: ${{ matrix.dependencies }}
- name: PHPUnit
run: vendor/bin/phpunit --coverage-clover=coverage.clover
- name: BlackBox
run: php blackbox.php
env:
BLACKBOX_SET_SIZE: 1
ENABLE_COVERAGE: 'true'
- uses: codecov/codecov-action@v1
with:
token: ${{ secrets.CODECOV_TOKEN }}
Expand Down
2 changes: 0 additions & 2 deletions .gitignore
@@ -1,4 +1,2 @@
/composer.lock
/vendor
/.phpunit.result.cache
/.phpunit.cache
2 changes: 1 addition & 1 deletion .php-cs-fixer.dist.php
@@ -1,6 +1,6 @@
<?php

return Innmind\CodingStandard\CodingStandard::config([
'tests',
'proofs',
'src',
]);
23 changes: 23 additions & 0 deletions CHANGELOG.md
@@ -1,5 +1,28 @@
# Changelog

## [Unreleased]

### Added

- `Innmind\Mantle\Source\Continuation`

### Changed

- Requires `innmind/operating-system:~4.1`
- `Innmind\Mantle\Forerunner::of()` now only accepts an instance of `Innmind\OperatingSystem\OperatingSystem`
- `Innmind\Mantle\Forerunner::__invoke()` second argument now is a `callable` (previously it was a `Source`)
- `Innmind\Mantle\Suspend` is now declared internal
- `Innmind\Mantle\Source` is now declared internal
- `Innmind\Mantle\Source\Predetermined::of()` `callable`s now receive an instance of `Innmind\OperatingSystem\OperatingSystem`

### Removed

- `Innmind\Mantle\Source\Throttle`
- `Innmind\Mantle\Suspend\Strategy`
- `Innmind\Mantle\Suspend\Asynchronous`
- `Innmind\Mantle\Suspend\Synchronous`
- `Innmind\Mantle\Suspend\TimeFrame`

## 1.1.0 - 2023-09-16

### Added
Expand Down
137 changes: 117 additions & 20 deletions README.md
Expand Up @@ -4,43 +4,140 @@
[![codecov](https://codecov.io/gh/innmind/mantle/branch/develop/graph/badge.svg)](https://codecov.io/gh/innmind/mantle)
[![Type Coverage](https://shepherd.dev/github/innmind/mantle/coverage.svg)](https://shepherd.dev/github/innmind/mantle)

Minimalist abstraction on top of `Fiber`s to coordinate multiple tasks asynchronously.
Abstraction on top of `Fiber`s to coordinate multiple tasks asynchronously.

This package is intended for other packages to be built upon, end developers should not directly face this abstraction.
The goal is to easily move the execution of any code built using [`innmind/operating-system`](https://packagist.org/packages/innmind/operating-system) from a synchronous context to an async one. This means that it's easier to experiment running a piece of code asynchronously and then move back if the experiment is not successful. This also means that you can test each part of an asynchronous system synchronously.

## Installation

```sh
composer require innmind/mantle
```

## Concepts
## Usage

```php
use Innmind\Mantle\{
Forerunner,
Task,
Source\Continuation,
};
use Innmind\OperatingSystem\{
Factory,
OperatingSystem,
};
use Innmind\Filesystem\Name;
use Innmind\HttpTransport\Success;
use Innmind\Http\{
Request,
Method,
ProtocolVersion,
};
use Innmind\Url\{
Url,
Path,
};
use Innmind\Immutable\Sequence;

$run = Forerunner::of(Factory::build());
[$users] = $run(
[0, 0, false],
static function(array $carry, OperatingSystem $os, Continuation $continuation, Sequence $results): Continuation {
[$users, $finished, $launched] = $carry;

if (!$launched) {
return $continuation
->carryWith([$users, $finished, true])
->launch(Sequence::of(
Task::of(
static fn(OperatingSystem $os): int => $os
->remote()
->http()(Request::of(
Url::of('http://some-service.tld/users/count'),
Method::get,
ProtocolVersion::v11,
))
->map(static fn(Success $success): string => $success->response()->body()->toString())
->match(
static fn(string $response): int => (int) $response,
static fn() => throw new \RuntimeException('Failed to count the users'),
),
),
Task::of(
static fn(OperatingSystem $os): int => $os
->filesystem()
->mount(Path::of('some/directory/'))
->get(Name::of('users.csv'))
->map(static fn($file) => $file->content()->lines())
->match(
static fn(Sequence $lines) => $lines->reduce(
0,
static fn(int $total): int => $total + 1,
),
static fn() => throw new \RuntimeException('Users file not found'),
),
),
));
}

$finished += $results->size();
$users = $results->reduce(
$users,
static fn(int $total, int $result): int => $total + $result,
);
$continuation = $continuation->carryWith([$users, $finished, $launched]);

if ($finished === 2) {
$continuation = $continuation->terminate();
}

return $continuation;
},
);
```

This example counts a number of `$users` coming from 2 sources.

The `Forerunner` object behaves as a _reduce_ operation, that's why it has 2 arguments: a carried value and a reducer (called a source in this package).

The carried value here is an array that holds the number of fetched users, the number of finished tasks and whether it already launched the tasks or not.

The source will launch 2 tasks if not already done; the first one does an HTTP call and the second one counts the number of lines in a file. The source will be called again once a task finishes and their results will be available inside the fourth argument `$results`, it will add the number of finished tasks and the number of users to the carried value array. If both tasks are finished then the source calls `$continuation->terminate()` to instruct the loop to stop.

When the source calls `->terminate()` and that all tasks are finished then `$run()` returns the carried value. Here it will assign the aggregation of both tasks results to the value `$users`.

> **Note**
> As long as you use the `$os` abstraction passed as arguments the system will automatically suspend your code when necessary. This means that you don't even need to think about it.
> **Note**
> The source `callable` is also run asynchronously. This means that you can use it to build a socket server and wait indefinitely for new connections without impacting the execution of already started tasks.
> **Warning**
> Do NOT return the `$os` variable outside of the tasks or the source as it may break your code.
> **Note**
> Since this package has been designed by only passing arguments (no global state) it means that you can compose the use of `Forerunner`, this means that you can run a new instance of `Forerunner` inside a task and it will behave transparently. (Although this feature as not been tested yet!)
### Source
## Limitations

A `Source` let _emerge_ `Task`s that needs to be run asynchronously. For example a web server can be a `Source` that will emerge a `Task` when a new connection is received.
### Signals

This packages comes with the following sources:
- `Predetermined`: accepts a list of `callable`s that can be suspended
- `Throttle`: limits the number of `Task`s that can be run
Signals like `SIGINT`, `SIGTERM`, etc... that are normally handled via `$os->process()->signals()` is not yet supported. This may result in unwanted behaviours.

A `Source` has a notion of `active` to instruct the `Forerunner` if there will be other tasks in the future or not.
### HTTP calls

### Task
Currently HTTP calls are done via `curl` but it can't be integrated in the same loop as other streams. To allow the coordination of multiple tasks when doing HTTP calls the system use a timeout of `10ms` and switches between tasks at this max rate.

A `Task` takes a `callable` that will be run asynchronously. For the `callable` to effectively run asynchronously it must yield control of the process via the `Suspend` object passed as an argument to it.
To fix this limitation a new implementation entirely based on PHP streams needs to be created.

### Suspend
Meanwhile if your goal is to make multiple concurrent HTTP calls you don't need this package. [`innmind/http-transport`](https://packagist.org/packages/innmind/http-transport) already support concurrent calls on it's own (without the limitation mentionned above).

`Suspend` is an object on top of [`Fiber`s](https://www.php.net/manual/en/language.fibers.php) that is only accessible to a `Task` created by a `Source`. When called the `Task` will yield control to allow other `Task`s to run.
### SQL queries

The `Suspend` behaviour can be changed with different strategies:
- `Asynchornous` will yield as soon as `Suspend` is called (default)
- `Synchronous` will yield only when the `Task` is finished, meaning it reached the end of the `callable`
- `TimeFrame` will yield control when it exceeds the time frame allowed for a `Task` to run
SQL queries executed via `$os->remote()->sql()` are still executed synchronously.

### Forerunner
To fix this limitation a new implementation entirely based on PHP streams needs to be created.

This is the main object that will coordinate all the objects above. It operates as a _reduce like_ operation with a _carried value_ and a `Source` acting as the list `Task`s to reduce. The carried value is accessible everytime the `Source` is called to provide `Task`s.
### Number of tasks

When the `Source` is no longer active and there is no more `Task`s to run the object will return.
It seems that the current implementation of this package has a [limit of around 10K concurrent tasks](https://twitter.com/baptouuuu/status/1720092619496378741) before it starts slowing down drastically.
27 changes: 27 additions & 0 deletions blackbox.php
@@ -0,0 +1,27 @@
<?php
declare(strict_types = 1);

require 'vendor/autoload.php';

use Innmind\BlackBox\{
Application,
Runner\Load,
Runner\CodeCoverage,
};

Application::new($argv)
->when(
\getenv('ENABLE_COVERAGE') !== false,
static fn(Application $app) => $app
->codeCoverage(
CodeCoverage::of(
__DIR__.'/src/',
__DIR__.'/proofs/',
)
->dumpTo('coverage.clover')
->enableWhen(true),
)
->scenariiPerProof(1),
)
->tryToProve(Load::everythingIn(__DIR__.'/proofs/'))
->exit();
6 changes: 3 additions & 3 deletions composer.json
Expand Up @@ -16,8 +16,9 @@
},
"require": {
"php": "~8.2",
"innmind/immutable": "~4.9|~5.0",
"innmind/time-continuum": "^3.1"
"innmind/immutable": "~5.2",
"innmind/operating-system": "~4.1",
"innmind/filesystem": "~7.3"
},
"autoload": {
"psr-4": {
Expand All @@ -30,7 +31,6 @@
}
},
"require-dev": {
"phpunit/phpunit": "~10.2",
"vimeo/psalm": "~5.12",
"innmind/black-box": "~5.5",
"innmind/coding-standard": "~2.0"
Expand Down
25 changes: 0 additions & 25 deletions phpunit.xml.dist

This file was deleted.

0 comments on commit 0fe6983

Please sign in to comment.