In [None]:
!pip install datasets

Collecting datasets
  Downloading datasets-3.1.0-py3-none-any.whl.metadata (20 kB)
Collecting dill<0.3.9,>=0.3.0 (from datasets)
  Downloading dill-0.3.8-py3-none-any.whl.metadata (10 kB)
Collecting xxhash (from datasets)
  Downloading xxhash-3.5.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (12 kB)
Collecting multiprocess<0.70.17 (from datasets)
  Downloading multiprocess-0.70.16-py310-none-any.whl.metadata (7.2 kB)
Collecting fsspec<=2024.9.0,>=2023.1.0 (from fsspec[http]<=2024.9.0,>=2023.1.0->datasets)
  Downloading fsspec-2024.9.0-py3-none-any.whl.metadata (11 kB)
Downloading datasets-3.1.0-py3-none-any.whl (480 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m480.6/480.6 kB[0m [31m14.7 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading dill-0.3.8-py3-none-any.whl (116 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m116.3/116.3 kB[0m [31m11.5 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading fsspec-2024.9.0-py3-none-any.whl 

In [None]:
from datasets import load_dataset, get_dataset_config_names, load_from_disk
import pandas as pd
import numpy as np
import os
import time
import json
from datetime import datetime
from tqdm import tqdm
import shutil
from google.colab import drive
import sys
import gc

# Attach Google Drive and make sure the backup folder exists.
# Any failure here is fatal: every later step reads or writes DRIVE_PATH.
try:
    drive.mount('/content/drive')
    DRIVE_PATH = "/content/drive/MyDrive/amazon_reviews_backup"
    os.makedirs(DRIVE_PATH, exist_ok=True)
    for banner_line in (
        "\n" + "="*50,
        "✓ Google Drive mounted successfully",
        f"✓ Backup directory: {DRIVE_PATH}",
        "="*50 + "\n",
    ):
        print(banner_line)
except Exception as mount_error:
    print("✗ Error mounting Google Drive:", str(mount_error))
    sys.exit(1)

# Running total (in GB) of local disk space reclaimed by cleanup_category
TOTAL_SPACE_FREED = 0

def load_progress():
    """Load saved progress from the Drive backup directory.

    Returns:
        dict: always contains 'completed' (list of category names) and
        'total_reviews' (int); any other keys stored in the file (for example
        'last_updated') are passed through unchanged.

    A missing, unreadable, or malformed progress file is not fatal: a warning
    is printed and empty progress is returned, so an interrupted write cannot
    block every later run.
    """
    defaults = {'completed': [], 'total_reviews': 0}
    progress_file = f'{DRIVE_PATH}/download_progress.json'
    if not os.path.exists(progress_file):
        return defaults

    try:
        with open(progress_file, 'r') as f:
            saved = json.load(f)
    except (json.JSONDecodeError, OSError) as e:
        print(f"✗ Could not read progress file ({e}); starting from empty progress")
        return defaults

    if not isinstance(saved, dict):
        print("✗ Progress file has an unexpected format; starting from empty progress")
        return defaults

    # Fill in any key an older or partial file lacks so callers can index safely.
    return {**defaults, **saved}

def save_progress(completed_categories, total_reviews):
    """Persist progress to the Drive backup directory.

    Args:
        completed_categories: JSON-serializable list of finished category names.
        total_reviews: running count of processed reviews.

    The JSON is written to a sibling temporary file and then moved over the
    real progress file with os.replace, so an interruption or serialization
    error mid-write leaves the previous progress file intact.

    Raises:
        Whatever json.dump / the file operations raise; the temporary file is
        removed before the exception propagates.
    """
    progress_file = f'{DRIVE_PATH}/download_progress.json'
    temp_file = f'{progress_file}.tmp'
    payload = {
        'completed': completed_categories,
        'total_reviews': total_reviews,
        'last_updated': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    }
    try:
        with open(temp_file, 'w') as f:
            json.dump(payload, f)
        os.replace(temp_file, progress_file)
    except BaseException:
        # Also covers KeyboardInterrupt: never leave a half-written temp file behind.
        if os.path.exists(temp_file):
            os.remove(temp_file)
        raise

def clear_memory():
    """Run a garbage-collection pass; if torch is already loaded, also empty its CUDA cache."""
    gc.collect()
    if 'torch' not in sys.modules:
        return
    # torch is only touched when something else has already imported it
    import torch
    torch.cuda.empty_cache()

def print_status(message, is_header=False):
    """Print `message`, framed by 50-character '=' rules when `is_header` is true."""
    if not is_header:
        print(message)
        return

    rule = "=" * 50
    print("\n" + rule)
    print(message)
    print(rule)

def check_storage_usage():
    """Report local disk usage of the raw and processed review folders.

    Returns:
        tuple: (dataset_size, parquet_size) in GB; each is 0 when the
        corresponding folder does not exist.
    """
    print_status("STORAGE USAGE", is_header=True)
    bytes_per_gb = 1024**3

    # Raw downloads: walk the whole tree under amazon_reviews/
    dataset_size = 0
    if os.path.exists('amazon_reviews'):
        raw_bytes = 0
        for dirpath, _subdirs, filenames in os.walk('amazon_reviews'):
            for filename in filenames:
                raw_bytes += os.path.getsize(os.path.join(dirpath, filename))
        dataset_size = raw_bytes / bytes_per_gb
        print(f"Original datasets: {dataset_size:.1f}GB")

    # Processed output: only the direct entries of amazon_reviews_processed/
    parquet_size = 0
    if os.path.exists('amazon_reviews_processed'):
        processed_bytes = 0
        for entry in os.listdir('amazon_reviews_processed'):
            processed_bytes += os.path.getsize(os.path.join('amazon_reviews_processed', entry))
        parquet_size = processed_bytes / bytes_per_gb
        print(f"Processed parquet files: {parquet_size:.1f}GB")

    print(f"Total space freed so far: {TOTAL_SPACE_FREED:.1f}GB")
    return dataset_size, parquet_size

def verify_parquet_integrity(file_path):
    """Check that a parquet file can be read and holds at least one row.

    Only the 'rating' column is loaded, to keep the check cheap.

    Args:
        file_path: path of the parquet file to check.

    Returns:
        bool: True if the file was read and contains at least one row; False
        if it is empty or reading raised (the error is printed).
    """
    try:
        # pd.read_parquet returns a DataFrame, which is not a context manager,
        # so it must be assigned rather than used in a `with` statement.
        df = pd.read_parquet(file_path, columns=['rating'])
        return len(df) > 0
    except Exception as e:
        print(f"Integrity check failed: {str(e)}")
        return False

def cleanup_category(category):
    """Delete the local raw dataset and local parquet for `category`.

    Intended to run after the category has been backed up. Updates the global
    TOTAL_SPACE_FREED with the size of everything removed and prints a
    storage report at the end.

    Args:
        category: category name; the files removed are
            'amazon_reviews/{category}.dataset' (file or directory) and
            'amazon_reviews_processed/{category}.parquet'.

    Returns:
        dict: 'dataset_deleted' / 'parquet_deleted' flags and
        'dataset_size' / 'parquet_size' in GB. On error the exception is
        printed and the dict reflects whatever had completed so far.
    """
    global TOTAL_SPACE_FREED
    cleanup_info = {
        'dataset_deleted': False,
        'dataset_size': 0,
        'parquet_deleted': False,
        'parquet_size': 0
    }

    try:
        print(f"\nDEBUG - Starting cleanup for {category}")

        # Remove original dataset if it exists
        dataset_path = f'amazon_reviews/{category}.dataset'
        if os.path.exists(dataset_path):
            print(f"Found dataset at: {dataset_path}")
            if os.path.isdir(dataset_path):
                # Sum every file under the directory tree; the inner loop over
                # `filenames` is what binds `filename`.
                dataset_bytes = sum(
                    os.path.getsize(os.path.join(dirpath, filename))
                    for dirpath, _dirnames, filenames in os.walk(dataset_path)
                    for filename in filenames
                )
                cleanup_info['dataset_size'] = dataset_bytes / (1024**3)
                print(f"Dataset size before deletion: {cleanup_info['dataset_size']:.1f}GB")
                shutil.rmtree(dataset_path)
            else:
                cleanup_info['dataset_size'] = os.path.getsize(dataset_path) / (1024**3)
                os.remove(dataset_path)
            cleanup_info['dataset_deleted'] = True
            TOTAL_SPACE_FREED += cleanup_info['dataset_size']
            print(f"✓ Removed original dataset for {category}")

        # Remove local parquet if it exists
        parquet_path = f'amazon_reviews_processed/{category}.parquet'
        if os.path.exists(parquet_path):
            print(f"Found parquet at: {parquet_path}")
            cleanup_info['parquet_size'] = os.path.getsize(parquet_path) / (1024**3)
            print(f"Parquet size before deletion: {cleanup_info['parquet_size']:.1f}GB")
            os.remove(parquet_path)
            cleanup_info['parquet_deleted'] = True
            TOTAL_SPACE_FREED += cleanup_info['parquet_size']
            print(f"✓ Removed local parquet for {category}")

        total_freed = cleanup_info['dataset_size'] + cleanup_info['parquet_size']
        print(f"DEBUG - Cleanup complete. Freed {total_freed:.1f}GB")
        print(f"Total space freed so far: {TOTAL_SPACE_FREED:.1f}GB")
        check_storage_usage()

        return cleanup_info

    except Exception as e:
        # Best effort: report and hand back what was achieved so far.
        print(f"✗ Error during cleanup for {category}: {str(e)}")
        return cleanup_info

def backup_to_drive(category, is_success=True):
    """Copy a category's parquet file (and the newest log) to the Drive backup.

    Args:
        category: category name; the source file is
            'amazon_reviews_processed/{category}.parquet'.
        is_success: when False, the parquet copy is skipped and only the log
            file is backed up.

    Returns:
        bool: True when every requested step succeeded, False otherwise (the
        error is printed, never raised).

    The parquet is first copied to 'temp_{category}.parquet' in the backup
    folder and verified. Only then is it moved over the final name with
    os.replace, so a previously good backup stays in place until its
    replacement has been verified. The temporary file is removed on any failure.
    """
    try:
        if is_success:
            source = f'amazon_reviews_processed/{category}.parquet'
            temp_dest = f'{DRIVE_PATH}/temp_{category}.parquet'
            final_dest = f'{DRIVE_PATH}/{category}.parquet'

            try:
                # Copy to temporary location first
                shutil.copy2(source, temp_dest)

                # Verify integrity before touching any existing backup
                if not verify_parquet_integrity(temp_dest):
                    raise Exception("Backup verification failed")

                # Single-step swap: overwrites final_dest if it already exists
                os.replace(temp_dest, final_dest)
            except Exception:
                # Covers partial copies as well as failed verification
                if os.path.exists(temp_dest):
                    os.remove(temp_dest)
                raise

            print(f"✓ Backed up {category} to Drive")

        # Always backup the log file
        log_files = [f for f in os.listdir('.') if f.startswith('download_log_')]
        if log_files:
            # Names embed a YYYYmmdd_HHMMSS stamp, so the lexical max is the newest
            latest_log = max(log_files)
            shutil.copy2(latest_log, f'{DRIVE_PATH}/{latest_log}')

    except Exception as e:
        print(f"✗ Error backing up to Drive: {str(e)}")
        return False
    return True

def check_disk_space():
    """Return the free space on the root filesystem in whole GiB (rounded down)."""
    usage = shutil.disk_usage("/")
    return usage.free // (2**30)

def get_review_categories():
    """Return the category names that have a 'raw_review_*' config in the dataset repo.

    Queries the config names of "McAuley-Lab/Amazon-Reviews-2023" and strips
    the 'raw_review_' marker from each matching name.
    """
    print("Fetching available categories...")
    marker = 'raw_review_'
    categories = []
    for config_name in get_dataset_config_names("McAuley-Lab/Amazon-Reviews-2023"):
        if not config_name.startswith(marker):
            continue
        categories.append(config_name.replace(marker, ''))
    return categories

def verify_parquet_files():
    """Try to read every '.parquet' file in the Drive backup folder and report row counts.

    Returns:
        bool: True only if every file was read successfully (also True when
        there are no parquet files); False if at least one file failed.
        Failures are printed per file and never raised.
    """
    print_status("VERIFYING FILES", is_header=True)

    # Get all parquet files in the backup directory (sorted for a stable report)
    parquet_files = sorted(f for f in os.listdir(DRIVE_PATH) if f.endswith('.parquet'))

    print(f"Checking raw files in: {DRIVE_PATH}")
    all_readable = True
    for parquet_file in parquet_files:
        file_path = f"{DRIVE_PATH}/{parquet_file}"
        try:
            df = pd.read_parquet(file_path)
            review_count = len(df)
            # Drop the frame before the next file is loaded so two full
            # tables are never held in memory at once.
            del df
            print(f"✓ {parquet_file}: {review_count:,} reviews")
        except Exception as e:
            all_readable = False
            print(f"✗ Error reading {parquet_file}: {str(e)}")

    return all_readable

def check_progress():
    """Summarize what has been done so far across Drive, local output and raw downloads.

    Returns:
        tuple of sets: (drive_files, local_files, dataset_files, remaining),
        where `remaining` is every available category without a parquet file
        in the Drive backup.
    """
    print_status("CHECKING CURRENT PROGRESS", is_header=True)

    # Saved progress record
    saved = load_progress()
    completed_categories = set(saved['completed'])
    if completed_categories:
        print(f"Found saved progress - {len(completed_categories)} categories completed")
        print(f"Total reviews processed: {saved['total_reviews']:,}")
        print(f"Last updated: {saved.get('last_updated', 'Unknown')}")

    # Parquet files already in the Drive backup (in-flight temp_ copies excluded)
    drive_files = set()
    if os.path.exists(DRIVE_PATH):
        for entry in os.listdir(DRIVE_PATH):
            if entry.endswith('.parquet') and not entry.startswith('temp_'):
                drive_files.add(entry.replace('.parquet', ''))
        print(f"\nFound in Drive backup: {len(drive_files)} categories")

    # Parquet files still sitting in the local output folder
    local_files = set()
    if os.path.exists('amazon_reviews_processed'):
        for entry in os.listdir('amazon_reviews_processed'):
            if entry.endswith('.parquet'):
                local_files.add(entry.replace('.parquet', ''))
        print(f"Found in local processed: {len(local_files)} categories")

    # Raw downloads that have not been converted/cleaned up yet
    dataset_files = set()
    if os.path.exists('amazon_reviews'):
        for entry in os.listdir('amazon_reviews'):
            if entry.endswith('.dataset'):
                dataset_files.add(entry.replace('.dataset', ''))
        print(f"Found original datasets: {len(dataset_files)} categories")

    # Everything available upstream minus what is already backed up
    all_categories = set(get_review_categories())
    remaining = all_categories.difference(drive_files)

    # Backed-up files that the progress record does not know about
    unrecorded = drive_files.difference(completed_categories)
    if unrecorded:
        print("\nWARNING: Found files in Drive not marked as completed:")
        for name in unrecorded:
            print(f"- {name}")

    print("\nSUMMARY:")
    print(f"Total categories: {len(all_categories)}")
    print(f"Completed: {len(all_categories - remaining)}")
    print(f"Remaining: {len(remaining)}")

    return drive_files, local_files, dataset_files, remaining

def process_category(category, needed_columns, is_new_download=False):
    """Convert one category to parquet, back it up to Drive and clean up locally.

    Args:
        category: category name (without the 'raw_review_' prefix).
        needed_columns: collection of column names to keep; names missing
            from the dataset are silently skipped.
        is_new_download: when True the category is fetched with load_dataset;
            otherwise it is read from 'amazon_reviews/{category}.dataset'.

    Returns:
        int: number of reviews written, or 0 if the user declined to continue
        on low disk space or all attempts failed.

    The whole load/convert/backup sequence is tried up to three times. May
    block on input() when less than 20GB of disk space is free.
    """
    max_retries = 3

    for attempt in range(max_retries):
        try:
            # Check disk space before processing
            free_gb = check_disk_space()
            if free_gb < 20:  # Set minimum required space to 20GB
                print_status("WARNING: LOW DISK SPACE", is_header=True)
                print(f"Only {free_gb}GB remaining")
                print("Waiting for user input...")
                response = input("Continue anyway? (yes/no): ")
                if response.lower() != 'yes':
                    print("Stopping process to prevent disk space issues")
                    return 0

            print(f"\nDEBUG - Starting processing for {category} (Attempt {attempt + 1}/{max_retries})")

            if is_new_download:
                print("Downloading new dataset...")
                config_name = f"raw_review_{category}"
                # NOTE(review): trust_remote_code=True lets the dataset repository's
                # loading code run locally — confirm the repo is trusted.
                dataset = load_dataset("McAuley-Lab/Amazon-Reviews-2023",
                                     config_name,
                                     trust_remote_code=True)
                ds = dataset['full']
            else:
                print("Loading existing dataset...")
                ds = load_from_disk(f'amazon_reviews/{category}.dataset')['full']

            print("Converting to pandas DataFrame...")
            # Pick columns in a deterministic order. Iterating a set of strings
            # directly can yield a different order on every interpreter run,
            # which would give each parquet file a different column layout.
            if isinstance(needed_columns, (set, frozenset)):
                selected_columns = [col for col in ds.column_names if col in needed_columns]
            else:
                available = set(ds.column_names)
                selected_columns = [col for col in needed_columns if col in available]
            df = pd.DataFrame({col: ds[col] for col in selected_columns})

            df['category'] = category

            print("Saving as parquet...")
            os.makedirs('amazon_reviews_processed', exist_ok=True)
            output_file = f'amazon_reviews_processed/{category}.parquet'
            df.to_parquet(output_file)

            num_reviews = len(df)
            print(f"Processed {num_reviews:,} reviews")

            # Clear memory
            del ds
            del df
            clear_memory()

            print("Backing up to Drive...")
            if backup_to_drive(category):
                print("\nVerifying backup and cleaning up...")
                cleanup_result = cleanup_category(category)
                if cleanup_result['dataset_deleted'] or cleanup_result['parquet_deleted']:
                    freed = cleanup_result['dataset_size'] + cleanup_result['parquet_size']
                    print(f"✓ Successfully cleaned up {freed:.1f}GB")
                else:
                    print("! No files were deleted")
                return num_reviews
            else:
                if attempt < max_retries - 1:
                    # A failed backup restarts the whole attempt (load + convert + backup)
                    print("Backup failed, retrying...")
                    continue
                else:
                    print("All backup attempts failed")
                    return 0

        except Exception as e:
            print(f"Error on attempt {attempt + 1}: {str(e)}")
            if attempt < max_retries - 1:
                print(f"Retrying in 5 seconds...")
                time.sleep(5)
            else:
                print(f"All attempts failed for {category}")
                # is_success=False: skip the parquet copy, still back up the log
                backup_to_drive(category, is_success=False)
                return 0

def _process_and_record(category, verb, is_new_download, needed_columns,
                        completed, total_reviews, log_filename):
    """Process one category; on success save progress and append a log entry.

    `completed` is updated in place. Returns the (possibly increased) running
    review total.
    """
    category_start = time.time()

    num_reviews = process_category(category, needed_columns, is_new_download=is_new_download)
    if num_reviews > 0:
        completed.add(category)
        total_reviews += num_reviews
        save_progress(sorted(completed), total_reviews)

        elapsed = time.time() - category_start
        log_message = (
            f"✓ {verb} {category} in {elapsed:.1f} seconds\n"
            f"  Reviews: {num_reviews:,}\n"
            f"  Running total: {total_reviews:,} reviews"
        )
        print(log_message)

        # Explicit encoding: the message contains a non-ASCII check mark
        with open(log_filename, 'a', encoding='utf-8') as f:
            f.write(f"{datetime.now()}: {log_message}\n")

    return total_reviews


def transition_and_continue():
    """Convert already-downloaded datasets, then download and process the rest.

    Progress is loaded at the start, saved after every successful category,
    and saved again before re-raising if the run is interrupted or fails.
    Categories are handled in sorted order so reruns are reproducible.
    A 'download_log_<timestamp>.txt' file in the working directory receives
    one entry per successful category.

    Raises:
        KeyboardInterrupt, Exception: re-raised after progress has been saved.
    """
    start_time = time.time()

    # Load previous progress
    progress = load_progress()
    total_reviews = progress['total_reviews']
    completed = set(progress['completed'])

    print_status("STARTING PROCESSING", is_header=True)

    # Check current progress
    drive_files, local_files, dataset_files, remaining = check_progress()

    # An ordered list (not a set) so the output columns come out in the same
    # order on every run.
    needed_columns = ['text', 'rating', 'asin', 'title', 'helpful_vote']

    os.makedirs('amazon_reviews_processed', exist_ok=True)
    log_filename = f"download_log_{datetime.now().strftime('%Y%m%d_%H%M%S')}.txt"

    try:
        # First, convert existing .dataset files
        if dataset_files:
            print_status(f"\nProcessing {len(dataset_files)} existing downloads...", is_header=True)

            for category in sorted(dataset_files):
                if category in completed:
                    print(f"Skipping {category} - already completed")
                    continue

                print(f"\nConverting {category}...")
                total_reviews = _process_and_record(
                    category, "Converted", False, needed_columns,
                    completed, total_reviews, log_filename,
                )

                clear_memory()

        # Continue with remaining categories
        if remaining:
            print_status(f"\nProcessing {len(remaining)} remaining categories...", is_header=True)

            for idx, category in enumerate(sorted(remaining), 1):
                if category in completed:
                    print(f"Skipping {category} - already completed")
                    continue

                print(f"\n[{idx}/{len(remaining)}] Processing {category}...")
                total_reviews = _process_and_record(
                    category, "Completed", True, needed_columns,
                    completed, total_reviews, log_filename,
                )

                # Periodic summary every fifth position in the list
                if idx % 5 == 0:
                    print_status("\nPROGRESS UPDATE", is_header=True)
                    elapsed_total = (time.time() - start_time) / 60
                    print(f"Completed {idx}/{len(remaining)} new categories")
                    print(f"Total time: {elapsed_total:.1f} minutes")
                    print(f"Total reviews so far: {total_reviews:,}")
                    print(f"Free disk space: {check_disk_space()} GB")
                    check_storage_usage()

                clear_memory()

    except KeyboardInterrupt:
        print("\nProcess interrupted by user")
        save_progress(sorted(completed), total_reviews)
        raise
    except Exception as e:
        print(f"\nUnexpected error: {str(e)}")
        save_progress(sorted(completed), total_reviews)
        raise

    total_time = (time.time() - start_time) / 60
    print_status("\nPROCESS COMPLETE", is_header=True)
    print(f"Total time: {total_time:.1f} minutes")
    print(f"Total reviews processed: {total_reviews:,}")
    print(f"Log file: {log_filename}")

if __name__ == "__main__":
    print_status("AMAZON REVIEWS PROCESSOR", is_header=True)

    # Baseline storage report before any work starts
    check_storage_usage()

    # Run the pipeline; failures are reported here instead of propagating
    try:
        transition_and_continue()
    except KeyboardInterrupt:
        print("\nProcess interrupted by user")
    except Exception as run_error:
        print(f"\nProcess failed: {str(run_error)}")
    finally:
        # Runs on every exit path: re-check the backed-up files and report storage
        verify_parquet_files()
        check_storage_usage()
        print(f"\nTotal space freed: {TOTAL_SPACE_FREED:.1f}GB")


Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).

✓ Google Drive mounted successfully
✓ Backup directory: /content/drive/MyDrive/amazon_reviews_backup


AMAZON REVIEWS PROCESSOR

STORAGE USAGE
Processed parquet files: 0.0GB
Total space freed so far: 0.0GB

STARTING PROCESSING

CHECKING CURRENT PROGRESS

Found in Drive backup: 34 categories
Found in local processed: 0 categories
Fetching available categories...

- Sports_and_Outdoors
- Software
- CDs_and_Vinyl
- Amazon_Fashion
- Automotive
- Health_and_Household
- Video_Games
- Beauty_and_Personal_Care
- Subscription_Boxes
- Baby_Products
- Tools_and_Home_Improvement
- Magazine_Subscriptions
- Handmade_Products
- Arts_Crafts_and_Sewing
- Books
- Cell_Phones_and_Accessories
- Office_Products
- Kindle_Store
- Movies_and_TV
- Clothing_Shoes_and_Jewelry
- Digital_Music
- Musical_Instruments
- Electronics
- All_Beauty
- Patio_Lawn_and_Garden
- Toys_and_Games
- Pe

KeyboardInterrupt: 