In [1]:
import os
import json
import pyspark
import pandas as pd
import numpy as np

import pyspark.sql.types as T
import pyspark.sql.functions as F

from delta import *

In [2]:
# Session builder with Delta Lake's SQL extension and catalog registered, as
# required by the Delta Lake setup instructions for using Delta via spark.sql().
builder = (
    pyspark.sql.SparkSession.builder
    .appName('access_logs_upserts_test')
    .config('spark.sql.extensions', 'io.delta.sql.DeltaSparkSessionExtension')
    .config('spark.sql.catalog.spark_catalog', 'org.apache.spark.sql.delta.catalog.DeltaCatalog')
)

In [3]:
# Start (or reuse) the session. configure_spark_with_delta_pip adds the Delta
# Lake package to the builder, which is why Ivy resolves delta-core at startup.
spark = (
    configure_spark_with_delta_pip(builder)
    .getOrCreate()
)
sc = spark.sparkContext  # not used in the cells shown here

23/01/07 18:13:16 WARN Utils: Your hostname, Zambo-ROG resolves to a loopback address: 127.0.1.1; using 172.24.112.98 instead (on interface eth0)
23/01/07 18:13:16 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Ivy Default Cache set to: /home/wtfzambo/.ivy2/cache
The jars for the packages stored in: /home/wtfzambo/.ivy2/jars
io.delta#delta-core_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-feb44f56-5e53-4938-8060-9c98f123b67e;1.0
	confs: [default]


:: loading settings :: url = jar:file:/home/wtfzambo/my-stuff/delta_bug/.venv/lib/python3.7/site-packages/pyspark/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml


	found io.delta#delta-core_2.12;1.0.0 in central
	found org.antlr#antlr4;4.7 in central
	found org.antlr#antlr4-runtime;4.7 in central
	found org.antlr#antlr-runtime;3.5.2 in central
	found org.antlr#ST4;4.0.8 in central
	found org.abego.treelayout#org.abego.treelayout.core;1.0.3 in central
	found org.glassfish#javax.json;1.0.4 in central
	found com.ibm.icu#icu4j;58.2 in central
:: resolution report :: resolve 232ms :: artifacts dl 6ms
	:: modules in use:
	com.ibm.icu#icu4j;58.2 from central in [default]
	io.delta#delta-core_2.12;1.0.0 from central in [default]
	org.abego.treelayout#org.abego.treelayout.core;1.0.3 from central in [default]
	org.antlr#ST4;4.0.8 from central in [default]
	org.antlr#antlr-runtime;3.5.2 from central in [default]
	org.antlr#antlr4;4.7 from central in [default]
	org.antlr#antlr4-runtime;4.7 from central in [default]
	org.glassfish#javax.json;1.0.4 from central in [default]
	---------------------------------------------------------------------
	|             

In [4]:
# DDL-style schema string so the CSV column types are fixed up front.
schema = """
col_a string,
col_b integer
"""

# Read data.csv with a header row, using '"' as the escape character.
df = (
    spark.read.format('csv')
    .option('header', 'true')
    .option('escape', '"')
    .schema(schema)
    .load('data.csv')
)


In [5]:
# Preview the rows loaded from data.csv.
df.show(n=20)

+-----+-----+
|col_a|col_b|
+-----+-----+
|  foo|    1|
|  bar|    2|
|  baz|    3|
+-----+-----+



In [6]:
# Location of the Delta table. Spark SQL addresses a path-based Delta table as
# delta.`<absolute path>`. A backtick inside the path must be doubled so it
# cannot terminate the quoted identifier early and break the SQL statements
# that interpolate `my_delta_table`.
delta_path = 'delta-table'
abs_delta_path = os.path.abspath(delta_path)
my_delta_table = f"delta.`{abs_delta_path.replace('`', '``')}`"

In [7]:
# Overwrite the Delta table at abs_delta_path (data and schema) with the CSV
# rows in a single partition, then open a DeltaTable handle to it.
# repartition() returns a new DataFrame rather than modifying `df`, so it has
# to be chained into the write; on its own line its result was discarded.
(
    df.repartition(1)
    .write
    .format('delta')
    .mode('overwrite')
    .option('overwriteSchema', 'true')
    .save(abs_delta_path)
)

dt = DeltaTable.forPath(spark, abs_delta_path)

                                                                                

In [8]:
# Columns and partitioning of the Delta table as reported by Spark SQL.
spark.sql(f"DESCRIBE TABLE {my_delta_table}").show(n=20)

