In [None]:
import os
import re
import asyncio
import aiohttp
import aiofiles
import requests
import zipfile
import logging
from bs4 import BeautifulSoup
from tqdm import tqdm
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.embeddings import HuggingFaceEmbeddings

from langchain.vectorstores import Chroma
import torch

In [None]:
# force=True replaces any handlers already attached to the root logger (a Jupyter
# kernel or an earlier import may have installed some, which would otherwise make
# this call a silent no-op) and keeps the cell idempotent when it is re-run.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    force=True,
)


In [None]:
def download_and_extract(
    url="https://phapdien.moj.gov.vn/TraCuuPhapDien/Files/BoPhapDienDienTu.zip",
    zip_file='BoPhapDienDienTu.zip',
    extract_to='BoPhapDienDienTu',
    timeout=300,
):
    """
    Download the legal-documents ZIP archive and extract it.

    The archive is streamed to ``zip_file`` in 1 MiB chunks instead of being
    held in memory whole. It is then extracted into ``extract_to``. The ZIP
    file is deleted afterwards, including when the download or the
    extraction fails.

    Args:
        url (str): Source URL of the ZIP archive.
        zip_file (str): Temporary local path for the downloaded archive.
        extract_to (str): Directory the archive is extracted into.
        timeout (float): Seconds passed to ``requests`` as the connect/read
            timeout, so a stalled server cannot hang the notebook forever.

    Raises:
        requests.HTTPError: If the server answers with an error status.
        zipfile.BadZipFile: If the downloaded payload is not a valid ZIP.
    """
    try:
        with requests.get(url, stream=True, timeout=timeout) as response:
            # Fail loudly instead of saving an HTML error page as the "zip".
            response.raise_for_status()
            with open(zip_file, 'wb') as f:
                for chunk in response.iter_content(chunk_size=1 << 20):
                    f.write(chunk)

        with zipfile.ZipFile(zip_file, 'r') as zip_ref:
            zip_ref.extractall(extract_to)
    finally:
        # Never leave a partial or invalid archive lying around.
        if os.path.exists(zip_file):
            os.remove(zip_file)
    logging.info("ZIP file downloaded and extracted successfully")


# Skip the large download when a previous run already extracted the archive.
# `main` reads its index files from BoPhapDienDienTu/demuc, so that folder is
# used as the "already extracted" marker. Delete it to force a fresh download.
if not os.path.isdir(os.path.join('BoPhapDienDienTu', 'demuc')):
    download_and_extract()
else:
    logging.info("BoPhapDienDienTu already extracted; skipping download")

In [None]:
# Output layout: one folder under base_dir per document type the crawler stores.
# The names match the document-type keys used when building save paths.
base_dir = 'BoPhapDienDienTu'
subdirs = ['vbpl', 'property', 'history', 'related', 'pdf']
for target_dir in [os.path.join(base_dir, sub) for sub in subdirs]:
    os.makedirs(target_dir, exist_ok=True)  # no-op when it already exists

In [None]:
async def fetch_with_retry(session, url, retries=3, delay=1):
    """
    Asynchronously fetch a URL, retrying transient failures.

    When a retry happens:
        An attempt is retried (up to ``retries`` attempts in total) when the
        request raises, or when the server answers 429 (Too Many Requests)
        or any 5xx status. Between attempts the coroutine sleeps
        ``delay * attempt_number`` seconds (linear back-off).

    When it gives up at once:
        Any other non-200 status (e.g. 404) is treated as permanent. A
        warning is logged and None is returned without retrying.

    Body type:
        The body is returned as ``bytes`` when the URL contains "pdf" or the
        response Content-Type is not ``text/*`` (e.g. ``application/pdf``).
        Otherwise it is decoded and returned as ``str``. The Content-Type
        check matters because the original-document URL built by
        ``process_item_id`` (``vbpq-van-ban-goc.aspx``) does not contain
        "pdf", so the URL test alone cannot recognise a PDF served there.

    Args:
        session (aiohttp.ClientSession): The active HTTP session.
        url (str): The target URL to fetch data from.
        retries (int, optional): Maximum number of attempts. Defaults to 3.
        delay (int, optional): Base back-off delay in seconds. Defaults to 1.

    Returns:
        str or bytes: The response body, or None if the status was not
        retryable or every attempt failed. Both failure cases are logged.
    """
    last_error = None
    for attempt in range(retries):
        try:
            async with session.get(url) as response:
                if response.status == 200:
                    if 'pdf' in url or not response.content_type.startswith('text/'):
                        return await response.read()
                    return await response.text()
                if response.status != 429 and response.status < 500:
                    # Client errors such as 404 will not change on retry.
                    logging.warning(f"Giving up on {url}: HTTP {response.status}")
                    return None
                last_error = f"HTTP {response.status}"
        except Exception as e:  # network errors, timeouts, decode errors
            last_error = e
        if attempt < retries - 1:
            await asyncio.sleep(delay * (attempt + 1))
    logging.error(f"Failed to fetch {url}: {last_error}")
    return None

In [None]:
async def save_content(content, save_path):
    """
    Asynchronously and atomically save content to a file.

    The data is first written to ``save_path + '.part'``. It is renamed onto
    ``save_path`` only after the write completes. Writing in place would be
    unsafe because ``process_item_id`` skips any item whose target file
    already exists. A failed or interrupted in-place write would therefore
    leave a truncated file that is never re-fetched.

    Args:
        content (str or bytes): The content to be saved. ``bytes`` are
            written in binary mode, ``str`` as UTF-8 text. If None, the
            function returns without writing anything.
        save_path (str): The final file path for the content.

    Errors are logged and swallowed (best effort). On failure the temporary
    ``.part`` file is removed and ``save_path`` is left untouched.
    """
    if content is None:
        return
    tmp_path = save_path + '.part'
    is_binary = isinstance(content, bytes)
    try:
        async with aiofiles.open(
            tmp_path,
            mode='wb' if is_binary else 'w',
            encoding=None if is_binary else 'utf-8',
        ) as f:
            await f.write(content)
        # Atomic on the same filesystem: readers see either nothing or the full file.
        os.replace(tmp_path, save_path)
    except Exception as e:
        logging.error(f"Error saving {save_path}: {e}")
        try:
            os.remove(tmp_path)
        except OSError:
            pass  # temp file was never created or is already gone

