# Web Data Scraping and Cleaning Pipeline (Data Processing Pipeline)

Before starting, please ensure that all necessary dependencies are installed:
`pip install requests tqdm warcio trafilatura datasketch ray fasttext kenlm`

In [None]:
import os
import json
import gzip
import re
import time
import requests
from tqdm.notebook import tqdm

# ================= 1. Global Path Configuration =================
# The directory the notebook was launched from doubles as the project root.
CURRENT_DIR = PROJECT_ROOT = os.getcwd()

# Layout: <root>/data/{raw,processed} for pipeline artifacts, <root>/models
# for model files.
DATA_DIR = os.path.join(PROJECT_ROOT, "data")
RAW_DIR, PROCESSED_DIR = (
    os.path.join(DATA_DIR, leaf) for leaf in ("raw", "processed")
)
MODEL_DIR = os.path.join(PROJECT_ROOT, "models")

# Create whatever part of the layout is missing (DATA_DIR is created as a
# parent of the two data sub-directories).
for d in (RAW_DIR, PROCESSED_DIR, MODEL_DIR):
    os.makedirs(d, exist_ok=True)

print(f"üìÅ Working directory set to: {CURRENT_DIR}")
print(f"üìÅ Raw data directory: {RAW_DIR}")
print(f"üìÅ Processed data directory: {PROCESSED_DIR}")

In [None]:
# ================= 2. Download Common Crawl Data =================
# Crawl snapshot identifier; interpolated into the warc.paths.gz index URL.
CRAWL_ID = "CC-MAIN-2023-50" 
# How many entries of the index are fetched and downloaded.
NUM_FILES_TO_DOWNLOAD = 1
# Host prefix prepended to both the index URL and every relative WARC path.
BASE_URL = "https://data.commoncrawl.org"

def get_warc_file_paths(crawl_id, num_files):
    """Return the first ``num_files`` WARC paths listed in a crawl's index.

    Streams ``{BASE_URL}/crawl-data/{crawl_id}/warc.paths.gz`` and
    decompresses it on the fly, stopping as soon as enough paths were read.

    Args:
        crawl_id: Crawl identifier interpolated into the index URL.
        num_files: Maximum number of paths to return.

    Returns:
        A list of non-empty, whitespace-stripped path strings. The list is
        empty when ``num_files`` is not positive or when fetching/decoding
        the index fails (the failure is printed, not raised).
    """
    paths_url = f"{BASE_URL}/crawl-data/{crawl_id}/warc.paths.gz"
    print(f"üì° Fetching file index: {paths_url} ...")
    paths = []
    try:
        # The context manager releases the streamed connection even though we
        # normally stop reading long before the end of the index.
        with requests.get(paths_url, stream=True, timeout=10) as response:
            response.raise_for_status()
            with gzip.open(response.raw, 'rt', encoding='utf-8') as f:
                for line in f:
                    if len(paths) >= num_files:
                        break
                    path = line.strip()
                    # A blank line would otherwise become a bogus download URL.
                    if path:
                        paths.append(path)
        return paths
    except Exception as e:
        # Deliberately broad: reading response.raw can surface urllib3, gzip
        # and decoding errors that share no narrower base class.
        print(f"‚ùå Failed to fetch index: {e}")
        return []

def download_file(url, output_dir):
    """Download ``url`` into ``output_dir`` unless the file is already there.

    The body is streamed to a temporary ``<name>.part`` file and renamed to
    its final name only after the transfer finished. An interrupted run
    therefore never leaves a truncated file under the final name, which the
    "already exists" check would otherwise mistake for a complete download.

    Args:
        url: Full URL of the file; its last path segment is the local name.
        output_dir: Existing directory that receives the file.

    Returns:
        The local file path on success or when the file already existed,
        ``None`` when the download failed (the error is printed).
    """
    local_filename = url.split('/')[-1]
    local_path = os.path.join(output_dir, local_filename)
    if os.path.exists(local_path):
        print(f"‚ö†Ô∏è File already exists, skipping: {local_filename}")
        return local_path

    partial_path = local_path + ".part"
    print(f"‚¨áÔ∏è Starting download: {local_filename}")
    try:
        with requests.get(url, stream=True, timeout=30) as r:
            r.raise_for_status()
            # Without a Content-Length header the total is unknown; passing
            # None keeps the progress bar open-ended instead of "0 of 0".
            total_size = int(r.headers.get('content-length', 0)) or None
            with open(partial_path, 'wb') as f, tqdm(
                desc=local_filename, total=total_size, unit='iB',
                unit_scale=True, unit_divisor=1024
            ) as bar:
                for chunk in r.iter_content(chunk_size=8192):
                    bar.update(f.write(chunk))
        # Publish the file under its final name only once it is complete.
        os.replace(partial_path, local_path)
        print(f"‚úÖ Download complete: {local_path}")
        return local_path
    except Exception as e:
        print(f"‚ùå Download failed {url}: {e}")
        return None
    finally:
        # Also runs on KeyboardInterrupt, which `except Exception` misses.
        if os.path.exists(partial_path):
            os.remove(partial_path)

