In [1]:
# https://pf.mgcom.ru/task/1433882

In [1]:
import pandas as pd
import numpy as np
import os
from datetime import datetime, timedelta
from decimal import Decimal, ROUND_HALF_UP

In [2]:
#AM Orders For Matching
PATH_AM_ORDERS_FOR_MATCHING = r'D:\Work\Rigla__Bud_Zdorov\2024-09-12\Output\2024-09-12_165117_am_orders_for_dwh_matching_2024-05-01__2024-05-06.csv'

#DWH перед этим пересохранить файлы двх клиента, изменить источник файла на юникод utf-8 и разделитель - запятая
DWH_START_DATE = '2024-05-01'
DWH_END_DATE = '2024-05-06'
PATH_DWH_BZ = r'D:\Work\Rigla__Bud_Zdorov\2024-05-07\DWH\DWH_orders_bz_01.05-06.05_sep.csv'
PATH_DWH_RIGLA = r'D:\Work\Rigla__Bud_Zdorov\2024-05-07\DWH\DWH_orders_rigla_01.05-06.05_sep.csv'

#OUTPUT
PATH_OUTPUT = r'D:\Work\Rigla__Bud_Zdorov\2024-09-12\Output'

In [13]:
def save_df_to_csv(df, dir_to_save, name):
    ts = datetime.now().strftime("%Y-%m-%d_%H%M%S")
    file_name = ts + '_' + name + '.csv'
    if not os.path.exists(dir_to_save):
        os.makedirs(dir_to_save)
    csv_path = os.path.join(os.path.normpath(dir_to_save), file_name)
    df.to_csv(csv_path, index=False, sep=';', decimal=',')
    print_ts(f"{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}: Сохранено в файл \n{csv_path}")
    return

def print_ts(message):
    print("{ts}: {message}".format(ts=datetime.now().strftime('%Y-%m-%d %H:%M:%S'), message=message))
    return

# Выставляем флаги условий
def set_conditions(xdf):
    print_ts('Выставляем флаги условий')
    df = xdf.copy(deep=True)
    
    # Условия
    cond_1 = df['dwh_order_id']==df['order_id']
    # Используется диапазон дат
    cond_2 = (df['dwh_am_diff_days']>=-7) & (df['dwh_am_diff_days']<=0)
    cond_3 = df['dwh_order_sum']==df['total']
    cond_4 = df['is_reinstallation']=='true'

    # DWH совпадает с AM по Order ID
    df['dwh_am_order_id_equal'] = np.where(cond_1, 1, 0)

    # DWH совпадает с AM по Order ID и Дате
    df['dwh_am_order_id_date_equal'] = np.where(cond_1 & cond_2, 1, 0)

    # DWH совпадает с AM по Order ID и Сумме
    df['dwh_am_order_id_sum_equal'] = np.where(cond_1 & cond_3, 1, 0)

    # DWH совпадает с AM по Order ID, Дате и Сумме
    df['dwh_am_order_id_date_sum_equal'] = np.where(cond_1 & cond_2 & cond_3, 1, 0)
    
    df['am_is_reinstallation'] = np.where(cond_4, 1, 0)
    df['dwh_am_order_id_equal__am_is_reinstallation'] = np.where(cond_1 & cond_4, 1, 0)
    df['dwh_am_order_id_date_equal__am_is_reinstallation'] = np.where(cond_1 & cond_2 & cond_4, 1, 0)
    df['dwh_am_order_id_sum_equal__am_is_reinstallation'] = np.where(cond_1 & cond_3 & cond_4, 1, 0)
    df['dwh_am_order_id_date_sum_equal__am_is_reinstallation'] = np.where(cond_1 & cond_2 & cond_3 & cond_4, 1, 0)
    print_ts('OK!')
    print()
    return df

def try_decimal(x):    
    try:
        y = Decimal(x)
    except:
        y = Decimal('0')
    return y     

# Наивная валидация датф по формату и году
def validate_dates(xdate, date_format):
    try:
        date = datetime.strptime(xdate, date_format)
        valid = True if date.year == 2024 else False
    except:
        valid = False
    return valid

