# ARCHI_PRODUCT SCRAPING SCRIPT

 # WITH IMAGES 

In [None]:
import os
import csv
import time
import undetected_chromedriver as uc
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC

# ─── CONFIG ────────────────────────────────────────────────────────────────────
# Destination CSV; ensure_csv() creates it with a header and append_row() adds rows.
OUTPUT_CSV = "poo.csv"
# Column order of the output CSV; also the keys of every row dict written.
HEADERS = [
    "Title", "ProductName", "producturl", "Brand", "Type",
    "Category", "Description", "Tags", "ImageURLs",
    "DimensionText", "DimensionImages"
]
# Brands to scrape. A plain string is substituted into
# https://www.archiproducts.com/en/<brand>/products by scrape(); a
# (name, url) tuple supplies an explicit listing URL instead.
BRANDS = [
    "Ditre-Italia","Cattelan-Italia","Vondom","NICOLINE","Natuzzi-Italia","Talenti",
    "Cappellini","Opinion-Ciatti","DIESEL","Bontempi-Casa","Calligaris","Pianca",
    "Moroso","GUFRAM","FontanaArte","LODES","Vibia","LAGO","Seletti","Bonaldo",
    "Arketipo","SLIDE","Tonelli-Design","Kose","Foscarini","Zeus","Tomasella",
    "Lapalma","Sovet-italia","CAMERICH","Connubia","Mogg","Driade","Marset","Oluce",
    "Flos","Caracole","Reflex","Desalto","Lema","Flou","UNOPIU","Alias",
    ("Wiener GTV Designa","https://www.archiproducts.com/en/products?q=wiener%20gtv%20design"),
    ("MIDS","https://www.archiproducts.com/en/products?q=MIDS"),
    "Fiam-Italia","Il-Fanale","Qeeboo","Saba Italia","Magis","IMPACT-ACOUSTIC",
]

# ─── HELPERS ───────────────────────────────────────────────────────────────────
def init_driver():
    """Create and return an undetected-chromedriver Chrome instance.

    Chrome is started with ``--no-sandbox`` and ``--disable-dev-shm-usage``.
    Headless mode is intentionally left off; add ``--headless`` to the flag
    tuple below to enable it.
    """
    options = uc.ChromeOptions()
    for flag in ("--no-sandbox", "--disable-dev-shm-usage"):
        options.add_argument(flag)
    return uc.Chrome(options=options)

def ensure_csv():
    """Create OUTPUT_CSV with only the header row if the file does not exist yet."""
    if os.path.isfile(OUTPUT_CSV):
        return
    with open(OUTPUT_CSV, "w", newline="", encoding="utf-8") as handle:
        header_writer = csv.DictWriter(handle, fieldnames=HEADERS)
        header_writer.writeheader()

def append_row(row):
    """Append one row dict to OUTPUT_CSV, laid out in HEADERS column order."""
    with open(OUTPUT_CSV, "a", newline="", encoding="utf-8") as handle:
        row_writer = csv.DictWriter(handle, fieldnames=HEADERS)
        row_writer.writerow(row)

def get_text(driver, xpath, timeout=5):
    """Return the stripped text of the first element matching ``xpath``.

    Waits up to ``timeout`` seconds for the element to be present in the DOM.
    Any lookup failure (timeout, stale element, ...) yields ``""`` so that a
    missing field never aborts the scrape.
    """
    try:
        el = WebDriverWait(driver, timeout).until(
            EC.presence_of_element_located((By.XPATH, xpath))
        )
        return el.text.strip()
    except Exception:
        # Best-effort field: ``Exception`` (not a bare except) so that
        # Ctrl-C / SystemExit still propagate.
        return ""

def get_texts(driver, xpath):
    """Return the non-empty stripped texts of all elements matching ``xpath``.

    Returns an empty list when the lookup fails for any reason.
    """
    try:
        # Read each element's text once; every ``.text`` access is a
        # WebDriver round-trip.
        stripped = (el.text.strip() for el in driver.find_elements(By.XPATH, xpath))
        return [text for text in stripped if text]
    except Exception:
        # Best-effort: ``Exception`` rather than a bare except so that
        # Ctrl-C / SystemExit still propagate.
        return []

