In [301]:
# Load data
from pymongo import MongoClient
from bson.objectid import ObjectId
import os
import time
import json
from dotenv import load_dotenv
### LLMs - Langchain
import openai
from langchain_core.prompts import ChatPromptTemplate
from langchain.prompts import PromptTemplate
from langchain_openai import ChatOpenAI
### Summary chain
from langchain.prompts import PromptTemplate, ChatPromptTemplate
from langchain.chains import LLMChain, MapReduceChain, load_summarize_chain
from langchain.chains.combine_documents.stuff import StuffDocumentsChain
from langchain_community.document_loaders import TextLoader
# Output format
from langchain_core.output_parsers import JsonOutputParser
from langchain_openai import ChatOpenAI
from pydantic import BaseModel, Field

In [302]:
def get_files(path: str):
    """
    List the regular files located directly inside a directory.

    Sub-directories are excluded and the search does not recurse.

    :param path: Directory to inspect.
    :return: File names (not full paths) in the order ``os.listdir`` yields
        them. An empty list is returned, after printing an error message,
        when the directory does not exist or cannot be read.
    """
    try:
        entries = os.listdir(path)
    except FileNotFoundError:
        print(f"Error: The path '{path}' does not exist.")
        return []
    except PermissionError:
        print(f"Error: Permission denied for path '{path}'.")
        return []

    # Keep only entries that are regular files (directories are dropped).
    files = []
    for entry in entries:
        if os.path.isfile(os.path.join(path, entry)):
            files.append(entry)
    return files
def remove_extension(file_names):
    """
    Strip the final extension from every file name in a list.

    Only the last dot-suffix is removed (``os.path.splitext`` semantics), so
    ``"com.foo.bar.txt"`` becomes ``"com.foo.bar"``.

    :param file_names: Iterable of file names.
    :return: New list with the names minus their extension, in input order.
    """
    stems = []
    for name in file_names:
        stem, _extension = os.path.splitext(name)
        stems.append(stem)
    return stems
def connect_mongodb(uri, db_name, collection_name):
    """
    Return a handle to one collection of a MongoDB deployment.

    :param uri: Connection URI handed to ``MongoClient``.
    :param db_name: Name of the database to select.
    :param collection_name: Name of the collection inside that database.
    :return: The selected collection object.
    """
    # Client -> database -> collection, selected by subscript access.
    return MongoClient(uri)[db_name][collection_name]
def get_data_by_id(collection, id_value):
    """
    Fetch a single document by its ``_id``.

    ``id_value`` is converted to an ``ObjectId`` when it is a valid ObjectId
    representation; any other value (for example a plain string id such as a
    package name) is used for the lookup unchanged.

    :param collection: Collection object exposing ``find_one``.
    :param id_value: The ``_id`` to look up.
    :return: The matching document, or ``None`` when no document matches or
        the lookup raised (the error is printed in that case).
    """
    try:
        # Explicit validity check instead of a bare ``except:`` around the
        # constructor, which would also have swallowed KeyboardInterrupt and
        # hidden unrelated failures.
        if ObjectId.is_valid(id_value):
            id_value = ObjectId(id_value)

        return collection.find_one({"_id": id_value})
    except Exception as e:
        print(f"Error: {e}")
        return None
def load_specific_file(directory_path: str, file_name: str):
    """
    Read one text file from a directory and return its content as a string.

    When the stripped content parses as JSON it is returned re-serialised
    with a two-space indent; otherwise the stripped raw text is returned.

    :param directory_path: Directory that contains the file.
    :param file_name: Exact name of the file to read.
    :return: The file content as described above, or ``None`` when the file
        does not exist or loading fails (an error message is printed).
    """
    target = os.path.join(directory_path, file_name)

    # Guard clause: nothing to load when the path is not an existing file.
    if not os.path.isfile(target):
        print(f"Error: File '{file_name}' not found in the directory.")
        return None

    try:
        documents = TextLoader(target).load()
        text = documents[0].page_content.strip()  # the loader yields one document

        try:
            parsed = json.loads(text)
        except json.JSONDecodeError:
            # Not JSON: hand back the raw text.
            return text
        # Valid JSON: pretty-print it.
        return json.dumps(parsed, indent=2)

    except Exception as e:
        print(f"Error loading file '{file_name}': {e}")
        return None

