In [None]:
import json
import logging
import os
import time

import duckdb
import polars as pl
from bs4 import BeautifulSoup
from bson import ObjectId
from pymongo import MongoClient
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.edge.options import Options as EdgeOptions
from selenium.webdriver.edge.service import Service as EdgeService
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import Select
from selenium.webdriver.support.ui import WebDriverWait

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', force=True)


# --- MongoDB configuration -------------------------------------------------
# The connection string embeds a username and password, so it is read from the
# MONGO_URI environment variable instead of being stored in the notebook.
# NOTE(review): the credential that used to be hard-coded here has been
# exposed to anyone with access to this file and should be rotated.
MONGO_URI = os.environ.get("MONGO_URI")
if not MONGO_URI:
    raise RuntimeError(
        "Environment variable MONGO_URI is not set; export the MongoDB "
        "connection string before running this notebook."
    )

DB_NAME = "scraping"
SOURCE_COLLECTION = "idx_list_perusahaan_tercatat"
TARGET_COLLECTION = "idx_list_perusahaan_tercatat_profile"

# Single client shared by every cell. All timeouts are in milliseconds.
CLIENT = MongoClient(
    MONGO_URI,
    serverSelectionTimeoutMS=30000,
    connectTimeoutMS=30000,
    socketTimeoutMS=180000,
)



# Driver initialisation
def initialize_driver(driver_path=r"D:\driver\msedgedriver.exe"):
    """Create a headless Microsoft Edge WebDriver configured for scraping.

    Args:
        driver_path: Path to the ``msedgedriver`` executable. The default is
            the location that used to be hard-coded, so existing calls without
            arguments behave exactly as before; pass another path to run the
            notebook on a different machine.

    Returns:
        A ``webdriver.Edge`` instance. This function does not close the
        driver; the caller owns its lifetime (for example by using it in a
        ``with`` statement).
    """
    logging.info("Initializing WebDriver")
    edge_options = EdgeOptions()
    edge_options.add_argument("--headless=new")  # newer headless mode
    edge_options.add_argument("--window-size=1920,1080")
    edge_options.add_argument("--disable-gpu")
    edge_options.add_argument("--disable-blink-features=AutomationControlled")
    # Present a regular desktop browser user agent to the site.
    edge_options.add_argument(
        "user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
        "AppleWebKit/537.36 (KHTML, like Gecko) Chrome/115.0.0.0 Safari/537.36"
    )

    service = EdgeService(executable_path=driver_path)
    return webdriver.Edge(service=service, options=edge_options)


def scroll_to_bottom(driver, pause_time=0.5):
    """Scroll down step by step until the page height stops changing.

    Args:
        driver: Object exposing ``execute_script(script)``.
        pause_time: Seconds to sleep after each scroll before the height is
            measured again.
    """
    read_height_js = "return document.body.scrollHeight"
    jump_to_end_js = "window.scrollTo(0, document.body.scrollHeight);"

    previous_height = driver.execute_script(read_height_js)
    still_growing = True
    while still_growing:
        driver.execute_script(jump_to_end_js)
        time.sleep(pause_time)
        current_height = driver.execute_script(read_height_js)
        # Stop as soon as a scroll no longer changes the document height.
        still_growing = current_height != previous_height
        previous_height = current_height

