Skip to content

revelatio/rvl-pipe

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

37 Commits
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

rvl-pipe

Build Status Coverage Status Known Vulnerabilities

Helper functions for easier async pipelining/composing of promises

Promises are a simple (and cool) way of doing async programming in JavaScript. A promise encapsulates a future result, async in nature, that has only 3 possible states: Pending, Resolved, Rejected. Once the promise is resolved we can query its result, if it was rejected then catch and process the error accordingly.

We can run as many promises in parallel or chain them as we want. Like a pipeline.

This library contains helper functions to do simple pipelining of promises reusing a context object that will be pass down and can be queried or modified.

Version 2.0.0 the one with Typings (in TypeScript)

This version is the first completely ported to TypeScript so the build process exports the type definitions.

We have 3 main function definitions

export type Context = { [key: string]: any }

export type AsyncFunction = (ctx?: Context) => Promise<Context>

export type SyncFunction = (ctx: Context) => any

export type SyncPredicate = (ctx: Context) => boolean

Version 1.4.0 New functions

Version 1.4.0 introduces new functions: assign, composer and loop

  • assign: For assign to a prop based on the result of a async function
  • composer: Performs a merge on the results of a set of async functions
  • loop: Performs a async loop based on a predicate and a body

How it works?

rvl-pipe provides a set of functions to help you create, interact and query the context object using this approach. We have 2 types of functions: composability and querying, and also some error handling helpers.

You can require/import the helpers you need.

const { should, each, iff, prop, props } = require('rvl-pipe')

// as ES6 Modules
import { should, each, iff, prop, props } from 'rvl-pipe'

Composition functions

Composition functions can be always described as:

const createStep = (params) => AsyncFunction

So, basically functions that return a function that only takes the context return a Promise with the context after some async transformation. Or a rejection.

This steps can then be reused in a async pipeline, where the context object gets passed down.

We already provide some helper functions for several common cases like parallel execution, conditionals, noops, etc.

  • each: Composition function that runs other composition functions in sequence, passing the context to all of them and returning the resulting context.
const runAll = each(
    asyncTask1(...),
    asyncTask2(...)
)

return runAll({})   // {} is the starting context
    .then(context => {
        // context will have the resulting context after asyncTask2
    })

each is very handy to make reusable composition of common steps.

const myAsyncStep = each(
    doSomeAsync1(...),
    doSomeASync2(...)
)

return each(
    myAsyncStep(...),
    otherASyncStep(),
    ...
    yetAnotherAsyncStep(),
    myAsyncStep()
)()
  • all: Same as each but running all task in parallel and merging the resulting contexts.
return all(
    parallelAsyncTask1(...),
    parallelAsyncTask2(...)
)({})  // Starting context
  • iff: Performs a conditional step passing a condition (or predicate) and a async function. Also accepts an else async function.
return iff(
    prop('account'),
    asyncTask(...)
)({})

// Else is possible too
return iff(
    prop('account'),
    asyncTask(...),
    elseAsyncTask(...)
)()

// Negation (using Ramda's complement)

const { complement } = require('ramda')

return iff(
    complement(prop('account')),
    asyncTask(...)
)()
  • should: Performs a validation check for a property, it fails with a ContextError if the predicate is not satisfied.
return each(
    should(prop('name')),  // passes
    should(prop('last')),  // throws ContextError
)({name: 'John'})

// prop is a query function, check down for documentation

// You can also define your custom error names
return each(
    should(prop('name')),  // passes
    should(prop('last'), 'InvalidLastName'),  // throws ContextError(message='InvalidLastName', context=context)
)({name: 'John'})
  • noop: This is a no brainer, does nothing, just returns the context.
return noop()({})
  • set: Will add/merge data into the context. The parameters to this function can be a static object where a simple merge will be performed or a query function where the value depends on the context being passed.
return each(
    set(always({ name: 'John' })), // statically
    set(context => ({ last: 'Doe' })),  // dinamically
    set(context => ({ initial: context.name[0] }))
)({ name: 'Mary' })

// returns { name: 'John', last: 'Doe', initial: 'J' }
  • assign: Will assign a prop based on the result of an async function
return each(
    assign('response', fetch('http://api.server.com/status'))
)()

// response will have the response of the async request
  • loop: Very simple loop execution of async function based on an async predicate.
return each(
    loop(
        ctx => ctx.index < 10,
        each(
            doSomethingElse(),
            ctx => {
                ctx.index += 1
                return ctx
            }
        )
    )
)

Querying functions

Querying functions can be used to pull data from the context and also allow to perform logical operations on them.

  • equals: returns the triple equality of two query functions
return each(
    iff(
        equals(prop('a'), always(3)),   // checking a prop with a static value
        doAsyncTask(...)
    ),
    iff(
        equals(prop('a'), prop('b')),   // checking 2 props dynamically
        doAsyncTask(...)
    )
)({ a: 3, b: 3 })
  • always: is a helper function that returns the same value of the first parameter passed
const name = always('John')
const b = name()   // John
  • every: Evaluates true if all values or predicates are true
