# Tokenization Demo


## Init Spark

In [1]:
import findspark
findspark.init(spark_home=r'/home/mcunhal/spark')
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Tokenizer")\
    .config("spark.sql.warehouse.dir", '/home/mcunhal/demo_tokenization/')\
    .enableHiveSupport()\
    .master('spark://192.168.1.229:7077')\
    .getOrCreate()

In [2]:
spark.sql("CREATE DATABASE IF NOT EXISTS demo_tokenization")


DataFrame[]

In [3]:
spark.sql(r"CREATE TABLE IF NOT EXISTS demo_tokenization.RAW_DATA(\
    iban STRING,\
    nino STRING,\
    first_name STRING,\
    last_name STRING,\
    email STRING,\
    gender STRING,\
    ip_address STRING,\
    post_code STRING,\
    city STRING,\
    country STRING,\
    balance DOUBLE,\
    created_on STRING\
)\
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','\
STORED AS TEXTFILE\
")

DataFrame[]

## Define functions

In [4]:
import numpy as np
import pandas as pd
import base64
import hashlib
from Crypto.Cipher import AES
from pyspark.sql.types import *
from pyspark.sql.functions import lit
import re

In [20]:
def pad(s):
    return s + (32 - len(s) % 32) * chr(32 - len(s) % 32)

def unpad(s):
    return s[:-ord(s[len(s)-1:])]

def generate_key(seed):
    return hashlib.sha256(seed.encode()).digest()

def generate_key_string(seed):
    key = generate_key(seed)
    return base64.b64encode(key).decode('utf-8')

def encrypt(key, raw):
    raw = pad(raw)
    key = generate_key(key)
    iv = b'h\r\xef\x10\x8e\x1e\x8e\xd08\xb7iW\x93\xea-\xd2'
    cipher = AES.new(key, AES.MODE_CBC, iv)
    return base64.b64encode(cipher.encrypt(raw)).decode('utf-8')

def decrypt(key ,enc):
    enc = base64.b64decode(enc.encode('utf-8'))
    iv = b'h\r\xef\x10\x8e\x1e\x8e\xd08\xb7iW\x93\xea-\xd2'
    key = base64.b64decode(key.encode('utf-8'))
    cipher = AES.new(key, AES.MODE_CBC, iv)
    return unpad(cipher.decrypt(enc)).decode('utf-8')


def postcode_trim(data, depth=1):
    regex = re.compile('^([a-zA-Z]{1,})([0-9]{1,2}) ?(?:([0-9])([a-zA-Z])([a-zA-Z]))?$')
    if re.match(regex,data):
        parsed_postcode = re.split(regex,data)
        return_postcode = ''.join([('' if i+1 !=3 else ' ') + parsed_postcode[i+1] for i in range(depth)])
       
        return return_postcode
    else:
        return None    
def encode_number(data):
    return data*97.5 +100.42

def decode_number(data):
    return (data-100.42)/97.5

def bin_number(data,bins,min_bin,max_bin):
    bins = np.linspace(min_bin, max_bin, bins)
    
    binned = np.digitize(data,bins,right=True)
    
    if binned == len(bins):
        return max_bin
    else:
        return float(bins[binned])

def apply_encoding(encode_type,data,*args,):
    if encode_type == 'tokenize':
        if args:
            key=args[0]
        else:
            key='adasdadnfdm 4po435lkdrjkwera'
        return encrypt(key,data)
    
    if encode_type == 'hash':  
        return generate_key_string(data)  
    
    if encode_type == 'postcode':        
        if args:
            depth = args[0]
        else:
            depth = 1        
        return postcode_trim(data,depth)
    
    if encode_type == 'number':
        return encode_number(data)
    
    if encode_type == 'bin_number':
        '''
        args[0] -> bin size
        args[1] -> min value
        args[2] -> max value
        '''
        if len(args) < 3:
            raise ValueError('Number of bins, min and max value need to be provided')
        else:
            return bin_number(data,args[0],args[1],args[2])
        
    raise ValueError("Encode type {} doesn't exist".format(encode_type))
        
