# SQL 2 - задача на оконные функции.

Составить sql запрос по задаче:  
Сделать выгрузку по терминалам, в которых в течение короткого периода времени (временного окна) провели оплаты несколько клиентов

Входные данные:  
- Период выгрузки с сентября 2019 по конец января 2020  
- Временное окно операций - 2 часа  
- Кол-во уникальных клиентов, которые совершили операции в «окне» - 3 и более клиентов  

Какие поля содержит рабочая таблица (operations):
- id - индекс (номер операции), тип int  
- amount - сумма операции, тип float  
- user_id - номер клиента, тип int  
- term_id - номер банкомата, тип int  
- created_at - дата операции, тип time stamp  

На выходе должна получиться таблица со списком терминалов и операций, удовлетворяющих условию.


#### Решение 
Решение задачи реализовано непосредственно в jupyter на базе PostgreSQL с помощью библеотеки sqlalchemy.

In [1]:
# Импортируем библеотеки. 

import pandas as pd
import numpy as np

В начале создадим датасет аналогично таблице **operations** для проверки создаваемого кода. 

In [2]:
# Создаём вручную небольшой датасет из 10 строк, в котором:
    # id - индекс (номер операции), тип int - ОТ "9990" ДО "9999"
    # amount - сумма операции, тип float - СЛУЧАЙНЫЕ СУММЫ
    # user_id - номер клиента, тип int - ОТ 1 ДО 10
    # term_id - номер банкомата, тип int - ДВА ТЕРМИНАЛА "70" И "75"
    # created_at - дата операции, тип time stamp - СЛУЧАЙНЫЕ ДАТЫ


df = pd.DataFrame({
    'id':[
        9990,9991,9992,9993,9994,
        9995,9996,9997,9998,9999], 
    'amount':[
        100.2,109.5,130.5,150.7,170.8,
        210.8,222.5,240.4,280.3,290.9], 
    'user_id':[
        1,2,3,4,5,
        6,7,8,9,10], 
    'term_id':[
        70,70,70,70,70,
        75,75,75,75,75],
    'created_at':pd.to_datetime([
        '2020-03-01 00:01:00','2020-03-01 00:23:00','2020-03-01 01:36:00','2020-03-01 02:45:00','2020-03-01 02:50:00',
        '2020-01-01 00:01:00','2020-01-01 00:36:00','2020-01-01 01:47:00','2020-01-01 02:00:00','2020-01-01 05:27:00'],
        format='%Y-%m-%d %H:%M:%S')
})

# Посмотрим на датасет.
df

Unnamed: 0,id,amount,user_id,term_id,created_at
0,9990,100.2,1,70,2020-03-01 00:01:00
1,9991,109.5,2,70,2020-03-01 00:23:00
2,9992,130.5,3,70,2020-03-01 01:36:00
3,9993,150.7,4,70,2020-03-01 02:45:00
4,9994,170.8,5,70,2020-03-01 02:50:00
5,9995,210.8,6,75,2020-01-01 00:01:00
6,9996,222.5,7,75,2020-01-01 00:36:00
7,9997,240.4,8,75,2020-01-01 01:47:00
8,9998,280.3,9,75,2020-01-01 02:00:00
9,9999,290.9,10,75,2020-01-01 05:27:00


In [3]:
# Проверим правильность форматов данных в датасете.

df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 10 entries, 0 to 9
Data columns (total 5 columns):
 #   Column      Non-Null Count  Dtype         
---  ------      --------------  -----         
 0   id          10 non-null     int64         
 1   amount      10 non-null     float64       
 2   user_id     10 non-null     int64         
 3   term_id     10 non-null     int64         
 4   created_at  10 non-null     datetime64[ns]
dtypes: datetime64[ns](1), float64(1), int64(3)
memory usage: 528.0 bytes


In [4]:
# Все отлично. Заливаем таблицу в sql.

