In [1]:
import sys
sys.path.insert(0, '..')

# Инициализируем граф

In [2]:
import pandas as pd

from src.eventstream.eventstream import Eventstream
from src.eventstream.schema import RawDataSchema, EventstreamSchema
from src.graph.p_graph import PGraph, EventsNode

raw_data = pd.read_csv('simple-onlineshop.csv')
# для теста кастомной колонки
cash_users = raw_data[raw_data['event'].str.contains('cash')]['user_id'].to_list()
raw_data['user_type'] = raw_data['user_id'].apply(
                lambda x: 'cash' if x in cash_users else 'not_cash')

custom_cols = ['user_type']


raw_data_schema = RawDataSchema(
                            event_name="event",
                            event_timestamp="timestamp",
                            user_id="user_id",
                            custom_cols = [{"custom_col": 'user_type',
                                           "raw_data_col": "user_type"}]
                                )

source = Eventstream(
    raw_data=raw_data,
    raw_data_schema=raw_data_schema,
    schema=EventstreamSchema(custom_cols=custom_cols)
)

graph = PGraph(source_stream=source)

In [3]:
raw_data

Unnamed: 0,user_id,event,timestamp,user_type
0,219483890,catalog,2019-11-01 17:59:13.273932,not_cash
1,219483890,product1,2019-11-01 17:59:28.459271,not_cash
2,219483890,cart,2019-11-01 17:59:29.502214,not_cash
3,219483890,catalog,2019-11-01 17:59:32.557029,not_cash
4,964964743,catalog,2019-11-01 21:38:19.283663,not_cash
...,...,...,...,...
35376,501098384,catalog,2020-04-29 12:47:40.975732,not_cash
35377,501098384,catalog,2020-04-29 12:48:01.809577,not_cash
35378,501098384,main,2020-04-29 12:48:01.938488,not_cash
35379,501098384,catalog,2020-04-29 12:48:06.595390,not_cash


In [4]:
source_df = source.to_dataframe()

In [5]:
source_df.head(3)

Unnamed: 0,event_id,event_type,event_index,event_name,event_timestamp,user_id,user_type
0,0a29f2d8-fc00-4702-9f49-654bed8f4cca,raw,0,catalog,2019-11-01 17:59:13.273932,219483890,not_cash
1,738cb153-2447-496f-8a45-6275a785ca27,raw,1,product1,2019-11-01 17:59:28.459271,219483890,not_cash
2,2896bed2-d0c8-461d-9640-4a72c95c7fd7,raw,2,cart,2019-11-01 17:59:29.502214,219483890,not_cash


In [6]:
source_df['event_name'].value_counts()

catalog             14518
main                 5635
lost                 3098
cart                 2842
product2             2172
delivery_choice      1686
product1             1515
payment_choice       1107
delivery_courier      834
payment_done          706
payment_card          565
delivery_pickup       506
payment_cash          197
Name: event_name, dtype: int64

In [7]:
source_df[source_df['user_id'] == 122915]

Unnamed: 0,event_id,event_type,event_index,event_name,event_timestamp,user_id,user_type
18665,29590ebf-c60a-4fab-ab5e-cfb4f4ffdca7,raw,18665,main,2020-04-07 03:27:32.806542,122915,not_cash
18666,ebd75f99-0a92-4ce3-a772-3f59d6a9bf5e,raw,18666,catalog,2020-04-07 03:27:49.914239,122915,not_cash
18667,6cb7c292-b0ab-4e56-88cc-b6d0bc91a57c,raw,18667,main,2020-04-07 03:27:57.320839,122915,not_cash
18668,c0c94665-03de-4f39-8531-7092f75ebf95,raw,18668,main,2020-04-07 03:28:00.580589,122915,not_cash
18669,33f18cfe-d278-40b1-acce-666bf91483a5,raw,18669,catalog,2020-04-07 03:28:01.815559,122915,not_cash
18670,3017ac29-e873-428f-b62e-88b5b4c35613,raw,18670,catalog,2020-04-07 03:28:45.697560,122915,not_cash
18671,99b23181-546e-4418-bb7b-167b62a1e6ae,raw,18671,product2,2020-04-07 03:29:04.983348,122915,not_cash
18672,f65cd5cb-faf3-48c7-bd28-a96a3e40ba91,raw,18672,catalog,2020-04-07 03:29:44.892554,122915,not_cash
18673,72f19b62-789a-4e06-8ce9-6bd8f8884e98,raw,18673,catalog,2020-04-07 03:30:28.058167,122915,not_cash
18674,e3ce3247-7688-44a1-a08e-29b1083650fa,raw,18674,catalog,2020-04-07 03:30:58.013806,122915,not_cash


