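# Iceberg REST Catalog

Create an Iceberg table through a REST catalog with Spark, load CSV data into it, and read it back from Python with PyIceberg and DuckDB.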

In [4]:
from pyspark.sql import SparkSession

SPARK_MASTER_URL = "spark://spark-master:7077"
ICEBERG_SQL_EXTENSIONS = "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions"

# Connect to the standalone Spark master and enable Iceberg's SQL extensions.
# No `spark.jars.packages` is requested here, so the Iceberg runtime must already be
# available to the cluster. If it is not, add this to the builder:
#   .config(
#       "spark.jars.packages",
#       "org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.4.2,org.apache.iceberg:iceberg-aws-bundle:1.4.2",
#   )
builder = SparkSession.builder.master(SPARK_MASTER_URL)  # type: ignore
builder = builder.appName("Testing Iceberg")
# `spark.sql.extensions` is a static config, so it has to be set before the session exists.
builder = builder.config("spark.sql.extensions", ICEBERG_SQL_EXTENSIONS)
spark = builder.getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/12/22 15:00:10 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


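## Spark Config

For reference, the equivalent `spark-sql` CLI invocation is shown below. It configures the built-in `spark_catalog`; the notebook cells instead register the same REST catalog under the name `demo` at runtime.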

```bash
spark-sql --packages org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.4.2,org.apache.iceberg:iceberg-aws-bundle:1.4.2 \
 --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
 --conf spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog \
 --conf spark.sql.catalog.spark_catalog.catalog-impl=org.apache.iceberg.rest.RESTCatalog \
 --conf spark.sql.catalog.spark_catalog.warehouse=/opt/workspace/warehouse \
 --conf spark.sql.catalog.spark_catalog.uri=http://rest:8181 \
 --conf spark.sql.catalog.spark_catalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO \
 --conf spark.sql.catalog.spark_catalog.s3.endpoint=http://minio:9000
```


In [5]:
# Register the Iceberg REST catalog as `demo` and make it the default catalog, so
# unqualified names in later SQL cells (e.g. `new_namespace`) resolve against it.
demo_catalog_conf = {
    "spark.sql.catalog.demo": "org.apache.iceberg.spark.SparkCatalog",
    "spark.sql.catalog.demo.catalog-impl": "org.apache.iceberg.rest.RESTCatalog",
    "spark.sql.catalog.demo.uri": "http://rest:8181",
    "spark.sql.catalog.demo.io-impl": "org.apache.iceberg.aws.s3.S3FileIO",
    "spark.sql.catalog.demo.s3.endpoint": "http://minio:9000",
    "spark.sql.defaultCatalog": "demo",
}
for conf_key, conf_value in demo_catalog_conf.items():
    spark.conf.set(conf_key, conf_value)

In [6]:
%%sql
-- IF NOT EXISTS keeps this cell re-runnable against a catalog that already has the namespace.
CREATE NAMESPACE IF NOT EXISTS new_namespace;

23/12/22 15:00:16 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [7]:
%%sql
-- Iceberg table of daily closing values, partitioned by the year of each row's Date.
CREATE TABLE IF NOT EXISTS demo.new_namespace.quote (
    Date  DATE,
    Close FLOAT
)
USING iceberg
PARTITIONED BY (YEAR(Date));

In [16]:
# Load the quote history and keep only the columns the Iceberg table defines.
# Explicit casts pin the DataFrame to the table's DDL types (DATE, FLOAT) instead of
# depending on what `inferSchema` guesses and on write-time type coercion.
quote_raw = spark.read.csv("data/GDAXI.INDX.csv", header=True, inferSchema=True)
quote = quote_raw.select(
    quote_raw["Date"].cast("date").alias("Date"),
    quote_raw["Close"].cast("float").alias("Close"),
)

# Dynamic partition overwrite: only the year partitions present in `quote` are replaced.
# Re-running this cell therefore leaves the table unchanged, whereas `.append()` would
# add a duplicate copy of every row.
quote.writeTo("demo.new_namespace.quote").overwritePartitions()

                                                                                

In [None]:
# Stop the Spark session; the PyIceberg cells below do not use `spark`.
spark.stop()

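# PyIceberg

Read the table written by Spark directly from Python through the same REST catalog, without a Spark session.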

In [21]:
import os

from pyiceberg.catalog import load_catalog

# S3 credentials for the object store are read from the environment ("" when unset).
AWS_ACCESS_KEY_ID = os.getenv("AWS_ACCESS_KEY_ID") or ""
AWS_SECRET_ACCESS_KEY = os.getenv("AWS_SECRET_ACCESS_KEY") or ""

# Same REST catalog URI and S3 endpoint that the Spark `demo` catalog is configured with.
catalog_properties = {
    "type": "rest",
    "uri": "http://rest:8181",
    "s3.endpoint": "http://minio:9000",
}

# Only pass credentials that are actually set. An empty string is not a usable key,
# and passing it explicitly may prevent the FileIO from falling back to its default
# AWS credential resolution.
s3_credentials = {
    "s3.access-key-id": AWS_ACCESS_KEY_ID,
    "s3.secret-access-key": AWS_SECRET_ACCESS_KEY,
}
catalog_properties.update({key: value for key, value in s3_credentials.items() if value})

catalog = load_catalog("demo", **catalog_properties)

In [13]:
# Namespaces visible through the REST catalog; each is returned as a tuple of name parts.
catalog.list_namespaces()

[('new_namespace',)]

In [14]:
# Tables in `new_namespace`, as (namespace, table) identifier tuples.
catalog.list_tables("new_namespace")

[('new_namespace', 'quote')]

In [22]:
# Scan the whole table (no row filter or column projection) into a PyArrow table.
quote_table = catalog.load_table("new_namespace.quote")
quote_table.scan().to_arrow()

pyarrow.Table
Date: date32[day]
Close: float
----
Date: [[2010-01-04,2010-01-05,2010-01-06,2010-01-07,2010-01-08,...,2010-12-23,2010-12-27,2010-12-28,2010-12-29,2010-12-30],[2011-01-03,2011-01-04,2011-01-05,2011-01-06,2011-01-07,...,2011-12-23,2011-12-27,2011-12-28,2011-12-29,2011-12-30],...,[2004-01-02,2004-01-05,2004-01-06,2004-01-07,2004-01-08,...,2004-12-23,2004-12-27,2004-12-28,2004-12-29,2004-12-30],[2005-01-03,2005-01-04,2005-01-05,2005-01-06,2005-01-07,...,2005-12-23,2005-12-27,2005-12-28,2005-12-29,2005-12-30]]
Close: [[6048.3,6031.86,6034.33,6019.36,6037.61,...,7057.69,6970.73,6972.1,6995.47,6914.19],[6989.74,6975.35,6939.82,6981.39,6947.84,...,5878.93,5889.76,5771.27,5848.78,5898.35],...,[4018.5,4035.9,4035.44,4004.4,4045.43,...,4251.62,4235.36,4261.79,4247.75,4256.08],[4291.53,4290.5,4258.24,4300.94,4316.4,...,5419.05,5444.84,5447.15,5458.58,5408.26]]

In [28]:
# `duckdb` is not referenced directly below; importing it first surfaces a missing
# package before the table is scanned.
import duckdb

# Expose the table scan to DuckDB as a relation named `quote`, query it with SQL and
# display the result as a Polars DataFrame (`.pl()` needs `polars` installed).
quote_scan = catalog.load_table("new_namespace.quote").scan()
conn = quote_scan.to_duckdb(table_name="quote")

conn.sql("SELECT * FROM quote").pl()

Date,Close
date,f32
2010-01-04,6048.299805
2010-01-05,6031.859863
2010-01-06,6034.330078
2010-01-07,6019.359863
2010-01-08,6037.609863
2010-01-11,6040.5
2010-01-12,5943.0
2010-01-13,5963.140137
2010-01-14,5988.879883
2010-01-15,5875.970215
