![iceberg-logo](https://www.apache.org/logos/res/iceberg/iceberg.png)

# An Introduction to the Iceberg Scala API

## [Part 1 - Loading a Catalog and Creating a Table](https://tabular.io/blog/java-api-part-1/)

In [None]:
// Evaluate the `spark` value so the notebook displays it. It is not defined in this
// notebook — presumably the SparkSession provided by the kernel (confirm).
spark

In [None]:
import org.apache.iceberg.catalog.Catalog
import org.apache.hadoop.conf.Configuration
import org.apache.iceberg.CatalogProperties
import org.apache.iceberg.rest.RESTCatalog
import org.apache.iceberg.aws.s3.S3FileIOProperties

import scala.collection.JavaConverters._

// Settings used to initialize the catalog: the catalog implementation class, the REST
// endpoint URI, the warehouse location, the FileIO implementation, and the S3 endpoint.
val properties: Map[String, String] = Map(
  CatalogProperties.CATALOG_IMPL -> "org.apache.iceberg.rest.RESTCatalog",
  CatalogProperties.URI -> "http://rest:8181",
  CatalogProperties.WAREHOUSE_LOCATION -> "s3a://warehouse/wh",
  CatalogProperties.FILE_IO_IMPL -> "org.apache.iceberg.aws.s3.S3FileIO",
  S3FileIOProperties.ENDPOINT -> "http://minio:9000"
)

// Create the REST catalog, hand it a freshly constructed Hadoop Configuration, and
// initialize it under the name "demo" with the properties converted to a java.util.Map.
val catalog = new RESTCatalog()
val conf = new Configuration()
catalog.setConf(conf)
catalog.initialize("demo", properties.asJava)
// Last expression of the cell: the catalog's name.
catalog.name()

In [None]:
import org.apache.iceberg.Schema
import org.apache.iceberg.types.Types

// Schema of the log table: three required columns (level, event_time, message) and an
// optional call_stack list whose elements are required strings. The leading integers
// are the field IDs (1-4 for the columns, 5 for the list element).
val schema = new Schema(
  Types.NestedField.required(1, "level", Types.StringType.get()),
  Types.NestedField.required(2, "event_time", Types.TimestampType.withZone()),
  Types.NestedField.required(3, "message", Types.StringType.get()),
  Types.NestedField.optional(4, "call_stack", Types.ListType.ofRequired(5, Types.StringType.get()))
)

// Last expression of the cell, so the notebook displays the schema.
schema

In [None]:
import org.apache.iceberg.PartitionSpec

// Partition spec built against `schema`: an hourly transform on event_time,
// followed by the identity of level.
val spec = PartitionSpec
  .builderFor(schema)
  .hour("event_time")
  .identity("level")
  .build()

In [None]:
import org.apache.iceberg.catalog.TableIdentifier
import org.apache.iceberg.catalog.Namespace

// Identifier of the table "logs" inside the "nyc" namespace.
val nyc = Namespace.of("nyc")
val name = TableIdentifier.of(nyc, "logs")

In [None]:
// Drop the table through Spark SQL if it already exists. The identifier is interpolated
// via its toString and is not catalog-qualified, so this presumably relies on Spark's
// current catalog being the one the table lives in (confirm).
val sql = s"DROP TABLE IF EXISTS $name"
spark.sql(sql)

In [None]:
// Create the table in the catalog from the identifier, schema and partition spec.
catalog.createTable(name, schema, spec)

## [Part 2 - Table Scans](https://tabular.io/blog/java-api-part-2/)

In [None]:
// Drop the table through Spark SQL if it already exists, so the create that follows
// starts from a clean state. The identifier is interpolated via its toString.
val sql = s"DROP TABLE IF EXISTS $name"
spark.sql(sql)

In [None]:
// Create the table in the catalog from the identifier, schema and partition spec.
catalog.createTable(name, schema, spec)

In [None]:
// Insert three sample rows (one each for levels info, warning and error) into
// demo.nyc.logs. Each row carries a three-element call_stack array; the margin
// characters of the multi-line string are removed with stripMargin.
val query =
  """INSERT INTO demo.nyc.logs
    |VALUES
    |('info', timestamp 'today', 'Just letting you know!', array('stack trace line 1', 'stack trace line 2', 'stack trace line 3')),
    |('warning', timestamp 'today', 'You probably should not do this!', array('stack trace line 1', 'stack trace line 2', 'stack trace line 3')),
    |('error', timestamp 'today', 'This was a fatal application error!', array('stack trace line 1', 'stack trace line 2', 'stack trace line 3'))
    |""".stripMargin

// Run the statement and display whatever result Spark returns for it.
spark.sql(query).show()

In [None]:
import org.apache.iceberg.catalog.Catalog
import org.apache.hadoop.conf.Configuration
import org.apache.iceberg.CatalogProperties
import org.apache.iceberg.rest.RESTCatalog

// This cell re-imports what it needs, but it also uses S3FileIOProperties and the
// `.asJava` converter; import those here too so the cell does not silently depend on
// an earlier cell having been executed.
import org.apache.iceberg.aws.s3.S3FileIOProperties
import scala.collection.JavaConverters._

// Settings used to initialize the catalog: the catalog implementation class, the REST
// endpoint URI, the warehouse location, the FileIO implementation, and the S3 endpoint.
val properties: Map[String, String] = Map(
  CatalogProperties.CATALOG_IMPL -> "org.apache.iceberg.rest.RESTCatalog",
  CatalogProperties.URI -> "http://rest:8181",
  CatalogProperties.WAREHOUSE_LOCATION -> "s3a://warehouse/wh/",
  CatalogProperties.FILE_IO_IMPL -> "org.apache.iceberg.aws.s3.S3FileIO",
  S3FileIOProperties.ENDPOINT -> "http://minio:9000"
)

// Create the REST catalog, hand it a freshly constructed Hadoop Configuration, and
// initialize it under the name "demo" with the properties converted to a java.util.Map.
val catalog = new RESTCatalog()
val conf = new Configuration()
catalog.setConf(conf)
catalog.initialize("demo", properties.asJava)

In [None]:
import org.apache.iceberg.Table
import org.apache.iceberg.TableScan
import org.apache.iceberg.catalog.Namespace
import org.apache.iceberg.catalog.TableIdentifier

// Build the identifier for nyc.logs and load that table from the catalog.
val nyc = Namespace.of("nyc")
val name = TableIdentifier.of(nyc, "logs")
val table = catalog.loadTable(name)

In [None]:
import org.apache.iceberg.io.CloseableIterable
import org.apache.iceberg.data.Record
import org.apache.iceberg.data.IcebergGenerics

// Build an unfiltered generic read over `table`. The result is presumably a
// CloseableIterable of Record (per the imports of this cell) — confirm.
val result = IcebergGenerics.read(table).build()

In [None]:
// Iterate the read result as a Scala collection and print every element.
result.asScala.foreach(println)

In [None]:
import org.apache.iceberg.expressions.Expressions

// Generic read over `table`, restricted to rows whose `level` equals "error".
val result = IcebergGenerics
  .read(table)
  .where(Expressions.equal("level", "error"))
  .build()

In [None]:
import org.apache.iceberg.CombinedScanTask
import org.apache.iceberg.TableScan

// Start a new scan of `table`; filters and projections are applied to it afterwards.
val scan = table.newScan()

In [None]:
import org.apache.iceberg.expressions.Expressions

// Narrow the scan to rows whose `level` equals "info" and select only the
// `message` column.
val filteredScan = scan
  .filter(Expressions.equal("level", "info"))
  .select("message")

In [None]:
// Plan the filtered scan into tasks. The result is presumably an iterable of
// CombinedScanTask (imported in an earlier cell) — confirm.
val result = filteredScan.planTasks()

In [None]:
import org.apache.iceberg.DataFile;

// Take the first planned task, then the first file inside that task, and print the
// data file it refers to. Both `next()` calls assume the plan is non-empty; an empty
// iterator would throw NoSuchElementException.
val task = result.iterator().next()
val dataFile = task.files().iterator().next().file()
System.out.println(dataFile)

## [Part 3 - Appending Data Files](https://tabular.io/blog/java-api-part-3/)

In [None]:
// Identifier of the table "user_events" inside the "webapp" namespace.
val webapp = Namespace.of("webapp")
val name = TableIdentifier.of(webapp, "user_events")

// Drop the table through Spark SQL if it already exists; the identifier is
// interpolated via its toString.
spark.sql(s"DROP TABLE IF EXISTS $name")

In [None]:
import org.apache.iceberg.Schema
import org.apache.iceberg.types.Types
import org.apache.iceberg.catalog.{Namespace, TableIdentifier}
import org.apache.iceberg.PartitionSpec

// Schema of the user_events table: five optional columns, all strings except the
// integer `userid`. The leading integers are the field IDs.
val schema = new Schema(
  Types.NestedField.optional(1, "event_id", Types.StringType.get()),
  Types.NestedField.optional(2, "username", Types.StringType.get()),
  Types.NestedField.optional(3, "userid", Types.IntegerType.get()),
  Types.NestedField.optional(4, "api_version", Types.StringType.get()),
  Types.NestedField.optional(5, "command", Types.StringType.get())
)

// Create the table as unpartitioned. The returned value is not bound to a name here.
catalog.createTable(name, schema, PartitionSpec.unpartitioned())

In [None]:
import java.util.UUID
import com.google.common.collect.{ImmutableList, ImmutableMap}
// Use the same converters as the rest of the notebook. `scala.jdk.CollectionConverters`
// only exists on Scala 2.13+, and mixing both converter imports in one session can make
// `.asJava` / `.asScala` ambiguous.
import scala.collection.JavaConverters._
import org.apache.iceberg.data.GenericRecord

// Template record bound to `schema`; every event below is a copy of it with the
// listed fields overwritten.
val record = GenericRecord.create(schema)
// Accumulates the records; a later cell calls `builder.build()` to obtain them.
val builder = ImmutableList.builder[GenericRecord]()

// Four sample events. Values are typed as AnyRef up front (the Int is boxed
// explicitly) so each map converts directly to the java.util.Map[String, Object]
// that `GenericRecord.copy` accepts, with no per-value cast afterwards.
val records: List[GenericRecord] = List[Map[String, AnyRef]](
  Map(
    "event_id" -> UUID.randomUUID().toString,
    "username" -> "Bruce",
    "userid" -> Int.box(1),
    "api_version" -> "1.0",
    "command" -> "grapple"
  ),
  Map(
    "event_id" -> UUID.randomUUID().toString,
    "username" -> "Wayne",
    "userid" -> Int.box(1),
    "api_version" -> "1.0",
    "command" -> "glide"
  ),
  Map(
    "event_id" -> UUID.randomUUID().toString,
    "username" -> "Clark",
    "userid" -> Int.box(1),
    "api_version" -> "2.0",
    "command" -> "fly"
  ),
  Map(
    "event_id" -> UUID.randomUUID().toString,
    "username" -> "Kent",
    "userid" -> Int.box(1),
    "api_version" -> "1.0",
    "command" -> "land"
  )
).map(fields => record.copy(fields.asJava))

// Register every record with the builder. `records` itself stays bound to the list
// of records rather than to Unit.
records.foreach(rec => builder.add(rec))

In [None]:
import java.util.UUID
import org.apache.iceberg.io.{DataWriter, OutputFile}
import org.apache.iceberg.parquet.Parquet
import org.apache.iceberg.data.GenericRecord
import org.apache.iceberg.data.parquet.GenericParquetWriter
import org.apache.iceberg.PartitionSpec

import org.apache.iceberg.catalog.TableIdentifier

// Resolve the table these records are destined for. `table` still refers to the
// nyc.logs table loaded in an earlier cell, so using its location would place the
// user_events data file under the wrong table's directory.
val eventsTable = catalog.loadTable(TableIdentifier.of("webapp", "user_events"))

// A unique path under the target table's location, opened through that table's FileIO.
val filepath = s"${eventsTable.location()}/${UUID.randomUUID().toString}"
val file: OutputFile = eventsTable.io().newOutputFile(filepath)

// Parquet data writer for generic records using `schema`, writing unpartitioned data
// and overwriting the output file if it already exists.
val dataWriter: DataWriter[GenericRecord] =
  Parquet.writeData(file)
    .schema(schema)
    .createWriterFunc(GenericParquetWriter.buildWriter(_))
    .overwrite()
    .withSpec(PartitionSpec.unpartitioned())
    .build()

// Write every accumulated record and always close the writer, even if a write fails.
// The loop variable is named `rec` so it does not shadow the top-level `record`.
try {
  for (rec <- builder.build().asScala) {
    dataWriter.write(rec)
  }
} finally {
  dataWriter.close()
}


In [None]:
import org.apache.iceberg.DataFile

// Obtain the DataFile describing what `dataWriter` wrote. The writer was already
// closed in the finally block of the preceding cell; presumably this call requires a
// closed writer (confirm against the DataWriter documentation).
val dataFile = dataWriter.toDataFile()

In [None]:
import org.apache.iceberg.catalog.Namespace
import org.apache.iceberg.catalog.TableIdentifier
import org.apache.iceberg.Table;

// Resolve webapp.user_events from the catalog and commit `dataFile` to it as a
// single append operation.
val webapp = Namespace.of("webapp")
val name = TableIdentifier.of(webapp, "user_events")
val tbl = catalog.loadTable(name)
tbl
  .newAppend()
  .appendFile(dataFile)
  .commit()

In [None]:
import org.apache.iceberg.io.CloseableIterable
import org.apache.iceberg.data.Record
import org.apache.iceberg.data.IcebergGenerics

// Read `tbl` back with an unfiltered generic read and print each record.
val result = IcebergGenerics.read(tbl).build()
for (row <- result.asScala) println(row)