In [None]:
import pandas as pd
import psycopg2
import psycopg2.extras
import getpass
import csv
import logging
from datetime import datetime
import os

In [None]:
# Запрашиваем логин и пароль
username = input("Мой логин: ")
password = getpass.getpass("Мой пароль: ")

In [None]:
# Настройка логирования
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    filename='data_extraction.log'
)

In [None]:
# Настройка параметров соединения с GreenPlum
def get_db_connection(username, password):
    try:
        conn = psycopg2.connect(
            host="prod.host",
            database="greenplum",
            user=username,
            password=password
        )
        logging.info("Успешное подключение к базе данных")
        return conn
    except psycopg2.Error as e:
        logging.error(f"Ошибка подключения: {e}")
        raise

In [None]:
# Параметры порционирования 
batch_size = 100000 # Указать размер пакетов, по которым скрипт обращается к БД
output_dir = rf"C:\Users\{username}\Documents\Python Scripts"
file_name = "имя_итогового_файла" # Указать название файла

# Создаем папку, если её нет
if not os.path.exists(output_dir):
    os.makedirs(output_dir)

In [None]:
# Подключаемся к GreenPlum
conn = get_db_connection(username, password)
cur = conn.cursor()

# SQL-запрос
    sql_query = """
    WITH product_availability AS (
        SELECT 
            va.product_id,
            va.seller_id,
            CASE 
                WHEN vo.availability = 'true' AND vo.eligibility = 'true' AND vo.rank = '1' 
                THEN 1 
                ELSE 0 
            END AS is_available_for_seller,
            MAX(CASE 
                WHEN vo.availability = 'true' AND vo.eligibility = 'true' AND vo.rank = '1' 
                THEN 1 
                ELSE 0 
            END) OVER (PARTITION BY va.product_id) AS is_available_anywhere
        FROM assort_srvc_ods.v_assortments AS va
        JOIN offer_service_ods.v_offers AS vo ON va.id = vo.assortment_id
        WHERE va.is_actual = '1' AND vo.is_actual = '1'
    )
    SELECT
        concat(vs.id, '-', va.product_id) AS product_key,
        vs.id AS seller_id,
        vs.name AS seller,
        CASE
            WHEN vs.status = 'enabled' THEN 'Активен'
            WHEN vs.status = 'created' THEN 'Договор не подписан'
            ELSE 'В архиве'
        END AS seller_status,
        vm2.name AS manager_mp,
        va.product_id,
        CASE 
            WHEN COUNT(*) OVER (PARTITION BY va.product_id) > 1 
            THEN 'Мультиоффер' 
            ELSE 'Уник'
        END AS "Уникальность товара",
        COALESCE(split_part(va.nomenclature_code, '.', 1), '0') AS department,
        COALESCE(split_part(va.nomenclature_code, '.', 2), '0') AS sub_department,
        concat(vac.code, '_', vac.name) AS pim_model,
        va.title,
        va.brand,
        CASE
            WHEN va.status = 'ready' THEN 'Активен'
            ELSE 'В архиве'
        END AS assortment_status,
        vo.stock,
        vo.price,
        CASE
            WHEN pa.is_available_for_seller = 1 THEN 'Доступен сейчас'
            WHEN pa.is_available_anywhere = 1 THEN 'Доступен от другого селлера'
            ELSE 'Недоступен сейчас'
        END AS on_site,
        wa.seller_message as metchant_comment,
        wa.marketplace_message as marketplace_comment
    FROM assort_srvc_ods.v_assortments AS va
    LEFT JOIN seller_service_ods.v_sellers AS vs 
        ON va.seller_id = vs.id
    LEFT JOIN seller_service_ods.v_managers AS vm2 
        ON vs.manager_id = vm2.id
    LEFT JOIN offer_service_ods.v_offers AS vo 
        ON va.id = vo.assortment_id
    LEFT JOIN assort_srvc_ods.v_assortment_categories AS vac 
        ON va.category_code = vac.code
    LEFT JOIN warehouse_assortment_service_ods.v_warehouses_assortments AS wa
        ON wa.assortment_id = va.id
    LEFT JOIN product_availability pa
        ON pa.product_id = va.product_id AND pa.seller_id = va.seller_id
    WHERE va.product_id IS NOT NULL
      AND vs.status = 'enabled'
      AND va.is_actual = '1' 
      AND vs.is_actual = '1' 
      AND vm2.is_actual = '1' 
      AND vo.is_actual = '1' 
      AND vac.is_actual = '1'
      AND wa.is_actual = '1'
    ORDER BY concat(vs.id, '-', va.product_id)
;
"""

try:
    # Выполняем запрос
    cur.execute(sql_query)
    
    # Инициализируем список для данных
    all_data = []
    row_count = 0
    file_count = 0
    temp_files = []  # Список для хранения имен промежуточных файлов
    
    # Порционная загрузка
    while True:
        rows = cur.fetchmany(batch_size)
        if not rows:
            break
        
        # Преобразуем в DataFrame
        df = pd.DataFrame(rows, columns=[desc[0] for desc in cur.description])
        
        # Обрабатываем NaN
        df = df.fillna("")
        
        # Убираем переносы строк
        df = df.map(lambda x: str(x).replace("\n", " ") if isinstance(x, str) else x)
        
        # Добавляем в общий список
        all_data.append(df)
        row_count += len(rows)
        
        # Сохраняем порцию в файл
        file_count += 1
        output_file = os.path.join(output_dir, f"{file_count}_{datetime.now().strftime('%Y%m%d_%H%M%S')}_{file_name}")
        df.to_csv(
            output_file,
            encoding="utf-8",
            sep=";",
            errors="replace",
            index=False,
            quoting=csv.QUOTE_ALL
        )
        temp_files.append(output_file)  # Добавляем имя файла в список
        logging.info(f"Сохранен файл {output_file} с {len(rows)} строками")

    # Объединяем все части в один файл (опционально)
    if all_data:
        final_df = pd.concat(all_data, ignore_index=True)
        final_file = os.path.join(output_dir, f"{file_name}.csv")
        final_df.to_csv(
            final_file,
            encoding="windows-1251",
            sep=";",
            errors="replace",
            index=False,
            quoting=csv.QUOTE_ALL
        )
        logging.info(f"Сохранен итоговый файл {final_file} с {row_count} строками")
        print(f"✅ Итоговый файл '{final_file}' сохранен! Всего строк: {row_count}. Время: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")

        # Удаляем промежуточные файлы
        for temp_file in temp_files:
            if os.path.exists(temp_file):
                os.remove(temp_file)
                logging.info(f"Удален промежуточный файл {temp_file}")
    else:
        logging.warning("Данные отсутствуют")
        print(f"⚠️ Данные отсутствуют, файл не создан. Время: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")

except psycopg2.Error as e:
    logging.error(f"Ошибка выполнения запроса: {e}")
    print(f"❌ Ошибка: {e}")
except Exception as e:
    logging.error(f"Неизвестная ошибка: {e}")
    print(f"❌ Неизвестная ошибка: {e}")
finally:
    # Закрываем соединение
    cur.close()
    conn.close()
    logging.info("Соединение с базой закрыто")