<a href="https://colab.research.google.com/github/talhahk24/Data_Scraping_Compilation/blob/main/(June_2024)_Food_Data_Scraping_(Center_for_Food_and_Safety).ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Scraping Data

## Setup


In [None]:
# =============================================================================
# SETUP CELL - All Installation Commands and Selenium Initialization
# =============================================================================

# Python packages that the import statements further down in this cell rely on.
!pip install selenium
!pip install pandas
!pip install transformers
# NOTE(review): sentencepiece and sacremoses are never imported in this notebook;
# presumably they are tokenizer dependencies of the Helsinki-NLP translation
# model loaded in the translation section - confirm before removing.
!pip install sentencepiece
!pip install sacremoses
!pip install --upgrade google-cloud-translate

# System side: refresh the apt package index, install Chromium's chromedriver,
# then copy the driver binary into /usr/bin.
# NOTE(review): presumably the copy is what puts chromedriver on PATH for
# Selenium - confirm against the Colab image in use.
!apt-get update
!apt install chromium-chromedriver
!cp /usr/lib/chromium-browser/chromedriver /usr/bin
import sys
sys.path.insert(0, '/usr/lib/chromium-browser/chromedriver')

from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.support.ui import Select
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import NoSuchElementException, TimeoutException, ElementClickInterceptedException
from selenium.webdriver.common.action_chains import ActionChains
from selenium.webdriver.common.keys import Keys
import time
import os
import pandas as pd
import re
import transformers as pipeline
from google.colab import files, auth
from google.cloud import translate_v3 as translate

# Build the Chrome configuration used by every scraping cell below.
# '--headless' runs the browser without a visible window; the other two flags
# are presumably required by the hosted Colab container - confirm if the
# runtime changes.
chrome_options = Options()
for chrome_flag in ('--headless', '--no-sandbox', '--disable-dev-shm-usage'):
    chrome_options.add_argument(chrome_flag)

# One shared WebDriver instance for the whole notebook session.
driver = webdriver.Chrome(options=chrome_options)

print(" Setup complete! All packages installed and WebDriver initialized.")

## Scraping Functions

In [None]:
# =============================================================================
# SCRAPING FUNCTIONS
# =============================================================================

