In [None]:
# Upload Class

In [1]:
import os
import boto3
import logging
import base64
import hashlib

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class S3MultipartUpload:
    PART_MINIMUM = int(5 * 1024 * 1024)

    def __init__(self, bucket, key, local_path, guid, part_size=int(25 * 1024 * 1024), profile_name=None, region_name="ap-southeast-1", verbose=True):
        self.bucket = bucket
        self.key = key
        self.path = local_path
        self.guid = guid
        self.total_bytes = os.stat(local_path).st_size
        self.part_bytes = part_size
        assert part_size > self.PART_MINIMUM
        self.s3 = boto3.session.Session(profile_name=profile_name, region_name=region_name).client("s3")
        if verbose:
            boto3.set_stream_logger(name="botocore")

    def get_all_parts(self, upload_id):
        parts = self.s3.list_parts(Bucket=self.bucket, Key=self.key, UploadId=upload_id)
        rparts = [{"PartNumber": part["PartNumber"], "ETag": part["ETag"], "ChecksumSHA256": part.get("ChecksumSHA256")} for part in parts.get("Parts", [])]
        return rparts

    def get_next_part(self, upload_id):
        parts = self.s3.list_parts(Bucket=self.bucket, Key=self.key, UploadId=upload_id)
        next_part_marker = parts.get("NextPartNumberMarker", 0)
        return next_part_marker + 1

    def abort_resume(self, action):
        mpus = self.s3.list_multipart_uploads(Bucket=self.bucket)
        upload_parts_exists = False
        logger.info(f"FileUpload action {action}")
        if "Uploads" in mpus:
            for u in mpus["Uploads"]:
                upload_id = u["UploadId"]
                if u["Key"] != self.key:
                    continue
                upload_parts_exists = True
                if action == "abort":
                    self.s3.abort_multipart_upload(Bucket=self.bucket, Key=self.key, UploadId=upload_id)
                elif action == "resume":
                    try:
                        next_part = self.get_next_part(upload_id)
                        self.update_progress(next_part - 1)
                        new_parts = self.upload(upload_id, next_part)
                        new_parts = self.get_all_parts(upload_id)
                        self.complete(upload_id, new_parts)
                    except self.s3.exceptions.NoSuchUpload:
                        mpu_id = self.create()
                        new_parts = self.upload(mpu_id, 1)
                        self.complete(mpu_id, new_parts)

        if action == "resume" and not upload_parts_exists:
            mpu_id = self.create()
            new_parts = self.upload(mpu_id, 1)
            self.complete(mpu_id, new_parts)

    def create(self):
        mpu = self.s3.create_multipart_upload(Bucket=self.bucket, Key=self.key, ChecksumAlgorithm='SHA256')
        mpu_id = mpu["UploadId"]
        return mpu_id

    def upload(self, mpu_id, part_number):
        parts = []
        with open(self.path, "rb") as f:
            for i in range(1, part_number):
                f.read(self.part_bytes)
            while True:
                data = f.read(self.part_bytes)
                if not data:
                    break
                if not (1 <= part_number <= 10000):
                    logger.error(f"Invalid part number: {part_number}")
                    raise ValueError(f"Part number must be between 1 and 10000, got {part_number}")

                # Calculate SHA256 checksum
                sha256_checksum = self.calculate_sha256(data)
                logger.info("Data-SHA-256")
                logger.info(sha256_checksum)

                upload_kwargs = {
                    "Body": data,
                    "Bucket": self.bucket,
                    "Key": self.key,
                    "UploadId": mpu_id,
                    "PartNumber": part_number,
                    "ChecksumAlgorithm": 'SHA256',
                    "ChecksumSHA256": sha256_checksum,
                }

                part = self.s3.upload_part(**upload_kwargs)
                parts.append({"PartNumber": part_number, "ETag": part["ETag"], "ChecksumSHA256": sha256_checksum})
                part_number += 1
                self.update_progress(part_number)
        return parts

    def complete(self, mpu_id, parts):
        result = self.s3.complete_multipart_upload(Bucket=self.bucket, Key=self.key, UploadId=mpu_id, MultipartUpload={"Parts": parts})
        self.verify_checksum(mpu_id)
        return result

    def verify_checksum(self,upload_id):
        local_checksum = self.calculate_sha256_for_parts(upload_id)
        s3_checksum = self.calculate_s3_sha256(self.bucket, self.key)
        if local_checksum == s3_checksum:
            logger.info(f"Checksum verification passed for {self.path}")
            os.remove(self.path)
        else:
            logger.error(f"Checksum verification failed for {self.path}")

    @staticmethod
    def calculate_sha256(data):
        sha256 = hashlib.sha256()
        sha256.update(data)
        return base64.b64encode(sha256.digest()).decode()

    def calculate_sha256_for_parts(self,upload_id):
        parts = self.get_all_parts(upload_id)
        sha256 = hashlib.sha256()
        for part in parts:
            sha256.update(part["ChecksumSHA256"].encode())
        return base64.b64encode(sha256.digest()).decode()

    def calculate_s3_sha256(self, bucket, key):
        s3_object = self.s3.get_object_attributes(Bucket=bucket, Key=key, ObjectAttributes=['Checksum'])
        s3_sha256 = s3_object['Checksum']['ChecksumSHA256']
        return s3_sha256

    def update_progress(self, part_number):
        progress = min(100, (part_number * self.part_bytes / self.total_bytes) * 100)
        logger.info(f"FileUpload progress {self.guid} : {progress}")