# Подготавливаем данные DWH
def dwh_data_prep_adhoc(xdf, date_format):
    df = xdf.copy()
    df['date_valid'] = df['create_date'].apply(lambda x: validate_dates(x, date_format))
    if(not df['date_valid'].values.all()):
        incorrect_count = df[~df['date_valid']].shape[0]
        print_ts(f'Не все даты прошли проверку. Будет удалено {incorrect_count} записей.')
        save_df_to_csv(df[~df['date_valid']], PATH_OUTPUT, f'incorrect_dates__{incorrect_count}')
        df = df[df['date_valid']]
        
    df['dwh_order_create_date'] = pd.to_datetime(df['create_date'], format=date_format)        
    df['number_brand'] = df['number'].str.split('/', expand=True)[0]
    df['dwh_order_id'] = df['number'].str.split('/', expand=True)[1]
    df['dwh_order_sum'] = df['order_sum'].fillna('0')
    df['dwh_order_sum'] = df['dwh_order_sum'].apply(lambda x: try_decimal(x))    
    df.drop_duplicates(ignore_index=True, inplace=True)
    brand_cond_v = [
        df['site_name'].str.contains('budzdorov'),
        df['site_name'].str.contains('rigla')
    ]
    brand_choises = ['budzdorov.ru', 'rigla.ru']
    df['dwh_brand'] = np.select(brand_cond_v, brand_choises, default=None)
    cols_to_use = [
        'dwh_brand',
        'dwh_order_create_date',
        'dwh_order_id',
        'dwh_order_sum'
    ]
    df = df[cols_to_use]
    return df

# Получаем данные AM (заказы для метчинга)
def get_am_data_adhoc(path):
    print_ts(f'Получаем данные AM из CSV \n{path}')
    df = pd.read_csv(path, dtype=object)
    df.drop_duplicates(inplace=True, ignore_index=True)
    # df['event_date'] = pd.to_datetime(df['event_date'], format='%Y-%m-%d %H:%M:%S
    df['event_date'] = pd.to_datetime(df['event_date'], format='%Y-%m-%d')
    df['event_datetime'] = pd.to_datetime(df['event_date'], format='%Y-%m-%d %H:%M:%S')
    total_events = df.shape[0]
    print_ts(f'Всего событий AM: {total_events}')
    print_ts('OK!')
    print()
    return df

# Получаем данные DWH
def get_dwh_data_adhoc(path_bz, path_rigla):
    print_ts('Получаем данные DWH')

    print_ts('Получаем данные BZ\n' + path_bz)
    dwh_data_bz_raw = pd.read_csv(path_bz, sep=';', dtype=object)
    print_ts('Подготавливаем данные BZ')
    dwh_data_bz = dwh_data_prep_adhoc(dwh_data_bz_raw, "%Y-%m-%d")
    bz_events = dwh_data_bz.shape[0]
    print_ts(f'Всего событий BZ: {bz_events}')
    print()


    print_ts('Получаем данные Rigla\n' + path_rigla)
    dwh_data_rigla_raw = pd.read_csv(path_rigla, sep=';', dtype=object)
    print_ts('Подготавливаем данные Rigla')
    dwh_data_rigla = dwh_data_prep_adhoc(dwh_data_rigla_raw, "%Y-%m-%d")
    rigla_events = dwh_data_rigla.shape[0]
    print_ts(f'Всего событий Rigla: {rigla_events}')
    print()

    # Объединяем данные
    dwh_data = pd.concat([dwh_data_bz, dwh_data_rigla])
    print_ts(f'Строк данных DWH: {dwh_data.shape[0]}')
    count_of_bz_rigla_unique = dwh_data[['dwh_brand','dwh_order_id']].drop_duplicates().shape[0]
    print_ts(f'Уникальных заказов: {count_of_bz_rigla_unique}')    

    df = dwh_data[(dwh_data['dwh_order_create_date']>=DWH_START_DATE) & (dwh_data['dwh_order_create_date']<=DWH_END_DATE)]
    df.reset_index(drop=True, inplace=True)
    print_ts('Всего записей DWH: {0}'.format(dwh_data.shape[0]))
    print_ts('Отфильтровано по дате: {0}'.format(dwh_data.shape[0] - df.shape[0]))
    print_ts('Использовано для отчётного периода записей: {0}'.format(df.shape[0]))
    print_ts('')
    print(df['dwh_order_create_date'].agg(['min','max'])) 
    print()
    return df

# Метчим данные: к данным AM привязываем DWH
def get_am_dwh_joined_data(am_data, dwh_data):
    print_ts('Метчим данные - к данным AM привязываем DWH')
    am_dwh_orders = am_data.merge(dwh_data, how='left', left_on=['order_id','brand'], right_on=['dwh_order_id','dwh_brand'])
    am_dwh_orders['am_unique'] = 1    
    
    # Добавляем разницу в днях
    am_dwh_orders['dwh_am_diff_days'] = (am_dwh_orders['dwh_order_create_date'] - am_dwh_orders['event_date']).dt.days
    
    # Убираем лишние колонки
    useful_columns = [
        'am_unique',
        'dwh_order_create_date', 
        'dwh_order_id',
        'dwh_order_sum', 
        'dwh_brand', 
        'event_name', 
        'event_json',
        'event_datetime', 
        'city', 
        'appmetrica_device_id',
        'brand', 
        'order_id', 
        'total', 
        'publisher_name',
        'tracker_name',
        'is_reinstallation',
        'event_date',
        'event_month', 
        'dwh_am_diff_days'
    ]
    df = am_dwh_orders[useful_columns].copy()
    df['total'] = df['total'].apply(lambda x: Decimal(x))
    df['dwh_order_sum'] = df['dwh_order_sum'].apply(lambda x: Decimal(x))
    
    # Добавляем столбец с разницей по суммам
    df['dwh_am_sum_diff'] = df['dwh_order_sum'] - df['total']    
    
    print_ts('OK!')
    print()
    return df