In [None]:
async def process_item_id(session, item_id):
    """
    Fetch and store every document page belonging to one vbpl.vn item.

    Five pages are handled: full text, properties, history, related
    documents and the original-document ("pdf") page. Each one maps to
    ``<base_dir>/<doc_type>/<prefix><item_id><ext>``. If that file already
    exists, the page is not requested again.

    Missing pages are fetched one at a time, in the order listed below,
    through ``fetch_with_retry``. Non-empty results are then written
    concurrently with ``save_content``. If anything was written, the
    coroutine pauses for 0.1 s afterwards.

    Args:
        session (aiohttp.ClientSession): The active HTTP session.
        item_id (str): The item ID to process.
    """
    # (doc_type, filename prefix, URL); tuple order is the fetch order.
    targets = (
        ('vbpl', 'full_', f"https://vbpl.vn/TW/Pages/vbpq-toanvan.aspx?ItemID={item_id}&Keyword="),
        ('property', 'p_', f"https://vbpl.vn/tw/Pages/vbpq-thuoctinh.aspx?dvid=13&ItemID={item_id}&Keyword="),
        ('history', 'h_', f"https://vbpl.vn/tw/Pages/vbpq-lichsu.aspx?dvid=13&ItemID={item_id}&Keyword="),
        ('related', 'r_', f"https://vbpl.vn/TW/Pages/vbpq-vanbanlienquan.aspx?ItemID={item_id}&Keyword="),
        ('pdf', 'pdf_', f"https://vbpl.vn/tw/Pages/vbpq-van-ban-goc.aspx?ItemID={item_id}"),
    )

    pending_writes = []
    for doc_type, prefix, url in targets:
        extension = '.pdf' if doc_type == 'pdf' else '.html'
        target_path = os.path.join(base_dir, doc_type, f"{prefix}{item_id}{extension}")
        if os.path.exists(target_path):
            continue  # already downloaded on an earlier run
        payload = await fetch_with_retry(session, url)
        if not payload:
            continue
        pending_writes.append(save_content(payload, target_path))

    if not pending_writes:
        return
    await asyncio.gather(*pending_writes)
    await asyncio.sleep(0.1)

In [None]:
async def process_index_file(session, index_path, max_concurrency=5):
    """
    Asynchronously process one index file: extract its item IDs and crawl each.

    Extracting IDs:
        Every ``<a>`` whose ``href`` contains ``ItemID=<digits>`` contributes
        that numeric ID, and duplicates are collapsed.

    Crawling:
        Items are handed to ``process_item_id`` in ascending numeric order,
        with at most ``max_concurrency`` items in flight at once. Without
        this bound every item in the file starts immediately. Their requests
        then pile up waiting for a free connection in the session's
        connector pool. Per the aiohttp docs, that waiting time counts
        against the ``total`` request timeout, so late requests can time
        out before being sent.

    Args:
        session (aiohttp.ClientSession): The active HTTP session.
        index_path (str): The path to the index file to process.
        max_concurrency (int, optional): Maximum number of items processed
            concurrently. Values below 1 are treated as 1. Defaults to 5.

    Errors reading or parsing the index file are logged and the file is
    skipped. A failure while processing one item is logged without stopping
    the remaining items.
    """
    item_id_pattern = re.compile(r"ItemID=(\d+)")
    try:
        async with aiofiles.open(index_path, 'r', encoding='utf-8') as f:
            content = await f.read()

        soup = BeautifulSoup(content, 'html.parser')
        item_ids = set()
        for link in soup.find_all('a', href=item_id_pattern):
            match = item_id_pattern.search(link['href'])
            if match:
                item_ids.add(match.group(1))
    except Exception as e:
        logging.error(f"Error processing {index_path}: {e}")
        return

    semaphore = asyncio.Semaphore(max(1, max_concurrency))

    async def _process_bounded(item_id):
        # Hold a slot for the whole item so its requests stay within the bound.
        async with semaphore:
            await process_item_id(session, item_id)

    ordered_ids = sorted(item_ids, key=int)
    # return_exceptions=True: one failing item must not abandon its siblings
    # as un-awaited background tasks.
    results = await asyncio.gather(
        *(_process_bounded(item_id) for item_id in ordered_ids),
        return_exceptions=True,
    )
    for item_id, result in zip(ordered_ids, results):
        if isinstance(result, Exception):
            logging.error(f"Error processing item {item_id} from {index_path}: {result}")

In [None]:
async def main():
    """
    Asynchronous entry point for the crawler.
    
    This function processes all index files in the 'demuc' directory.
    """
    demuc_dir = os.path.join(base_dir, "demuc")
    index_files = [f for f in os.listdir(demuc_dir) if f.endswith(".html")]
    
    connector = aiohttp.TCPConnector(limit=5)
    timeout = aiohttp.ClientTimeout(total=300)
    
    async with aiohttp.ClientSession(connector=connector, timeout=timeout) as session:
        for index_file in tqdm(index_files, desc="Processing index files"):
            await process_index_file(session, os.path.join(demuc_dir, index_file))

# Run the crawler
await main()