In [1]:
import os
import re
import sqlite3
import json
from urllib.parse import urlparse
import base64
import hashlib
from haralyzer import HarParser
from collections import Counter

### Third-Party Requests Database

In [2]:
def extractRequests(entry, host):
    """Return the shortened domain of a HAR entry's request if it is third-party.

    The request hostname is reduced to its last two labels
    (``cdn.tracker.com`` -> ``tracker.com``), or its last three labels when it
    ends in ``.co.uk`` (``static.bbc.co.uk`` -> ``bbc.co.uk``).

    Args:
        entry: One HAR entry dict; the URL is read from ``entry['request']['url']``.
        host: First-party host string. The request counts as first-party when
            ``host`` is a substring of the shortened domain.

    Returns:
        The shortened domain string when the request is third-party, otherwise
        ``None``. ``None`` is also returned when the entry has no URL, the URL
        has no hostname, or the hostname consists of a single label.
    """
    url = entry.get('request', {}).get('url')
    if not url:
        return None

    # URLs such as data: or about: carry no network host; urlparse reports
    # hostname=None for them, which previously raised AttributeError.
    hostname = urlparse(url).hostname
    if not hostname:
        return None

    labels = hostname.split('.')
    if len(labels) >= 3 and labels[-2:] == ['co', 'uk']:
        request_url = '.'.join(labels[-3:])
    elif len(labels) >= 2:
        request_url = '.'.join(labels[-2:])
    else:
        # Single-label hosts (e.g. "localhost") have no domain to report.
        return None

    if host not in request_url:
        return request_url

    return None

def processFolder(path):
    """Collect third-party request domains from every ``.har`` file in ``path``.

    Args:
        path: Folder to scan. The device type (``desktop``/``mobile``) and the
            website type (``shops``/``news``) are taken from the folder path.

    Returns:
        A list of ``(domain, host, device_type, website_type)`` tuples, one per
        request for which ``extractRequests`` returned a domain. ``host`` is
        parsed from the file name (the part between a leading ``m.``/``www.``
        and ``.har``) and is ``None`` when the name does not match.
    """
    # Both values depend only on the folder, so search once instead of once per
    # request. ``.group`` is still called only when a row is produced, so a
    # path outside the expected layout fails exactly where it did before.
    device_match = re.search(r'/(desktop|mobile)/', path)
    website_match = re.search(r'/(shops|news)/', path)

    allDomains = []
    for filename in os.listdir(path):
        if not filename.endswith('.har'):
            continue

        # The host is a property of the file, not of the individual entry.
        host_match = re.search(r'(m|www)\.(.*)(\.har)', filename)
        host = host_match.group(2) if host_match else None

        # Parse first, then release the file handle before processing.
        with open(os.path.join(path, filename), 'r', encoding='utf-8') as file:
            harData = json.load(file)

        for entry in harData.get('log', {}).get('entries', []):
            domain = extractRequests(entry, host)
            if domain:
                allDomains.append((domain, host, device_match.group(1), website_match.group(1)))
    return allDomains

# Store every third-party request found in the archived HAR files in the
# ``domain_counts`` table of the shared analysis database.
# NOTE: the table is only created if missing and rows are always appended, so
# running this cell again adds the same rows a second time.
dbConnect = sqlite3.connect('third_party_analysis.db')
try:
    dbCursor = dbConnect.cursor()

    dbCursor.execute('''
    CREATE TABLE IF NOT EXISTS domain_counts (
        id INTEGER PRIMARY KEY,
        domain TEXT,
        host TEXT,
        type TEXT,
        website_type TEXT
    )
''')

    paths = ['../archives/news/desktop/', '../archives/news/mobile/', '../archives/shops/desktop/', '../archives/shops/mobile/']

    allDomains = []

    for path in paths:
        allDomains += processFolder(path)

    # Each tuple is (domain, host, type, website_type). Inserting them in one
    # call also avoids a module-level loop variable named ``type`` that would
    # shadow the builtin for the rest of the notebook.
    dbCursor.executemany('INSERT INTO domain_counts (domain, host, type, website_type) VALUES (?, ?, ?, ?)', allDomains)

    dbConnect.commit()
finally:
    # Close the connection even when reading or parsing a HAR file fails.
    dbConnect.close()

### Sensitive Information Database

In [3]:


# Open the shared analysis database for the e-mail leak scan; the connection
# and cursor are module-level because the contains_* helpers and execute()
# below use them directly.
dbConnect = sqlite3.connect('third_party_analysis.db')
dbCursor = dbConnect.cursor()

# One row per sighting of the e-mail address (plain, encoded or hashed):
#   host         - site whose HAR file was scanned
#   hashType     - label of the matching representation (e.g. 'MD5', 'plain')
#   domain       - hostname of the request in which it was found
#   Location     - "REQUEST" or "RESPONSE"
#   device_type / website_type - taken from the HAR file's path
dbCursor.execute('''
    CREATE TABLE IF NOT EXISTS EmailHashes (
        id INTEGER PRIMARY KEY,
        host TEXT NOT NULL,
        hashType TEXT NOT NULL,
        domain TEXT NOT NULL,
        Location TEXT NOT NULL,
        device_type TEXT,
        website_type TEXT
    )
''')

# Sites whose HAR archives are scanned. Each host appears exactly once: a
# repeated entry would make execute() scan that site's files twice and store
# every finding twice.
newsHosts = ["zeit.de", "cnn.com", "faz.net", "merkur.de", "n-tv.de", "sueddeutsche.de", "telegraph.co.uk", "theguardian.com", "thehindu.com"]
shopsHosts = ["amazon.de", "depot-online.de", "douglas.de", "hm.com", "nike.com", "samsung.com", "saturn.de", "sephora.de", "uniqlo.com", "zalando.de"]
# Further domains that findMail skips in the same way as the sites above.
# Every element needs its own comma: adjacent string literals are silently
# concatenated ("user.id" "depot.com" becomes "user.iddepot.com").
other = ["zalando.com", "nikecloud.com", "user.id", "depot.com"]

# Address whose plain, base64 and hashed forms are searched for in the traffic.
email = 'yannick.nastja@gmail.com'

def isElementSubstringOfX(array, x):
    """Tell whether at least one item of ``array`` occurs inside ``x``.

    Args:
        array: Iterable of candidate substrings.
        x: The value searched with the ``in`` operator.

    Returns:
        ``True`` as soon as one candidate is contained in ``x``, else ``False``
        (also for an empty ``array``).
    """
    return any(candidate in x for candidate in array)

# Shared INSERT used by every contains_* check below.
_EMAIL_HASH_INSERT_SQL = "INSERT INTO EmailHashes (Host, HashType, Domain, Location, device_type, website_type) VALUES (?, ?, ?, ?, ?, ?)"


def _record_if_present(needle, hash_type, host, text, domain, location, device_type, website_type):
    """Insert one EmailHashes row labelled ``hash_type`` when ``needle`` occurs in ``text``."""
    if needle in text:
        dbCursor.execute(_EMAIL_HASH_INSERT_SQL, (host, hash_type, domain, location, device_type, website_type))


# All contains_* functions share one contract: derive a representation of the
# module-level ``email``, look for it as a substring of ``text`` and, on a hit,
# store a row via the module-level ``dbCursor``. They return nothing and do not
# commit; committing is left to the caller.
#
#   host         - site whose HAR file is being scanned
#   text         - string to search
#   domain       - hostname of the request the text belongs to
#   location     - where the text came from (stored in the Location column)
#   device_type / website_type - stored unchanged in the row

def contains_md5(host, text, domain, location, device_type, website_type):
    """Record a 'MD5' row if the hex MD5 digest of the e-mail occurs in ``text``."""
    _record_if_present(hashlib.md5(email.encode()).hexdigest(), 'MD5', host, text, domain, location, device_type, website_type)

def contains_base64(host, text, domain, location, device_type, website_type):
    """Record a 'base64' row if the base64-encoded e-mail occurs in ``text``."""
    _record_if_present(base64.b64encode(email.encode()).decode(), 'base64', host, text, domain, location, device_type, website_type)

def contains_sha256(host, text, domain, location, device_type, website_type):
    """Record a 'SHA256' row if the hex SHA-256 digest of the e-mail occurs in ``text``."""
    _record_if_present(hashlib.sha256(email.encode()).hexdigest(), 'SHA256', host, text, domain, location, device_type, website_type)

def contains_sha1(host, text, domain, location, device_type, website_type):
    """Record a 'SHA1' row if the hex SHA-1 digest of the e-mail occurs in ``text``."""
    _record_if_present(hashlib.sha1(email.encode()).hexdigest(), 'SHA1', host, text, domain, location, device_type, website_type)

