Есть Pandas DataFrame со столбцами [“customer_id”, “product_id”, “timestamp”], который содержит данные по просмотрам товаров на сайте. Есть проблема – просмотры одного customer_id не разбиты на сессии (появления на сайте). Мы хотим разместить сессии так, чтобы сессией считались все смежные просмотры, между которыми не более 3 минут.

Написать методом который создаст в Pandas DataFrame столбец session_id и проставит в нем уникальный int id для каждой сессии.

У каждого пользователя может быть по несколько сессий. Исходный DataFrame может быть большим – до 100 млн строк.

In [2]:
import numpy as np
import pandas as pd

In [3]:
def gendf(CUSTOMER_NUM=5, PRODUCT_NUM=10, ORDER_NUM=10, MAX_TIME_OFFSET=1000):
    global df
    df = pd.DataFrame({
        'customer_id': np.random.randint(0, CUSTOMER_NUM, size=(ORDER_NUM,)),
        'product_id': np.random.randint(0, PRODUCT_NUM, size=(ORDER_NUM,)),
        'timestamp': pd.Timestamp('2018-01-01 00:00:00') + pd.to_timedelta(np.random.randint(15, MAX_TIME_OFFSET, size=(ORDER_NUM,)), unit='s'),
        }).sort_values('timestamp')

gendf()

In [4]:
df

Unnamed: 0,customer_id,product_id,timestamp
1,3,7,2018-01-01 00:01:30
4,2,6,2018-01-01 00:03:22
0,1,9,2018-01-01 00:06:41
2,3,8,2018-01-01 00:07:39
8,2,1,2018-01-01 00:08:33
3,3,6,2018-01-01 00:08:44
7,0,6,2018-01-01 00:09:53
5,2,1,2018-01-01 00:11:03
9,3,9,2018-01-01 00:12:40
6,0,2,2018-01-01 00:15:04


Самое очевидное решение: для каждого пользователя пройти по записям, храня значение `session_id` и предыдущий `timestamp`, и увеличивать `session_id` если разница текущего `timestamp` с предыдущим больше 3 минут. Однако такой метод будет очень медленным, потому что не использует возможности pandas.

In [4]:
def naive(df):
    session_id = -1
    cur_customer_id = -1
    last_timestamp = None
    
    for name, group in df.groupby('customer_id'):
        for j in group.itertuples():
            if j[1] != cur_customer_id:
                session_id += 1
                cur_customer_id = j[1]
                last_timestamp = j[3]
            elif (j[3] - last_timestamp) > pd.Timedelta('3min'):
                session_id += 1
            df.loc[j.Index, 'session_id'] = session_id
            last_timestamp = j[3]

gendf()
naive(df)
df.sort_values('customer_id')

Unnamed: 0,customer_id,product_id,timestamp,session_id
6,0,2,2018-01-01 00:03:51,0.0
5,0,2,2018-01-01 00:08:36,1.0
1,0,4,2018-01-01 00:11:50,2.0
7,0,8,2018-01-01 00:15:33,3.0
9,1,9,2018-01-01 00:04:33,4.0
8,1,0,2018-01-01 00:14:22,5.0
4,2,3,2018-01-01 00:02:29,6.0
0,2,6,2018-01-01 00:04:14,6.0
2,2,2,2018-01-01 00:05:40,6.0
3,2,2,2018-01-01 00:12:53,7.0


Немного более быстрый способ с использованием apply вместо цикла:

In [8]:
def naive2(df):    
    session_id = -1
    cur_customer_id = -1
    last_timestamp = None
    def naive2(x):
        nonlocal session_id, cur_customer_id, last_timestamp
        if x['customer_id'] != cur_customer_id:
            session_id += 1
            cur_customer_id = x['customer_id']
            last_timestamp = x['timestamp']
            return session_id
        if (x['timestamp'] - last_timestamp) > pd.Timedelta('3min'):
            session_id += 1
        last_timestamp = x['timestamp']
        return session_id
    
    for name, group in df.groupby('customer_id'):
                df.loc[group.index, 'session_id'] = group.apply(naive2, axis=1)

