## 1.  Add your url and run the code to add it to the config.json for it to search.

In [9]:
url = "https://www.carousell.sg/search/apple%20pencil%20gen%201?addRecent=false&canChangeKeyword=false&includeSuggestions=false&sc=0a0208301a0408bbe17222160a126170706c652070656e63696c2067656e203178012a140a0b636f6c6c656374696f6e7312030a013078013204080078013a02180942060801100118004a08200128014001480150005a020801&searchId=ClqI8f&searchType=all&tab=marketplace"

import urllib.parse
import json

def parse_carousell_url(url):
    parsed_url = urllib.parse.urlparse(url)
    query_params = urllib.parse.parse_qs(parsed_url.query)
    
    # Extract category from path
    path_parts = parsed_url.path.strip('/').split('/')
    
    if path_parts[0] == 'categories':
        category = path_parts[1] if len(path_parts) > 1 else None
        search_query = query_params.get('search', [None])[0]
    else:
        category = None
        search_query = path_parts[1] if len(path_parts) > 1 else None
    
    sort_by = query_params.get('sort_by', ['3'])[0]  # Default to 3 if not present
    
    # Extract price range
    price_start = query_params.get('price_start', [None])[0]
    price_end = query_params.get('price_end', [None])[0]

    tab = query_params.get('tab', [None])[0]
   
    return {
        "category": category,
        "query": search_query,
        "sort_by": int(sort_by),
        "price_start": int(price_start) if price_start else None,
        "price_end": int(price_end) if price_end else None,
        "tab": tab
    }

def add_search_item_from_url(config_file, url):
    search_item = parse_carousell_url(url)
    
    with open(config_file, 'r+') as f:
        config = json.load(f)
        if 'SEARCH_ITEMS' not in config:
            config['SEARCH_ITEMS'] = []
        config['SEARCH_ITEMS'].append(search_item)
        f.seek(0)
        json.dump(config, f, indent=2)
        f.truncate()

    print(f"Added new search item: {search_item}")

# Example usage
add_search_item_from_url('config.json', url)

Added new search item: {'category': None, 'query': 'apple%20pencil%20gen%201', 'sort_by': 3, 'price_start': None, 'price_end': None, 'tab': 'marketplace'}


In [1]:
import time
import json
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.chrome.service import Service
from webdriver_manager.chrome import ChromeDriverManager
from selenium.webdriver.chrome.options import Options
from selenium.common.exceptions import TimeoutException, NoSuchElementException, WebDriverException
import telebot
import openpyxl
from openpyxl import Workbook, load_workbook
import os
import traceback
import urllib.parse
import time
import signal
import random
import string

# Global flag to control the main loop
running = True

def signal_handler(signum, frame):
    global running
    print("\nReceived signal to stop. Finishing current iteration and then stopping...")
    running = False

# Register the signal handler
signal.signal(signal.SIGINT, signal_handler)

step_counter = 0

def log(message):
    global step_counter
    step_counter += 1
    print(f"[Step {step_counter}: {message}")

# Load configuration
def load_config():
    log("Loading configuration...")
    try:
        with open('config.json', 'r') as f:
            config = json.load(f)
            log("Configuration loaded successfully.")
            return config
    except FileNotFoundError:
        log("Configuration file not found. Please create a config.json file.")
        sys.exit(1)
    except json.JSONDecodeError:
        log("Error decoding JSON from config file. Please check the format.")
        sys.exit(1)

config = load_config()
MAX_LISTINGS = config.get('MAX_LISTINGS_TO_SCRAPE', 48)  # Default to 48 if not specified

# Initialize the Telegram bot
log("Initializing Telegram bot...")
try:
    bot = telebot.TeleBot(config['TELEGRAM_BOT_TOKEN'])
    log("Telegram bot initialized successfully.")
except Exception as e:
    log(f"Error initializing Telegram bot: {str(e)}")
    bot = None

