In [1]:
import base64
import google.auth

from google.cloud import kms_v1
from google.cloud import dlp_v2
from google.oauth2 import service_account

PLAIN_TEXT_KMS_KEY = b"dataservicesasia" # 16, 32 or 64 bits
PROJECT_ID = "data-services-asia-dev"
KMS_LOC = "global"
KMS_RING_NAME = "loancare-dlp-ring"
KMS_KEY_NAME = "loancare-dlp"
KMS_KEY_VERSION = "1"

ImportError: cannot import name 'kms_v1' from 'google.cloud' (unknown location)

In [2]:
class GoogleServices(object):
    """
    Handling Google client services 
    """
    def __init__(self, project_id, crendential_path=None, credential_json=None):
        self.project_id = project_id
        self.set_credential(credential=crendential_path, credential_json=credential_json)
        
    def get_project_id(self):
        return self.project_id
        
    def set_credential(self, credential=None, credential_json=None):
        if credential:
            credentials = service_account.Credentials.from_service_account_file(credential)
        elif credential_json:
            credentials = service_account.Credentials.from_service_account_info(credential_json)
        else:
            credentials, project = google.auth.default()
        self.credential = credentials

    def dlp(self):
        dlp_client = dlp_v2.DlpServiceClient(credentials=self.credential)
        return dlp_client
    
    def kms(self):
        kms_client = kms_v1.KeyManagementServiceClient(credentials=self.credential)
        return kms_client

In [3]:
g_services = GoogleServices(PROJECT_ID, crendential_path="./dpl-api-data-services-asia-dev.json")
kms_client = g_services.kms()
dlp_client = g_services.dlp()

In [4]:
kms_name = kms_client.crypto_key_path_path(PROJECT_ID, KMS_LOC, KMS_RING_NAME, KMS_KEY_NAME)
kms_response = kms_client.encrypt(kms_name, PLAIN_TEXT_KMS_KEY)
kms_wrapped_key = kms_response.ciphertext
wrapped_key = kms_wrapped_key # base64.b64decode(kms_wrapped_key)

In [5]:
# De-identify using Cryto Deterministic
def deidentify_crypto(dlp_client, project_id=None, crypto_key=None, surrogate_type=None, info_types=None):
    
    # Construct Crypto Deterministic configuration dictionary
    crypto_deterministic_config = {
        "crypto_key":{
            "unwrapped":{
              "key": crypto_key
            }
        }
    }

    # Add surrogate type
    if surrogate_type:
        crypto_deterministic_config['surrogate_info_type'] = {
            'name': surrogate_type
        }

    # Construct inspect configuration dictionary
    inspect_config = {
        'info_types': [{'name': info_type} for info_type in info_types]
    }

    # Construct deidentify configuration dictionary
    deidentify_config = {
        'info_type_transformations': {
            'transformations': [
                {
                    'primitive_transformation': {
                        'crypto_deterministic_config':
                            crypto_deterministic_config
                    }
                }
            ]
        }
    }

    item = {'value': value}

    # Call the API
    parent = dlp_client.project_path(project_id)
    response = dlp_client.deidentify_content(
        parent, inspect_config=inspect_config,
        deidentify_config=deidentify_config, item=item)

    return response.item.value

In [13]:
# variables for Crypto Deterministic 
reiden_surrogate_type = "EMAIL_ADDRESS"
info_types = [ "EMAIL_ADDRESS" ]
value = "Hello there, here is my email: nazmiasri95@gmail.com"

crypto_key = base64.b64encode(b"dataservicesasia")

In [14]:
enc_text = deidentify_crypto(dlp_client, PROJECT_ID, crypto_key, reiden_surrogate_type, info_types)
print(f"Encrypted text: {enc_text}")

Encrypted text: Hello there, here is my email: EMAIL_ADDRESS(52):ASZqwkn/vTWEbVQBvEHoHTPQ5Ag/GhJGyTOWnTgKupljrxYANP4=


In [8]:
# Construct Crypto Deterministic configuration dictionary
reiden_crypto_deterministic_config = {
    "crypto_key":{
        "unwrapped":{
          "key": crypto_key
        }
    }
}

# Add surrogate type
if reiden_surrogate_type:
    reiden_crypto_deterministic_config['surrogate_info_type'] = {
        'name': reiden_surrogate_type
    }

# Construct inspect configuration dictionary
reiden_inspect_config = {
    'info_types': [{'name': info_type} for info_type in info_types]
}

# Construct reidentify configuration dictionary
reidentify_config = {
    'info_type_transformations': {
        'transformations': [
            {
                'primitive_transformation': {
                    'crypto_deterministic_config':
                        reiden_crypto_deterministic_config
                },
                'info_types': [{'name': info_type} for info_type in info_types]
            }
        ]
    }
}

item = {'value': enc_text}

# Call the API
parent = dlp_client.project_path(PROJECT_ID)
response_reiden = dlp_client.reidentify_content(
    parent, inspect_config=reiden_inspect_config,
    reidentify_config=reidentify_config, 
    item=item)

print(response_reiden)

item {
  value: "Hello there, here is my email: EMAIL_ADDRESS(52):ASZqwkn/vTWEbVQBvEHoHTPQ5Ag/GhJGyTOWnTgKupljrxYANP4="
}
overview {
}



In [9]:
# # variables for FPE
# alphabet = "ALPHA_NUMERIC"

# # Construct FPE configuration dictionary
# crypto_replace_ffx_fpe_config = {
#     'crypto_key': {
#         'kms_wrapped': {
#             'wrapped_key': wrapped_key,
#             'crypto_key_name': kms_name
#         }
#     },
#     'common_alphabet': alphabet
# }

# # Add surrogate type
# if surrogate_type:
#     crypto_replace_ffx_fpe_config['surrogate_info_type'] = {
#         'name': surrogate_type
#     }

# # Construct inspect configuration dictionary
# inspect_config = {
#     'info_types': [{'name': info_type} for info_type in info_types]
# }

# # Construct deidentify configuration dictionary
# deidentify_config = {
#     'info_type_transformations': {
#         'transformations': [
#             {
#                 'primitive_transformation': {
#                     'crypto_replace_ffx_fpe_config':
#                         crypto_replace_ffx_fpe_config
#                 }
#             }
#         ]
#     }
# }

# # Convert string to item
# item = {'value': value}

# # Call the API
# parent = dlp_client.project_path(PROJECT_ID)
# response = dlp_client.deidentify_content(
#     parent, inspect_config=inspect_config,
#     deidentify_config=deidentify_config, item=item)

# # Print results
# print(response)