class FoodDataScraper:
    """
    A modular class for scraping food data from the Center for Food and Safety website.
    Handles navigation, data extraction, and data processing for multiple languages.

    Workflow for one category (see ``scrape_single_category``):
    click the category on the main page, click the group link (which opens a
    new browser tab), read the food-group label and the data table from that
    tab, then close the tab and go back to the main page.

    Attributes:
        driver: The Selenium WebDriver used for all navigation.
        wait: ``WebDriverWait`` bound to ``driver`` with the configured timeout.
        all_data: DataFrame accumulating the rows of every category scraped
            during the current ``scrape_all_categories`` run.
        category_ids: ``id`` attributes of the category cells on the main page.
        grp_ids: ``id`` attributes of the matching group cells (``grp<id>``),
            in the same order as ``category_ids``.
    """

    # Text of the label cell that precedes the food-group name on a data page,
    # keyed by the language code passed to the scraping methods.
    _FOOD_GROUP_LABELS = {
        'en': 'Food Group :',
        'tc': '食物類別 :',
        'sc': '食物类别 :',
    }

    # The food id is the third argument of the javascript: link in each row.
    _FOOD_ID_PATTERN = re.compile(r"javascript:tosubmit\('\d+','-1','([^']+)'\)")

    def __init__(self, driver, wait_timeout=20):
        """
        Args:
            driver: An initialised Selenium WebDriver.
            wait_timeout: Seconds every explicit wait may block before timing out.
        """
        self.driver = driver
        self.wait = WebDriverWait(driver, wait_timeout)
        self.all_data = pd.DataFrame()

        self.category_ids = ["17", "13", "29", "01", "20", "09", "15", "12", "11", "04",
                             "08", "19", "02", "06", "10", "16", "05", "07", "40", "22",
                             "26", "21", "18", "03"]
        # Every group id is the category id prefixed with "grp"; deriving the
        # list keeps the two sequences from drifting apart.
        self.grp_ids = [f"grp{category_id}" for category_id in self.category_ids]

    def navigate_to_category(self, category_id, grp_id):
        """Click a category on the main page and wait for its group cell.

        Args:
            category_id: ``id`` of the category ``<td>`` on the main page.
            grp_id: ``id`` of the element expected once the category page loads.

        Returns:
            The stripped link text of the category, or ``None`` on any failure
            (the error is printed, not raised).
        """
        try:
            category_link = self.wait.until(
                EC.element_to_be_clickable((By.XPATH, f"//td[@id='{category_id}']//a"))
            )
            # Read the text before clicking: the element is gone after navigation.
            category_name = category_link.text.strip()
            ActionChains(self.driver).move_to_element(category_link).click().perform()

            self.wait.until(EC.presence_of_element_located((By.ID, grp_id)))

            return category_name
        except Exception as e:
            print(f"Error navigating to category {category_id}: {e}")
            return None

    def open_data_tab(self, grp_id):
        """Click the group link and switch the driver to the tab it opens.

        Args:
            grp_id: ``id`` of the group ``<td>`` whose link opens the data tab.

        Returns:
            The handle of the window that was active before the click, so the
            caller can return to it; ``None`` on failure. On failure any tab
            that had already been opened is closed and focus is put back on
            the original window, so the driver is never left on a stray tab.
        """
        original_window = None
        new_window = None
        try:
            grp_link = self.wait.until(
                EC.element_to_be_clickable((By.XPATH, f"//td[@id='{grp_id}']/a"))
            )

            self.driver.execute_script("arguments[0].scrollIntoView(true);", grp_link)

            original_window = self.driver.current_window_handle
            windows_before = self.driver.window_handles

            ActionChains(self.driver).move_to_element(grp_link).click().perform()

            self.wait.until(EC.new_window_is_opened(windows_before))

            windows_after = self.driver.window_handles
            new_window = [window for window in windows_after if window not in windows_before][0]
            self.driver.switch_to.window(new_window)

            self.wait.until(EC.presence_of_element_located((By.ID, 'content')))

            return original_window
        except Exception as e:
            print(f"Error opening data tab for {grp_id}: {e}")
            if new_window is not None:
                self._discard_tab(new_window, original_window)
            return None

    def _discard_tab(self, tab_handle, fallback_handle):
        """Best-effort: close ``tab_handle`` and focus ``fallback_handle``."""
        try:
            self.driver.switch_to.window(tab_handle)
            self.driver.close()
        except Exception as e:
            print(f"Error discarding tab: {e}")
        try:
            self.driver.switch_to.window(fallback_handle)
        except Exception as e:
            print(f"Error restoring window: {e}")

    def extract_category_name(self, language='en'):
        """Read the food-group name shown on the current data page.

        Args:
            language: ``'en'``, ``'tc'`` or ``'sc'``; selects the label text to
                search for. Unknown codes fall back to the English label.

        Returns:
            The stripped text of the cell following the label, or
            ``"Unknown Category"`` if it cannot be located.
        """
        try:
            category_table = self.driver.find_element(By.CSS_SELECTOR, 'table.colorTable1')

            pattern = self._FOOD_GROUP_LABELS.get(language, self._FOOD_GROUP_LABELS['en'])
            food_group_label_td = category_table.find_element(
                By.XPATH, f".//td[contains(text(),'{pattern}')]"
            )
            food_group_value_td = food_group_label_td.find_element(
                By.XPATH, 'following-sibling::td'
            )

            return food_group_value_td.text.strip()
        except Exception as e:
            print(f"Error extracting category name: {e}")
            return "Unknown Category"

    @staticmethod
    def _build_record(page_headers, values, food_id):
        """Pair one row's cell texts with the page headers and add the extra columns.

        ``Category`` is always reset to ``None`` (it is filled in later by
        ``merge_dataframes``) and ``FoodID`` is set to ``food_id``.
        """
        record = dict(zip(page_headers, values))
        record['Category'] = None
        record['FoodID'] = food_id
        return record

    def extract_table_data(self):
        """Extract data from the colorTable2 table.

        The first row containing ``<th>`` cells supplies the column names;
        every later row without ``<th>`` cells that has at least one ``<td>``
        becomes a record. Rows with ``<th>`` cells after the first header row
        are ignored.

        Returns:
            A DataFrame with one row per data row plus ``Category`` (``None``)
            and ``FoodID`` columns; an empty DataFrame on any error.
        """
        try:
            data_table = self.driver.find_element(By.CSS_SELECTOR, 'table.colorTable2')
            rows = data_table.find_elements(By.XPATH, ".//tbody/tr")

            # Only the headers actually present on the page are zipped with the
            # cell values; the two extra columns are added per record, so a
            # page that already has a 'Category' or 'FoodID' header no longer
            # causes real columns to be dropped.
            page_headers = []
            data_rows = []
            header_collected = False

            for row in rows:
                th_elements = row.find_elements(By.XPATH, ".//th")

                if th_elements and not header_collected:
                    page_headers = [th.text.strip() for th in th_elements]
                    header_collected = True
                elif not th_elements:
                    td_elements = row.find_elements(By.XPATH, ".//td")
                    data = [td.text.strip() for td in td_elements]

                    if data:
                        food_id = self._extract_food_id(row)
                        data_rows.append(self._build_record(page_headers, data, food_id))

            return pd.DataFrame(data_rows)
        except Exception as e:
            print(f"Error extracting table data: {e}")
            return pd.DataFrame()

    @classmethod
    def _parse_food_id(cls, href):
        """Return the food id embedded in a ``javascript:tosubmit(...)`` href, else ``None``."""
        if not href:
            return None
        match = cls._FOOD_ID_PATTERN.search(href)
        return match.group(1) if match else None

    def _extract_food_id(self, row):
        """Extract food ID from a table row.

        Returns ``None`` when the row has no link, the link has no ``href``, or
        the ``href`` does not match the expected ``tosubmit`` form.
        """
        try:
            a_element = row.find_element(By.XPATH, ".//a")
            href = a_element.get_attribute('href')
        except Exception:
            # Deliberately best-effort (a row without a usable link simply has
            # no id), but no longer a bare ``except:`` so KeyboardInterrupt and
            # SystemExit are not swallowed.
            return None
        return self._parse_food_id(href)

    def close_tab_and_return(self, original_window):
        """Close current tab and return to original window.

        After switching back, navigates one step back in history and waits for
        the first category cell to be present again. Errors are printed, not
        raised.
        """
        try:
            self.driver.close()
            self.driver.switch_to.window(original_window)
            self.driver.back()
            # Wait on the first configured category id rather than a literal.
            self.wait.until(EC.presence_of_element_located((By.ID, self.category_ids[0])))
        except Exception as e:
            print(f"Error closing tab: {e}")

    def merge_dataframes(self, new_df, category_name):
        """Append ``new_df`` to ``self.all_data`` with ``Category`` set to ``category_name``.

        Columns missing on either side are filled with missing values by the
        concatenation. ``new_df`` itself is left untouched; a copy is tagged
        and appended. Errors are printed, not raised.
        """
        try:
            tagged = new_df.copy()
            tagged['Category'] = category_name

            if len(self.all_data.columns) == 0:
                # Nothing accumulated yet: adopt the frame directly instead of
                # concatenating with an empty DataFrame.
                self.all_data = tagged.reset_index(drop=True)
            else:
                self.all_data = pd.concat([self.all_data, tagged], ignore_index=True, sort=False)

        except Exception as e:
            print(f"Error merging dataframes: {e}")

    def scrape_single_category(self, category_id, grp_id, language='en'):
        """Scrape data for a single category.

        Args:
            category_id: ``id`` of the category cell on the main page.
            grp_id: ``id`` of the group cell whose link opens the data tab.
            language: Language code forwarded to ``extract_category_name``.

        Returns:
            ``True`` when the data tab was opened and processed (even if it
            held no rows), ``False`` when navigation, tab opening or processing
            failed.
        """
        try:
            print(f"Processing category {category_id}...")

            category_name = self.navigate_to_category(category_id, grp_id)
            if not category_name:
                return False

            original_window = self.open_data_tab(grp_id)
            if not original_window:
                return False

            try:
                self.driver.save_screenshot(f'Dataset_{category_name}.png')

                page_category_name = self.extract_category_name(language)

                category_df = self.extract_table_data()

                if not category_df.empty:
                    self.merge_dataframes(category_df, page_category_name)
                    print(f"Scraping Complete: {page_category_name}")
                else:
                    print(f"No data found for {page_category_name}")
            finally:
                # Always close the data tab, even if a step above raised;
                # otherwise every later category would run inside the wrong
                # window.
                self.close_tab_and_return(original_window)
            return True

        except Exception as e:
            print(f"Error scraping category {category_id}: {e}")
            return False

    def _reload_main_page(self, main_url):
        """Best-effort reload of the main page after a failed category."""
        try:
            self.driver.get(main_url)
        except Exception as e:
            print(f"Error reloading main page: {e}")

    def scrape_all_categories(self, main_url, language='en', filename=None):
        """Scrape all categories for a given language.

        Resets ``self.all_data``, processes every configured category in
        order, and optionally writes the combined result to ``filename``.

        Args:
            main_url: URL of the category overview page.
            language: Language code forwarded to the per-category scraper.
            filename: If given, path of the CSV file to write (without index).

        Returns:
            The combined DataFrame, or an empty DataFrame if an unexpected
            error aborts the run.
        """
        try:
            print(f"Starting scraping for {language} version...")

            self.driver.get(main_url)
            self.driver.save_screenshot('Main.png')

            self.all_data = pd.DataFrame()

            for category_id, grp_id in zip(self.category_ids, self.grp_ids):
                if self.scrape_single_category(category_id, grp_id, language):
                    continue

                print(f"Failed to scrape category {category_id}")
                # A failure can leave the browser on the category page; reload
                # the overview so the next category link can be found.
                self._reload_main_page(main_url)

            if filename:
                self.all_data.to_csv(filename, index=False)
                print(f"Data saved to {filename}")

            return self.all_data

        except Exception as e:
            print(f"Error in scrape_all_categories: {e}")
            return pd.DataFrame()

