In [1]:
import getpass
import os
from datetime import datetime, timedelta
from urllib.parse import quote_plus

import clickhouse_driver
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from sqlalchemy import create_engine

plt.style.use('dark_background')

pd.set_option('display.max_rows', 250)
pd.set_option('display.max_columns', 250)
pd.set_option('display.max_colwidth', 250)

# ClickHouse connection settings. Secrets are taken from the environment (or an
# interactive prompt) instead of living in the notebook; if this notebook was
# ever shared with the password inline, that password should be rotated.
host = os.environ.get('CLICKHOUSE_HOST', '10.129.11.136')
port = os.environ.get('CLICKHOUSE_PORT', '9000')  # kept for other users of `port`; not used by the engine URL
http_port = os.environ.get('CLICKHOUSE_HTTP_PORT', '8123')  # the port the engine URL has always used
user = os.environ.get('CLICKHOUSE_USER', 'superset')
password = os.environ.get('CLICKHOUSE_PASSWORD') or getpass.getpass('ClickHouse password: ')
dbname = os.environ.get('CLICKHOUSE_DB', 'superset')

# Build the URL from the settings above; user and password are URL-escaped so
# special characters cannot break URL parsing.
con = create_engine(
    f"clickhouse://{quote_plus(user)}:{quote_plus(password)}@{host}:{http_port}/{dbname}"
)

# Fail fast if the server is unreachable, and release the probe connection.
with con.connect():
    pass

def get_subs(date, df):
    """Return the unique ``user_id`` values that count as subscribers on ``date``.

    A row qualifies when its ``first_prolong_date`` is set (strictly after the
    ``1970-01-01`` placeholder used for "empty"), falls on or before ``date``,
    and its ``ends_at`` is on or after ``date``.

    Parameters
    ----------
    date : anything accepted by ``pd.to_datetime``
        The day to evaluate.
    df : pandas.DataFrame
        Needs ``user_id``, ``first_prolong_date`` and ``ends_at`` columns.

    Returns
    -------
    array-like
        Unique ``user_id`` values, in order of first appearance.
    """
    on_date = pd.to_datetime(date)
    placeholder = pd.Timestamp('1970-01-01')
    prolonged = pd.to_datetime(df['first_prolong_date'])
    ends = pd.to_datetime(df['ends_at'])

    is_sub = (prolonged > placeholder) & (prolonged <= on_date) & (ends >= on_date)
    return df.loc[is_sub, 'user_id'].unique()

def get_trials(date, df):
    """Return the unique ``user_id`` values that count as trial users on ``date``.

    A row qualifies when ``created_at`` is set (after the ``1970-01-01``
    placeholder) and on or before ``date``, ``ends_at`` is on or after
    ``date``, and the first prolongation has either not happened
    (``first_prolong_date`` equals the placeholder) or happens after ``date``.

    Parameters
    ----------
    date : anything accepted by ``pd.to_datetime``
        The day to evaluate.
    df : pandas.DataFrame
        Needs ``user_id``, ``created_at``, ``ends_at`` and
        ``first_prolong_date`` columns.

    Returns
    -------
    array-like
        Unique ``user_id`` values, in order of first appearance.
    """
    on_date = pd.to_datetime(date)
    placeholder = pd.Timestamp('1970-01-01')
    created = pd.to_datetime(df['created_at'])
    ends = pd.to_datetime(df['ends_at'])
    prolonged = pd.to_datetime(df['first_prolong_date'])

    started = (created > placeholder) & (created <= on_date)
    not_expired = ends >= on_date
    not_yet_prolonged = (prolonged == placeholder) | (prolonged > on_date)
    return df.loc[started & not_expired & not_yet_prolonged, 'user_id'].unique()

def determine_status(row, df, status_df):
    """Append the subscribers and trial users of ``row['date']`` to ``status_df``.

    Parameters
    ----------
    row : mapping-like
        Anything indexable by ``'date'``; the value is passed to ``pd.to_datetime``.
    df : pandas.DataFrame
        User/subscription rows, forwarded to ``get_subs`` and ``get_trials``.
    status_df : pandas.DataFrame
        Rows accumulated so far.

    Returns
    -------
    pandas.DataFrame
        A new frame: ``status_df`` followed by one ``user_id``/``status``/``date``
        row per subscriber (``status='sub'``), then one per trial user
        (``status='trial'``). The original indices are kept.
    """
    on_date = pd.to_datetime(row['date'])
    pieces = [status_df]
    # get_subs is evaluated before get_trials, as in the tuple order below
    for label, users in (('sub', get_subs(on_date, df)), ('trial', get_trials(on_date, df))):
        pieces.append(pd.DataFrame({
            'user_id': users,
            'status': [label] * len(users),
            'date': [on_date] * len(users),
        }))
    return pd.concat(pieces)