def send_telegram_message(message):
    if bot:
        try:
            log(f"Sending Telegram message: {message}")
            bot.send_message(config['TELEGRAM_CHAT_ID'], message)
            log("Telegram message sent successfully.")
        except Exception as e:
            log(f"Error sending Telegram message: {str(e)}")
    else:
        log("Telegram bot not initialized. Message not sent.")

def load_existing_ids(excel_path):
    log(f"Loading existing IDs from Excel file: {excel_path}")
    try:
        if os.path.exists(excel_path):
            log("Excel file found. Loading existing IDs...")
            workbook = load_workbook(excel_path)
            sheet = workbook.active
            existing_ids = {sheet.cell(row=row, column=1).value for row in range(2, sheet.max_row + 1)}
            log(f"Loaded {len(existing_ids)} existing IDs.")
        else:
            log("Excel file not found. Creating a new workbook...")
            workbook = Workbook()
            sheet = workbook.active
            sheet.append(["Listing ID", "Href", "Seller", "Time Posted", "Product Details", "Price", "Condition", "Image URL"])
            existing_ids = set()
            workbook.save(excel_path)
            log("New Excel file created and saved.")
        return existing_ids, workbook, sheet
    except Exception as e:
        log(f"Error handling Excel file: {str(e)}")
        return set(), Workbook(), Workbook().active

def save_to_excel(data, excel_path, sheet):
    try:
        log(f"Saving {len(data)} new listings to Excel...")
        for listing in data:
            sheet.append(listing)
        sheet.parent.save(excel_path)
        log("Excel file updated and saved successfully.")
    except PermissionError:
        log(f"Error: Permission denied when trying to save to {excel_path}")
        log("Please ensure the file is not open in another program and that you have write permissions.")
        alternative_path = os.path.join(os.path.expanduser("~"), "Desktop", "carousell_listings.xlsx")
        log(f"Attempting to save to alternative location: {alternative_path}")
        try:
            sheet.parent.save(alternative_path)
            log(f"Successfully saved to alternative location: {alternative_path}")
        except Exception as e:
            log(f"Error saving to alternative location: {str(e)}")
    except Exception as e:
        log(f"Error saving to Excel: {str(e)}")
        log(f"Current working directory: {os.getcwd()}")
        log(f"Full path of excel file: {os.path.abspath(excel_path)}")
        log(f"File exists: {os.path.exists(excel_path)}")
        if os.path.exists(excel_path):
            log(f"File is writable: {os.access(excel_path, os.W_OK)}")
            log(f"Directory is writable: {os.access(os.path.dirname(excel_path), os.W_OK)}")

def build_url(search_item):
    base_url = config['BASE_URL']
    
    params = {
        "search": search_item['query'],
        "price_start": search_item['price_start'],
        "price_end": search_item['price_end'],
        "sort_by": search_item['sort_by'],
    }
    
    if 'tab' in search_item and search_item['tab'] != 'all':
        params['tab'] = search_item['tab']
    
    category = search_item['category']
    url = f"{base_url}categories/{category}/"
    
    # Encode the query parameters
    encoded_params = urllib.parse.urlencode(params, quote_via=urllib.parse.quote)
    
    url += f"?{encoded_params}"
    
    return url

def find_element_with_fallback(driver, selectors):
    for selector in selectors:
        try:
            element = driver.find_element(*selector)
            if element:
                return element
        except NoSuchElementException:
            continue
    return None

def extract_text_content(element):
    return element.text.strip() if element else 'Not found'

def find_element_by_text(driver, text):
    try:
        return driver.find_element(By.XPATH, f"//p[contains(text(), '{text}')]")
    except NoSuchElementException:
        return None