# Fetch the index, then download every listed WARC file into RAW_DIR.
warc_paths = get_warc_file_paths(CRAWL_ID, NUM_FILES_TO_DOWNLOAD)
if warc_paths:
    print(f"üéØ Planning to download {len(warc_paths)} files to {RAW_DIR} ...")
    failed_urls = []
    for relative_path in warc_paths:
        full_url = f"{BASE_URL}/{relative_path}"
        # download_file signals failure by returning None instead of raising.
        if download_file(full_url, RAW_DIR) is None:
            failed_urls.append(full_url)
    if failed_urls:
        # Do not announce success when part of the data is missing.
        print(f"\nWARNING: {len(failed_urls)} of {len(warc_paths)} downloads failed; re-run this cell to retry.")
    else:
        print("\nüéâ Data preparation stage complete!")

In [None]:
# ================= 3. Extract WARC Content =================
from warcio.archiveiterator import ArchiveIterator
import trafilatura

# Passed as `limit` to extract_text_from_warc, which stops iterating once its
# record counter reaches this value.
LIMIT_RECORDS = 10000 
# JSONL file written by the extraction step; the rule-based cleaning step
# reads it back as its input.
OUTPUT_FILE_STEP2 = os.path.join(PROCESSED_DIR, "extracted_data.jsonl")

def _warc_record_to_json_line(record):
    """Return a JSONL line for an HTML response record, or None to skip it."""
    if record.rec_type != 'response':
        return None
    # NOTE(review): guard assumes warcio can yield response records without
    # parsed HTTP headers (http_headers is None) - cheap, confirm if needed.
    http_headers = record.http_headers
    if http_headers is None:
        return None
    content_type = http_headers.get_header('Content-Type')
    # Header values are case-insensitive ("Text/HTML" is still HTML).
    if not content_type or 'text/html' not in content_type.lower():
        return None
    try:
        content = record.content_stream().read()
    except Exception:
        # A payload that cannot be read/decoded is skipped, not fatal.
        return None

    text = trafilatura.extract(
        content, include_comments=False,
        include_tables=False, no_fallback=False
    )
    if not text or not text.strip():
        return None

    url = record.rec_headers.get_header('WARC-Target-URI')
    return json.dumps({"url": url, "text": text}, ensure_ascii=False) + '\n'


def extract_text_from_warc(warc_path, output_path, limit=None):
    """Extract the main text of HTML responses in a WARC file into JSONL.

    Every extracted page becomes one ``{"url": ..., "text": ...}`` line in
    ``output_path``. Progress and final counts are printed.

    Args:
        warc_path: Path of the WARC file to read.
        output_path: Path of the JSONL file to (over)write.
        limit: Maximum number of WARC records, of any type, to scan. A falsy
            value (None or 0) means no limit.
    """
    print(f"üöÄ Starting processing: {warc_path}")
    counter, success_count = 0, 0

    # Open the input first so a missing WARC file does not truncate an
    # existing output file.
    with open(warc_path, 'rb') as stream, \
         open(output_path, 'w', encoding='utf-8') as out_f:
        for record in tqdm(ArchiveIterator(stream), desc="Processing Records"):
            # Count every record up front; skipped records must not bypass
            # the counter, or "Records scanned" and `limit` would only cover
            # an arbitrary subset of what was actually read.
            counter += 1
            json_line = _warc_record_to_json_line(record)
            if json_line is not None:
                out_f.write(json_line)
                success_count += 1
            if limit and counter >= limit:
                break

    print(f"\n‚úÖ Processing complete! Records scanned: {counter}, Successfully extracted: {success_count}")

