First load the annotations into individual text files which can be read in for processing.

Privacy policies will be broken into sections of annotations and read to create a structure of which actions data types and purposes appear in each section. Each section meaning all phrases within the policy where actions data types and purposes are connected to each other. 

This will give us information to append to a graph of the privacy stories within the individual policies. 



In [19]:
import os 
import re
from docx import Document


# Running the function with the updated file path
docx_path = '../policies_annotated/Annotated policies.docx'
annotation_dir = '../policies_annotated/annotated_policies'

def extract_annotations(docx_path, annotation_dir):
    # Load the document
    doc = Document(docx_path)

    # Create the output directory if it doesn't exist
    if not os.path.exists(annotation_dir):
        os.makedirs(annotation_dir)

    # Initialize variables to keep track of sections
    current_id = None
    current_privacy_policy = None
    current_data_safety = None
    in_privacy_policy = False
    in_data_safety = False

    info = []
    # Iterate through the paragraphs in the document
    for para in doc.paragraphs:
        if para.style.name == 'Heading 5':  # ID
            # Save previous section's data if available
            if current_id is not None:
                try:
                    save_policy(annotation_dir, current_id, current_privacy_policy, current_data_safety)
                except:
                    print(f'error with {current_id}')

            # Reset variables for the new section
            current_id = para.text.replace('ID: ', '').strip()  # Remove 'ID: ' prefix and trailing spaces
            current_privacy_policy = ''
            current_data_safety = ''
            in_privacy_policy = False
            in_data_safety = False

        elif para.style.name == 'Heading 4':
            if 'privacy' in para.text.lower():
                # We reached the privacy policy section
                in_privacy_policy = True
                in_data_safety = False
            elif 'data' in para.text.lower():
                # We are entering the data safety section
                in_data_safety = True
                in_privacy_policy = False

        elif in_privacy_policy:
            # Add text to the privacy policy
            current_privacy_policy += para.text + '\n'

        elif in_data_safety:
            # Add text to the data safety
            current_data_safety += para.text + '\n'
    
    # Save the last section's data
    if current_id is not None:
        try:
            save_policy(annotation_dir, current_id, current_privacy_policy, current_data_safety)
        except:
            print(f'error with {current_id}')

extract_annotations(docx_path, annotation_dir)

def save_policy(annotation_dir, policy_id, privacy_policy, data_safety):
    # Create a subfolder for the policy, ensuring path is normalized
    policy_folder = os.path.join(annotation_dir, policy_id)
    if not os.path.exists(policy_folder):
        os.makedirs(policy_folder)

    # Save the privacy policy text
    privacy_policy_path = os.path.join(policy_folder, 'privacy_policy.txt')
    with open(privacy_policy_path, 'w', encoding='utf-8') as file:
        file.write(privacy_policy)

    # Save the data safety text
    data_safety_path = os.path.join(policy_folder, 'data_safety.txt')
    with open(data_safety_path, 'w', encoding='utf-8') as file:
        file.write(data_safety)

def extract_annotations_and_tags(file_path):
    with open(file_path, 'r', encoding='utf-8') as file:
        text = file.read()

    # Regex patterns for annotations and individual tags
    annotation_pattern = r'\{\#s(.*?)\}'
    action_pattern = r'\[\#a(.*?)\]'
    data_type_pattern = r'\[\#dt(.*?)\]'
    purpose_pattern = r'\[\#p(.*?)\]'

    # Extracting annotations
    annotations = re.findall(annotation_pattern, text, re.DOTALL)

    # Extracting individual tags within each annotation
    actions = set()
    data_types = set()
    purposes = set()
    for annotation in annotations:
        actions.update(re.findall(action_pattern, annotation))
        data_types.update(re.findall(data_type_pattern, annotation))
        purposes.update(re.findall(purpose_pattern, annotation))

    return annotations, list(actions), list(data_types), list(purposes)

annotations_results = {}

# Iterate through all directories within 'annotated_policies'
for app_dir in os.listdir(annotation_dir):
    app_directory_path = os.path.join(annotation_dir, app_dir)

    # Check if it's a directory
    if os.path.isdir(app_directory_path):
        # Iterate through all files named 'privacy_policy.txt' within the app directory
        for file_name in os.listdir(app_directory_path):
            if file_name == 'privacy_policy.txt':
                file_path = os.path.join(app_directory_path, file_name)
                annotations, actions, data_types, purposes = extract_annotations_and_tags(file_path)
                result_key = os.path.join(app_dir, file_name)
                annotations_results[result_key] = {
                    'Annotations': annotations,
                    'Actions': actions,
                    'Data Types': data_types,
                    'Purposes': purposes
                }

