In [None]:
import os
import subprocess
from concurrent.futures import ThreadPoolExecutor

import awswrangler as wr
import boto3
import dask.dataframe as dd
import pandas as pd

# Set AWS credentials.
# Fill in the values below, or leave a "<YOUR_...>" placeholder untouched to keep
# whatever is already configured outside this notebook for that setting.
# Avoid saving/committing the notebook with real keys in this cell.
_aws_settings = {
    "AWS_ACCESS_KEY_ID": "<YOUR_AWS_ACCESS_KEY_ID>",
    "AWS_SECRET_ACCESS_KEY": "<YOUR_AWS_SECRET_ACCESS_KEY>",
    "AWS_DEFAULT_REGION": "<YOUR_AWS_DEFAULT_REGION>"
}

# Export only the entries that were actually filled in: writing a literal
# placeholder into os.environ would replace a usable value with an invalid one
# for both boto3 and the `aws` subprocesses started by later cells.
os.environ.update({
    name: value
    for name, value in _aws_settings.items()
    if not (value.startswith("<YOUR_") and value.endswith(">"))
})

# Initialize Boto3 client
s3 = boto3.client("s3")

In [None]:
# Set Pandas display options

# Every option below receives None -- presumably so that wide frames are not
# truncated when they are shown in cell output.
for _display_option in ('display.max_columns', 'display.width', 'display.max_colwidth'):
    pd.set_option(_display_option, None)

In [None]:
# List the contents of a folder

def list_folder_contents(bucket, folder):
    """List the objects under an S3 prefix using ``aws s3 ls --recursive``.

    Args:
        bucket: Name of the S3 bucket.
        folder: Key prefix to list; it is appended to ``s3://<bucket>/`` as-is.

    Returns:
        A pandas DataFrame with one row per listed object and the columns
        ``"File Name"`` (the key as printed by the CLI) and ``"Size (Bytes)"``
        (int). The frame is empty, but still carries both columns, when
        nothing is listed or when the CLI exits with a non-zero status (its
        stderr is printed in that case).
    """
    columns = ["File Name", "Size (Bytes)"]
    result = subprocess.run(
        ["aws", "s3", "ls", f"s3://{bucket}/{folder}", "--recursive"],
        capture_output=True, text=True
    )

    if result.returncode != 0:
        print(f"Error: {result.stderr}")
        return pd.DataFrame(columns=columns)

    records = []
    for line in result.stdout.splitlines():
        # The first three whitespace-separated fields are skipped/read as
        # metadata (the third is the size); everything after them is the key.
        # Limiting the split to three cuts keeps whitespace inside the key
        # exactly as printed instead of collapsing it to single spaces.
        fields = line.split(None, 3)
        if len(fields) < 4 or not fields[2].isdigit():
            continue  # blank line or a line that is not an object entry
        records.append({"File Name": fields[3], "Size (Bytes)": int(fields[2])})

    # Passing the columns explicitly keeps the schema stable for empty listings.
    return pd.DataFrame(records, columns=columns)

# Run and display
bucket_name = "<YOUR_BUCKET_NAME>"
folder_path = "<YOUR_FOLDER_PATH>/"

df = list_folder_contents(bucket_name, folder_path)

# Print the listing as plain text without the index column, or a short notice
# when the returned frame has no rows.
if df.empty:
    print("Folder is empty.")
else:
    print(df.to_string(index=False))

In [None]:
# Read a file dynamically based on extension