def analyze_listing_card(card):
    title_selectors = [
        (By.XPATH, ".//p[contains(@class, 'D_lQ')]"),
        (By.XPATH, ".//p[not(@data-testid) and not(contains(@class, 'D_mc')) and not(contains(@class, 'D_pc'))]")
    ]
    price_selectors = [
        (By.XPATH, ".//p[contains(@class, 'D_mc')]"),
        (By.XPATH, ".//p[contains(text(), 'S$')]")
    ]
    time_selectors = [
        (By.XPATH, ".//p[contains(@class, 'D_pc')]"),
        (By.XPATH, ".//p[contains(text(), 'ago')]")
    ]

    title = find_element_with_fallback(card, title_selectors)
    price = find_element_with_fallback(card, price_selectors)
    seller_name = card.find_element(By.XPATH, ".//p[@data-testid='listing-card-text-seller-name']")
    time = find_element_with_fallback(card, time_selectors)

    condition_types = ['Brand new', 'Like new', 'Lightly used', 'Well used', 'Heavily used']
    condition = next((find_element_by_text(card, type) for type in condition_types if find_element_by_text(card, type)), None)

    listing_id = card.get_attribute('data-testid').replace('listing-card-', '')
    image = card.find_element(By.XPATH, ".//img[contains(@class, 'D_SC')]")

    return {
        'id': listing_id,
        'title': extract_text_content(title),
        'price': extract_text_content(price),
        'seller_name': extract_text_content(seller_name),
        'time': extract_text_content(time),
        'condition': extract_text_content(condition),
        'image_url': image.get_attribute('src') if image else 'Not found',
        'href': f"https://www.carousell.sg/p/{listing_id}"
    }

def check_carousell_listings():
    log("Starting to check Carousell listings...")
    chrome_options = Options()
    chrome_options.add_argument("--no-sandbox")
    chrome_options.add_argument("--disable-dev-shm-usage")

    try:
        log("Initializing Chrome driver...")
        driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=chrome_options)
        
        excel_path = "carousell_listings.xlsx"
        log(f"Loading existing IDs from Excel: {excel_path}")
        existing_ids, workbook, sheet = load_existing_ids(excel_path)
        new_listings = []

        for search_item in config['SEARCH_ITEMS']:
            url = build_url(search_item)
            log(f"Navigating to URL: {url}")
            driver.get(url)

            log(f"Waiting for page to load (timeout: 45 seconds)...")
            
            try:
                WebDriverWait(driver, 45).until(
                    EC.presence_of_element_located((By.TAG_NAME, "body"))
                )
                log("Initial page content loaded.")

                WebDriverWait(driver, 45).until(
                    EC.presence_of_element_located((By.XPATH, "//div[contains(@class, 'browse-listings')]"))
                )
                log("Listing cards container found.")

            except TimeoutException:
                log("Timeout occurred while waiting for the page to load.")
                log("Current page source:")
                log(driver.page_source)
                continue

            listing_cards = driver.find_elements(By.XPATH, "//div[starts-with(@data-testid, 'listing-card-')]")
            log(f"Found {len(listing_cards)} listing cards.")

            if not listing_cards:
                log("No listing cards found. Possible page structure change.")
                log("Current page source:")
                log(driver.page_source)
                continue

            for index, card in enumerate(listing_cards[:MAX_LISTINGS]):
                log(f"Processing listing {index + 1}...")
                try:
                    listing_data = analyze_listing_card(card)
                    log(f"Extracted details - Title: {listing_data['title']}, Price: {listing_data['price']}, Condition: {listing_data['condition']}")

                    if listing_data['id'] in existing_ids:
                        log("Listing already exists in database. Skipping.")
                        continue

                    price_value = float(listing_data['price'].replace("S$", "").replace(",", ""))

                    if (search_item['query'].lower() in listing_data['title'].lower() and 
                        search_item['price_start'] <= price_value <= search_item['price_end']):
                        log("Listing matches criteria. Adding to new listings.")
                        new_listings.append([
                            listing_data['id'], listing_data['href'], listing_data['seller_name'],
                            listing_data['time'], listing_data['title'], listing_data['price'],
                            listing_data['condition'], listing_data['image_url']
                        ])
                        message = f"New suitable listing found!\nTitle: {listing_data['title']}\nPrice: {listing_data['price']}\nCondition: {listing_data['condition']}\nSeller: {listing_data['seller_name']}\nPosted: {listing_data['time']}\nLink: {listing_data['href']}"
                        send_telegram_message(message)
                    else:
                        log("Listing does not match criteria. Skipping.")

                except Exception as e:
                    log(f"Error processing listing: {str(e)}")
                    log(f"Listing HTML: {card.get_attribute('outerHTML')}")

        if new_listings:
            log(f"Saving {len(new_listings)} new listings to Excel...")
            save_to_excel(new_listings, excel_path, sheet)
        else:
            log("No new listings found.")

    except Exception as e:
        log(f"Unexpected error: {str(e)}")
        log(traceback.format_exc())
        return False
    finally:
        driver.quit()
        log("Chrome driver closed.")
    return True

