<a href="https://colab.research.google.com/github/unawuyou-create/Dissertation/blob/main/1.data_scratch.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [13]:
!pip install pyquery
import requests
import json, ssl, time, math, warnings, os, re
from urllib.parse import unquote, urljoin
from pyquery import PyQuery as pq
import pandas as pd

# Disable SSL verification
ssl._create_default_https_context = ssl._create_unverified_context
warnings.filterwarnings("ignore")



In [14]:
def save_2_excel(df, file_path, check_keys=[], sheet_name='Sheet1', str_col=[]):
    # Save to Excel
    """
    Append data to the specified file.
    If the file already exists, new data will be added after the existing content.
    """
    if type(df) == dict:
        df = pd.DataFrame(df, index=[0])
    if not os.path.exists(os.path.join(os.getcwd(), get_file_path(file_path))):
        os.makedirs(os.path.join(os.getcwd(), get_file_path(file_path)))
    if not os.path.exists(file_path):
        for col in str_col:
            df[col] = df[col].astype(str)
        # Remove duplicates based on the specified primary key (for example, to prevent duplicate comments from being written)
        if len(check_keys) > 0:
            df.drop_duplicates(check_keys,
                               keep='first',
                               inplace=True)
        df.to_excel(file_path, index=False, sheet_name=sheet_name)
    else:
        # Read existing data, append new data and then save
        row_df = pd.read_excel(file_path)
        for col in str_col:
            row_df[col] = row_df[col].astype(str)
        for col in str_col:
            df[col] = df[col].astype(str)
        has_row_num = row_df.shape[0]
        row_num = has_row_num + df.shape[0]
        print(f'Existing rows={has_row_num}, After adding={row_num}, File path:{file_path}')

        final_df = pd.concat([row_df, df], ignore_index=True)
        if len(check_keys) > 0:
            final_df.drop_duplicates(check_keys,
                                     keep='first',
                                     inplace=True)
        final_df.to_excel(file_path, index=False, sheet_name=sheet_name)

In [15]:
def get_file_path(full_path):
    # Get directory path from full file path
    """
    Get the directory from a given file path.
    :param full_path:
    :return:
    """
    return full_path[0:full_path.rfind(os.path.sep) + 1]

In [16]:
def djson(obj, key_str, defaultVal=None):
    """
    A safe JSON parser that supports accessing nested keys with dot notation and index.
    """
    if key_str is None or key_str == '':
        return obj
    if type(obj) not in [list, dict, str]:
        return defaultVal

    key_list = key_str.split('.')
    if type(obj) == str and len(key_list) > 0:
        try:
            obj = json.loads(obj)
        except:
            return defaultVal
    if len(key_list) == 0:
        return defaultVal
    if len(key_list) == 1:
        # Parse [] at the end of key
        m = re.findall(r'\[(\d+)\]', key_str)
        if len(m) == 1:
            key_l = key_str[:key_str.index('[')]
            index = int(m[0])
            if key_l == '':
                return obj[index]
            if key_l not in obj.keys():
                return defaultVal
            try:
                return obj[key_l][index]
            except:
                return defaultVal
        if len(m) > 1:
            key_l = key_str[:key_str.index('[')]
            try:
                if key_l != '':
                    obj = obj.get(key_l)
                    if obj is None:
                        return defaultVal
                for i, a in enumerate(m):
                    obj = obj[int(a)]
                return obj
            except:
                return defaultVal

        if key_str not in obj.keys():
            return defaultVal
        if obj.get(key_str) is not None:
            return obj.get(key_str)
        else:
            return defaultVal
    key = key_list[0]

    if re.match(r'^\d+$', key):
        try:
            return djson(obj[key], '.'.join(key_list[1:]), defaultVal)
        except:
            return defaultVal

    m = re.findall(r'\[(\d+)\]', key)
    if len(m) > 0:
        # Parse [] in the middle of key
        key_l = key[:key.index('[')]
        if len(m) == 1:
            index = int(m[0])
            if key_l == '':
                return djson(obj[index], '.'.join(key_list[1:]))
            if obj.get(key_l) is None:
                return defaultVal
            try:
                return djson(obj.get(key_l)[index], '.'.join(key_list[1:]), defaultVal)
            except:
                return defaultVal
        else:
            try:
                if key_l != '':
                    obj = obj.get(key_l)
                    if obj is None:
                        return defaultVal
                for a in m:
                    obj = obj[int(a)]
                return djson(obj, '.'.join(key_list[1:]), defaultVal)
            except:
                return defaultVal
    else:
        try:
            return djson(obj.get(key), '.'.join(key_list[1:]), defaultVal)
        except:
            return defaultVal