def create_privacy_compliance_prompt(data_safety: list, sensitive_data: str, explain_data_safety: dict):
    """
    Build the prompt that asks an LLM to judge an app's privacy compliance.

    This function only renders text; it does not call a model. The prompt
    embeds the declared data safety categories, the sensitive data actually
    detected, and the category explanations, and instructs the model to
    answer with a small JSON object.

    :param data_safety: Declared data safety categories (JSON-serialisable).
    :param sensitive_data: Detected sensitive data, already a string.
    :param explain_data_safety: Mapping that explains each category
        (JSON-serialisable).
    :return: The fully formatted prompt string.
    """
    # Serialise the declared categories up front; the detected data is
    # inserted as-is because it is already a string.
    declared_json = json.dumps(data_safety, indent=2)

    template_text = """
        You are a cybersecurity and privacy expert analyzing an Android application.
        Your task is to verify whether the app complies with privacy policies by comparing
        its declared data safety categories against the actual sensitive data it transmits.

        ### **Declared Data Safety Categories:**
        ```json
        {data_safety}
        ```

        ### **Actual Sensitive Data Detected:**
        ```json
        {sensitive_data}
        ```

        ### **Explanation of Data Safety Categories:**
        ```json
        {explain_data_safety}
        ```

        ### **Your Task:**
        1. Identify whether all detected sensitive data types are included in the declared data safety categories.
        2. If all detected sensitive data types are in the declared list, return:
        ```json
        {{
            "privacy-compliance": "yes"
        }}
        ```
        3. Otherwise, identify the sensitive data types that were not declared and map them to their corresponding category in `explain_data_safety`. Then return:
        ```json
        {{
            "privacy-compliance": "no",
            "violate": [mapped categories from explain_data_safety]
        }}
        ```
        
        Please only return a **structured JSON output** as described, and don't add any explanation.
        """

    prompt_template = PromptTemplate(
        template=template_text,
        input_variables=["data_safety", "sensitive_data", "explain_data_safety"],
    )

    # Doubled braces in the template are literal braces in the output.
    return prompt_template.format(
        data_safety=declared_json,
        sensitive_data=sensitive_data,
        explain_data_safety=json.dumps(explain_data_safety, indent=2),
    )
def convert_json_to_dict(json_string):
    """
    Parse an LLM reply that contains JSON, optionally inside a code fence.

    Accepted shapes: bare JSON, JSON wrapped in a ```` ``` ```` or
    ```` ```json ```` fence (any letter case), a fenced block surrounded by
    extra prose, and a reply with only a leading or only a trailing fence.

    :param json_string: Text returned by the LLM.
    :return: The decoded JSON value (normally a dictionary), or ``None`` when
        the input is not a string or does not contain valid JSON (an error
        message is printed in both cases).
    """
    import re  # local import keeps this block self-contained

    if not isinstance(json_string, str):
        print(f"Error decoding JSON: expected a string, got {type(json_string).__name__}")
        return None

    text = json_string.strip()

    # NOTE: str.strip("```json") removes a *set of characters*, not a prefix,
    # so it cannot handle an upper-case language tag or prose around the
    # fence. Match the fence explicitly instead.
    fenced = re.search(r"```(?:json)?\s*(.*?)\s*```", text, flags=re.DOTALL | re.IGNORECASE)
    if fenced:
        text = fenced.group(1)
    else:
        # Unbalanced fence: drop a stray opening and/or closing marker.
        text = re.sub(r"^```(?:json)?\s*|\s*```$", "", text, flags=re.IGNORECASE)

    try:
        return json.loads(text)
    except json.JSONDecodeError as e:
        print(f"Error decoding JSON: {e}")
        return None
def write_log(file_path, content):
    """
    Append one entry, followed by a newline, to a UTF-8 log file.

    The file is opened in append mode, so it is created on first use and
    earlier entries are preserved.

    :param file_path: Path of the log file.
    :param content: Text to append.
    """
    log_handle = open(file_path, mode="a", encoding="utf-8")
    with log_handle:
        log_handle.write(content + "\n")