In [8]:
source_df['event_name'].isin(['product1', 'product2'])

0        False
1         True
2        False
3        False
4        False
         ...  
35376    False
35377    False
35378    False
35379    False
35380    False
Name: event_name, Length: 35381, dtype: bool

# Grouping

## Описание работы функции

Датапроцессор переименовывает события по заданному условию.
Новых событий не добавляется, у переименованных событий изменяется ```event_name``` и ```event_type```


Параметры:
```event_name: str```
Новое имя для событий, которые подойдут под условие из параметра ```filter```

```filter: EventstreamFilter```
EventstreamFilter = Callable[[DataFrame, EventstreamSchema], Any]
Кастомная функция, в которую передается датафрейм, схема и на выходе она дает маску (условие) для текущего eventstream.
Событие для которых условие = True -- > переименовывается

```event_type: Optional[str] = "group_alias"```
По дефолту новый тип события - group_alias, но можно задать свой
Изменение типа события происходит также по условию из функции filter

 ## Простое переименование 2-ух событий в одно

In [9]:
# Объединим в группу product события product1 и product2
from src.data_processors_lib.simple_processors import SimpleGroup, SimpleGroupParams

product_agg = EventsNode(
    SimpleGroup(params=SimpleGroupParams(**{
        'event_name': 'product1',
        'filter': lambda df, schema: df[schema.event_name].isin(['product1', 'product2']),
        'event_type' : 'lalala'
    }))
)

graph.add_node(
    node=product_agg,
    parents=[graph.root]
)

result = graph.combine(
    node=product_agg
)

In [10]:
product_agg_df = result.to_dataframe()

In [11]:
product_agg_df[product_agg_df['user_id'] == 122915]

Unnamed: 0,event_id,event_type,event_index,event_name,event_timestamp,user_id,user_type
20596,29590ebf-c60a-4fab-ab5e-cfb4f4ffdca7,raw,20596,main,2020-04-07 03:27:32.806542,122915.0,not_cash
20597,ebd75f99-0a92-4ce3-a772-3f59d6a9bf5e,raw,20597,catalog,2020-04-07 03:27:49.914239,122915.0,not_cash
20598,6cb7c292-b0ab-4e56-88cc-b6d0bc91a57c,raw,20598,main,2020-04-07 03:27:57.320839,122915.0,not_cash
20599,c0c94665-03de-4f39-8531-7092f75ebf95,raw,20599,main,2020-04-07 03:28:00.580589,122915.0,not_cash
20600,33f18cfe-d278-40b1-acce-666bf91483a5,raw,20600,catalog,2020-04-07 03:28:01.815559,122915.0,not_cash
20601,3017ac29-e873-428f-b62e-88b5b4c35613,raw,20601,catalog,2020-04-07 03:28:45.697560,122915.0,not_cash
20603,1d3038b5-353a-46a0-9d2e-e290ee91622a,lalala,20603,product1,2020-04-07 03:29:04.983348,122915.0,not_cash
20604,f65cd5cb-faf3-48c7-bd28-a96a3e40ba91,raw,20604,catalog,2020-04-07 03:29:44.892554,122915.0,not_cash
20605,72f19b62-789a-4e06-8ce9-6bd8f8884e98,raw,20605,catalog,2020-04-07 03:30:28.058167,122915.0,not_cash
20606,e3ce3247-7688-44a1-a08e-29b1083650fa,raw,20606,catalog,2020-04-07 03:30:58.013806,122915.0,not_cash