# ─── MAIN SCRAPER ───────────────────────────────────────────────────────────────
def _is_jpeg_url(src):
    """Return True if ``src`` is a non-empty URL starting with "http" and ending in ".jpeg"."""
    return bool(src) and src.startswith("http") and src.lower().endswith(".jpeg")


def _scrape_product_row(driver, url):
    """Build the CSV row dict for the product page currently loaded in ``driver``.

    ``url`` is stored verbatim in the ``producturl`` column. Every field is
    best-effort: a missing element produces an empty string.
    """
    row = {
        "Title": get_text(driver, "//article//h2"),
        "ProductName": get_text(driver, "//hgroup/h1/span[2]"),
        "producturl": url,
        "Brand": get_text(driver, "//hgroup/h1/span[1]/a"),
        "Type": get_text(driver, "//hgroup/h1/span[3]"),
        "Category": get_text(driver, "//nav//li[4]/a/span"),
        "Description": get_text(driver, "//section/div[1]/article/div[2]//div"),
    }

    # TAGS: locate the accordion panel by its "Tags" header; if that fails,
    # fall back to a positional XPath.
    try:
        hdr = driver.find_element(By.XPATH, "//a[normalize-space(text())='Tags']")
        panel = hdr.find_element(By.XPATH, "./ancestor::div[contains(@class,'accordion-item')]")
        anchors = panel.find_elements(By.XPATH, ".//div[contains(@class,'accordion-content')]//a")
        tags = [txt for txt in (a.text.strip() for a in anchors) if txt]
    except Exception:
        tags = get_texts(driver, "//section/div[1]/section/div[6]//a/span")
    row["Tags"] = ";".join(tags)

    # IMAGES: only .jpeg URLs, de-duplicated in page order.
    imgs = []
    for img in driver.find_elements(
        By.CSS_SELECTOR,
        "div.productsheet__overview__gallery div.image-container img",
    ):
        src = img.get_attribute("src") or img.get_attribute("data-src")
        if _is_jpeg_url(src) and src not in imgs:
            imgs.append(src)
    row["ImageURLs"] = ";".join(imgs)

    # DIMENSIONS: only when the page has a "Dimensions" accordion header.
    if driver.find_elements(By.XPATH, "//a[normalize-space(text())='Dimensions']"):
        row["DimensionText"] = get_text(
            driver,
            "/html/body/section[2]/div[1]/div/div/div[2]/div[1]/section/div[2]/div"
        ).replace("\n", " | ").strip()

        dim_imgs = []
        for e in driver.find_elements(
            By.XPATH,
            "//a[normalize-space(text())='Dimensions']/ancestor::div[contains(@class,'accordion-item')]"
            "//figure//a/img"
        ):
            src = e.get_attribute("src")
            if _is_jpeg_url(src):
                dim_imgs.append(src)
        row["DimensionImages"] = ";".join(dim_imgs)
    else:
        row["DimensionText"] = ""
        row["DimensionImages"] = ""

    return row


def scrape():
    """Scrape every brand in BRANDS and append one CSV row per product.

    For each brand the listing page is opened, every product link found on
    the current listing page is visited and written via ``append_row``, and
    the "Next" link is followed until it is missing or marked disabled.
    The browser is always closed, even if scraping raises.
    """
    driver = init_driver()
    try:
        ensure_csv()

        for b in BRANDS:
            if isinstance(b, tuple):
                brand_key, listing_url = b
            else:
                brand_key = b
                listing_url = f"https://www.archiproducts.com/en/{b}/products"

            print(f"\n→ Scraping brand: {brand_key}")
            driver.get(listing_url)
            time.sleep(2)

            while True:
                # Scroll to the bottom so lazily rendered cards are present.
                driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
                time.sleep(1)

                cards = driver.find_elements(By.CSS_SELECTOR, "#productGrid > div.cell._search-item a")
                # Read each href once; a set removes duplicate links to the same product.
                hrefs = (a.get_attribute("href") for a in cards)
                links = {href for href in hrefs if href}
                print(f" • Found {len(links)} products")

                for url in links:
                    print("   -", url)
                    driver.get(url)
                    time.sleep(1)

                    append_row(_scrape_product_row(driver, url))

                    # Return to the listing page so pagination can continue.
                    driver.back()
                    time.sleep(1)

                # Next page
                try:
                    nxt = driver.find_element(By.LINK_TEXT, "Next")
                    # get_attribute returns None when the link has no class;
                    # treat that as "not disabled" instead of aborting.
                    if "disabled" in (nxt.get_attribute("class") or ""):
                        break
                    nxt.click()
                    time.sleep(2)
                except Exception:
                    # No "Next" link (or it is not clickable): last page.
                    break

            print(f"✔ Completed brand: {brand_key}")
    finally:
        driver.quit()