BUCKET_NAME = 'cdk-hnb659fds-assets-182426352951-ap-southeast-1'
uploader = S3MultipartUpload(BUCKET_NAME, "/tmp/test/m3.zip", "/tmp/test/m3.zip", "m3.zip")
uploader.abort_resume("resume")


INFO:botocore.credentials:Found credentials in shared credentials file: ~/.aws/credentials
2024-06-28 16:56:05,871 botocore.hooks [DEBUG] Event before-parameter-build.s3.ListMultipartUploads: calling handler <function validate_bucket_name at 0x7f74b0272310>
DEBUG:botocore.hooks:Event before-parameter-build.s3.ListMultipartUploads: calling handler <function validate_bucket_name at 0x7f74b0272310>
2024-06-28 16:56:05,872 botocore.hooks [DEBUG] Event before-parameter-build.s3.ListMultipartUploads: calling handler <function remove_bucket_from_url_paths_from_model at 0x7f74b02771f0>
DEBUG:botocore.hooks:Event before-parameter-build.s3.ListMultipartUploads: calling handler <function remove_bucket_from_url_paths_from_model at 0x7f74b02771f0>
2024-06-28 16:56:05,874 botocore.hooks [DEBUG] Event before-parameter-build.s3.ListMultipartUploads: calling handler <bound method S3RegionRedirectorv2.annotate_request_context of <botocore.utils.S3RegionRedirectorv2 object at 0x7f749b6405b0>>
DEBUG:botoc

KeyboardInterrupt: 

In [None]:
!pip install requests

In [None]:
import requests

In [None]:

# Set the base URL for your API
BASE_URL = 'http://127.0.0.1:8001/uploads/file_uploads'

# Define the GUID and file information
guid = 'd1.zip'  # Replace with an actual GUID from your database

# Headers for the requests
headers = {
    'Content-Type': 'application/json',
}


In [None]:

# Function to update the priority
def update_priority(guid, priority):    
    url = f'{BASE_URL}/update_priority/'
    data = {
        'guid': guid,
        'priority': priority,
    }
    response = requests.post(url, json=data, headers=headers)
    print('Update Priority Response:', response.json())


In [None]:

# Function to update the status
def update_status(guid, status):
    url = f'{BASE_URL}/update_status/'
    data = {
        'guid': guid,
        'status': status,
    }
    response = requests.post(url, json=data, headers=headers)
    print('Update Status Response:', response.json())


In [None]:

# Test updating the priority
print("Testing Update Priority API")
update_priority(guid, 1)  # Set priority to 1


In [None]:

# Test pausing the upload
print("\nTesting Pause Upload API")
update_status(guid, 'paused')


In [None]:

# Test resuming the upload
print("\nTesting Resume Upload API")
update_status(guid, 'resume')



In [None]:

# # Test canceling the upload
# print("\nTesting Cancel Upload API")
# update_status(guid, 'cancel')

In [None]:
import pickle

