In [57]:
from typing import Dict, Optional
import requests
import pandas as pd
from zipfile import ZipFile
from io import BytesIO, StringIO
from datetime import datetime, timedelta
import time
import numpy as np
import pytz
from concurrent.futures import ThreadPoolExecutor

import time

from requests import RequestException


def request_with_retries(requests_method, url,
                         max_retries=5, retry_delay=60, err_prefix="", **kwargs):
    for attempt in range(max_retries):
        try:
            response = requests_method(url, **kwargs)

            if response.status_code in [429, 500, 502, 503, 504, 404]:
                print(f"{err_prefix}Ошибка {response.status_code}. "
                      f"Попытка {attempt + 1} из {max_retries}. Ожидание {retry_delay} секунд...")
                time.sleep(retry_delay)
            else:
                return response

        except RequestException as e:
            print(f"{err_prefix}Ошибка сети: {e}. Попытка {attempt + 1} из {max_retries}. Ожидание {retry_delay} секунд...")
            time.sleep(retry_delay)

    raise Exception(f"{err_prefix}Запрос не выполнен после {max_retries} попыток.")



def _process_csv_data(csv_data: str) -> Optional[pd.DataFrame]:
    required_columns = [
        "sku", "Название товара", "Цена товара, ₽", "Показы", "Клики", "CTR (%)",
        "В корзину", "Средняя цена клика, ₽", "Расход, ₽ с НДС",
        "Расход за минусом бонусов, ₽ с НДС", "Заказы", "Выручка, ₽",
        "Заказы модели", "Выручка с заказов модели, ₽", "Дата"
    ]
    try:
        df = pd.read_csv(StringIO(csv_data), sep=";", decimal=",", skiprows=1)
        processed_df = pd.DataFrame(columns=required_columns)

        if "sku" in df.columns:
            processed_df["sku"] = df.get("sku", 0)
            processed_df["Название товара"] = df.get("Название товара", "")
            processed_df["Цена товара, ₽"] = df.get("Цена товара, ₽", 0)
            processed_df["Показы"] = df.get("Показы", 0)
            processed_df["Клики"] = df.get("Клики", 0)
            processed_df["CTR (%)"] = df.get("CTR (%)", 0)
            processed_df["В корзину"] = df.get("В корзину", 0)
            processed_df["Средняя цена клика, ₽"] = df.get("Средняя цена клика, ₽", 0)
            processed_df["Расход, ₽ с НДС"] = df.get("Расход, ₽, с НДС", 0)
            processed_df["Расход за минусом бонусов, ₽ с НДС"] = df.get(
                "Расход за минусом бонусов, ₽ с НДС", 0)
            processed_df["Заказы"] = df.get("Заказы", 0)
            processed_df["Выручка, ₽"] = df.get("Выручка, ₽", 0)
            processed_df["Заказы модели"] = df.get("Заказы модели", 0)
            processed_df["Выручка с заказов модели, ₽"] = df.get("Выручка с заказов модели, ₽", 0)
            processed_df["Дата"] = "0"
        elif "Ozon ID" in df.columns:
            processed_df["sku"] = df.get("Ozon ID", 0)
            processed_df["Название товара"] = df.get("Наименование", "")
            processed_df["Цена товара, ₽"] = df.get("Цена продажи", 0)
            processed_df["Показы"] = 0
            processed_df["Клики"] = df.get("Количество", 0)
            processed_df["CTR (%)"] = 0
            processed_df["В корзину"] = 0
            processed_df["Средняя цена клика, ₽"] = 0
            processed_df["Расход, ₽ с НДС"] = df.get("Расход, ₽", 0)
            processed_df["Расход за минусом бонусов, ₽ с НДС"] = 0
            processed_df["Заказы"] = 0
            processed_df["Выручка, ₽"] = df.get("Цена продажи", 0)
            processed_df["Заказы модели"] = 0
            processed_df["Выручка с заказов модели, ₽"] = 0
            processed_df["Дата"] = df.get("Дата", "0")
        else:
            print(f"Неизвестный формат файла. Пропуск обработки.")
            return None

        processed_df = processed_df[processed_df["sku"].notnull() & (processed_df["sku"] != "Всего")]
        processed_df = processed_df.fillna(0)
        processed_df = processed_df.replace([np.inf, -np.inf], 0)

        if processed_df.empty:
            print(f"Данные не были добавлены, так как DataFrame пуст.")
        return processed_df
    except Exception as e:
        print(f"Ошибка обработки CSV данных: {e}")