# Run the product scraper only when this cell/script is executed directly.
if __name__ == "__main__":
    scrape()


# Image script for high-resolution images

In [None]:
import os
import csv
import time
import random
import undetected_chromedriver as uc
from selenium.common.exceptions import TimeoutException
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC

# ─── CONFIG ───────────────────────────────────────────────────────────────────
# CSV of previously scraped rows; main() re-visits each row's "producturl".
INPUT_CSV  = "do_final_merged_complete_mapped.csv"
# Destination CSV; main() opens it in "w" mode, so it is overwritten on every run.
OUTPUT_CSV = "do_rescraping_image_url.csv"
# Pause between page-load attempts in load_with_retry().
RETRY_DELAY = 5  # seconds

# ─── SELENIUM HELPERS ───────────────────────────────────────────────────────────
def init_driver():
    """Create a Chrome driver with a 60-second page-load timeout.

    Chrome is started with ``--no-sandbox`` and ``--disable-dev-shm-usage``.
    Headless mode is optional and left off; add ``--headless`` to the flag
    tuple below to enable it.
    """
    options = uc.ChromeOptions()
    for flag in ("--no-sandbox", "--disable-dev-shm-usage"):
        options.add_argument(flag)
    browser = uc.Chrome(options=options)
    browser.set_page_load_timeout(60)
    return browser

def wait_for_xpath(driver, xpath, timeout=20):
    """Block until an element matching ``xpath`` is present in the DOM.

    Raises whatever ``WebDriverWait.until`` raises when ``timeout`` seconds
    elapse without a match. Returns nothing.
    """
    locator = (By.XPATH, xpath)
    element_present = EC.presence_of_element_located(locator)
    WebDriverWait(driver, timeout).until(element_present)

def load_with_retry(driver, url, xpaths, max_retries=None):
    """Load ``url`` and wait until one of ``xpaths`` is present, retrying on timeout.

    Args:
        driver: Selenium WebDriver used to load the page.
        url: Page to open.
        xpaths: A single XPath string or a list of XPaths; the page counts as
            loaded as soon as any one of them is found.
        max_retries: Maximum number of retries after the first attempt.
            ``None`` (the default) retries forever.

    Raises:
        TimeoutException: only when ``max_retries`` is set and exhausted.
    """
    if isinstance(xpaths, str):
        xpaths = [xpaths]
    failures = 0
    while True:
        try:
            driver.get(url)
            for xp in xpaths:
                try:
                    wait_for_xpath(driver, xp)
                    return
                except TimeoutException:
                    continue
            raise TimeoutException(f"None of {xpaths} found")
        except TimeoutException as e:
            failures += 1
            if max_retries is not None and failures > max_retries:
                raise
            print(f"  ⚠ Timeout loading {url} ({e}), retrying in {RETRY_DELAY}s...")
            try:
                # Abort the pending navigation before trying again.
                driver.execute_script("window.stop();")
            except Exception:
                # Stopping the load is best-effort; the retry proceeds regardless.
                pass
            time.sleep(RETRY_DELAY)

# ─── URL HELPERS (ensure high-res) ─────────────────────────────────────────────
# Lower-case URL suffixes that is_valid_image() accepts as image files.
ALLOWED_EXTS = (".jpg", ".jpeg", ".png", ".webp")

def strip_query(u: str) -> str:
    """Return ``u`` without its ``?query`` part and surrounding whitespace.

    A falsy ``u`` (``None`` or ``""``) yields ``""``.
    """
    text = u if u else ""
    before_query, _, _ = text.partition("?")
    return before_query.strip()

def promote_variant(u: str) -> str:
    """Rewrite every ``/g/`` path segment in ``u`` to ``/p/``.

    Archiproducts thumbnails often use ``/g/``; ``/p/`` is preferred.
    URLs without ``/g/`` are returned unchanged.
    """
    return u.replace("/g/", "/p/") if "/g/" in u else u

