In [1]:
pip install pyspark==3.5.0

Collecting py4j==0.10.9.7 (from pyspark==3.5.0)
  Downloading py4j-0.10.9.7-py2.py3-none-any.whl.metadata (1.5 kB)
Downloading py4j-0.10.9.7-py2.py3-none-any.whl (200 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m200.5/200.5 kB[0m [31m1.5 MB/s[0m eta [36m0:00:00[0ma [36m0:00:01[0m
[?25hInstalling collected packages: py4j
Successfully installed py4j-0.10.9.7
Note: you may need to restart the kernel to use updated packages.


In [2]:
import os
from concurrent.futures import ThreadPoolExecutor, as_completed
from getpass import getpass

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col, lit
from pyspark.sql.types import IntegerType, StringType

In [3]:
class OracleDataLoader:
    """
    Parallel loader that copies an Oracle table into an ORC-backed Spark table.

    The table is split into ROWID ranges derived from its extents (DBA_EXTENTS).
    Each range is read by a separate Spark JDBC job and written to a temporary
    ORC directory, and the pieces are finally merged into ``target_table``.
    """

    def __init__(self,
                 spark_session: "SparkSession",
                 jdbc_url: str,
                 username: str,
                 password: str):
        """
        Initialize the data loader.

        :param spark_session: Spark session used for every read and write
        :param jdbc_url: JDBC URL of the Oracle database
        :param username: database user (needs SELECT on DBA_EXTENTS, DBA_OBJECTS and the source table)
        :param password: database user password
        """
        self.spark = spark_session
        self.jdbc_url = jdbc_url
        self.username = username
        self.password = password

        # Tuning parameters
        self.config = {
            'num_threads': 8,        # concurrent chunk reads submitted from the driver
            'chunk_count': 512,      # ROWID ranges per (data object, datafile) pair
            'fetch_size': 100000,    # JDBC fetchSize used for chunk reads
            'target_num_files': 400  # coalesce target for the final table (upper bound on files)
        }

    @staticmethod
    def _quote_literal(value: str) -> str:
        """Render ``value`` as an Oracle string literal, doubling embedded single quotes."""
        return "'" + value.replace("'", "''") + "'"

    @staticmethod
    def _quote_identifier(name: str) -> str:
        """Render ``name`` as a quoted (exact-case) Oracle identifier."""
        return '"' + name.replace('"', '""') + '"'

    def _jdbc_reader(self):
        """Return a DataFrameReader preconfigured with the Oracle JDBC connection options."""
        return (self.spark.read
                .format("jdbc")
                .option("url", self.jdbc_url)
                .option("user", self.username)
                .option("password", self.password)
                .option("driver", "oracle.jdbc.driver.OracleDriver"))

    def _generate_extent_query(self, schema: str, table: str) -> str:
        """
        Build the inline view that lists the ROWID ranges ("chunks") of a table.

        The extents of each (data object, datafile) pair are numbered into at most
        ``config['chunk_count']`` consecutive batches of roughly equal block count.
        Each output row describes one batch by its first and last block.

        :param schema: table owner, exactly as stored in the data dictionary (case-sensitive)
        :param table: table name, exactly as stored in the data dictionary (case-sensitive)
        :return: parenthesized SQL usable as the JDBC ``dbtable`` option
        """
        owner = self._quote_literal(schema)
        name = self._quote_literal(table)
        chunk_count = int(self.config['chunk_count'])
        # The object_type / segment_type filters keep an index that happens to share
        # the table's name (indexes live in their own namespace) out of the result.
        return f"""
        (SELECT
            data_object_id,
            file_id,
            relative_fno,
            file_batch,
            subobject_name,
            MIN(start_block_id) start_block_id,
            MAX(end_block_id) end_block_id,
            SUM(blocks) blocks
        FROM
            (SELECT
                o.data_object_id,
                o.subobject_name,
                e.file_id,
                e.relative_fno,
                e.block_id start_block_id,
                e.block_id + e.blocks - 1 end_block_id,
                e.blocks,
                CEIL(SUM(e.blocks) OVER (PARTITION BY o.data_object_id, e.file_id ORDER BY e.block_id ASC) /
                    (SUM(e.blocks) OVER (PARTITION BY o.data_object_id, e.file_id) / {chunk_count})) file_batch
            FROM
                dba_extents e,
                dba_objects o
            WHERE
                o.owner = {owner}
                AND o.object_name = {name}
                AND o.object_type LIKE 'TABLE%'
                AND e.owner = {owner}
                AND e.segment_name = {name}
                AND e.segment_type LIKE 'TABLE%'
                AND o.owner = e.owner
                AND o.object_name = e.segment_name
                AND (o.subobject_name = e.partition_name
                    OR (o.subobject_name IS NULL AND e.partition_name IS NULL)))
        GROUP BY
            data_object_id,
            file_id,
            relative_fno,
            file_batch,
            subobject_name
        ORDER BY
            data_object_id,
            file_id,
            relative_fno,
            file_batch,
            subobject_name)
        """

    @classmethod
    def _build_rowid_query(cls, schema, table, columns,
                           data_object_id, relative_fno, start_block_id, end_block_id) -> str:
        """
        Build the SELECT that reads one ROWID range of ``schema.table``.

        The range covers every row slot (0..32767) of blocks start_block_id..end_block_id
        in the given relative file for the given data object.

        :param schema: table owner (exact case)
        :param table: table name (exact case)
        :param columns: column names to select (exact case, as reported by JDBC)
        :param data_object_id: DBA_OBJECTS.DATA_OBJECT_ID of the segment
        :param relative_fno: relative file number of the datafile
        :param start_block_id: first block of the range
        :param end_block_id: last block of the range
        :return: SQL text
        """
        column_list = ", ".join(cls._quote_identifier(c) for c in columns)
        source = f"{cls._quote_identifier(schema)}.{cls._quote_identifier(table)}"
        # Oracle NUMBERs arrive as Decimal via JDBC; int() yields clean integer literals
        # without the 32-bit overflow an IntegerType cast would risk.
        obj, fno = int(data_object_id), int(relative_fno)
        first, last = int(start_block_id), int(end_block_id)
        return f"""
            SELECT /*+ NO_INDEX(t) */ {column_list}
            FROM {source} t
            WHERE (rowid >= dbms_rowid.rowid_create(1, {obj}, {fno}, {first}, 0)
                   AND rowid <= dbms_rowid.rowid_create(1, {obj}, {fno}, {last}, 32767))
            """

    def _delete_path(self, path: str) -> None:
        """
        Best-effort recursive delete of ``path`` through the Hadoop FileSystem Spark writes to.

        Unlike ``shutil.rmtree`` this also works for HDFS/object-store paths. Failures are
        reported as a warning instead of being raised.
        """
        import warnings

        try:
            hadoop_path = self.spark._jvm.org.apache.hadoop.fs.Path(path)
            fs = hadoop_path.getFileSystem(self.spark._jsc.hadoopConfiguration())
            fs.delete(hadoop_path, True)
        except Exception as exc:  # cleanup must not mask the load result
            warnings.warn(f"Could not delete temporary path {path}: {exc}")

    def load_data(self,
                  source_schema: str,
                  source_table: str,
                  target_path: str,
                  target_table: str,
                  partition: str = None):
        """
        Main entry point: copy an Oracle table (or one of its segments) into an ORC table.

        :param source_schema: source table owner (exact case)
        :param source_table: source table name (exact case)
        :param target_path: location of the target table's files; ``<target_path>_tmp`` is used as scratch space
        :param target_table: name of the target Spark/Hive table (overwritten)
        :param partition: optional value matched against DBA_OBJECTS.SUBOBJECT_NAME to load a single segment
        :raises ValueError: if no extents are found for the table/partition
        """
        # --- 1. Extent map: one row per ROWID chunk ---
        extent_df = (self._jdbc_reader()
                     .option("dbtable", self._generate_extent_query(source_schema, source_table))
                     .load())
        # Oracle reports upper-case column names while PySpark Row attribute access is
        # case-sensitive; normalise so row.<name> below resolves.
        extent_df = extent_df.toDF(*[c.lower() for c in extent_df.columns])

        if partition:
            extent_df = extent_df.filter(col("subobject_name") == partition)

        extents = (extent_df
                   .select("data_object_id", "file_id", "relative_fno", "file_batch",
                           "start_block_id", "end_block_id")
                   .collect())
        if not extents:
            target = f"{source_schema}.{source_table}" + (f" (partition {partition})" if partition else "")
            raise ValueError(f"No extents found for {target}")

        # --- 2. Column list of the source table (empty result, metadata only) ---
        source_ref = f"{self._quote_identifier(source_schema)}.{self._quote_identifier(source_table)}"
        columns = (self._jdbc_reader()
                   .option("dbtable", f"(SELECT * FROM {source_ref} WHERE 1=0)")
                   .load()
                   .columns)

        # --- 3. One (temp path, query) pair per chunk ---
        tmp_root = f"{target_path}_tmp"
        chunks = []
        for row in extents:
            # data_object_id + file_id + file_batch is unique per chunk; relative_fno alone
            # repeats across partitions/tablespaces and would let chunks overwrite each other.
            chunk_key = f"{int(row.data_object_id)}_{int(row.file_id)}_{int(row.file_batch)}"
            query = self._build_rowid_query(source_schema, source_table, columns,
                                            row.data_object_id, row.relative_fno,
                                            row.start_block_id, row.end_block_id)
            chunks.append((f"{tmp_root}/part_{chunk_key}", query))

        def load_chunk(chunk_path: str, query: str) -> str:
            """Read one ROWID range over JDBC and write it as ORC to ``chunk_path``."""
            chunk_df = (self._jdbc_reader()
                        .option("dbtable", f"({query})")
                        .option("fetchSize", self.config['fetch_size'])
                        .load())
            chunk_df.write.mode("overwrite").orc(chunk_path)
            return chunk_path

        try:
            # --- 4. Parallel chunk loads; result() re-raises worker failures ---
            chunk_paths = []
            with ThreadPoolExecutor(max_workers=self.config['num_threads']) as executor:
                futures = [executor.submit(load_chunk, path, query) for path, query in chunks]
                try:
                    for future in as_completed(futures):
                        chunk_paths.append(future.result())
                except Exception:
                    # Do not start chunks that are still queued once one has failed.
                    for future in futures:
                        future.cancel()
                    raise

            # --- 5. Merge exactly the chunks written by this run into the target table ---
            result_df = self.spark.read.orc(chunk_paths)
            (result_df
             .coalesce(self.config['target_num_files'])
             .write
             .mode("overwrite")
             .option("path", target_path)
             .format("orc")
             .saveAsTable(target_table)
             )
        finally:
            # Remove scratch data on success and on failure.
            self._delete_path(tmp_root)

In [None]:
# Session attached to the standalone cluster master.
# NOTE(review): the Oracle JDBC driver jar must be on the driver/executor classpath
# (e.g. via spark.jars); it is not configured here — confirm cluster setup.
spark = (
    SparkSession.builder
    .master("spark://spark-master:7077")
    .appName("Egor")
    .getOrCreate()
)

In [None]:
# Connection settings come from the environment so no secrets are stored in the notebook.
# The URL/user fallbacks match the original local setup; the password is prompted for if unset.
# NOTE(review): connecting as SYS over JDBC typically needs the SYSDBA role; a dedicated
# read-only account with SELECT on DBA_EXTENTS/DBA_OBJECTS is preferable — confirm.
jdbc_url = os.environ.get("ORACLE_JDBC_URL", "jdbc:oracle:thin:@//oracle:1521/db")
username = os.environ.get("ORACLE_USER", "sys")
password = os.environ.get("ORACLE_PASSWORD") or getpass("Oracle password: ")

# Create the loader
loader = OracleDataLoader(spark, jdbc_url, username, password)


In [None]:
# Copy the SERVICE_FEATURE_MAX segment of SYS.LARGE_DATA_TABLE into the Hive staging table.
load_params = {
    "source_schema": "SYS",
    "source_table": "LARGE_DATA_TABLE",
    "target_path": "/warehouse/tablespace/external/hive/customer360_stg.db/service_feature_orcl",
    "target_table": "customer360_stg.service_feature_orcl",
    "partition": "SERVICE_FEATURE_MAX",
}
loader.load_data(**load_params)

In [None]:
# Stop the Spark session created at the start of the notebook; this is the final step.
spark.stop()