def get_web_html_content(driver, url, timeout=30):
    """Load ``url``, switch the paginated table to "All" rows and return the HTML.

    Args:
        driver: Selenium WebDriver used to load the page.
        url: Page to open.
        timeout: Maximum number of seconds to wait for the table's loading
            indicator (``.vgt-loading``) to disappear after selecting "All".

    Returns:
        The page source as a string, or ``None`` when the pagination dropdown
        could not be found or the "All" option (value ``-1``) could not be
        selected. Errors raised while loading or scrolling the page itself
        are not caught here.
    """
    driver.get(url)
    scroll_to_bottom(driver)
    logging.info("Halaman sudah di-scroll penuh.")
    try:
        select_elem = WebDriverWait(driver, 5).until(
            EC.presence_of_element_located((By.CSS_SELECTOR, 'select[id^="vgt-select-rpp-"]'))
        )
        logging.info("Dropdown pagination ditemukan.")
        # Wait until the option with value "-1" is available.
        WebDriverWait(driver, 5).until(
            lambda d: any(opt.get_attribute("value") == "-1" for opt in select_elem.find_elements(By.TAG_NAME, "option"))
        )

        select = Select(select_elem)
        select.select_by_value("-1")
        logging.info("Opsi 'All' berhasil dipilih.")

        # Wait for the table reload to finish. This wait is best-effort: a
        # failure is logged and the page source is still returned. Catching
        # Exception (not a bare except) keeps Ctrl-C / SystemExit working.
        try:
            WebDriverWait(driver, timeout).until(
                EC.invisibility_of_element_located((By.CSS_SELECTOR, ".vgt-loading"))
            )
            logging.info("Tabel selesai dimuat ulang.")
        except Exception:
            logging.warning("Tidak ada indikator loading, lanjut.")

        # Grab the complete HTML of the page.
        html_content = driver.page_source
        logging.info(f"Panjang HTML: {len(html_content)} karakter")

        return html_content

    except Exception as e:
        logging.error(f"Gagal memilih 'All': {e}")
        return None


def clean_and_standardize_company_data(json_data):
    """Normalise keys and values of a scraped company-profile dictionary.

    Keys are lower-cased with spaces replaced by underscores. String values
    are stripped and then converted as follows:

    * empty string -> ``None`` (for every field, including numeric ones);
    * value ending in ``%`` -> ``float`` ('.' is dropped as thousands
      separator, ',' is read as the decimal separator); if it cannot be
      parsed the text without ``%`` is kept;
    * keys ``jumlah`` / ``aset_total`` -> ``int``; unparsable text is kept;
    * keys ``terafiliasi`` / ``independen`` -> ``True`` / ``False`` / ``None``.

    Non-string values are passed through unchanged. The same rules are applied
    to every record of the known list sections (``secretary``, ``director``,
    ``committee``, ``shareholders``, ``subsidiary``, ``public_accountant``,
    ``commissioners``) when the section is present under exactly that key and
    is a list.

    Args:
        json_data: Dictionary of raw scraped values; it is not modified.

    Returns:
        A new dictionary with cleaned keys and values.
    """
    numeric_keys = ('jumlah', 'aset_total')
    boolean_keys = ('terafiliasi', 'independen')
    list_keys = ('secretary', 'director', 'committee', 'shareholders',
                 'subsidiary', 'public_accountant', 'commissioners')

    def clean_key(key):
        return key.replace(' ', '_').lower()

    def clean_number_string(s):
        # '.' is treated as thousands separator, ',' as decimal separator.
        return s.replace('.', '').replace(',', '.').strip()

    def convert_to_boolean(s):
        s = s.strip().lower()
        if s in ('ya', 'y', 'yes'):
            return True
        if s in ('tidak', 't', 'no', 'n'):
            return False
        return None

    def clean_value(new_key, value):
        """Apply the conversion rules to one value stored under ``new_key``."""
        if not isinstance(value, str):
            return value
        value = value.strip()
        if value == "":
            return None
        if value.endswith('%'):
            without_percent = value.strip('%')
            try:
                return float(clean_number_string(without_percent))
            except ValueError:
                return without_percent  # keep the text if it is not numeric
        if new_key in numeric_keys:
            try:
                # Every '.' and ',' ends up removed before int(), so these
                # fields are assumed to hold whole numbers.
                return int(clean_number_string(value).replace('.', ''))
            except ValueError:
                return value  # keep the text if it is not numeric
        if new_key in boolean_keys:
            return convert_to_boolean(value)
        return value

    def clean_mapping(mapping):
        """Clean every key/value pair of one flat dictionary."""
        cleaned = {}
        for key, value in mapping.items():
            new_key = clean_key(key)
            cleaned[new_key] = clean_value(new_key, value)
        return cleaned

    # Top-level fields (list sections are copied as-is here and replaced below).
    cleaned_data = clean_mapping(json_data)

    # Records inside the known list sections.
    for list_key in list_keys:
        section = json_data.get(list_key)
        if isinstance(section, list):
            cleaned_data[list_key] = [clean_mapping(item) for item in section]

    return cleaned_data