# Create the single scraper instance shared by the scraping cells below; it
# wraps the WebDriver created in the setup cell.
scraper = FoodDataScraper(driver=driver)
print("Modular scraper initialized!")

## Data Scraping: Main Runs

In [None]:
# =============================================================================
# ENGLISH VERSION SCRAPING
# =============================================================================

# Scrape every category of the English site. Passing a filename makes the
# scraper write the combined table to that CSV before returning it.
english_url = 'https://www.cfs.gov.hk/english/nutrient/search1.php'
english_csv = 'Eng_With_ID_All_Categories_Data.csv'
english_data = scraper.scrape_all_categories(main_url=english_url, language='en', filename=english_csv)

print(f"English scraping complete! Total records: {len(english_data)}")


In [None]:
# Re-save the English results to CSV.
# The scraped rows live in `english_data` (returned by the scraping cell above);
# a notebook-level `all_data` does not exist at this point, and on a re-run after
# the translation section it would hold the translation frame instead.
filename = 'Eng_With_ID_All_Categories_Data.csv'
english_data.to_csv(filename, index=False)
print(f"Data saved to {filename}")

In [None]:
# =============================================================================
# TRADITIONAL CHINESE VERSION SCRAPING
# =============================================================================

# Scrape every category of the Traditional Chinese site. Passing a filename
# makes the scraper write the combined table to that CSV before returning it.
tc_url = 'https://www.cfs.gov.hk/tc_chi/nutrient/search1.php'
tc_csv = 'TC_With_ID_All_Categories_Data.csv'
tc_data = scraper.scrape_all_categories(main_url=tc_url, language='tc', filename=tc_csv)

