# Compiler-Target Switch

Configure the target:

In [1]:
import os

# Select the compiler target and the inlining mode through the environment,
# which is where the configuration code further down reads them from.
os.environ.update({
    'POLIASTRO_TARGET': 'cuda',  # cpu, parallel, cuda
    'POLIASTRO_INLINE': 'no',  # yes, no
})

The following is a variation of the code from my current `poliastro` development branch:

In [2]:
import os
import sys
import warnings

import numba as nb


class JitWarning(UserWarning):
    """
    Warning category used for notices about the jit configuration.
    """


TARGET = os.environ.get('POLIASTRO_TARGET', 'cpu')

if TARGET not in ('cpu', 'parallel', 'cuda'):  # numba 0.54.0, 19 August 2021, removed AMD ROCm target
    raise ValueError(f'unknown target "{TARGET:s}"')
if TARGET == 'parallel' and sys.maxsize <= 2**31:  # paying respect to poliastro#1399
    raise ValueError('target "parallel" not supported on 32bit systems')
if TARGET == 'cuda':
    from numba import cuda  # explicit import required and only performed if target is switched to cuda

INLINE = os.environ.get('POLIASTRO_INLINE', 'no')
if INLINE not in ('yes', 'no'):
    raise ValueError(f'unknown value for inline "{INLINE:s}"')
INLINE = INLINE == 'yes'

CACHE = os.environ.get('POLIASTRO_CACHE', 'no')
if CACHE not in ('yes', 'no'):
    raise ValueError(f'unknown value for cache "{TARGET:s}"')
CACHE = CACHE == 'yes'
if TARGET == 'cuda' and CACHE:
    warnings.warn(
        'caching is not supported for target "cuda"',
        JitWarning,
        stacklevel=2,
    )

PRECISIONS = ('f4', 'f8')  # TODO allow f2, i.e. half, for CUDA at least?

NOPYTHON = True  # only for debugging, True by default

_VECTOR = 'Tuple([f,f,f])'  # TODO hope for support of "f[:]" return values in cuda target
_MATRIX = f'Tuple([{_VECTOR:s},{_VECTOR:s},{_VECTOR:s}])'  # TODO see above


def _parse_signatures(signature):
    """
    Automatically generate signatures for floats, vectors and matrices
    """
    if '->' in signature:  # this is likely a layout for guvectorize
        return signature
    if not any(notation in signature for notation in ('f', 'V', 'M')):  # leave this signature as it is
        return signature
    if any(level in signature for level in PRECISIONS):  # leave this signature as it is
        return signature
    signature = signature.replace('V', _VECTOR)
    signature = signature.replace('M', _MATRIX)
    signature = [signature.replace('f', dtype) for dtype in PRECISIONS]
    print(signature)
    return signature


def hjit(*args, **kwargs):
    """
    Decorator for scalar helper functions, internal use, follows TARGET.

    Usable bare (``@hjit``) or with arguments (``@hjit('f(f)')``). A leading
    string argument is run through ``_parse_signatures``. Compiles with
    ``cuda.jit`` as a device function if TARGET is cuda, with ``nb.jit``
    otherwise; keyword arguments override the pre-configured options.
    Decorated functions can only be called directly if TARGET is cpu or parallel.
    """

    target_fn = None
    if len(args) == 1 and callable(args[0]):
        # bare usage: the sole positional argument is the function itself
        target_fn, args = args[0], tuple()

    if args and isinstance(args[0], str):
        args = (_parse_signatures(args[0]), *args[1:])

    on_gpu = TARGET == 'cuda'
    if on_gpu:
        options = {'device': True, 'inline': INLINE}
    elif TARGET in ('cpu', 'parallel'):
        options = {
            'nopython': NOPYTHON,
            'inline': 'always' if INLINE else 'never',
            'cache': CACHE,
        }
    else:
        options = {}
    options.update(kwargs)  # caller-supplied options win over the defaults

    compiler = cuda.jit if on_gpu else nb.jit

    def decorate(fn):
        return compiler(*args, **options)(fn)

    return decorate if target_fn is None else decorate(target_fn)