In [12]:
source_df[source_df['user_id'] == 122915]

Unnamed: 0,event_id,event_type,event_index,event_name,event_timestamp,user_id,user_type
18665,29590ebf-c60a-4fab-ab5e-cfb4f4ffdca7,raw,18665,main,2020-04-07 03:27:32.806542,122915,not_cash
18666,ebd75f99-0a92-4ce3-a772-3f59d6a9bf5e,raw,18666,catalog,2020-04-07 03:27:49.914239,122915,not_cash
18667,6cb7c292-b0ab-4e56-88cc-b6d0bc91a57c,raw,18667,main,2020-04-07 03:27:57.320839,122915,not_cash
18668,c0c94665-03de-4f39-8531-7092f75ebf95,raw,18668,main,2020-04-07 03:28:00.580589,122915,not_cash
18669,33f18cfe-d278-40b1-acce-666bf91483a5,raw,18669,catalog,2020-04-07 03:28:01.815559,122915,not_cash
18670,3017ac29-e873-428f-b62e-88b5b4c35613,raw,18670,catalog,2020-04-07 03:28:45.697560,122915,not_cash
18671,99b23181-546e-4418-bb7b-167b62a1e6ae,raw,18671,product2,2020-04-07 03:29:04.983348,122915,not_cash
18672,f65cd5cb-faf3-48c7-bd28-a96a3e40ba91,raw,18672,catalog,2020-04-07 03:29:44.892554,122915,not_cash
18673,72f19b62-789a-4e06-8ce9-6bd8f8884e98,raw,18673,catalog,2020-04-07 03:30:28.058167,122915,not_cash
18674,e3ce3247-7688-44a1-a08e-29b1083650fa,raw,18674,catalog,2020-04-07 03:30:58.013806,122915,not_cash


In [13]:
(len(product_agg_df[product_agg_df['user_id'] == 122915]) ==
len(source_df[source_df['user_id'] == 122915]))

True

In [14]:
product_agg_df[product_agg_df['user_id'] == 122915]['event_name'].value_counts()

catalog     18
main         7
product1     6
cart         1
lost         1
Name: event_name, dtype: int64

In [15]:
source_df[source_df['user_id'] == 122915]['event_name'].value_counts()

catalog     18
main         7
product1     4
product2     2
cart         1
lost         1
Name: event_name, dtype: int64

In [16]:
(source_df[source_df['event_name'].isin(['product1', 'product2'])]['event_name'].count() == product_agg_df[product_agg_df['event_name'].isin(['product'])]['event_name'].count())

False

## По 2 условиям из 2 колонок

In [17]:
# Объединим в группу product события product1 и product2

def filter_(df, schema):
    return ((df[schema.user_id].isin([122915])) |
            (df.event_name.str.contains('product')))


product_agg = EventsNode(
    SimpleGroup(params=SimpleGroupParams(**{
        'event_name': 'product',
        'filter': filter_
    }))
)

graph.add_node(
    node=product_agg,
    parents=[graph.root]
)

result = graph.combine(
    node=product_agg
)

In [18]:
source_df['event_name'].str.contains('product')

0        False
1         True
2        False
3        False
4        False
         ...  
35376    False
35377    False
35378    False
35379    False
35380    False
Name: event_name, Length: 35381, dtype: bool

In [19]:
source_df['user_type'] == 'cash'

0        False
1        False
2        False
3        False
4        False
         ...  
35376    False
35377    False
35378    False
35379    False
35380    False
Name: user_type, Length: 35381, dtype: bool

In [20]:
result.to_dataframe()[result.to_dataframe()['user_id'] == 122915]