def _fetch_ozon_ad_campaign_statistics(date_from: datetime.date, date_to: datetime.date, seller_legal: str, client_id: str, client_secret: str) -> pd.DataFrame:
    log_prefix = f"Реклама (стат). {date_from}-{date_to}. {seller_legal}. "
    err_prefix = f"\033[91mWARNING: {log_prefix}\033[0m"

    try:
        formatted_date_from = date_from.strftime("%Y-%m-%dT00:00:00Z")
        formatted_date_to = date_to.strftime("%Y-%m-%dT23:59:59Z")
        formatted_date_from_short = date_from.strftime("%Y-%m-%d")
        formatted_date_to_short = date_to.strftime("%Y-%m-%d")
    except ValueError as e:
        raise Exception(f"{err_prefix}Неверный формат даты. Ожидается DD.MM.YYYY. {str(e)}")

    # Авторизация: получение токена
    auth_url = "https://api-performance.ozon.ru/api/client/token"
    auth_payload = {
        "client_id": client_id,
        "client_secret": client_secret,
        "grant_type": "client_credentials"
    }
    auth_headers = {"Content-Type": "application/json"}

    auth_response = request_with_retries(
        requests.post, url=auth_url, headers=auth_headers, json=auth_payload, err_prefix=err_prefix
    )
    if auth_response.status_code != 200:
        raise Exception(f"{err_prefix}Ошибка авторизации: {auth_response.status_code}, {auth_response.text}")

    token_data = auth_response.json()
    access_token = token_data.get("access_token")
    if not access_token:
        raise Exception(f"{err_prefix}Не удалось получить токен доступа.")

    # Получение данных по рекламным кампаниям
    expense_url = "https://api-performance.ozon.ru/api/client/statistics/expense"
    expense_headers = {
        "Authorization": f"Bearer {access_token}",
        "Content-Type": "application/json"
    }
    expense_params = {
        "dateFrom": formatted_date_from_short,
        "dateTo": formatted_date_to_short
    }

    expense_response = request_with_retries(
        requests.get, url=expense_url, headers=expense_headers, params=expense_params, err_prefix=err_prefix
    )
    if expense_response.status_code != 200:
        raise Exception(
            f"{err_prefix} Ошибка получения данных по рекламным кампаниям: {expense_response.status_code}, {expense_response.text}"
        )

    # Обработка ответа в формате CSV
    try:
        csv_data = expense_response.text  # Получаем текстовый ответ
        df = pd.read_csv(StringIO(csv_data), sep=";", decimal=",")  # Преобразуем в DataFrame
    except Exception as e:
        raise Exception(f"{err_prefix} Ошибка обработки данных: {e}")

    # Получение списка кампаний
    if df.empty:
        print(f"{err_prefix}Данные по рекламным кампаниям отсутствуют в ответе. Завершение функции.")
        return None

    campaign_ids = list(set(df["ID"].astype(str)))
    if not campaign_ids:
        raise Exception(f"{err_prefix}Список ID кампаний пуст.")

    # Выгрузка статистики по 10 кампаний за раз
    statistics_url = "https://api-performance.ozon.ru:443/api/client/statistics"
    report_status_url = "https://api-performance.ozon.ru:443/api/client/statistics"
    report_url = "https://api-performance.ozon.ru:443/api/client/statistics/report"
    statistics_headers = {"Authorization": f"Bearer {access_token}", "Content-Type": "application/json"}

    result_data = []

    for i in range(0, len(campaign_ids), 10):
        print(f"{log_prefix} Чанк {i} из {len(campaign_ids)}:")
        campaigns_chunk = campaign_ids[i:i + 10]
        statistics_payload = {
            "campaigns": campaigns_chunk,
            "from": formatted_date_from,
            "to": formatted_date_to,
            "groupBy": "NO_GROUP_BY"
        }

        statistics_response = request_with_retries(
            requests.post, url=statistics_url, headers=statistics_headers, json=statistics_payload, err_prefix=err_prefix
        )

        if statistics_response.status_code != 200:
            raise Exception(
                f"{err_prefix}Ошибка отправки запроса на статистику: {statistics_response.status_code}, {statistics_response.text}"
            )

        report_id = statistics_response.json().get("UUID")
        if not report_id:
            raise Exception(f"{err_prefix}Не удалось получить UUID отчета.")

        # Проверка статуса отчета
        current_state = None
        for N in range(60):
            report_status_response = request_with_retries(
                requests.get, url=f"{report_status_url}/{report_id}", headers=statistics_headers, err_prefix=err_prefix
            )

            if report_status_response.status_code != 200:
                raise Exception(
                    f"{err_prefix}Ошибка проверки статуса отчета: {report_status_response.status_code}, {report_status_response.text}"
                )

            report_status = report_status_response.json()
            current_state = report_status.get("state")
            if current_state == "OK":
                print(f"{log_prefix}Проверка {N}: статус: {current_state}")
                break
            else:
                print(f"{log_prefix}Проверка {N}: статус: {current_state}")
                time.sleep(30)
        else:
            raise Exception(f"{err_prefix}Отчет не готов после максимального количества попыток.")

        if current_state == "OK":
            report_response = request_with_retries(
                requests.get, url=f"{report_url}?UUID={report_id}", headers=statistics_headers, err_prefix=err_prefix
            )
            if report_response.status_code == 200:
                content_type = report_response.headers.get("Content-Type", "").lower()
                if "text/csv" in content_type:
                    csv_data = report_response.text
                    result_data.append(_process_csv_data(csv_data))
                elif "application/zip" in content_type:
                    with ZipFile(BytesIO(report_response.content)) as zip_file:
                        for file_name in zip_file.namelist():
                            with zip_file.open(file_name) as file:
                                csv_data = file.read().decode("utf-8")
                                result_data.append(_process_csv_data(csv_data))
                else:
                    raise Exception(f"{err_prefix}Неизвестный формат ответа. Ожидается CSV или ZIP.")
                print(f"{log_prefix}Добавление отчетов из архива закончено")

    result_data = [data for data in result_data if data is not None]
    if result_data:
        result_data = pd.concat(result_data)
        return result_data
    else:
        return None