def get_cohort(row):
    """Map one user row to a cohort label.

    ``'cards'`` rows are returned as-is. ``'promo'`` and ``'no_promo'`` rows are
    bucketed by ``offer_duration``. For ``'no_promo'``, referral/invitation
    bonuses (``ub_type``) are checked first and labelled ``'promo_1m'``.
    Anything not matched falls back to ``'other'``.

    Parameters
    ----------
    row : mapping-like
        Indexable by ``'promo_type'``; also by ``'offer_duration'`` and
        ``'ub_type'`` unless the promo type is ``'cards'``.

    Returns
    -------
    str
        The cohort label.
    """
    promo_type = row['promo_type']
    if promo_type == 'cards':
        return 'cards'

    offer_duration, ub_type = row['offer_duration'], row['ub_type']

    if promo_type == 'promo':
        rules = (
            (('1 month', '30 day'), 'promo_1m'),
            (('3 month',), 'promo_3m'),
            (('6 month', '200 day'), 'promo_6m'),
            (('12 month',), 'promo_12m'),
        )
    elif promo_type == 'no_promo':
        # bonus-based sign-ups are grouped with the 1-month promo cohort
        if ub_type in ['UserReferralBonus', 'UserInvitationBonus']:
            return 'promo_1m'
        rules = (
            (('1 month', '30 day'), 'organic_1m'),
            (('3 month',), 'organic_3m'),
            (('6 month',), 'organic_6m'),
            (('12 month',), 'organic_12m'),
        )
    else:
        return 'other'

    for durations, cohort in rules:
        if offer_duration in durations:
            return cohort
    return 'other'


def preprocess_users(df, cards=False):
    """Prepare the user/subscription frame (e.g. the one loaded from ``marketing_dash_distr``).

    Casts ``user_id`` to ``str``, tags every row with a cohort via
    ``get_cohort`` and, unless ``cards`` is true, drops the ``'cards'`` cohort
    so only B2C users remain.

    Parameters
    ----------
    df : pandas.DataFrame
        Needs ``user_id`` plus the columns read by ``get_cohort``
        (``promo_type``, ``offer_duration``, ``ub_type``).
    cards : bool, default False
        Keep ``'cards'`` cohort rows when true.

    Returns
    -------
    pandas.DataFrame
        A new, independent frame with an added ``cohort`` column; the input
        frame is not modified.
    """
    # work on a copy so the caller's frame is not silently changed
    users = df.copy()
    users['user_id'] = users['user_id'].astype(str)
    users['cohort'] = users.apply(get_cohort, axis=1)
    if not cards:
        # keep only B2C; .copy() so later column writes don't hit a slice
        users = users[users['cohort'] != 'cards'].copy()

    return users

# Default per-event upper bound (in minutes) applied to viewing time.
WATCH_MINUTES_CAPS = {'kinom': 60, 'tv': 120, 'series': 120, 'movie': 120}


def preprocess_watching(df, caps=None, min_minutes=5):
    """Clean the daily viewing frame.

    Casts ``user_id`` to ``str`` and ``date`` to datetime. Clips ``minutes``
    per ``event`` to the caps in ``caps``; events not listed are left
    unclipped. Then keeps only rows with more than ``min_minutes`` minutes.

    Parameters
    ----------
    df : pandas.DataFrame
        Needs ``user_id``, ``date``, ``event`` and ``minutes`` columns.
    caps : dict, optional
        Mapping ``event -> maximum minutes``; defaults to ``WATCH_MINUTES_CAPS``.
    min_minutes : float, default 5
        Rows with ``minutes <= min_minutes`` are dropped, as are rows with
        missing minutes (``NaN > x`` is false).

    Returns
    -------
    pandas.DataFrame
        A filtered copy; the input frame is not modified.
    """
    if caps is None:
        caps = WATCH_MINUTES_CAPS

    watch = df.copy()
    watch['user_id'] = watch['user_id'].astype(str)
    watch['date'] = pd.to_datetime(watch['date'])

    for event, cap in caps.items():
        is_event = watch['event'] == event
        watch.loc[is_event, 'minutes'] = watch.loc[is_event, 'minutes'].clip(upper=cap)

    return watch[watch['minutes'] > min_minutes]