Unnamed: 0,event_id,event_type,event_index,event_name,event_timestamp,user_id,user_type
20596,8b5f1c2d-8776-4fe1-9a9b-60f5f3d2a03e,group_alias,20596,product,2020-04-07 03:27:32.806542,122915.0,not_cash
20598,bd33513a-6bb8-4172-9e77-15d355a67338,group_alias,20598,product,2020-04-07 03:27:49.914239,122915.0,not_cash
20600,6368870f-e10f-4f84-b2ff-c15cdacfd9fd,group_alias,20600,product,2020-04-07 03:27:57.320839,122915.0,not_cash
20602,19292c06-2495-4578-ada5-26dfedbcb03c,group_alias,20602,product,2020-04-07 03:28:00.580589,122915.0,not_cash
20604,39b49656-030f-4092-84fe-91039c4f4646,group_alias,20604,product,2020-04-07 03:28:01.815559,122915.0,not_cash
20606,ba9e29e1-dc46-4b84-9d51-3633c600a290,group_alias,20606,product,2020-04-07 03:28:45.697560,122915.0,not_cash
20608,e1e8e107-4efe-40b3-8759-d04458e5301e,group_alias,20608,product,2020-04-07 03:29:04.983348,122915.0,not_cash
20610,af645065-3a6f-4aa9-98f8-3c0a6035a910,group_alias,20610,product,2020-04-07 03:29:44.892554,122915.0,not_cash
20612,3f26544b-4eb5-49c3-bb2c-aedcf6c68191,group_alias,20612,product,2020-04-07 03:30:28.058167,122915.0,not_cash
20614,4c7225ae-1537-4908-9bd5-3f7e8238697e,group_alias,20614,product,2020-04-07 03:30:58.013806,122915.0,not_cash


In [21]:
result.to_dataframe()['event_name'].value_counts()

catalog             14500
main                 5628
product              3714
lost                 3097
cart                 2841
delivery_choice      1686
payment_choice       1107
delivery_courier      834
payment_done          706
payment_card          565
delivery_pickup       506
payment_cash          197
Name: event_name, dtype: int64

# Delete

## Описание работы функции

Датапроцессор фильрует события по заданному условию (проставляет метку delete=True)

Параметры:
```filter: EventstreamFilter```
EventstreamFilter = Callable[[DataFrame, EventstreamSchema], Any]
Кастомная функция, в которую передается датафрейм, схема и на выходе она дает маску (условие) для текущего eventstream.
Событие для которых условие = True -- > отфильтровываются


In [22]:
from src.data_processors_lib.simple_processors import DeleteEvents, DeleteEventsParams

def filter_1(df, schema):
    return (df['event_name'] == 'lost')

delete_lost = EventsNode(DeleteEvents(
                    params=DeleteEventsParams(filter=filter_1)))

graph.add_node(
    node=delete_lost,
    parents=[product_agg]
)

result = graph.combine(
    node=delete_lost
)

In [23]:
result.to_dataframe()['event_name'].value_counts()

catalog             14500
main                 5628
product              3714
cart                 2841
delivery_choice      1686
payment_choice       1107
delivery_courier      834
payment_done          706
payment_card          565
delivery_pickup       506
payment_cash          197
Name: event_name, dtype: int64

Удалились события lost

In [24]:
result.to_dataframe()

Unnamed: 0,event_id,event_type,event_index,event_name,event_timestamp,user_id,user_type
0,0a29f2d8-fc00-4702-9f49-654bed8f4cca,raw,0,catalog,2019-11-01 17:59:13.273932,219483890.0,not_cash
1,25345d44-b56a-43e3-ba88-5f15d79955ba,group_alias,1,product,2019-11-01 17:59:28.459271,219483890.0,not_cash
3,2896bed2-d0c8-461d-9640-4a72c95c7fd7,raw,3,cart,2019-11-01 17:59:29.502214,219483890.0,not_cash
4,103ffb37-51ca-4948-84f2-88870199b83d,raw,4,catalog,2019-11-01 17:59:32.557029,219483890.0,not_cash
5,8fa40a59-9c87-455c-82c4-e079206f88c8,raw,5,catalog,2019-11-01 21:38:19.283663,964964743.0,not_cash
...,...,...,...,...,...,...,...
42185,f4f75a6a-a878-4dd3-b759-8ac29e0d9573,raw,42185,main,2020-04-29 12:47:39.956925,501098384.0,not_cash
42186,ebb8e0b2-3e31-47a4-8bd2-e1cfaef78128,raw,42186,catalog,2020-04-29 12:47:40.975732,501098384.0,not_cash
42187,7b31064b-2de4-4480-85de-ae2cf4ccc2e4,raw,42187,catalog,2020-04-29 12:48:01.809577,501098384.0,not_cash
42188,1b09d736-d4b3-4a45-9ace-53a8fa440b65,raw,42188,main,2020-04-29 12:48:01.938488,501098384.0,not_cash