from sqlalchemy import create_engine
con = create_engine('postgresql+psycopg2://fnvidjvc:bvCt7HVPVhjmphQGbtgw0W_RcyyQoxih@hattie.db.elephantsql.com/fnvidjvc')

df.to_sql('operations',con,index=False,if_exists='replace',method='multi')

10

**Переходим к SQL.**

In [5]:
# Посмотрим на получившуюся таблицу через sql.

sql = '''
SELECT 
op.*
FROM operations AS op
'''

In [6]:
pd.read_sql(sql, con)

Unnamed: 0,id,amount,user_id,term_id,created_at
0,9990,100.2,1,70,2020-03-01 00:01:00
1,9991,109.5,2,70,2020-03-01 00:23:00
2,9992,130.5,3,70,2020-03-01 01:36:00
3,9993,150.7,4,70,2020-03-01 02:45:00
4,9994,170.8,5,70,2020-03-01 02:50:00
5,9995,210.8,6,75,2020-01-01 00:01:00
6,9996,222.5,7,75,2020-01-01 00:36:00
7,9997,240.4,8,75,2020-01-01 01:47:00
8,9998,280.3,9,75,2020-01-01 02:00:00
9,9999,290.9,10,75,2020-01-01 05:27:00


In [7]:
# Проверим форматы данных.

sql = '''
SELECT
column_name
, data_type
FROM information_schema.columns
WHERE table_name = 'operations'
'''

In [8]:
pd.read_sql(sql, con)

Unnamed: 0,column_name,data_type
0,id,bigint
1,amount,double precision
2,user_id,bigint
3,term_id,bigint
4,created_at,timestamp without time zone


В решении задачи необходимо задействовать оконные функции и скользящие операторы. 

In [9]:
# ШАГ 1: вычисляем разницу между соседними датами.
# Используем этот столбец для визуального удобства проверки правильности вычисляемого запроса.

# Создаём новый столбец "time_diff", который является разницей между текущей и предыдущей датой.
# Используем оконную функцию для расчета по каждому терминалу.



sql = '''
SELECT
op.*,
op.created_at - lag(op.created_at)
    OVER (partition by op.term_id
    ORDER BY  op.created_at, op.term_id) AS time_diff
FROM operations AS op
'''

In [10]:
pd.read_sql(sql, con)

Unnamed: 0,id,amount,user_id,term_id,created_at,time_diff
0,9990,100.2,1,70,2020-03-01 00:01:00,NaT
1,9991,109.5,2,70,2020-03-01 00:23:00,0 days 00:22:00
2,9992,130.5,3,70,2020-03-01 01:36:00,0 days 01:13:00
3,9993,150.7,4,70,2020-03-01 02:45:00,0 days 01:09:00
4,9994,170.8,5,70,2020-03-01 02:50:00,0 days 00:05:00
5,9995,210.8,6,75,2020-01-01 00:01:00,NaT
6,9996,222.5,7,75,2020-01-01 00:36:00,0 days 00:35:00
7,9997,240.4,8,75,2020-01-01 01:47:00,0 days 01:11:00
8,9998,280.3,9,75,2020-01-01 02:00:00,0 days 00:13:00
9,9999,290.9,10,75,2020-01-01 05:27:00,0 days 03:27:00


In [11]:
# ШАГ 2: считаем клиентов в временном окне.

# Создаём вспомогательные столбцы подсчёта количества клиентов в временном окне (2 часа) для каждого терминала:
    # Столбец "moving_cnt_userid" - скользящий подсчёт количество клиентов в окне от текущей даты - 2 часа;
    # Столбец "moving_cnt_userid_rev" - скользящий подсчёт количество клиентов в окне от текущей даты + 2 часа.
# Используем оконную функцию для расчета по каждому терминалу.



sql = '''
SELECT
op.*,
(op.created_at - lag(op.created_at)
    OVER (partition by op.term_id
    ORDER BY  op.created_at, op.term_id)) AS time_diff,
count(op.user_id)
    OVER (partition by op.term_id
    ORDER BY cast(op.created_at as timestamp)
    range BETWEEN interval '2' hour preceding AND current row) AS moving_cnt_userid,
count(op.user_id)
    OVER (partition by op.term_id
    ORDER BY cast(op.created_at as timestamp)
    range BETWEEN current row AND interval '2' hour following) AS moving_cnt_userid_rev
FROM operations AS op
'''

