In [10]:
import tarfile
import boto3
from pathlib import Path
import logging
import sagemaker
from sagemaker import get_execution_role
from sagemaker.inputs import TrainingInput
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.steps import ProcessingStep, TrainingStep
from sagemaker.xgboost import XGBoost




In [None]:
import os
import boto3
from botocore.exceptions import ClientError, NoCredentialsError
from pathlib import Path
import logging

def upload_directory_to_s3(local_path, bucket, prefix):
    """Upload directory to S3 with comprehensive error handling"""
    # Configure logging
    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger(__name__)
    
    s3_client = boto3.client('s3')
    local_path = Path(local_path)
    uploaded_count = 0
    
    try:
        # VERIFY PATH EXISTS
        if not local_path.exists():
            # Try case-insensitive search
            parent = local_path.parent
            possible_dirs = [d for d in parent.iterdir() 
                            if d.is_dir() and d.name.lower() == local_path.name.lower()]
            
            if not possible_dirs:
                raise FileNotFoundError(f"Local path {local_path} does not exist")
            
            # Use first matching directory
            local_path = possible_dirs[0]
            logger.warning(f"Using case-corrected path: {local_path}")

        # Walk through directory
        for root, _, files in os.walk(local_path):
            for file in files:
                file_path = Path(root) / file
                relative_path = file_path.relative_to(local_path)
                s3_key = f"{prefix}/{relative_path}"
                
                try:
                    s3_client.upload_file(
                        Filename=str(file_path),
                        Bucket=bucket,
                        Key=str(s3_key)
                    )
                    uploaded_count += 1
                    logger.info(f"Uploaded: {file_path} → s3://{bucket}/{s3_key}")
                except (ClientError, NoCredentialsError) as e:
                    logger.error(f"Failed to upload {file_path}: {str(e)}")
                    continue
        
        logger.info(f"✅ Successfully uploaded {uploaded_count} files to s3://{bucket}/{prefix}")
        return True
        
    except Exception as e:
        logger.exception(f"Critical upload error: {str(e)}")
        return False

# CORRECTED USAGE
BUCKET = "sentiment-analysis-workflow"
PREFIX = "imdb-sentiment-analysis/raw_data"
LOCAL_PATH = "../data/aclImdb"  # Note the capital "I"

# Verify path before uploading
print("Current directory:", os.getcwd())
# print("Data directory exists:", Path("data").exists())
print("aclImdb exists:", Path("../data/aclImdb").exists())
# print("Contents of data directory:", os.listdir("data"))

if upload_directory_to_s3(LOCAL_PATH, BUCKET, PREFIX):
    print("Upload successful!")
else:
    print("Upload failed. Check logs for details.")

INFO:__main__:Uploaded: ../data/aclImdb/imdb.vocab → s3://sentiment-analysis-workflow/imdb-sentiment-analysis/raw_data/imdb.vocab


Current directory: /home/ec2-user/SageMaker/e2e-IMDb-sentiment-analysis-sagemaker-workflow
aclImdb exists: True


INFO:__main__:Uploaded: ../data/aclImdb/imdbEr.txt → s3://sentiment-analysis-workflow/imdb-sentiment-analysis/raw_data/imdbEr.txt
INFO:__main__:Uploaded: ../data/aclImdb/README → s3://sentiment-analysis-workflow/imdb-sentiment-analysis/raw_data/README
INFO:__main__:Uploaded: ../data/aclImdb/test/urls_pos.txt → s3://sentiment-analysis-workflow/imdb-sentiment-analysis/raw_data/test/urls_pos.txt
INFO:__main__:Uploaded: ../data/aclImdb/test/urls_neg.txt → s3://sentiment-analysis-workflow/imdb-sentiment-analysis/raw_data/test/urls_neg.txt
INFO:__main__:Uploaded: ../data/aclImdb/test/labeledBow.feat → s3://sentiment-analysis-workflow/imdb-sentiment-analysis/raw_data/test/labeledBow.feat
INFO:__main__:Uploaded: ../data/aclImdb/test/neg/8557_3.txt → s3://sentiment-analysis-workflow/imdb-sentiment-analysis/raw_data/test/neg/8557_3.txt
INFO:__main__:Uploaded: ../data/aclImdb/test/neg/922_3.txt → s3://sentiment-analysis-workflow/imdb-sentiment-analysis/raw_data/test/neg/922_3.txt
INFO:__main__:Up

In [9]:
def upload_directory_to_s3(local_path, bucket, prefix):
    """Upload directory to S3 with comprehensive error handling"""
    # Configure logging
    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger(__name__)
    
    s3_client = boto3.client('s3')
    local_path = Path(local_path)
    uploaded_count = 0
    
    try:
        # Verify local path exists
        if not local_path.exists():
            raise FileNotFoundError(f"Local path {local_path} does not exist")
        
        # Walk through directory
        for root, _, files in os.walk(local_path):
            for file in files:
                file_path = Path(root) / file
                relative_path = file_path.relative_to(local_path)
                s3_key = f"{prefix}/{relative_path}"
                
                try:
                    s3_client.upload_file(
                        Filename=str(file_path),
                        Bucket=bucket,
                        Key=str(s3_key))
                    uploaded_count += 1
                    logger.info(f"Uploaded: {file_path} → s3://{bucket}/{s3_key}")
                except (ClientError, NoCredentialsError) as e:
                    logger.error(f"Failed to upload {file_path}: {str(e)}")
                    continue
        
        logger.info(f"✅ Successfully uploaded {uploaded_count} files to s3://{bucket}/{prefix}")
        return True
        
    except Exception as e:
        logger.exception(f"Critical upload error: {str(e)}")
        return False

