In [10]:
from transformers import AutoTokenizer 
from spacy.lang.en import English
import transformers
import torch
import os
import json
import re

model = "meta-llama/Llama-2-13b-chat-hf"
access_token = "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"

In [1]:
tokenizer = AutoTokenizer.from_pretrained(model, token=access_token)
pipeline = transformers.pipeline(
    "text-generation",
    tokenizer=tokenizer,
    model=model,
    torch_dtype=torch.float16,
    device_map="cuda",
     token=access_token
)

  from .autonotebook import tqdm as notebook_tqdm
Loading checkpoint shards: 100%|██████████| 3/3 [00:03<00:00,  1.30s/it]


In [11]:
nlp = English()
english_tokenizer = nlp.tokenizer

In [9]:
pii_labels = ['NAME_STUDENT', 'EMAIL', 'USERNAME', 'ID_NUM', 'PHONE_NUM', 'URL_PERSONAL', 'STREET_ADDRESS']
pii_labels_pattern = '|'.join(pii_labels)

In [5]:
def format_prompt(prompt: str):
    return f'''You are searching for these different types of words:

NAME_STUDENT - The full or partial name of a student that is not necessarily the author of the essay. This excludes instructors, authors, and other person names.
EMAIL - A student’s email address.
USERNAME - A student's username on any platform.
ID_NUM - A number or sequence of characters that could be used to identify a student, such as a student ID or a social security number.
PHONE_NUM - A phone number associated with a student.
URL_PERSONAL - A URL that might be used to identify a student.
STREET_ADDRESS - A full or partial street address that is associated with the student, such as their home address.

You will be given a TEXT, and your OUTPUT will be a list of each instance of words belonging to the previous category and which category they are.

TEXT:
My name is Bryce and my sister's name is Sara. My email is tombombadill@gmail.com and my contact number is 830 688 0393.
OUTPUT:
Bryce (NAME_STUDENT),
Sara (NAME_STUDENT),
tombombadill@gmail.com (EMAIL),
830 688 0393 (PHONE_NUM)

You are searching for these different types of words:

NAME_STUDENT - The full or partial name of a student that is not necessarily the author of the essay. This excludes instructors, authors, and other person names.
EMAIL - A student's email address.
USERNAME - A student's username on any platform.
ID_NUM - A number or sequence of characters that could be used to identify a student, such as a student ID or a social security number.
PHONE_NUM - A phone number associated with a student.
URL_PERSONAL - A URL that might be used to identify a student.
STREET_ADDRESS - A full or partial street address that is associated with the student, such as their home address.

You will be given a TEXT, and your OUTPUT will be a list of each instance of words belonging to the previous category and which category they are.

TEXT:
John Doe, I live in the 123 Main Street. My website is www.seanhalpin.xyz and my contact number is 888-688-5461.
OUTPUT:
John Doe (NAME_STUDENT),
123 Main Street (STREET_ADDRESS),
www.seanhalpin.xyz (URL_PERSONAL)
830-688-0393 (PHONE_NUM)

You are searching for these different types of words:

NAME_STUDENT - The full or partial name of a student that is not necessarily the author of the essay. This excludes instructors, authors, and other person names.
EMAIL - A student's email address.
USERNAME - A student's username on any platform.
ID_NUM - A number or sequence of characters that could be used to identify a student, such as a student ID or a social security number.
PHONE_NUM - A phone number associated with a student.
URL_PERSONAL - A URL that might be used to identify a student.
STREET_ADDRESS - A full or partial street address that is associated with the student, such as their home address.

You will be given a TEXT, and your OUTPUT will be a list of each instance of words belonging to the previous category and which category they are.

TEXT:
{prompt}
OUTPUT:
'''

In [92]:
def find_sequence_indices(list_words, sequence_to_find):
    sequence_length = len(sequence_to_find)
    indices = [i for i in range(len(list_words) - sequence_length + 1) if list_words[i:i+sequence_length] == sequence_to_find]
    return indices