def vjit(*args, **kwargs):
    """
    Decorator wrapping ``nb.vectorize``, user-facing, follows TARGET.

    Usable bare (``@vjit``) or with arguments (``@vjit('f(f)')``). A leading
    string argument is run through ``_parse_signatures``. TARGET is passed on
    as ``target``; keyword arguments override the pre-configured options.
    Decorated functions can always be called directly if needed.
    """

    target_fn = None
    if len(args) == 1 and callable(args[0]):
        # bare usage: the sole positional argument is the function itself
        target_fn, args = args[0], tuple()

    if args and isinstance(args[0], str):
        args = (_parse_signatures(args[0]), *args[1:])

    options = {'target': TARGET}
    if TARGET in ('cpu', 'parallel'):
        options['nopython'] = NOPYTHON
        options['cache'] = CACHE
    options.update(kwargs)  # caller-supplied options win over the defaults

    def decorate(fn):
        return nb.vectorize(*args, **options)(fn)

    return decorate if target_fn is None else decorate(target_fn)


def gjit(*args, **kwargs):
    """
    Decorator wrapping ``nb.guvectorize``, user-facing, follows TARGET.

    Usable bare (``@gjit``) or with arguments. A leading string argument is
    run through ``_parse_signatures``. TARGET is passed on as ``target``;
    keyword arguments override the pre-configured options.
    Decorated functions can always be called directly if needed.
    """

    target_fn = None
    if len(args) == 1 and callable(args[0]):
        # bare usage: the sole positional argument is the function itself
        target_fn, args = args[0], tuple()

    if args and isinstance(args[0], str):
        args = (_parse_signatures(args[0]), *args[1:])

    options = {'target': TARGET}
    if TARGET in ('cpu', 'parallel'):
        options['nopython'] = NOPYTHON
        options['cache'] = CACHE
    options.update(kwargs)  # caller-supplied options win over the defaults

    def decorate(fn):
        return nb.guvectorize(*args, **options)(fn)

    return decorate if target_fn is None else decorate(target_fn)


def jit(*args, **kwargs):
    """
    Decorator wrapping ``nb.jit``, potentially user-facing, ignores TARGET.

    Usable bare (``@jit``) or with arguments. Positional arguments are passed
    to ``nb.jit`` unchanged; keyword arguments override the pre-configured
    options. Decorated functions can be called directly.
    """

    bare = len(args) == 1 and callable(args[0])
    target_fn = args[0] if bare else None
    if bare:
        args = tuple()

    # DOES NOT SWITCH INLINE TO PRESERVE OLD TESTED BEHAVIOR
    options = {'nopython': NOPYTHON, 'inline': 'never'}
    options.update(kwargs)  # caller-supplied options win over the defaults

    def decorate(fn):
        return nb.jit(*args, **options)(fn)

    return decorate if target_fn is None else decorate(target_fn)

# Test

In [3]:
import numpy as np
from math import cos, sin

COMPLEXITY = 1 << 11  # iteration count of the loop inside helper
SIZE = 1 << 16  # number of elements in each test array

@hjit('f(f)')
def helper(scalar: float) -> float:
    # Accumulates sin(step) for steps sharing the parity of round(scalar)
    # and subtracts cos(step) for all other steps, over COMPLEXITY steps.
    total: float = 0.0
    for step in range(COMPLEXITY):
        if step % 2 != round(scalar) % 2:
            total -= cos(step)
        else:
            total += sin(step)
    return total

@vjit('f(f)')
def test(d: float) -> float:
    # Element-wise entry point: forwards each value to the scalar helper.
    value = helper(d)
    return value

['f4(f4)', 'f8(f8)']
['f4(f4)', 'f8(f8)']


In [4]:
# One random input array per precision, drawn in the order f4 then f8.
data_f4, data_f8 = (
    (np.random.random(SIZE) * 128).astype(code) for code in ('f4', 'f8')
)

# The output dtype is expected to match the input dtype for both precisions.
result_f4 = test(data_f4)
assert np.float32 == result_f4.dtype

result_f8 = test(data_f8)
assert np.float64 == result_f8.dtype



AssertionError: 