# Print the results for all directories
for result_key, result_data in annotations_results.items():
    print(f"Results for {result_key}:")
    print(result_data)




error with Description:
error with Privacy policy:
error with Description:
error with Privacy policy:
error with Id: com.canvism
error with Id: com.marblehead.mobile
Results for afmstudio.craigslistclassifiedslocal\privacy_policy.txt:
{'Annotations': [' We [#a collect] [#dt personal information] to [#p provide and enhance our Services]./', " we [#a collect] and how we obtain it. 2.1 Information Provided by You During your registration or use of our Services, you may voluntarily provide the following personal information: [#dt Email address] [#dt Mobile telephone number] [# dt Geographic location] [#dt Profile picture] [#dt User ID]  [#dt Facebook] or [#dt Google account details] [#dt Driver's license] or other [#dt government ID] /", ' Information [#a Collected] Automatically When you interact with our Platform or use our Services, we automatically [#a collect] the following information about you: [#dt Device Information:][#dt Operating system version], [#dt device make and model], [#d

In [20]:
# Now we will combine the related actions data types and purposes within each #s 

# There will be connections between all unique entities within each #s 

# And fill out template under conditions of valid stories 

# And display graphs for each connection 


# Initialize a dictionary to hold grouped annotations for each file
grouped_annotations_by_file = {}

# Iterate through the annotations_results dictionary
for file_name, results in annotations_results.items():
    grouped_annotations = []
    for annotation in results['Annotations']:
        # Splitting each annotation into individual elements
        # Assuming each element is separated by a space or some delimiter
        elements = annotation.split()  # Modify this if a different delimiter is used
        grouped_annotations.append(annotation)
    
    # Store the grouped annotations for this file
    grouped_annotations_by_file[file_name] = grouped_annotations

# Now, grouped_annotations_by_file contains grouped annotations for each file
for file_name, grouped_annotations in grouped_annotations_by_file.items():
    print(f"Groups for {file_name}:")
    for group in grouped_annotations:
        print(group)
    print("\n") 

# This list represents the privacy stories which are fully present in individual annotation
def generate_type_1_story(grouped_annotations_by_file):
    sentences_by_file = {}

    for file_name, grouped_annotations in grouped_annotations_by_file.items():
        unique_sentences = set()  # Use a set to store unique sentences
        for group in grouped_annotations:
            # Extract individual actions, data types, and purposes
            actions = re.findall(r'\[\#a(.*?)\]', group)
            data_types = re.findall(r'\[\#dt(.*?)\]', group)
            purposes = re.findall(r'\[\#p(.*?)\]', group)

            # Create sentences for each combination of action, data type, and purpose
            for action in actions:
                for data_type in data_types:
                    for purpose in purposes:
                        sentence = f"We {action.strip()} {data_type.strip()} for the purpose of {purpose.strip()}."
                        unique_sentences.add(sentence)  # Add sentence to the set

        sentences_by_file[file_name] = list(unique_sentences)  # Convert set back to list

    return sentences_by_file

type_1_stories = generate_type_1_story(grouped_annotations_by_file)


def build_annotations(grouped_annotations_by_file):
    all_a = {}
    all_dt = {}
    all_p = {}

    for file_name, grouped_annotations in grouped_annotations_by_file.items():
        a = {}
        dt = {}
        p = {}
        print(f"File: {file_name}")

        for i, group in enumerate(grouped_annotations):
            # Extract individual actions, data types, and purposes within the annotation
            actions = re.findall(r'\[\#a(.*?)\]', group)
            a[i] = [action.strip() for action in actions]
            data_types = re.findall(r'\[\#dt(.*?)\]', group)
            dt[i] = [data_type.strip() for data_type in data_types]
            purposes = re.findall(r'\[\#p(.*?)\]', group)
            p[i] = [purpose.strip() for purpose in purposes]

            # Printing the elements of the current annotation
            print(f"\nAnnotation {i+1}:")
            print("Actions:", a[i])
            print("Data Types:", dt[i])
            print("Purposes:", p[i])

        all_a[file_name] = a
        all_dt[file_name] = dt
        all_p[file_name] = p
        print("\n")  

    return all_a, all_dt, all_p

all_a, all_dt, all_p = build_annotations(grouped_annotations_by_file)

def group_annotations_by_index(all_a, all_dt, all_p):
    grouped_annotations = {}

    for file_name in all_a.keys():
        grouped_file_annotations = []

        num_annotations = len(all_a[file_name])

        for i in range(num_annotations):
            annotation_group = {
                'actions': all_a[file_name].get(i, []),
                'data_types': all_dt[file_name].get(i, []),
                'purposes': all_p[file_name].get(i, [])
            }
            grouped_file_annotations.append(annotation_group)

        grouped_annotations[file_name] = grouped_file_annotations

    return grouped_annotations


# Now, 'grouped_annotations_by_file' contains the grouped annotations per file.\
# file_name = 'your_file_name_here'
# annotations_for_file = grouped_annotations_by_file.get(file_name, [])




Groups for afmstudio.craigslistclassifiedslocal\privacy_policy.txt:
 We [#a collect] [#dt personal information] to [#p provide and enhance our Services]./
 we [#a collect] and how we obtain it. 2.1 Information Provided by You During your registration or use of our Services, you may voluntarily provide the following personal information: [#dt Email address] [#dt Mobile telephone number] [# dt Geographic location] [#dt Profile picture] [#dt User ID]  [#dt Facebook] or [#dt Google account details] [#dt Driver's license] or other [#dt government ID] /
 Information [#a Collected] Automatically When you interact with our Platform or use our Services, we automatically [#a collect] the following information about you: [#dt Device Information:][#dt Operating system version], [#dt device make and model], [#dt and mobile network details]. [#dt Location Information]: Depending on permissions, we may collect [#dt  location data] using methods like [#dt IP address], [#dt GPS],[#dt  Wi-Fi access poin

In [21]:

# Functions to match similar data types and actions according to their appearance in the ontology 
import json
import nltk
from nltk.stem import WordNetLemmatizer, PorterStemmer
from nltk.stem import WordNetLemmatizer

nltk.download('punkt')
nltk.download('wordnet')

def load_ontology(file_path):
    try:
        with open(file_path, 'r') as file:
            ontology_data = json.load(file)
        return ontology_data
    except FileNotFoundError:
        print(f"Error: The ontology file '{file_path}' was not found.")
        return None
    except Exception as e:
        print(f"Error: An error occurred while loading the ontology file. {str(e)}")
        return None

#TODO Get working for multiple words 
def find_category_level_in_sentence(sentence, ontology):
    lemmatizer = WordNetLemmatizer()
    stemmer = PorterStemmer()
    words = sentence.split()
    wordsList = []
    for word in words:
        cleaned_word = word.strip("()")
        lemmatized_word = lemmatizer.lemmatize(cleaned_word)
        stemmed_word = stemmer.stem(lemmatized_word)
        wordsList.append(word)
        if len(wordsList) > 1:
            # First try to find the original word from the ontology
            level = find_category_level(' '.join(wordsList), ontology)  # Join multiple words into a single string
            if level is not None:
                return ' '.join(wordsList), level

        # Try to find the original word from the ontology for lemmatized and stemmed versions
        level = find_category_level(lemmatized_word, ontology)
        if level is not None:
            return cleaned_word, level

        level = find_category_level(stemmed_word, ontology)
        if level is not None:
            return cleaned_word, level

    return None, None

#Update to search for word in any form being within certain layer if is action , data type or purpose 
def find_category_level(word, category=None, level=0):
    if category is None:
        category = ontology
    if isinstance(category, dict):
        for key, value in category.items():
            if word.lower() in key.lower():
                return level
            elif isinstance(value, dict):
                result = find_category_level(word, value, level=level + 1)
                if result is not None:
                    return result
            elif isinstance(value, list):
                for item in value:
                    if isinstance(item, dict):
                        result = find_category_level(word, item, level=level + 1)
                        if result is not None:
                            return result
                    elif isinstance(item, str) and word.lower() in [syn.lower() for syn in value]:
                        return level
    return None

# Load the ontology data from a local JSON file
ontology = load_ontology('../privacy_ontology.json')

if ontology is not None:
    # Example usage:
    for file in os.listdir('../policies_annotated/policies_annotated_text/'):
        sentence = "app interactions"
        word_to_find, level = find_category_level_in_sentence(sentence, ontology)
        if word_to_find is not None:
            print(f"The word '{word_to_find}' is found at level {level} in the ontology.")
        else:
            print(f"No word found in the sentence that is in the ontology.")


[nltk_data] Downloading package punkt to
[nltk_data]     C:\Users\Baldw\AppData\Roaming\nltk_data...
[nltk_data]   Package punkt is already up-to-date!
[nltk_data] Downloading package wordnet to
[nltk_data]     C:\Users\Baldw\AppData\Roaming\nltk_data...
[nltk_data]   Package wordnet is already up-to-date!


Now for processing the data safety section, we can automatically annotate all of the items as they would appear in the ontology and load these into annotated texts which can be processed into a graph. 

In [22]:
# Functions to attribute the words within data safety to the privacy ontology 

def find_in_ontology(input_string, ontology):
    """
    Load a JSON file and find the level and path for a multi-word input, using real stemming.
    It prioritizes multi-word matches before checking individual words.

    :param input_string: The string to search for in the JSON data.
    :param ontology: Path to the JSON file.
    :return: A tuple with the level and path at which the string is found, or None if not found.
    """
    stemmer = PorterStemmer()
    words = input_string.split()

    def stem_words(word_list):
        return ' '.join([stemmer.stem(word.lower()) for word in word_list])

    def search_nested(data, level, path, search_term):
        if isinstance(data, dict):
            for key, value in data.items():
                key_stem = stemmer.stem(key.lower())
                if search_term == key_stem:
                    return level, path + [key]
                found_level, found_path = search_nested(value, level + 1, path + [key], search_term)
                if found_level:
                    return found_level, found_path
        elif isinstance(data, list):
            for item in data:
                item_stem = stemmer.stem(item.lower())
                if search_term == item_stem:
                    return level, path
        return None, []

    with open(ontology, 'r') as file:
        json_data = json.load(file)

    # Search for the whole phrase first
    stemmed_phrase = stem_words(words)
    level, path = search_nested(json_data, 1, [], stemmed_phrase)
    if level:
        return level, path

    # If not found, search for smaller groups of words, then individual words
    for i in range(len(words), 0, -1):
        for j in range(len(words) - i + 1):
            stemmed_group = stem_words(words[j:j+i])
            level, path = search_nested(json_data, 1, [], stemmed_group)
            if level:
                return level, path

    return None, []

# File path to the JSON file
ontology = '../privacy_ontology.json'

# Testing the function with "use third party" and "app interactions"
input_strings = ['cart']
for input_string in input_strings:
    level, path = find_in_ontology(input_string, ontology)
    print(f"Input: '{input_string}' found at level {level}, path: {path}")

def annotate_text_with_ontology_tags(text, ontology):
    """
    Annotates a given text with tags based on whether words or phrases match entries in the ontology.
    Tags are [#a] for actions, [#dt] for data types, and [#p] for purposes.
    Only stems words for matching actions and data types, not purposes.

    :param text: The text to annotate.
    :param ontology: Path to the ontology JSON file.
    :return: The annotated text.
    """
    stemmer = PorterStemmer()

    def stem_word(word):
        return stemmer.stem(word.lower())

    with open(ontology, 'r') as file:
        ontology = json.load(file)

    # Flatten the ontology for easier searching
    flattened_ontology = {}
    def flatten_ontology(data, category, path=[]):
        if isinstance(data, dict):
            for key, value in data.items():
                if category in ["Actions", "Data Types"]:
                    key_stem = stem_word(key)
                else:
                    key_stem = key.lower()  # Do not stem for "Purpose"
                flattened_ontology[key_stem] = (category, path + [key])
                flatten_ontology(value, category, path + [key])
        elif isinstance(data, list):
            for item in data:
                if category in ["Actions", "Data Types"]:
                    item_stem = stem_word(item)
                else:
                    item_stem = item.lower()  # Do not stem for "Purpose"
                flattened_ontology[item_stem] = (category, path)

    for category, data in ontology.items():
        flatten_ontology(data, category)

    # Split the text into words
    words = text.split()

    # Function to get category tag
    def get_tag(category):
        if category == "Actions":
            return "[#a]"
        elif category == "Data Types":
            return "[#dt]"
        elif category == "Purpose":
            return "[#p]"
        return ""

    # Annotate the text
    annotated_words = []
    skip_next = False
    for i in range(len(words)):
        if skip_next:
            skip_next = False
            continue

        # Check for two-word matches
        if i < len(words) - 1:
            two_word_action_data = stem_word(words[i] + ' ' + words[i + 1])
            two_word_purpose = (words[i] + ' ' + words[i + 1]).lower()
            if two_word_action_data in flattened_ontology or two_word_purpose in flattened_ontology:
                key = two_word_action_data if two_word_action_data in flattened_ontology else two_word_purpose
                category, _ = flattened_ontology[key]
                annotated_words.append(get_tag(category) + words[i] + ' ' + words[i + 1] + get_tag(category))
                skip_next = True
                continue

        # Check for one-word matches
        one_word_action_data = stem_word(words[i])
        one_word_purpose = words[i].lower()
        if one_word_action_data in flattened_ontology or one_word_purpose in flattened_ontology:
            key = one_word_action_data if one_word_action_data in flattened_ontology else one_word_purpose
            category, _ = flattened_ontology[key]
            annotated_words.append(get_tag(category) + words[i] + get_tag(category))
        else:
            annotated_words.append(words[i])

    return ' '.join(annotated_words)


#(The output of running data_safety_scraper for ai.blueplate.app)
text = ("Data shared Data that may be shared with other companies or organizations Financial info User payment info and Purchase history Data shared and for what purpose info User payment info App functionality Purchase history App functionality Analytics Developer communications Advertising or marketing Personalization Account management Personal info Name Email address and Phone number Data shared and for what purpose info Name App functionality Email address App functionality Analytics Advertising or marketing Phone number App functionality App activity App interactions Data shared and for what purpose info App interactions App functionality Analytics Personalization Account management Data collected Data this app may collect Personal info Name User IDs and Address Data collected and for what purpose info Name App functionality Analytics Developer communications Advertising or marketing Personalization Account management User IDs App functionality Analytics Account management Address Optional App functionality Advertising or marketing App activity App interactions Data collected and for what purpose info App interactions App functionality Analytics Fraud prevention security and compliance Personalization Security practices Data is encrypted in transit Your data is transferred over a secure connection You can request that data be deleted The developer provides a way for you to request that your data be deleted info For more information about collected and shared data see the developer's privacy policy")
# Annotating the text
annotated_text = annotate_text_with_ontology_tags(text, ontology)
annotated_text = annotated_text.lower()
print(annotated_text)





Input: 'cart' found at level 4, path: ['Data Types', 'Personal Data', 'Financial', 'Carts']
data [#a]shared[#a] data that may be [#a]shared[#a] with other companies or organizations [#dt]financial[#dt] info user payment info and [#dt]purchase history[#dt] data [#a]shared[#a] and for what purpose info user payment info app [#p]functionality[#p] [#dt]purchase history[#dt] app [#p]functionality[#p] [#p]analytics[#p] developer communications [#p]advertising[#p] or [#p]marketing[#p] [#p]personalization[#p] [#p]account management[#p] personal info [#dt]name[#dt] [#dt]email address[#dt] and [#dt]phone number[#dt] data [#a]shared[#a] and for what purpose info [#dt]name[#dt] app [#p]functionality[#p] [#dt]email address[#dt] app [#p]functionality[#p] [#p]analytics[#p] [#p]advertising[#p] or [#p]marketing[#p] [#dt]phone number[#dt] app [#p]functionality[#p] [#dt]app activity[#dt] [#dt]app interactions[#dt] data [#a]shared[#a] and for what purpose info [#dt]app interactions[#dt] app [#p]functional

In [23]:

#Annotate all of the data safety files created in processing docx file 
import os
import json
from nltk.stem import PorterStemmer
def process_data_safety_files(base_directory, ontology_path):

    # Iterate through subdirectories and process each data safety file
    for subdir in os.listdir(base_directory):
        subdir_path = os.path.join(base_directory, subdir)
        if os.path.isdir(subdir_path):
            data_safety_file = os.path.join(subdir_path, 'data_safety.txt')
            annotated_file = os.path.join(subdir_path, 'data_safety_annotated.txt')

            if os.path.exists(data_safety_file):
                # Read the data safety text
                with open(data_safety_file, 'r', encoding='utf-8') as file:
                    data_safety_text = file.read()

                # Annotate the text
                annotated_text = annotate_text_with_ontology_tags(data_safety_text, ontology)

                # Save the annotated text
                with open(annotated_file, 'w', encoding='utf-8') as file:
                    file.write(annotated_text)
                # print(f"Annotated file saved in {annotated_file}")

# File path to the JSON ontology file
ontology = '../privacy_ontology.json'

# Process all data safety files
process_data_safety_files(annotation_dir, ontology)
