In [None]:
# Проверяем доступность spark
spark

In [None]:
# Проверяем наличие нужных параметров в конфигурации
print(spark.conf.get("spark.sql.extensions"))
print(spark.conf.get("spark.sql.catalog.spark_catalog"))

In [None]:
# Создаем базу данных
spark.sql("create database module_4")

In [2]:
# Создание таблицы при помощи DDL
table_ddl = """CREATE OR REPLACE TABLE module_4.lesson_43 (
  id INT,
  run_parameters STRING,
  status STRING,
  status_message STRING
)
USING DELTA;
"""

spark.sql(table_ddl)

24/07/25 08:05:22 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


                                                                                

24/07/25 08:05:34 WARN HiveExternalCatalog: Couldn't find corresponding Hive SerDe for data source provider delta. Persisting data source table `module_4`.`lesson_43` into Hive metastore in Spark SQL specific format, which is NOT compatible with Hive.


DataFrame[]

In [1]:
# Форматированная инфомрация о таблице
spark.sql("describe formatted module_4.lesson_43").show()

                                                                                

+--------------------+--------------------+-------+
|            col_name|           data_type|comment|
+--------------------+--------------------+-------+
|                  id|                 int|       |
|      run_parameters|              string|       |
|              status|              string|       |
|      status_message|              string|       |
|                    |                    |       |
|      # Partitioning|                    |       |
|     Not partitioned|                    |       |
|                    |                    |       |
|# Detailed Table ...|                    |       |
|                Name|  module_4.lesson_43|       |
|            Location|s3a://yc-dataproc...|       |
|            Provider|               delta|       |
|               Owner|             jupyter|       |
|    Table Properties|[delta.minReaderV...|       |
+--------------------+--------------------+-------+



In [4]:
# Детальное описание Delta таблицы
spark.sql("describe detail module_4.lesson_43").show()

+------+--------------------+------------------+-----------+--------------------+--------------------+-------------------+----------------+--------+-----------+----------+----------------+----------------+--------------------+
|format|                  id|              name|description|            location|           createdAt|       lastModified|partitionColumns|numFiles|sizeInBytes|properties|minReaderVersion|minWriterVersion|       tableFeatures|
+------+--------------------+------------------+-----------+--------------------+--------------------+-------------------+----------------+--------+-----------+----------+----------------+----------------+--------------------+
| delta|50d416ea-20cd-4f0...|module_4.lesson_43|       null|s3a://yc-dataproc...|2024-07-25 08:05:...|2024-07-25 08:05:21|              []|       0|          0|        {}|               1|               2|[appendOnly, inva...|
+------+--------------------+------------------+-----------+--------------------+-----------

In [5]:
# История изменений Delta таблицы
spark.sql("describe history module_4.lesson_43").show()

                                                                                

+-------+-------------------+------+--------+--------------------+--------------------+----+--------+---------+-----------+--------------+-------------+----------------+------------+--------------------+
|version|          timestamp|userId|userName|           operation| operationParameters| job|notebook|clusterId|readVersion|isolationLevel|isBlindAppend|operationMetrics|userMetadata|          engineInfo|
+-------+-------------------+------+--------+--------------------+--------------------+----+--------+---------+-----------+--------------+-------------+----------------+------------+--------------------+
|      0|2024-07-25 08:05:21|  null|    null|CREATE OR REPLACE...|{isManaged -> tru...|null|    null|     null|       null|  Serializable|         true|              {}|        null|Apache-Spark/3.3....|
+-------+-------------------+------+--------+--------------------+--------------------+----+--------+---------+-----------+--------------+-------------+----------------+------------+--

In [6]:
# Записываем в таблицу 1 строку
spark.sql("insert into module_4.lesson_43 values (1, 'some params', 'Run', '')")

                                                                                

DataFrame[]

In [None]:
# Ошибка при insert
spark.sql("insert into module_4.lesson_43 values (100, null, 'Error')")

In [7]:
# Запись строки с указанием колонок
spark.sql("insert into module_4.lesson_43 (id, run_parameters, status, status_message) values (2, 'another params', 'Success', 'Data inserted successfully')")

                                                                                

DataFrame[]

In [8]:
# Проверка результата
spark.table("module_4.lesson_43").show()

                                                                                

+---+--------------+-------+--------------------+
| id|run_parameters| status|      status_message|
+---+--------------+-------+--------------------+
|  2|another params|Success|Data inserted suc...|
|  1|   some params|    Run|                    |
+---+--------------+-------+--------------------+



In [9]:
# Обновление строки с id = 1
spark.sql("update module_4.lesson_43 set status='Warning' where id = 1")

                                                                                

DataFrame[num_affected_rows: bigint]

In [10]:
# Проверка результата
spark.table("module_4.lesson_43").show()

                                                                                

+---+--------------+-------+--------------------+
| id|run_parameters| status|      status_message|
+---+--------------+-------+--------------------+
|  2|another params|Success|Data inserted suc...|
+---+--------------+-------+--------------------+



In [13]:
# Перезаписываем данные в таблице
spark.sql("insert overwrite table module_4.lesson_43 (id, run_parameters, status, status_message) values (3, 'correct params', 'Success', 'Correct data inserted successfully')")

                                                                                

DataFrame[]

In [14]:
# Проверяем успешность перезаписи
spark.table("module_4.lesson_43").show(truncate=False)

                                                                                

+---+--------------+-------+----------------------------------+
|id |run_parameters|status |status_message                    |
+---+--------------+-------+----------------------------------+
|3  |correct params|Success|Correct data inserted successfully|
+---+--------------+-------+----------------------------------+



In [15]:
# Создаем временную таблицу
spark.sql("""CREATE OR REPLACE TEMP VIEW lesson_43_updates AS SELECT * FROM VALUES
  (4, 'wrong params', 'Failed', 'Error: OutOfMemoryException'),
  (1, 'upload test table', 'Success', 'Table was uploaded'),
  (3, 'correct settings', 'Success', 'Empty message'),
  (5, 'correct settings', 'Failed', 'Unavailable source cluster')
  AS tab(id, run_parameters, status, status_message);""")

DataFrame[]

In [17]:
# Операция объединения
spark.sql("""MERGE INTO module_4.lesson_43
    USING lesson_43_updates
    ON module_4.lesson_43.id = lesson_43_updates.id
    WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *;
""")

                                                                                

24/07/25 08:15:39 WARN HintErrorLogger: Hint (strategy=broadcast) is not supported in the query: build left for full outer join.
24/07/25 08:15:40 WARN HintErrorLogger: Hint (strategy=broadcast) is not supported in the query: build left for full outer join.
24/07/25 08:15:40 WARN HintErrorLogger: Hint (strategy=broadcast) is not supported in the query: build left for full outer join.


                                                                                

DataFrame[num_affected_rows: bigint, num_updated_rows: bigint, num_deleted_rows: bigint, num_inserted_rows: bigint]

In [18]:
# Проверяем успешность вставки
spark.table("module_4.lesson_43").show(truncate=False)

                                                                                

+---+-----------------+-------+---------------------------+
|id |run_parameters   |status |status_message             |
+---+-----------------+-------+---------------------------+
|1  |upload test table|Success|Table was uploaded         |
|3  |correct settings |Success|Empty message              |
|4  |wrong params     |Failed |Error: OutOfMemoryException|
|5  |correct settings |Failed |Unavailable source cluster |
+---+-----------------+-------+---------------------------+



In [20]:
# Просмотриваем историю операций
spark.sql("describe history module_4.lesson_43").show()

+-------+-------------------+------+--------+--------------------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+------------+--------------------+
|version|          timestamp|userId|userName|           operation| operationParameters| job|notebook|clusterId|readVersion|isolationLevel|isBlindAppend|    operationMetrics|userMetadata|          engineInfo|
+-------+-------------------+------+--------+--------------------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+------------+--------------------+
|      5|2024-07-25 08:15:41|  null|    null|               MERGE|{predicate -> (sp...|null|    null|     null|          4|  Serializable|        false|{numTargetRowsCop...|        null|Apache-Spark/3.3....|
|      4|2024-07-25 08:09:45|  null|    null|               WRITE|{mode -> Overwrit...|null|    null|     null|          3|  Serializable|        false|{numFiles -> 1, 

In [21]:
# Вывод данных по указанной версии
spark.sql("select * from module_4.lesson_43 version as of 2").show()

                                                                                

+---+--------------+-------+--------------------+
| id|run_parameters| status|      status_message|
+---+--------------+-------+--------------------+
|  2|another params|Success|Data inserted suc...|
|  1|   some params|    Run|                    |
+---+--------------+-------+--------------------+



In [23]:
# Вывод данных по указанной временной метке
spark.sql(f"select * from module_4.lesson_43 timestamp as of '2024-07-25 08:09:46'").show(truncate=False)

                                                                                

+---+--------------+-------+----------------------------------+
|id |run_parameters|status |status_message                    |
+---+--------------+-------+----------------------------------+
|3  |correct params|Success|Correct data inserted successfully|
+---+--------------+-------+----------------------------------+

