![iceberg-logo](https://www.apache.org/logos/res/iceberg/iceberg.png)

# An Introduction to the Iceberg Java API

## [Part 1 - Loading a Catalog and Creating a Table](https://tabular.io/blog/java-api-part-1/)

In [None]:
import org.apache.iceberg.catalog.Catalog;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.rest.RESTCatalog;
import org.apache.iceberg.aws.AwsProperties;

// Build the catalog client first; the connection settings are applied by initialize() below.
RESTCatalog catalog = new RESTCatalog();
Configuration conf = new Configuration();
catalog.setConf(conf);

// Settings for the demo stack: the REST catalog endpoint, the S3 FileIO
// implementation, and the S3 endpoint it should talk to.
Map<String, String> properties = new HashMap<>();
properties.put(CatalogProperties.URI, "http://rest:8181");
properties.put(CatalogProperties.CATALOG_IMPL, "org.apache.iceberg.rest.RESTCatalog");
properties.put(AwsProperties.S3FILEIO_ENDPOINT, "http://minio:9000");
properties.put(CatalogProperties.FILE_IO_IMPL, "org.apache.iceberg.aws.s3.S3FileIO");
properties.put(CatalogProperties.WAREHOUSE_LOCATION, "s3a://warehouse/wh");

// Register the client under the catalog name "demo", then echo that name back.
catalog.initialize("demo", properties);
catalog.name();

In [None]:
import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Types;

// Schema for the log table: three required columns plus an optional list of strings.
// The first argument of each field is its field ID; the list element carries its own ID (5).
Schema schema = new Schema(
      Types.NestedField.required(1, "level", Types.StringType.get()),
      Types.NestedField.required(2, "event_time", Types.TimestampType.withZone()),
      Types.NestedField.required(3, "message", Types.StringType.get()),
      Types.NestedField.optional(4, "call_stack", Types.ListType.ofRequired(5, Types.StringType.get()))
    );
// Bare trailing expression: the notebook kernel displays the schema.
schema

In [None]:
import org.apache.iceberg.PartitionSpec;

// Partition first by the hour of event_time, then by the unmodified value of level.
PartitionSpec spec = PartitionSpec.builderFor(schema)
      .hour("event_time")
      .identity("level")
      .build();
// Bare trailing expression: the notebook kernel displays the spec.
spec

In [None]:
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.catalog.Namespace;

// Identifier for the table "logs" inside the namespace "nyc".
Namespace nyc = Namespace.of("nyc");
TableIdentifier name = TableIdentifier.of(nyc, "logs");
// Bare trailing expression: the notebook kernel displays the identifier.
name

In [None]:
// Make sure the target namespace exists first: nothing earlier in the notebook
// creates it, and a REST catalog may reject createTable for a missing namespace.
if (!catalog.namespaceExists(name.namespace())) {
    catalog.createNamespace(name.namespace());
}
// Create the table from the schema and partition spec defined in the cells above.
// Bare trailing expression: the notebook kernel displays the returned table.
catalog.createTable(name, schema, spec)

In [None]:
// Remove the table again; Part 2 begins by recreating it with the same schema and spec.
catalog.dropTable(name)

## [Part 2 - Table Scans](https://tabular.io/blog/java-api-part-2/)

In [None]:
// Recreate the table dropped at the end of Part 1 so it can be filled with rows and scanned.
catalog.createTable(name, schema, spec)

In [None]:
import org.apache.spark.sql.SparkSession;

// Local Spark session pointed at the same REST catalog ("demo") and S3 endpoint
// that the Java catalog client is configured with.
SparkSession spark = SparkSession.builder()
  .appName("Java API Demo")
  .master("local[*]")
  // Iceberg SQL extensions, with "demo" as the default catalog
  .config("spark.sql.defaultCatalog", "demo")
  .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
  // Definition of the "demo" catalog
  .config("spark.sql.catalog.demo", "org.apache.iceberg.spark.SparkCatalog")
  .config("spark.sql.catalog.demo.catalog-impl", "org.apache.iceberg.rest.RESTCatalog")
  .config("spark.sql.catalog.demo.uri", "http://rest:8181")
  .config("spark.sql.catalog.demo.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
  .config("spark.sql.catalog.demo.s3.endpoint", "http://minio:9000")
  // Event log / history server directories
  .config("spark.eventLog.enabled", "true")
  .config("spark.eventLog.dir", "/home/iceberg/spark-events")
  .config("spark.history.fs.logDirectory", "/home/iceberg/spark-events")
  .getOrCreate();

// Only surface errors in the notebook output.
spark.sparkContext().setLogLevel("ERROR");

In [None]:
// Insert three sample rows, one per log level. Each VALUES tuple supplies
// level, event_time, message and call_stack, in that order.
String query = "INSERT INTO demo.nyc.logs VALUES "
    + String.join(", ",
        "('info', timestamp 'today', 'Just letting you know!', array('stack trace line 1', 'stack trace line 2', 'stack trace line 3'))",
        "('warning', timestamp 'today', 'You probably should not do this!', array('stack trace line 1', 'stack trace line 2', 'stack trace line 3'))",
        "('error', timestamp 'today', 'This was a fatal application error!', array('stack trace line 1', 'stack trace line 2', 'stack trace line 3'))");

// Run the statement through Spark and display whatever it returns.
spark.sql(query).show()

In [None]:
import org.apache.iceberg.catalog.Catalog;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.rest.RESTCatalog;
// Required for S3FILEIO_ENDPOINT below; without this import the cell only works
// when the Part 1 setup cell has already run in the same kernel session.
import org.apache.iceberg.aws.AwsProperties;

// Re-create the catalog client for Part 2 with the same settings as the Part 1 setup.
Map<String, String> properties = new HashMap<>();

properties.put(CatalogProperties.CATALOG_IMPL, "org.apache.iceberg.rest.RESTCatalog");
properties.put(CatalogProperties.URI, "http://rest:8181");
// No trailing slash, so the warehouse location matches the value used in Part 1.
properties.put(CatalogProperties.WAREHOUSE_LOCATION, "s3a://warehouse/wh");
properties.put(CatalogProperties.FILE_IO_IMPL, "org.apache.iceberg.aws.s3.S3FileIO");
properties.put(AwsProperties.S3FILEIO_ENDPOINT, "http://minio:9000");

RESTCatalog catalog = new RESTCatalog();
Configuration conf = new Configuration();
catalog.setConf(conf);
catalog.initialize("demo", properties);

In [None]:
import org.apache.iceberg.Table;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;

// Load the "nyc.logs" table (the target of the Spark INSERT above) through the Java catalog client.
Namespace nyc = Namespace.of("nyc");
TableIdentifier name = TableIdentifier.of(nyc, "logs");
Table table = catalog.loadTable(name);

In [None]:
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.data.IcebergGenerics;

// Build an unfiltered generic read of the table; the records are printed in the next cell.
CloseableIterable<Record> result = IcebergGenerics.read(table).build();

In [None]:
// Print every record, then close the iterable. The original loop never closed
// it, so anything the read had opened was left for the garbage collector.
try {
    for (Record r : result) {
        System.out.println(r);
    }
} finally {
    result.close();
}

In [None]:
import org.apache.iceberg.expressions.Expressions;

// Same generic read, restricted to rows whose "level" column equals "error".
CloseableIterable<Record> result = IcebergGenerics.read(table)
        .where(Expressions.equal("level", "error"))
        .build();

In [None]:
import org.apache.iceberg.CombinedScanTask;
import org.apache.iceberg.TableScan;

// Start a scan of the table; the next cell refines it with a filter and a column selection.
TableScan scan = table.newScan();

In [None]:
import org.apache.iceberg.expressions.Expressions;

// Refine the scan: keep rows with level "info" and select only the "message" column.
TableScan filteredScan = scan.filter(Expressions.equal("level", "info")).select("message")

In [None]:
// Plan the filtered scan into combined scan tasks.
// NOTE(review): this rebinds `result`, which earlier cells used for the record iterable.
Iterable<CombinedScanTask> result = filteredScan.planTasks();

In [None]:
import org.apache.iceberg.DataFile;

// Take the first planned task, then the first file scan inside it, and print that data file.
// NOTE(review): both next() calls assume the scan matched at least one file — confirm rows were inserted first.
CombinedScanTask task = result.iterator().next();
DataFile dataFile = task.files().iterator().next().file();
System.out.println(dataFile);

## [Part 3 - Appending Data Files](https://tabular.io/blog/java-api-part-3/)

In [None]:
import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.PartitionSpec;

// Schema for the user-event table: five optional columns with field IDs 1-5.
Schema schema = new Schema(
      Types.NestedField.optional(1, "event_id", Types.StringType.get()),
      Types.NestedField.optional(2, "username", Types.StringType.get()),
      Types.NestedField.optional(3, "userid", Types.IntegerType.get()),
      Types.NestedField.optional(4, "api_version", Types.StringType.get()),
      Types.NestedField.optional(5, "command", Types.StringType.get())
    );

Namespace webapp = Namespace.of("webapp");
TableIdentifier name = TableIdentifier.of(webapp, "user_events");

// Nothing earlier in the notebook creates the "webapp" namespace, and a REST
// catalog may reject createTable for a namespace that does not exist yet.
if (!catalog.namespaceExists(webapp)) {
    catalog.createNamespace(webapp);
}
// The table is created without any partitioning.
catalog.createTable(name, schema, PartitionSpec.unpartitioned());

In [None]:
import java.util.UUID;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.iceberg.data.GenericRecord;

// Template record bound to the schema; each copy(...) call below supplies the field values for one sample row.
GenericRecord record = GenericRecord.create(schema);
ImmutableList.Builder<GenericRecord> builder = ImmutableList.builder();
// NOTE(review): all four sample rows use userid 1 — confirm whether distinct ids were intended.
builder.add(record.copy(ImmutableMap.of("event_id", UUID.randomUUID().toString(), "username", "Bruce", "userid", 1, "api_version", "1.0", "command", "grapple")));
builder.add(record.copy(ImmutableMap.of("event_id", UUID.randomUUID().toString(), "username", "Wayne", "userid", 1, "api_version", "1.0", "command", "glide")));
builder.add(record.copy(ImmutableMap.of("event_id", UUID.randomUUID().toString(), "username", "Clark", "userid", 1, "api_version", "2.0", "command", "fly")));
builder.add(record.copy(ImmutableMap.of("event_id", UUID.randomUUID().toString(), "username", "Kent", "userid", 1, "api_version", "1.0", "command", "land")));
// Freeze the four rows into an immutable list.
ImmutableList<GenericRecord> records = builder.build();

In [None]:
import org.apache.iceberg.Files;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.io.DataWriter;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.data.parquet.GenericParquetWriter;

// Write the data file under the table it will be appended to. The original used
// `table` (nyc.logs, left over from Part 2), which placed the file in a different
// table's location and failed outright when Part 2 had not been run.
Table userEvents = catalog.loadTable(TableIdentifier.of(Namespace.of("webapp"), "user_events"));
String filepath = userEvents.location() + "/" + UUID.randomUUID().toString();
OutputFile file = userEvents.io().newOutputFile(filepath);

// Parquet writer for generic records, using the user_events schema and no partitioning.
DataWriter<GenericRecord> dataWriter =
    Parquet.writeData(file)
        .schema(schema)
        .createWriterFunc(GenericParquetWriter::buildWriter)
        .overwrite()
        .withSpec(PartitionSpec.unpartitioned())
        .build();
try {
  // Reuse the list built in the previous cell instead of calling builder.build() again.
  for (GenericRecord row : records) {
    dataWriter.write(row);
  }
} finally {
  // Always close the writer, even if a write fails.
  dataWriter.close();
}

In [None]:
import org.apache.iceberg.DataFile;

// The writer was closed in the previous cell; capture the DataFile it produced for the append below.
DataFile dataFile = dataWriter.toDataFile();

In [None]:
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.Table;

// Load the user_events table and commit the written data file to it as an append.
Namespace webapp = Namespace.of("webapp");
TableIdentifier name = TableIdentifier.of(webapp, "user_events");
Table tbl = catalog.loadTable(name);
tbl.newAppend().appendFile(dataFile).commit()

In [None]:
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.data.IcebergGenerics;

// Read the table back and print each record to confirm the append, then close
// the iterable. The original never closed it.
CloseableIterable<Record> result = IcebergGenerics.read(tbl).build();
try {
    for (Record r : result) {
        System.out.println(r);
    }
} finally {
    result.close();
}