def update_mongodb_element(collection, _id, update_keyword, update_value):
    """
    Set one field of the document identified by ``_id``.

    :param collection: Collection object exposing ``update_one``.
    :param _id: Identifier of the document to update.
    :param update_keyword: Name of the field to set.
    :param update_value: New value: a string, a dictionary, or a list whose
        items are all dictionaries.
    :return: Whatever ``collection.update_one`` returns.
    :raises ValueError: If ``update_value`` has any other type, or is a list
        containing a non-dictionary item.
    """
    if isinstance(update_value, list):
        # Lists are accepted only when every item is a dictionary.
        if any(not isinstance(entry, dict) for entry in update_value):
            raise ValueError("update_value as a list must contain only dictionaries")
    elif not isinstance(update_value, (str, dict)):
        raise ValueError("update_value must be either a string, a dictionary, or a list of dictionaries")

    # ``$set`` replaces just this field and leaves the rest of the document.
    return collection.update_one(
        {"_id": _id},
        {"$set": {update_keyword: update_value}},
    )

In [303]:
# Machine-specific input/output locations (absolute Windows paths; adjust
# them before running elsewhere).
# Input: directory scanned for the per-app sensitive-data files.
sensitive_data_file_path = r"C:\Users\ASUS\anaconda3\wearable-privacy-compliance\sensitive_data_wearable_standalone_json"
# Output: directory that receives one log file per analysed package.
log_dir = r"C:\Users\ASUS\anaconda3\wearable-privacy-compliance\privacy_compliance_wearable_standalone"

In [304]:
# MongoDB connection settings, passed to connect_mongodb() below.
# The URI points at a host on a private network address.
mongoDB_uri = 'mongodb://192.168.1.14:27017'
mongoDB_database = 'wearable-project' 
# Collection whose documents the main loop looks up using the package name as _id.
mongoDB_collection = 'wearable-standalone'

In [305]:
# Names of all regular files in the sensitive-data directory
# (an empty list if the directory is missing or unreadable).
file_name_arr = get_files(sensitive_data_file_path)

In [306]:
# Each file name minus its extension is used as the app's package name.
package_name_arr = remove_extension(file_name_arr)
print("package_name_arr: ",package_name_arr)

