### Iceberg Example - Concurrency

This is a simple example showing how Apache Iceberg handles some concurrent workloads better than a PostgreSQL database. It also shows how the "simplicity" of the Iceberg interface makes it pretty easy and fun to work with.

In [46]:
### Basic Dependency stuff.
import sys
!pip install --prefix {sys.prefix} psycopg2 pandas faker

import psycopg2
from threading import Thread
import time


### The PostgresDB Version of Insert & Alter at the same time

In [47]:
### Define a few SQL statements
drop_if_exists = """DROP TABLE  IF EXISTS public.employee"""

employee_ddl = """CREATE TABLE public.employee (
    id int8 NOT NULL,
    name varchar(120) NOT NULL,
    salary int8 NOT NULL,
    CONSTRAINT emp_pk PRIMARY KEY (id)
);"""

sql_dummy_data = """WITH salary_list AS (
    SELECT '{1000, 2000, 5000}'::INT[] salary
)
INSERT INTO public.employee
(id, name, salary)
SELECT n, 'Employee ' || n as name, salary[1 + mod(n, array_length(salary, 1))]
FROM salary_list, generate_series(1, 1000000) as n"""

# Just in case you're wondering, on my machine the insert takes: 33s

**Task**: We're going to insert a million rows of data into our table, and while we're doing this, we're also renaming a couple of columns. We do both in parallel, because stuff happens in parallel, and if we just chain everything up, well, the day only has 24 hours, right?


In [51]:
conn = psycopg2.connect(host="postgres", port = 5432, database="demo_catalog", user="admin", password="password")
cur = conn.cursor()
  
############--------#--------#--------#--------#--------#--------#--------#--------#--------#--------#--------#-----
## Delete possible old table, create the new one, add a bunch of columns, get ready!

cur.execute(drop_if_exists)
cur.execute(employee_ddl)
cur.execute(f"""SELECT * FROM public.employee""")
print(cur.fetchall())

# Add columns.

count = 0
while count < 5:
    count += 1
    cur.execute(f"""ALTER TABLE public.employee ADD COLUMN last_column_{count} timestamptz;""") 
    print("Adding a column")

time.sleep(5)
############--------#--------#--------#--------#--------#--------#--------#--------#--------#--------#--------#-----
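## Define our two concurrent tasks
## (both tasks use the one connection and cursor opened above, so their statements go through the same session)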
    
def task1( threadName, delay):
    print("starting task 1; Inserting data")
    cur.execute(sql_dummy_data)
    print("finished task 1; Inserting data")
    
def task4( threadName, delay):
    count=0
    print(f"Task 4 starts to rename stuff")
    while count < 5:
        print(f"Task 4 about to rename column {count+1}")
        count += 1
        cur.execute(f"""ALTER TABLE public.employee RENAME last_column_{count} TO new_column_{count} """) 
        print("Task 4 finished adding a column")

    print("Task 4 finished")


############--------#--------#--------#--------#--------#--------#--------#--------#--------#--------#--------#-----
## Let them run...

# create two new threads
t1 = Thread(target=task1,args=("Thread-1", 2))
t4 = Thread(target=task4,args=("Thread-4", 2))

t1.start()
t4.start()

# wait for the threads to complete
t1.join()
t4.join()

# Close the cursor and connection
cur.close()
conn.close()

[]
Task 2 about to add column 1
Task 2 finished adding a column
Task 2 about to add column 2
Task 2 finished adding a column
Task 2 about to add column 3
Task 2 finished adding a column
Task 2 about to add column 4
Task 2 finished adding a column
Task 2 about to add column 5
Task 2 finished adding a column
starting task 1
Task 4 starts to rename stuff
Task 4 about to rename column 1
finished task 1
Task 4 finished adding a column
Task 4 about to rename column 2
Task 4 finished adding a column
Task 4 about to rename column 3
Task 4 finished adding a column
Task 4 about to rename column 4
Task 4 finished adding a column
Task 4 about to rename column 5
Task 4 finished adding a column
Task 4 finished


### The same two tasks running through Apache Iceberg

In [52]:
spark

In [53]:
%%sql

DROP TABLE IF EXISTS nyc.employee

In [54]:
employee_ddl = """CREATE TABLE nyc.employee (
    id int,
    name varchar(120),
    salary int
);"""

spark.sql(employee_ddl)

DataFrame[]

In [55]:
%%sql

INSERT INTO nyc.employee
(id, name, salary)
VALUES (1, 'bob', 1000)

In [56]:
spark.read.format("iceberg").load("nyc.employee").show(truncate = False)

+---+----+------+
|id |name|salary|
+---+----+------+
|1  |bob |1000  |
+---+----+------+



In [57]:
############--------#--------#--------#--------#--------#--------#--------#--------#--------#--------#--------#-----
## Some preparation data & SQLs

data = [(1, "Bob",1200)]
for i in range(0,10000000):
    data.append((2,f"Bob{i}",i))
print(f"Created {len(data)} data rows to insert")

# If you're wondering why there are 10 million rows here and just 1 million before: on my machine, writing to the
# local file system through Iceberg is roughly 10 times faster than writing through Postgres.

single_data = [(1, "Bob",1200)]

alter_sql_1 = """ALTER TABLE nyc.employee RENAME COLUMN salary TO new_salary;"""
alter_sql_2 = """ALTER TABLE nyc.employee RENAME COLUMN id TO new_id;"""
alter_sql_3 = """ALTER TABLE nyc.employee RENAME COLUMN name TO new_name;"""

############--------#--------#--------#--------#--------#--------#--------#--------#--------#--------#--------#-----
## Define our two concurrent tasks


def task1():
    print("starting task 1 on Iceberg; Inserting data")
    rdd = spark.sparkContext.parallelize(data)
    columns = ["id","name","salary"]
    df = rdd.toDF(columns)
    df.write.mode("append").saveAsTable("nyc.employee")
    # write takes about 25 secs and is 10 times the size of the other write... 

    print("finished task 1 on Iceberg; Inserting data")

def task2():
    time.sleep(10)
    print("start task 2 for renaming things..")
    spark.sql(alter_sql_1)
    spark.sql(alter_sql_2)
    spark.sql(alter_sql_3)
    
    print("finish renaming things...")
   
    print("start task 2 now inserting one data piece..")

    rdd = spark.sparkContext.parallelize(single_data)
    columns = ["new_id","new_name","new_salary"]
    df = rdd.toDF(columns)
    df.write.mode("append").saveAsTable("nyc.employee")
    print("task 2 done..")

    #spark.read.format("iceberg").load("nyc.employee").show(truncate = False)

############--------#--------#--------#--------#--------#--------#--------#--------#--------#--------#--------#-----
## Letting them run..

# create two new threads
t1 = Thread(target=task1)
t2 = Thread(target=task2)

# start the threads
t1.start()
t2.start()

# wait for the threads to complete
t1.join()
t2.join()
spark.read.format("iceberg").load("nyc.employee").show(truncate = False)


10000001
am here
starting task 1 on Iceberg


22/05/16 12:26:46 WARN TaskSetManager: Stage 57 contains a task of very large size (52559 KiB). The maximum recommended task size is 1000 KiB.
22/05/16 12:26:47 WARN TaskSetManager: Stage 58 contains a task of very large size (52559 KiB). The maximum recommended task size is 1000 KiB.
22/05/16 12:26:48 WARN TaskSetManager: Stage 59 contains a task of very large size (52559 KiB). The maximum recommended task size is 1000 KiB.
[Stage 59:>                                                         (0 + 4) / 4]

start task 2 for renaming things..
finish renaming things...
start task 2 now inserting one data piece..


                                                                                

task 2 done..


                                                                                

finished task 1 on Iceberg
+------+--------+----------+
|new_id|new_name|new_salary|
+------+--------+----------+
|1     |bob     |1000      |
|1     |Bob     |1200      |
|2     |Bob0    |0         |
|2     |Bob1    |1         |
|2     |Bob2    |2         |
|2     |Bob3    |3         |
|2     |Bob4    |4         |
|2     |Bob5    |5         |
|2     |Bob6    |6         |
|2     |Bob7    |7         |
|2     |Bob8    |8         |
|2     |Bob9    |9         |
|2     |Bob10   |10        |
|2     |Bob11   |11        |
|2     |Bob12   |12        |
|2     |Bob13   |13        |
|2     |Bob14   |14        |
|2     |Bob15   |15        |
|2     |Bob16   |16        |
|2     |Bob17   |17        |
+------+--------+----------+
only showing top 20 rows



In [58]:
%%sql

SELECT * FROM nyc.employee.history

made_current_at,snapshot_id,parent_id,is_current_ancestor
2022-05-16 12:24:42.446000,5645340098941460626,,True
2022-05-16 12:27:10.753000,4734883099060616137,5.64534009894146e+18,True
2022-05-16 12:27:11.660000,4648749835832900692,4.734883099060616e+18,True
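
A side note on the output above: `parent_id` is shown in scientific notation (`5.64534009894146e+18`) while `snapshot_id` is an exact integer. A likely cause, and this is an assumption about the display layer rather than something the notebook confirms, is that the column contains an empty value and was converted to floating point before printing. The plain-Python sketch below shows why such a value should not be copied into a `snapshot-id` read option: a 64-bit float cannot hold all 19 digits.

```python
# Snapshot ID of the first commit, copied from the snapshot_id column above.
snapshot_id = 5645340098941460626

# A 64-bit float keeps only about 15 to 17 significant decimal digits,
# so converting the ID to float and back changes its low digits.
as_float = float(snapshot_id)
round_tripped = int(as_float)

print(snapshot_id)                   # the exact ID
print(round_tripped)                 # the ID after a trip through float
print(snapshot_id == round_tripped)  # whether the ID survived the round trip
```

When a parent snapshot is needed, take its ID from the `snapshot_id` column of the parent's own row instead.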


In [59]:
%%sql

SELECT * FROM nyc.employee.snapshots

committed_at,snapshot_id,parent_id,operation,manifest_list,summary
2022-05-16 12:24:42.446000,5645340098941460626,,append,/home/iceberg/warehouse/nyc/employee/metadata/snap-5645340098941460626-1-2fb68011-dbbf-4cd7-a1c6-89663f3fd5c5.avro,"{'spark.app.id': 'local-1652690273198', 'changed-partition-count': '1', 'added-data-files': '1', 'total-equality-deletes': '0', 'added-records': '1', 'total-position-deletes': '0', 'added-files-size': '866', 'total-delete-files': '0', 'total-files-size': '866', 'total-records': '1', 'total-data-files': '1'}"
2022-05-16 12:27:10.753000,4734883099060616137,5.64534009894146e+18,append,/home/iceberg/warehouse/nyc/employee/metadata/snap-4734883099060616137-1-1ac1affb-9e70-4a76-9687-48b047c70ee7.avro,"{'spark.app.id': 'local-1652690273198', 'changed-partition-count': '1', 'added-data-files': '1', 'total-equality-deletes': '0', 'added-records': '1', 'total-position-deletes': '0', 'added-files-size': '922', 'total-delete-files': '0', 'total-files-size': '1788', 'total-records': '2', 'total-data-files': '2'}"
2022-05-16 12:27:11.660000,4648749835832900692,4.734883099060616e+18,append,/home/iceberg/warehouse/nyc/employee/metadata/snap-4648749835832900692-1-bdabe93b-a772-44fc-b14e-cd89191b45be.avro,"{'spark.app.id': 'local-1652690273198', 'changed-partition-count': '1', 'added-data-files': '4', 'total-equality-deletes': '0', 'added-records': '10000001', 'total-position-deletes': '0', 'added-files-size': '37326423', 'total-delete-files': '0', 'total-files-size': '37328211', 'total-records': '10000003', 'total-data-files': '6'}"


In [62]:
spark.read.option("snapshot-id", 4648749835832900692).format("iceberg").load("nyc.employee").show(truncate = False)

+------+--------+----------+
|new_id|new_name|new_salary|
+------+--------+----------+
|1     |bob     |1000      |
|1     |Bob     |1200      |
|2     |Bob0    |0         |
|2     |Bob1    |1         |
|2     |Bob2    |2         |
|2     |Bob3    |3         |
|2     |Bob4    |4         |
|2     |Bob5    |5         |
|2     |Bob6    |6         |
|2     |Bob7    |7         |
|2     |Bob8    |8         |
|2     |Bob9    |9         |
|2     |Bob10   |10        |
|2     |Bob11   |11        |
|2     |Bob12   |12        |
|2     |Bob13   |13        |
|2     |Bob14   |14        |
|2     |Bob15   |15        |
|2     |Bob16   |16        |
|2     |Bob17   |17        |
+------+--------+----------+
only showing top 20 rows



**Cool things about Iceberg**: 
1. The renaming is super fast (because it's just metadata). Renaming a column is also just a metadata (catalog) operation in Postgres, but the type changes Iceberg supports (for example widening `int` to `long`) are just as fast, whereas many type changes on a PostgresDB rewrite the whole table and take much longer.
2. It works concurrently! In the Postgres run above, the first rename only completed after the insert had finished (and doing this with plain Hive tables creates a mess).
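
Why is a rename only a metadata change? Iceberg tracks every column by a numeric field ID, and data files refer to columns by that ID rather than by name. The following is a toy model of that idea in plain Python, not Iceberg's actual code; `ToyTable`, its methods, and the in-memory "data files" are hypothetical names made up for illustration.

```python
from dataclasses import dataclass, field
from typing import Dict, List


@dataclass
class ToyTable:
    """Toy model: the schema maps field IDs to names, files store values by field ID."""
    schema: Dict[int, str]                                   # field ID -> current column name
    data_files: List[List[Dict[int, object]]] = field(default_factory=list)

    def append(self, rows: List[Dict[str, object]]) -> None:
        # Translate column names to field IDs once, at write time.
        name_to_id = {name: fid for fid, name in self.schema.items()}
        self.data_files.append(
            [{name_to_id[name]: value for name, value in row.items()} for row in rows]
        )

    def rename_column(self, old: str, new: str) -> None:
        # Only the schema changes; no data file is touched or rewritten.
        for fid, name in self.schema.items():
            if name == old:
                self.schema[fid] = new
                return
        raise KeyError(old)

    def scan(self) -> List[Dict[str, object]]:
        # Resolve field IDs to the *current* column names at read time.
        return [
            {self.schema[fid]: value for fid, value in row.items()}
            for data_file in self.data_files
            for row in data_file
        ]


table = ToyTable(schema={1: "id", 2: "name", 3: "salary"})
table.append([{"id": 1, "name": "bob", "salary": 1000}])
files_before = [[dict(row) for row in f] for f in table.data_files]

table.rename_column("salary", "new_salary")
table.append([{"id": 1, "name": "Bob", "new_salary": 1200}])

renamed_rows = table.scan()
files_untouched = table.data_files[:1] == files_before
print(renamed_rows)
print(files_untouched)
```

The first "data file" is byte-for-byte the same before and after the rename, yet a scan returns its row under the new column name. That is the property that keeps the rename cheap no matter how many rows the table holds.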

Note: Yes, of course you can use PostgreSQL and tell it not to rewrite the table...

Also note: Iceberg creates a snapshot only when data changes (here: when new data is appended); the renames only write new table metadata. So if we had not inserted a row with the new column names, we wouldn't have a snapshot for the "in-between state", which is simply nice to show off.
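
To make the difference between "new metadata" and "new snapshot" concrete, here is a minimal sketch of an optimistic commit loop in plain Python. It is a toy model under stated assumptions, not Iceberg's implementation: `Metadata`, `ToyCatalog`, `commit`, `rename`, and `append` are hypothetical names. The idea it imitates is that every commit builds new table metadata from the latest version and swaps the table's metadata pointer atomically, retrying if another writer got there first.

```python
import threading
from dataclasses import dataclass
from typing import Tuple


@dataclass(frozen=True)
class Metadata:
    """Immutable toy table metadata: one new version per commit."""
    version: int
    columns: Tuple[str, ...]
    snapshots: Tuple[str, ...]   # one entry per data commit


class ToyCatalog:
    """Holds the pointer to the current metadata; swapping it is the only atomic step."""

    def __init__(self, metadata):
        self._current = metadata
        self._lock = threading.Lock()

    def current(self):
        return self._current

    def compare_and_swap(self, expected, new):
        with self._lock:
            if self._current is not expected:
                return False     # someone else committed first
            self._current = new
            return True


def commit(catalog, change):
    """Optimistic commit: rebuild from the latest base and retry on conflict."""
    while True:
        base = catalog.current()
        if catalog.compare_and_swap(base, change(base)):
            return


def rename(old, new):
    def change(base):
        cols = tuple(new if c == old else c for c in base.columns)
        return Metadata(base.version + 1, cols, base.snapshots)   # no new snapshot
    return change


def append(label):
    def change(base):
        return Metadata(base.version + 1, base.columns, base.snapshots + (label,))
    return change


def rename_all():
    for col in ("salary", "id", "name"):
        commit(catalog, rename(col, "new_" + col))


catalog = ToyCatalog(Metadata(0, ("id", "name", "salary"), ()))
writer = threading.Thread(target=commit, args=(catalog, append("big insert")))
renamer = threading.Thread(target=rename_all)
writer.start()
renamer.start()
writer.join()
renamer.join()

final = catalog.current()
print(final)
```

Whatever order the threads interleave in, the model ends with four metadata versions but only one snapshot: the three renames each produced new metadata, and only the append added a snapshot.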