serialized_data = b"\x80\x04\x95\xe7\x00\x00\x00\x00\x00\x00\x00}\x94(\x8c\tfile_path\x94\x8c\x13/tmp/test/test4.zip\x94\x8c\x0bobject_name\x94\x8c\ttest4.zip\x94\x8c\x04guid\x94h\x04\x8c\x0cinstance_uid\x94\x8c\x11some_instance_uid\x94\x8c\bpriority\x94J}Byf\x8c\x06status\x94\x8c\tuploading\x94\x8c\ncreated_at\x94GA\xd9\x9eP\x9f@\xdc>\x8c\nupdated_at\x94GA\xd9\x9eP\xa8\xd0\xc6}\x8c\ttimestamp\x94GA\xd9\x9eP\x9f@\xdcR\x8c\bprogress\x94G@QJ\n\x9b[\xc6\x9cu."

# Deserialize the data
data = pickle.loads(serialized_data)

# Print the resulting object
print(data)


In [None]:
import time

In [None]:

guids = ['m1.zip', 'm2.zip','m3.zip','m4.zip','m5.zip','m6.zip','m7.zip','m8.zip','m9.zip']

# Test Case 1: Change priority to the highest
print("Test Case 1: Change priority to the highest")
update_priority(guids[6], 1)
time.sleep(1)


In [None]:

# Test Case 2: Change priority of multiple files
print("Test Case 2: Change priority of multiple files")
update_priority(guids[1], 1)
update_priority(guids[2], 3)
update_priority(guids[3], 4)
time.sleep(1)


In [None]:

# Test Case 3: Change priority to a lower level
print("Test Case 3: Change priority to a lower level")
update_priority(guids[0], 4)
time.sleep(1)

# Test Case 4: Change priority and resume
print("Test Case 4: Change priority and resume")
update_status(guids[1], 'paused')
update_priority(guids[1], 1)
update_status(guids[1], 'resume')
time.sleep(1)

# Test Case 5: Ensure unique priorities
print("Test Case 5: Ensure unique priorities")
update_priority(guids[0], 1)
update_priority(guids[1], 2)
update_priority(guids[2], 3)
update_priority(guids[3], 4)
time.sleep(1)


In [None]:

# Test Case 6: Swap priorities between files
print("Test Case 6: Swap priorities between files")
update_priority(guids[0], 2)
update_priority(guids[1], 1)
time.sleep(1)


In [None]:

# Test Case 7: Handle maximum uploads
print("Test Case 7: Handle maximum uploads")
update_priority(guids[0], 1)
update_priority(guids[1], 1)
update_priority(guids[2], 1)
update_priority(guids[3], 1)  # Assuming this is more than MAX_UPLOADS
time.sleep(1)


In [None]:

# Test Case 8: Change priority of a non-existent file
print("Test Case 8: Change priority of a non-existent file")
update_priority('non_existent_guid', 1)
time.sleep(1)


In [None]:

# Test Case 9: Change priority with invalid data
print("Test Case 9: Change priority with invalid data")
update_priority(guids[0], 'invalid_priority')
update_priority(guids[0], -1)  # Assuming negative values are invalid
time.sleep(1)


In [None]:

# Test Case 10: Pause high priority task
print("Test Case 10: Pause high priority task")
update_priority(guids[0], 1)
update_status(guids[1], 'paused')
time.sleep(1)

In [None]:
import hashlib
import boto3

In [None]:
def calculate_sha256(file_path):
    sha256 = hashlib.sha256()
    with open(file_path, 'rb') as f:
        for chunk in iter(lambda: f.read(4096), b''):
            sha256.update(chunk)
    return sha256.hexdigest()



In [None]:
def calculate_s3_sha256( bucket, key):
    s3_client = boto3.client('s3')
    # Get the object and its metadata
    s3_object = s3_client.head_object(Bucket=bucket, Key=key)
    # Get the SHA256 checksum from the metadata
    s3_checksum = s3_object['ETag']
    return s3_checksum

In [None]:
def calculate_s3_sha256( bucket, key):
    s3_client = boto3.client('s3')
    s3_object = s3_client.get_object(Bucket=bucket, Key=key)
    s3_sha256 = hashlib.sha256(s3_object['Body'].read()).hexdigest()
    return s3_sha256


In [None]:
def calculate_s3_sha256( bucket, key):
    s3_client = boto3.client('s3')
    response=s3_client.get_object_attributes(
            Bucket=bucket,
            Key=key,
            ObjectAttributes=['Checksum']
        )
    s3_checksum = response['Checksum']['ChecksumSHA256']
    return s3_checksum

In [None]:
calculate_s3_sha256("cdk-hnb659fds-assets-182426352951-ap-southeast-1","m8.zip")

In [None]:
calculate_sha256("/tmp/test/m8.zip")