def decode(encode_type,data,*args):
    if encode_type == 'tokenize':
        if args:
            key=args[0]
        else:
            key=generate_key_string('adasdadnfdm 4po435lkdrjkwera')
        return decrypt(key,data)

    if encode_type == 'number':
        return decode_number(data)  

### Encode Types:
 - tokenize:
  - Tokenizes a string using a predefined key or by using a submitted key;
  - Based on AES;
  - This tokenization is reversible;
 - hash:
  - Hashes a string using sha256;
  - Non reversible;
 - encode_number:
  - Tokenizes a number by using a predefined function;
  - Function is monotonic, thus enabling some comparisons;
  - Reversible;
  - Easy to crack;
 - bin_number;
  - Outputs a 'binned' value based on min,max and number of bins;
 - postcode_trim:
  - Trims a postcode, based on a a depth;


### Non reversible tokenization

In [32]:
print(apply_encoding('bin_number',32,20,-100,100))
print(apply_encoding('postcode','E13 9NJ',3))
print(apply_encoding('hash','1asd23'))

36.84210526315792
E13 9
VQD/l/UhfjqP+ge/ZukG8Dvtd05Ozy01yI8O9RB9I0g=


### Reversible tokenization

In [33]:
key_value = apply_encoding('hash','basdasdbasd')
print(key_value)
encoded_value = apply_encoding('tokenize','Miguel','basdasdbasd')
print(encoded_value)
print(decode('tokenize',encoded_value,key_value))

xy0CK7dTuqmhvWFkW8u9ibM55Al0SrP17kDHRDxNuTU=
K83JCF7nBiiybG8HUpWsuqA1ym4IlKER3D/w3fjM7w4=
Miguel


In [35]:
encoded_value = apply_encoding('tokenize','Miguel')
print(encoded_value)
print(decode('tokenize',encoded_value))

AgTaWqOdWc+iA0G/CkWBf3ahJ78oxSTUTAJueHP2J/M=
Miguel


In [36]:
encoded_value = apply_encoding('number',1234)
print(encoded_value)
print(decode('number',encoded_value))

120415.42
1234.0


### Registring the functions in spark

In [37]:
udf_apply_encoding = spark.udf.register("apply_encoding", apply_encoding)
udf_decode = spark.udf.register("decode", decode)

## Reading the Data

In [38]:
df = spark.table("demo_tokenization.raw_data")

In [39]:
df.count()

0

### Testing the functions are working in spark

In [43]:
df.select(udf_apply_encoding(lit("tokenize"),"iban","nino")).show()

+------------------------------------+
|apply_encoding(tokenize, iban, nino)|
+------------------------------------+
|                jCEzkLhLZxLQYssIo...|
|                koFe1SmwoIOwdaf/M...|
|                sv0sqcHxk58vspWOj...|
|                DFjmcBbuU6/HEgW5P...|
|                z38+Goe3/K1UQxzJ9...|
|                UZnrajyjBkVoqqiv9...|
|                uSq3furfsZEqF+rDF...|
|                hMrjccPTmBI4gGsJE...|
|                5CYNieeFU4f+4H3c1...|
|                q6SQvmYa9FKuG7SpM...|
|                yTmoYS1kVsQKWGnsy...|
|                pADhr2l6keXZ/D9Qc...|
|                S0LggZ9rzyi4LCq2Y...|
|                VC3HqdPqKBvIrXVJ3...|
|                dgNKye3l/Q26A4Ibi...|
|                8K4PZTQzI8KRQ5yc0...|
|                UAfLnPFd4gWlGSn1m...|
|                YNMpaOk6eD5WcreFj...|
|                FxF4Jf/kCLYZnuDfu...|
|                jPmDe1xX/eH866ure...|
+------------------------------------+
only showing top 20 rows



