In [14]:
import seaborn as sns

In [155]:
import pandas as pd
import numpy as np

import pandahouse as ph

from airflow.decorators import dag, task
from airflow.operators.python import get_current_context

from datetime import timedelta, datetime


# Подключение к БД
connection = {
    'host': 'https://clickhouse.lab.karpov.courses',
    'password': 
    'user': 'student',
    'database': 'simulator'
}
db='simulator_20240320'
# Для создания и пополнения таблицы
connection_test = {
    'host': 'https://clickhouse.lab.karpov.courses',
    'database':'test',
    'user':'student-rw', 
    'password':
}
default_args = {
    'owner': 'v.grabchuk',
    'depends_on_past': True,
    'retries': 2,
    'retry_delay': timedelta(minutes=1),
    'start_date': datetime(2024, 2, 3),
}

# # Интервал запуска DAG
# schedule_interval = '1 0 * * *'

# 1. Статистика юзеров по ленте и мессенджеру

In [165]:
today = pd.Timestamp.today().date().strftime('%Y-%m-%d')
today = f"'{today}'"
today

"'2024-04-23'"

In [168]:
def users_feed_stat():
    # Статистика юзеров в feed_actions
    today = pd.Timestamp.today().date().strftime('%Y-%m-%d')
    today = f"'{today}'"
#     context = get_current_context()
#     today = context['ds']
#     today = f"'{today}'"
    q = f"""
    SELECT
    user_id,
    countIf(action='view') AS views,
    countIf(action='like') AS likes
    FROM {db}.feed_actions
    WHERE toDate(time)+1 == {today}
    GROUP BY user_id
    """
    df = ph.read_clickhouse(q, connection=connection)
    return df

def users_message_stat():
    # Статистика юзеров в message_actions
    today = pd.Timestamp.today().date().strftime('%Y-%m-%d')
    today = f"'{today}'"
#     context = get_current_context()
#     today = context['ds']
#     today = f"'{today}'"
    q = f"""
    WITH
        sent_t AS (
            SELECT
                user_id,
                COUNT(*) AS messages_sent,
                COUNT(DISTINCT receiver_id) AS users_sent
            FROM {db}.message_actions
            WHERE toDate(time)+1 == {today}
            GROUP BY user_id
        ),
        received_t_1 AS (
            SELECT
                receiver_id,
                COUNT(*) AS messages_received,
                COUNT(DISTINCT user_id) AS users_received
            FROM {db}.message_actions
            WHERE toDate(time)+1 == {today}
            GROUP BY receiver_id
        ),
        received_t AS (
            SELECT
                receiver_id AS user_id,
                messages_received,
                users_received
            FROM received_t_1
        )

    SELECT *
    FROM received_t
        FULL JOIN sent_t
            USING user_id
    """
    df = ph.read_clickhouse(q, connection=connection)
    return df

In [169]:
# Статистика
df_1 = users_feed_stat()
df_2 = users_message_stat()

In [170]:
df_1

Unnamed: 0,user_id,views,likes
0,38250,52,10
1,121096,56,6
2,159611,33,7
3,151408,65,9
4,120940,13,4
...,...,...,...
18374,116629,8,2
18375,151611,13,2
18376,124488,30,11
18377,3533,31,6


In [171]:
df_2

Unnamed: 0,user_id,messages_received,users_received,messages_sent,users_sent
0,8973,13,13,10,10
1,4660,5,5,6,6
2,18651,15,15,4,4
3,122087,1,1,6,6
4,128506,6,6,5,5
...,...,...,...,...,...
2069,129998,0,0,6,6
2070,18053,0,0,2,2
2071,110536,0,0,11,9
2072,129832,0,0,7,4


# 2. Объединение таблиц

In [172]:
def full_merge(df_1, df_2, on):
    # Объединение таблиц (FULL JOIN)
    df = df_1.merge(df_2, how='outer', on=on)
    df = df.fillna(0)
    return df

def cols_astype(df, cols, new_type):
    # Преобразование колонок к новому типу данных
    df[cols] = df[cols].astype(new_type)
    return df

In [173]:
# join
users_activity_df = full_merge(df_1, df_2, on='user_id')
# Приведение типов
columns = ['views', 'likes', 'messages_received', 'users_received',
   'messages_sent', 'users_sent']
