In [1]:
from datetime import datetime, timedelta

import pandahouse
import pandas as pd

In [2]:
def get_current_context():
    return {'ds': '2022-04-20'}

In [10]:
def read_db(query: str) -> pd.DataFrame:
    connection = {
        'host': 'https://clickhouse...',
        'password': '',
        'user': '',
        'database': 'simulator_20220420'
    }
    
    data = pandahouse.read_clickhouse(query, connection=connection)
    
    return data

In [11]:
def extract_feed() -> pd.DataFrame:
    context = get_current_context()

    query = f"""
    SELECT 
      user_id, 
      sum(action = 'view') as views, 
      sum(action = 'like') as likes
    FROM simulator_20220420.feed_actions 
    WHERE toDate(time) = toDate('{context["ds"]}') - 1
    GROUP BY user_id"""

    feed_data = read_db(query=query)
    return feed_data

In [12]:
def extract_messages() -> pd.DataFrame:
    context = get_current_context()

    query = f"""
    WITH sent as (
      SELECT 
        user_id,
        count(user_id) messages_sent, -- отсылает сообщений
        count(distinct reciever_id) users_sent -- скольким людям он пишет
      FROM simulator_20220420.message_actions snd
      WHERE toDate(time) = toDate('{context["ds"]}') - 1
      GROUP BY user_id
    )
    , received as (
      SELECT
        reciever_id,
        count(user_id) messages_received, -- сколько он получает
        count(distinct user_id) users_received -- сколько людей пишут ему
      FROM simulator_20220420.message_actions snd
      WHERE toDate(time) = toDate('{context["ds"]}') - 1
      GROUP BY reciever_id
    )

    SELECT 
      if(user_id = 0, reciever_id, user_id) user_id,
      messages_sent,
      users_sent,
      messages_received,
      users_received

    FROM sent
    FULL JOIN received 
      ON sent.user_id = received.reciever_id"""

    message_data = read_db(query=query)
    return message_data

In [13]:
def extract_user_data() -> pd.DataFrame:
    context = get_current_context()
    query = f"""
    WITH users as (
        SELECT distinct user_id, gender, age, os
        from simulator_20220420.feed_actions 
        WHERE toDate(time) = toDate('{context["ds"]}') - 1

        UNION DISTINCT

        SELECT distinct user_id, gender, age, os
        from simulator_20220420.message_actions 
        WHERE toDate(time) = toDate('{context["ds"]}') - 1

        UNION DISTINCT

        SELECT distinct s.user_id, s.gender, s.age, s.os 
        from (
          SELECT DISTINCT s.user_id, s.gender, s.age, s.os  
          from simulator_20220420.message_actions r
          inner join simulator_20220420.message_actions s on s.user_id = r.reciever_id 
          WHERE toDate(r.time) = toDate('{context["ds"]}') - 1
        )
    )
    select * from users
     """
    user_data = read_db(query)
    return user_data

In [14]:
def merge_data(feed_data: pd.DataFrame, message_data: pd.DataFrame, user_data: pd.DataFrame) -> pd.DataFrame:
    merged_data = (
        user_data
        .merge(feed_data, how='left', on='user_id')
        .merge(message_data, how='left', on='user_id')
        .fillna(0)
    )
    
    return merged_data

In [15]:
feed_data = extract_feed()
message_data = extract_messages()
user_data = extract_user_data()

In [16]:
merged_data = merge_data(feed_data, message_data, user_data)

In [17]:
# 3. Для этой таблицы считаем все эти метрики в разрезе по полу, возрасту и ос. Делаем три разных таска на каждый срез.

In [55]:
def transform_by_gender(merged_data):
    stat_by_gender = (
        merged_data
        .groupby('gender')
        .sum()
        .drop(['user_id', 'age'], axis=1)
        .reset_index()
        .rename(columns={'gender': 'metric_slice'})
    )
    stat_by_gender['metric_name'] = 'gender'
    
    return stat_by_gender

In [57]:
def transform_by_age(merged_data):
    stat_by_age = (
        merged_data
        .groupby('age')
        .sum()
        .drop(['user_id', 'gender'], axis=1)
        .reset_index()
        .rename(columns={'age': 'metric_slice'})
    )
    stat_by_age['metric_name'] = 'age'
    
    return stat_by_age

In [58]:
def transform_by_os(merged_data):
    stat_by_os = (
        merged_data
        .groupby('os')
        .sum()
        .drop(['user_id', 'gender', 'age'], axis=1)
        .reset_index()
        .rename(columns={'os': 'metric_slice'})
    )
    stat_by_os['metric_name'] = 'os'
    
    return stat_by_os

In [59]:
stat_by_gender = transform_by_gender(merged_data)
stat_by_age = transform_by_age(merged_data)
stat_by_os = transform_by_os(merged_data)

In [60]:
final_stat = pd.concat([stat_by_gender, stat_by_age, stat_by_os])

In [61]:
final_stat.columns

Index(['metric_slice', 'views', 'likes', 'messages_sent', 'users_sent',
       'messages_received', 'users_received', 'metric_name'],
      dtype='object')