def read_s3_file(bucket, path, npartitions=10):
    """Load a CSV/TSV or Excel object from S3 as a Dask DataFrame.

    The reader is chosen from the file extension, which is compared
    case-insensitively so that e.g. ``REPORT.CSV`` is treated like
    ``report.csv``.

    Args:
        bucket: Name of the S3 bucket.
        path: Object key inside the bucket; must end in ``.csv``, ``.tsv``,
            ``.xlsx`` or ``.xls``.
        npartitions: Number of partitions requested when an Excel sheet is
            converted to a Dask DataFrame (not used for CSV/TSV).

    Returns:
        A Dask DataFrame; all readers are asked for ``dtype=str``.

    Raises:
        ValueError: If the extension is not one of the supported ones.
    """
    full_path = f"s3://{bucket}/{path}"
    lowered = path.lower()  # used only for extension matching; the key itself is untouched

    # Read CSV or TSV file
    if lowered.endswith(('.csv', '.tsv')):
        separator = '\t' if lowered.endswith('.tsv') else ','
        return dd.read_csv(full_path, sep=separator, dtype=str, low_memory=False)

    # Read Excel file (first sheet only, via sheet_name=0)
    if lowered.endswith(('.xlsx', '.xls')):
        df = wr.s3.read_excel(full_path, sheet_name=0, dtype=str)
        # Convert the pandas frame to a Dask frame so both branches return the same kind of object
        return dd.from_pandas(df, npartitions=npartitions)

    raise ValueError("Unsupported file format")

def process_data(df):
    """Cast each partition of ``df`` to dtypes inferred from its leading rows.

    Args:
        df: Object exposing ``head(n)`` and ``map_partitions(func)``
            (a Dask DataFrame in this notebook).

    Returns:
        The result of ``df.map_partitions`` with the cast applied.

    NOTE(review): when every column arrives as ``str`` (as ``read_s3_file``
    requests), ``convert_dtypes`` may only produce string dtypes rather than
    numeric ones -- confirm the inference does what is intended.
    """
    # Up to 10K leading rows serve as the sample for dtype inference.
    sample = df.head(10000)
    inferred_dtypes = sample.convert_dtypes().dtypes.to_dict()

    def cast_partition(part):
        # errors='ignore' is forwarded to astype, so a failing cast is not raised here.
        return part.astype(inferred_dtypes, errors='ignore')

    return df.map_partitions(cast_partition)

# Usage
bucket_name = "<YOUR_BUCKET_NAME>"
s3_file_path = "<YOUR_S3_FILE_PATH>"

# Load the object, cast its columns, then preview the first rows as the cell output.
df = process_data(
    df=read_s3_file(bucket=bucket_name, path=s3_file_path)
)
df.head()

In [None]:
# Download file/folder from S3 and save locally

bucket_name = "<YOUR_BUCKET_NAME>"
s3_path = "<YOUR_S3_PATH>"  # Ensure it ends with '/' if it's a folder
local_path = "<YOUR_LOCAL_PATH>"

# Export the desired number of concurrent requests for the child process.
# NOTE(review): it is unclear whether the AWS CLI reads this environment
# variable (the documented knob is the `s3.max_concurrent_requests` config
# setting) -- confirm before relying on it for parallelism.
os.environ["AWS_MAX_CONCURRENT_REQUESTS"] = "10"

# AWS CLI command: a trailing '/' selects `sync` (folder), anything else `cp` (single file)
command = ["aws", "s3"]
command.append("sync" if s3_path.endswith('/') else "cp")
command += [
    f"s3://{bucket_name}/{s3_path}",
    local_path,
    "--only-show-errors",
]

# Run the command; a non-zero exit status is reported instead of raised
try:
    subprocess.run(command, check=True)
except subprocess.CalledProcessError as e:
    print("Error downloading:", e)
else:
    print("Download complete.")

In [None]:
# Upload file/folder to S3

