In [None]:
                # НАСТРОЙКА

In [1]:
from pyspark.sql import SparkSession
import os
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def create_spark_session() -> SparkSession:
    """
    Создать Spark сессию с Iceberg REST Catalog и MinIO.
    Credentials берутся из переменных окружения.
    
    Returns:
        SparkSession: Настроенная сессия
    """
    
    access_key = os.environ.get('AWS_ACCESS_KEY_ID', 'admin')
    secret_key = os.environ.get('AWS_SECRET_ACCESS_KEY', 'password')
    
    spark = SparkSession.builder \
        .appName("Iceberg-Basics") \
        .config("spark.sql.catalog.iceberg", "org.apache.iceberg.spark.SparkCatalog") \
        .config("spark.sql.catalog.iceberg.type", "rest") \
        .config("spark.sql.catalog.iceberg.uri", "http://iceberg-rest:8181") \
        .config("spark.sql.catalog.iceberg.io-impl", "org.apache.iceberg.aws.s3.S3FileIO") \
        .config("spark.sql.catalog.iceberg.warehouse", "s3://warehouse") \
        .config("spark.sql.catalog.iceberg.s3.endpoint", "http://minio:9000") \
        .config("spark.sql.catalog.iceberg.s3.access-key-id", access_key) \
        .config("spark.sql.catalog.iceberg.s3.secret-access-key", secret_key) \
        .config("spark.sql.catalog.iceberg.s3.path-style-access", "true") \
        .config("spark.sql.defaultCatalog", "iceberg") \
        .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
        .getOrCreate()
    
    logger.info(f"Spark version: {spark.version}")
    logger.info(f"Default catalog: iceberg")
    logger.info(f"REST Catalog: http://iceberg-rest:8181")
    
    return spark

spark = create_spark_session()


INFO:__main__:Spark version: 3.5.0
INFO:__main__:Default catalog: iceberg
INFO:__main__:REST Catalog: http://iceberg-rest:8181


In [None]:
import os

def check_environment_variables() -> None:
    """Проверить переменные окружения."""
    
    print("=" * 80)
    print("ПЕРЕМЕННЫЕ ОКРУЖЕНИЯ")
    print("=" * 80)
    
    env_vars = [
        'SPARK_HOME',
        'JAVA_HOME',
        'PATH',
        'PYTHONPATH',
        'SPARK_DRIVER_MEMORY',
        'SPARK_EXECUTOR_MEMORY',
        'PYSPARK_PYTHON',
        'PYSPARK_DRIVER_PYTHON'
    ]
    
    for var_name in env_vars:
        var_value = os.environ.get(var_name, 'НЕ УСТАНОВЛЕНА')
        print(f"{var_name}: {var_value}")
    
    print("=" * 80)
check_environment_variables()

In [None]:
import os

def check_spark_config() -> None:
    """Проверить spark-defaults.conf файл."""
    
    spark_home = os.environ.get('SPARK_HOME')
    print("\n" + "=" * 80)
    print("SPARK CONFIGURATION (spark-defaults.conf)")
    print("=" * 80)
    
    if not spark_home:
        print("SPARK_HOME: НЕ УСТАНОВЛЕН")
        print("=" * 80)
        return
    
    config_path = f"{spark_home}/conf/spark-defaults.conf"
    print(f"Путь к файлу: {config_path}")
    print("-" * 80)
    
    if os.path.exists(config_path):
        with open(config_path, 'r') as f:
            lines = f.readlines()
        
        if lines:
            for line in lines:
                line = line.strip()
                if line and not line.startswith('#'):
                    print(line)
        else:
            print("Статус: ФАЙЛ ПУСТОЙ")
    else:
        print("Статус: ФАЙЛ НЕ НАЙДЕН")
    
    print("=" * 80)
check_spark_config()

In [None]:
import os
import glob