In [66]:
final_stat['event_date'] = datetime.strptime(get_current_context()['ds'], '%Y-%m-%d') - timedelta(days=1)

In [67]:
final_stat

Unnamed: 0,metric_slice,views,likes,messages_sent,users_sent,messages_received,users_received,metric_name,event_date
0,0,261715.0,59211.0,5667.0,3888.0,5399.0,3948.0,gender,2022-04-19
1,1,313802.0,71041.0,7232.0,4937.0,7500.0,4877.0,gender,2022-04-19
0,14,8465.0,1869.0,221.0,89.0,253.0,105.0,age,2022-04-19
1,15,19076.0,4227.0,440.0,238.0,169.0,134.0,age,2022-04-19
2,16,22660.0,5062.0,425.0,279.0,467.0,237.0,age,2022-04-19
...,...,...,...,...,...,...,...,...,...
59,73,62.0,16.0,9.0,2.0,0.0,0.0,age,2022-04-19
60,75,16.0,3.0,0.0,0.0,0.0,0.0,age,2022-04-19
61,78,58.0,10.0,0.0,0.0,0.0,0.0,age,2022-04-19
0,Android,371017.0,84050.0,8688.0,5958.0,8334.0,5932.0,os,2022-04-19


In [68]:
final_stat[['event_date', 'metric_name', 'metric_slice', 'views', 'likes', 'messages_received', 'messages_sent', 'users_received', 'users_sent']]

Unnamed: 0,event_date,metric_name,metric_slice,views,likes,messages_received,messages_sent,users_received,users_sent
0,2022-04-19,gender,0,261715.0,59211.0,5399.0,5667.0,3948.0,3888.0
1,2022-04-19,gender,1,313802.0,71041.0,7500.0,7232.0,4877.0,4937.0
0,2022-04-19,age,14,8465.0,1869.0,253.0,221.0,105.0,89.0
1,2022-04-19,age,15,19076.0,4227.0,169.0,440.0,134.0,238.0
2,2022-04-19,age,16,22660.0,5062.0,467.0,425.0,237.0,279.0
...,...,...,...,...,...,...,...,...,...
59,2022-04-19,age,73,62.0,16.0,0.0,9.0,0.0,2.0
60,2022-04-19,age,75,16.0,3.0,0.0,0.0,0.0,0.0
61,2022-04-19,age,78,58.0,10.0,0.0,0.0,0.0,0.0
0,2022-04-19,os,Android,371017.0,84050.0,8334.0,8688.0,5932.0,5958.0


In [77]:
def prepare_df_to_load(stat_by_gender, stat_by_age, stat_by_os):
    final_stat = pd.concat([stat_by_gender, stat_by_age, stat_by_os])
    final_stat['event_date'] = datetime.strptime(get_current_context()['ds'], '%Y-%m-%d') - timedelta(days=1)

    col_order = ['event_date', 'metric_name', 'metric_slice', 'views', 'likes', 
                 'messages_received', 'messages_sent', 'users_received', 'users_sent']
    
    final_stat = final_stat[col_order]
    
    final_stat = final_stat.astype({
        'metric_name': str,
        'metric_slice': str,
        'views': int,
        'likes': int,
        'messages_received': int,
        'messages_sent': int,
        'users_received': int,
        'users_sent': int,
    })

    return final_stat

In [78]:
df = prepare_df_to_load(stat_by_gender, stat_by_age, stat_by_os)

In [79]:
df.head()

Unnamed: 0,event_date,metric_name,metric_slice,views,likes,messages_received,messages_sent,users_received,users_sent
0,2022-04-19,gender,0,261715,59211,5399,5667,3948,3888
1,2022-04-19,gender,1,313802,71041,7500,7232,4877,4937
0,2022-04-19,age,14,8465,1869,253,221,105,89
1,2022-04-19,age,15,19076,4227,169,440,134,238
2,2022-04-19,age,16,22660,5062,467,425,237,279


In [72]:
connection_write = {
    'host': 'https://clickhouse...',
    'password': '',
    'user': '',
    'database': 'test'
}

In [89]:
df.dtypes

event_date           datetime64[ns]
metric_name                  object
metric_slice                 object
views                         int64
likes                         int64
messages_received             int64
messages_sent                 int64
users_received                int64
users_sent                    int64
dtype: object

In [92]:
create_table_query = """
CREATE TABLE IF NOT EXISTS test.m_surkova_6_report
(
    event_date String NOT NULL,
    metric_name String,
    metric_slice String,
    views Int64 NULL,
    likes Int64 NULL,
    messages_received Int64 NULL,
    messages_sent Int64 NULL,
    users_received Int64 NULL,
    users_sent Int64 NULL
    
) ENGINE = MergeTree ORDER BY (event_date)
"""

In [93]:
pandahouse.execute(create_table_query, connection_write)

b''

In [91]:
pandahouse.execute("drop table test.m_surkova_6_report", connection_write)

b''

In [94]:
pandahouse.to_clickhouse(
    df=df, 
    table='m_surkova_6_report', index=False, connection=connection_write)

66

In [90]:
pandahouse.execute('select * from test.m_surkova_6_report', connection_write)

b''