def main():
    global running
    log("Starting main loop. Press Ctrl+C to stop safely.")
    while running:
        try:
            log("Checking Carousell listings...")
            success = check_carousell_listings()
            if success:
                log("Successfully checked listings.")
            else:
                log("Failed to check listings.")
            
            if running:
                log("Waiting for 1 hour before next check...")
                for _ in range(3600):  # 3600 seconds = 1 hour
                    if not running:
                        break
                    time.sleep(1)
        except Exception as e:
            log(f"Unexpected error in main loop: {str(e)}")
            log("Continuing to next iteration...")

    log("Script stopped gracefully.")

if __name__ == "__main__":
    try:
        main()
    except Exception as e:
        log(f"Critical error: {str(e)}")
    finally:
        log("Script execution completed.")

[Step 1: Loading configuration...
[Step 2: Configuration loaded successfully.
[Step 3: Initializing Telegram bot...
[Step 4: Telegram bot initialized successfully.
[Step 5: Starting main loop. Press Ctrl+C to stop safely.
[Step 6: Checking Carousell listings (Attempt 1)...
[Step 7: Starting to check Carousell listings...
[Step 8: Initializing Chrome driver...
[Step 9: Loading existing IDs from Excel: carousell_listings.xlsx
[Step 10: Loading existing IDs from Excel file: carousell_listings.xlsx
[Step 11: Excel file found. Loading existing IDs...
[Step 12: Loaded 181 existing IDs.
[Step 13: Navigating to URL: https://www.carousell.sg/categories/5704/?search=apple%20pencil%20gen%201&price_start=30&price_end=145&sort_by=3&tab=None
[Step 14: Waiting for page to load (timeout: 45 seconds)...
[Step 15: Initial page content loaded.
[Step 16: Listing cards container found.
[Step 17: Found 47 listing cards.
[Step 18: Processing listing 1...
[Step 19: Extracted details - Title: Apple Pencil Gen 

In [17]:
import json
import urllib.parse

def load_config(config_file='config.json'):
    with open(config_file, 'r') as f:
        return json.load(f)

def build_url(search_item, base_url):
    params = {
        "search": search_item['query'],
        "price_start": search_item['price_start'],
        "price_end": search_item['price_end'],
        "sort_by": search_item['sort_by'],
    }
    
    if 'tab' in search_item and search_item['tab'] != 'all':
        params['tab'] = search_item['tab']
    
    category = search_item['category']
    url = f"{base_url}categories/{category}/"
    
    encoded_params = urllib.parse.urlencode(params, quote_via=urllib.parse.quote)
    
    url += f"?{encoded_params}"
    
    return url

def test_url_building():
    config = load_config()
    base_url = config['BASE_URL']
    
    for index, search_item in enumerate(config['SEARCH_ITEMS'], 1):
        url = build_url(search_item, base_url)
        print(f"URL {index}: {url}")
        print(f"Search Item {index}:", json.dumps(search_item, indent=2))
        print()

if __name__ == "__main__":
    test_url_building()

URL 1: https://www.carousell.sg/categories/5704/?search=apple%20pencil%20gen%201&price_start=30&price_end=145&sort_by=3&tab=None
Search Item 1: {
  "category": "5704",
  "query": "apple pencil gen 1",
  "sort_by": 3,
  "price_start": 30,
  "price_end": 145,
  "tab": null
}

