
# Import and install required libraries

In [1]:
# Import the required libraries
import sys
import requests
import sys
import os
import pandas as pd
import numpy as np
import time
import logging
import glob
import csv

# Set up the environment by executing the setup script
# Note: This script installs necessary dependencies for Chrome WebDriver
!sudo apt -y update
!sudo apt install -y wget curl unzip
!wget http://archive.ubuntu.com/ubuntu/pool/main/libu/libu2f-host/libu2f-udev_1.1.4-1_all.deb
!dpkg -i libu2f-udev_1.1.4-1_all.deb
!wget https://dl.google.com/linux/direct/google-chrome-stable_current_amd64.deb
!dpkg -i google-chrome-stable_current_amd64.deb
CHROME_DRIVER_VERSION = !curl -sS chromedriver.storage.googleapis.com/LATEST_RELEASE
!wget -N https://chromedriver.storage.googleapis.com/$CHROME_DRIVER_VERSION/chromedriver_linux64.zip -P /tmp/
!unzip -o /tmp/chromedriver_linux64.zip -d /tmp/
!chmod +x /tmp/chromedriver
!mv /tmp/chromedriver /usr/local/bin/chromedriver
!pip install selenium
!pip install python-dotenv

# Additional libraries for web scraping setup
from bs4 import BeautifulSoup
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.common.keys import Keys
from selenium.common.exceptions import StaleElementReferenceException, TimeoutException, ElementNotInteractableException, NoSuchElementException, WebDriverException
from selenium.webdriver import ActionChains
from datetime import datetime
from google.colab import files
from google.colab import drive
from dotenv import load_dotenv