print(f"Traditional Chinese scraping complete! Total records: {len(tc_data)}")


In [None]:
# Re-save the Traditional Chinese results to CSV.
# The scraped rows live in `tc_data` (returned by the scraping cell above);
# a notebook-level `all_data` does not exist at this point, and on a re-run after
# the translation section it would hold the translation frame instead.
filename = 'TC_With_ID_All_Categories_Data.csv'
tc_data.to_csv(filename, index=False)
print(f"Data saved to {filename}")

In [None]:
# =============================================================================
# SIMPLIFIED CHINESE VERSION SCRAPING
# =============================================================================

# Scrape every category of the Simplified Chinese site. Passing a filename
# makes the scraper write the combined table to that CSV before returning it.
sc_url = 'https://www.cfs.gov.hk/sc_chi/nutrient/search1.php'
sc_csv = 'SC_With_ID_All_Categories_Data.csv'
sc_data = scraper.scrape_all_categories(main_url=sc_url, language='sc', filename=sc_csv)

print(f"Simplified Chinese scraping complete! Total records: {len(sc_data)}")


In [None]:
# Re-save the Simplified Chinese results to CSV.
# The scraped rows live in `sc_data` (returned by the scraping cell above);
# a notebook-level `all_data` does not exist at this point, and on a re-run after
# the translation section it would hold the translation frame instead.
filename = 'SC_With_ID_All_Categories_Data.csv'
sc_data.to_csv(filename, index=False)
print(f"Data saved to {filename}")

In [None]:
# =============================================================================
# DOWNLOAD FILES
# =============================================================================

# Send each of the three scraped CSV files to the browser, in the same order
# they were produced (English, Traditional Chinese, Simplified Chinese).
for csv_name in (
    'Eng_With_ID_All_Categories_Data.csv',
    'TC_With_ID_All_Categories_Data.csv',
    'SC_With_ID_All_Categories_Data.csv',
):
    files.download(csv_name)

print("All files downloaded successfully!")

In [None]:
# =============================================================================
# SUMMARY
# =============================================================================

# Display sample data
# The conditional below is the last expression of the cell, so in a notebook
# its value (the first ten rows of `english_data`) is what gets rendered as the
# cell output. If the English scraping cell has not been run, the else-branch
# prints a reminder instead (print returns None, so nothing else is rendered).
print("\n📊 Sample Data Preview:")
english_data.head(10) if 'english_data' in locals() else print("Run the scraping cells first!")

