# Numba optimization

Numba works by generating optimized machine code using the LLVM compiler infrastructure at import time, runtime, or statically (using the included pycc tool). Numba supports compilation of Python to run on either CPU or GPU hardware, and is designed to integrate with the Python scientific software stack.

In [6]:
import io
from typing import List

import numba
import numpy as np
import pandas as pd

In [7]:
# original code from C.E.
def calc_slope(data: pd.DataFrame, key_columns: List[str], slope_column: str, fact_name: str) -> pd.DataFrame:
    if not data.empty:
        pdf_with_slope = data.sort_values(by="period_seq").groupby(key_columns).apply(slope, slope_column).reset_index()
        if pdf_with_slope.empty:
            data[fact_name] = 0.0
        else:
            pdf_with_slope.columns = key_columns + [fact_name]
            data = data.merge(pdf_with_slope, on=key_columns)
            return data
    else:
        return pd.DataFrame(columns=data.columns.to_list() + [fact_name])


def slope(data: pd.DataFrame, sales_column: str):
    num_periods = list(range(data.shape[0]))
    sales = data[sales_column].to_numpy()

    sum_num_periods = sum(num_periods)
    sum_sales = sum(sales)

    sum_num_periods_sales = np.dot(num_periods, sales)
    sum_num_periods_square = np.dot(num_periods, num_periods)

    square_of_sum_num_periods = sum_num_periods * sum_num_periods
    slope_num = len(num_periods) * sum_num_periods_sales - sum_num_periods * sum_sales
    slope_den = len(num_periods) * sum_num_periods_square - square_of_sum_num_periods

    return slope_num / slope_den

In CE we use `calc_slope` function twice inside the rule_53 implementation.

```python
output_df = calc_slope(output_df, ['item_id', 'competitor_item_id'], 'salesunits_own', 'slope_own')
#...
output_df = calc_slope(output_df, ['item_id', 'competitor_item_id'], 'salesunits_competitor', 'slope_competitor')
```

Let's check how expensive this function is, and if we can improve the execution time with numba.


In [8]:
# Prepare a sample data frame
#    We need the cell's data and configuration as they are used in CE:

configuration = {
    "country_code": "CN",
    "item_group_code": "PTV_FLAT",
    "market_configuration": {
        "ce": {
            "low_price_percentage": 0.1,
            "high_price_percentage": 0.1,
            "medium_price_percentage": 0.1,
            "lower_price_range_threshold": 0,
            "upper_price_range_threshold": 999999999,
        }
    },
}

df = pd.read_json(io.StringIO("PTV_FLAT-CN.py"), orient="columns")
df.shape

ValueError: Expected object or value

In [None]:
df.head()

In [None]:
df.info()

Verify the code works:

In [None]:
calc_slope(df, ["item_id", "competitor_item_id"], "salesunits_own", "slope_own")

In [None]:
%timeit calc_slope(df, ['item_id', 'competitor_item_id'], 'salesunits_own', 'slope_own')

Let’s take a look and see where the time is spent during this operation (limited to the most time consuming four calls) using the prun ipython magic function:

In [None]:
%prun -l 4 calc_slope(df, ['item_id', 'competitor_item_id'], 'salesunits_own', 'slope_own')

# Numba

> NOTE: As of Numba version 0.20, pandas objects cannot be passed directly to Numba-compiled functions. Instead, one must pass the NumPy array underlying the pandas object to the Numba-compiled function.



- Pandas is not understood by Numba and as a result Numba would simply run the code via the interpreter but with the added cost of the Numba internal overheads!
- To use JIT compile with Numba, we need to write code based on vectorizationa & broadcasting technique.
- Vectorizing the code only plays well with Numpy and simple Python syntax.
- Instead of using a Pandas `apply`, separate out numerical calculations into a Numba sub-function.
- `pyyaml` - enables configuration of Numba via a YAML config file.
- The parallel option for jit() can produce diagnostic information about the transforms undertaken in automatically parallelizing the decorated code. This information can be accessed in two ways, the first is by setting the environment variable `NUMBA_PARALLEL_DIAGNOSTICS`, the second is by calling `parallel_diagnostics()`, both methods give the same information and print to STDOUT. The level of verbosity in the diagnostic information is controlled by an integer argument of value between 1 and 4 inclusive, 1 being the least verbose and 4 the most.



In [None]:
@numba.jit
def f_plain(x):
    return x * (x - 1)


@numba.jit
def integrate_f_numba(a, b, N):
    s = 0
    dx = (b - a) / N
    for i in range(N):
        s += f_plain(a + i * dx)
    return s * dx


@numba.jit
def apply_integrate_f_numba(col_a, col_b, col_N):
    #     n = len(col_N)
    n = col_N.shape[0]
    result = np.empty(n, dtype=np.float64)
    assert len(col_a) == len(col_b) == n
    for i in range(n):
        result[i] = integrate_f_numba(col_a[i], col_b[i], col_N[i])
    return result


def compute_numba(df):
    result = apply_integrate_f_numba(df["a"].to_numpy(), df["b"].to_numpy(), df["N"].to_numpy())
    return pd.Series(result, index=df.index, name="result")

In [None]:
rand_df = pd.DataFrame(
    {"a": np.random.randn(1000), "b": np.random.randn(1000), "N": np.random.randint(100, 1000, (1000)), "x": "x"}
)

In [None]:
%timeit compute_numba(rand_df)

In [None]:
# Numba optimized code
def calc_optimized_slope(data: pd.DataFrame, key_columns: List[str], slope_column: str, fact_name: str) -> pd.DataFrame:
    if not data.empty:
        pdf_with_slope = (
            data.sort_values(by="period_seq")
            .groupby(key_columns)[slope_column]
            .apply(apply_optimized_slope, raw=True)
            .reset_index()
        )
        if pdf_with_slope.empty:
            data[fact_name] = 0.0
        else:
            pdf_with_slope.columns = key_columns + [fact_name]
            data = data.merge(pdf_with_slope, on=key_columns)
            return data
    else:
        return pd.DataFrame(columns=data.columns.to_list() + [fact_name])


# def apply_optimized_slope(data: np.ndarray, sales_column: str):
#     return the_real_calculation(data[sales_column].to_numpy(),
#                                 np.array(data.shape[0]))


@numba.jit
def apply_optimized_slope(sales: np.ndarray) -> float:
    num_periods = np.arange(float(size))
    print(size)
    print(sales.shape)
    print(sales)

    #     sum_num_periods = np.sum(num_periods)
    #     sum_sales = np.sum(sales)

    #     sum_num_periods_sales = np.dot(num_periods, sales)
    #     sum_num_periods_square = np.dot(num_periods, num_periods)

    #     square_of_sum_num_periods = sum_num_periods * sum_num_periods
    #     slope_num = size * sum_num_periods_sales - sum_num_periods * sum_sales
    #     slope_den = size * sum_num_periods_square - square_of_sum_num_periods

    #     square_of_sum_num_periods = np.multiply(sum_num_periods, sum_num_periods)
    #     slope_num = np.subtract((size * sum_num_periods_sales), (sum_num_periods * sum_sales))
    #     slope_den = np.subtract((size * sum_num_periods_square), square_of_sum_num_periods)

    #     return np.divide(slope_num, slope_den)
    return 0.0

In [None]:
np.arange(5.0)

In [None]:
calc_optimized_slope(df, ["item_id", "competitor_item_id"], "salesunits_own", "slope_own")

In [None]:
%timeit calc_optimized_slope(df, ['item_id', 'competitor_item_id'], 'salesunits_own', 'slope_own')