def parse_srcset(srcset: str) -> str:
    """Return the largest candidate URL from an HTML ``srcset`` value.

    Each comma-separated candidate is ``"<url> [<descriptor>]"`` where the
    descriptor is a width (``800w``) or pixel density (``2x``). The candidate
    with the greatest descriptor wins; a missing or unparseable descriptor
    counts as 1, and ties go to the later candidate, so a srcset without
    descriptors still yields its last entry. The query string is stripped
    from the result. ``None``, ``""`` or a non-string yields ``""``.
    """
    if not isinstance(srcset, str):
        return ""

    best_url = ""
    best_size = -1.0
    for candidate in srcset.split(","):
        fields = candidate.split()
        if not fields:
            continue  # empty fragment, e.g. from a trailing comma
        size = 1.0
        if len(fields) > 1:
            descriptor = fields[1]
            number = descriptor[:-1]
            # Accept plain decimal numbers followed by "w" or "x" only.
            if descriptor[-1:] in ("w", "x") and number.replace(".", "", 1).isdecimal():
                size = float(number)
        if size >= best_size:
            best_url, best_size = fields[0], size
    return strip_query(best_url)

def is_valid_image(u: str) -> bool:
    """Return True if ``u`` starts with "http" and ends with one of ALLOWED_EXTS.

    Both checks are case-insensitive.
    """
    lowered = u.lower()
    if not lowered.startswith("http"):
        return False
    return lowered.endswith(ALLOWED_EXTS)

def best_src(el) -> str:
    """Return the best image URL available on element ``el``, or ``""``.

    Attributes are tried from most to least likely to hold the largest
    image: ``data-src-big``, ``data-zoom``, ``data-large``, ``data-full``,
    ``srcset`` (via ``parse_srcset``), ``data-src``, ``src``. Each candidate
    has its query string removed and ``/g/`` promoted to ``/p/``; the first
    one accepted by ``is_valid_image`` is returned. As a last resort the
    ``href`` of the nearest enclosing ``<a>`` is tried.
    """
    attr_names = (
        "data-src-big",
        "data-zoom",
        "data-large",
        "data-full",
        "srcset",
        "data-src",
        "src",
    )
    # Look attributes up lazily: each get_attribute is a WebDriver
    # round-trip, so stop at the first usable one.
    for attr in attr_names:
        raw = el.get_attribute(attr)
        if attr == "srcset":
            raw = parse_srcset(raw)
        u = promote_variant(strip_query(raw or ""))
        if is_valid_image(u):
            return u

    # last resort: nearest anchor href
    try:
        anchor = el.find_element(By.XPATH, "./ancestor::a[1]")
        href = promote_variant(strip_query(anchor.get_attribute("href") or ""))
        if is_valid_image(href):
            return href
    except Exception:
        # No enclosing anchor (or element went stale): nothing to return.
        # ``Exception`` rather than bare except so Ctrl-C still propagates.
        pass

    return ""

def dedupe_preserve(seq):
    """Return the truthy items of ``seq`` with duplicates removed, first occurrence first."""
    # dict keys are unique and keep insertion order.
    ordered_unique = {}
    for item in seq:
        if item:
            ordered_unique.setdefault(item, None)
    return list(ordered_unique)

def drop_thumbs_if_hires(urls):
    """Drop ``/g/`` thumbnail URLs when at least one non-``/g/`` URL exists.

    If every URL is a ``/g/`` thumbnail (or ``urls`` is empty), ``urls`` is
    returned unchanged.
    """
    hires_only = [candidate for candidate in urls if "/g/" not in candidate]
    return hires_only if hires_only else urls

# ─── MAIN IMAGE SCRAPER ─────────────────────────────────────────────────────────
def _best_srcs(img_elements):
    """Resolve each image element with ``best_src``, dropping those with no usable URL."""
    return [u for u in (best_src(img) for img in img_elements) if u]