users_activity_df = cols_astype(users_activity_df, columns, 'int')

In [174]:
users_activity_df

Unnamed: 0,user_id,views,likes,messages_received,users_received,messages_sent,users_sent
0,38250,52,10,0,0,0,0
1,121096,56,6,0,0,0,0
2,159611,33,7,0,0,0,0
3,151408,65,9,0,0,0,0
4,120940,13,4,0,0,0,0
...,...,...,...,...,...,...,...
20208,16762,0,0,0,0,2,2
20209,21334,0,0,0,0,2,2
20210,129998,0,0,0,0,6,6
20211,18053,0,0,0,0,2,2


# 3. Считаем все метрики в разрезе по полу, возрасту и ос (по таску на срез)

In [180]:
def get_info(agg_col, cols):
    # Инфа по agg_col, которая содержится в cols
    any_cols = ', '.join(f'any({col}) AS {col}' for col in cols)
    cols = ', '.join(cols)
    today = pd.Timestamp.today().date().strftime('%Y-%m-%d')
    today = f"'{today}'"
#     context = get_current_context()
#     today = context['ds']
#     today = f"'{today}'"
    q = f"""
    WITH
        feed_info_t AS (
            SELECT 
                {agg_col},
                {any_cols}
            FROM {db}.feed_actions
            WHERE toDate(time)+1 == {today}
            GROUP BY {agg_col}
        ),
        message_info_t AS (
            SELECT 
                {agg_col},
                {any_cols}
            FROM {db}.message_actions
            WHERE toDate(time)+1 == {today}
            GROUP BY {agg_col}
        ),
        union_t AS
        (
            SELECT *
            FROM feed_info_t
            UNION ALL
            SELECT *
            FROM message_info_t
        )


    SELECT DISTINCT {agg_col}, {cols}
    FROM union_t
    """
    df = ph.read_clickhouse(q, connection=connection)
    return df

def left_merge(df_1, df_2, on):
    df = df_1.merge(df_2, how='left', on=on)
    return df

def get_slice(df, agg_col, metric_cols, dropna=False):
    # Срез куба
    df = (
        df.groupby(agg_col, as_index=False, dropna=dropna)[metric_cols].sum()
        .rename(columns={agg_col: 'dimension_value'})
    )
    today = pd.Timestamp.today().date().strftime('%Y-%m-%d')
    today = f"'{today}'"
#     context = get_current_context()
#     today = pd.Series(context['ds'])
    info_df = pd.DataFrame({
        'event_date': today, 
        'dimension': pd.Series(agg_col)
    })
    df = info_df.merge(df, how='cross')
    return df

def concat(*args):
    df = pd.concat(args, ignore_index=True)
    return df

In [181]:
# Доп инфа
users_info_df = get_info(agg_col='user_id', cols=['gender', 'age', 'os'])
users_info_df = cols_astype(users_info_df, ['gender', 'age'], 'str')
# Кубик
giga_cube_df = left_merge(users_activity_df, users_info_df, on='user_id')
# Срезы
metric_cols = [
    'views', 'likes', 'messages_received', 'messages_sent', 'users_received', 'users_sent'
]
gender_slice_df = get_slice(giga_cube_df, agg_col='gender', metric_cols=metric_cols)
age_slice_df = get_slice(giga_cube_df, agg_col='age', metric_cols=metric_cols)
os_slice_df = get_slice(giga_cube_df, agg_col='os', metric_cols=metric_cols)
df = concat(gender_slice_df, age_slice_df, os_slice_df)

In [182]:
df

Unnamed: 0,event_date,dimension,dimension_value,views,likes,messages_received,messages_sent,users_received,users_sent
0,'2024-04-23',gender,0,282001,58304,4719,5568,4067,4387
1,'2024-04-23',gender,1,341235,70450,8105,7324,5899,5644
2,'2024-04-23',gender,,0,0,68,0,65,0
3,'2024-04-23',age,14,9717,2059,66,196,65,68
4,'2024-04-23',age,15,20560,4324,280,383,221,233
...,...,...,...,...,...,...,...,...,...
68,'2024-04-23',age,95,70,10,0,0,0,0
69,'2024-04-23',age,,0,0,68,0,65,0
70,'2024-04-23',os,Android,407369,84027,7025,8416,6200,6533
71,'2024-04-23',os,iOS,215867,44727,5799,4476,3766,3498