In [12]:
pd.read_sql(sql, con)

Unnamed: 0,id,amount,user_id,term_id,created_at,time_diff,moving_cnt_userid,moving_cnt_userid_rev
0,9990,100.2,1,70,2020-03-01 00:01:00,NaT,1,3
1,9991,109.5,2,70,2020-03-01 00:23:00,0 days 00:22:00,2,2
2,9992,130.5,3,70,2020-03-01 01:36:00,0 days 01:13:00,3,3
3,9993,150.7,4,70,2020-03-01 02:45:00,0 days 01:09:00,2,2
4,9994,170.8,5,70,2020-03-01 02:50:00,0 days 00:05:00,3,1
5,9995,210.8,6,75,2020-01-01 00:01:00,NaT,1,4
6,9996,222.5,7,75,2020-01-01 00:36:00,0 days 00:35:00,2,3
7,9997,240.4,8,75,2020-01-01 01:47:00,0 days 01:11:00,3,2
8,9998,280.3,9,75,2020-01-01 02:00:00,0 days 00:13:00,4,1
9,9999,290.9,10,75,2020-01-01 05:27:00,0 days 03:27:00,1,1


In [13]:
# ШАГ 3: отмечаем максимальное количество клиентов в окне для каждой операции.

# Убираем код с предыдущего шага в подзапрос через with.
# Создаём вспомогательные столбцы с указанием максимального количества клиентов в окне для каждой операции:
    # Столбец "max_cnt" - скользящий подсчёт максимального количества клиентов в окне от текущей даты - 2 часа;
    # Столбец "max_cnt_rev" - скользящий подсчёт максимального количества клиентов в окне от текущей даты + 2 часа.
# Это необходимо, чтобы после правильно выделить операции, подходящие под условия задачи.
# Используем оконную функцию для расчета по каждому терминалу.



sql = '''
with operations_new as
    (SELECT
    op.*,
    (op.created_at - lag(op.created_at)
        OVER (partition by op.term_id
        ORDER BY  op.created_at, op.term_id)) AS time_diff,
    count(op.user_id)
        OVER (partition by op.term_id
        ORDER BY cast(op.created_at as timestamp)
        range BETWEEN interval '2' hour preceding AND current row) AS moving_cnt_userid,
    count(op.user_id)
        OVER (partition by op.term_id
        ORDER BY cast(op.created_at as timestamp)
        range BETWEEN current row AND interval '2' hour following) AS moving_cnt_userid_rev
    FROM operations AS op)



SELECT
opn.*,
max(opn.moving_cnt_userid)
    OVER (partition by opn.term_id
    ORDER BY cast(opn.created_at as timestamp)
    range BETWEEN interval '2' hour preceding AND current row) AS max_cnt,
max(opn.moving_cnt_userid_rev)
    OVER (partition by opn.term_id
    ORDER BY cast(opn.created_at as timestamp)
    range BETWEEN current row AND interval '2' hour following) AS max_cnt_rev
FROM operations_new AS opn
'''

In [14]:
pd.read_sql(sql, con)