# Последовательное применение нод

In [25]:
# Объединим в группу delivery события:
# delivery_choice, delivery_courier, delivery_pickup

def filter_delivery(df, schema):
    return ((df['user_type'] == 'cash') |
            (df['event_name'].str.contains('delivery')))


delivery_agg = EventsNode(
    SimpleGroup(params=SimpleGroupParams(**{
        'event_name': 'delivery',
        'filter': filter_delivery
    })))

graph.add_node(
    node=delivery_agg,
    parents=[delete_lost]
)

result = graph.combine(
    node=delivery_agg
)

In [26]:
df_res = result.to_dataframe()

In [27]:
df_res['event_name'].value_counts()

catalog           12911
delivery           6757
main               4939
product            3267
cart               2452
payment_choice      854
payment_done        571
payment_card        533
Name: event_name, dtype: int64

In [31]:
len(source_df[(source_df['user_type'] == 'cash') |
            (source_df['event_name'].str.contains('delivery'))])

6834

In [28]:
source_df['event_name'].value_counts()

catalog             14518
main                 5635
lost                 3098
cart                 2842
product2             2172
delivery_choice      1686
product1             1515
payment_choice       1107
delivery_courier      834
payment_done          706
payment_card          565
delivery_pickup       506
payment_cash          197
Name: event_name, dtype: int64

In [36]:
len(df_res), \
len(source_df[(source_df['event_name'] != 'lost')|((source_df['event_name'] == 'lost')
    & (source_df['user_id'] == 122915)
                                            )
    ]
)

(32284, 32284)

# Результаты тестирования

<b>Grouping

1) Зачем мы передаем параметр "schema" в функцию фильтра?
2) [TODO] Если мы можем задать кастомный тип события - то как оно отсортируется? - надо бы менять список сортировки здесь же?


<b>Delete

1) [TO_Discuss] Надо подумать про переименование в Filter или что-то в этом роде, тогда не нужно отдельного
датапроцессора для фильтрации

<b>Общее

1) [TO_Discuss] Хорошо бы иметь метод, который выводит граф от текущей вершины (последовательность нод)

In [55]:
source_df = pd.DataFrame([
            [1, 'event1', '2022-01-01 00:00:00'],
            [1, 'event2', '2022-01-01 00:00:01'],
            [1, 'event3', '2022-01-01 00:00:02'],
            [2, 'event4', '2022-01-02 00:00:00'],
        ], columns=['user_id', 'event', 'timestamp']
        )

correct_result_columns = ['user_id', 'event_name', 'event_type', 'event_timestamp']
correct_result = pd.DataFrame([
            [1, 'event1', 'raw', '2022-01-01 00:00:00'],
            [1, 'event3', 'raw', '2022-01-01 00:00:02'],
        ], columns=correct_result_columns
        )

In [56]:
stream = Eventstream(
            raw_data_schema=RawDataSchema(
                event_name='event', event_timestamp='timestamp', user_id='user_id'
            ),
            raw_data=source_df,
            schema=EventstreamSchema(),
        )
def filter_(df, schema):
    return ((df[schema.user_id].isin([2])) |
            (df.event_name.str.contains('event2')))
graph = PGraph(source_stream=stream)

delete_conditional = EventsNode(DeleteEvents(
                    params=DeleteEventsParams(filter=filter_)))

graph.add_node(node=delete_conditional, parents=[graph.root])
res = graph.combine(node=delete_conditional).to_dataframe()[correct_result_columns].reset_index(drop=True)

res.compare(correct_result).shape == (0, 0)

True