In [12]:
df.select(udf_apply_encoding(lit("bin_number"),"balance",lit(40),lit(-100000),lit(10000))).show()

+-------------------------------------------------------+
|apply_encoding(bin_number, balance, 40, -100000, 10000)|
+-------------------------------------------------------+
|                                                  10000|
|                                                10000.0|
|                                                  10000|
|                                                10000.0|
|                                                10000.0|
|                                                  10000|
|                                                10000.0|
|                                                10000.0|
|                                                10000.0|
|                                                10000.0|
|                                                10000.0|
|                                                10000.0|
|                                                  10000|
|                                                10000.0|
|             

In [44]:
df.select(udf_apply_encoding(lit("postcode"),"post_code",lit(1)),"post_code").show()

+--------------------------------------+---------+
|apply_encoding(postcode, post_code, 1)|post_code|
+--------------------------------------+---------+
|                                    DL|     DL10|
|                                     S|       S1|
|                                  null|      W1F|
|                                     L|      L33|
|                                    LN|      LN6|
|                                    LE|     LE14|
|                                    LE|     LE15|
|                                    NN|      NN4|
|                                    DN|     DN36|
|                                  null|     EC1V|
|                                    CT|     CT16|
|                                    NE|     NE46|
|                                  null|     WC1B|
|                                  null|     WC1B|
|                                    LS|      LS6|
|                                    BS|     BS14|
|                              

In [14]:
df.select(udf_apply_encoding(lit("number"),"balance")).show()

+-------------------------------+
|apply_encoding(number, balance)|
+-------------------------------+
|                     1239725.17|
|                      871701.67|
|                     1146037.42|
|                      930162.67|
|                      757002.67|
|                     1137018.67|
|                      848828.17|
|              885965.9199999999|
|              942320.9199999999|
|                      786632.92|
|                      791624.92|
|                      824014.42|
|                     1187747.92|
|                      772163.92|
|                      596654.17|
|                     1084466.17|
|                      485845.42|
|              938323.4199999999|
|                     1146729.67|
|                     1206545.92|
+-------------------------------+
only showing top 20 rows



## Creating the output table

In [62]:
df_tok = df\
    .withColumn("key",udf_apply_encoding(lit("hash"),"nino"))\
    .withColumn("iban",udf_apply_encoding(lit("tokenize"),"first_name","nino"))\
    .withColumn("first_name",udf_apply_encoding(lit("tokenize"),"first_name","nino"))\
    .withColumn("last_name",udf_apply_encoding(lit("tokenize"),"last_name","nino"))\
    .withColumn("email",udf_apply_encoding(lit("tokenize"),"email","nino"))\
    .withColumn("gender",udf_apply_encoding(lit("tokenize"),"gender","nino"))\
    .withColumn("ip_address",udf_apply_encoding(lit("tokenize"),"ip_address","nino"))\
    .withColumn("post_code_unique",udf_apply_encoding(lit("tokenize"),"post_code","nino"))\
    .withColumn("post_code",udf_apply_encoding(lit("tokenize"),"post_code"))\
    .withColumn("balance",udf_apply_encoding(lit("number"),"balance"))\
    .withColumn("bin_balance",udf_apply_encoding(lit("bin_number"),"balance",lit(1001),lit(-10000),lit(100000)))\
    .withColumn("nino",udf_apply_encoding(lit("tokenize"),"nino","nino"))\

In [63]:
df_tok.write.saveAsTable('demo_tokenization.tokenized_data',format="parquet",mode="append")


In [15]:
spark.sql("Truncate table demo_tokenization.raw_data")

DataFrame[]

### Tokenization Showcase

In [64]:
spark.sql("select post_code,count(*) from demo_tokenization.tokenized_data group by post_code order by count(*) desc limit 10").show()