+---------------+---------+-------+
|       col_name|data_type|comment|
+---------------+---------+-------+
|          col_a|   string|       |
|          col_b|      int|       |
|               |         |       |
| # Partitioning|         |       |
|Not partitioned|         |       |
+---------------+---------+-------+



ANTLR Tool version 4.7 used for code generation does not match the current runtime version 4.8ANTLR Tool version 4.7 used for code generation does not match the current runtime version 4.8

In [9]:
# Add a nullable string column through SQL, not through the `dt` handle.
spark.sql(
    f"ALTER TABLE {my_delta_table} ADD COLUMNS (my_new_col string)"
)

DataFrame[]

In [10]:
# Re-describe the table after ALTER TABLE: Spark SQL now lists `my_new_col`.
spark.sql(f"DESCRIBE TABLE {my_delta_table}").show(n=20)

+---------------+---------+-------+
|       col_name|data_type|comment|
+---------------+---------+-------+
|          col_a|   string|       |
|          col_b|      int|       |
|     my_new_col|   string|       |
|               |         |       |
| # Partitioning|         |       |
|Not partitioned|         |       |
+---------------+---------+-------+



In [11]:
# Most recent commit only, one field per line, with values untruncated.
dt.history(limit=1).show(truncate=False, vertical=True)

-RECORD 0------------------------------------------------------------------------------------------------------------
 version             | 5                                                                                             
 timestamp           | 2023-01-07 18:13:30.014                                                                       
 userId              | null                                                                                          
 userName            | null                                                                                          
 operation           | ADD COLUMNS                                                                                   
 operationParameters | {columns -> [{"column":{"name":"my_new_col","type":"string","nullable":true,"metadata":{}}}]} 
 job                 | null                                                                                          
 notebook            | null                             

As you can see, the new column appears to have been added successfully. However, this is not reflected in the `dt` object:

In [12]:
# Schema of the DataFrame from `dt`, the DeltaTable handle created before the
# ALTER TABLE above; `my_new_col` does not appear here.
dt.toDF().printSchema()

root
 |-- col_a: string (nullable = true)
 |-- col_b: integer (nullable = true)



In [13]:
# Rows as seen through the original `dt` handle: still only col_a and col_b.
dt.toDF().show(n=20)

+-----+-----+
|col_a|col_b|
+-----+-----+
|  foo|    1|
|  bar|    2|
|  baz|    3|
+-----+-----+



On the other hand, running something like this:

In [14]:
# Data-only change (no schema change): overwrite col_a in every row.
spark.sql(
    f"UPDATE {my_delta_table} SET col_a = 'bananas'"
)

DataFrame[]

The result of this update is reflected in the `dt` object:

In [15]:
# The UPDATE above is visible through the same, original `dt` handle.
dt.toDF().show(n=20)

+-------+-----+
|  col_a|col_b|
+-------+-----+
|bananas|    1|
|bananas|    2|
|bananas|    3|
+-------+-----+



The only way to see the new column is to re-create the `dt` object by calling `DeltaTable.forPath` again:

In [18]:
# Re-create the DeltaTable handle from the path; the new handle's toDF()
# includes `my_new_col`, which the handle created earlier did not expose.
dt = DeltaTable.forPath(spark, abs_delta_path)

In [22]:
# The re-created handle exposes `my_new_col` (null for the existing rows).
dt.toDF().show(n=20)

+-------+-----+----------+
|  col_a|col_b|my_new_col|
+-------+-----+----------+
|bananas|    1|      null|
|bananas|    2|      null|
|bananas|    3|      null|
+-------+-----+----------+



Similar behavior can be seen when converting the Spark DataFrame to a pandas DataFrame after updating a column:

In [23]:
# Populate the newly added column in every row.
spark.sql(
    f"UPDATE {my_delta_table} SET my_new_col = 'mangos'"
)

DataFrame[]

In [27]:
# Spark's show() reflects the UPDATE to `my_new_col`.
dt.toDF().show(n=20)

+-------+-----+----------+
|  col_a|col_b|my_new_col|
+-------+-----+----------+
|bananas|    1|    mangos|
|bananas|    2|    mangos|
|bananas|    3|    mangos|
+-------+-----+----------+



In [29]:
# Converting the same DataFrame to pandas: the latest UPDATE to `my_new_col`
# is missing (its values come back empty), unlike the show() output above.

dt.toDF().toPandas()

Unnamed: 0,col_a,col_b,my_new_col
0,bananas,1,
1,bananas,2,
2,bananas,3,