# 4. Записываем всё в одну таблицу

In [183]:
def create_table():
    q = '''
    CREATE TABLE IF NOT EXISTS test.gvs_etl (
        event_date Date,
        dimension String,
        dimension_value String,
        views UInt64,
        likes UInt64,
        messages_received UInt64,
        messages_sent UInt64,
        users_received UInt64,
        users_sent UInt64
    )
    ENGINE = MergeTree()
    ORDER BY (event_date, dimension, dimension_value)
    '''
    ph.execute(q, connection=connection_test)

def fill_table(df):
    # Присоединение df к таблице в clickhouse
    ph.to_clickhouse(
        df=df, table="gvs_etl", 
        index=False, connection=connection_test
    )

In [None]:
# # Пополнение таблицы
# create_table()
# fill_table(df)

In [148]:
# # Удаление таблицы
# query_test = '''
# DROP TABLE IF EXISTS test.gvs_etl
# '''
# ph.execute(query_test, connection=connection_test)

b''

In [184]:
# Посмотреть таблицу
q = f"""
SELECT *
FROM test.gvs_etl
ORDER BY event_date
"""
df_res = ph.read_clickhouse(q, connection=connection_test)
df_res

Unnamed: 0,event_date,dimension,dimension_value,views,likes,messages_received,messages_sent,users_received,users_sent
0,2024-02-04,age,14,125,31,48,91,32,20
1,2024-02-04,age,15,223,40,48,89,30,37
2,2024-02-04,age,16,246,58,55,141,38,42
3,2024-02-04,age,17,349,70,73,167,61,62
4,2024-02-04,age,18,351,85,173,160,93,73
...,...,...,...,...,...,...,...,...,...
1964,2024-03-04,gender,0,161393,34253,5748,5835,3953,3939
1965,2024-03-04,gender,1,204845,43508,7141,7063,4590,4613
1966,2024-03-04,os,,0,0,9,0,9,0
1967,2024-03-04,os,Android,235567,50310,7881,8213,5503,5493


In [154]:
df_res.query('dimension == "gender"').dimension_value.value_counts()

dimension_value
0    7
1    7
     2
Name: count, dtype: int64

In [144]:
df_res.query('dimension == "os"').dimension_value.value_counts()

dimension_value
Android    60
iOS        60
           41
Name: count, dtype: int64

In [261]:
q = f"""
SELECT
    toDate(time) AS event_date,
    os AS dimension_value,
    countIf(user_id, action=='view') AS views,
    countIf(user_id, action=='like') AS likes 
FROM {db}.feed_actions
GROUP BY event_date, os
"""
df_1 = ph.read_clickhouse(q, connection=connection)
df_1

Unnamed: 0,event_date,dimension_value,views,likes
0,2024-03-01,Android,270583,57649
1,2024-03-30,iOS,254271,54138
2,2024-03-29,iOS,231701,51252
3,2024-03-04,Android,292427,62752
4,2024-03-06,Android,372969,77390
...,...,...,...,...
157,2024-02-26,Android,228313,47381
158,2024-03-24,iOS,229679,47598
159,2024-02-28,Android,306814,67266
160,2024-03-02,Android,244932,50850


In [185]:
# Посмотреть таблицу
q = f"""
SELECT *
FROM test.gvs_etl
ORDER BY event_date
"""
df_2 = ph.read_clickhouse(q, connection=connection_test)
df_2

Unnamed: 0,event_date,dimension,dimension_value,views,likes,messages_received,messages_sent,users_received,users_sent
0,2024-02-04,age,14,125,31,48,91,32,20
1,2024-02-04,age,15,223,40,48,89,30,37
2,2024-02-04,age,16,246,58,55,141,38,42
3,2024-02-04,age,17,349,70,73,167,61,62
4,2024-02-04,age,18,351,85,173,160,93,73
...,...,...,...,...,...,...,...,...,...
1964,2024-03-04,gender,0,161393,34253,5748,5835,3953,3939
1965,2024-03-04,gender,1,204845,43508,7141,7063,4590,4613
1966,2024-03-04,os,,0,0,9,0,9,0
1967,2024-03-04,os,Android,235567,50310,7881,8213,5503,5493