return iff(
    every(prop('a'), prop('b'), always(true), always(10)),  // a and b must evaluate truthy for doAsyncTask to run
    doASyncTask(...)
)()
  • some: Same as every but we only need one to be true
return iff(
    some(prop('a'), prop('b')), // a or b should be truthy for doAsyncTask to run
    doAsyncTask(...)
)()
  • prop: returns the query function for the value of a prop. It can be nested via dots. This is only a property lookup in a object.
const getUserName = prop('user.name')

const name = getUserName({ user: { name: 'John' }})  // name === John

return iff(
    getUserName,
    doAsyncTask(...)
)()
  • props: Helper to construct objects where props can be static or dynamically evaluated
const createAccountDocument = props({
    user: {
        name: prop('auth.username'),
        token: prop('auth.token'),
        newUser: true,
        roles: ['admin', prop('auth.forRole')],
        team: {
            name: prop('auth.team')  // nested props too :)
        }
    }
})

return each(
    doAsyncAuth(),        // Asumming this adds prop 'auth' to context
    set(createAccountDocument),
    saveToDB()
)()
  • composer: Composer merges the result of a set of async functions, very useful to craft objects where some properties are added based on predicates.
const createObject = composer(
    always({ name: 'John' }),
    iff(
        equals(prop('user'), always(1)),
        always({ last: 'Doe' }),
        always({})
    ),
    iff(
        equals(prop('user'), always(2)),
        always({ last: 'Perez' }),
        always({})
    )
)

const a = createObject({ user: 1 })    // { name: 'John', last: 'Doe' }
const b = createObject({ user: 2 })    // { name: 'John', last: 'Perez' }
const c = createObject({ user: 3 })    // { name: 'John' }
  • createTracer: Sometimes we need to trace some properties on the context. This function creates a no-op operation that performs that side-effect for us in a plugable way. The provided tracer should receive two params path and value
const logger = createTracer((path, value) => {
    // perform a log of the path and the value on our logging service
})

return each(
    doAsyncAuth(),        // Asumming this adds prop 'auth' to context
    set(createAccountDocument),
    logger('user'),
    saveToDB()
)()
  • consoleTracer is a pre-made simple console.log tracer ready to use
return each(
    doAsyncAuth(),        // Asumming this adds prop 'auth' to context
    set(createAccountDocument),
    consoleTracer('user'),
    saveToDB()
)()

Error handling

If you need to send and error in the pipeline you must return an exception. Depending in the context your async function is used the error message alone will be wrapped in a ContextError. This will help error handling to recover and close the necessary resources.

const myAsyncTask = () => context => {
    // doing some async stuff

    // oh no we found a error, lets throw
    return Resolve.reject(new Error('MyAsyncTaskError'))
}

Passing the context in the error helps cleaning steps in the pipeline.

return each(
    connectToDB(),
    myAsyncTask(...),  // This returns an error
    closeDB()          // This never gets executed
)()

We might want to capture the error and recover from it to close allocated resources. For that we use the capture async helper.

return each(
    connectToDB(),
    capture(
        myAsyncTask(...),   // This returns an error
        noop()              // This will be executed only if there is an in the previous call error
    ),
    closeDB()               // This never will be executed
)()

Another way to achieve the same goal is to use the ensure function.

return ensure(
    each(
        connectToDB(),
        myAsyncTask(...),   // This returns an error
    ),
    closeDB()               // This never will be always executed
)()

We can also define different error handlers depending on error type (message)

return each(
    connectToDB(),
    capture(
        myAsyncTask(...),   // This returns an error
        {
            'AsyncError': noop()              // This will be executed only if there is an in the previous call error,
            'VeryRareAsyncError': logItRemotely()   // Will be executing if error.message === 'VeryRareAsyncError'
        }
    ),
    closeDB()               // This never will be executed
)()

Notice that if we don't provide a handler for some error type the whole async function will fail with a promise rejection containing the error.

So, how to create async pipeline functions

If you are building a async pipeline function from scratch you function signature should look like this:

// Arrow function
const myAsyncFunction = (params) => context => {
    // Async stuff depending on params that prob mutates context
    if (someErrorCondition) {
        return Promise.reject(new Error('MyCustomError'))
    }

    return Promise.resolve(context);
}

// Function notation
function myAsyncFunction (params) {
    return function(context) {
        // Async stuff depending on params that prob mutates context
        if (someErrorCondition) {
            return Promise.reject(new Error('MyCustomError'))
        }

        return Promise.resolve(context);
    }
}


// Usage
return each(
    myAsyncFunction(...),
    otherAsyncFunction(...)
)({ starting: 'context' })

This way you can write your own set for mongodb, redis, request, rabbitmq, etc.

Async / Await

Async/Await in JavaScript is based on Promises. Since all async pipeline functions return a Promise of a Context you can also do:

const performTask = each(
    connectToDB(),
    capture(
        myAsyncTask(...),   // This returns an error
        each(
            logItRemotely(),
            always({ error: 'failed' })
        )
    ),
    closeDB(),
    props({ result: 'ok' })
)

const result = await performTask()
console.log(result)

About

Functions for easier async pipelining/composing of promises

Resources

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published