def check_spark_jars() -> None:
    """Проверить JAR файлы в Spark."""
    
    spark_home = os.environ.get('SPARK_HOME')
    
    print("\n" + "=" * 80)
    print("JAR ФАЙЛЫ В SPARK")
    print("=" * 80)
    
    if not spark_home:
        print("SPARK_HOME: НЕ УСТАНОВЛЕН")
        print("=" * 80)
        return
    
    jars_dir = f"{spark_home}/jars"
    print(f"Директория JAR: {jars_dir}")
    print("-" * 80)
    
    if not os.path.exists(jars_dir):
        print(f"Статус: ДИРЕКТОРИЯ НЕ НАЙДЕНА")
        print("=" * 80)
        return
    
    # Критичные JAR для Iceberg
    jar_patterns = {
        'iceberg-spark-runtime': '*iceberg-spark-runtime*.jar',
        'iceberg-aws-bundle': '*iceberg-aws-bundle*.jar',
        'hadoop-aws': '*hadoop-aws*.jar',
        'aws-java-sdk-bundle': '*aws-java-sdk-bundle*.jar',
        'postgresql': '*postgresql*.jar'
    }
    
    for jar_name, pattern in jar_patterns.items():
        jars = glob.glob(f"{jars_dir}/{pattern}")
        
        if jars:
            for jar_path in jars:
                filename = os.path.basename(jar_path)
                size_mb = os.path.getsize(jar_path) / (1024 * 1024)
                print(f"{jar_name}: {filename} ({size_mb:.2f} MB)")
        else:
            print(f"{jar_name}: НЕ НАЙДЕН")
    
    print("-" * 80)
    
    # Подсчет всех JAR
    all_jars = glob.glob(f"{jars_dir}/*.jar")
    print(f"Всего JAR файлов: {len(all_jars)}")
    print("=" * 80)

check_spark_jars()


In [None]:
import subprocess
from typing import Dict, Optional

def get_package_version(package_name: str) -> Optional[str]:
    """
    Получить версию установленного пакета.
    
    Args:
        package_name: Название пакета
        
    Returns:
        Версия пакета или None если не установлен
    """
    result = subprocess.run(
        ['pip', 'show', package_name],
        capture_output=True,
        text=True
    )
    
    if result.returncode == 0:
        for line in result.stdout.split('\n'):
            if line.startswith('Version:'):
                return line.split(':', 1)[1].strip()
    return None

def check_python_packages() -> None:
    """Проверить установленные Python пакеты."""
    
    print("\n" + "=" * 80)
    print("PYTHON ПАКЕТЫ")
    print("=" * 80)
    
    packages = [
        'pyspark',
        'pyiceberg',
        'delta-spark',
        'great-expectations',
        'py4j',
        'pandas',
        'numpy'
    ]
    
    for package in packages:
        version = get_package_version(package)
        status = version if version else 'НЕ УСТАНОВЛЕН'
        print(f"{package}: {status}")
    
    print("=" * 80)

check_python_packages()


In [None]:
                # ПОЛУЧИТЬ

In [3]:
# Просмотр namespaces, таблиц и описать таблицу 
spark.sql("SHOW NAMESPACES IN iceberg").show()
spark.sql("SHOW TABLES IN iceberg.sandbox").show()
spark.sql("DESCRIBE EXTENDED sandbox.users").show(truncate=False)

+---------+
|namespace|
+---------+
|  sandbox|
+---------+

+---------+---------+-----------+
|namespace|tableName|isTemporary|
+---------+---------+-----------+
|  sandbox|   orders|      false|
|  sandbox| products|      false|
|  sandbox|    users|      false|
+---------+---------+-----------+

+----------------------------+----------------------------+-------+
|col_name                    |data_type                   |comment|
+----------------------------+----------------------------+-------+
|user_id                     |bigint                      |NULL   |
|name                        |string                      |NULL   |
|email                       |string                      |NULL   |
|age                         |bigint                      |NULL   |
|country                     |string                      |NULL   |
|is_active                   |boolean                     |NULL   |
|created_at                  |timestamp_ntz               |NULL   |
|                   

In [None]:
# Чтение из таблицы users
# Запросы можно делать к iceberg.sandbox.users или к sandbox.users
users_df = spark.sql("""
    SELECT * 
    FROM iceberg.sandbox.users
    LIMIT 10 """)
users_df.show()
# users_df.printSchema()

In [None]:
# Чтение из таблицы products
spark.sql("SELECT * FROM sandbox.products").show(5)

In [None]:
# Чтение из таблицы orders
spark.sql("SELECT * FROM sandbox.orders").show(5)

In [None]:
                # СОЗДАТЬ

In [None]:
"""
Через генератор тестовых данных CreateTestData.py создал структуру Iceberg Data Lakehouse из трех таблиц в s3
- sandbox.users Пользователи
- sandbox.orders Заказы
- sandbox.products Товары 
Описание структуры таблиц и данных смотреть в CreateTestData.py в этом блакноте изучем работу с Iceberg через Spark SQL и Spark DataFrame API 
"""

In [None]:
# Создать namespace'ы
spark.sql("CREATE NAMESPACE IF NOT EXISTS sandbox")