def scrape_images(driver):
    """Collect the main product image URLs from the page loaded in ``driver``.

    Sources are tried in order and the first one that yields any URL wins:
    the ``#product-image`` gallery, the Flickity carousel, the overview
    gallery containers, and finally the single ``#imgCarousel`` image. The
    result is de-duplicated in page order and ``/g/`` thumbnails are dropped
    when a higher-resolution URL is available.
    """
    # Scroll to bottom for lazy-loaded images
    driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
    time.sleep(2)

    # 1) MAIN GALLERY (#product-image)
    try:
        gallery = driver.find_element(By.ID, "product-image")
        gallery_imgs = gallery.find_elements(By.TAG_NAME, "img")
    except Exception:
        # Gallery container absent on this page layout.
        gallery_imgs = []
    collected = _best_srcs(gallery_imgs)

    # 2) FLICKITY CAROUSEL (fallback)
    if not collected:
        collected = _best_srcs(
            driver.find_elements(By.CSS_SELECTOR, "div.flickity-slider .carousel-cell img")
        )

    # 3) OVERVIEW GALLERY (fallback) - both class spellings are tried
    if not collected:
        ov_containers = (
            driver.find_elements(By.CSS_SELECTOR, "div.productsheet_overview_gallery")
            + driver.find_elements(By.CSS_SELECTOR, "div.productsheet__overview__gallery")
        )
        overview_imgs = []
        for ov in ov_containers:
            try:
                overview_imgs += ov.find_elements(By.TAG_NAME, "img")
            except Exception:
                # Skip a container that can no longer be queried.
                pass
        collected = _best_srcs(overview_imgs)

    # 4) Single direct image fallback
    if not collected:
        try:
            collected = _best_srcs([driver.find_element(By.ID, "imgCarousel")])
        except Exception:
            # No single-image element either; leave the result empty.
            pass

    # Clean up + drop thumbnails if any hi-res exists
    collected = dedupe_preserve(collected)
    collected = drop_thumbs_if_hires(collected)
    return collected

# ─── DIMENSION IMAGE SCRAPER ───────────────────────────────────────────────────
def scrape_dimensions(driver):
    """Collect image URLs from the "Dimensions" accordion panel, if the page has one.

    Returns a de-duplicated list (page order) with ``/g/`` thumbnails dropped
    when a higher-resolution URL exists; an empty list when the page has no
    "Dimensions" header or the panel holds no usable images.
    """
    raw = []
    if driver.find_elements(By.XPATH, "//a[normalize-space(text())='Dimensions']"):
        # Use the same whitespace-normalised match as the guard above, so a
        # header such as " Dimensions " that passes the guard also matches here.
        els = driver.find_elements(
            By.XPATH,
            "//div[contains(@class,'accordion-item') and "
            ".//a[normalize-space(text())='Dimensions']]//figure//a/img"
        )
        for img in els:
            u = best_src(img)
            if u:
                raw.append(u)
    raw = dedupe_preserve(raw)
    raw = drop_thumbs_if_hires(raw)
    return raw