gendf()
naive(df)
df.sort_values('customer_id')

Unnamed: 0,customer_id,product_id,timestamp,session_id
0,0,9,2018-01-01 00:15:10,0.0
2,1,0,2018-01-01 00:01:11,1.0
6,1,4,2018-01-01 00:06:29,2.0
9,2,5,2018-01-01 00:05:55,3.0
3,2,6,2018-01-01 00:10:49,4.0
7,2,2,2018-01-01 00:13:03,4.0
8,3,2,2018-01-01 00:16:21,5.0
1,4,4,2018-01-01 00:00:54,6.0
4,4,5,2018-01-01 00:13:20,7.0
5,4,6,2018-01-01 00:15:21,7.0


In [28]:
from time import perf_counter

# для честного тестирования скорости нужно каждый раз запускать метод на новом датафрейме, поэтому такая функция
def test_funcs(funcs_to_test, iter_count=100, order_num=1000):
    n = len(funcs_to_test)
    result = [0 for _ in range(n)]

    for idx in range(iter_count):
        gendf(ORDER_NUM=order_num)

        for f in range(n):
            df2 = df.copy()
            start = perf_counter()
            funcs_to_test[((idx+f) % n)](df2)
            end = perf_counter()
            result[((idx+f) % n)] += end - start
    
    for func, res in zip(funcs_to_test, result):
        print(f'{func.__name__}, {res*1000 / iter_count:.3f}ms')
    

In [29]:
test_funcs([naive, naive2])

naive, 108.429ms
naive2, 32.083ms


Заметим, что вместо того, чтобы вычитать из каждого `timestamp` предыдущий, мы можем из всего столбца вычесть его самого, сдвинутого на одну позицию вперед. Сравнив полученный столбец с дельтой в 3 минуты получим булевый индикатор, который показывает, когда нужно инкрементировать `session_id`. Взяв его кумулятивную сумму получим сам `session_id` для каждой записи.

In [8]:
def add_session_id(df):
    # инициализируем столбец нулями. Этого можно не делать, однако тогда столбец примет dtype=float, тк int не может иметь значение nan.
    # Вместо этого можно также в конце сделать df['session_id'].astype(int), однако это займет дольше времени из-за лишней конвертации
    df['session_id'] = 0
    g = df.groupby('customer_id')
    offset = 0
    for name, group in g:
        # fillna для заполнения самого первого индекса
        df.loc[group.index, 'session_id'] = (group['timestamp'] - group['timestamp'].shift(1) > pd.Timedelta('3min')).fillna(0).cumsum() + offset
        # так как session_id уникальны глобально, то нужно смещать его значения для следующего пользователя на последний id этого пользователя
        offset = df['session_id'].max() + 1
        
gendf()
add_session_id(df)
df.sort_values('customer_id')

Unnamed: 0,customer_id,product_id,timestamp,session_id
1,0,8,2018-01-01 00:02:54,0
5,0,0,2018-01-01 00:04:12,0
2,0,2,2018-01-01 00:04:55,0
6,0,6,2018-01-01 00:05:00,0
7,0,9,2018-01-01 00:12:55,1
4,1,2,2018-01-01 00:08:10,2
0,1,3,2018-01-01 00:09:27,2
8,1,8,2018-01-01 00:13:35,3
9,2,4,2018-01-01 00:07:08,4
3,4,3,2018-01-01 00:10:24,5


In [31]:
test_funcs([naive2, add_session_id])

naive2, 31.693ms
add_session_id, 7.716ms


Проверим, что полученный результат совпадает с ожидаемым:

In [11]:
gendf(ORDER_NUM=1000)
df2 = df.copy()
naive(df)
df['session_id'] = df['session_id'].astype(int)
add_session_id(df2)

df.equals(df2)

True

Тест на большом датасете:

In [37]:
test_funcs([add_session_id], order_num=10000000, iter_count=10)

add_session_id, 2688.079ms


Функция вынесена в файл `solution.py`