def fetch_last_day_ozon_ad_campaign_statistics(config: Dict[str, Dict[str, str]]) -> pd.DataFrame:
    moscow_tz = pytz.timezone('Europe/Moscow')
    # TODO: replace to days=1
    yesterday_date = datetime.now(moscow_tz).date() - timedelta(days=195)
    result_df = []
    for seller_legal, data in config.items():
        client_id = data['client_id']
        client_secret = data['client_secret']
        df = _fetch_ozon_ad_campaign_statistics(yesterday_date, yesterday_date, seller_legal, client_id, client_secret)
        if df is None:
            continue
        df["date"] = pd.to_datetime(yesterday_date)
        if seller_legal == "inter":
            df["legal_entity"] = "ИНТЕР"
        elif seller_legal == "ut":
            df["legal_entity"] = "АТ"
        else:
            raise Exception(f"Необработанный seller_legal={seller_legal}")
        result_df.append(df)

    if result_df:
        result_df = pd.concat(result_df)

        return result_df
    else:
        return None


In [46]:
config = {
    
}

In [58]:
df = fetch_last_day_ozon_ad_campaign_statistics(config['ozon_ad_compaign'])

Реклама (стат). 2024-07-15-2024-07-15. ut.  Чанк 0 из 3:
Реклама (стат). 2024-07-15-2024-07-15. ut. Проверка 0: статус: NOT_STARTED
Реклама (стат). 2024-07-15-2024-07-15. ut. Проверка 1: статус: IN_PROGRESS
Реклама (стат). 2024-07-15-2024-07-15. ut. Проверка 2: статус: OK
Реклама (стат). 2024-07-15-2024-07-15. ut. Добавление отчетов из архива закончено


In [59]:
df.info()

<class 'pandas.core.frame.DataFrame'>
Index: 3 entries, 0 to 0
Data columns (total 17 columns):
 #   Column                              Non-Null Count  Dtype         
---  ------                              --------------  -----         
 0   sku                                 3 non-null      object        
 1   Название товара                     3 non-null      object        
 2   Цена товара, ₽                      3 non-null      float64       
 3   Показы                              3 non-null      int64         
 4   Клики                               3 non-null      int64         
 5   CTR (%)                             3 non-null      float64       
 6   В корзину                           3 non-null      int64         
 7   Средняя цена клика, ₽               3 non-null      int64         
 8   Расход, ₽ с НДС                     3 non-null      float64       
 9   Расход за минусом бонусов, ₽ с НДС  3 non-null      int64         
 10  Заказы                             