package_name_arr:  ['app.groupcal.www', 'apps.r.compass', 'ch.publisheria.bring', 'com.albuquerquedesign.adanalog013', 'com.anghami', 'com.audible.application', 'com.c25k', 'com.cardiogram.v1', 'com.centr.app', 'com.codverter.wearflashlight', 'com.contorra.golfpad', 'com.customsolutions.android.alexa', 'com.dungelin.heartrate', 'com.exovoid.weather.app', 'com.fish4fun.mycards', 'com.fitbod.fitbod', 'com.fitiv.fitivapplication', 'com.flightradar24free', 'com.fourtechnologies.mynetdiary.ad', 'com.fsoydan.howistheweather', 'com.funnmedia.waterminder', 'com.furyapps.scoreswidget', 'com.GD.Minimal', 'com.golfbuddy.smartcaddie', 'com.google.android.apps.maps', 'com.google.android.contacts', 'com.google.android.deskclock', 'com.google.android.keep', 'com.grint.thegrint.pro', 'com.hrd.iam', 'com.hrd.motivation', 'com.imperon.android.gymapp', 'com.ingravity.woo', 'com.jee.calc', 'com.JiaRen.LCDsimple', 'com.juventus.app.android', 'com.krisdb.wearcasts', 'com.lemonsystems.cipadidas', 'com.luxsan

In [307]:
# Connect to the MongoDB collection
# Handle to the MongoDB collection that is read from and updated by the main loop.
collection = connect_mongodb(mongoDB_uri,mongoDB_database,mongoDB_collection)

In [308]:
#. Chat model
# Setup model
# Load environment variables
# Chat model setup.
# load_dotenv() is called before reading the key so that a value kept in a
# local .env file is available through os.getenv below.
load_dotenv()

# Retrieve the OpenAI API key from the environment.
api_key = os.getenv("OPENAI_API_KEY")
# Fail fast with a clear message when the key is missing or empty.
if not api_key:
    raise ValueError("OPENAI_API_KEY is not set in the environment variables")

# Initialize the ChatOpenAI model used for every compliance check.
# temperature=0 asks for the least random output the model offers.
llm = ChatOpenAI(
    model="gpt-4o",
    temperature=0,
    openai_api_key=api_key  # Pass the key explicitly rather than relying on implicit lookup
)

In [309]:
# Reference table embedded in every prompt: it maps each data safety
# category to the kinds of data that fall under it, so the LLM can map a
# detected data type back to a category.
# Values are either a list of examples or, for self-describing categories,
# a single string.
# NOTE(review): presumably modelled on Google Play's Data safety categories,
# with "Token", "Advertising" and "Authentication" added - confirm.
explain_data_safety = {
    "Location": ["GPS", "Latitude", "Longitude"],
    "Personal info": [
        "Name",
        "Email Address",
        "User IDs",
        "Address",
        "Phone number",
        "Race and ethnicity",
        "Political or religious beliefs",
        "Sexual orientation",
        "Birthday",
        "Any other personal information. For example, a user's date of birth, gender identity, or veteran status.",
    ],
    "Financial info": [
        "User payment info",
        "Purchase history",
        "Credit score",
        "Any other financial information. For example, a user's salary or debts.",
    ],
    "Health and fitness": [
        "Health info",
        "Fitness info",
        "Height",
        "Weight",
    ],
    "Messages": [
        "Emails content",
        "SMS or MMS content",
        "Any other types of messages. For example, instant messages or chat content",
    ],
    "Photos or videos": [
        "Photos",
        "Videos",
    ],
    "Audio files": [
        "Voice or sound recordings",
        "Music files",
        "Any other user-created or user-provided audio files",
    ],
    "Files and docs": "Files and docs",
    "Calendar": "Calendar",
    "Contacts": "Contacts",
    "App activity": [
        "App interactions",
        "In-app search history",
        "Installed apps",
        "Any other user-generated content not listed here, or in any other section. For example, user bios, notes, or open-ended responses",
        "Any other user actions not listed here. For example, gameplay information likes, or dialog options",
    ],
    "Web browsing": [
        "Sends requests to users to make your app the default browser app",
        "Maintains a browsing cache or cookies.",
    ],
    "App info and performance": [
        "Crash logs",
        "Diagnostics",
        "Any other app performance data",
    ],
    "Device or other IDs": [
        "Examples of these IDs include: an IMEI number, MAC address, Widevine Device ID, Firebase installation ID, or advertising identifier",
    ],
    "Token": ["Oauth token", "Access token", "JWT token", "All other token"],
    "Advertising": [
        "Google Advertising ID (GAID)",
        "Android ID",
        "Widevine Device ID",
        "IMEI (International Mobile Equipment Identity)",
        "MEID (Mobile Equipment Identifier)",
        "MAC Address",
        "Serial Number",
        "Firebase Installation ID (FID)",
        "Instance ID",
        "Google Play Install Referrer",  # was truncated to "Referre"
        "IP Address",
        "SSID & BSSID",
        "Session IDs",
        "User ID",
        "Device Fingerprinting",
    ],
    "Authentication": ["Password", "username", "Phone number"],
}

In [310]:
# Main batch: for every package, compare the declared data safety categories
# stored in MongoDB with the sensitive data found in its traffic file, ask
# the LLM for a verdict, and store that verdict back on the document.
# A package whose inputs are missing, or whose LLM reply cannot be parsed, is
# logged and skipped so that one bad record does not abort the whole run.

# The per-package log files are opened in append mode; make sure their
# directory exists first.
os.makedirs(log_dir, exist_ok=True)

for i, package_name in enumerate(package_name_arr):
    print("------------------------------- Loop-"+str(i)+" -------------------------------")
    print("Package name: ", package_name)
    log_file_path = os.path.join(log_dir, f"log-{package_name}.log")
    write_log(log_file_path, f"------------------- Loop-{i} -------------------")
    write_log(log_file_path, f"check_package_name: {package_name}")

    # get_data_by_id returns None when nothing matches or the lookup failed.
    app_data = get_data_by_id(collection, package_name)
    if not app_data or "data-shared" not in app_data:
        skip_reason = f"Skipped: no 'data-shared' record found for {package_name}"
        print(skip_reason)
        write_log(log_file_path, skip_reason)
        continue
    data_safety = app_data["data-shared"]

    # load_specific_file returns None when the file is missing or unreadable;
    # without this guard the literal text "None" would be sent to the LLM.
    sensitive_data_file = package_name + ".txt"
    sensitive_data = load_specific_file(sensitive_data_file_path, sensitive_data_file)
    if sensitive_data is None:
        skip_reason = f"Skipped: could not load sensitive data file {sensitive_data_file}"
        print(skip_reason)
        write_log(log_file_path, skip_reason)
        continue

    formatted_prompt = create_privacy_compliance_prompt(data_safety, sensitive_data, explain_data_safety)
    write_log(log_file_path, f"================= Prompt Template =================\n{formatted_prompt}")
    response = llm.invoke(formatted_prompt)
    write_log(log_file_path, f"***************** LLMs Response *****************\n{response.content}")
    response_dict = convert_json_to_dict(response.content)
    print("response_dict: ", response_dict)
    write_log(log_file_path, f"***************** Response Dict *****************\n{response_dict}")

    # An unparsable reply yields None; storing it would make
    # update_mongodb_element raise ValueError and stop the batch.
    if not isinstance(response_dict, dict):
        skip_reason = f"Skipped: LLM response for {package_name} is not a JSON object; database not updated"
        print(skip_reason)
        write_log(log_file_path, skip_reason)
        continue

    update_result_status = update_mongodb_element(collection, package_name, "privacy-compliance", response_dict)
    print("update_result_status: ", update_result_status)

------------------------------- Loop-0 -------------------------------
Package name:  app.groupcal.www
response_dict:  {'privacy-compliance': 'no', 'violate': ['Location', 'Personal info', 'Token', 'Advertising']}
update_result_status:  UpdateResult({'n': 1, 'nModified': 1, 'ok': 1.0, 'updatedExisting': True}, acknowledged=True)
------------------------------- Loop-1 -------------------------------
Package name:  apps.r.compass
response_dict:  {'privacy-compliance': 'no', 'violate': ['Personal info', 'Advertising']}
update_result_status:  UpdateResult({'n': 1, 'nModified': 1, 'ok': 1.0, 'updatedExisting': True}, acknowledged=True)
------------------------------- Loop-2 -------------------------------
Package name:  ch.publisheria.bring
response_dict:  {'privacy-compliance': 'no', 'violate': ['Advertising', 'Personal info', 'Token', 'Authentication']}
update_result_status:  UpdateResult({'n': 1, 'nModified': 1, 'ok': 1.0, 'updatedExisting': True}, acknowledged=True)
--------------------

response_dict:  {'privacy-compliance': 'yes'}
update_result_status:  UpdateResult({'n': 1, 'nModified': 1, 'ok': 1.0, 'updatedExisting': True}, acknowledged=True)
------------------------------- Loop-27 -------------------------------
Package name:  com.google.android.keep
response_dict:  {'privacy-compliance': 'no', 'violate': ['Advertising', 'Authentication']}
update_result_status:  UpdateResult({'n': 1, 'nModified': 1, 'ok': 1.0, 'updatedExisting': True}, acknowledged=True)
------------------------------- Loop-28 -------------------------------
Package name:  com.grint.thegrint.pro
response_dict:  {'privacy-compliance': 'no', 'violate': ['Device or other IDs', 'Location', 'Personal info', 'Authentication', 'Advertising']}
update_result_status:  UpdateResult({'n': 1, 'nModified': 1, 'ok': 1.0, 'updatedExisting': True}, acknowledged=True)
------------------------------- Loop-29 -------------------------------
Package name:  com.hrd.iam
response_dict:  {'privacy-compliance': 'yes'}
upd

response_dict:  {'privacy-compliance': 'yes'}
update_result_status:  UpdateResult({'n': 1, 'nModified': 1, 'ok': 1.0, 'updatedExisting': True}, acknowledged=True)
------------------------------- Loop-55 -------------------------------
Package name:  com.watchfacestudio.minimalblack1
response_dict:  {'privacy-compliance': 'yes'}
update_result_status:  UpdateResult({'n': 1, 'nModified': 1, 'ok': 1.0, 'updatedExisting': True}, acknowledged=True)
------------------------------- Loop-56 -------------------------------
Package name:  com.wikiloc.wikilocandroid
response_dict:  {'privacy-compliance': 'no', 'violate': ['Advertising']}
update_result_status:  UpdateResult({'n': 1, 'nModified': 1, 'ok': 1.0, 'updatedExisting': True}, acknowledged=True)
------------------------------- Loop-57 -------------------------------
Package name:  com.windyty.android
response_dict:  {'privacy-compliance': 'no', 'violate': ['Advertising']}
update_result_status:  UpdateResult({'n': 1, 'nModified': 1, 'ok': 1.