+--------------------+--------+
|           post_code|count(1)|
+--------------------+--------+
|gP+EBWypryqgXUULi...|     149|
|TChH2w4KBeQUAnrs2...|     143|
|yeECKNCaiEIQMWlDq...|     125|
|Ewldi3upBOmCaULBF...|      91|
|/yuAkv5UWD4VG7tUg...|      87|
|5IiSPFWRQ88E4OG5+...|      83|
|ddssSe9WlWdv9/RkB...|      83|
|3DX9tKZO4yEjrH9ru...|      82|
|g76CrtbUh1czU7ES/...|      82|
|Z3sW3uM9GAnGTtohp...|      79|
+--------------------+--------+



In [65]:
spark.sql("select post_code_unique,count(*) from demo_tokenization.tokenized_data group by post_code_unique order by count(*) desc limit 10").show()

+--------------------+--------+
|    post_code_unique|count(1)|
+--------------------+--------+
|8c3+A4KmqQO5zD8R7...|       1|
|H7O8mk7V+vlWK9e8n...|       1|
|ANOrDdtTqNcg/5+64...|       1|
|Yc6Hje25md0h0tMd0...|       1|
|9YziBI0ZnzCiDi+h4...|       1|
|4En302e6Fc5H+lzxD...|       1|
|whqC3k87aPYOYJd3O...|       1|
|GdNMxHF5Lrl+o2Z1n...|       1|
|iP7H0m0j7U6aagx1L...|       1|
|UHkWqijtqIxHC9bbi...|       1|
+--------------------+--------+



In [66]:
spark.sql("select decode('tokenize',post_code),count(*) from demo_tokenization.tokenized_data group by decode('tokenize',post_code) order by count(*) desc limit 10").show()

+---------------------------+--------+
|decode(tokenize, post_code)|count(1)|
+---------------------------+--------+
|                       CV35|     149|
|                       LE15|     143|
|                       BD23|     125|
|                       CH48|      91|
|                       RG20|      87|
|                       SW1E|      83|
|                       EC1V|      83|
|                        M34|      82|
|                        LS9|      82|
|                       OX12|      79|
+---------------------------+--------+



In [67]:
spark.sql("select decode('tokenize',post_code_unique,key),count(*) from demo_tokenization.tokenized_data group by 1 order by count(*) desc limit 10").show()

+---------------------------------------+--------+
|decode(tokenize, post_code_unique, key)|count(1)|
+---------------------------------------+--------+
|                                   CV35|     149|
|                                   LE15|     143|
|                                   BD23|     125|
|                                   CH48|      91|
|                                   RG20|      87|
|                                   SW1E|      83|
|                                   EC1V|      83|
|                                    M34|      82|
|                                    LS9|      82|
|                                   OX12|      79|
+---------------------------------------+--------+



## Old reader

In [None]:
customschema = StructType([StructField('iban', StringType(), True),
                    StructField('nino', StringType(), True),
                    StructField('first_name', StringType(), True),
                    StructField('last_name', StringType(), True),
                    StructField('email', StringType(), True),
                    StructField('gender', StringType(), True),
                    StructField('ip_address', StringType(), True),
                    StructField('post_code', StringType(), True),
                    StructField('city', StringType(), True),
                    StructField('country', StringType(), True),
                    StructField('balance', DoubleType(), True),
                    StructField('created_on', StringType(), True)])

In [15]:
df = spark.read.format("csv")\
    .option("header", "true")\
    .schema(customschema)\
    .load("/home/mcunhal/demo_tokenization/input/")

In [None]:
spark.sql(r"CREATE TABLE IF NOT EXISTS demo_tokenization.TOKENIZED_DATA(\
    iban STRING,\
    nino STRING,\
    first_name STRING,\
    last_name STRING,\
    email STRING,\
    gender STRING,\
    ip_address STRING,\
    post_code STRING,\
    city STRING,\
    country STRING,\
    balance DOUBLE,\
    bin_balance DOUBLE,\
    created_on STRING\
)\
STORED AS PARQUET")