In [1]:
import os
from pyspark.sql import SparkSession
# Export SPARK_CONF_DIR for this process before any Spark session is built.
os.environ.update({'SPARK_CONF_DIR': '/opt/spark/conf'})

In [10]:
# Catalog name used to qualify every database and table in the SQL statements.
CATALOG_NAME = 'iceberg'
# Root storage location; each database and table is placed underneath it.
DATA_LOCATION = 's3a://mybucket/warehouse'

DWH_LAYERS = [
    'raw',  # bronze layer - raw data
    'dds',  # silver layer - detailed data
    'dm',   # gold layer - datamarts
]

SCHEMA_SUFFIX = [
    '',     # the layer's main schema (no suffix)
    'stg',  # stage data for every layer
]

PROJECT_PREFIX = 'cryptocurrencies_project'

# One database per (layer, suffix) pair, e.g. "<prefix>_raw" and "<prefix>_raw_stg".
DATABASES_LIST = [
    f"{PROJECT_PREFIX}_{layer}{'_' + suffix if suffix else ''}"
    for layer in DWH_LAYERS
    for suffix in SCHEMA_SUFFIX
]

# Table specification: 'name' is the table name, 'layer' selects the target
# databases, and 'attrs' is the column list spliced into CREATE TABLE.
BTC_TABLE = {
    'name': 'btc',
    'layer': 'raw',
    # Precision and scale are spelled out on purpose: Spark SQL reads a bare
    # DECIMAL as DECIMAL(10, 0), which drops the fractional part of prices and
    # cannot hold values with more than 10 digits (e.g. market capitalisation).
    # NOTE: tables are created with IF NOT EXISTS, so a table that already
    # exists with the old column types is not altered by this definition.
    'attrs': '''
        open DECIMAL(38, 8),
        high DECIMAL(38, 8),
        low DECIMAL(38, 8),
        close DECIMAL(38, 8),
        volume DECIMAL(38, 8),
        market_cap DECIMAL(38, 8)
    ''',
}

TABLES_LIST = [BTC_TABLE]

In [28]:
# The master URL is taken from the environment. Fail fast with a clear message
# when it is missing instead of handing None to the session builder.
spark_master_url = os.getenv('SPARK_MASTER_URL')
if not spark_master_url:
    raise RuntimeError(
        "Environment variable SPARK_MASTER_URL is not set; "
        "it is required to build the Spark session."
    )

# Small single-executor session: this notebook only issues DDL statements.
spark = (
    SparkSession.builder
    .appName("setup-database")
    .master(spark_master_url)
    .config('spark.driver.cores', '1')
    .config('spark.driver.memory', '1G')
    .config('spark.executor.instances', '1')
    .config('spark.executor.cores', '1')
    .config('spark.executor.memory', '512m')
    .getOrCreate()
)
# Only show errors in the notebook output.
spark.sparkContext.setLogLevel('ERROR')

In [23]:
def create_database(db_name, spark_session=None):
    """Create database ``db_name`` in the catalog if it does not exist yet.

    The database is addressed as ``{CATALOG_NAME}.{db_name}`` and its location
    is set to ``{DATA_LOCATION}/{db_name}``.

    Args:
        db_name: Database name without the catalog prefix.
        spark_session: Session used to run the statement. When omitted, the
            notebook-level ``spark`` is looked up at call time, so the function
            keeps working after the session cell has been re-run (a default
            bound at definition time would keep pointing at the old session).
    """
    session = spark if spark_session is None else spark_session
    session.sql(f"""
        CREATE DATABASE IF NOT EXISTS {CATALOG_NAME}.{db_name}
        LOCATION '{DATA_LOCATION}/{db_name}'
    """)