def contains_sha224(host, text, domain, location, device_type, website_type):
    """Record a 'SHA224' row if the hex SHA-224 digest of the e-mail occurs in ``text``."""
    _record_if_present(hashlib.sha224(email.encode()).hexdigest(), 'SHA224', host, text, domain, location, device_type, website_type)

def contains_sha3_512(host, text, domain, location, device_type, website_type):
    """Record a row if the hex SHA3-512 digest of the e-mail occurs in ``text``.

    NOTE(review): the row is labelled 'SHA512' although the digest is SHA3-512,
    which differs from SHA-512. The label is kept unchanged here so the stored
    values stay the same; confirm which one is intended.
    """
    _record_if_present(hashlib.sha3_512(email.encode()).hexdigest(), 'SHA512', host, text, domain, location, device_type, website_type)

def contains_mail(host, text, domain, location, device_type, website_type):
    """Record a 'plain' row if the unmodified e-mail address occurs in ``text``."""
    _record_if_present(email, 'plain', host, text, domain, location, device_type, website_type)

def findMail(entries, host, device_type, website_type):
    """Scan HAR entries for the e-mail address in third-party traffic.

    For every entry whose request hostname contains none of the strings in
    ``newsHosts``, ``shopsHosts`` or ``other``, each contains_* check is run
    against the request URL, the request headers, the request post data
    (location "REQUEST") and the response body text (location "RESPONSE").
    Findings are stored by the contains_* functions; nothing is returned.

    Args:
        entries: Iterable of HAR entry dicts.
        host: Site the HAR file belongs to; stored with every finding.
        device_type: Stored unchanged with every finding.
        website_type: Stored unchanged with every finding.
    """
    # Order matters: it determines the order in which rows are inserted.
    detectors = (
        contains_md5,
        contains_base64,
        contains_sha256,
        contains_mail,
        contains_sha1,
        contains_sha224,
        contains_sha3_512,
    )

    for entry in entries:
        request = entry.get('request', {})
        response = entry.get('response', {})
        domain = urlparse(request.get('url', '')).hostname

        # A URL without a hostname cannot be attributed to any domain, and a
        # None domain would make the substring checks below raise TypeError.
        if domain is None:
            continue

        # The first-party test depends only on the domain, so decide once per
        # entry instead of once per scanned text.
        if any(isElementSubstringOfX(known, domain) for known in (newsHosts, shopsHosts, other)):
            continue

        request_texts = [
            request.get('url', ''),
            str(request.get('headers', '')),
            str(request.get('postData', '')),
        ]
        for text in request_texts:
            for detect in detectors:
                detect(host, text, domain, "REQUEST", device_type, website_type)

        response_text = str(response.get('content', {}).get('text', ''))
        for detect in detectors:
            detect(host, response_text, domain, "RESPONSE", device_type, website_type)


# Path fragments placed between "../archives/" and "<host>.har" by execute():
# desktop captures carry a "www." prefix, mobile captures an "m." prefix.
newsTypes = [
    "news/desktop/www.",
    "news/mobile/m.",
]
shopsTypes = [
    "shops/desktop/www.",
    "shops/mobile/m.",
]

def execute():
    """Scan every archived news and shop HAR file for the e-mail address.

    For each host in ``newsHosts`` (then ``shopsHosts``) and each matching
    path fragment, the file ``../archives/<fragment><host>.har`` is loaded and
    its entries are passed to ``findMail``. All findings are committed once at
    the end through the module-level ``dbConnect``.

    Raises:
        Whatever ``open`` or ``json.load`` raises for a missing or malformed
        HAR file; nothing is caught here.
    """
    # News first, then shops - the same order as the two former copy-pasted loops.
    site_groups = ((newsHosts, newsTypes), (shopsHosts, shopsTypes))

    for group_hosts, path_prefixes in site_groups:
        for host in group_hosts:
            for prefix in path_prefixes:
                path = "../archives/" + prefix + host + ".har"

                # Read the content of the HAR file and convert it to a dictionary
                with open(path, "r", encoding="utf-8") as har_file:
                    harData = json.load(har_file)

                parser = HarParser(harData)
                entries = parser.har_data['entries']

                findMail(entries, host, re.search(r'/(desktop|mobile)/', path).group(1), re.search(r'/(shops|news)/', path).group(1))

    dbConnect.commit()

try:
    execute()
finally:
    # Release the cursor and the connection opened for this scan, also when a
    # HAR file is missing or cannot be parsed.
    dbCursor.close()
    dbConnect.close()