In [None]:
# Создаем новую независимую таблицу для практики Spark SQL
# customer_reviews (отзывы клиентов) Эта таблица будет связана с существующими, но не изменит их структуру.
"""
Блок TBLPROPERTIES при создании таблицы задаёт свойства Iceberg, которые управляют хранилищем и форматами данных:
"""

# Создать таблицу
spark.sql("""
    CREATE TABLE IF NOT EXISTS sandbox.customer_reviews (
        review_id BIGINT,
        user_id BIGINT,
        product_id BIGINT,
        order_id BIGINT,
        rating INT,
        review_text STRING,
        helpful_count INT,
        verified_purchase BOOLEAN,
        review_date TIMESTAMP,
        created_at TIMESTAMP
    )
    USING iceberg
    PARTITIONED BY (months(review_date))
    TBLPROPERTIES (
        'write.format.default' = 'parquet',
        'write.metadata.compression-codec' = 'gzip',
        'format-version' = '2'
    )
""")

In [None]:
# Проверить создание
spark.sql("DESCRIBE EXTENDED sandbox.customer_reviews").show(truncate=False)

In [None]:
# Генерация тестовых данных для customer_reviews

In [None]:
from pyspark.sql import functions as F
from datetime import datetime, timedelta
import random

# Прочитать существующие таблицы
orders_df = spark.table("iceberg.sandbox.orders")

# Генерация: ~30% заказов имеют отзывы
reviews_sample = orders_df.sample(0.3)

# Создать отзывы
reviews_df = reviews_sample \
    .select(
        F.monotonically_increasing_id().alias("review_id"),
        F.col("user_id"),
        F.col("product_id"),
        F.col("order_id"),
        F.round(F.rand() * 4 + 1).cast("int").alias("rating"),
        F.concat(
            F.lit("Great product "),
            F.col("product_id").cast("string")
        ).alias("review_text"),
        F.round(F.rand() * 50).cast("int").alias("helpful_count"),
        F.when(F.col("status") == "completed", True).otherwise(False).alias("verified_purchase"),
        (F.col("order_date") + F.expr("INTERVAL 3 DAYS")).alias("review_date"),
        F.current_timestamp().alias("created_at")
    )

# Показать preview
print(f"Всего отзывов: {reviews_df.count()}")


In [None]:
                # ЗАГРУЗИТЬ

In [None]:
# Вариант А: через DataFrame API
reviews_df.writeTo("sandbox.customer_reviews") \
    .using("iceberg") \
    .append()

In [None]:
# Вариант Б: через SQL
# reviews_df.createOrReplaceTempView("temp_reviews")

# spark.sql("""
#     INSERT INTO sandbox.customer_reviews
#     SELECT * FROM temp_reviews
# """)

In [None]:
# Проверить
spark.sql("SELECT COUNT(*) as total FROM sandbox.customer_reviews").show()

In [None]:
                # ОБНОВИТЬ

In [None]:
# Обновить helpful_count для отзывов с высоким рейтингом
spark.sql("""
    UPDATE sandbox.customer_reviews
    SET helpful_count = helpful_count + 10
    WHERE rating >= 4
""")

In [None]:
# Проверить
spark.sql("""
    SELECT rating, AVG(helpful_count) as avg_helpful
    FROM iceberg.sandbox.customer_reviews
    GROUP BY rating
    ORDER BY rating
""").show()

In [None]:
                # УДАЛИТЬ

In [None]:
# Удалить таблицу customer_reviews
"""
DROP TABLE (без PURGE):
Файлы остаются в S3 (безопасность от случайного удаления)
Удаляет метаданные таблицы в REST‑каталоге;
"""

spark.sql("DROP TABLE IF EXISTS sandbox.customer_reviews")

In [None]:
# Проверить что удалилась
spark.sql("SHOW TABLES IN iceberg.sandbox").show()

In [None]:
# Удалить таблицу И файлы из S3
"""
DROP TABLE PURGE:
Удаляет метаданные
Удаляет все файлы из S3
Используй PURGE для полной очистки!

Iceberg REST catalog - PURGE может не удалять данные сразу из-за асинхронности или прав доступа.
Iceberg удаляет файлы окончательно через процедуры обслуживания, а не моментально при DROP.
"""
spark.sql("DROP TABLE IF EXISTS sandbox.customer_reviews PURGE")

# Проверить что удалилась
spark.sql("SHOW TABLES IN iceberg.sandbox").show()