# Округляем суммы по следующему правилу:
# сверка по суммам не учитывает копейки и округляется до 10х, т.е. если сумма 182.50 руб. , она приравнивается к сумме 180 руб.
# если сумма 189 руб. , она приравнивается к сумме 190 руб. 
def round_x_half_up(x):
    x = Decimal(x)
    if not x.is_nan():        
        x = (x / 10).quantize(Decimal('0'), ROUND_HALF_UP) * 10
    return x

def modify_totals(xdf):
    df = xdf.copy(deep=True)
    df['dwh_order_sum_original'] = df['dwh_order_sum']
    df['total_original'] = df['total']
    df['total'] = df['total_original'].apply(lambda x: round_x_half_up(x))
    df['dwh_order_sum'] = df['dwh_order_sum_original'].apply(lambda x: round_x_half_up(x))    
    return df

def main():
    # Получаем данные AM (заказы для метчинга)
    am_data_first_orders = get_am_data_adhoc(PATH_AM_ORDERS_FOR_MATCHING)   
    
    # Получаем данные DWH
    dwh_data_subset = get_dwh_data_adhoc(PATH_DWH_BZ, PATH_DWH_RIGLA)    
    
    # Метчим данные: к данным AM привязываем DWH
    am_dwh_orders_short = get_am_dwh_joined_data(am_data_first_orders, dwh_data_subset)    

    # Округляем тоталы
    am_dwh_orders_short = modify_totals(am_dwh_orders_short)    
    
    # Выставляем флаги условий
    task_conds_df = set_conditions(am_dwh_orders_short)    
    
    use_columns = [
        'am_unique','brand', 'publisher_name','event_month',
        'dwh_am_diff_days', 'dwh_am_order_id_equal',
        'dwh_am_order_id_date_equal', 'dwh_am_order_id_sum_equal',
        'dwh_am_order_id_date_sum_equal', 'am_is_reinstallation',
        'dwh_am_order_id_equal__am_is_reinstallation',
        'dwh_am_order_id_date_equal__am_is_reinstallation',
        'dwh_am_order_id_sum_equal__am_is_reinstallation',
        'dwh_am_order_id_date_sum_equal__am_is_reinstallation']
    xdf = task_conds_df[use_columns].copy()    

    # Делаем сводную
    task_conds_df_sum = xdf.groupby(['event_month', 'brand', 'publisher_name'], as_index=False, dropna=False).sum()
    task_conds_df_sum.drop(['dwh_am_diff_days'], axis=1, inplace=True)    
    
    # Сохраняем полученные данные
    print_ts('Сохраняем полученные данные.')
    save_df_to_csv(task_conds_df, PATH_OUTPUT, 'task_conds')
    save_df_to_csv(task_conds_df_sum, PATH_OUTPUT, 'task_conds_agg')
    
    return

In [14]:
#######################################
# Получаем сводку
#######################################
main()

2024-09-12 19:03:16: Получаем данные AM из CSV 
D:\Work\Rigla__Bud_Zdorov\2024-09-12\Output\2024-09-12_165117_am_orders_for_dwh_matching_2024-05-01__2024-05-06.csv
2024-09-12 19:03:19: Всего событий AM: 121538
2024-09-12 19:03:19: OK!

2024-09-12 19:03:19: Получаем данные DWH
2024-09-12 19:03:19: Получаем данные BZ
D:\Work\Rigla__Bud_Zdorov\2024-05-07\DWH\DWH_orders_bz_01.05-06.05_sep.csv
2024-09-12 19:03:19: Подготавливаем данные BZ
2024-09-12 19:03:21: Всего событий BZ: 124044

2024-09-12 19:03:21: Получаем данные Rigla
D:\Work\Rigla__Bud_Zdorov\2024-05-07\DWH\DWH_orders_rigla_01.05-06.05_sep.csv
2024-09-12 19:03:21: Подготавливаем данные Rigla
2024-09-12 19:03:22: Всего событий Rigla: 74978

2024-09-12 19:03:22: Строк данных DWH: 199022
2024-09-12 19:03:22: Уникальных заказов: 199019
2024-09-12 19:03:23: Всего записей DWH: 199022
2024-09-12 19:03:23: Отфильтровано по дате: 0
2024-09-12 19:03:23: Использовано для отчётного периода записей: 199022
2024-09-12 19:03:23: 
min   2024-05-0