### Cookies Database

In [4]:
# Cookie domains equal to one of these entries are not stored in cookie_counts.
hosts = [
    # news sites
    "zeit.de",
    "cnn.com",
    "faz.net",
    "merkur.de",
    "n-tv.de",
    "sueddeutsche.de",
    "telegraph.co.uk",
    "theguardian.com",
    "thehindu.com",
    # shops
    "amazon.de",
    "depot-online.de",
    "douglas.de",
    "hm.com",
    "nike.com",
    "samsung.com",
    "saturn.de",
    "sephora.de",
    "uniqlo.com",
    "zalando.de",
    # additional excluded domains
    "id5-sync.com",
    "shop.samsung.com",
]


def extractDomains(file_path):
    """Extract the ``Domain`` attribute of every cookie set in a HAR file.

    Args:
        file_path: Path of the HAR file. The host is parsed from the file name
            (the part between ``m.``/``www.`` and ``.har``); the device type
            (``desktop``/``mobile``) and website type (``shops``/``news``) are
            parsed from the directory part of the path.

    Returns:
        A list of ``(domain, host, device_type, website_type)`` tuples, one per
        ``Set-Cookie`` response header that carries a ``Domain`` attribute. A
        leading dot is removed from the domain. ``host`` is ``None`` when the
        file name does not match the expected pattern.
    """
    # Parse first, then release the file handle before processing.
    with open(file_path, 'r', encoding='utf-8') as file:
        harData = json.load(file)

    # These depend only on the path, so search once instead of once per cookie.
    # ``.group`` is still called only when a cookie is found, so a path outside
    # the expected layout fails exactly where it did before.
    host_match = re.search(r'(m|www)\.(.*)(\.har)', file_path)
    host = host_match.group(2) if host_match else None
    device_match = re.search(r'/(desktop|mobile)/', file_path)
    website_match = re.search(r'/(shops|news)/', file_path)

    domains = []
    for entry in harData.get('log', {}).get('entries', []):
        for header in entry.get('response', {}).get('headers', []):
            if header['name'].lower() != 'set-cookie':
                continue

            # Cookie attribute names are case-insensitive and follow a ';', so
            # "domain=" must match as well as "Domain=", while a cookie *value*
            # that merely contains "Domain=" must not.
            match = re.search(r';\s*Domain=([^;]+)', header['value'], re.IGNORECASE)
            if not match:
                continue

            domain = match.group(1).strip()
            if domain.startswith('.'):
                domain = domain[1:]
            domains.append((domain, host, device_match.group(1), website_match.group(1)))
    return domains

def processFolder(folder_path):
    """Gather the cookie-domain tuples of every ``.har`` file in a folder.

    Args:
        folder_path: Directory whose direct children are inspected.

    Returns:
        The concatenation of ``extractDomains`` results for each file whose
        name ends in ``.har``, in the order ``os.listdir`` yields the names.
    """
    collected = []
    har_names = (name for name in os.listdir(folder_path) if name.endswith('.har'))
    for har_name in har_names:
        collected.extend(extractDomains(os.path.join(folder_path, har_name)))
    return collected

# Folders holding the archived HAR files, one per website type and device type.
paths = ['../archives/news/desktop/', '../archives/news/mobile/', '../archives/shops/desktop/', '../archives/shops/mobile/']

# NOTE(review): used as a context manager, a sqlite3 connection commits (or
# rolls back on an exception) when the block exits but is not closed by it;
# confirm whether dbConn should be closed explicitly afterwards.
with sqlite3.connect('third_party_analysis.db') as dbConn:
    dbCursor = dbConn.cursor()

    # One row per cookie whose Domain attribute is not one of the known hosts.
    dbCursor.execute('''
        CREATE TABLE IF NOT EXISTS cookie_counts (
            domain TEXT,
            host TEXT,
            device_type TEXT,
            website_type TEXT,
            id INTEGER PRIMARY KEY
        )
    ''')

    for folder_path in paths:
        domains = processFolder(folder_path)
        for domain, host, deviceType, websiteType in domains:
            # Exact match against the ``hosts`` list: only cookie domains that
            # are not listed there are stored.
            if domain not in hosts:
                dbCursor.execute('INSERT INTO cookie_counts (domain, host, device_type, website_type) VALUES (?, ?, ?, ?)', (domain, host, deviceType, websiteType))