In [17]:
class DzSpider(object):
    def __init__(self):
        # Temporary cache
        self.data = {}
        # Data storage directory
        self.folder = fr'{os.getcwd()}'
        # Start page number
        self.start_page = 1
        # End page number
        self.end_page = 10
        # Counter for total scraped reviews
        self.spider_num = 1
        # Reviews per page
        self.page_size = 20
        # Whether crawling is finished
        self.has_finish = False
        # Whether to auto-detect the last page
        self.reset_end_page = True
        # Request headers
        self.headers_str = '''
        accept: */*
		accept-language: en-US,en;q=0.9
		cache-control: no-cache
		pragma: no-cache
		priority: u=1, i
		referer: https://www.trustpilot.com/review/www.comparethemarket.com?page=2
		sec-ch-ua: "Not;A=Brand";v="99", "Google Chrome";v="139", "Chromium";v="139"
		sec-ch-ua-mobile: ?0
		sec-ch-ua-platform: "Windows"
		sec-fetch-dest: empty
		sec-fetch-mode: cors
		sec-fetch-site: same-origin
		user-agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/139.0.0.0 Safari/537.36
		x-nextjs-data: 1
        '''
        self.headers = dict(
            [[y.strip() for y in x.strip().split(':', 1)] for x in self.headers_str.strip().split('\n') if x.strip()])

    def run_task(self):
        # Iterate through all pages and scrape reviews
        page_index = self.start_page
        self.end_page = 1000
        self.reset_end_page = True
        self.has_finish = False
        while page_index < self.end_page + 1 and not self.has_finish:
            self.get_one_page(page_index)
            page_index += 1
            time.sleep(3)

    def get_one_page(self, page_index):
        req_url = f'https://www.trustpilot.com/_next/data/businessunitprofile-consumersite-2.4713.0/review/www.comparethemarket.com.json'
        params = {
            "businessUnit": "www.comparethemarket.com"
        }
        if page_index > 1:
            params['page'] = page_index
        res = requests.get(req_url, headers=self.headers, params=params, verify=False).json()
        if self.reset_end_page:
            self.end_page = math.ceil(djson(res, 'pageProps.filters.pagination.totalCount') / self.page_size)
            self.reset_end_page = False
        df_page = pd.DataFrame()
        for item in djson(res, 'pageProps.reviews', []):
            title = item.get('title')
            text = item.get('text')
            rating = item.get('tating')
            publishedDate = djson(item, 'dates.publishedDate')[:10]
            one_data = {
                'User': djson(item, 'consumer.displayName'),
                'Title': title,
                'Rating': rating,
                'Review': text,
                'Date': publishedDate,
            }
            print(f'========================================\n'
                  f'Page {page_index}/{self.end_page}, Total #{self.spider_num}\n'
                  f'{one_data}\n'
                  f'========================================\n')
            df_one = pd.DataFrame(one_data, index=[0])
            df_page = pd.concat([df_page, df_one])
            self.spider_num += 1

        save_2_excel(df_page, fr'{self.folder}\ComparetheMarket_reviews.xlsx')

In [18]:
# Run the crawler and start the data collection
if __name__ == '__main__':
    spider = DzSpider()
    spider.run_task()

Page 1/4819, Total #1
{'User': 'Martyn Schofield', 'Title': 'Quick & easy transaction with a good…', 'Rating': None, 'Review': 'Quick & easy transaction with a good price compared to my previous insurer.', 'Date': '2025-08-31'}

Page 1/4819, Total #2
{'User': 'Swifty', 'Title': 'Makes looking for insurance so much…', 'Rating': None, 'Review': 'Makes looking for insurance so much easier ', 'Date': '2025-08-31'}

Page 1/4819, Total #3
{'User': 'Joanne Colligan', 'Title': 'Website was so easy to navigate', 'Rating': None, 'Review': 'Website was so easy to navigate. Completed my holiday insurance in no time ', 'Date': '2025-08-31'}

Page 1/4819, Total #4
{'User': 'Ibrahim', 'Title': 'hello was great experience with you…', 'Rating': None, 'Review': 'hello was great experience with you guys .. thank you for everything ', 'Date': '2025-08-31'}

Page 1/4819, Total #5
{'User': 'Lucy', 'Title': 'So easy to use and find a good deal', 'Rating': None, 'Review': 'So easy to use and find a good deal'

KeyboardInterrupt: 