Unnamed: 0,id,amount,user_id,term_id,created_at,time_diff,moving_cnt_userid,moving_cnt_userid_rev,max_cnt,max_cnt_rev
0,9990,100.2,1,70,2020-03-01 00:01:00,NaT,1,3,1,3
1,9991,109.5,2,70,2020-03-01 00:23:00,0 days 00:22:00,2,2,2,3
2,9992,130.5,3,70,2020-03-01 01:36:00,0 days 01:13:00,3,3,3,3
3,9993,150.7,4,70,2020-03-01 02:45:00,0 days 01:09:00,2,2,3,2
4,9994,170.8,5,70,2020-03-01 02:50:00,0 days 00:05:00,3,1,3,1
5,9995,210.8,6,75,2020-01-01 00:01:00,NaT,1,4,1,4
6,9996,222.5,7,75,2020-01-01 00:36:00,0 days 00:35:00,2,3,2,3
7,9997,240.4,8,75,2020-01-01 01:47:00,0 days 01:11:00,3,2,3,2
8,9998,280.3,9,75,2020-01-01 02:00:00,0 days 00:13:00,4,1,4,1
9,9999,290.9,10,75,2020-01-01 05:27:00,0 days 03:27:00,1,1,1,1


In [15]:
# ШАГ 4: маркеруем операции с максимальным количеством клиентов в окне не менее 3.

# Убираем код с предыдущего шага в подзапрос через with.
# Маркеруем в отдельном столбце "flag" операции, в которых максимальное количество клиентов в окне не менее 3.
    # при маркеровании смотрим одновременно на столбецы "max_cnt" и "max_cnt_rev".



sql = '''
with operations_new as
    (SELECT
    op.*,
    (op.created_at - lag(op.created_at)
        OVER (partition by op.term_id
        ORDER BY  op.created_at, op.term_id)) AS time_diff,
    count(op.user_id)
        OVER (partition by op.term_id
        ORDER BY cast(op.created_at as timestamp)
        range BETWEEN interval '2' hour preceding AND current row) AS moving_cnt_userid,
    count(op.user_id)
        OVER (partition by op.term_id
        ORDER BY cast(op.created_at as timestamp)
        range BETWEEN current row AND interval '2' hour following) AS moving_cnt_userid_rev
    FROM operations AS op),

operations_new_2 as 
    (SELECT
    opn.*,
    max(opn.moving_cnt_userid)
        OVER (partition by opn.term_id
        ORDER BY cast(opn.created_at as timestamp)
        range BETWEEN interval '2' hour preceding AND current row) AS max_cnt,
    max(opn.moving_cnt_userid_rev)
        OVER (partition by opn.term_id
        ORDER BY cast(opn.created_at as timestamp)
        range BETWEEN current row AND interval '2' hour following) AS max_cnt_rev
    FROM operations_new AS opn)



SELECT
opn2.*,
CASE WHEN opn2.max_cnt >= 3 OR opn2.max_cnt_rev >= 3
    THEN 1
    ELSE 0 end as flag
FROM operations_new_2 as opn2
'''

In [16]:
pd.read_sql(sql, con)

Unnamed: 0,id,amount,user_id,term_id,created_at,time_diff,moving_cnt_userid,moving_cnt_userid_rev,max_cnt,max_cnt_rev,flag
0,9990,100.2,1,70,2020-03-01 00:01:00,NaT,1,3,1,3,1
1,9991,109.5,2,70,2020-03-01 00:23:00,0 days 00:22:00,2,2,2,3,1
2,9992,130.5,3,70,2020-03-01 01:36:00,0 days 01:13:00,3,3,3,3,1
3,9993,150.7,4,70,2020-03-01 02:45:00,0 days 01:09:00,2,2,3,2,1
4,9994,170.8,5,70,2020-03-01 02:50:00,0 days 00:05:00,3,1,3,1,1
5,9995,210.8,6,75,2020-01-01 00:01:00,NaT,1,4,1,4,1
6,9996,222.5,7,75,2020-01-01 00:36:00,0 days 00:35:00,2,3,2,3,1
7,9997,240.4,8,75,2020-01-01 01:47:00,0 days 01:11:00,3,2,3,2,1
8,9998,280.3,9,75,2020-01-01 02:00:00,0 days 00:13:00,4,1,4,1,1
9,9999,290.9,10,75,2020-01-01 05:27:00,0 days 03:27:00,1,1,1,1,0


In [17]:
# ШАГ 5: оставляем в качестве ответа только список терминалов и операций, удовлетворяющих требованиям задачи.