def create_table(table_data, spark_session=None):
    """Create the described table in every database that belongs to its layer.

    Target databases are the entries of ``DATABASES_LIST`` whose name is exactly
    ``{PROJECT_PREFIX}_{layer}`` or ``{PROJECT_PREFIX}_{layer}_{suffix}`` for a
    suffix in ``SCHEMA_SUFFIX``. Matching on the exact name (rather than on a
    substring) keeps a layer from picking up unrelated databases when its name
    happens to occur inside the project prefix or inside another layer's name.

    Args:
        table_data: Mapping with the keys ``'name'`` (table name), ``'layer'``
            (warehouse layer) and ``'attrs'`` (column definitions spliced into
            the CREATE TABLE statement).
        spark_session: Session used to run the statements. When omitted, the
            notebook-level ``spark`` is looked up at call time.
    """
    session = spark if spark_session is None else spark_session

    layer_db = f"{PROJECT_PREFIX}_{table_data['layer']}"
    layer_db_names = {
        f"{layer_db}_{suffix}" if suffix else layer_db
        for suffix in SCHEMA_SUFFIX
    }
    # Iterate DATABASES_LIST (not the set) to keep the original creation order.
    databases = [db_name for db_name in DATABASES_LIST if db_name in layer_db_names]

    for db_name in databases:
        session.sql(f"""
            CREATE EXTERNAL TABLE IF NOT EXISTS {CATALOG_NAME}.{db_name}.{table_data['name']} (
                {table_data['attrs']}
            )
            USING iceberg
            LOCATION '{DATA_LOCATION}/{db_name}/{table_data['name']}';
        """)

def show_all_tables(spark_session=None):
    """Print every table of every database in the catalog.

    One line is printed per table, in the form ``  - <schema>.<table>``.

    Args:
        spark_session: Session used to run the SHOW statements. When omitted,
            the notebook-level ``spark`` is looked up at call time.
    """
    session = spark if spark_session is None else spark_session

    schemas = [row[0] for row in session.sql(f"SHOW DATABASES IN {CATALOG_NAME}").collect()]
    # Print the tables of each schema.
    for schema in schemas:
        tables = session.sql(f"SHOW TABLES IN {CATALOG_NAME}.{schema}").collect()
        for table in tables:
            print(f"  - {schema}.{table.tableName}")

In [5]:
# Make sure every project database exists (one per layer/suffix combination).
for database_name in DATABASES_LIST:
    create_database(database_name)

In [29]:
# Create each configured table in the databases of its layer.
for table_spec in TABLES_LIST:
    create_table(table_spec)

In [8]:
# List the catalog's databases sorted by name; the catalog comes from CATALOG_NAME
# so this check stays in sync with the setup code.
spark.sql(f"SHOW DATABASES IN {CATALOG_NAME}").orderBy('namespace').show(truncate=False)

[Stage 0:>                                                          (0 + 1) / 2]

+--------------------------------+
|namespace                       |
+--------------------------------+
|cryptocurrencies_project_dds    |
|cryptocurrencies_project_dds_stg|
|cryptocurrencies_project_dm     |
|cryptocurrencies_project_dm_stg |
|cryptocurrencies_project_raw    |
|cryptocurrencies_project_raw_stg|
|test_iceberg_db                 |
|test_iceberg_db_2               |
+--------------------------------+



                                                                                

In [20]:
# List the tables of the raw staging database. The name is built from the
# configuration constants, and tableName is a secondary sort key because every
# row shares the same namespace.
spark.sql(f"SHOW TABLES IN {CATALOG_NAME}.{PROJECT_PREFIX}_raw_stg").orderBy('namespace', 'tableName').show(truncate=False)

[Stage 0:>                                                          (0 + 1) / 1]

+--------------------------------+---------+-----------+
|namespace                       |tableName|isTemporary|
+--------------------------------+---------+-----------+
|cryptocurrencies_project_raw_stg|btc      |false      |
+--------------------------------+---------+-----------+



                                                                                

In [30]:
# Print every table found in the catalog, one "  - <schema>.<table>" line each.
show_all_tables()

  - test_iceberg_db_2.test_iceberg_table
  - test_iceberg_db.test_iceberg_table
  - cryptocurrencies_project_raw.btc
  - cryptocurrencies_project_raw_stg.btc


In [31]:
# Stop the Spark session now that the setup and checks are done.
spark.stop()