# Генерация данных на Scala с Custom OutputFormat

Этот ноутбук демонстрирует, как использовать Spark Scala API для подключения кастомного Hadoop OutputFormat.

**Важно:** Использование кастомных OutputFormat через `saveAsNewAPIHadoopFile` отключает оптимизации Spark SQL, поэтому это работает медленнее, чем нативный `write.parquet`.

In [None]:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport
import org.apache.parquet.hadoop.ParquetOutputFormat

// Подключаем наш JAR (если он еще не в classpath, но обычно для ядра Toree нужно добавлять через AddJar или config при старте)
// В данном случае мы полагаем, что JAR добавлен или лежит доступно.

val spark = SparkSession.builder
    .appName("ScalaLargeDataGen")
    .master("spark://spark-master:7077")
    .config("spark.hadoop.hive.metastore.uris", "thrift://hive-metastore:9083")
    .config("spark.jars", "/home/jovyan/work/custom-formats.jar")
    .enableHiveSupport()
    .getOrCreate()

println("Session Created")

In [None]:
// Генерация данных
val numRows = 10000000L
val df = spark.range(numRows).withColumnRenamed("id", "row_id")
    .withColumn("val_int_1", (rand() * 100).cast("int"))
    .withColumn("val_int_2", (rand() * 10000).cast("int"))
    .withColumn("val_double_1", rand())
    .withColumn("val_double_2", rand() * 1000.0)
    .withColumn("category_code", (rand() * 5).cast("int").cast("string"))
    .withColumn("status", expr("elt(cast(rand()*3 as int) + 1, 'ACTIVE', 'INACTIVE', 'PENDING')"))
    .withColumn("random_string_1", hex(expr("rand() * 100000").cast("long")))
    .withColumn("random_string_2", concat(lit("prefix_"), col("row_id")))
    .withColumn("created_date", expr("date_add(current_date(), -cast(rand()*365 as int))"))
    .withColumn("description", lit("Scala test record"))

df.printSchema()

In [None]:
// Запись через RDD + Custom OutputFormat

import com.example.bigdata.SizeBasedParquetOutputFormat
import org.apache.spark.sql.Row

val outputPath = "/user/hive/warehouse/scala_custom_table_path"

// Конфигурируем Job для Parquet
val job = Job.getInstance(spark.sparkContext.hadoopConfiguration)
ParquetOutputFormat.setWriteSupportClass(job, classOf[ParquetWriteSupport])
ParquetWriteSupport.setSchema(job, org.apache.spark.sql.execution.datasources.parquet.ParquetSchemaConverter.convert(df.schema))

// Важно: RDD должен быть rddOf[(Void, Row)]
val rdd = df.rdd.map(row => (null, row))

println(s"Writing to $outputPath using SizeBasedParquetOutputFormat...")

// Custom Format сохраняет данные
rdd.saveAsNewAPIHadoopFile(
    outputPath,
    classOf[Void],
    classOf[Row],
    classOf[SizeBasedParquetOutputFormat],
    job.getConfiguration
)

println("Write complete.")

In [None]:
// Регистрация Hive таблицы поверх записанных данных
val tableName = "scala_custom_table"
spark.sql(s"DROP TABLE IF EXISTS $tableName")
spark.sql(s"CREATE EXTERNAL TABLE $tableName (row_id LONG, val_int_1 INT) STORED AS PARQUET LOCATION '$outputPath'")
// Примечание: Для полной схемы нужно перечислить все поля в CREATE TABLE, здесь сокращено для примера

spark.sql(s"SELECT count(*) FROM $tableName").show()