In [0]:
#  ref: https://learn.microsoft.com/en-us/azure/databricks/getting-started/connect-to-azure-storage
#  https://learn.microsoft.com/en-us/azure/databricks/security/secrets/secret-scopes

In [0]:
from pyspark.sql.types import StructType, StructField, StringType
from pyspark.sql import DataFrame
from delta import DeltaTable

In [0]:
service_credential = dbutils.secrets.get(scope="scope",key="databricssample")

spark.conf.set("fs.azure.account.auth.type.upskilling456.dfs.core.windows.net", "OAuth")
spark.conf.set("fs.azure.account.oauth.provider.type.upskilling456.dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
spark.conf.set("fs.azure.account.oauth2.client.id.upskilling456.dfs.core.windows.net", "027aadd5-2489-4768-93dd-6efc9042130e")
spark.conf.set("fs.azure.account.oauth2.client.secret.upskilling456.dfs.core.windows.net", service_credential)
spark.conf.set("fs.azure.account.oauth2.client.endpoint.upskilling456.dfs.core.windows.net", "https://login.microsoftonline.com/16b3c013-d300-468d-ac64-7eda0820b6d3/oauth2/token")

In [0]:
source_path_sample_1 = "abfss://sample@upskilling456.dfs.core.windows.net/sample1.json"
source_path_sample_2 = "abfss://sample@upskilling456.dfs.core.windows.net/sample2.json"

target_path = "abfss://sample@upskilling456.dfs.core.windows.net/delta_table"

In [0]:
# define schema
schema = StructType([
        StructField("user_id", StringType(), nullable=False),
        StructField("name", StringType(), nullable=False),
        StructField("city", StringType(), nullable=False),
        StructField("country", StringType(), nullable=False),
    ])

In [0]:


#country = USA
data_frame_sample_1 = (spark.read.format("JSON")
                        .schema(schema)                     
                        .option("multiline", True)
                        .load(source_path_sample_1)
                ).cache()

#country = UK
data_frame_sample_2 = (spark.read.format("JSON")
                        .schema(schema)                     
                        .option("multiline", True)
                        .load(source_path_sample_2)
                ).cache()


In [0]:
data_frame_sample_1.limit(5).display()

user_id,name,city,country
1,Isabella Martinez,San Diego,USA
2,Olivia Johnson,Philadelphia,USA
3,Mia Davis,Houston,USA
4,Isabella Miller,Philadelphia,USA
5,Evelyn Rodriguez,Dallas,USA


In [0]:
data_frame_sample_2.limit(5).display()

user_id,name,city,country
1,Emma Rodriguez,London,UK
2,Evelyn Davis,Edinburgh,UK
3,Evelyn Johnson,Manchester,UK
4,Amelia Smith,Glasgow,UK
5,Amelia Miller,Glasgow,UK


In [0]:
# Create one delta table with two partitions

partition_by = ["country", "city"]

delta_table = (DeltaTable.createIfNotExists()
               .addColumns(schema)
               .partitionedBy(partition_by)
               .location(target_path).execute())
delta_table

Out[14]: <delta.tables.DeltaTable at 0x7fe3300c62b0>

In [0]:
# merge 

target_table_alias = "target"
source_table_alias = "source"

country = "UK"

conditions = (f"{target_table_alias}.user_id = {source_table_alias}.user_id and \
               {target_table_alias}.city = {source_table_alias}.city and \
               {target_table_alias}.country = '{country}'")

print(conditions)

target.user_id = source.user_id and                target.city = source.city and                target.country = 'UK'


In [0]:
delta_merge_builder = (delta_table
                        .alias(target_table_alias)
                        .merge(
                            data_frame_sample_1.alias(source_table_alias),
                            condition = conditions
                        )
                        .whenMatchedUpdateAll()
                        .whenNotMatchedInsertAll()
                        .whenNotMatchedBySourceDelete()
                        )

delta_merge_builder.execute()

In [0]:
delta_table.toDF().limit(10).display()

user_id,name,city,country
3,Evelyn Johnson,Manchester,UK
15,Evelyn Martinez,Manchester,UK
23,Ava Jones,Manchester,UK
47,Olivia Johnson,Manchester,UK
56,Evelyn Johnson,Manchester,UK
68,Harper Miller,Manchester,UK
69,Olivia Johnson,Manchester,UK
71,Evelyn Rodriguez,Manchester,UK
91,Olivia Jones,Manchester,UK
106,Olivia Davis,Manchester,UK
