# 2 - Concurrency

In [36]:
# tenacity provides the @retry decorator used in the "Concurrent merges with retry" section.
# NOTE(review): consider pinning a version (tenacity==X.Y.Z) so re-runs are reproducible.
%pip install tenacity --quiet

StatementMeta(, 3b7ef3e1-3a93-46d9-8394-3728ba80ed3d, -1, Finished, Available)


[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m23.0[0m[39;49m -> [0m[32;49m23.2[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49m/nfs4/pyenv-83b1a4ec-c3bc-4a52-abb7-7092cdccbb2a/bin/python -m pip install --upgrade pip[0m
Note: you may need to restart the kernel to use updated packages.





## Imports and data classes

In [37]:
import os
import uuid
from dataclasses import dataclass
from typing import Optional

from joblib import Parallel, delayed
from delta.tables import *

@dataclass
class TaskResult:
    """Outcome of one task executed by the parallel job runner.

    Worker functions return one instance per task. On success they pass
    ``None`` (or omit) ``error_message``; on a caught failure they store the
    exception text there.
    """

    # Identifier of the task (the item the worker function was called with).
    task_id: int
    # True when the task finished without a caught error.
    is_successful: bool
    # Text of the caught exception on failure; None on success.
    error_message: Optional[str] = None


StatementMeta(, 3b7ef3e1-3a93-46d9-8394-3728ba80ed3d, 44, Finished, Available)

## File system preparation

In [38]:
import pyspark.sql.functions as F

# Every Delta table used in this notebook lives under one root folder.
table_path_root = "Files/delta-lake/2-concurrency"

# One sub-folder per scenario demonstrated below.
(table_append, table_merge, table_merge_retry, table_partitioned) = (
    f"{table_path_root}/{scenario}"
    for scenario in ("append", "merge", "merge-retry", "partitioned")
)

# Remove leftovers from a previous run so the notebook starts from a clean slate.
if mssparkutils.fs.exists(table_path_root):
    mssparkutils.fs.rm(table_path_root, True)

StatementMeta(, 3b7ef3e1-3a93-46d9-8394-3728ba80ed3d, 45, Finished, Available)

## Data preparation

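A good option for generating richer synthetic test data is the Databricks Labs [Data Generator](https://github.com/databrickslabs/dbldatagen); this notebook only needs the simple `RANGE`-based query below.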

In [39]:
def get_data_frame(spark_session=spark, start=0, end=1000_000):
    """Return a synthetic DataFrame with one row per id from Spark SQL ``RANGE(start, end)``.

    Columns:
      * ``id``    - the range value,
      * ``name``  - ``'Record <id>'``,
      * ``value`` - ``CAST(RAND() * 1000 AS INT)``; RAND() is non-deterministic,
        so each evaluation of the query can produce different values.

    :param spark_session: session that runs the query; the default is the notebook's
        ``spark`` object as it existed when this cell was executed.
    :param start: first id of the range.
    :param end: end of the range.
    """
    query = f"""
    SELECT
        id, CONCAT('Record ', id) as name, CAST(RAND() * 1000 AS INT) as value
    FROM
        RANGE({start}, {end})
    """
    return spark_session.sql(query)

display(get_data_frame(end=5))

StatementMeta(, 3b7ef3e1-3a93-46d9-8394-3728ba80ed3d, 46, Finished, Available)

SynapseWidget(Synapse.DataFrame, 0835fc3f-c2ff-4ccd-9618-e51ce04890a7)

In [40]:
def create_table(table_path):
    """(Re)create a Delta table at ``table_path`` from ``get_data_frame()`` and preview it.

    Any existing data at the path is deleted first; the generated rows are then
    written with ``mode("overwrite")``.

    :param table_path: storage path of the Delta table.
    """
    if mssparkutils.fs.exists(table_path):
        mssparkutils.fs.rm(table_path, True)

    df = get_data_frame()

    print(f"Creating table at: {table_path}")
    df.write.format("delta").mode("overwrite").save(table_path)

    # Preview what was actually stored. `df` is built with RAND(), so running another
    # action on it (e.g. `df.limit(5)`) re-executes the query and would show `value`s
    # that differ from the ones written to the table.
    display(spark.read.format("delta").load(table_path).limit(5))

StatementMeta(, 3b7ef3e1-3a93-46d9-8394-3728ba80ed3d, 47, Finished, Available)

In [41]:
PARALLELISM = 4
def run_paralell_jobs(func_to_run, items):
    """Call ``func_to_run(item)`` for every element of ``items`` in parallel.

    Uses joblib ``Parallel`` with ``PARALLELISM`` workers and ``prefer="threads"``,
    so tasks run as threads in this process and can use the notebook's ``spark``
    session. Returns the list of results in the order of ``items``.
    """
    jobs = []
    for item in items:
        jobs.append(delayed(func_to_run)(item))

    executor = Parallel(n_jobs=PARALLELISM, prefer="threads")
    return executor(jobs)

StatementMeta(, 3b7ef3e1-3a93-46d9-8394-3728ba80ed3d, 48, Finished, Available)

## Blind Appends

In [42]:
# Start the blind-append scenario from a freshly written table.
create_table(table_append)

StatementMeta(, 3b7ef3e1-3a93-46d9-8394-3728ba80ed3d, 49, Finished, Available)

Creating table at: Files/delta-lake/2-concurrency/append


SynapseWidget(Synapse.DataFrame, 654a07c8-87b3-42eb-8bde-db1d696e07ed)

In [43]:
def append_table(task):
    """Append generated rows for ids ``task * 10_000`` .. ``(task + 1) * 10_000 - 1`` to ``table_append``.

    Each commit carries ``userMetadata`` ("commit from task <n>") so it can be
    identified in the table history. These are blind appends (nothing is read from
    the target table), and in the recorded run all four tasks commit successfully.
    Any exception propagates to the caller; on completion the task is reported as
    successful.
    """
    print(f"Starting task: {task}")

    first_id = task * 10_000
    stop_id = (task + 1) * 10_000

    writer = (
        get_data_frame(start=first_id, end=stop_id)
        .write
        .format("delta")
        .option("userMetadata", f"commit from task {task}")
        .mode("append")
    )
    writer.save(table_append)

    print(f"Processed task: {task} for records with ids {first_id} till {stop_id - 1}")
    return TaskResult(task, True, None)

run_paralell_jobs(append_table, range(4))

StatementMeta(, 3b7ef3e1-3a93-46d9-8394-3728ba80ed3d, 50, Finished, Available)

Starting task: 0
Starting task: 1
Starting task: 2
Starting task: 3
Processed task: 0 for records with ids 0 till 9999
Processed task: 1 for records with ids 10000 till 19999
Processed task: 2 for records with ids 20000 till 29999
Processed task: 3 for records with ids 30000 till 39999


[TaskResult(task_id=0, is_successful=True, error_message=None),
 TaskResult(task_id=1, is_successful=True, error_message=None),
 TaskResult(task_id=2, is_successful=True, error_message=None),
 TaskResult(task_id=3, is_successful=True, error_message=None)]

In [44]:
# Expected 1_040_000 rows: 1_000_000 from create_table plus 4 tasks x 10_000 appended rows.
# The appended id ranges (0..39_999) overlap the original ids, so those ids now appear twice.
spark.read.format("delta").load(table_append).count()

StatementMeta(, 3b7ef3e1-3a93-46d9-8394-3728ba80ed3d, 51, Finished, Available)

1040000

In [45]:
# Commit log: the initial write plus one commit per append task, each tagged with its userMetadata.
display(DeltaTable.forPath(spark, table_append).history())

StatementMeta(, 3b7ef3e1-3a93-46d9-8394-3728ba80ed3d, 52, Finished, Available)

SynapseWidget(Synapse.DataFrame, 449233e8-5e4f-4af3-a2f0-861d72469456)

## Concurrent merges

In [46]:
# Fresh table for the concurrent MERGE scenario (no retries).
create_table(table_merge)

StatementMeta(, 3b7ef3e1-3a93-46d9-8394-3728ba80ed3d, 53, Finished, Available)

Creating table at: Files/delta-lake/2-concurrency/merge


SynapseWidget(Synapse.DataFrame, 9c1279eb-78a6-44ef-a0b3-8fe505f44855)

In [47]:
def merge_into_table(task):
    """MERGE generated rows for ids ``task * 10_000`` .. ``(task + 1) * 10_000 - 1`` into ``table_merge``.

    The work runs in a dedicated session from ``spark.newSession()``, so the commit
    ``userMetadata`` setting and the ``updates`` temp view belong to this task only.
    An exception raised by the MERGE statement (in the recorded run, Delta's
    ConcurrentAppendException) is caught and returned as a failed ``TaskResult``
    instead of being raised.
    """
    print(f"Starting task: {task}")

    session = spark.newSession()
    session.conf.set("spark.databricks.delta.commitInfo.userMetadata", f"merge into table task: {task}")

    lower_id = task * 10_000
    upper_id = (task + 1) * 10_000
    # Temp views are session-scoped, so every task can reuse the name "updates".
    get_data_frame(session, start=lower_id, end=upper_id).createOrReplaceTempView("updates")

    merge_sql = f"""
        MERGE INTO delta.`{table_merge}` target USING updates source
        ON target.id = source.id
        WHEN MATCHED 
        THEN UPDATE SET target.value = source.value;
    """

    try:
        session.sql(merge_sql)
    except Exception as ex:
        print(f"Task {task} failed with exception of type: {type(ex)}")
        return TaskResult(task, False, str(ex))

    print(f"Processed task: {task} for records with ids {lower_id} till {upper_id - 1}")
    return TaskResult(task, True, None)

result = run_paralell_jobs(merge_into_table, range(4))
print("-" * 100)
print(result)

StatementMeta(, 3b7ef3e1-3a93-46d9-8394-3728ba80ed3d, 54, Finished, Available)

Starting task: 0
Starting task: 1
Starting task: 2
Starting task: 3
Processed task: 0 for records with ids 0 till 9999
Task 2 failed with exception of type: <class 'delta.exceptions.ConcurrentAppendException'>
Task 3 failed with exception of type: <class 'delta.exceptions.ConcurrentAppendException'>
Task 1 failed with exception of type: <class 'delta.exceptions.ConcurrentAppendException'>
----------------------------------------------------------------------------------------------------
[TaskResult(task_id=0, is_successful=True, error_message=None), TaskResult(task_id=1, is_successful=False, error_message='Files were added to the root of the table by a concurrent update. Please try the operation again.\nConflicting commit: {"timestamp":1689738786969,"operation":"MERGE","operationParameters":{"predicate":(target.id = source.id),"matchedPredicates":[{"actionType":"update"}],"notMatchedPredicates":[]},"readVersion":0,"isolationLevel":"Serializable","isBlindAppend":false,"operationMetrics

In [48]:
# In the recorded run only task 0's MERGE committed; tasks 1-3 failed with
# ConcurrentAppendException, so they add no versions to the history.
display(DeltaTable.forPath(spark, table_merge).history())

StatementMeta(, 3b7ef3e1-3a93-46d9-8394-3728ba80ed3d, 55, Finished, Available)

SynapseWidget(Synapse.DataFrame, 7c1cad5d-db1a-40b5-a95e-0358a1fb5642)

## Concurrent merges with retry

In [54]:
# Fresh table for the MERGE-with-retry scenario.
create_table(table_merge_retry)

StatementMeta(, 3b7ef3e1-3a93-46d9-8394-3728ba80ed3d, 61, Finished, Available)

Creating table at: Files/delta-lake/2-concurrency/merge-retry


SynapseWidget(Synapse.DataFrame, d497164b-60b1-49d3-8e64-97cd52987e13)

In [55]:
from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_random
from delta.exceptions import ConcurrentAppendException

MAX_ATTEMPTS = 5
WAIT_TIME_MIN = 1
WAIT_TIME_MAX = 10

def merge_into_table_with_retry(task):
  """MERGE ids ``task * 10_000`` .. ``(task + 1) * 10_000 - 1`` into ``table_merge_retry``, retrying on conflicts.

  The MERGE is attempted up to ``MAX_ATTEMPTS`` times with a random wait of
  ``WAIT_TIME_MIN``..``WAIT_TIME_MAX`` seconds between attempts. Only
  ``ConcurrentAppendException`` (the conflict seen in the plain concurrent-merge
  demo) triggers a retry; any other error is raised immediately. If every attempt
  conflicts, the last ``ConcurrentAppendException`` itself is re-raised (not
  tenacity's ``RetryError``).
  """
  print(f"Starting task: {task}")

  # Retry only the write conflict this demo provokes; retrying e.g. a SQL error
  # would just repeat the same failure MAX_ATTEMPTS times.
  # NOTE(review): other Delta conflict exceptions may also deserve a retry
  # depending on the workload.
  @retry(
    retry=retry_if_exception_type(ConcurrentAppendException),
    stop=stop_after_attempt(MAX_ATTEMPTS),
    wait=wait_random(min=WAIT_TIME_MIN, max=WAIT_TIME_MAX),
    reraise=True,
  )
  def merge_it(task):

    print(f"merge_it function is called for task: {task}")

    # A new session per attempt keeps the commit metadata and the "updates"
    # temp view private to this attempt.
    spark_clone = spark.newSession()
    commit_message = f"merge into table with retry task {task}"
    spark_clone.conf.set("spark.databricks.delta.commitInfo.userMetadata", commit_message)

    dfUpdates = get_data_frame(spark_clone, start=task*10_000, end=(task+1)*10_000)
    dfUpdates.createOrReplaceTempView("updates")
    
    spark_clone.sql(f"""
        MERGE INTO delta.`{table_merge_retry}` target USING updates source
        ON target.id = source.id
        WHEN MATCHED 
        THEN UPDATE SET target.value = source.value;
    """)

  # Retries do not guarantee success. For demo purposes there is no try/except here
  # (unlike merge_into_table), so a final failure propagates to the caller.
  merge_it(task)
  
  print(f"Processed task: {task} for records with ids {task*10_000} till {(task+1)*10_000 - 1}")
  return TaskResult(task, True, None)

run_paralell_jobs(merge_into_table_with_retry, range(4))

StatementMeta(, 3b7ef3e1-3a93-46d9-8394-3728ba80ed3d, 62, Finished, Available)

Starting task: 0
merge_it function is called for task: 0
Starting task: 1
merge_it function is called for task: 1
Starting task: 2
merge_it function is called for task: 2
Starting task: 3
merge_it function is called for task: 3
Processed task: 3 for records with ids 30000 till 39999
merge_it function is called for task: 0
merge_it function is called for task: 1
merge_it function is called for task: 2
Processed task: 0 for records with ids 0 till 9999
merge_it function is called for task: 2
merge_it function is called for task: 1
Processed task: 2 for records with ids 20000 till 29999
Processed task: 1 for records with ids 10000 till 19999


[TaskResult(task_id=0, is_successful=True, error_message=None),
 TaskResult(task_id=1, is_successful=True, error_message=None),
 TaskResult(task_id=2, is_successful=True, error_message=None),
 TaskResult(task_id=3, is_successful=True, error_message=None)]

In [56]:
# With retries, all four tasks succeeded in the recorded run, so each has its own MERGE commit here.
display(DeltaTable.forPath(spark, table_merge_retry).history())

StatementMeta(, 3b7ef3e1-3a93-46d9-8394-3728ba80ed3d, 63, Finished, Available)

SynapseWidget(Synapse.DataFrame, 2558c37f-07a7-4329-a324-003a19367ab2)

## Use Partitions

In [57]:
def get_data_frame_with_year_column(spark_session=spark, start=0, end=1000_000):
    """Return a synthetic DataFrame over ``RANGE(start, end)`` with a ``year`` column.

    Columns: ``id``, ``name`` (``'Record <id>'``), ``year`` = ``(id % 4) + 2020``
    (ids cycle through 2020-2023) and ``value`` = ``CAST(RAND() * 1000 AS INT)``,
    which is non-deterministic across evaluations.

    :param spark_session: session that runs the query; defaults to the notebook's
        ``spark`` as it existed when this cell was executed.
    :param start: first id of the range.
    :param end: end of the range.
    """
    sql_text = f"""
    SELECT
        id,
        CONCAT('Record ', id) as name,
        (id % 4) + 2020 as year,
        CAST(RAND() * 1000 AS INT) as value
    FROM
        RANGE({start}, {end})
    """
    return spark_session.sql(sql_text)

display(get_data_frame_with_year_column(end=10))

StatementMeta(, 3b7ef3e1-3a93-46d9-8394-3728ba80ed3d, 64, Finished, Available)

SynapseWidget(Synapse.DataFrame, 36ebc264-967f-49da-af60-e384a8eba167)

In [58]:
def create_partitioned_table(table_path):
    """(Re)create a Delta table at ``table_path`` partitioned by ``year`` and preview it.

    Any existing data at the path is deleted first; rows from
    ``get_data_frame_with_year_column(spark)`` are then written with
    ``partitionBy("year")`` and ``mode("overwrite")``.

    :param table_path: storage path of the Delta table.
    """
    if mssparkutils.fs.exists(table_path):
        mssparkutils.fs.rm(table_path, True)

    df = get_data_frame_with_year_column(spark)

    print(f"Creating partitioned table at: {table_path}")
    df.write.partitionBy("year").format("delta").mode("overwrite").save(table_path)

    # Preview what was actually stored. `df` is built with RAND(), so running another
    # action on it would re-execute the query and show different `value`s than the table holds.
    display(spark.read.format("delta").load(table_path).limit(5))

StatementMeta(, 3b7ef3e1-3a93-46d9-8394-3728ba80ed3d, 65, Finished, Available)

In [59]:
# Fresh table partitioned by year (2020-2023) for the partition-aware MERGE scenario.
create_partitioned_table(table_partitioned)

StatementMeta(, 3b7ef3e1-3a93-46d9-8394-3728ba80ed3d, 66, Finished, Available)

Creating partitioned table at: Files/delta-lake/2-concurrency/partitioned


SynapseWidget(Synapse.DataFrame, dc816ecf-9f0e-4f56-8562-b5acdf924e19)

In [60]:
def merge_into_partitioned_table(task):
  """MERGE updates for the single partition ``year = 2020 + task`` of ``table_partitioned``.

  Update rows are generated for ids 0..39_999, filtered to this task's year, and
  their ``value`` is set to the negated (freshly generated) random value. The year
  is pinned as a literal in the ON clause, so each task touches only its own
  partition; in the recorded run all four tasks (one year each) commit concurrently
  without conflicts.
  """
  print(f"Starting task: {task}")

  target_year = 2020 + task

  session = spark.newSession()
  session.conf.set("spark.databricks.delta.commitInfo.userMetadata", f"merge into partitioned table task: {task}")

  updates_df = (
    get_data_frame_with_year_column(session, start=0, end=40_000)
    .where(f"year = {target_year}")
    .withColumn("value", F.expr("value * -1"))
  )
  updates_df.createOrReplaceTempView("updates")

  # The literal year predicate in the ON clause is what keeps concurrent tasks apart.
  session.sql(f"""
      MERGE INTO delta.`{table_partitioned}` target 
      USING updates source
      ON target.id = source.id AND target.year = {target_year}
      WHEN MATCHED
      THEN UPDATE SET target.value = source.value;
  """)

  print(f"Processed task: {task}")
  return TaskResult(task, True, None)

run_paralell_jobs(merge_into_partitioned_table, range(4))

StatementMeta(, 3b7ef3e1-3a93-46d9-8394-3728ba80ed3d, 67, Finished, Available)

Starting task: 0
Starting task: 1
Starting task: 2
Starting task: 3
Processed task: 2
Processed task: 3
Processed task: 1
Processed task: 0


[TaskResult(task_id=0, is_successful=True, error_message=None),
 TaskResult(task_id=1, is_successful=True, error_message=None),
 TaskResult(task_id=2, is_successful=True, error_message=None),
 TaskResult(task_id=3, is_successful=True, error_message=None)]

In [61]:
# Expect the initial write followed by one MERGE commit per task (one per year partition).
display(DeltaTable.forPath(spark, table_partitioned).history())

StatementMeta(, 3b7ef3e1-3a93-46d9-8394-3728ba80ed3d, 68, Finished, Available)

SynapseWidget(Synapse.DataFrame, 5461992c-131c-47bd-b51f-0c73e23ca196)

In [63]:
# Rows around the update boundary: ids below 40_000 were covered by the merges and now hold
# a non-positive value (a negated RAND()-based value); ids from 40_000 up were not in any
# update batch and keep their original non-negative value.
display(
    DeltaTable.forPath(spark, table_partitioned)
    .toDF()
    .where("id between 39990 and 40010")
    .orderBy("id")
)

StatementMeta(, 3b7ef3e1-3a93-46d9-8394-3728ba80ed3d, 70, Finished, Available)

SynapseWidget(Synapse.DataFrame, 3a527061-0f05-48c8-b7f5-e74c0bd4d0ef)