# ─── MAIN FUNCTION ─────────────────────────────────────────────────────────────
def main():
    """Re-scrape image URLs for every row of INPUT_CSV and write OUTPUT_CSV.

    Each row's ``producturl`` is loaded, its main and dimension image URLs
    are collected, and the row is written with refreshed ``ImageURLs`` /
    ``DimensionImages`` columns (``;``-joined). Rows with an empty URL, with
    no main images, or whose scrape raised are not written. Duplicate URLs
    reuse the first successful result. Output is flushed and fsynced after
    every row, and the browser is always closed on exit.
    """
    # Read input rows
    with open(INPUT_CSV, newline="", encoding="utf-8") as f:
        reader = csv.DictReader(f)
        rows = list(reader)
        # fieldnames is None for an empty input file.
        fieldnames = list(reader.fieldnames or [])

    # Ensure columns exist
    for column in ("ImageURLs", "DimensionImages"):
        if column not in fieldnames:
            fieldnames.append(column)

    driver = init_driver()
    try:
        # Overwrite output and start fresh
        with open(OUTPUT_CSV, "w", newline="", encoding="utf-8") as f:
            writer = csv.DictWriter(f, fieldnames=fieldnames)
            writer.writeheader()

            def write_row(out_row):
                # Force each row to disk so an interrupted run keeps its progress.
                writer.writerow(out_row)
                f.flush()
                os.fsync(f.fileno())

            url_cache = {}  # producturl -> (img_str, dim_str)

            total = len(rows)
            try:
                for idx, row in enumerate(rows, start=1):
                    url = (row.get("producturl") or "").strip()

                    if not url:
                        print(f"[{idx}/{total}] ⚠ Empty producturl, skipping row (not written).")
                        continue

                    # Skip scraping duplicates; reuse first result
                    if url in url_cache:
                        img_str, dim_str = url_cache[url]
                        print(f"[{idx}/{total}] ⏭ Duplicate URL, reusing cached result: {url}")
                        row["ImageURLs"] = img_str
                        row["DimensionImages"] = dim_str
                        write_row(row)
                        continue

                    print(f"[{idx}/{total}] Processing: {url}")

                    try:
                        load_with_retry(driver, url, ["//*[@id='product-image']", "//hgroup/h1/span[2]"])
                        imgs = scrape_images(driver)
                        dims = scrape_dimensions(driver)

                        # Live update line (restored)
                        print(f"    → Found {len(imgs)} main images, {len(dims)} dimension images")

                        if not imgs:
                            print(f"    → 0 images found. Skipping write for this row.")
                            continue

                        img_str = ";".join(imgs)
                        dim_str = ";".join(dims)
                        row["ImageURLs"] = img_str
                        row["DimensionImages"] = dim_str

                        # cache for duplicates
                        url_cache[url] = (img_str, dim_str)

                        write_row(row)
                        # Random pause between product pages.
                        time.sleep(random.uniform(1, 2))

                    except Exception as e:
                        print(f"    ❌ Error on {url}: {e}")
                        # do not write empty/failed rows; continue to next

            except KeyboardInterrupt:
                print("\n🛑 Stopped by user. Exiting cleanly without writing partial/empty rows...")
    finally:
        driver.quit()

    print(f"\n✅ Done! Output saved to: {OUTPUT_CSV}")

# Run the image re-scraper only when this cell/script is executed directly.
if __name__ == "__main__":
    main()


# Script for downloading product images and placing them in folders

In [None]:
import os
import csv
import shutil
import cloudscraper
import re

# ─── CONFIG ───────────────────────────────────────────────
# Input CSV; the download loop reads Brand, ProductName, Category, Type,
# ImageURLs and DimensionImages from each row.
CSV_FILE = "final_merged_complete.csv"
# Root directory under which per-product image folders are created.
IMAGE_ROOT = "images"

# ─── Initialize Cloudflare Scraper ────────────────────────
# Single module-level cloudscraper session, reused by download_image().
scraper = cloudscraper.create_scraper()

# ─── Sanitize folder/file names ───────────────────────────
def sanitize_filename(name):
    """Return ``name`` made safe for use as a folder or file name.

    Surrounding whitespace is trimmed, the characters ``< > : " / \\ | ? *``
    are deleted, and all remaining spaces are removed.
    """
    trimmed = name.strip()
    without_reserved = re.sub(r'[<>:"/\\|?*]', '', trimmed)
    return "".join(without_reserved.split(" "))

# ─── Download image bypassing Cloudflare ──────────────────
def download_image(url, save_path):
    """Download ``url`` to ``save_path`` through the Cloudflare-aware scraper.

    URLs not ending in .jpg/.jpeg/.png are skipped, as are targets that
    already exist. The body is streamed to ``<save_path>.part`` and renamed
    into place only after a complete download, so an interrupted transfer
    never leaves a truncated file that a later run would take for
    "already exists". Errors are printed, never raised. Returns ``None``.
    """
    try:
        if not url.lower().endswith((".jpg", ".jpeg", ".png")):
            print(f"⏩ Skipped non-image URL: {url}")
            return

        if os.path.exists(save_path):
            print(f"⏩ Already exists: {save_path}")
            return

        response = scraper.get(url, stream=True, timeout=15)
        try:
            if response.status_code != 200:
                print(f"❌ Failed: {url} → {response.status_code}")
                return

            tmp_path = f"{save_path}.part"
            try:
                with open(tmp_path, "wb") as f:
                    # iter_content decodes gzip/deflate transfer encodings,
                    # which copying response.raw directly does not.
                    for chunk in response.iter_content(chunk_size=65536):
                        if chunk:
                            f.write(chunk)
                os.replace(tmp_path, save_path)
            finally:
                # Still present only if the download or rename failed.
                if os.path.exists(tmp_path):
                    os.remove(tmp_path)
        finally:
            # Release the streamed connection back to the session pool.
            response.close()

        print(f"✅ Saved: {save_path}")
    except Exception as e:
        print(f"❌ Error downloading {url}: {e}")