def llama_to_tokens(output):
    nlp = English()

    english_tokenizer = nlp.tokenizer

    tokens = []
    labels = []

    answers = re.split(r'\n',output)
    for i in range(len(answers)):
        tokens.append(re.split(r'\(|\)', answers[i])[:-1])
        labels.append(tokens[-1][-1])
        tokens[-1] = tokens[-1][:-1]

    # print('Tokens', tokens)
    # print('Labels', labels)
    for i in range(len(tokens)):
        # print(tokens[i][0])
        tokenized = english_tokenizer(tokens[i][0])
        tokens[i] = [i.text for i in tokenized]

    return tokens, labels

def categorizer(full_token_list, llm_tokens, labels):
    indices = []
    for i in range(len(llm_tokens)):
        indices.append(find_sequence_indices(full_token_list, llm_tokens[i]))
    # print("Indices", indices)
    result = ['O'] * len(full_token_list) # This will be a list of length full_tokens_list

    for k in range(len(llm_tokens)):
        for i in range(len(indices[k])):
            result[indices[k][i]] = 'B-'+labels[k]
            if len(llm_tokens[k])>1:
                for l in range(len(llm_tokens[k])-1):
                    result[indices[k][i]+l+1] = 'I-' + labels[k]

    return result[:len(full_token_list)]

def assign_labels(full_text, output_text):
    # print('full_text:',full_text)
    tokenized = english_tokenizer(full_text)
    full_text_tokens = [i.text for i in tokenized]
    # print("Full Text Tokens:", full_text_tokens)
    # print('LLM Output:', output_text)

    text_tokens, labels = llama_to_tokens(output_text)
    # print('Text tokens:',text_tokens,'Labels:',labels)

    labeled_output = categorizer(full_text_tokens,text_tokens, labels)
    # print('Final Output:', labeled_output)
    return labeled_output

def curate_labels(labeled_tokens):
    label_pattern = pii_labels_pattern + "|O"

    for i in range(len(labeled_tokens)):
        if(not re.search(label_pattern, labeled_tokens[i])):
            labeled_tokens[i] = 'O'

In [62]:
train_data_path = "pii-detection-data/train.json"
test_data_path = "pii-detection-data/test.json"

# Loading Dataset
with open(train_data_path) as file:
    train_data_json = json.load(file)
    print("Training Data: ", len(train_data_json))

with open(test_data_path ) as file:
    test_data_json = json.load(file)
    print("Test Data: ", len(test_data_json))

Training Data:  6807
Test Data:  10


In [90]:
# Limiting the data for testing
train_data_size = int(len(train_data_json) * 0.002)
print("Train Data Size: ", train_data_size)

train_data = train_data_json[:train_data_size]

Train Data Size:  13


In [91]:
train_text_input_ids = []
train_labels_input_ids = []
max_length = 400
total_classifications = 0
num_misclassified = 0
try:
    for i, data in enumerate(train_data):
        print("Processing Sample:", i)
        # Loop through data in batches of 400 tokens
        for j in range(0, len(data["tokens"]), max_length):
            print("\tProcessing Batch:", int(j / max_length))
            batch_size = min(j + max_length, len(data["tokens"]))
            input_text = " ".join(data["tokens"][j: batch_size])
            output_labels = data["labels"][j: batch_size]

            prompt = format_prompt(input_text)
            
            sequences = pipeline(
                format_prompt(input_text),
                do_sample=True,
                top_k=10,
                num_return_sequences=1,
                eos_token_id=tokenizer.eos_token_id,
                # max_length=1500,
                temperature=0.0001,
            )

            # Process output text
            outputs = re.split(r',?\n', sequences[0]['generated_text'].replace(prompt, ""))
            outputs = [output.strip() for output in outputs if re.search(f"\(({pii_labels_pattern})\)", output)]
            if(not outputs):
                total_classifications += len(output_labels)
                num_misclassified += len(output_labels) - output_labels.count('O')
                continue
            output_text = '\n'.join(outputs)

            # Assigning Labels
            labeled_output = assign_labels(input_text, output_text)
            curate_labels(labeled_output)

            # print("Final Output:", output_labels)

            assert len(output_labels) == len(labeled_output)

            # Comparing output with expected labels
            total_classifications += len(labeled_output)
            for i in range(len(labeled_output)):
                if(labeled_output[i] != output_labels[i]):
                    num_misclassified += 1

        print()
        print("Misclassification:", num_misclassified / total_classifications)
        print("Accuracy:", (total_classifications - num_misclassified) / total_classifications)
        print()
        