# Убираем код с предыдущего шага в подзапрос через with.
# В выводе оставляем только нужные данные.
# Добавляем ещё условие периода вывода данных с сентября 2019 по конец января 2020.


sql = '''
with operations_new as
    (SELECT
    op.*,
    (op.created_at - lag(op.created_at)
        OVER (partition by op.term_id
        ORDER BY  op.created_at, op.term_id)) AS time_diff,
    count(op.user_id)
        OVER (partition by op.term_id
        ORDER BY cast(op.created_at as timestamp)
        range BETWEEN interval '2' hour preceding AND current row) AS moving_cnt_userid,
    count(op.user_id)
        OVER (partition by op.term_id
        ORDER BY cast(op.created_at as timestamp)
        range BETWEEN current row AND interval '2' hour following) AS moving_cnt_userid_rev
    FROM operations AS op),

operations_new_2 as 
    (SELECT
    opn.*,
    max(opn.moving_cnt_userid)
        OVER (partition by opn.term_id
        ORDER BY cast(opn.created_at as timestamp)
        range BETWEEN interval '2' hour preceding AND current row) AS max_cnt,
    max(opn.moving_cnt_userid_rev)
        OVER (partition by opn.term_id
        ORDER BY cast(opn.created_at as timestamp)
        range BETWEEN current row AND interval '2' hour following) AS max_cnt_rev
    FROM operations_new AS opn),

operations_new_3 as 
    (SELECT
    opn2.*,
    CASE WHEN opn2.max_cnt >= 3 OR opn2.max_cnt_rev >= 3
        THEN 1
        ELSE 0 end as flag
    FROM operations_new_2 as opn2)



SELECT
opn3.id,
opn3.term_id
FROM operations_new_3 as opn3
WHERE flag = 1 AND 
    opn3.created_at >= '2019-09-01 00:00:00' AND
    opn3.created_at <= '2020-12-31 23:59:59'   
'''

In [18]:
pd.read_sql(sql, con)

Unnamed: 0,id,term_id
0,9990,70
1,9991,70
2,9992,70
3,9993,70
4,9994,70
5,9995,75
6,9996,75
7,9997,75
8,9998,75


**Тестируем итоговый код на данных.**

In [19]:
# Импортируем библеотеки. 

import pandas as pd
import numpy as np

In [20]:
# Загружаем подготовленный датасет в 10 тыс. строк, в котором:
    # id - уникальный номер от 10000 до 19999
    # amount - случайное число в диапозоне от 100 до 10000
    # user_id - уникальный номер от 1 до 100
    # term_id - случайное число в диапозоне от 70 до 79
    # created_at - случайная дата от '2019-01-01 00:00:00' до '2020-12-31 23:59:59'

df_test = pd.read_csv('https://raw.githubusercontent.com/zhukov-analyst/portfolio/main/sql_data/operations_test.csv', sep=';')

In [21]:
# Конвертируем правильный формат даты.

df_test['created_at'] = pd.to_datetime(df_test['created_at'],format='%Y-%m-%d %H:%M:%S')

# Посмотрим на данные.

df_test

Unnamed: 0,id,amount,user_id,term_id,created_at
0,10000,2643,90,70,2020-05-04 19:37:08
1,10001,9468,2,75,2020-07-03 16:38:07
2,10002,9996,60,70,2019-02-04 18:55:19
3,10003,9005,59,75,2020-09-18 14:58:19
4,10004,5032,57,73,2019-09-17 14:00:00
...,...,...,...,...,...
9995,19995,4338,34,75,2020-07-17 10:04:40
9996,19996,6243,60,70,2020-06-04 20:49:41
9997,19997,2175,47,73,2019-03-02 12:41:11
9998,19998,9412,56,77,2019-01-04 13:57:52


In [22]:
# Проверим правильность форматов данных в датасете.

df_test.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 10000 entries, 0 to 9999
Data columns (total 5 columns):
 #   Column      Non-Null Count  Dtype         