def get_company_profile(html_web):
    """Extract the label/value pairs of the company-profile section.

    Looks for ``<section class="container mb-48">`` and, in every table inside
    it, reads rows with at least three cells: the first cell is used as the
    label and the third as the value (the second cell is skipped).

    Args:
        html_web: HTML source of the profile page.

    Returns:
        A one-row Polars DataFrame with one column per label, or an empty
        DataFrame (no rows, no columns) when the section is missing or no
        label/value row was found.
    """
    soup = BeautifulSoup(html_web, "html.parser")
    section = soup.find("section", class_="container mb-48")
    data_dict = {}

    if section:
        for table in section.find_all("table"):
            for tr in table.find_all("tr"):
                tds = tr.find_all("td")
                if len(tds) >= 3:
                    label = tds[0].get_text(strip=True)
                    data_dict[label] = tds[2].get_text(strip=True)
    else:
        logging.warning("Section profil perusahaan tidak ditemukan")

    if not data_dict:
        # Nothing to deduplicate; a frame without columns presumably cannot be
        # queried through DuckDB, so skip the round trip entirely.
        return pl.DataFrame()

    # DuckDB resolves `df` in the query from this local variable.
    df = pl.DataFrame([data_dict])
    return duckdb.query("select distinct * from df").pl()

def get_table(html_web, id_table):
    """Extract the table that follows the element with id ``id_table``.

    Two layouts are supported:

    * header/body tables (Subsidiary, Shareholders, Committee, Comissioners,
      Director, PublicAccountant): ``<thead>`` supplies the column names and
      each ``<tbody>`` row with a matching cell count becomes one record;
    * the Secretary table: rows with at least three cells are read as
      label (first cell) / value (third cell) pairs and form a single record.

    Both the id lookup and the layout selection are case-insensitive.

    Args:
        html_web: HTML source of the profile page.
        id_table: Value of the ``id`` attribute that precedes the table.

    Returns:
        A Polars DataFrame with duplicate rows removed. An empty DataFrame
        without columns is returned when the id or the table is missing, the
        id is not one of the supported layouts, or parsing failed.
    """
    header_body_ids = {
        "subsidiary",
        "shareholders",
        "committee",
        "comissioners",
        "director",
        "publicaccountant",
    }

    soup = BeautifulSoup(html_web, "html.parser")
    wanted_id = id_table.lower()

    # Find the element by id (case-insensitive).
    section = soup.find(attrs={"id": lambda x: x and x.lower() == wanted_id})
    if not section:
        logging.info(f"Bagian dengan id={id_table} tidak ditemukan")
        return pl.DataFrame()

    table = section.find_next("table")
    if not table:
        logging.info(f"Tabel tidak ditemukan setelah id={id_table}")
        return pl.DataFrame()

    df = None  # stays None when nothing usable was parsed

    if wanted_id in header_body_ids:
        try:
            headers = [
                th.get_text(strip=True)
                for th in table.find("thead").find_all("th")
            ]
            rows = []
            for tr in table.find("tbody").find_all("tr"):
                tds = tr.find_all("td")
                if len(tds) != len(headers):
                    # A "tidak ditemukan" (not found) placeholder row means
                    # the table has no data at all.
                    if any("tidak ditemukan" in td.get_text(strip=True).lower() for td in tds):
                        rows = []
                        break
                    continue  # skip rows that do not match the header
                rows.append([td.get_text(strip=True) for td in tds])

            if rows and headers:
                # Each inner list is one table row. Without an explicit
                # orientation Polars has to guess, and a table whose row count
                # equals its column count can be read as columns instead.
                df = pl.DataFrame(rows, schema=headers, orient="row")
            elif headers:  # header present but no data rows
                df = pl.DataFrame(schema=headers)

        except Exception as e:
            logging.error(f"Error saat mengambil tabel {id_table}: {e}")
            logging.debug(table.prettify())

    elif wanted_id == "secretary":
        try:
            data_dict = {}
            for tr in table.find_all("tr"):
                tds = tr.find_all("td")
                if len(tds) >= 3:
                    data_dict[tds[0].get_text(strip=True)] = tds[2].get_text(strip=True)
            if data_dict:
                df = pl.DataFrame([data_dict])
        except Exception as e:
            logging.error(f"Error saat mengambil tabel {id_table}: {e}")

    if df is None:
        return pl.DataFrame()

    # Deduplicate through DuckDB, which resolves `df` from this local
    # variable. NOTE(review): DISTINCT without ORDER BY does not promise to
    # keep the original row order.
    return duckdb.query("select distinct * from df").pl()