# Usage example:
BUCKET = "sentiment-analysis-workflow"
PREFIX = "imdb-sentiment-analysis/raw_data"
LOCAL_PATH = "data/aclImdb"

if upload_directory_to_s3(LOCAL_PATH, BUCKET, PREFIX):
    print("Upload successful!")
else:
    print("Upload failed. Check logs for details.")

ERROR:__main__:Critical upload error: Local path data/aclImdb does not exist
Traceback (most recent call last):
  File "/tmp/ipykernel_6338/1346181515.py", line 14, in upload_directory_to_s3
    raise FileNotFoundError(f"Local path {local_path} does not exist")
FileNotFoundError: Local path data/aclImdb does not exist


Upload failed. Check logs for details.


In [None]:


def upload_to_s3(local_path, s3_uri):
    """Upload local file/directory to S3"""
    session = boto3.Session()
    s3 = session.resource('s3')
    bucket, key_prefix = s3_uri.replace("s3://", "").split("/", 1)
    
    if Path(local_path).is_dir():
        for file in Path(local_path).rglob('*'):
            if file.is_file():
                s3_key = f"{key_prefix}/{file.relative_to(local_path)}"
                s3.Bucket(bucket).upload_file(str(file), s3_key)
    else:
        s3.Bucket(bucket).upload_file(local_path, f"{key_prefix}/{Path(local_path).name}")

def get_imdb_data_s3(config):
    """Ensure IMDb data is in S3, return S3 URI"""
    s3_data_uri = f"s3://{config['bucket']}/{config['prefix']}/raw_data"
    local_dir = Path('data/aclImdb')
    
    # Download and extract data if missing locally
    if not local_dir.exists():
        os.makedirs('data', exist_ok=True)
        tar_path = 'data/aclImdb_v1.tar.gz'
        if not Path(tar_path).exists():
            os.system(f'wget -P data/ https://ai.stanford.edu/~amaas/data/sentiment/aclImdb_v1.tar.gz')
        with tarfile.open(tar_path) as tar:
            tar.extractall(path='data/')
    
    # Upload to S3
    upload_to_s3(str(local_dir), s3_data_uri)
    return f"{s3_data_uri}/aclImdb"




    

In [None]:
def create_processing_step(role, s3_data_uri, config):
    """Create processing job with S3 input"""
    # Upload processing script to S3
    s3_script_uri = f"s3://{config['bucket']}/{config['prefix']}/scripts/processing_job.py"
    upload_to_s3('scripts/processing_job.py', s3_script_uri)
    
    sklearn_processor = SKLearnProcessor(
        framework_version='1.2-1',
        role=role,
        instance_type='ml.m5.large',
        instance_count=1,
        base_job_name='imdb-processing'
    )
    
    return ProcessingStep(
        name='IMDBDataProcessing',
        processor=sklearn_processor,
        inputs=[
            ProcessingInput(
                source=s3_data_uri,  # S3 instead of local
                destination='/opt/ml/processing/input/data'
            )
        ],
        outputs=[
            ProcessingOutput(output_name='train', source='/opt/ml/processing/output/train'),
            ProcessingOutput(output_name='validation', source='/opt/ml/processing/output/validation'),
            ProcessingOutput(output_name='test', source='/opt/ml/processing/output/test'),
            ProcessingOutput(output_name='vectorizer', source='/opt/ml/processing/output/vectorizer')
        ],
        code=s3_script_uri  # Use S3 path for script
    )

In [None]:
def main():
    # Initialize SageMaker
    session = sagemaker.Session()
    role = get_execution_role()
    region = session.boto_region_name
    bucket = session.default_bucket()  # Use default bucket
    prefix = 'imdb-sentiment-analysis'
    
    config = {
        "session": session,
        "role": role,
        "region": region,
        "bucket": bucket,
        "prefix": prefix
    }
    
    # 1. Upload data to S3
    s3_data_uri = get_imdb_data_s3(config)
    
    # 2. Create pipeline steps
    # processing_step = create_processing_step(role, s3_data_uri, config)
    # train_s3 = f"{processing_step.properties.ProcessingOutputConfig.Outputs['train'].S3Output.S3Uri}"
    # val_s3 = f"{processing_step.properties.ProcessingOutputConfig.Outputs['validation'].S3Output.S3Uri}"
    
    # # 3. Upload training script to S3
    # s3_train_script = f"s3://{bucket}/{prefix}/scripts/train.py"
    # upload_to_s3('scripts/train.py', s3_train_script)
    
    # # 4. Training step (using S3 script)
    # container = sagemaker.image_uris.retrieve('xgboost', region, '1.7-1')
    # xgb_estimator = XGBoost(
    #     entry_point=s3_train_script,  # S3 script path
    #     image_uri=container,
    #     role=role,
    #     instance_count=1,
    #     instance_type='ml.m5.2xlarge',
    #     framework_version='1.7-1',
    #     output_path=f's3://{bucket}/{prefix}/models'
    # )
    
    # training_step = TrainingStep(
    #     name='IMDBModelTraining',
    #     estimator=xgb_estimator,
    #     inputs={
    #         'train': TrainingInput(train_s3, content_type='text/csv'),
    #         'validation': TrainingInput(val_s3, content_type='text/csv')
    #     }
    # )
    

In [None]:
# 5. Create and run pipeline
    pipeline = Pipeline(
        name='IMDBSentimentPipeline',
        steps=[processing_step, training_step],
        sagemaker_session=session
    )
    
    pipeline.upsert(role_arn=role)
    execution = pipeline.start()
    execution.wait()
    
    # ... (deployment code remains the same)

if __name__ == "__main__":
    main()