Нашей компании нужно сгруппировать клиентов для АБ-тестов. Алгоритм группировки очень простой - взять ID клиента (состоит из 5-7 цифр, например 7412567) и найти сумму всех его цифр. Получившееся число и является номером группы, в которую входит данный клиент.

Нужно написать две функции:
1. Функция, которая подсчитывает число покупателей, попадающих в каждую группу, если нумерация ID сквозная и начинается с 0. На вход функция получает целое число n_customers (количество клиентов).
2. Функция, аналогичная первой, если ID начинается с произвольного числа. На вход функция получает целые числа: п_customers (количество клиентов) и п_first_id (первый ID в последовательности).

В начале тест кейса написано, что ID состоит из 5-7 цифр, поэтому будем считать, что максимальный ID - 9999999. Соответственно, максимальный номер группы - 9*7 = 63.

In [366]:
import numpy as np

MAX_GROUP_ID = 63

Сначала напишем первую функцию.

Наивное решение: для каждого ID считать сумму цифр, проходя по числу как по строке. Сложность $O(N * 7) = O(N)$, однако перевод в строку и обратно занимает много времени.

In [367]:
def get_customers_per_group(n_customers):
    customers_per_group = np.zeros(MAX_GROUP_ID+1, dtype=int)
    for i in range(n_customers):
        # Находим сумму цифр в ID
        group_id = sum(int(digit) for digit in str(i))
        customers_per_group[group_id] += 1
    return customers_per_group

In [379]:
%%time
ans1 = get_customers_per_group(10000000)

CPU times: user 18.5 s, sys: 0 ns, total: 18.5 s
Wall time: 18.5 s


Так как в нашем случае ID идут подряд, то считать сумму цифр можно быстрее. Сумма цифр числа $X+1$ равна сумме цифр числа $X$, увеличенной на 1, если не произошло переноса в старший разряд. При каждом переносе в старший разряд сумма уменьшается на 9.

In [369]:
def get_customers_per_group2(n_customers):
    customers_per_group = np.zeros(MAX_GROUP_ID+1, dtype=int)
    group_id = 0
    for i in range(n_customers):
        customers_per_group[group_id] += 1
        # Увеличиваем group_id на 1 для следующего покупателя
        group_id += 1
        divisor = 10
        while True:
            # При каждом переносе в старший разряд уменьшаем group_id на 9
            if i % divisor == divisor - 1:
                group_id -= 9
            else:
                break
            divisor *= 10
        
    return customers_per_group

In [380]:
%%time
ans2 = get_customers_per_group2(10000000)

CPU times: user 3.6 s, sys: 0 ns, total: 3.6 s
Wall time: 3.59 s


Ускорение в ~5 раз. Удостоверимся, что ответы совпадают.

In [371]:
np.array_equal(ans1, ans2)

True

In [372]:
from random import randint

for _ in range(100):
    n_customers = randint(1, 10000)
    assert np.array_equal(get_customers_per_group(n_customers), get_customers_per_group2(n_customers))

Пусть нам необходимо подсчитать ответ для 100 клиентов (от 0 до 99). Разложим промежуток $0-99$ на такие части: $0-9, 10-19, \dots , 90-99$. Ответом будет поиндексная сумма ответов для каждой части.

Заметим также, что ответ для части 0-9 равен

`[1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0, ...]`.

для части 10-19 равен

`[0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0, ...]`.

для части 20-29 равен

`[0, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0, ...]`.

То есть ответ для части 0-9, сдвигается направо на i мест, где i - первая цифра чисел в группе.

Таким образом, мы можем считать ответ рекурсивно: ответ для числа $\overline{abc}$ считаем как сумму ответов для $\overline{99}$, сдвинутых направо на $0, 1, \dots, a-1$ мест. И также нужно прибавить ответ для $\overline{bc}$, сдвинутый на $a$ мест вправо. 

In [373]:
def get_customers_per_group3(n_customers):
    answers_for_nines = dict()
    def recursive(customer_id):
        nonlocal answers_for_nines
        result = np.zeros(MAX_GROUP_ID+1, dtype=int)
        # Если длина customer_id равна 1, то возвращаем в ручную заполненный массив
        if len(customer_id) == 1:
            for i in range(int(customer_id) + 1):
                result[i] += 1
            return result
        # Считаем ответ для числа из девяток длиной на 1 меньше чем customer_id.
        # Сохраняем ответы в словарь, так как иначе будем много раз пересчитывать одно и то же
        lower_num = '9' * (len(customer_id) - 1)
        if lower_num not in answers_for_nines:
            answers_for_nines[lower_num] = recursive(lower_num)
        lower_ans = answers_for_nines[lower_num]
        # сдвигаем этот ответ на 0, 1, ... , a-1 вправо и прибавляем к ответу
        for i in range(int(customer_id[0])):
            result += np.roll(lower_ans, i)
        # считаем ответ для числа customer_id[1:], сдвигаем его на a вправо и прибавляем к ответу
        if customer_id[1:] in answers_for_nines:
            result += np.roll(answers_for_nines[customer_id[1:]], int(customer_id[0]))
        else:
            result += np.roll(recursive(customer_id[1:]), int(customer_id[0]))
        return result
    customers_per_group = recursive(str(n_customers - 1))
    return customers_per_group

In [374]:
%%time
ans3 = get_customers_per_group3(10000000)

CPU times: user 2.65 ms, sys: 0 ns, total: 2.65 ms
Wall time: 1.85 ms


Большое ускорение. Если не сохранять ответы для чисел, состоящих из девяток, то на каждом шаге рекурсии происходит ветвление на два подзадачи. Глубина рекурсии равна длине числа, то есть $O(log_{10}N)$. Таким образом число вызовов функции равно $O(2^{log_{10}N}) = O(N^{log_{10}2}) \approx O(\sqrt[3]{N})$. Однако с оптимизацией ветвление на два подзадачи происходит только в верхнем вызове, таким образом число вызовов функции равно $O(log_{10}N) = O(logN)$. Внутри каждой функции все операции константные, включая цикл for (он длиной не более 9). Таким образом, время работы алгоритма $O(logN)$.

Проверяем, что ответы совпадают.

In [375]:
np.array_equal(ans1, ans3)

True

In [376]:
for _ in range(100):
    n_customers = randint(1, 10000)
    assert np.array_equal(get_customers_per_group2(n_customers), get_customers_per_group3(n_customers))

Вторая функция очень проста. Просто вызываем первую функцию для `n_first_id` и `n_customers + n_first_id`, и вычитаем из второго ответа первый. Будем использовать наилучший вариант первой функции, но также напишем наивную версию второй функции для проверки корректности.

In [377]:
def naive_get_customers_per_group(n_customers, n_first_id):
    customers_per_group = np.zeros(MAX_GROUP_ID+1, dtype=int)
    for i in range(n_first_id, n_first_id + n_customers):
        group_id = sum(int(digit) for digit in str(i))
        customers_per_group[group_id] += 1
    return customers_per_group

def get_customers_per_group_from_id(n_customers, n_first_id):
    return get_customers_per_group3(n_customers + n_first_id) - get_customers_per_group3(n_first_id)


In [378]:
for _ in range(100):
    n_customers = randint(1, 10000)
    n_first_id = randint(1, 100000)
    assert np.array_equal(get_customers_per_group_from_id(n_customers, n_first_id), naive_get_customers_per_group(n_customers, n_first_id))

Сложность этой функции $O(2logN) = O(logN)$. Итоговые функции вынесены в `solution.py`.