def scrape_idx_list_profile(driver, url):
    """Scrape one IDX company-profile page into a cleaned dictionary.

    Args:
        driver: Selenium WebDriver used to load the page.
        url: Profile page URL.

    Returns:
        The cleaned profile dictionary: the profile fields plus the keys
        ``secretary``, ``director``, ``commissioners``, ``committee``,
        ``shareholders``, ``subsidiary`` and ``public_accountant``, each a
        list of records. ``None`` is returned when the page HTML could not be
        obtained.
    """
    # (element id on the page, key in the resulting dictionary); the ids are
    # used exactly as spelled here, including "Comissioners".
    section_keys = (
        ("Secretary", "secretary"),
        ("Director", "director"),
        ("Comissioners", "commissioners"),
        ("Committee", "committee"),
        ("Shareholders", "shareholders"),
        ("Subsidiary", "subsidiary"),
        ("PublicAccountant", "public_accountant"),
    )

    logging.info(f"Scraping URL: {url}")
    html_web = get_web_html_content(driver, url)
    if not html_web:
        # get_web_html_content already logged the reason.
        logging.warning(f"No HTML retrieved for {url}; skipping.")
        return None

    logging.info("Mengambil profil perusahaan...")
    company_info_df = get_company_profile(html_web)

    # get_table builds its frames with an explicit row orientation, so no
    # section needs to be re-assembled after extraction.
    sections = {}
    for section_id, output_key in section_keys:
        logging.info(f"Mengambil tabel {section_id}")
        sections[output_key] = get_table(html_web, section_id).to_dicts()

    logging.info("Menyatukan informasi perusahaan dan sub-seksi")
    profile_rows = company_info_df.to_dicts()
    if profile_rows:
        company_info_dict = profile_rows[0]
    else:
        logging.warning(f"No profile fields found for {url}")
        company_info_dict = {}
    company_info_dict.update(sections)

    logging.info("Membersihkan dan menstandarkan data perusahaan")
    return clean_and_standardize_company_data(company_info_dict)




# def get_mongo_collection(mongo_uri):
#     client = MongoClient(mongo_uri)
#     return client#[db_name][collection_name]