[33m0% [Working][0m            Get:1 http://security.ubuntu.com/ubuntu jammy-security InRelease [129 kB]
[33m0% [Waiting for headers] [1 InRelease 14.2 kB/129 kB 11%] [Connected to cloud.r[0m                                                                               Hit:2 http://archive.ubuntu.com/ubuntu jammy InRelease
[33m0% [1 InRelease 129 kB/129 kB 100%] [Connected to cloud.r-project.org (65.9.86.[0m                                                                               Get:3 http://archive.ubuntu.com/ubuntu jammy-updates InRelease [128 kB]
Hit:4 http://archive.ubuntu.com/ubuntu jammy-backports InRelease
Get:5 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease [3,626 B]
Get:6 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  InRelease [1,581 B]
Hit:7 https://ppa.launchpadcontent.net/c2d4u.team/c2d4u4.0+/ubuntu jammy InRelease
Hit:8 https://ppa.launchpadcontent.net/deadsnakes/ppa/ubuntu jammy InRelease
Hit:9 https

# Set up Chrome Options

---



In [2]:
# Set up Chrome options shared by every WebDriver instance created in this notebook
chrome_options = webdriver.ChromeOptions()
chrome_options.add_argument('--headless')  # Run without a visible browser window
chrome_options.add_argument('--disable-dev-shm-usage')
chrome_options.add_argument('--no-sandbox')
chrome_options.add_argument('--disable-blink-features=AutomationControlled')
chrome_options.add_argument('--disable-gpu')
chrome_options.add_argument("user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/90.0.4430.93 Safari/537.36")
# NOTE(review): the next three arguments look like HTTP request headers rather than
# Chrome command-line switches - confirm whether Chrome actually honours them.
chrome_options.add_argument("accept='text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8'")
chrome_options.add_argument("accept-encoding='gzip, deflate, br'")
chrome_options.add_argument("accept-language='en-US,en;q=0.5'")

# Initialize WebDriver instance
# NOTE(review): scrape_all_categories, retry_missing_ingredients and scrape_ingredients_data
# each create (and quit) their own webdriver.Chrome instance, so this module-level driver
# appears to be unused and is never quit in the visible cells - confirm before removing.
driver = webdriver.Chrome(options=chrome_options)

# Define utility functions
1.   *take_screenshot*: Takes a screenshot of the web page in case of any errors
2.   *handle_cookie_consent*: Handles cookie consent banners when opening up a product page
3.   *detect_delimiter*: Used to identify the correct delimiter for CSV files
4.   *setup_google_drive*: Creates a save path for file storage in Google Drive



In [5]:
"""
Define take_screenshot function.

Main Functionality: Takes a screenshot of the current browser window.
"""
def take_screenshot(driver, product_link, debug=False):
    """
    Save a screenshot of the current browser window in the working directory.

    The file is named ``screenshot_<product_link>_<timestamp>.png``. Product links are
    URLs, so every run of characters other than letters, digits, ".", "_" and "-" is
    replaced with a single "_" first. Without this the "/" and ":" characters of the URL
    would turn the file name into a path through directories that do not exist, and the
    screenshot would not be written.

    Args:
        driver: Object exposing ``save_screenshot(path)`` (a Selenium WebDriver).
        product_link: Link of the page being captured; only used to build the file name.
        debug: When True, print where the screenshot was saved (or that saving failed).

    Returns:
        None.
    """
    import re  # local import: only needed here to build a file-system-safe name

    timestamp = time.strftime("%Y%m%d-%H%M%S")

    # Keep the name readable but safe, and cap its length so very long URLs
    # cannot exceed common file-name limits.
    safe_link = re.sub(r"[^A-Za-z0-9._-]+", "_", str(product_link)).strip("_")[:150]

    screenshot_name = f"screenshot_{safe_link}_{timestamp}.png"
    screenshot_path = os.path.join(os.getcwd(), screenshot_name)
    saved = driver.save_screenshot(screenshot_path)

    # Debug print statement to confirm where the screenshot is saved
    if debug:
        if saved is False:
            print(f"Screenshot could not be saved: {screenshot_path}")
        else:
            print(f"Screenshot saved: {screenshot_path}")

"""
Define handle_cookie_consent function.

Main Functionality: Handles cookie consent banners that may appear on web pages.
"""
def handle_cookie_consent(driver, debug=False):
    """
    Dismiss the cookie consent banner by clicking its "reject all" button.

    Waits up to 10 seconds for the element with id ``onetrust-reject-all-handler`` to
    become clickable, clicks it through JavaScript, then waits up to 10 seconds for
    that element to become invisible. Timeouts and other WebDriver errors are
    swallowed (and only reported when ``debug`` is True), so the caller can always
    continue.

    Args:
        driver: Selenium WebDriver showing the page.
        debug: When True, print progress and error messages.
    """
    reject_locator = (By.ID, "onetrust-reject-all-handler")

    def log(message):
        # Single place that honours the debug switch
        if debug:
            print(message)

    try:
        button = WebDriverWait(driver, 10).until(
            EC.element_to_be_clickable(reject_locator)
        )
        driver.execute_script("arguments[0].click();", button)
        log("Cookie consent rejected.")

        # Make sure the banner is gone before the caller interacts with the page
        WebDriverWait(driver, 10).until(
            EC.invisibility_of_element_located(reject_locator)
        )
        log("Cookie consent banner is now invisible.")

    except TimeoutException:
        # Must stay ahead of WebDriverException, which is its base class
        log("No cookie consent banner found.")

    except WebDriverException as error:
        log(f"WebDriverException while handling cookie consent: {error}")

"""
Define detect_delimiter function.

Main Functionality: Detects the delimiter of a CSV file by analyzing the beginning of the file.
                    If detection fails, falls back to a list of common delimiters and selects the most frequent one.
"""
def detect_delimiter(file_path, default_delimiters=[',', ';', '\t', '|']):
    """
    Detect the delimiter of a CSV file from its first 2048 characters.

    ``csv.Sniffer`` is restricted to the candidate delimiters. Left unrestricted, it can
    report any character that happens to occur equally often on every line (for example
    a space inside product names) as the delimiter. If the sniffer cannot decide, the
    candidate that occurs most often in the sample is returned; ties go to the earliest
    candidate in the list.

    Args:
        file_path: Path of a UTF-8 encoded text file.
        default_delimiters: Candidate delimiters, in order of preference for ties.
            The list is only read, never modified.

    Returns:
        The detected delimiter (``','`` when no candidate is given or found).

    Raises:
        OSError: If the file cannot be opened.
        UnicodeDecodeError: If the sample is not valid UTF-8.
    """
    candidates = list(default_delimiters)

    # Only the sample is needed, so the file is closed before any analysis
    with open(file_path, 'r', encoding='utf-8') as file:
        sample = file.read(2048)

    try:
        # An empty candidate list falls back to unrestricted sniffing (delimiters=None)
        dialect = csv.Sniffer().sniff(sample, delimiters=''.join(candidates) or None)
        return dialect.delimiter
    except csv.Error:
        # Sniffer could not decide: pick the most frequent candidate in the sample
        return max(candidates, key=sample.count, default=',')

"""
Define setup_google_drive function.

Main Functionality: Mounts Google Drive and ensures the specified save path exists for file storage and retrieval.
"""
# Function to mount Google Drive and ensure save path exists
def setup_google_drive(save_path='/content/drive/My Drive/scraped_files/', debug=False):
    """
    Mount Google Drive at ``/content/drive`` and make sure ``save_path`` exists.

    Args:
        save_path: Directory used for storing scraped files; created if missing.
        debug: When True, print the mount error (if any) and a confirmation message.

    Raises:
        Exception: Whatever ``drive.mount`` raised is re-raised unchanged.
    """
    report = print if debug else (lambda *_: None)

    # Mounting is forced so a stale mount from an earlier session is replaced
    try:
        drive.mount('/content/drive', force_remount=True)
    except Exception as mount_error:
        report(f"Error mounting Google Drive: {mount_error}")
        raise

    # Create the target directory (and parents) if it is not there yet
    os.makedirs(save_path, exist_ok=True)
    report(f"Save path {save_path} is ready.")

# Call the function to setup Google Drive
setup_google_drive()

Mounted at /content/drive


# Define main functions for scraping product data
Functions responsible for the main scraping process of product data such as product name, price, manufacturer etc.:
1.   *get_category_links*: Retrieves main category links from the specified page.
2.   *scrape_visible_products*: Retrieves currently visible product containers on the page.
3.   *scrape_product_data_from_containers*: Extracts product details from product containers.
4.   *scrape_all_categories*: Main function that orchestrates scraping of product data from multiple categories.
5.   *scrape_category_products*: Opens a category page, retrieves total number of products, and scrapes product data until all products are retrieved.








In [9]:
"""
Define get_category_links function.

Main Functionality: Retrieves main category links from a given main page URL.
"""
def get_category_links(driver, main_page_url, max_retries=3, debug=False):
    """
    Collect the main category links from the assortment menu of the main page.

    The page is loaded and the ``menu[data-testid="assortment-links"]`` element is
    awaited for up to 15 seconds. Every ``<a>`` inside it contributes its ``href``,
    except links without an ``href`` and links whose ``href`` ends with "-".

    Any WebDriver error (timeout, missing element, other) counts as a failed attempt.
    Between attempts the function sleeps 5 seconds; no sleep follows the final attempt.

    Args:
        driver: Selenium WebDriver used to load the page.
        main_page_url: URL of the page that contains the assortment menu.
        max_retries: Maximum number of attempts.
        debug: When True, print a message for every failed attempt.

    Returns:
        List of category URLs, or an empty list when every attempt failed.
    """
    menu_selector = 'menu[data-testid="assortment-links"]'

    for attempt in range(1, max_retries + 1):
        try:
            # Navigate to the main page and wait for the assortment menu
            driver.get(main_page_url)
            WebDriverWait(driver, 15).until(
                EC.presence_of_element_located((By.CSS_SELECTOR, menu_selector))
            )
            assortment_menu = driver.find_element(By.CSS_SELECTOR, menu_selector)
            anchors = assortment_menu.find_elements(By.TAG_NAME, 'a')

            # get_attribute returns None for anchors without an href; skip those
            # instead of failing on None.endswith
            hrefs = (anchor.get_attribute('href') for anchor in anchors)
            return [href for href in hrefs if href and not href.endswith('-')]

        # TimeoutException and NoSuchElementException derive from WebDriverException,
        # so the specific handlers must come first to keep their own messages.
        except TimeoutException:
            failure = "Timeout: Failed to retrieve category links."
        except NoSuchElementException:
            failure = "Element not found: Assortment links menu."
        except WebDriverException as e:
            failure = f"WebDriverException during category link retrieval: {e}."

        if debug:
            print(f"{failure} Retrying {attempt}/{max_retries}...")

        # Pause before the next attempt; sleeping after the last one would only waste time
        if attempt < max_retries:
            time.sleep(5)

    # Debug print statement if code failed to retrieve category links after retries
    if debug:
        print("Failed to retrieve category links after multiple attempts.")
    return []

"""
Define scrape_visible_products function.

Main Functionality: Retrieves currently visible product containers on a page.
"""
def scrape_visible_products(driver, debug=False):
    """
    Return the product container elements currently present on the page.

    Args:
        driver: Selenium WebDriver showing a category page.
        debug: When True, print the error if the lookup fails.

    Returns:
        The elements matching ``div[data-testid="product-container"]``, or an empty
        list when the lookup raised any exception.
    """
    container_selector = 'div[data-testid="product-container"]'

    try:
        return driver.find_elements(By.CSS_SELECTOR, container_selector)
    except Exception as error:
        # Best effort: the caller treats "no containers" and "lookup failed" alike
        if debug:
            print(f"Error while scraping visible products: {error}")

    return []

"""
Define scrape_product_data_from_containers function.

Main Functionality: Extracts product details from containers (e.g., product name, manufacturer, volume, prices).
"""
def _first_matching_text(container, selectors, remove=""):
    """Return the text of the first selector found in container (with `remove` stripped out), or NaN."""
    for selector in selectors:
        try:
            text = container.find_element(By.CSS_SELECTOR, selector).text
        except NoSuchElementException:
            continue
        return text.replace(remove, "") if remove else text
    return np.nan


def scrape_product_data_from_containers(product_containers, category_link, viewed_product_links, debug=False):
    """
    Extract product details from product container elements.

    For every container whose product link is not yet in ``viewed_product_links`` a
    record is built with the keys CategoryLink, ProductLink, ProductName,
    ManufacturerName, Volume, ComparePrice, Price and Ingredients (empty string,
    filled in by a later step). Optional fields that are missing become NaN.

    A container is skipped, without aborting the remaining containers, when its
    title link is missing or when it turns stale while being read. A link is added to
    ``viewed_product_links`` only after its record was built successfully, so a
    skipped product can still be picked up by a later call.

    Args:
        product_containers: Iterable of Selenium elements, one per product.
        category_link: URL of the category the containers belong to.
        viewed_product_links: Set of product links already recorded; updated in place.
        debug: When True, print the processed links and skipped containers.

    Returns:
        List of dicts, one per newly recorded product.
    """
    product_data = []

    for container in product_containers:
        try:
            # The title anchor carries both the product link and the product name
            try:
                title_link = container.find_element(By.CSS_SELECTOR, 'p[data-testid="product-title"] a')
            except NoSuchElementException:
                if debug:
                    print("Product title link not found. Skipping this container.")
                continue

            product_link = title_link.get_attribute("href")

            if debug:
                # Print the current product link being processed
                print("product_link:", product_link)

            # Skip this container if the link has been recorded before
            if product_link in viewed_product_links:
                continue

            product_name = title_link.get_attribute("title")

            # Manufacturer text has its commas removed; NaN if not found
            manufacturer_name = _first_matching_text(
                container, ['span[data-testid="display-manufacturer"]'], remove=","
            )

            # Volume; NaN if not found
            volume = _first_matching_text(container, ['span[data-testid="display-volume"]'])

            # Promotional compare price first, then the regular one; NaN if neither exists
            compare_price = _first_matching_text(
                container,
                ['p[data-testid="promotion-compare-price"]', 'p[data-testid="compare-price"]'],
                remove="Jmf pris ",
            )

            # Price; NaN if not found
            price = _first_matching_text(
                container, ["p[data-testid='price-text'] span[data-testid='price-container']"]
            )

            record = {
                "CategoryLink": category_link,
                "ProductLink": product_link,
                "ProductName": product_name,
                "ManufacturerName": manufacturer_name,
                "Volume": volume,
                "ComparePrice": compare_price,
                "Price": price,
                "Ingredients": ""  # Empty for now, will fill later
            }

        except StaleElementReferenceException:
            if debug:
                # Debug print statement to log stale element exception when processing a product container
                print("Stale element reference exception caught while extracting product data. Skipping this container.")
            continue

        # Only now is the product known to be fully extracted
        viewed_product_links.add(product_link)
        product_data.append(record)

    return product_data

"""
Define scrape_all_categories function.

Main Functionality: Main function that orchestrates scraping of product data (name, manufacturer, volume, prices, etc.) from multiple categories.
"""
def scrape_all_categories(save_path='/content/drive/My Drive/scraped_files/', debug=False):
    """
    Scrape product data for every non-excluded main category and save one CSV per category.

    A dedicated Chrome WebDriver is started for the run and always quit at the end.
    Category links are read from https://www.hemkop.se/handla. Each category that
    yields data is written twice, to ``/content/`` and to ``save_path``, as
    ``<category>_<YYYY-MM-DD_HHMM>.csv``. Any "/" inside the category part of the URL
    is replaced with "_" so the file name can never point into a sub-directory.

    Args:
        save_path: Directory (on Google Drive) for the per-category CSV files.
        debug: When True, print progress messages; the flag is forwarded to the
            helper functions so their messages are shown too.

    Returns:
        One DataFrame with the products of all scraped categories, or an empty
        DataFrame when nothing was scraped.
    """
    # Categories that are skipped during scraping
    excluded_categories = {
        "https://www.hemkop.se/sortiment/hem-och-hushall",
        "https://www.hemkop.se/sortiment/blommor-och-tillbehor",
        "https://www.hemkop.se/sortiment/halsa-och-skonhet",
        "https://www.hemkop.se/sortiment/apotek-och-lakemedel",
        "https://www.hemkop.se/sortiment/djur",
        "https://www.hemkop.se/sortiment/kiosk"
    }

    # Initialize a WebDriver instance with Chrome options
    driver = webdriver.Chrome(options=chrome_options)

    # One DataFrame per successfully scraped category
    all_product_data = []

    try:
        # Retrieve main category links from the main page
        main_category_links = get_category_links(driver, 'https://www.hemkop.se/handla', debug=debug)

        for category_link in main_category_links:
            if category_link in excluded_categories:
                continue

            # Debug print statement to log the category being processed
            if debug:
                print("Processing category:", category_link)

            # Retrieve product data for the current category
            product_df = scrape_category_products(driver, category_link, debug=debug)
            if product_df.empty:
                continue

            # Category name from the URL, made safe for use in a file name
            category_name = category_link.split("/sortiment/")[-1].strip("/").replace("/", "_")

            # Timestamp shared by both copies of the file
            today_date = datetime.now().strftime("%Y-%m-%d_%H%M")
            csv_file_name = f"{category_name}_{today_date}.csv"

            # Save locally in Colab
            local_save_path = os.path.join('/content/', csv_file_name)
            product_df.to_csv(local_save_path, index=False, encoding='utf-8')
            if debug:
                print(f"Saved {category_name} data locally to {local_save_path}")

            # Save in Google Drive
            drive_save_path = os.path.join(save_path, csv_file_name)
            product_df.to_csv(drive_save_path, index=False, encoding='utf-8')
            if debug:
                print(f"Saved {category_name} data to Google Drive: {drive_save_path}")

            all_product_data.append(product_df)

        # Concatenate all per-category DataFrames into one
        if all_product_data:
            return pd.concat(all_product_data, ignore_index=True)

        # Debug print statement if no data were scraped
        if debug:
            print("No product data scraped.")
        return pd.DataFrame([])

    finally:
        # Ensure WebDriver instance is properly closed after scraping
        driver.quit()

"""
Define scrape_category_products function.

Main Functionality: Opens a category page, retrieves total number of products, including scrolling to load more products,
                    and scrapes product data until all products are retrieved.
"""
def scrape_category_products(driver, category_link, debug=False, load_delay=30, wait_timeout=25):
    """
    Scrape all products of one category page, scrolling until everything is loaded.

    The page is opened, the advertised total number of products is read, and the
    visible product containers are scraped. The page is then scrolled to the bottom
    repeatedly; after each scroll the containers are scraped again. The loop ends when
    the number of distinct product links reaches the advertised total, or when a
    scroll yields no new product.

    Args:
        driver: Selenium WebDriver used for navigation.
        category_link: URL of the category page.
        debug: When True, print progress messages; forwarded to the helper functions.
        load_delay: Seconds to sleep after opening the page and after every scroll.
        wait_timeout: Seconds to wait for product containers to be present.

    Returns:
        DataFrame with one row per product. An empty DataFrame is returned when a
        timeout or any other exception occurs; products collected up to that point
        are discarded.
    """
    try:
        container_locator = (By.CSS_SELECTOR, 'div[data-testid="product-container"]')

        # Product links already recorded for this category
        viewed_product_links = set()

        # Records collected so far
        product_data = []

        # Navigate to the category page
        driver.get(category_link)

        # Debug print statement to log which category that was opened
        if debug:
            print(f"Category opened: {category_link}")

        # Give the page time to render before looking for products
        time.sleep(load_delay)

        # Wait for the product containers to be loaded
        WebDriverWait(driver, wait_timeout).until(EC.presence_of_all_elements_located(container_locator))

        # Get the total number of products from the first whitespace-separated token.
        # NOTE(review): the class-based selector looks build-generated and may change, and a
        # count rendered with a space as thousands separator would be truncated - confirm.
        total_products_element = driver.find_element(By.CSS_SELECTOR, 'p.sc-85dd906a-0.jBHeLY')
        total_products = int(total_products_element.text.split()[0])
        if debug:
            print(f"Total number of products found: {total_products}")

        # Retrieve initial product containers
        product_containers = scrape_visible_products(driver, debug=debug)
        product_data.extend(
            scrape_product_data_from_containers(product_containers, category_link, viewed_product_links, debug=debug)
        )

        # Debug print statement to log how many initial products that were scraped
        if debug:
            print(f"Scraped {len(viewed_product_links)} out of {total_products} products initially.")

        # Continue scrolling and scraping until all products are retrieved from the category
        while len(viewed_product_links) < total_products:
            previous_seen_links_count = len(viewed_product_links)

            # Scroll down to load more products
            driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")

            # Debug print statement to log that more products are being loaded
            if debug:
                print("Scrolling to load more products...")

            # Allow new products to load
            time.sleep(load_delay)
            WebDriverWait(driver, wait_timeout).until(EC.presence_of_all_elements_located(container_locator))

            # Scrape again; already recorded links are skipped by the helper
            new_product_containers = scrape_visible_products(driver, debug=debug)
            product_data.extend(
                scrape_product_data_from_containers(new_product_containers, category_link, viewed_product_links, debug=debug)
            )

            # Debug print statement to log how many products that have been scraped
            if debug:
                print(f"Scraped {len(viewed_product_links)} out of {total_products} products.")

            # Stop when a scroll brought nothing new, otherwise the loop would never end
            if len(viewed_product_links) == previous_seen_links_count:
                if debug:
                    print("No new products loaded. Breaking the loop.")
                break

        # Convert product_data to DataFrame
        return pd.DataFrame(product_data)

    except TimeoutException as e:

        # Debug print statement to log timeout error when loading products
        if debug:
            print(f"Timeout waiting for products: {e}")

        return pd.DataFrame([])  # Return an empty DataFrame on timeout

    except Exception as e:

        # Debug print statement to log error when loading products
        if debug:
            print(f"An unexpected error occurred: {e}")

        return pd.DataFrame([])  # Return an empty DataFrame on any other exception



# Define main functions for scraping ingredients data
Functions responsible for the main scraping process of ingredients data:
1.   *navigate_and_attempt(driver, product_link)*: Navigates to a product page and attempts to click the "Produktfakta" button.
2.   *get_ingredient_info(driver, product_link)*: Retrieves ingredients information from a product page.
3.   *retry_missing_ingredients(product_df, threshold)*: Checks for products without ingredients and retries to fetch missing information.
4.   *scrape_ingredients_data(product_df)*: Main function that orchestrates scraping of ingredients data for products.

In [7]:
"""
Define navigate_and_attempt function.

Main Functionality: Navigates to a product page and attempts to find and click the "Produktfakta" button
                    to later on retrieve the ingredients data from the product page.
"""
def navigate_and_attempt(driver, product_link, debug=False):
    """
    Open a product page and try to click its "Produktfakta" tab.

    After loading the page the function sleeps 10 seconds, dismisses a possible
    cookie banner, scrolls one third down the page and waits up to 30 seconds for the
    tab button to become clickable. On timeout a screenshot is taken.

    Args:
        driver: Selenium WebDriver used for navigation.
        product_link: URL of the product page.
        debug: When True, print progress messages; forwarded to the helper functions.

    Returns:
        True  - the button was clicked.
        False - the wait timed out and the page source contains the text "403".
        None  - the wait timed out for any other reason.

    Raises:
        WebDriverException: Errors other than TimeoutException are not handled here.
    """
    # Debug print statement to log which product that is being processed
    if debug:
        print("Navigating to product page:", product_link)

    # Navigate to the product page
    driver.get(product_link)

    # Give the page time to render
    time.sleep(10)

    # Handle potential cookie consent banner
    handle_cookie_consent(driver, debug=debug)

    try:
        # Debug print statement to log that the produktfakta button is being located
        if debug:
            print("Attempt to locate 'Produktfakta' button.")

        # Ensure the page body is present
        WebDriverWait(driver, 30).until(EC.presence_of_element_located((By.TAG_NAME, "body")))

        # Scroll down to bring the tab area towards the viewport
        driver.execute_script("window.scrollTo(0, document.body.scrollHeight / 3);")

        # Attempt to locate the "Produktfakta" button
        produktfakta_button = WebDriverWait(driver, 30).until(
            EC.element_to_be_clickable((By.XPATH, "//button[@data-testid='tab' and .//div[contains(text(), 'Produktfakta')]]"))
        )

        # Scroll to ensure the button is in the viewport, then click it
        driver.execute_script("arguments[0].scrollIntoView();", produktfakta_button)
        produktfakta_button.click()

        # Debug print statement to log that the produktfakta button has been clicked on
        if debug:
            print("'Produktfakta' button clicked.")

        return True  # Indicate success

    except TimeoutException:

        # Debug print statement to log timeout error when trying to locate the produktfakta button
        if debug:
            print("Timeout: 'Produktfakta' button not found")

        # Capture what the page looked like when the button could not be found
        take_screenshot(driver, product_link, debug=debug)

        # Heuristic 403 detection: any occurrence of "403" in the page source counts
        if "403" in str(driver.page_source):

            # Debug print statement to log a 403 error being detected
            if debug:
                print("403 error detected")

            return False  # Indicate 403 error

    return None  # Indicate other types of failures

"""
Define get_ingredient_info function.

Main Functionality: Navigates to a product page and retrieves ingredient information.
"""
def get_ingredient_info(driver, product_link, debug=False):
    """
    Return the ingredient declaration text of a product page, or NaN if unavailable.

    The "Produktfakta" tab is opened via ``navigate_and_attempt``. When that call
    reports a 403 (returns False), the page is refreshed, the cookie banner is handled
    again and the navigation is attempted once more. If the tab could be opened, the
    function waits up to 20 seconds for the paragraph containing
    "Innehållsdeklaration" and returns the text of its following sibling paragraph.

    Args:
        driver: Selenium WebDriver used for navigation.
        product_link: URL of the product page.
        debug: When True, print progress messages; forwarded to the helper functions.

    Returns:
        The ingredient text, or ``np.nan`` when the tab or the ingredient paragraph
        was not found or a WebDriverException occurred.
    """
    heading_xpath = "//p[contains(text(), 'Innehållsdeklaration')]"

    try:
        attempt_result = navigate_and_attempt(driver, product_link, debug=debug)
        if attempt_result is False:

            if debug:
                print("Refreshing page due to 403 error")

            # Refresh driver
            driver.refresh()

            # Add a small delay to mimic human behavior
            time.sleep(10)

            # Reject the cookie consent banner again after the refresh
            handle_cookie_consent(driver, debug=debug)

            # Call navigate_and_attempt again to locate and click the "Produktfakta" button
            attempt_result = navigate_and_attempt(driver, product_link, debug=debug)

        if not attempt_result:
            # Debug print statement to log that produkfakta button could not be found for a specific product
            if debug:
                print(f"Error: 'Produktfakta' button not found on {product_link}")
            # If no ingredients could be found, return NaN
            return np.nan

        try:
            # Wait for the heading, then read the paragraph that follows it
            WebDriverWait(driver, 20).until(
                EC.presence_of_element_located((By.XPATH, heading_xpath))
            )
            innehallsdeklaration_text = driver.find_element(
                By.XPATH, heading_xpath + "/following-sibling::p"
            ).text

            # Debug print statement to log that ingredients have been found for a specific product
            if debug:
                print("Ingredients found for product:", product_link)

            return innehallsdeklaration_text

        except TimeoutException:
            # Debug print statement to log timeout error for a specific product
            if debug:
                print(f"Error: Couldn't find ingredient data on {product_link}")

            return np.nan

    except WebDriverException as e:

        # Debug print statement to log WebDriverException for a specific product
        if debug:
            print("Error while processing product link:", product_link)
            print("Error details:", e)

        # If no ingredients could be found due to WebDriverException, return NaN
        return np.nan

"""
Define retry_missing_ingredients function.

Main Functionality: Retries scraping ingredient data for products with missing information.
"""
def retry_missing_ingredients(product_df, threshold, debug=False, max_rounds=3):
    """
    Re-scrape ingredient data for products whose "Ingredients" value is missing.

    Each round starts a fresh Chrome WebDriver, retries every product with a missing
    value and quits the driver. Rounds are repeated until at most ``threshold``
    products are still missing, or until ``max_rounds`` rounds have been run. Without
    that limit the loop would never end for products that have no ingredient data.

    Args:
        product_df: DataFrame with "ProductLink" and "Ingredients" columns; updated in place.
        threshold: Number of products that may remain without ingredients.
        debug: When True, print progress messages; forwarded to get_ingredient_info.
        max_rounds: Maximum number of retry rounds. ``None`` retries without limit
            (the previous behaviour).

    Returns:
        The same DataFrame object, with the re-scraped values filled in.
    """
    rounds_done = 0

    while True:
        missing_ingredients = product_df[product_df["Ingredients"].isna()]
        count_missing = len(missing_ingredients)

        # Debug print statement to log how many products that have missing ingredients data
        if debug:
            print(f"Number of products with missing ingredients: {count_missing}")

        if count_missing <= threshold:

            # Debug print statement to log when number of products with missing ingredients are below the threshold
            if debug:
                print(f"The number of missing ingredients is now {count_missing}, which is below the threshold of {threshold}.")
            break

        if max_rounds is not None and rounds_done >= max_rounds:
            if debug:
                print(f"Stopping after {rounds_done} retry rounds; {count_missing} products still have no ingredients.")
            break

        rounds_done += 1

        # A fresh browser per round; always quit, even if a product fails unexpectedly
        driver = webdriver.Chrome(options=chrome_options)

        try:
            for index, product in missing_ingredients.iterrows():
                product_link = product["ProductLink"]
                product_df.at[index, "Ingredients"] = get_ingredient_info(driver, product_link, debug=debug)
        finally:
            driver.quit()

    return product_df

"""
Define scrape_ingredients_data function.

Main Functionality: Scrapes ingredient information for each product in a DataFrame.
"""
def scrape_ingredients_data(product_df, debug=False, threshold=5):
    """
    Scrape the ingredient text for every product in a DataFrame.

    A Chrome WebDriver is started, every row's "ProductLink" is visited and the result
    is written to the row's "Ingredients" cell. The driver is quit before any retry
    starts, so two browsers are never open at the same time. When more than
    ``threshold`` products still lack ingredients, ``retry_missing_ingredients`` is
    called. An empty DataFrame is returned unchanged without starting a browser.

    Args:
        product_df: DataFrame with a "ProductLink" column; updated in place.
        debug: When True, print progress messages; forwarded to the helper functions.
        threshold: Number of products per category that may remain without ingredients.

    Returns:
        The DataFrame with the "Ingredients" column filled in (NaN where unavailable).
    """
    # Nothing to visit: avoid launching Chrome for no work
    if product_df.empty:
        return product_df

    # Initialize a WebDriver instance with Chrome options
    driver = webdriver.Chrome(options=chrome_options)

    try:
        # Iterate over each row (product) in the DataFrame
        for index, row in product_df.iterrows():
            product_link = row["ProductLink"]
            product_df.at[index, "Ingredients"] = get_ingredient_info(driver, product_link, debug=debug)
    finally:
        # Ensure WebDriver instance is properly closed after scraping
        driver.quit()

    # Retry to fill missing ingredients if needed
    if "Ingredients" in product_df.columns:
        final_missing_count = product_df["Ingredients"].isna().sum()

        if final_missing_count > threshold:

            # Debug print statement to log how many products that are still missing ingredients
            if debug:
                print(f"Final count of products with missing ingredients: {final_missing_count}")
                print("Retry to fill missing ingredients...")

            # Retrieve ingredients data for products where it is missing
            product_df = retry_missing_ingredients(product_df, threshold, debug=debug)

        else:
            # Debug print statement to log that count of products missing ingredients data is below the threshold
            if debug:
                print("Count of products missing ingredients data is below threshold.")

    return product_df

# Execution of Web Scraping Process for Product Data

In [10]:
# Call scrape_all_categories to scrape product data for all non-excluded categories
product_df = scrape_all_categories()

print(product_df.head())



KeyboardInterrupt: 

# Execution of Web Scraping Process for Ingredients Data

In [13]:
def process_and_update_ingredients_csv_files(save_path='/content/drive/My Drive/scraped_files/', debug=False):
    """
    Scrape ingredient information for every product CSV in the current directory.

    Each input CSV is read with its detected delimiter and passed to
    scrape_ingredients_data. The returned DataFrame is written twice: once to
    the current working directory and once to save_path. Output files are named
    "<category>_with_ingredients_<YYYY-MM-DD_HHMM>.csv", where <category> is the
    part of the input file name before the first underscore.

    Files whose names contain "_with_ingredients" or "_classified_data" are
    outputs of this pipeline and are never used as input, so re-running the
    cell does not re-scrape previously generated files.

    Parameters:
        save_path (str): Directory that receives the second copy of each output
            file (a Google Drive folder by default). The directory is not
            created here, so it has to exist before the call.
        debug (bool): When True, print progress messages.

    Returns:
        None
    """
    # Name fragments that identify files generated by this pipeline itself
    generated_markers = ("_with_ingredients", "_classified_data")

    # Collect the input CSV files; sorted so the processing order is reproducible
    csv_files = sorted(
        f for f in glob.glob("*.csv")
        if not any(marker in f for marker in generated_markers)
    )

    # Nothing to do when no input files are present
    if not csv_files:
        if debug:
            print("No CSV files found in the current directory.")
        return

    # Call the scrape_ingredients_data function for each category file
    for category_csv in csv_files:
        if debug:
            print(f"Processing CSV file: {category_csv}")

        # Detect the delimiter used by this file
        delimiter = detect_delimiter(category_csv)
        if debug:
            print(f"Detected delimiter for {category_csv}: {delimiter}")

        # Read product data for the current category
        product_df = pd.read_csv(category_csv, encoding='utf-8', sep=delimiter)

        # Scrape ingredient data for the current DataFrame
        updated_product_df = scrape_ingredients_data(product_df)

        # Build the output name: <category>_with_ingredients_<timestamp>.csv
        today_date = datetime.now().strftime("%Y-%m-%d_%H%M")
        base_filename = os.path.basename(category_csv).split('_')[0]
        updated_csv_file = f"{base_filename}_with_ingredients_{today_date}.csv"

        # Save a copy in the current working directory (Colab)
        updated_local_csv_file = updated_csv_file
        updated_product_df.to_csv(updated_local_csv_file, index=False)
        if debug:
            print(f"Updated product data with ingredients information saved to {updated_local_csv_file}")

        # Save a second copy under save_path (Google Drive)
        updated_drive_csv_file = os.path.join(save_path, updated_csv_file)
        updated_product_df.to_csv(updated_drive_csv_file, index=False)
        if debug:
            print(f"Saved updated product data with ingredients information to {updated_drive_csv_file}")

    # Reached only when every file was processed without an exception
    if debug:
        print("All processing complete.")

# Run the ingredient scraping step over the product CSV files in the current directory (debug output disabled)
process_and_update_ingredients_csv_files(debug=False)

# Set up and configure OpenAI

In [14]:
!pip install openai==0.28.0
import openai

Collecting openai==0.28.0
  Downloading openai-0.28.0-py3-none-any.whl (76 kB)
[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/76.5 kB[0m [31m?[0m eta [36m-:--:--[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m76.5/76.5 kB[0m [31m2.9 MB/s[0m eta [36m0:00:00[0m
Installing collected packages: openai
Successfully installed openai-0.28.0


In [15]:
def set_openai_key(debug=False, env_path='/content/openai_api_key.env'):
    """
    Load the OpenAI API key from a .env file and register it with the openai module.

    The key is read from the OPENAI_API_KEY environment variable after the
    .env file has been loaded, stripped of surrounding whitespace, written back
    to os.environ and assigned to openai.api_key.

    Parameters:
        debug (bool): When True, print progress and error messages.
        env_path (str): Path of the .env file to load. Defaults to the location
            used in the Colab environment.

    Raises:
        ValueError: If OPENAI_API_KEY is missing or blank after loading.
        Exception: Any other error raised while loading or setting the key is
            re-raised unchanged (after an optional debug message).
    """
    try:
        # load_dotenv returns a bool indicating whether it loaded anything.
        # A False result is not treated as fatal here because the key may
        # already be present in the process environment.
        loaded = load_dotenv(env_path)
        if debug:
            if loaded:
                print(f"Environment variables loaded successfully from {env_path} file")
            else:
                print(f"No environment variables were loaded from {env_path}")

        # Access the key; a missing variable and a whitespace-only value are both treated as absent
        openai_api_key = (os.getenv('OPENAI_API_KEY') or '').strip()

        # Check if the API key was retrieved successfully
        if not openai_api_key:
            raise ValueError("No API key found in environment variables")

        # Set the OpenAI API key as an environment variable
        os.environ['OPENAI_API_KEY'] = openai_api_key

        # Set the API key for the OpenAI module
        openai.api_key = openai_api_key

        if debug:
            print("OpenAI API key set successfully")

    except Exception as e:
        if debug:
            print(f"An error occurred while setting the OpenAI API key: {e}")
        raise

# Define function for NOVA classification of data

In [16]:
def classify_nova_food(product_name, ingredients, debug=False, model="gpt-4o"):
    """
    Classify a food item into one of the four NOVA groups using an OpenAI chat model.

    Parameters:
        product_name: Name of the product; interpolated into the prompt.
        ingredients: Ingredient list of the product. NaN/None, blank text and
            the text "nan" (any casing, surrounding whitespace ignored) count
            as missing. Any other non-string value is converted with str().
        debug (bool): When True, print the prompt, the raw response and errors.
        model (str): Name of the chat model passed to the OpenAI API.

    Returns:
        tuple: (classification, brief_description).
            - (None, None) when ingredients are missing; no request is sent.
            - First line of the model's reply and the remaining lines joined
              with spaces when the request succeeds.
            - ("Error: <message>", "Error: <message>") when the request or the
              parsing of its response raises an exception.
    """
    # Do not run the classification if ingredients are missing
    if not isinstance(ingredients, str):
        if pd.isna(ingredients):
            return None, None
        # Non-string values (e.g. a numeric cell) are coerced so .strip() below cannot fail
        ingredients = str(ingredients)

    # "nan" is what DataFrame.astype(str) turns a missing value into
    if ingredients.strip().lower() in ('', 'nan'):
        return None, None

    prompt = (
        "Classify the following food item according to the NOVA classification system and decide which of the 4 groups the food item should be categorized as. "
        "Provide the classification on the first line in your response. The 4 groups are as follows: "
        "Group 1 - Unprocessed or minimally processed foods "
        "Group 2 - Processed culinary ingredients "
        "Group 3 - Processed foods "
        "Group 4 - Ultra-processed foods "
        "Set the classification to Unknown if it is not possible to categorize the item. "

        "The 4 groups in the NOVA Classification system are the following: "
        "Group 1 - Unprocessed or minimally processed foods like fruit, vegetables, eggs, meat, milk, etc. "
        "Group 2 - Foods processed in the kitchen with the aim of extending their shelf life. In practice, these are ingredients to be used in the kitchen such as fats, "
        "aromatic herbs, etc. to be kept in jars or in the refrigerator to be able to use them later. "
        "Group 3 - Processed foods: Foods made by adding sugar, oil, salt, or other Group 2 ingredients to Group 1 foods. These processes include canning, bottling, and non-alcoholic fermentation. "
        "Examples include canned vegetables, salted meats, cheese, and, importantly, tofu, which involves processing soybeans into soy milk, coagulating it, and pressing the curds into blocks. "
        "These foods are made up of a few ingredients and are typically recognizable as modified versions of whole foods."
        "Group 4 - Ultra-processed foods. They are the ones that use many ingredients including food additives that improve palatability, processed raw materials (hydrogenated fats, modified starches, etc.) "
        "and ingredients that are rarely used in home cooking such as soy protein or mechanically separated meat. These foods are mainly of industrial origin and are characterized by a good pleasantness and the fact "
        "that they can be stored for a long time. "

        "When classifying, carefully consider both the type of product by looking at the product name, the ingredients and the extent of processing involved, including any additives, preservatives, "
        "or industrial processes used, with a preference for foods with less processing, fewer preservatives, and minimal added sugars that support good metabolic health. "

        f"Given the food item {product_name} with the following ingredients {ingredients} and carefully considering the type of product, its production process and ingredients, "
        "identify the NOVA group and give a brief explanation of why the food item was placed in the specific category. "

        "Provide the classification on the first line, followed by a brief description on the subsequent lines. "
        "Please keep the description brief, you do not need to repeat the product name of the food item in the description. "
    )

    try:
        # Debug print statement to log the prompt
        if debug:
            print(f"Sending prompt to OpenAI: {prompt}")

        response = openai.ChatCompletion.create(
            model=model,
            messages=[
                {"role": "system", "content": "You are a food classification assistant who wants to support metabolic health."},
                {"role": "user", "content": prompt}
            ],
            max_tokens=500,  # Adjust the token limit as needed
            temperature=0  # Set to 0 for more deterministic output
        )

        # Debug print to log the received response
        if debug:
            print(f"Response received: {response}")

        # Extract the text from the response
        classification_description = response['choices'][0]['message']['content'].strip()

        # The first line is the classification; the remaining lines form the description
        classification_lines = classification_description.split('\n')
        classification = classification_lines[0].strip()
        brief_description = ' '.join(classification_lines[1:]).strip()

    except Exception as e:
        # Best-effort: a failed request is recorded in the result instead of aborting the caller
        classification = f"Error: {e}"
        brief_description = f"Error: {e}"

        # Debug print statement to log error
        if debug:
            print(f"Error occurred: {classification}")

    # Return the classification and the brief description
    return classification, brief_description


# Execution of NOVA Classification

In [18]:

def process_and_classify_nova_csv_files(save_path='/content/drive/My Drive/scraped_files/', debug=False):
    """
    Classify the products of every "_with_ingredients" CSV into NOVA groups.

    For each matching CSV in the current directory the products are classified
    with classify_nova_food and the result is stored in the columns
    "NOVA Classification" and "NOVA Classification Description". Every
    classified file is written to the current working directory and to
    save_path as "<category>_classified_data_<YYYY-MM-DD_HHMM>.csv". All
    classified DataFrames are also concatenated and written to both locations
    as "all_categories_classified_data_<YYYY-MM-DD_HHMM>.csv".

    The combined file is written in a finally block, so the data classified
    before a failure is still saved; the exception is then propagated.

    Parameters:
        save_path (str): Directory that receives the second copy of each output
            file (a Google Drive folder by default). The directory is not
            created here, so it has to exist before the call.
        debug (bool): When True, print progress messages; the flag is also
            forwarded to set_openai_key and classify_nova_food.

    Returns:
        None
    """
    # Set the OpenAI API key by calling the set_openai_key function
    set_openai_key(debug=debug)

    # List all CSV files in the current directory with ingredients data
    csv_files = [f for f in glob.glob("*.csv") if "_with_ingredients" in f]

    # Nothing to do when no input files are present
    if not csv_files:
        if debug:
            print("No CSV files found in the current directory.")
        return

    all_classified_data = []  # List to store all classified DataFrames

    try:
        for category_csv in csv_files:
            # Skip already classified files
            if "_classified_data" in category_csv:
                if debug:
                    print(f"Skipping already classified file: {category_csv}")
                continue

            if debug:
                print(f"Processing CSV file: {category_csv}")

            # Detect the delimiter used by this file
            delimiter = detect_delimiter(category_csv)
            if debug:
                print(f"Detected delimiter for {category_csv}: {delimiter}")

            # Read product data for the current category
            grocery_data_df = pd.read_csv(category_csv, encoding='utf-8', sep=delimiter)

            # Ensure Ingredients column is treated as strings (missing values become "nan")
            grocery_data_df['Ingredients'] = grocery_data_df['Ingredients'].astype(str)

            # Classify row by row; building plain lists keeps this working for a
            # file with zero rows, where unpacking zip(*apply(...)) would fail
            nova_results = [
                classify_nova_food(name, ingredients, debug=debug)
                for name, ingredients in zip(grocery_data_df['ProductName'], grocery_data_df['Ingredients'])
            ]
            grocery_data_df['NOVA Classification'] = [label for label, _ in nova_results]
            grocery_data_df['NOVA Classification Description'] = [note for _, note in nova_results]

            # Build the output name: <category>_classified_data_<timestamp>.csv
            today_date = datetime.now().strftime("%Y-%m-%d_%H%M")
            base_filename = os.path.basename(category_csv).split('_')[0]
            final_csv_file = f"{base_filename}_classified_data_{today_date}.csv"

            # Save a copy in the current working directory (Colab)
            final_local_csv_file = final_csv_file
            grocery_data_df.to_csv(final_local_csv_file, index=False)
            if debug:
                print(f"Updated product data with NOVA classification saved to {final_local_csv_file}")

            # Save a second copy under save_path (Google Drive)
            final_drive_csv_file = os.path.join(save_path, final_csv_file)
            grocery_data_df.to_csv(final_drive_csv_file, index=False)
            if debug:
                print(f"Saved updated product data with NOVA classification information to {final_drive_csv_file}")

            # Keep the classified DataFrame for the combined output
            all_classified_data.append(grocery_data_df)

    finally:
        # Persist whatever was classified, even if a later file raised
        if all_classified_data:
            final_combined_df = pd.concat(all_classified_data, ignore_index=True)

            # Use a timestamp taken here instead of relying on the loop's variable
            combined_date = datetime.now().strftime("%Y-%m-%d_%H%M")
            final_combined_csv_file = f"all_categories_classified_data_{combined_date}.csv"

            # Save the final combined DataFrame to Colab
            final_local_combined_csv_file = final_combined_csv_file
            final_combined_df.to_csv(final_local_combined_csv_file, index=False)
            if debug:
                print(f"Saved final combined classified data to Colab: {final_local_combined_csv_file}")

            # Save the final combined DataFrame to Google Drive
            final_drive_combined_csv_file = os.path.join(save_path, final_combined_csv_file)
            final_combined_df.to_csv(final_drive_combined_csv_file, index=False)
            if debug:
                print(f"Saved final combined classified data to Google Drive: {final_drive_combined_csv_file}")
        elif debug:
            print("No data processed for classification.")

    # Reached only when every file was processed without an exception
    if debug:
        print("All NOVA classification complete.")

# Run NOVA classification on every "_with_ingredients" CSV in the current directory (debug output disabled)
process_and_classify_nova_csv_files(debug=False)