In [None]:
from pyspark.sql import SparkSession

iceberg_version = "1.5.0"
spark_major_version = "3.5"

packages = [
    f"org.apache.iceberg:iceberg-spark-runtime-{spark_major_version}_2.12:{iceberg_version}",
    "org.apache.hadoop:hadoop-aws:3.3.4",
    "org.postgresql:postgresql:42.6.0"
]

spark = SparkSession.builder \
    .appName("Iceberg Local Dev") \
    .config("spark.jars.packages", ",".join(packages)) \
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
    .config("spark.sql.catalog.iceberg", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.iceberg.type", "jdbc") \
    .config("spark.sql.catalog.iceberg.uri", "jdbc:postgresql://postgres:5432/iceberg_catalog") \
    .config("spark.sql.catalog.iceberg.jdbc.user", "admin") \
    .config("spark.sql.catalog.iceberg.jdbc.password", "password") \
    .config("spark.sql.catalog.iceberg.jdbc.schema-version", "V1") \
    .config("spark.sql.catalog.iceberg.warehouse", "s3a://warehouse/iceberg_data") \
    .config("spark.hadoop.fs.s3a.endpoint", "http://minio:9000") \
    .config("spark.hadoop.fs.s3a.access.key", "minioadmin") \
    .config("spark.hadoop.fs.s3a.secret.key", "minioadmin") \
    .config("spark.hadoop.fs.s3a.path.style.access", "true") \
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
    .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false") \
    .getOrCreate()


spark.sql("CREATE NAMESPACE IF NOT EXISTS iceberg.db")


spark.sql("DROP TABLE IF EXISTS iceberg.db.musteriler_v2")

spark.sql("""
CREATE TABLE iceberg.db.musteriler_v2 (
    id bigint,
    isim string,
    sehir string,
    tutar double,
    created_at_date date,
    islem_zamani timestamp
) USING iceberg
PARTITIONED BY (created_at_date, sehir)
""")


from datetime import date, datetime
data = [
    (101, "Ali", "Istanbul", 150.0, date(2024, 1, 1), datetime.now()),
    (102, "Veli", "Izmir", 200.0, date(2024, 1, 2), datetime.now())
]
df = spark.createDataFrame(data, ["id", "isim", "sehir", "tutar", "created_at_date", "islem_zamani"])
df.writeTo("iceberg.db.musteriler_v2").append()

spark.sql("SELECT * FROM iceberg.db.musteriler_v2").show()

+---+----+--------+-----+---------------+--------------------+
| id|isim|   sehir|tutar|created_at_date|        islem_zamani|
+---+----+--------+-----+---------------+--------------------+
|102|Veli|   Izmir|200.0|     2024-01-02|2026-01-04 19:26:...|
|101| Ali|Istanbul|150.0|     2024-01-01|2026-01-04 19:26:...|
+---+----+--------+-----+---------------+--------------------+

