In [None]:
%pip install "aws-encryption-sdk[mpl]>=3.3.0"

In [None]:
# Directory that the raw_siu table's Auto Loader stream loads files from.
# NOTE(review): the volume name suggests Node.js-encrypted HL7 payloads — confirm with the producer.
hl7_vol_path = "/Volumes/catalog_demos/bronze_landing/volume_encrypted_nodejs_hl7_data"


In [None]:
import dlt
from pyspark.sql.functions import udf, col
import base64
import boto3
import botocore.session
import aws_encryption_sdk
from aws_encryption_sdk.identifiers import CommitmentPolicy

In [None]:
# Cache slots for the decryption UDF: decrypt_node_data fills both in on its
# first call and reuses them afterwards instead of rebuilding them per row.
master_key_provider = client = None

In [None]:
# @udf(returnType="string")
# def decrypt_text(encrypted_text_base64: str, region_name: str = 'us-west-2') -> str:
#     """
#     Decrypts text using AWS KMS.

#     Args:
#         encrypted_text_base64 (str): The base64 encoded encrypted text.
#         region_name (str): The AWS region name. Defaults to 'us-west-2'.

#     Returns:
#         str: Decrypted plaintext.
#     """
#     from databricks.service_credentials import getServiceCredentialsProvider

#     global client
#     if client is None:
#         creds_provider = getServiceCredentialsProvider("service_credential_kms_access")
        
#         # Initialize boto3 session using the provider
#         session = boto3.Session(
#             botocore_session=creds_provider,
#             region_name=region_name,
#         )
#         client = session.client("kms")

#     ciphertext_blob = base64.b64decode(encrypted_text_base64)
#     response = client.decrypt(
#         CiphertextBlob=ciphertext_blob
#     )
#     return response['Plaintext'].decode('utf-8')

In [None]:
@udf(returnType="string")
def decrypt_node_data(base64_ciphertext, region_name: str = 'us-west-2'):
    """Decrypt one base64-encoded AWS Encryption SDK message into text.

    The KMS master key provider and the Encryption SDK client are created on
    the first call and cached in the module-level ``master_key_provider`` and
    ``client`` globals, so later calls skip the setup.

    Args:
        base64_ciphertext: Base64 text of the encrypted message. ``None``
            (a NULL column value) is passed through as ``None``.
        region_name: AWS region for the boto3 session. Only the value seen by
            the call that performs the one-time setup is used.
            Defaults to 'us-west-2'.

    Returns:
        The decrypted plaintext decoded as UTF-8, or ``None`` for NULL input.

    Raises:
        Errors from base64 decoding, from the Encryption SDK decrypt call, and
        from UTF-8 decoding are not caught here and propagate to the caller.
    """
    global master_key_provider
    global client

    # NULL rows reach a Python UDF as None; base64.b64decode(None) raises
    # TypeError, which would abort the whole query instead of yielding NULL.
    if base64_ciphertext is None:
        return None

    if master_key_provider is None or client is None:
        # Imported lazily: only needed for the one-time setup.
        from databricks.service_credentials import getServiceCredentialsProvider

        creds_provider = getServiceCredentialsProvider("service_credential_kms_access")
        session = boto3.Session(
            botocore_session=creds_provider,
            region_name=region_name,
        )

        # NOTE(review): the key ARN is hard-coded; consider supplying it via configuration.
        key_arn = "arn:aws:kms:us-west-2:959790660301:key/2e0fbc91-25ea-4a37-b5f5-ff8c73700c53"
        provider = aws_encryption_sdk.StrictAwsKmsMasterKeyProvider(
            key_ids=[key_arn],
            # The provider takes a botocore session; boto3 holds its own in
            # the private `_session` attribute.
            botocore_session=session._session,
        )
        sdk_client = aws_encryption_sdk.EncryptionSDKClient(
            commitment_policy=CommitmentPolicy.REQUIRE_ENCRYPT_REQUIRE_DECRYPT
        )

        # Publish both globals only after both objects were built, so a failed
        # setup never leaves a provider cached next to a missing client.
        master_key_provider, client = provider, sdk_client

    ciphertext_bytes = base64.b64decode(base64_ciphertext)
    decrypted_plaintext, _header = client.decrypt(
        source=ciphertext_bytes,
        key_provider=master_key_provider,
    )

    return decrypted_plaintext.decode('utf-8')

In [None]:
# Target schema names come from the Spark configuration; they are used below
# to build the fully qualified names of the pipeline tables.
bronze_schema, silver_phi_schema = (
    spark.conf.get(conf_key) for conf_key in ("bronze_schema", "silver_phi_schema")
)

In [None]:
@dlt.table(comment="Raw table for HL7 messages", name=f"{bronze_schema}.raw_siu")
def raw_siu():
  """Stream the files under `hl7_vol_path` into the bronze table via Auto Loader.

  Files are read with the "text" format and the `wholeText` option enabled.
  The payload is not decrypted or parsed here.
  """
  reader_options = (
    ("cloudFiles.format", "text"),
    ("wholeText", "true"),
    ("inferSchema", "false"),
  )
  stream_reader = spark.readStream.format("cloudFiles")
  for option_name, option_value in reader_options:
    stream_reader = stream_reader.option(option_name, option_value)
  return stream_reader.load(hl7_vol_path)

In [None]:
@dlt.table(comment="Decrypted HL7 message", name=f"{silver_phi_schema}.cleaned_siu")
def cleaned_siu():
  """Read the bronze raw_siu table and add the decrypted message text.

  Every input column is kept; a `decrypted_value` column is appended, holding
  the result of `decrypt_node_data` applied to the `value` column.
  """
  return (
    spark.read.table(f"{bronze_schema}.raw_siu")
      .withColumn("decrypted_value", decrypt_node_data("value"))
  )