# ─── Check if all expected images exist ───────────────────
def all_images_downloaded(folder, num_main, num_dim):
    """Return True if ``folder`` holds at least the expected number of images.

    Counts files ending in .jpg/.jpeg/.png (case-insensitive) whose names
    start with ``main_`` and ``dimension_`` respectively, and compares the
    counts against ``num_main`` and ``num_dim``.
    """
    image_exts = (".jpg", ".jpeg", ".png")
    main_found = 0
    dim_found = 0
    for entry in os.listdir(folder):
        if not entry.lower().endswith(image_exts):
            continue
        if entry.startswith("main_"):
            main_found += 1
        elif entry.startswith("dimension_"):
            dim_found += 1
    return main_found >= num_main and dim_found >= num_dim

# ─── Main Execution ───────────────────────────────────────
# For every CSV row, download its main and dimension images into
# IMAGE_ROOT/Brand/Product/Category/Type, skipping rows whose folder already
# holds the expected number of images.
with open(CSV_FILE, newline='', encoding='utf-8') as csvfile:
    reader = csv.DictReader(csvfile)

    for row in reader:
        brand = sanitize_filename(row["Brand"])
        product = sanitize_filename(row["ProductName"])
        category = sanitize_filename(row["Category"])
        type_ = sanitize_filename(row["Type"])

        # Final folder structure: Brand/Product/Category/Type
        folder_path = os.path.join(IMAGE_ROOT, brand, product, category, type_)
        os.makedirs(folder_path, exist_ok=True)

        # Drop empty fragments (e.g. from a trailing ";") up front so the
        # expected counts below match the number of files actually written.
        image_urls = [u.strip() for u in (row.get("ImageURLs") or "").split(";") if u.strip()]
        dim_urls = [u.strip() for u in (row.get("DimensionImages") or "").split(";") if u.strip()]

        if all_images_downloaded(folder_path, len(image_urls), len(dim_urls)):
            print(f"🔁 Skipped (all images already downloaded): {brand}/{product}/{category}/{type_}")
            continue

        print(f"\n📂 Processing: {brand}/{product}/{category}/{type_}")

        # ─── Download main images ─────
        # Files are always named .jpg, whatever the source extension.
        for idx, url in enumerate(image_urls, start=1):
            download_image(url, os.path.join(folder_path, f"main_{idx}.jpg"))

        # ─── Download dimension images ─────
        for idx, url in enumerate(dim_urls, start=1):
            download_image(url, os.path.join(folder_path, f"dimension_{idx}.jpg"))

# Image mapping

In [None]:
import pandas as pd
import re
import os
from urllib.parse import urlparse

# ─── CONFIG ─────────────────────────────────────────────
# CSV read by generate_updated_csv().
INPUT_CSV = "Archi_product_scraping_moiz.csv"
# CSV written by generate_updated_csv().
OUTPUT_CSV = "MAPPED_ARCHI.csv"
# Site origin prefixed to every generated image URL.
DOMAIN = "https://italcasa.us"
# Folder placed directly after /wp-content/uploads/ in generated URLs.
IMAGE_FOLDER_ROOT = "images"
MAIN_IMAGE_COUNT = 5           # main_1.jpg ... main_5.jpg
MAX_DIM_IMAGES = 5             # limit mapped dimension images

# ─── SANITIZE FUNCTION ─────────────────────────────────
def sanitize(text):
    """Return ``text`` as a URL path segment, or ``"Unknown"`` when missing.

    ``None`` and NaN map to ``"Unknown"``. Otherwise the value is stringified
    and trimmed, and each of ``< > : " / \\ | ? *`` and each space is replaced
    by an underscore.

    NOTE(review): sanitize_filename() in the download script deletes these
    characters and spaces instead of replacing them with "_"; confirm the
    folder names on the server match the form produced here.
    """
    if text is None or pd.isna(text):
        return "Unknown"
    trimmed = str(text).strip()
    underscored = re.sub(r'[<>:\"/\\|?*]', "_", trimmed)
    return "_".join(underscored.split(" "))