In [5]:
df.info()

<class 'pandas.core.frame.DataFrame'>
Index: 52 entries, 0 to 0
Data columns (total 17 columns):
 #   Column                              Non-Null Count  Dtype         
---  ------                              --------------  -----         
 0   sku                                 52 non-null     object        
 1   Название товара                     52 non-null     object        
 2   Цена товара, ₽                      52 non-null     float64       
 3   Показы                              52 non-null     float64       
 4   Клики                               52 non-null     float64       
 5   CTR (%)                             52 non-null     float64       
 6   В корзину                           52 non-null     float64       
 7   Средняя цена клика, ₽               52 non-null     int64         
 8   Расход, ₽ с НДС                     52 non-null     float64       
 9   Расход за минусом бонусов, ₽ с НДС  52 non-null     int64         
 10  Заказы                            

In [55]:
pd.set_option('display.max_columns', None)

In [56]:
df

Unnamed: 0,ID,Дата,Название,Расход,Расход бонусов,date,legal_entity,sku,Название товара,"Цена товара, ₽",Показы,Клики,CTR (%),В корзину,"Средняя цена клика, ₽","Расход, ₽ с НДС","Расход за минусом бонусов, ₽ с НДС",Заказы,"Выручка, ₽",Заказы модели,"Выручка с заказов модели, ₽"
0,,0,,,,2024-07-15,АТ,1581362232,Мусорное ведро на ножках,1446.0,2560.0,82.0,3.2,2.0,0.0,1318.35,0.0,0.0,0.0,0.0,0.0
0,,0,,,,2024-07-15,АТ,1401867170,"AQRA Набор кастрюль Набор кастрюль нержавейка,...",2592.0,5540.0,161.0,2.91,10.0,0.0,1392.46,0.0,3.0,7792.0,0.0,0.0
0,,0,,,,2024-07-15,АТ,1581359989,Мусорное ведро с черной подставкой и черной кр...,1615.0,4258.0,196.0,4.6,10.0,0.0,1393.06,0.0,4.0,5460.0,0.0,0.0


In [8]:
dbname='app_db'
user='user'
host='db'
port=5432

In [9]:
import getpass
password = getpass.getpass()

 ········


In [33]:
import os
import uuid
from psycopg2 import connect, sql
from functools import wraps
from typing import Callable
import pandas as pd
from datetime import datetime, timedelta
import pytz
from pandas.api.types import is_datetime64_any_dtype

def inject_closable_cursor(func: Callable):
    @wraps(func)
    def wrapper(*args, **kwargs):
        # for tests only
        if 'cursor' in kwargs:
            return func(*args, **kwargs)
        connection = connect(
            dbname=dbname,
            user=user,
            password=password,
            host=host,
            port=port
        )
        try:
            with connection.cursor() as cursor:
                result = func(*args, cursor=cursor, **kwargs)
                connection.commit()
                return result
        except Exception:
            connection.rollback()
            raise
        finally:
            connection.close()
    return wrapper

@inject_closable_cursor
def delete_previous_and_insert_new_postgres_table(df, table_name, date_column="date", **kwargs):
    cursor = kwargs.get('cursor')

    if is_datetime64_any_dtype(df[date_column]):
        df[date_column] = df[date_column].dt.strftime('%Y-%m-%d')

    current_date = df[date_column].iloc[0]
    delete_query = f"""
        DELETE FROM {table_name}
        WHERE "{date_column}" = %s;
    """
    cursor.execute(delete_query, (current_date,))


    # Вставка новых записей
    columns_df = [col for col in df.columns]
    columns_escaped = [f'"{col.replace("%", "percent")}"' for col in columns_df]
    insert_query = f"""
        INSERT INTO {table_name} ({', '.join(columns_escaped)})
        VALUES ({', '.join(['%s' for _ in columns_df])});
    """

    print(f"Количество столбцов: {len(columns_df)}")
    print(f"Пример строки: {df.iloc[0].to_dict()}")
    print(insert_query)

    for _, row in df.iterrows():
        values = [row[col] for col in columns_df]
        if len(values) != len(columns_df):
            print(f"Несоответствие в строке {i}: {values}")
            break
        cursor.execute(insert_query, values)

In [17]:
df['date'].dtype == 'O'