# Load every user with a subscription or a trial.
# What the SQL below selects:
# - `end_trial`: the day before `first_prolong_date` when that date is set
#   (not the '1970-01-01' placeholder), otherwise `ends_at`.
# - Row filter: a real `created_at`, `ends_at` after 2022-01-01, and a first
#   prolongation that is either unset or strictly after `created_at`.
# NOTE(review): DISTINCT spans several columns, so a user_id may appear on
# more than one row here.
query = """
SELECT DISTINCT
    user_id, toDate(created_at) AS created_at, 
    CASE 
        WHEN toDate(first_prolong_date) != '1970-01-01'
        THEN toDate(first_prolong_date) - INTERVAL '1 day'
        ELSE toDate(ends_at)  
    END AS end_trial,
    toDate(ends_at) AS ends_at,
    toDate(first_prolong_date) AS first_prolong_date,
    promo_type, offer_duration, ub_type
FROM datamarts.marketing_dash_distr
WHERE created_at != '1970-01-01'
    AND ends_at > '2022-01-01'
    AND (
        (first_prolong_date > created_at) OR first_prolong_date = '1970-01-01'
        )
"""
users_df = pd.read_sql(query, con=con)
# cast user_id to str, assign cohorts and drop the 'cards' (non-B2C) cohort
users_df = preprocess_users(users_df, cards=False)

# Date grid: every day from start_date up to (and including) yesterday.
start_date = datetime(2023, 9, 6)
end_date = datetime.now() - timedelta(days=1)
date_range = pd.date_range(start=start_date, end=end_date)
dates_df = pd.DataFrame(date_range, columns=['date'])

# Users with their status ('sub' / 'trial') on each date.
# Build one frame per date and concatenate once. Re-concatenating a growing
# accumulator inside the loop copies all previous rows on every iteration,
# which is quadratic.
daily_frames = [
    determine_status(row, df=users_df, status_df=pd.DataFrame())
    for _, row in dates_df.iterrows()
]
status_df = pd.concat(daily_frames)

# NOTE(review): users_df is not guaranteed to hold one row per user_id (its
# query is DISTINCT over several columns), so a user with several rows is
# repeated here once per row. Confirm this is intended.
status_df = status_df.merge(
    users_df[['user_id', 'cohort', 'created_at', 'end_trial']],
    on='user_id',
)
status_df = status_df[['date', 'user_id', 'cohort', 'status', 'created_at', 'end_trial']]
status_df.head()

# Load every user who entered (non-backend client events) on or after start_date.
# The date bound is formatted from `start_date`, so the queries always match
# the date grid instead of repeating a hardcoded literal.
query = f"""SELECT DISTINCT 
    toDate(utc_timestamp) AS date, user_id, 1 AS entered,
    min(utc_timestamp) AS min_time_entered
FROM datamarts.dash_table_distr
WHERE created_at != '1970-01-01'
    AND ends_at >= '2022-01-01'
    AND user_id IS NOT NULL
    AND client_type != 'backend'
    AND toDate(utc_timestamp) >= '{start_date:%Y-%m-%d}'
GROUP BY 1,2,3
"""
entered_df = pd.read_sql(query, con=con)
entered_df['user_id'] = entered_df['user_id'].astype(str)
entered_df['date'] = pd.to_datetime(entered_df['date'])

# Viewing time per date, user, device group and content type,
# from start_date through yesterday.
query = f"""
SELECT 
    toDate(utc_timestamp) AS date,
    user_id,
    CASE
        WHEN client_type = 'web_desktop' THEN 'WEB'
        WHEN client_type IN ('ios', 'android', 'web_mobile') THEN 'Mobile'
        WHEN client_type IN ('Smart TV', 'smart_tv') THEN 'Smart_TV'
        ELSE 'Other'
    END AS device,
    CASE 
        WHEN event_name = 'auto_player_streaming' AND event_page = 'movie' THEN 'movie'
        WHEN event_name = 'auto_player_streaming' AND event_page = 'series' THEN 'series'
        WHEN event_name = 'auto_player_streaming' AND event_page = 'tvchannel' THEN 'tv'
        WHEN event_name = 'auto_kinom_streaming' THEN 'kinom'
        ELSE event_page
    END AS event,
    SUM(toFloat32OrZero(JSONExtractString(payload, 'viewing_time'))) / 60 AS minutes,
    MIN(utc_timestamp) AS min_time_watch
FROM datamarts.dash_table_distr
WHERE toDate(utc_timestamp) BETWEEN '{start_date:%Y-%m-%d}' AND yesterday()
    AND event_name IN ('auto_player_streaming', 'auto_kinom_streaming')
    AND event_page IN ('movie', 'series', 'tvchannel', 'main', 'mychannel')
    AND user_id IS NOT NULL
GROUP BY 1,2,3,4
ORDER BY 1,2,3,4
"""
watch_df = pd.read_sql(query, con=con)
# cast types, clip minutes per event and drop short viewing rows
watch_df = preprocess_watching(watch_df)