# Translating Data

In [None]:
# Upload files from the local machine into the Colab runtime via google.colab.files.
# NOTE(review): presumably this supplies 'All_Categories_Data.csv' (read in the
# next cell) and the service-account 'credentials.json' used further down - confirm.
uploaded = files.upload()

In [None]:
# The setup cell binds the name `pipeline` to the whole `transformers` module
# (`import transformers as pipeline`), which is not callable. Import the actual
# factory function here so the call below works.
from transformers import pipeline

all_data = pd.read_csv('All_Categories_Data.csv')

# Chinese -> English translation model from the Hugging Face hub.
translator = pipeline("translation", model="Helsinki-NLP/opus-mt-zh-en")

def translate_text(text):
    """Translate one cell value from Chinese to English with the local model.

    Returns an empty string for missing or blank values. Non-string values
    (e.g. numbers parsed by pandas) are converted to text first instead of
    failing on ``.strip()``.
    """
    if pd.isnull(text):
        return ''
    text = str(text)
    if text.strip() == '':
        return ''
    result = translator(text, max_length=512)
    print(f"{text} = {result}")
    return result[0]['translation_text']

all_data['Food Name (English)'] = all_data['食物名称'].apply(translate_text)

In [None]:
# Preview the first ten rows, including the 'Food Name (English)' column added above.
all_data.head(10)

In [None]:
# Google Cloud Translation (v3) client, authenticated with a service-account
# key file expected at this path inside the Colab runtime.
service_account_key_path = '/content/credentials.json'

client = translate.TranslationServiceClient.from_service_account_file(service_account_key_path)

project_id = 'foodnametranslation-436816'

location = 'global'

# Resource name every translate_text request is issued against.
parent = f'projects/{project_id}/locations/{location}'

def translate_text(text):
    """Translate one cell value from Chinese to English via Cloud Translation.

    Returns an empty string for missing or blank values, and also when the
    service returns no translation (previously that case fell off the end of
    the loop and returned ``None``). Non-string values are converted to text
    first instead of failing on ``.strip()``.
    """
    if pd.isnull(text):
        return ''
    text = str(text)
    if text.strip() == '':
        return ''
    response = client.translate_text(
        parent=parent,
        contents=[text],
        mime_type='text/plain',
        source_language_code='zh',
        target_language_code='en'
    )
    if not response.translations:
        return ''
    # One input string was sent, so only the first translation is relevant.
    translated = response.translations[0].translated_text
    print(f"{text} = {translated}")
    return translated

all_data['Food Name (English)'] = all_data['食物名称'].apply(translate_text)

all_data.to_csv('All_Categories_Data_with_English_Names.csv', index=False)

print("Translation complete. Data saved to 'All_Categories_Data_with_English_Names.csv'.")


In [None]:
def translate_text_nullable(text):
    """Translate one cell value from Chinese to English, keeping blanks as they are.

    Missing or blank values are returned unchanged. Non-string values are
    converted to text first instead of failing on ``.strip()``. If the
    service returns no translation, ``None`` is returned rather than raising
    ``IndexError``.
    """
    if pd.isnull(text):
        return text
    source_text = str(text)
    if source_text.strip() == '':
        return text

    response = client.translate_text(
        parent=parent,
        contents=[source_text],
        mime_type='text/plain',
        source_language_code='zh',
        target_language_code='en'
    )
    if not response.translations:
        return None
    translation = response.translations[0]
    print(f"{source_text} = {translation.translated_text}")
    return translation.translated_text

# NOTE(review): the food-name column used earlier is spelled in Simplified
# Chinese while this alias header is Traditional; a single CSV presumably uses
# only one script. Accept either spelling of the alias header so the cell works
# for both exports - confirm which file is actually uploaded.
alias_column = next((name for name in ('別名', '别名') if name in all_data.columns), None)
if alias_column is None:
    raise KeyError("Alias column not found: expected '別名' or '别名' in all_data")

all_data['Alias (English)'] = all_data[alias_column].apply(translate_text_nullable)

all_data.to_csv('All_Categories_Data_with_Alias_English_Names.csv', index=False)

print("Translation of '別名' column complete. Translated values are stored in 'Alias (English)' column.")


In [None]:
# Write the frame, including the translated columns, to CSV without the index.
# NOTE(review): the previous cell already ends by writing this same file; this
# cell looks redundant unless all_data is modified in between - confirm.
all_data.to_csv('All_Categories_Data_with_Alias_English_Names.csv', index=False)