True

In [18]:
test_df = pd.DataFrame()

In [23]:
test_df['date'] = pd.to_datetime(datetime.now().date() - timedelta(days=2))

In [24]:
test_df['date'].dtype

dtype('<M8[ns]')

In [26]:
is_datetime64_any_dtype(test_df['date'])

True

In [27]:
is_datetime64_any_dtype(df['date'])

False

In [38]:
df.info()

<class 'pandas.core.frame.DataFrame'>
Index: 52 entries, 0 to 0
Data columns (total 17 columns):
 #   Column                              Non-Null Count  Dtype  
---  ------                              --------------  -----  
 0   sku                                 52 non-null     object 
 1   Название товара                     52 non-null     object 
 2   Цена товара, ₽                      52 non-null     float64
 3   Показы                              52 non-null     float64
 4   Клики                               52 non-null     float64
 5   CTR (%)                             52 non-null     float64
 6   В корзину                           52 non-null     float64
 7   Средняя цена клика, ₽               52 non-null     int64  
 8   Расход, ₽ с НДС                     52 non-null     float64
 9   Расход за минусом бонусов, ₽ с НДС  52 non-null     int64  
 10  Заказы                              52 non-null     float64
 11  Выручка, ₽                          52 non-null     f

In [43]:
df.head()

Unnamed: 0,sku,Название товара,"Цена товара, ₽",Показы,Клики,CTR (%),В корзину,"Средняя цена клика, ₽","Расход, ₽ с НДС","Расход за минусом бонусов, ₽ с НДС",Заказы,"Выручка, ₽",Заказы модели,"Выручка с заказов модели, ₽",Дата,date,legal_entity
0,1491493976,Дрель шуруповерт аккумуляторная - PHOQ / 2 акк...,2737.0,52.0,1.0,1.92,0.0,0,18.07,0,0.0,0.0,0.0,0.0,0,2025-01-21,ИНТЕР
1,1583345618,Гайковерт аккумуляторный ударный бесщеточный -...,4505.0,168.0,2.0,1.19,0.0,0,14.1,0,0.0,0.0,0.0,0.0,0,2025-01-21,ИНТЕР
0,1392801042,"Набор кастрюль с крышкой, из нержавеющей стали...",3500.0,2948.0,125.0,4.24,23.0,0,2803.38,0,0.0,0.0,0.0,0.0,0,2025-01-21,ИНТЕР
0,1793090873,Шуруповерт Дрель аккумуляторная ударная бесщет...,2737.0,1782.0,14.0,0.79,0.0,0,333.78,0,0.0,0.0,0.0,0.0,0,2025-01-21,ИНТЕР
0,1408067232,"Набор кастрюль с крышкой, из нержавеющей стали...",2732.0,3405.0,143.0,4.2,21.0,0,3424.63,0,3.0,10557.0,1.0,5300.0,0,2025-01-21,ИНТЕР


In [42]:
df['Средняя цена клика, ₽'].unique()

array([0])

In [44]:
delete_previous_and_insert_new_postgres_table(df, 'ozon_ad_compaign')

Количество столбцов: 17
Пример строки: {'sku': '1491493976', 'Название товара': 'Дрель шуруповерт аккумуляторная - PHOQ / 2 аккумулятора 21v, 23 насадки, кейс с аксессуарами в комплекте', 'Цена товара, ₽': 2737.0, 'Показы': 52.0, 'Клики': 1.0, 'CTR (%)': 1.92, 'В корзину': 0.0, 'Средняя цена клика, ₽': 0, 'Расход, ₽ с НДС': 18.07, 'Расход за минусом бонусов, ₽ с НДС': 0, 'Заказы': 0.0, 'Выручка, ₽': 0.0, 'Заказы модели': 0.0, 'Выручка с заказов модели, ₽': 0.0, 'Дата': '0', 'date': '2025-01-21', 'legal_entity': 'ИНТЕР'}

        INSERT INTO ozon_ad_compaign ("sku", "Название товара", "Цена товара, ₽", "Показы", "Клики", "CTR (percent)", "В корзину", "Средняя цена клика, ₽", "Расход, ₽ с НДС", "Расход за минусом бонусов, ₽ с НДС", "Заказы", "Выручка, ₽", "Заказы модели", "Выручка с заказов модели, ₽", "Дата", "date", "legal_entity")
        VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
    
