In [2]:
from concurrent.futures import ThreadPoolExecutor

from delta import DeltaTable, configure_spark_with_delta_pip
from delta.exceptions import DeltaConcurrentModificationException
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

In [3]:
# setup spark with delta lake:
# start from a named builder, then register Delta's SQL extension and
# make Delta's catalog implementation the session catalog

builder = SparkSession.builder.appName("MyApp")
builder = builder.config(
    "spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension"
)
builder = builder.config(
    "spark.sql.catalog.spark_catalog",
    "org.apache.spark.sql.delta.catalog.DeltaCatalog",
)

# the builder is passed through configure_spark_with_delta_pip before the
# session is created (or an already running one is reused)
spark = configure_spark_with_delta_pip(builder).getOrCreate()

:: loading settings :: url = jar:file:/opt/spark/jars/ivy-2.5.0.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/bruno/.ivy2/cache
The jars for the packages stored in: /home/bruno/.ivy2/jars
io.delta#delta-core_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-58ebc0b5-07c0-4b80-8428-218a6f55d6e8;1.0
	confs: [default]
	found io.delta#delta-core_2.12;2.1.0 in central
	found io.delta#delta-storage;2.1.0 in central
	found org.antlr#antlr4-runtime;4.8 in central
	found org.codehaus.jackson#jackson-core-asl;1.9.13 in central
:: resolution report :: resolve 168ms :: artifacts dl 11ms
	:: modules in use:
	io.delta#delta-core_2.12;2.1.0 from central in [default]
	io.delta#delta-storage;2.1.0 from central in [default]
	org.antlr#antlr4-runtime;4.8 from central in [default]
	org.codehaus.jackson#jackson-core-asl;1.9.13 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evic

22/10/09 23:18:18 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/10/09 23:18:19 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [4]:
_data_path = "./data/concurrency-test"

# create a table with two columns: id (0 to 4) and value (a copy of id),
# and write it to disk as a Delta table.
# mode("overwrite") lets this cell be re-run: the default save mode raises
# when the path already exists, and overwriting also resets the table to its
# initial state before the concurrent updates are applied again.
# The write is not assigned to a name: save() returns None.
(
    spark.range(0, 5)
    .withColumn("value", F.col("id"))
    .write.format("delta")
    .mode("overwrite")
    .save(_data_path)
)

                                                                                

In [5]:
# define a function to do one update on the table

def execute_expression(order):
    """Apply one update to the row with ``id == 1``, retrying on write conflicts.

    When ``order`` is odd the row's ``value`` is incremented by 1; when it is
    even the ``value`` is multiplied by 2. The table is the Delta table stored
    at the notebook-level ``_data_path``.

    Args:
        order: integer identifying this task. Its parity selects the update
            expression and it is echoed in the success message.

    Returns:
        None. Prints ``success, order <order>`` once the update is committed.

    Raises:
        Exception: any error other than a Delta concurrent-modification
            conflict is propagated instead of being retried, so a real
            failure cannot turn into a silent endless loop.
    """
    deltaTable = DeltaTable.forPath(spark, _data_path)

    # if it's odd, sum 1; if it's even, multiply by 2
    _expr = F.expr("value + 1") if order % 2 else F.expr("value * 2")

    # concurrent updates will fail, so here they're retried until success
    while True:
        try:
            deltaTable.update(
                condition=F.expr("id == 1"),
                set={"value": _expr},
            )
        except DeltaConcurrentModificationException:
            # another writer committed a conflicting change first; try again
            continue

        print(f"success, order {order}")
        return

In [6]:
# executes the function 10 times, simultaneously

# the with-block waits for every submitted task and shuts the pool down,
# so later cells only run after all updates have finished
with ThreadPoolExecutor(max_workers=10) as executor:
    futures = [
        executor.submit(execute_expression, order) for order in range(10)
    ]

# result() re-raises any exception that happened inside a worker thread;
# without it a failed task would go unnoticed
for future in futures:
    future.result()

success, order 7
success, order 9
success, order 4
success, order 2
success, order 8
success, order 6
success, order 5
success, order 0
success, order 3
success, order 1


In [7]:
# reads the results: load the table again and print its rows sorted by id

deltaTable = DeltaTable.forPath(spark, _data_path)
result_df = deltaTable.toDF().orderBy("id")
result_df.show(n=100, truncate=False)

+---+-----+
|id |value|
+---+-----+
|0  |0    |
|1  |100  |
|2  |2    |
|3  |3    |
|4  |4    |
+---+-----+



In [8]:
# expected value is 63 when the ten operations are applied one after another
# in order 0..9, starting from 1: odd orders add 1, even orders double

x = 1

for order in range(10):
    x = x + 1 if order % 2 else x * 2

x

63