# ─── CLEAN (no garbage removal) ────────────────────────
def clean_text(text):
    """Return ``text`` as a trimmed string; ``None`` and NaN become ``""``."""
    is_missing = text is None or pd.isna(text)
    return "" if is_missing else str(text).strip()

# ─── HELPERS ───────────────────────────────────────────
def split_urls(maybe_urls: str):
    """Split a string of URLs on ',', ';', '|', or whitespace.

    Returns the non-empty pieces in order; an empty or non-string input
    yields an empty list.
    """
    if not (maybe_urls and isinstance(maybe_urls, str)):
        return []
    pieces = re.split(r"[,\;\|\s]+", maybe_urls.strip())
    return [piece.strip() for piece in pieces if piece.strip()]

# ─── MAIN ──────────────────────────────────────────────
def generate_updated_csv():
    """Rewrite INPUT_CSV into OUTPUT_CSV with image URLs mapped to the site.

    Every row keeps its text columns (trimmed; NaN becomes ""), has ";" in
    ``tax:product_tag`` replaced by "|", and gets:

    * ``ImageURLs``: MAIN_IMAGE_COUNT comma-joined URLs
      ``<base>/main_<i>.jpg``, regardless of the source value;
    * ``DimensionImages``: one ``<base>/dimension_<i>.jpg`` per source
      dimension URL, capped at MAX_DIM_IMAGES.

    ``<base>`` is ``DOMAIN/wp-content/uploads/IMAGE_FOLDER_ROOT`` followed by
    the sanitized brand, product, category and type.

    Raises:
        KeyError: if INPUT_CSV lacks any expected column.
    """
    frame = pd.read_csv(INPUT_CSV)

    # The expected input columns are also the output columns, in this order.
    columns = [
        "Title", "ProductName", "producturl", "Brand", "Type", "Category",
        "Description", "tax:product_tag", "ImageURLs",
        "attribute:Dimensions:", "attribute_data:Dimensions:", "DimensionImages"
    ]
    absent = [name for name in columns if name not in frame.columns]
    if absent:
        raise KeyError(f"Missing required column(s): {absent}")

    records = []
    for _, record in frame.iterrows():
        brand = sanitize(record.get("Brand", ""))
        product = sanitize(record.get("ProductName", ""))
        category = sanitize(record.get("Category", ""))
        raw_type = record.get("Type", "")
        ptype = "Unknown" if not str(raw_type).strip() else sanitize(raw_type)

        base_url = f"{DOMAIN}/wp-content/uploads/{IMAGE_FOLDER_ROOT}/{brand}/{product}/{category}/{ptype}"

        # Only the number of source dimension URLs matters; names are regenerated.
        dim_sources = split_urls(clean_text(record.get("DimensionImages", "")))
        dim_total = min(len(dim_sources), MAX_DIM_IMAGES)

        records.append({
            "Title": clean_text(record.get("Title", "")),
            "ProductName": clean_text(record.get("ProductName", "")),
            "producturl": clean_text(record.get("producturl", "")),
            "Brand": clean_text(record.get("Brand", "")),
            "Type": clean_text(record.get("Type", "")),
            "Category": clean_text(record.get("Category", "")),
            "Description": clean_text(record.get("Description", "")),
            "tax:product_tag": clean_text(record.get("tax:product_tag", "")).replace(";", "|"),
            "ImageURLs": ",".join(
                f"{base_url}/main_{i}.jpg" for i in range(1, MAIN_IMAGE_COUNT + 1)
            ),
            "attribute:Dimensions:": clean_text(record.get("attribute:Dimensions:", "")),
            "attribute_data:Dimensions:": clean_text(record.get("attribute_data:Dimensions:", "")),
            "DimensionImages": ",".join(
                f"{base_url}/dimension_{i}.jpg" for i in range(1, dim_total + 1)
            ),
        })

    pd.DataFrame(records, columns=columns).to_csv(OUTPUT_CSV, index=False)
    print(f"✅ Mapped CSV saved as '{OUTPUT_CSV}'.")

# Generate the mapped CSV only when this cell/script is executed directly.
if __name__ == "__main__":
    generate_updated_csv()