In [None]:
# Удалить отзывы без подтверждения покупки и с низким рейтингом
spark.sql("""
    DELETE FROM sandbox.customer_reviews
    WHERE verified_purchase = false 
      AND rating <= 2
""")

In [None]:
# Показать сколько удалили
spark.sql("""
    SELECT 
        COUNT(*) as total_reviews,
        SUM(CASE WHEN verified_purchase = false THEN 1 ELSE 0 END) as unverified
    FROM sandbox.customer_reviews
""").show()

In [None]:
                # АНАЛИТИЧЕСКИЕ ЗАПРОСЫ

In [2]:
# Проверка распределения
spark.sql("""
  SELECT 
      rating, 
      COUNT(*) AS cnt, 
      AVG(helpful_count) AS avg_helpful
  FROM sandbox.customer_reviews
  GROUP BY rating
  ORDER BY rating
""").show()


+------+-----+------------------+
|rating|  cnt|       avg_helpful|
+------+-----+------------------+
|     1|   54| 55.48148148148148|
|     2|  915| 48.78251366120219|
|     3| 6028|50.233742534837425|
|     4|10536| 49.82232346241458|
|     5|11819| 51.95989508418648|
+------+-----+------------------+



In [None]:
# Получить статистику по странам
spark.sql("""
    SELECT 
        country,
        COUNT(*) as users_count,
        AVG(age) as avg_age
    FROM iceberg.sandbox.users
    GROUP BY country
    ORDER BY users_count DESC
""").show()

In [3]:
# Чтение с фильтром по месяцу (partition pruning)
spark.sql("""
  SELECT 
    COUNT(*) AS cnt,
    MIN(review_date) AS min_dt,
    MAX(review_date) AS max_dt
  FROM sandbox.customer_reviews
  WHERE review_date >= '2024-05-01' AND review_date < '2024-06-01'
""").show()

+----+-------------------+-------------------+
| cnt|             min_dt|             max_dt|
+----+-------------------+-------------------+
|5470|2024-05-01 00:00:00|2024-05-31 00:00:00|
+----+-------------------+-------------------+



In [None]:
# Средний рейтинг по категориям товаров
spark.sql("""
    SELECT 
        p.category,
        COUNT(r.review_id) as reviews_count,
        AVG(r.rating) as avg_rating,
        SUM(r.helpful_count) as total_helpful
    FROM sandbox.customer_reviews r
    JOIN sandbox.products p ON r.product_id = p.product_id
    GROUP BY p.category
    ORDER BY avg_rating DESC
""").show()

In [None]:
# Топ товары по отзывам
spark.sql("""
    SELECT 
        p.product_name,
        p.category,
        COUNT(r.review_id) as reviews_count,
        AVG(r.rating) as avg_rating
    FROM sandbox.customer_reviews r
    JOIN sandbox.products p ON r.product_id = p.product_id
    GROUP BY p.product_name, p.category
    HAVING COUNT(r.review_id) >= 5
    ORDER BY avg_rating DESC, reviews_count DESC
    LIMIT 10
""").show(truncate=False)

In [None]:
# Активные рецензенты
spark.sql("""
    SELECT 
        u.name,
        u.country,
        COUNT(r.review_id) as reviews_written,
        AVG(r.rating) as avg_rating_given
    FROM sandbox.customer_reviews r
    JOIN sandbox.users u ON r.user_id = u.user_id
    GROUP BY u.name, u.country
    ORDER BY reviews_written DESC
    LIMIT 10
""").show()

In [None]:
# Создать агрегированную таблицу средних рейтингов по категориям
# CREATE TABLE AS SELECT
spark.sql("""
  CREATE TABLE IF NOT EXISTS sandbox.category_ratings
  USING iceberg
  TBLPROPERTIES (
    'write.format.default' = 'parquet',
    'write.metadata.compression-codec' = 'gzip',
    'format-version' = '2'
  )
  AS
  SELECT 
    p.category,
    COUNT(r.review_id) AS reviews_count,
    AVG(r.rating) AS avg_rating,
    SUM(r.helpful_count) AS total_helpful
  FROM sandbox.customer_reviews r
  JOIN sandbox.products p ON r.product_id = p.product_id
  GROUP BY p.category
""")

# Проверка
spark.sql("SELECT * FROM sandbox.category_ratings").show()

In [None]:
                # МЕТАДАННЫЕ

In [None]:

def show_properties(table_name: str) -> None:
    """Показать свойства таблицы Iceberg."""
    spark.sql(f"SHOW TBLPROPERTIES {table_name}").show(truncate=False)


In [None]:
show_properties("sandbox.customer_reviews")