# Activity per user and date, attached to the user's status on that date.
activity_df = (
    status_df
    .merge(entered_df, how='left', on=['date', 'user_id'])
    # no entry that day -> entered = 0
    .assign(entered=lambda frame: frame['entered'].fillna(0).astype(int))
    .merge(watch_df, how='left', on=['date', 'user_id'])
    .assign(
        min_time_entered=lambda frame: pd.to_datetime(frame['min_time_entered']),
        min_time_watch=lambda frame: pd.to_datetime(frame['min_time_watch']),
    )
)

# Minutes from the earliest entry to the earliest viewing of the day.
# Non-positive or missing gaps (NaN > 0 is false) are stored as 0.
gap_minutes = (
    activity_df['min_time_watch'] - activity_df['min_time_entered']
).dt.total_seconds() / 60
activity_df['delta_watch'] = np.where(gap_minutes > 0, gap_minutes, 0)

activity_df = activity_df.sort_values(by=['date', 'user_id']).reset_index(drop=True)
activity_df.head()

Unnamed: 0,date,user_id,cohort,status,created_at,end_trial,entered,min_time_entered,device,event,minutes,min_time_watch,delta_watch
0,2023-09-06,0004c0e6-05dc-4c72-9366-a8356791b38a,promo_1m,trial,2023-09-06,2023-10-05,0,NaT,,,,NaT,0.0
1,2023-09-06,000f94cf-3520-4646-91fa-96e8056cf1a7,organic_1m,sub,2023-07-27,2023-08-09,0,NaT,,,,NaT,0.0
2,2023-09-06,0017e904-fa5a-4813-8c4b-3b867b2f477d,promo_1m,sub,2023-07-11,2023-08-14,0,NaT,,,,NaT,0.0
3,2023-09-06,002880e0-12a6-4425-bde8-57f7d7161677,promo_1m,sub,2023-06-17,2023-07-21,0,NaT,,,,NaT,0.0
4,2023-09-06,002e79ff-2d8d-4df5-9398-7ab2cbb54024,organic_1m,sub,2023-06-04,2023-07-03,0,NaT,,,,NaT,0.0


In [2]:
from sqlalchemy import create_engine

# Analytics PostgreSQL connection. Secrets are read from the environment (or
# prompted for) instead of being stored in the notebook.
# Note: this rebinds `con`, so the ClickHouse engine from the first cell is no
# longer reachable under that name.
pg_user = os.environ.get('ANALYTICS_PG_USER', 'analytics')
pg_password = os.environ.get('ANALYTICS_PG_PASSWORD') or getpass.getpass('Analytics Postgres password: ')
pg_host = os.environ.get('ANALYTICS_PG_HOST', '10.129.0.20')
pg_port = os.environ.get('ANALYTICS_PG_PORT', '6432')
pg_db = os.environ.get('ANALYTICS_PG_DB', 'analytics')

con = create_engine(
    f"postgresql+psycopg2://{quote_plus(pg_user)}:{quote_plus(pg_password)}@{pg_host}:{pg_port}/{pg_db}"
)

# Connectivity check. Close the connection right away: a bare `con.connect()`
# as the cell's last expression is cached in Out[...] and stays checked out.
with con.connect():
    pass

<sqlalchemy.engine.base.Connection at 0x105a83340>

In [3]:
table_name = "activity_dash"
# Replace the table with the current activity frame, inserting in batches
# rather than in a single call covering every row.
activity_df.to_sql(table_name, con, if_exists='replace', index=False, chunksize=50_000)

# Sanity check: the table must now hold exactly the rows just written.
# The count query reuses `table_name` so the two cannot drift apart.
written_count = pd.read_sql(f'SELECT count(*) from public.{table_name}', con=con)
assert written_count['count'].iloc[0] == len(activity_df), 'row count mismatch after to_sql'
written_count

Unnamed: 0,count
0,1813859
