# ````Tech Talk````
<img src="https://docs.delta.io/latest/_static/delta-lake-logo.png" width=200/>
### Addressing GDPR and CCPA Scenarios with Delta Lake and Apache Spark™

Delta Lake is an effective tool for addressing GDPR and CCPA compliance requirements because its structured data storage layer adds transactional capabilities to your data lake.

In this demonstration, we will review:

- How to **convert your existing data to Delta**   
- How to delete and clean up personal information quickly and efficiently with **Delta's DELETE capabilities**
  - How to speed up DELETE FROM queries using **broadcasting**
  - How to speed up DELETE FROM queries using Databricks **Z-Ordering**
- **Audit History**
- **Vacuum** and Retention
- Recommendations for setting up efficient pipelines
  - **Pseudonymization** of the Personal Identifiers
  - **Compaction** and **Optimize**
  
For errors in or questions about this notebook, please contact vini@databricks.com

#### SETUP


To run this notebook, first [create a cluster](https://docs.databricks.com/clusters/create.html).
- Cluster used for the demo: **Databricks Runtime 6.6** | **3 worker nodes** and a **driver**, each of type **xlarge, 30 GB memory and 4 cores**

### Our Dataset
The workflow described below references a database named `gdpr` that contains a sample dataset with 65,000,000 rows and as many distinct customer IDs, amounting to 3.228 GB of data.

###### CUSTOMER TABLE
The schema of the customers table is shown below:
````
|-- c_customer_sk: integer (nullable = true)
|-- c_customer_id: string (nullable = true)
|-- c_current_cdemo_sk: integer (nullable = true)
|-- c_current_hdemo_sk: integer (nullable = true)
|-- c_current_addr_sk: integer (nullable = true)
|-- c_first_shipto_date_sk: integer (nullable = true)
|-- c_first_sales_date_sk: integer (nullable = true)
|-- c_salutation: string (nullable = true)
|-- c_first_name: string (nullable = true)
|-- c_last_name: string (nullable = true)
|-- c_preferred_cust_flag: string (nullable = true)
|-- c_birth_day: integer (nullable = true)
|-- c_birth_month: integer (nullable = true)
|-- c_birth_year: integer (nullable = true)
|-- c_birth_country: string (nullable = true)
|-- c_login: string (nullable = true)
|-- c_email_address: string (nullable = true)
|-- c_last_review_date: string (nullable = true)
````
###### CUSTOMER REQUESTS
The keys to delete (`c_customer_id`) for the customer requests are stored in the `gdpr.customer_delete_keys` table, which is roughly 10% (337.615 MB) of the customers table.

````
|-- c_customer_sk: integer (nullable = true)
|-- c_customer_id: string (nullable = true)
````

#### ![Delta Lake Logo Tiny](https://pages.databricks.com/rs/094-YMS-629/images/delta-lake-tiny-logo.png) 1. Convert tables to Delta format

In [0]:
%scala
spark.sql("CONVERT TO DELTA parquet.`dbfs:/Users/vini.jaiswal@databricks.com/demo/customer_t5`")

In [0]:
%scala
val csv_data = "/databricks-datasets/flights/departuredelays.csv"
//"/databricks-datasets/COVID/coronavirusdataset/PatientInfo.csv" 
val homeDir = "/Users/vini.jaiswal@databricks.com/demo" 

val csvDF = (spark.read.format("csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .load(csv_data))

csvDF.write.mode("overwrite").format("delta").save(homeDir + "/csv3-delta/")

#### ![Delta Lake Logo Tiny](https://pages.databricks.com/rs/094-YMS-629/images/delta-lake-tiny-logo.png) 2. Delete from Delta Lake table

Three different approaches:

- Step 2a. Delete data using Delta DELETE statements without broadcasting
- Step 2b. Delete data using Delta DELETE statements with broadcasting
- Step 2c. Delete data using Delta DELETE statements, with broadcasting and Z-Ordering

#### Step 2a: Delete data using Delta `DELETE` statements

In [0]:
%sql
set spark.sql.autoBroadcastJoinThreshold = -1;

key,value
spark.sql.autoBroadcastJoinThreshold,-1


In [0]:
%scala
spark.sql("""
SELECT count(*)
FROM   delta.`dbfs:/Users/vini.jaiswal@databricks.com/demo/customer_t5` 
WHERE  c_customer_id IN (SELECT c_customer_id 
                         FROM   gdpr.customer_delete_keys)""").show()

In [0]:
%scala
spark.sql("""
DELETE FROM delta.`dbfs:/Users/vini.jaiswal@databricks.com/demo/customer_t5` AS t1
WHERE  EXISTS (SELECT c_customer_id 
               FROM   gdpr.customer_delete_keys
               WHERE  t1.c_customer_id = c_customer_id)""")

In [0]:
%scala
spark.sql("""
SELECT count(*)
FROM   delta.`dbfs:/Users/vini.jaiswal@databricks.com/demo/customer_t5` 
WHERE  c_customer_id IN (SELECT c_customer_id 
                         FROM   gdpr.customer_delete_keys)""").show()

**Note**: Because the `DELETE` removed every row whose key appears in `gdpr.customer_delete_keys`, the count above should be `0`.

In [0]:
%scala
// Restore the table to version 0 so the next approach starts from the same data.
val customer_t5 = "dbfs:/Users/vini.jaiswal@databricks.com/demo/customer_t5"
val df = spark.read.format("delta").option("versionAsOf", "0").load(customer_t5)
df.write.format("delta").mode("overwrite").save(customer_t5)

#### Step 2b: Delete data using Delta `DELETE` statements, this time with broadcasting

In [0]:
%sql set spark.sql.autoBroadcastJoinThreshold = 1024000000;

key,value
spark.sql.autoBroadcastJoinThreshold,1024000000


In [0]:
%scala
spark.sql("""
SELECT count(*)
FROM   delta.`dbfs:/Users/vini.jaiswal@databricks.com/demo/customer_t5` 
WHERE  c_customer_id IN (SELECT c_customer_id 
                         FROM   gdpr.customer_delete_keys)""").show()

In [0]:
%scala
spark.sql("""DELETE 
FROM   delta.`dbfs:/Users/vini.jaiswal@databricks.com/demo/customer_t5` AS t1 
WHERE  EXISTS 
       ( 
              SELECT c_customer_id 
              FROM   gdpr.customer_delete_keys
              WHERE  t1.c_customer_id = c_customer_id)""")

##### From the two runs above, you can see that broadcasting the keys table helps the delete process determine which files must be rewritten.
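
To make the effect of the two threshold settings concrete, here is a minimal sketch in plain Scala (no Spark session needed) of the size check behind `spark.sql.autoBroadcastJoinThreshold`. The helper `eligibleForAutoBroadcast` is hypothetical and written only for illustration. It assumes the 337.615 MB quoted for `gdpr.customer_delete_keys` means 337,615,000 bytes and that Spark's own size estimate is close to that figure, which may not hold in practice because Spark works from its own statistics.

```scala
// Hypothetical helper illustrating the idea behind spark.sql.autoBroadcastJoinThreshold:
// a negative threshold disables automatic broadcasting; otherwise the estimated
// size of the join side (in bytes) must not exceed the threshold.
def eligibleForAutoBroadcast(estimatedBytes: Long, thresholdBytes: Long): Boolean =
  thresholdBytes >= 0 && estimatedBytes <= thresholdBytes

// Assumption: 337.615 MB is taken as 337,615,000 bytes.
val deleteKeysBytes = 337615000L

val step2a = eligibleForAutoBroadcast(deleteKeysBytes, -1L)         // threshold set in Step 2a
val step2b = eligibleForAutoBroadcast(deleteKeysBytes, 1024000000L) // threshold set in Step 2b

println(s"Step 2a broadcast eligible: $step2a")
println(s"Step 2b broadcast eligible: $step2b")
```

Under these assumptions the keys table is never broadcast in Step 2a and fits under the raised threshold in Step 2b.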

In [0]:
%scala
val df = spark.read.format("delta").option("versionAsOf", "0").load("dbfs:/Users/vini.jaiswal@databricks.com/demo/customer_t5")
df.write.format("delta").mode("overwrite").save(customer_t5)

#### Step 2c: Delete data using Delta `DELETE` statements, with broadcasting and Z-Ordering

In [0]:
%scala
spark.sql("optimize delta.`dbfs:/Users/vini.jaiswal@databricks.com/demo/customer_t5` zorder by c_customer_id")

In [0]:
%scala
spark.sql("select count(1) from delta.`dbfs:/Users/vini.jaiswal@databricks.com/demo/customer_t5` where c_customer_id in (select c_customer_id from gdpr.customer_delete_keys)").show()

In [0]:
%scala
spark.sql("""DELETE 
FROM   delta.`dbfs:/Users/vini.jaiswal@databricks.com/demo/customer_t5` AS t1 
WHERE  EXISTS 
       ( 
              SELECT c_customer_id 
              FROM   gdpr.customer_delete_keys
              WHERE  t1.c_customer_id = c_customer_id)""")

In [0]:
%scala
val df = spark.read.format("delta").option("versionAsOf", "0").load("dbfs:/Users/vini.jaiswal@databricks.com/demo/customer_t5")
df.write.format("delta").mode("overwrite").save(customer_t5)

## ![Delta Lake Tiny Logo](https://pages.databricks.com/rs/094-YMS-629/images/delta-lake-tiny-logo.png) 3. Audit Transactional logs with Delta Lake

In [0]:
%scala
display(spark.sql("DESCRIBE HISTORY delta.`dbfs:/Users/vini.jaiswal@databricks.com/demo/customer_t2`"))

version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics,userMetadata,engineInfo
21,2020-06-18T03:28:05.000+0000,100708,vini.jaiswal@databricks.com,OPTIMIZE,"Map(predicate -> [], zOrderBy -> [""c_customer_id""], batchId -> 0, auto -> false)",,List(6718320),0618-012132-spiky36,20,SnapshotIsolation,False,"Map(numRemovedFiles -> 208, numRemovedBytes -> 3194861916, p25FileSize -> 540468009, minFileSize -> 461126371, numAddedFiles -> 6, maxFileSize -> 542929260, p75FileSize -> 542859315, p50FileSize -> 541858629, numAddedBytes -> 3170428305)",,
20,2020-06-18T03:21:57.000+0000,100708,vini.jaiswal@databricks.com,DELETE,"Map(predicate -> [""exists(t1.`c_customer_id`)""])",,List(6718320),0618-012132-spiky36,19,WriteSerializable,False,"Map(numRemovedFiles -> 23, numDeletedRows -> 6500000, numAddedFiles -> 200, numCopiedRows -> 42250002)",,
19,2020-05-07T16:48:32.000+0000,100708,vini.jaiswal@databricks.com,WRITE,"Map(mode -> Overwrite, partitionBy -> [])",,List(6718320),0506-044457-pump2,18,WriteSerializable,False,"Map(numFiles -> 31, numOutputBytes -> 3453647006, numOutputRows -> 65000000, numParts -> 0)",,
18,2020-05-07T16:47:05.000+0000,100708,vini.jaiswal@databricks.com,DELETE,"Map(predicate -> [""exists(t1.`c_customer_id`)""])",,List(6718320),0506-044457-pump2,17,WriteSerializable,False,"Map(numTotalRows -> 65000000, numFiles -> 26, numRemovedFiles -> 9, numCopiedRows -> 58500000, numDeletedRows -> 6500000, numOutputRows -> 57311759, numParts -> 0, numOutputBytes -> 3041064134, numAddedFiles -> 26)",,
17,2020-05-07T16:45:04.000+0000,100708,vini.jaiswal@databricks.com,OPTIMIZE,"Map(predicate -> [], zOrderBy -> [""c_customer_id""], batchId -> 0, auto -> false)",,List(6718320),0506-044457-pump2,16,SnapshotIsolation,False,"Map(numFiles -> 9, numRemovedFiles -> 31, numRemovedBytes -> 3453647007, p25FileSize -> 117798102, minFileSize -> 63127260, numOutputRows -> 65000000, numParts -> 0, numOutputBytes -> 3448893504, numAddedFiles -> 9, maxFileSize -> 530630703, p75FileSize -> 530592677, p50FileSize -> 530560834, numAddedBytes -> 3448893505)",,
16,2020-05-07T16:39:09.000+0000,100708,vini.jaiswal@databricks.com,WRITE,"Map(mode -> Overwrite, partitionBy -> [])",,List(6718320),0506-044457-pump2,15,WriteSerializable,False,"Map(numFiles -> 31, numOutputBytes -> 3453647006, numOutputRows -> 65000000, numParts -> 0)",,
15,2020-05-07T16:37:39.000+0000,100708,vini.jaiswal@databricks.com,DELETE,"Map(predicate -> [""exists(t1.`c_customer_id`)""])",,List(6718320),0506-044457-pump2,14,WriteSerializable,False,"Map(numTotalRows -> 65000000, numFiles -> 26, numRemovedFiles -> 9, numCopiedRows -> 58500000, numDeletedRows -> 6500000, numOutputRows -> 57314981, numParts -> 0, numOutputBytes -> 3041396932, numAddedFiles -> 26)",,
14,2020-05-07T16:35:35.000+0000,100708,vini.jaiswal@databricks.com,OPTIMIZE,"Map(predicate -> [], zOrderBy -> [""c_customer_id""], batchId -> 0, auto -> false)",,List(6718320),0506-044457-pump2,13,SnapshotIsolation,False,"Map(numFiles -> 9, numRemovedFiles -> 31, numRemovedBytes -> 3453647007, p25FileSize -> 136690928, minFileSize -> 62942854, numOutputRows -> 65000000, numParts -> 0, numOutputBytes -> 3449127978, numAddedFiles -> 9, maxFileSize -> 530668371, p75FileSize -> 530603594, p50FileSize -> 530562862, numAddedBytes -> 3449127979)",,
13,2020-05-07T16:31:22.000+0000,100708,vini.jaiswal@databricks.com,WRITE,"Map(mode -> Overwrite, partitionBy -> [])",,List(6718320),0506-044457-pump2,12,WriteSerializable,False,"Map(numFiles -> 31, numOutputBytes -> 3453647006, numOutputRows -> 65000000, numParts -> 0)",,
12,2020-05-07T16:29:48.000+0000,100708,vini.jaiswal@databricks.com,DELETE,"Map(predicate -> [""exists(t1.`c_customer_id`)""])",,List(6718320),0506-044457-pump2,11,WriteSerializable,False,"Map(numTotalRows -> 65000000, numFiles -> 23, numRemovedFiles -> 31, numCopiedRows -> 58500000, numDeletedRows -> 6500000, numOutputRows -> 42250002, numParts -> 0, numOutputBytes -> 2245108004, numAddedFiles -> 23)",,
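
The `operationMetrics` column can be sanity-checked with simple arithmetic. The sketch below copies three figures from the version 18 `DELETE` entry in the output above into a hypothetical `DeleteMetrics` case class (it is not part of any Delta Lake API) and confirms that copied plus deleted rows account for the whole table, and that the delete removed about 10% of it.

```scala
// Hypothetical container for three of the operationMetrics values; not a Delta Lake class.
final case class DeleteMetrics(numTotalRows: Long, numCopiedRows: Long, numDeletedRows: Long)

// Values copied from the version 18 DELETE entry in the history output.
val v18 = DeleteMetrics(numTotalRows = 65000000L, numCopiedRows = 58500000L, numDeletedRows = 6500000L)

// Every row in a rewritten file is either copied into a new file or dropped.
val accountedFor = v18.numCopiedRows + v18.numDeletedRows

// Share of the table that the delete removed, as a percentage.
val deletedPercent = 100.0 * v18.numDeletedRows / v18.numTotalRows

println(s"copied + deleted = $accountedFor of ${v18.numTotalRows} rows")
println(f"deleted share    = $deletedPercent%.1f%%")
```

The 10% share matches the size of `gdpr.customer_delete_keys` relative to the customers table described in the dataset section.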


## ![Delta Lake Tiny Logo](https://pages.databricks.com/rs/094-YMS-629/images/delta-lake-tiny-logo.png) 4. Vacuum Delta Lake tables

By default, `vacuum()` retains all the data files needed for the last 7 days. Because this table does not have 7 days' worth of history, we set the retention to 0 hours, which keeps only the latest state of the table.
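
The effect of the retention period can be illustrated with a deliberately simplified model in plain Scala. This is a sketch, not how Delta Lake implements vacuum: the `UnreferencedFile` class, the `vacuumCandidates` helper, and the file names are all hypothetical, and time is reduced to whole hours.

```scala
// Simplified, hypothetical model of a data file that is no longer referenced
// by the current table version. Times are hours since an arbitrary starting point.
final case class UnreferencedFile(path: String, unreferencedSinceHour: Long)

// A file becomes a candidate for physical deletion once it has been
// unreferenced for at least `retentionHours`.
def vacuumCandidates(files: Seq[UnreferencedFile], nowHour: Long, retentionHours: Long): Seq[String] =
  files.filter(f => nowHour - f.unreferencedSinceHour >= retentionHours).map(_.path)

val oldFiles = Seq(
  UnreferencedFile("part-0000.parquet", unreferencedSinceHour = 0L),   // unreferenced for 200 hours
  UnreferencedFile("part-0001.parquet", unreferencedSinceHour = 100L), // unreferenced for 100 hours
  UnreferencedFile("part-0002.parquet", unreferencedSinceHour = 190L)  // unreferenced for 10 hours
)
val currentHour = 200L

val sevenDayRetention = vacuumCandidates(oldFiles, currentHour, retentionHours = 7 * 24)
val zeroHourRetention = vacuumCandidates(oldFiles, currentHour, retentionHours = 0)

println(s"7-day retention removes:  $sevenDayRetention")
println(s"0-hour retention removes: $zeroHourRetention")
```

In this model the 7-day setting removes only the file that has been unreferenced for more than 168 hours, while a retention of 0 hours removes every file that the latest table version no longer needs.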

In [0]:
%scala
//setting up for vacuum
import io.delta.tables._
val delta_path = "dbfs:/Users/vini.jaiswal@databricks.com/demo/customer_t5"

val deltaTable = DeltaTable.forPath(spark, delta_path)
display(deltaTable.history())

version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics
21,2020-06-18T03:28:05.000+0000,100708,vini.jaiswal@databricks.com,OPTIMIZE,"Map(predicate -> [], zOrderBy -> [""c_customer_id""], batchId -> 0, auto -> false)",,List(6718320),0618-012132-spiky36,20,SnapshotIsolation,False,"Map(numRemovedFiles -> 208, numRemovedBytes -> 3194861916, p25FileSize -> 540468009, minFileSize -> 461126371, numAddedFiles -> 6, maxFileSize -> 542929260, p75FileSize -> 542859315, p50FileSize -> 541858629, numAddedBytes -> 3170428305)"
20,2020-06-18T03:21:57.000+0000,100708,vini.jaiswal@databricks.com,DELETE,"Map(predicate -> [""exists(t1.`c_customer_id`)""])",,List(6718320),0618-012132-spiky36,19,WriteSerializable,False,"Map(numRemovedFiles -> 23, numDeletedRows -> 6500000, numAddedFiles -> 200, numCopiedRows -> 42250002)"
19,2020-05-07T16:48:32.000+0000,100708,vini.jaiswal@databricks.com,WRITE,"Map(mode -> Overwrite, partitionBy -> [])",,List(6718320),0506-044457-pump2,18,WriteSerializable,False,"Map(numFiles -> 31, numOutputBytes -> 3453647006, numOutputRows -> 65000000, numParts -> 0)"
18,2020-05-07T16:47:05.000+0000,100708,vini.jaiswal@databricks.com,DELETE,"Map(predicate -> [""exists(t1.`c_customer_id`)""])",,List(6718320),0506-044457-pump2,17,WriteSerializable,False,"Map(numTotalRows -> 65000000, numFiles -> 26, numRemovedFiles -> 9, numCopiedRows -> 58500000, numDeletedRows -> 6500000, numOutputRows -> 57311759, numParts -> 0, numOutputBytes -> 3041064134, numAddedFiles -> 26)"
17,2020-05-07T16:45:04.000+0000,100708,vini.jaiswal@databricks.com,OPTIMIZE,"Map(predicate -> [], zOrderBy -> [""c_customer_id""], batchId -> 0, auto -> false)",,List(6718320),0506-044457-pump2,16,SnapshotIsolation,False,"Map(numFiles -> 9, numRemovedFiles -> 31, numRemovedBytes -> 3453647007, p25FileSize -> 117798102, minFileSize -> 63127260, numOutputRows -> 65000000, numParts -> 0, numOutputBytes -> 3448893504, numAddedFiles -> 9, maxFileSize -> 530630703, p75FileSize -> 530592677, p50FileSize -> 530560834, numAddedBytes -> 3448893505)"
16,2020-05-07T16:39:09.000+0000,100708,vini.jaiswal@databricks.com,WRITE,"Map(mode -> Overwrite, partitionBy -> [])",,List(6718320),0506-044457-pump2,15,WriteSerializable,False,"Map(numFiles -> 31, numOutputBytes -> 3453647006, numOutputRows -> 65000000, numParts -> 0)"
15,2020-05-07T16:37:39.000+0000,100708,vini.jaiswal@databricks.com,DELETE,"Map(predicate -> [""exists(t1.`c_customer_id`)""])",,List(6718320),0506-044457-pump2,14,WriteSerializable,False,"Map(numTotalRows -> 65000000, numFiles -> 26, numRemovedFiles -> 9, numCopiedRows -> 58500000, numDeletedRows -> 6500000, numOutputRows -> 57314981, numParts -> 0, numOutputBytes -> 3041396932, numAddedFiles -> 26)"
14,2020-05-07T16:35:35.000+0000,100708,vini.jaiswal@databricks.com,OPTIMIZE,"Map(predicate -> [], zOrderBy -> [""c_customer_id""], batchId -> 0, auto -> false)",,List(6718320),0506-044457-pump2,13,SnapshotIsolation,False,"Map(numFiles -> 9, numRemovedFiles -> 31, numRemovedBytes -> 3453647007, p25FileSize -> 136690928, minFileSize -> 62942854, numOutputRows -> 65000000, numParts -> 0, numOutputBytes -> 3449127978, numAddedFiles -> 9, maxFileSize -> 530668371, p75FileSize -> 530603594, p50FileSize -> 530562862, numAddedBytes -> 3449127979)"
13,2020-05-07T16:31:22.000+0000,100708,vini.jaiswal@databricks.com,WRITE,"Map(mode -> Overwrite, partitionBy -> [])",,List(6718320),0506-044457-pump2,12,WriteSerializable,False,"Map(numFiles -> 31, numOutputBytes -> 3453647006, numOutputRows -> 65000000, numParts -> 0)"
12,2020-05-07T16:29:48.000+0000,100708,vini.jaiswal@databricks.com,DELETE,"Map(predicate -> [""exists(t1.`c_customer_id`)""])",,List(6718320),0506-044457-pump2,11,WriteSerializable,False,"Map(numTotalRows -> 65000000, numFiles -> 23, numRemovedFiles -> 31, numCopiedRows -> 58500000, numDeletedRows -> 6500000, numOutputRows -> 42250002, numParts -> 0, numOutputBytes -> 2245108004, numAddedFiles -> 23)"


In [0]:
%scala
//performing vacuum
spark.sql("SET spark.databricks.delta.retentionDurationCheck.enabled = false")
deltaTable.vacuum(retentionHours = 0)

In [0]:
%scala
val df = spark.read.format("delta").option("versionAsOf", "0").load("dbfs:/Users/vini.jaiswal@databricks.com/demo/customer_t5")
df.write.format("delta").mode("overwrite").save("dbfs:/Users/vini.jaiswal@databricks.com/demo/customer_t5")

##### RECOMMENDATION: Set a retention policy with your cloud provider as well

## ![Delta Lake Tiny Logo](https://pages.databricks.com/rs/094-YMS-629/images/delta-lake-tiny-logo.png) 5. Other Recommendations

### 5A. Improve query performance

In [0]:
%scala
val path = "dbfs:/Users/vini.jaiswal@databricks.com/demo/customer_t6"
val dest_path = "/tmp/vini/customer_comp"
val numFiles = 16

// Compact the table by rewriting it as `numFiles` files at `dest_path`.
spark.read
 .format("delta")
 .load(path)
 .repartition(numFiles)
 .write
// .option("dataChange", "false")
 .format("delta")
 .mode("overwrite")
 .save(dest_path)
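
One way to pick `numFiles` is to divide the table size by a target file size. The sketch below is only an illustration of that arithmetic: the 256 MiB target is an assumed value rather than a setting taken from this notebook, and `filesForTarget` is a hypothetical helper. The byte count is the `numOutputBytes` reported for a full 65,000,000-row `WRITE` in the history output earlier in this notebook, so the table you compact may differ.

```scala
// Hypothetical helper: number of files needed so that each file is at most
// roughly `targetBytes` in size (ceiling division, never fewer than one file).
def filesForTarget(totalBytes: Long, targetBytes: Long): Int = {
  require(totalBytes >= 0 && targetBytes > 0, "total must be non-negative and target positive")
  math.max(1L, (totalBytes + targetBytes - 1) / targetBytes).toInt
}

val tableBytes         = 3453647006L        // numOutputBytes from a WRITE entry in the history output
val assumedTargetBytes = 256L * 1024 * 1024 // assumption: aim for about 256 MiB per file

val suggestedNumFiles = filesForTarget(tableBytes, assumedTargetBytes)
println(s"Suggested number of files: $suggestedNumFiles")
```

A smaller target produces more, smaller files, so the right value depends on how the table is queried and how often it is rewritten.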


##### AUTO OPTIMIZE


Auto Optimize consists of two complementary features:
1. Optimized Writes
2. Auto Compaction

Available in Databricks Runtime 5.5 and above.

##### USAGE:
To ensure all new Delta tables have these features enabled, set the SQL configurations:

- `spark.sql("set spark.databricks.delta.properties.defaults.autoOptimize.optimizeWrite = true")`
- `spark.sql("set spark.databricks.delta.properties.defaults.autoOptimize.autoCompact = true")`

In [0]:
%sql
ALTER TABLE delta.`/tmp/vini/customer_comp` SET TBLPROPERTIES (delta.autoOptimize.optimizeWrite = true, delta.autoOptimize.autoCompact = true)

In [0]:
%scala
spark.sql("select count(1) from delta.`/tmp/vini/customer_comp` where c_customer_id in (select c_customer_id from gdpr.customer_delete_keys)").show()

In [0]:
%scala
spark.sql("""DELETE 
FROM   delta.`/tmp/vini/customer_comp` AS t1 
WHERE  EXISTS 
       ( 
              SELECT c_customer_id 
              FROM   gdpr.customer_delete_keys
              WHERE  t1.c_customer_id = c_customer_id)""")

### 5B. Pseudonymize data

Personal data is any information that can be used to directly or indirectly identify a person.

In our pseudonymization scenario, we create a `gdpr.customers_lookup` table that contains the real email address and an additional column for a pseudonymized email address. The rest of the data then uses only the pseudonymized email address.

When there is a request to forget this information, we simply delete the corresponding row from the `gdpr.customers_lookup` table, and the rest of the information remains non-identifiable. No further deletes are needed in any other dataset, because once the mapping is deleted, the remaining data is effectively anonymized.
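
The lookup-table idea can be sketched with in-memory collections in plain Scala. This is an illustration under stated assumptions, not the code from the linked notebook: the case classes, the helper functions, the example addresses, and the choice of a random UUID as the pseudonym are all hypothetical.

```scala
import java.util.UUID

// Hypothetical in-memory stand-ins for the lookup table and a downstream dataset.
final case class LookupRow(email: String, pseudoEmail: String)
final case class Purchase(pseudoEmail: String, amount: Double)

// Assign each distinct real address a random pseudonym.
def buildLookup(emails: Seq[String]): Seq[LookupRow] =
  emails.distinct.map(e => LookupRow(e, s"${UUID.randomUUID()}@pseudonym.invalid"))

// Handle a request to be forgotten by dropping the mapping row only.
def forget(lookup: Seq[LookupRow], email: String): Seq[LookupRow] =
  lookup.filterNot(_.email == email)

// Re-identification succeeds only while the mapping row still exists.
def reidentify(lookup: Seq[LookupRow], pseudoEmail: String): Option[String] =
  lookup.find(_.pseudoEmail == pseudoEmail).map(_.email)

val lookup      = buildLookup(Seq("alice@example.com", "bob@example.com"))
val alicePseudo = lookup.find(_.email == "alice@example.com").get.pseudoEmail
val purchases   = Seq(Purchase(alicePseudo, 42.0)) // downstream data stores only the pseudonym

val lookupAfterRequest = forget(lookup, "alice@example.com")

println(reidentify(lookup, purchases.head.pseudoEmail))             // mapping still present
println(reidentify(lookupAfterRequest, purchases.head.pseudoEmail)) // mapping deleted
```

The sketch uses a random token rather than a plain hash of the address, because an unsalted hash could be recomputed from a known email address even after the mapping row is gone.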

<a href="https://demo.cloud.databricks.com/#notebook/6758307/command/6763615">Pseudonymization Notebook</a>

### Conclusion

We covered how you can use Delta Lake best practices to handle GDPR and CCPA requests.

- **Z-Ordering** the data on the delete keys speeds up identifying and rewriting the impacted files.
- If your delete statement has a predicate such as `colName = 'xyz'`, Delta's internal bloom filters help filter out irrelevant files.
- Ensure the key columns are among the first 32 columns of the table. Delta collects statistics on the first 32 columns, and these statistics help identify the relevant files for a delete operation.
- If you can reduce the size of the source enough for it to be broadcast **(BroadcastHashJoin)**, you can leverage dynamic file pruning to determine the relevant files for deletes.
- With **Auto Optimize**, Databricks dynamically optimizes Apache Spark partition sizes.
- Use the **Vacuum** operation and **retention policies** to comply with the requests.
- Use **pseudonymization techniques** to de-identify any information that can be used to directly or indirectly identify a person.
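
The points above about Z-Ordering and column statistics can be illustrated with a small model of file skipping in plain Scala. It is a sketch under simplifying assumptions: `FileStats`, `filesToRead`, the file names, and the fixed-width keys are hypothetical, and real Delta Lake statistics cover more than a single minimum and maximum per file.

```scala
// Hypothetical per-file statistics: the minimum and maximum key stored in each file.
final case class FileStats(path: String, minKey: String, maxKey: String)

// A file must be read only if at least one delete key falls inside its [minKey, maxKey] range.
def filesToRead(stats: Seq[FileStats], deleteKeys: Seq[String]): Seq[String] =
  stats.filter { f =>
    deleteKeys.exists(k => k.compareTo(f.minKey) >= 0 && k.compareTo(f.maxKey) <= 0)
  }.map(_.path)

// Keys clustered by value (the layout Z-Ordering aims for): narrow, non-overlapping ranges.
val clustered = Seq(
  FileStats("file-a", "C0001", "C0100"),
  FileStats("file-b", "C0101", "C0200"),
  FileStats("file-c", "C0201", "C0300")
)

// Keys scattered across files: every file spans almost the full key range.
val scattered = Seq(
  FileStats("file-a", "C0001", "C0300"),
  FileStats("file-b", "C0002", "C0299"),
  FileStats("file-c", "C0003", "C0298")
)

val keysToDelete = Seq("C0150", "C0160")

println(s"clustered layout reads: ${filesToRead(clustered, keysToDelete)}")
println(s"scattered layout reads: ${filesToRead(scattered, keysToDelete)}")
```

With the clustered layout only one file can contain the keys, so only that file needs to be rewritten. With the scattered layout the statistics cannot rule out any file.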

### References

##### To learn more about Delta Lake on Databricks, see <a href="http://docs.databricks.com/delta/index.html">Delta Lake</a>.

##### For blogs about using Delta Lake for GDPR and CCPA compliance written by Databricks experts, see:
- How to Avoid Drowning in GDPR Data Subject Requests in a Data Lake
- Make Your Data Lake CCPA Compliant with a Unified Approach to Data and Analytics
- Efficient Upserts into Data Lakes with Databricks Delta
- To learn about purging personal information in the Databricks workspace, see Manage Workspace Storage.

##### Other References

- <a href="https://docs.databricks.com/security/privacy/gdpr-delta.html">Best Practice Guide</a>
- <a href="https://github.com/databricks/tech-talks">Tech Talk Repo</a>
- <a href="https://www.youtube.com/channel/UC3q8O3Bh2Le8Rj1-Q-_UUbA">Upcoming Session</a>
- <a href="https://databricks.com/blog/2021/06/22/get-your-free-copy-of-delta-lake-the-definitive-guide-early-release.html">Delta Lake: The Definitive Guide</a>