# os.listdir returns entries in arbitrary order; sorting makes the choice of
# input file reproducible from run to run.
files = sorted(f for f in os.listdir(RAW_DIR) if f.endswith('.warc.gz'))
if files:
    if len(files) > 1:
        # This step handles a single archive; say so instead of silently
        # ignoring the rest.
        print(f"NOTE: {len(files)} WARC files found; only {files[0]} is processed by this step.")
    input_warc_path = os.path.join(RAW_DIR, files[0])
    extract_text_from_warc(input_warc_path, OUTPUT_FILE_STEP2, LIMIT_RECORDS)
else:
    print("‚ùå No warc.gz files found in the raw directory. Please check if the previous step was successful.")

In [None]:
# ================= 4. Basic Rule-Based Cleaning =================
# This step reads the JSONL produced by the extraction step.
INPUT_FILE_STEP3 = OUTPUT_FILE_STEP2
# Fixed typo: the directory constant is PROCESSED_DIR; the misspelled name
# raised NameError as soon as this cell ran.
OUTPUT_FILE_STEP3 = os.path.join(PROCESSED_DIR, "clean_data.jsonl")

# Characters of scripts that are written without spaces between words
# (Hiragana/Katakana and Han ideographs).
_UNSPACED_SCRIPT_RE = re.compile(r'[\u3040-\u30ff\u3400-\u4dbf\u4e00-\u9fff]')
# Minimum share of such characters for a text to be exempt from the
# whitespace-based word-length rule.
_UNSPACED_SCRIPT_RATIO = 0.5
_CODE_SYMBOLS = frozenset('{}[]<>\\')
_BAD_PHRASES = (
    "lorem ipsum", "javascript is disabled", "enable cookies",
    "403 forbidden", "404 not found", "access denied", "rights reserved",
)


def is_high_quality(text):
    """Return True when ``text`` passes the rule-based quality heuristics.

    A text is rejected when any of the following holds:
      * it has fewer than 100 or more than 2,000,000 characters;
      * it contains no whitespace-separated tokens at all;
      * its mean token length exceeds 15 characters, unless at least half of
        its non-whitespace characters belong to a script written without
        spaces (otherwise every Chinese/Japanese page would be rejected);
      * more than 10% of its characters are code-like symbols;
      * it is shorter than 500 characters and contains a boilerplate phrase.

    Args:
        text: The document text to evaluate.

    Returns:
        bool: True to keep the document, False to drop it.
    """
    if len(text) < 100 or len(text) > 2_000_000:
        return False

    words = text.split()
    if not words:
        return False

    word_chars = sum(len(w) for w in words)
    if word_chars / len(words) > 15:
        # Long "words" usually mean junk, but space-free scripts look the
        # same to str.split(); only reject when the text is not mostly such
        # a script. The regex runs only on this rare path.
        unspaced_chars = len(_UNSPACED_SCRIPT_RE.findall(text))
        if unspaced_chars / word_chars < _UNSPACED_SCRIPT_RATIO:
            return False

    symbol_count = sum(1 for char in text if char in _CODE_SYMBOLS)
    if symbol_count / len(text) > 0.1:
        return False

    # Boilerplate phrases only disqualify short pages; long pages may mention
    # them legitimately (e.g. a footer with "rights reserved").
    if len(text) < 500:
        text_lower = text.lower()
        if any(phrase in text_lower for phrase in _BAD_PHRASES):
            return False
    return True

# Stream the extracted JSONL through is_high_quality and keep passing lines.
if os.path.exists(INPUT_FILE_STEP3):
    stats = {"total": 0, "kept": 0, "dropped": 0}
    with open(INPUT_FILE_STEP3, 'r', encoding='utf-8') as f_in, \
         open(OUTPUT_FILE_STEP3, 'w', encoding='utf-8') as f_out:

        for line in tqdm(f_in, desc="Cleaning Data"):
            stats["total"] += 1
            try:
                item = json.loads(line)
                text = item.get("text", "")
            except (json.JSONDecodeError, AttributeError):
                # Malformed JSON, or valid JSON that is not an object: count
                # it as dropped so that total == kept + dropped.
                stats["dropped"] += 1
                continue

            # A non-string "text" (e.g. null) cannot be scored and is dropped.
            if isinstance(text, str) and is_high_quality(text):
                f_out.write(line)
                stats["kept"] += 1
            else:
                stats["dropped"] += 1

    print(f"\n‚úÖ Cleaning complete! Total: {stats['total']}, Dropped: {stats['dropped']}, Kept: {stats['kept']}")
else:
    print(f"‚ùå Input file not found: {INPUT_FILE_STEP3}")

In [None]:
# ================= 5. Global Ray LSH Deduplication =================
import ray
from datasketch import MinHash, MinHashLSH