def process_data():
    """Scrape the profile of every source document still flagged as pending.

    Reads ``_id`` and ``profil_url`` of all documents in ``SOURCE_COLLECTION``
    whose ``fl_get_profil`` is ``False``, scrapes each URL with one shared
    WebDriver and tags every result with ``ref_id`` (the source ``_id``).
    This function does not write anything back to MongoDB.

    A failure on one URL is logged and that document is skipped, so the
    results collected so far are not lost.

    Returns:
        A list of cleaned profile dictionaries; empty when nothing is pending
        or nothing could be scraped.
    """
    logging.info("Connecting to MongoDB")
    source_coll = CLIENT[DB_NAME][SOURCE_COLLECTION]

    # Only the fields needed for scraping are fetched.
    pending_docs = list(source_coll.find(
        {"fl_get_profil": False},
        {"_id": 1, "profil_url": 1},
    ))

    if not pending_docs:
        logging.info("No pending documents found")
        return []

    logging.info(f"Found {len(pending_docs)} documents to process")

    document = []
    # The context manager closes the browser even if the loop is interrupted.
    with initialize_driver() as driver:
        for doc in pending_docs:
            doc_id = doc["_id"]
            url = doc.get("profil_url")
            logging.info(f"Processing document ID: {doc_id}")

            if not url:
                logging.warning(f"Document {doc_id} has no profil_url; skipping")
                continue

            try:
                data = scrape_idx_list_profile(driver, url)
            except Exception as e:
                logging.exception(f"Error processing document {doc_id}: {e}")
                continue

            if not data:
                continue

            data['ref_id'] = doc_id
            document.append(data)

    return document


        
# Entry point: run the scrape and report the wall-clock duration.
# `document` is kept at module level so the following cell can use it.
if __name__ == "__main__":
    start_time = time.time()
    document = process_data()
    elapsed_seconds = time.time() - start_time
    logging.info(f"Process completed in {elapsed_seconds:.2f} seconds")


2025-09-10 16:42:59,574 - INFO - Connecting to MongoDB
2025-09-10 16:43:00,192 - INFO - Found 954 documents to process
2025-09-10 16:43:00,193 - INFO - Initializing WebDriver
2025-09-10 16:43:01,605 - INFO - Processing document ID: 68c1411455559c3c01ab6bc7
2025-09-10 16:43:01,606 - INFO - Scraping URL: https://www.idx.co.id/id/perusahaan-tercatat/profil-perusahaan-tercatat/AADI
2025-09-10 16:43:04,725 - INFO - Halaman sudah di-scroll penuh.
2025-09-10 16:43:04,750 - INFO - Dropdown pagination ditemukan.
2025-09-10 16:43:04,916 - INFO - Opsi 'All' berhasil dipilih.
2025-09-10 16:43:04,931 - INFO - Tabel selesai dimuat ulang.
2025-09-10 16:43:04,955 - INFO - Panjang HTML: 322884 karakter
2025-09-10 16:43:04,956 - INFO - Mengambil profil perusahaan...
2025-09-10 16:43:05,037 - INFO - Mengambil tabel Secretary
2025-09-10 16:43:05,117 - INFO - Mengambil tabel Director
  df = pl.DataFrame(rows, schema=headers)
2025-09-10 16:43:05,196 - INFO - Mengambil tabel Comissioners
2025-09-10 16:43:05,

In [5]:
import time

# Store the scraped profiles in the target collection.
# `document` comes from the scraping cell and may be empty (or None) when
# nothing was scraped.
for d in document or []:
    # Drop any existing "_id" so MongoDB assigns a fresh one on insert.
    d.pop("_id", None)

collection = CLIENT[DB_NAME][TARGET_COLLECTION]
if not document:
    # insert_many rejects an empty list, so there is nothing to do.
    print("Tidak ada dokumen untuk di-insert.")
else:
    try:
        # The JSON round trip turns values that are not JSON-serialisable
        # into strings via default=str.
        clean_documents = json.loads(json.dumps(document, default=str))
        result = collection.insert_many(clean_documents)
        print("Berhasil insert bulk. ID dokumen:")
        print(result.inserted_ids)
    except Exception as e:
        print("Error saat insert:", e)
# display(document)
# display(clean_documents)




Berhasil insert bulk. ID dokumen:
[ObjectId('68c1479c31fff83545fc6718'), ObjectId('68c1479c31fff83545fc6719'), ObjectId('68c1479c31fff83545fc671a')]