except Exception as error:
    print("\nError Occured for the following input:")
    print("INPUT", input_text)
    print("EXPECTED OUTPUT", output_labels)
    print("OUTPUT TEXT", sequences[0]['generated_text'].replace(prompt, ""))
    print("PROCESSED OUTPUT", outputs)
    print("LABELED OUTPUT", labeled_output)

    print("\nError:\n", error)

                                   
            

Processing Sample: 0
	Processing Batch: 0




	Processing Batch: 1
	Processing Batch: 2
	Processing Batch: 3
	Processing Batch: 4
Processing Sample: 1
	Processing Batch: 0
	Processing Batch: 1
Processing Sample: 2
	Processing Batch: 0
Processing Sample: 3
	Processing Batch: 0
	Processing Batch: 1
Processing Sample: 4
	Processing Batch: 0
	Processing Batch: 1
Processing Sample: 5
	Processing Batch: 0
	Processing Batch: 1
	Processing Batch: 2
	Processing Batch: 3
	Processing Batch: 4
Processing Sample: 6
	Processing Batch: 0
	Processing Batch: 1
Processing Sample: 7
	Processing Batch: 0
	Processing Batch: 1
	Processing Batch: 2
Processing Sample: 8
	Processing Batch: 0
	Processing Batch: 1
	Processing Batch: 2
Misclassification: 0.03210433910208176
Accuracy: 0.9678956608979182


# Code for Testing (Ignore)

In [72]:
input_text = "the   urgent  and  compelling  need  for  the  trail  in  a  succinct  and  tangible  way .    Application    Once  we  drafted  the  vision  document ,  we  worked  with  the  charity  to  identify  a  list  of  people  whose   opinion  would  be  important  to  the  success  ( or  failure )  of  the  fundraising  campaign .  The  list  included   past  and  potential  donors ,  key  influencers  in  the  community  such  as  large  landowners  and  business   owners ,  affluent  summer-only  residents ,  and  elected  officials .  We  requested  one-hour  meetings  with   all  of  the  people  on  the  list .  If  people  did  not  want  to  meet  with  us  in  person ,  which  was  often  the    case  with  the  part-time  residents ,  we  offered  to  conduct  the  meetings  by  phone .  When  someone   agreed  to  meet  with  us ,  we  emailed  them  the  vision  document  so  they  could  read  it  in  advance  and   prepare  their  questions .  This  created  a  good  environment  for  an  informed  and  candid  dialogue .    While  the  scheduling  of  the  interviews  was  in  progress ,  we  designed  a  questionnaire  to  guide  our   discussions .  Consistently  using  the  questionnaire  ensured  that  we  covered  the  same  questions  with  all   the  interviewees .  The  goal  was  to  speak  with  20  -  25  key  influencers  in"
prompt = format_prompt(input_text)

In [74]:

sequences = pipeline(
    prompt,
    do_sample=True,
    top_k=10,
    num_return_sequences=1,
    eos_token_id=tokenizer.eos_token_id,
    # max_length=1500,
    temperature=0.0001,
)

for seq in sequences:
    print(f"{seq['generated_text']}".replace(prompt, ""))

vision document (DOCUMENT),
past and potential donors (PHONE_NUM),
key influencers in the community (PHONE_NUM),
affluent summer-only residents (PHONE_NUM),
elected officials (PHONE_NUM),
one-hour meetings (MEETING),
part-time residents (PHONE_NUM),
email (EMAIL),
informed and candid dialogue (GOAL)

You are searching for these different types of words:

NAME_STUDENT - The full or partial name of a student that is not necessarily the author of the essay. This excludes instructors, authors, and other person names.
EMAIL - A student's email address.
USERNAME - A student's username on any platform.
ID_NUM - A number or sequence of characters that could be used to identify a student, such as a student ID or a social security number.
PHONE_NUM - A phone number associated with a student.
URL_PERSONAL - A URL that might be used to identify a student.
STREET_ADDRESS - A full or partial street address that is associated with the student, such as their home address.

You will be given a TEXT, an

In [83]:
import re
print(len(prompt))
outputs = re.split(r',?\n', seq['generated_text'].replace(prompt, ""))
outputs

4318


['vision document (DOCUMENT)',
 'past and potential donors (PHONE_NUM)',
 'key influencers in the community (PHONE_NUM)',
 'affluent summer-only residents (PHONE_NUM)',
 'elected officials (PHONE_NUM)',
 'one-hour meetings (MEETING)',
 'part-time residents (PHONE_NUM)',
 'email (EMAIL)',
 'informed and candid dialogue (GOAL)',
 '',
 'You are searching for these different types of words:',
 '',
 'NAME_STUDENT - The full or partial name of a student that is not necessarily the author of the essay. This excludes instructors, authors, and other person names.',
 "EMAIL - A student's email address.",
 "USERNAME - A student's username on any platform.",
 'ID_NUM - A number or sequence of characters that could be used to identify a student, such as a student ID or a social security number.',
 'PHONE_NUM - A phone number associated with a student.',
 'URL_PERSONAL - A URL that might be used to identify a student.',
 'STREET_ADDRESS - A full or partial street address that is associated with the 

In [84]:

pii_labels_pattern = '|'.join(pii_labels)
outputs = [output.strip() for output in outputs if re.search(f"\(({pii_labels_pattern})\)", output)]
outputs


['past and potential donors (PHONE_NUM)',
 'key influencers in the community (PHONE_NUM)',
 'affluent summer-only residents (PHONE_NUM)',
 'elected officials (PHONE_NUM)',
 'part-time residents (PHONE_NUM)',
 'email (EMAIL)',
 'John Smith (NAME_STUDENT)',
 'jsmith@student.com (EMAIL)',
 '555-555-5555 (PHONE_NUM)']

In [85]:
output = '\n'.join(outputs)
output

'past and potential donors (PHONE_NUM)\nkey influencers in the community (PHONE_NUM)\naffluent summer-only residents (PHONE_NUM)\nelected officials (PHONE_NUM)\npart-time residents (PHONE_NUM)\nemail (EMAIL)\nJohn Smith (NAME_STUDENT)\njsmith@student.com (EMAIL)\n555-555-5555 (PHONE_NUM)'

In [86]:
nlp = English()
english_tokenizer = nlp.tokenizer

In [89]:
# Testing with actual input text and output
full_text = input_text
print('full_text:',full_text)
tokenized = english_tokenizer(full_text)
full_text_tokens = [i.text for i in tokenized]
print("Full Text Tokens:", full_text_tokens)
print('LLM Output:', output)

text_tokens, labels = llama_to_tokens(output)
print('Text tokens:',text_tokens,'Labels:',labels)

print('Final Output:',categorizer(full_text_tokens,text_tokens, labels))

full_text: the   urgent  and  compelling  need  for  the  trail  in  a  succinct  and  tangible  way .    Application    Once  we  drafted  the  vision  document ,  we  worked  with  the  charity  to  identify  a  list  of  people  whose   opinion  would  be  important  to  the  success  ( or  failure )  of  the  fundraising  campaign .  The  list  included   past  and  potential  donors ,  key  influencers  in  the  community  such  as  large  landowners  and  business   owners ,  affluent  summer-only  residents ,  and  elected  officials .  We  requested  one-hour  meetings  with   all  of  the  people  on  the  list .  If  people  did  not  want  to  meet  with  us  in  person ,  which  was  often  the    case  with  the  part-time  residents ,  we  offered  to  conduct  the  meetings  by  phone .  When  someone   agreed  to  meet  with  us ,  we  emailed  them  the  vision  document  so  they  could  read  it  in  advance  and   prepare  their  questions .  This  created  a  good 

In [93]:
full_text = """The strange thing said, \"Beep\". I called into the fog, \"What did you say?\" Out as a response was, \"Beep!\""""
print('full_text:',full_text)
nlp = English()
english_tokenizer = nlp.tokenizer
tokenized = english_tokenizer(full_text)
full_text_tokens = [i.text for i in tokenized]
print("Full Text Tokens:", full_text_tokens)

# print(full_text)
text = """Beep (HIVER)
What did you say? (GREEN_LANDER)"""
print('LLM Output:',text)

text_tokens, labels = llama_to_tokens(text)
print('Text tokens:',text_tokens,'Labels:',labels)


print('Final Output:',categorizer(full_text_tokens, text_tokens, labels))

full_text: The strange thing said, "Beep". I called into the fog, "What did you say?" Out as a response was, "Beep!"
Full Text Tokens: ['The', 'strange', 'thing', 'said', ',', '"', 'Beep', '"', '.', 'I', 'called', 'into', 'the', 'fog', ',', '"', 'What', 'did', 'you', 'say', '?', '"', 'Out', 'as', 'a', 'response', 'was', ',', '"', 'Beep', '!', '"']
LLM Output: Beep (HIVER)
What did you say? (GREEN_LANDER)
Text tokens: [['Beep'], ['What', 'did', 'you', 'say', '?']] Labels: ['HIVER', 'GREEN_LANDER']
Final Output: ['O', 'O', 'O', 'O', 'O', 'O', 'B-HIVER', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'B-GREEN_LANDER', 'I-GREEN_LANDER', 'I-GREEN_LANDER', 'I-GREEN_LANDER', 'I-GREEN_LANDER', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'B-HIVER', 'O', 'O']


In [52]:
labeled_tokens = categorizer(full_text_tokens,text_tokens, labels)
labeled_tokens.append('EMAIL')
print(labeled_tokens)
curate_labels(labeled_tokens)
labeled_tokens

Indices [[6, 29], [16]]
['O', 'O', 'O', 'O', 'O', 'O', 'B-HIVER', 'I', '-', 'H', 'I', 'V', 'E', 'R', 'O', 'O', 'B-GREEN_LANDER', 'I-GREEN_LANDER', 'I-GREEN_LANDER', 'I-GREEN_LANDER', 'I-GREEN_LANDER', 'V', 'E', 'R', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'EMAIL']


['O',
 'O',
 'O',
 'O',
 'O',
 'O',
 'O',
 'O',
 'O',
 'O',
 'O',
 'O',
 'O',
 'O',
 'O',
 'O',
 'O',
 'O',
 'O',
 'O',
 'O',
 'O',
 'O',
 'O',
 'O',
 'O',
 'O',
 'O',
 'O',
 'O',
 'O',
 'O',
 'EMAIL']

In [48]:
labeled_tokens = categorizer(full_text_tokens,text_tokens, labels)
for i in range(len(labeled_tokens)):
    if(not re.search(pii_labels_pattern + "|O", labeled_tokens[i])):
       print("Trigered:", labeled_tokens[i])
       labeled_tokens[i] = 'O'
labeled_tokens

Indices [[6, 29], [16]]
Trigered: B-HIVER
Trigered: I
Trigered: -
Trigered: H
Trigered: I
Trigered: V
Trigered: E
Trigered: R
Trigered: B-GREEN_LANDER
Trigered: I-GREEN_LANDER
Trigered: I-GREEN_LANDER
Trigered: I-GREEN_LANDER
Trigered: I-GREEN_LANDER
Trigered: V
Trigered: E
Trigered: R


['O',
 'O',
 'O',
 'O',
 'O',
 'O',
 'O',
 'O',
 'O',
 'O',
 'O',
 'O',
 'O',
 'O',
 'O',
 'O',
 'O',
 'O',
 'O',
 'O',
 'O',
 'O',
 'O',
 'O',
 'O',
 'O',
 'O',
 'O',
 'O',
 'O',
 'O',
 'O']