# This step reads the cleaned JSONL and writes the deduplicated JSONL.
INPUT_FILE_STEP4 = OUTPUT_FILE_STEP3
OUTPUT_FILE_STEP4 = os.path.join(PROCESSED_DIR, "deduplicated_data.jsonl")

# Passed as num_perm to both MinHash and MinHashLSH, so signatures and the
# index always agree on the signature size.
NUM_PERM = 128 
# Passed as threshold to MinHashLSH.
THRESHOLD = 0.8  

# NOTE(review): ignore_reinit_error presumably lets this cell be re-run while
# Ray is still initialized - confirm against the Ray docs.
ray.init(ignore_reinit_error=True)

@ray.remote
def process_batch(lines, batch_id):
    """Compute a MinHash signature for every parsable JSONL line of a batch.

    Args:
        lines: Raw JSONL strings; each is expected to decode to an object
            with "url" and "text" entries.
        batch_id: Index of the batch; accepted but not used here.

    Returns:
        A list of ``(url, minhash, text)`` tuples, one per line that could be
        processed. Lines that raise anything along the way are left out.
    """
    signed_records = []
    for raw_line in lines:
        try:
            record = json.loads(raw_line)
            body = record['text']
            # Signature over the whitespace-separated tokens of the text.
            signature = MinHash(num_perm=NUM_PERM)
            for token in body.split():
                signature.update(token.encode('utf8'))
            entry = (record['url'], signature, body)
        except Exception:
            continue
        signed_records.append(entry)
    return signed_records

# Two phases: MinHash signatures are computed in parallel with Ray, then a
# single LSH index is filled sequentially to drop near-duplicates.
if os.path.exists(INPUT_FILE_STEP4):
    try:
        print("üöÄ Phase 1: Parallel computing MinHash signatures...")
        with open(INPUT_FILE_STEP4, 'r', encoding='utf-8') as f:
            all_lines = f.readlines()

        batch_size = 1000
        batches = [all_lines[i:i + batch_size] for i in range(0, len(all_lines), batch_size)]
        futures = [process_batch.remote(batch, i) for i, batch in enumerate(batches)]

        processed_batches = ray.get(futures)
        results = [item for batch in processed_batches for item in batch]

        print("\nüöÄ Phase 2: Building LSH index and deduplicating...")
        lsh = MinHashLSH(threshold=THRESHOLD, num_perm=NUM_PERM)
        unique_records, duplicate_count = [], 0

        for url, minhash, text in tqdm(results, desc="LSH Deduplication"):
            if lsh.query(minhash):
                duplicate_count += 1
            else:
                # Key by position, not by URL: MinHashLSH.insert raises
                # ValueError for a key it has already seen, and the same URL
                # can appear more than once with different content.
                lsh.insert(str(len(unique_records)), minhash)
                unique_records.append({"url": url, "text": text})

        print(f"\n‚úÖ Deduplication complete! Duplicates found: {duplicate_count}, Valid remaining: {len(unique_records)}")

        with open(OUTPUT_FILE_STEP4, 'w', encoding='utf-8') as f:
            for item in unique_records:
                f.write(json.dumps(item, ensure_ascii=False) + '\n')
    finally:
        # Release the Ray workers even when a phase above fails.
        ray.shutdown()
else:
    print(f"‚ùå File not found: {INPUT_FILE_STEP4}")

In [None]:
# ================= 6. Language Identification with FastText =================
import fasttext

# This step reads the deduplicated JSONL of the previous step.
INPUT_FILE_STEP5 = OUTPUT_FILE_STEP4
# Model file loaded with fasttext.load_model. No cell shown here downloads
# it, so it has to be placed in MODEL_DIR beforehand; otherwise the step
# only prints a "not found" message.
FASTTEXT_MODEL_PATH = os.path.join(MODEL_DIR, 'lid.176.ftz')