def upload_to_s3(local_path, bucket_name, s3_path, num_processes=10):
    """Upload a local file, or every regular file under a directory, to S3.

    Each file is copied with its own ``aws s3 cp`` invocation. Commands are
    passed as argument lists (no shell), so names containing spaces, quotes or
    shell metacharacters are handed to the CLI verbatim.

    Args:
        local_path: File or directory to upload.
        bucket_name: Destination bucket.
        s3_path: Key prefix. It is concatenated as-is with the file name (or
            with the path relative to ``local_path`` for directories), so it
            should end with ``'/'`` to act as a folder.
        num_processes: Maximum number of ``aws s3 cp`` commands run at the
            same time. A value that is not positive lets the executor pick
            its default worker count.

    Returns:
        None. Progress and failures are reported with ``print``; a failing
        copy (or an ``aws`` executable that cannot be started) is reported,
        not raised.
    """
    local_path = os.path.abspath(local_path)

    # Kept so the process environment is left exactly as before this rewrite.
    # NOTE(review): it is unclear whether the AWS CLI reads this variable -- confirm.
    os.environ["AWS_MAX_CONCURRENT_REQUESTS"] = str(num_processes)

    destination_prefix = f"s3://{bucket_name}/{s3_path}"

    # Build the list of (local file, S3 URI) pairs to copy
    if os.path.isfile(local_path):
        transfers = [(local_path, destination_prefix + os.path.basename(local_path))]
    elif os.path.isdir(local_path):
        transfers = []
        for folder, _, file_names in os.walk(local_path):
            for file_name in file_names:
                source = os.path.join(folder, file_name)
                # Regular files only (symlinks are skipped), like `find -type f`
                if os.path.islink(source) or not os.path.isfile(source):
                    continue
                # S3 keys always use '/', whatever the local separator is
                relative_key = os.path.relpath(source, local_path).replace(os.sep, "/")
                transfers.append((source, destination_prefix + relative_key))
    else:
        print("Invalid path provided. Please provide a valid file or directory.")
        return

    def _copy(transfer):
        source, destination = transfer
        subprocess.run(["aws", "s3", "cp", source, destination], check=True)

    worker_count = num_processes if num_processes and num_processes > 0 else None

    try:
        with ThreadPoolExecutor(max_workers=worker_count) as pool:
            # Consuming the iterator re-raises the first failure, if any
            list(pool.map(_copy, transfers))
        print("Upload completed successfully.")
    except (subprocess.CalledProcessError, OSError) as e:
        print(f"Error occurred: {e}")

# Usage
local_path = "<YOUR_LOCAL_PATH>"
bucket_name = "<YOUR_BUCKET_NAME>"
s3_path = "<YOUR_S3_PATH>"

# Keyword arguments make the source/destination roles explicit at the call site.
upload_to_s3(local_path=local_path, bucket_name=bucket_name, s3_path=s3_path)

In [None]:
# For deleting file/folder

def delete_from_s3(bucket_name, s3_path):
    """Delete a single object, or everything under a prefix, with ``aws s3 rm``.

    The command is passed as an argument list (no shell), so quotes or shell
    metacharacters inside the key reach the CLI verbatim instead of being
    interpreted by a shell.

    Args:
        bucket_name: Bucket that holds the object(s).
        s3_path: Key to delete. A trailing ``'/'`` marks it as a folder and
            adds ``--recursive`` to the command.

    Returns:
        None. The outcome is reported with ``print``; a failing command (or an
        ``aws`` executable that cannot be started) is reported, not raised.
    """
    target = f"s3://{bucket_name}/{s3_path}"
    command = ["aws", "s3", "rm", target]

    if s3_path.endswith('/'):
        command.append("--recursive")
        print(f"Deleting folder: {target}")
    else:
        print(f"Deleting file: {target}")

    try:
        subprocess.run(command, check=True)
        print(f"Successfully deleted {s3_path} from S3.")
    except (subprocess.CalledProcessError, OSError) as e:
        print(f"Error occurred while deleting {s3_path} from S3: {e}")

# Usage
s3_path = "<YOUR_S3_PATH>"
bucket_name = "<YOUR_BUCKET_NAME>"

# Keyword arguments make it explicit which value is the bucket and which is the key.
delete_from_s3(bucket_name=bucket_name, s3_path=s3_path)