---  ------      --------------  -----         
 0   id          10000 non-null  int64         
 1   amount      10000 non-null  int64         
 2   user_id     10000 non-null  int64         
 3   term_id     10000 non-null  int64         
 4   created_at  10000 non-null  datetime64[ns]
dtypes: datetime64[ns](1), int64(4)
memory usage: 390.8 KB


In [23]:
# Все отлично. Заливаем таблицу в sql.

from sqlalchemy import create_engine
con_test = create_engine('postgresql+psycopg2://fnvidjvc:bvCt7HVPVhjmphQGbtgw0W_RcyyQoxih@hattie.db.elephantsql.com/fnvidjvc')

df_test.to_sql('operations_test',con_test,index=False,if_exists='replace',method='multi')

10000

In [24]:
# Посмотрим на получившуюся таблицу через sql.

sql_test = '''
SELECT 
op.*
FROM operations_test AS op
'''

In [25]:
pd.read_sql(sql_test, con_test)

Unnamed: 0,id,amount,user_id,term_id,created_at
0,10000,2643,90,70,2020-05-04 19:37:08
1,10001,9468,2,75,2020-07-03 16:38:07
2,10002,9996,60,70,2019-02-04 18:55:19
3,10003,9005,59,75,2020-09-18 14:58:19
4,10004,5032,57,73,2019-09-17 14:00:00
...,...,...,...,...,...
9995,19995,4338,34,75,2020-07-17 10:04:40
9996,19996,6243,60,70,2020-06-04 20:49:41
9997,19997,2175,47,73,2019-03-02 12:41:11
9998,19998,9412,56,77,2019-01-04 13:57:52


In [26]:
# Выполним итоговый запрос.



sql_test = '''
with operations_new as
    (SELECT
    op.*,
    (op.created_at - lag(op.created_at)
        OVER (partition by op.term_id
        ORDER BY  op.created_at, op.term_id)) AS time_diff,
    count(op.user_id)
        OVER (partition by op.term_id
        ORDER BY cast(op.created_at as timestamp)
        range BETWEEN interval '2' hour preceding AND current row) AS moving_cnt_userid,
    count(op.user_id)
        OVER (partition by op.term_id
        ORDER BY cast(op.created_at as timestamp)
        range BETWEEN current row AND interval '2' hour following) AS moving_cnt_userid_rev
    FROM operations_test AS op),

operations_new_2 as 
    (SELECT
    opn.*,
    max(opn.moving_cnt_userid)
        OVER (partition by opn.term_id
        ORDER BY cast(opn.created_at as timestamp)
        range BETWEEN interval '2' hour preceding AND current row) AS max_cnt,
    max(opn.moving_cnt_userid_rev)
        OVER (partition by opn.term_id
        ORDER BY cast(opn.created_at as timestamp)
        range BETWEEN current row AND interval '2' hour following) AS max_cnt_rev
    FROM operations_new AS opn),

operations_new_3 as 
    (SELECT
    opn2.*,
    CASE WHEN opn2.max_cnt >= 3 OR opn2.max_cnt_rev >= 3
        THEN 1
        ELSE 0 end as flag
    FROM operations_new_2 as opn2)



SELECT
opn3.id,
opn3.term_id
FROM operations_new_3 as opn3
WHERE flag = 1 AND 
    opn3.created_at >= '2019-09-01 00:00:00' AND
    opn3.created_at <= '2020-12-31 23:59:59'   
'''

In [27]:
pd.read_sql(sql_test, con_test)

Unnamed: 0,id,term_id
0,11956,70
1,17969,70
2,18028,70
3,13571,70
4,10412,70
...,...,...
260,12345,79
261,17511,79
262,12882,79
263,14534,79


**Запрос работает.**

Задача выполнена частично - запрос учитывает всех клиентов, совершающих операцию "в окне".  
Проблему учёта только уникальных клиентов можно было бы реализовать через функция DISTINCT, но она не реализована в оконных функциях.
Можно попробовать решить через функцию ранжирования user_id внутри "окон".