# Route every document to data_en / data_zh / data_others by predicted language.
if os.path.exists(FASTTEXT_MODEL_PATH) and os.path.exists(INPUT_FILE_STEP5):
    print(f"Loading language model: {FASTTEXT_MODEL_PATH}")
    # Replaces fastText's eprint hook with a no-op to silence its load-time message.
    fasttext.FastText.eprint = lambda x: None
    model = fasttext.load_model(FASTTEXT_MODEL_PATH)

    count, failed = 0, 0
    # One `with` for all four files so they are closed even when the loop fails.
    with open(os.path.join(PROCESSED_DIR, 'data_en.jsonl'), 'w', encoding='utf-8') as out_en, \
         open(os.path.join(PROCESSED_DIR, 'data_zh.jsonl'), 'w', encoding='utf-8') as out_zh, \
         open(os.path.join(PROCESSED_DIR, 'data_others.jsonl'), 'w', encoding='utf-8') as out_others, \
         open(INPUT_FILE_STEP5, 'r', encoding='utf-8') as f:
        files_out = {'en': out_en, 'zh': out_zh, 'others': out_others}

        for line in tqdm(f, desc="Splitting Languages"):
            try:
                data = json.loads(line)
                # Newlines are flattened to spaces before the text is handed to predict().
                text = data.get('text', '').replace('\n', ' ')
                if not text:
                    continue
                predictions = model.predict(text, k=1)
                lang = predictions[0][0].replace('__label__', '')
            except Exception as e:
                # Keep going, but never silently: a systematic failure (e.g.
                # an incompatible library version) would otherwise yield
                # empty outputs followed by a success message.
                failed += 1
                if failed == 1:
                    print(f"WARNING: first line that could not be classified: {e!r}")
                continue

            if lang in ('en', 'zh'):
                bucket = lang
            else:
                # Only the catch-all file records which language was detected.
                data['detected_lang'] = lang
                bucket = 'others'
            files_out[bucket].write(json.dumps(data, ensure_ascii=False) + '\n')
            count += 1

    print("‚úÖ Processing complete! Generated data_en.jsonl, data_zh.jsonl, and data_others.jsonl")
    print(f"   Documents written: {count}, Failed to classify: {failed}")
else:
    print(f"‚ùå FastText model file or input file not found. Please verify.")

In [None]:
# ================= 7. KenLM-Based Quality Filtering =================
import kenlm

# Only the English split is scored in this step.
INPUT_FILE_STEP6 = os.path.join(PROCESSED_DIR, "data_en.jsonl")
OUTPUT_FILE_STEP6 = os.path.join(PROCESSED_DIR, "final_data.jsonl")
# Model file handed to kenlm.Model. No cell shown here creates it, so it has
# to be present in MODEL_DIR beforehand.
KENLM_MODEL_PATH = os.path.join(MODEL_DIR, "en.arpa.bin")
# A document is kept when lm_model.score(text) / num_words is GREATER than
# this value (higher is better).
# NOTE(review): despite the name, the compared quantity is a per-word model
# score rather than a perplexity - confirm units against the kenlm docs.
PERPLEXITY_THRESHOLD = -6.0

# Score every English document with the KenLM model and keep the fluent ones.
if os.path.exists(INPUT_FILE_STEP6) and os.path.exists(KENLM_MODEL_PATH):
    print(f"üöÄ Loading KenLM model: {KENLM_MODEL_PATH} ...")
    lm_model = kenlm.Model(KENLM_MODEL_PATH)

    stats = {"total": 0, "kept": 0, "dropped": 0}
    print(f"üîÑ Starting quality filtering (Threshold: {PERPLEXITY_THRESHOLD})...")

    with open(INPUT_FILE_STEP6, 'r', encoding='utf-8') as f_in, \
         open(OUTPUT_FILE_STEP6, 'w', encoding='utf-8') as f_out:

        for line in tqdm(f_in, desc="KenLM Filtering"):
            stats["total"] += 1
            try:
                item = json.loads(line)
                text = item.get("text", "")
                num_words = len(text.split())
            except (json.JSONDecodeError, AttributeError):
                # Malformed JSON, non-object JSON or non-string text: count
                # it as dropped so that total == kept + dropped.
                stats["dropped"] += 1
                continue

            # Too short for a meaningful per-word average.
            if num_words < 3:
                stats["dropped"] += 1
                continue

            # Normalize by length so long documents are not penalized.
            normalized_score = lm_model.score(text) / num_words

            if normalized_score > PERPLEXITY_THRESHOLD:
                item["perplexity_score"] = normalized_score
                f_out.write(json.dumps(item, ensure_ascii=False) + '\n')
                stats["kept"] += 1
            else:
                stats["dropped"] += 1

    print("\nüéâ Pipeline execution finished!")
    print(f"   Total Input: {stats['total']}, Kept (High Quality): {stats['kept']}, Dropped: {stats['dropped']}")
else:
    # Name the path(s) that are actually missing, not always the model path.
    missing = [p for p in (INPUT_FILE_STEP6, KENLM_MODEL_PATH) if not os.path.exists(p)]
    print(f"‚ùå Input data or KenLM model not found: {', '.join(missing)}.")