### Description
This script downloads the data from a DynamoDB table in chunks. Each chunk is a JSON file containing up to 10k items, saved under data/

In [1]:
## imports cell
import os
import json
import time
import boto3
from decimal import Decimal
from concurrent.futures import ThreadPoolExecutor

In [2]:
## Constants
# Number of items per chunk file; also passed to the scan as its page limit.
CHUNK_SIZE = 10_000
# Directory that receives the chunk files and the checkpoint file.
OUTPUT_DIR = "data"
# File in which the last evaluated key is stored between runs.
LAST_KEY_FILE = os.path.join(OUTPUT_DIR, "last_evaluated_key.json")

In [None]:
## Initialise client
# Both values are empty placeholders here - fill them in before running.
table_name = ""
profile_name = ""

# Session bound to the chosen profile -> DynamoDB resource -> Table handle
# used by the download function below.
session = boto3.Session(profile_name=profile_name)
dynamodb = session.resource("dynamodb")
table = dynamodb.Table(table_name)

In [4]:
## Helpers
class DecimalEncoder(json.JSONEncoder):
    """JSON encoder for the Python types found in downloaded items.

    * ``Decimal`` with an integral value is written as a JSON integer, so
      whole numbers keep their exact value (no ``1.0`` and no rounding of
      large ids); any other ``Decimal`` is written as a float.
    * ``set`` / ``frozenset`` values are written as JSON arrays.

    Anything else falls through to the base class, which raises
    ``TypeError``.
    """

    def default(self, obj):
        if isinstance(obj, Decimal):
            # is_finite() guards int() against Infinity/NaN.
            if obj.is_finite() and obj == obj.to_integral_value():
                return int(obj)
            return float(obj)
        if isinstance(obj, (set, frozenset)):
            # Elements (e.g. Decimals) are re-encoded through default().
            return list(obj)
        return super().default(obj)

def load_last_evaluated_key():
    """Load the checkpointed scan key from ``LAST_KEY_FILE``.

    Returns:
        The parsed JSON content of the file, or ``None`` when the file does
        not exist.

    Non-integer JSON numbers are parsed as ``Decimal`` rather than ``float``:
    the boto3 DynamoDB resource rejects ``float`` values, so a reloaded
    numeric key could otherwise not be passed back as ``ExclusiveStartKey``.
    """
    try:
        with open(LAST_KEY_FILE, "r") as f:
            return json.load(f, parse_float=Decimal)
    except FileNotFoundError:
        return None

def save_last_evaluated_key(key):
    """Persist ``key`` as JSON in ``LAST_KEY_FILE``.

    Args:
        key: dict to store (serialised with ``DecimalEncoder``).

    The parent directory is created if missing. The data is written to a
    temporary sibling file and then moved into place, so an interruption
    mid-write cannot leave a truncated checkpoint behind.
    """
    print(f"saving last evaluated key {key}")
    os.makedirs(os.path.dirname(LAST_KEY_FILE) or ".", exist_ok=True)
    tmp_path = f"{LAST_KEY_FILE}.tmp"
    with open(tmp_path, "w") as f:
        json.dump(key, f, cls=DecimalEncoder)
    # os.replace overwrites the destination in a single rename.
    os.replace(tmp_path, LAST_KEY_FILE)

def write_chunk_file(items, index):
    """Write ``items`` to ``OUTPUT_DIR/chunk<index>.json``.

    Args:
        items: list of items to serialise (via ``DecimalEncoder``).
        index: number used in the chunk file name.

    ``OUTPUT_DIR`` is created if missing. The chunk is written to a
    temporary file first and renamed afterwards, so a ``chunk<N>.json``
    that exists is always complete.
    """
    os.makedirs(OUTPUT_DIR, exist_ok=True)
    chunk_file = os.path.join(OUTPUT_DIR, f"chunk{index}.json")
    tmp_file = f"{chunk_file}.tmp"
    with open(tmp_file, 'w') as f:
        json.dump(items, f, indent=2, cls=DecimalEncoder)
    os.replace(tmp_file, chunk_file)
    print(f"Saved {len(items)} items to {chunk_file}.")

In [5]:
## main implementation
def _next_chunk_index():
    """Return one more than the highest N among existing chunk<N>.json files."""
    highest = 0
    if os.path.isdir(OUTPUT_DIR):
        for name in os.listdir(OUTPUT_DIR):
            if name.startswith("chunk") and name.endswith(".json"):
                number = name[len("chunk"):-len(".json")]
                if number.isdecimal():
                    highest = max(highest, int(number))
    return highest + 1


def download_dynamodb_table_in_chunks():
    """Scan the whole table and write it out in files of CHUNK_SIZE items.

    Items are buffered across scan pages; every time the buffer reaches
    CHUNK_SIZE it is written with ``write_chunk_file`` and a checkpoint is
    saved. Leftover items are written as a final, smaller chunk, after which
    the checkpoint file is reset to ``{}``.

    Resuming: when a saved key is found at start-up the scan continues after
    that key, and chunk numbering continues after the chunk files that
    already exist instead of restarting at 1.

    The checkpoint is the primary key of the last item written to a chunk,
    not the page's LastEvaluatedKey. A chunk can fill in the middle of a
    page; checkpointing the page key would skip the still-buffered items
    of that page on resume.

    NOTE: the chunk is written before the checkpoint. An interruption
    between the two makes the next run download that chunk's items again
    (duplicates) rather than lose them.
    """
    start_time = time.time()  # ⏱️ Start timer

    # if the previous execution failed, go back from the previously saved last evaluated key
    last_evaluated_key = load_last_evaluated_key()
    print(f"last evaluated key: {last_evaluated_key}")

    # Names of the table's primary-key attributes, learned from any key we
    # have seen; needed to build a checkpoint from an item.
    key_names = list(last_evaluated_key) if last_evaluated_key else None

    # We'll accumulate items in a list until we reach CHUNK_SIZE
    chunk_items = []
    # When resuming, do not overwrite chunks written by the previous run.
    chunk_index = _next_chunk_index() if last_evaluated_key else 1
    total_items_downloaded = 0

    while True:
        print(f"loop iteration. chunk items: {len(chunk_items)}, chunk index: {chunk_index}, total items downloaded: {total_items_downloaded}")
        # Build the scan parameters
        scan_params = {'Limit': CHUNK_SIZE}
        if last_evaluated_key:
            scan_params['ExclusiveStartKey'] = last_evaluated_key

        response = table.scan(**scan_params)
        page_items = response.get('Items', [])

        # Key for the next page; absent on the last page.
        last_evaluated_key = response.get('LastEvaluatedKey', None)
        if last_evaluated_key:
            key_names = list(last_evaluated_key)

        # Accumulate
        for item in page_items:
            chunk_items.append(item)

            # If we have enough items for one chunk, write it out
            if len(chunk_items) == CHUNK_SIZE:
                print("chunk filled - writting output")
                write_chunk_file(chunk_items, chunk_index)
                total_items_downloaded += len(chunk_items)
                chunk_items.clear()
                chunk_index += 1

                # Save progress: resume right after the last item written.
                # key_names is only unknown when the very first page is also
                # the last one, in which case the scan is already finished.
                if key_names:
                    save_last_evaluated_key({name: item[name] for name in key_names})
                print("chunk and last evaluated key saved")

        # If no more pages to read, break out
        if not last_evaluated_key:
            break

    # After the loop, we might have leftover items smaller than CHUNK_SIZE
    if chunk_items:
        print("writing last chunk")
        write_chunk_file(chunk_items, chunk_index)
        total_items_downloaded += len(chunk_items)
        chunk_items.clear()

    # Clear out the last key file, since we've read the entire table
    save_last_evaluated_key({})

    elapsed = time.time() - start_time  # ⏱️ End timer
    print(f"Download complete. Total items downloaded: {total_items_downloaded}")
    print(f"⏱️ Execution time: {elapsed:.2f} seconds")

In [None]:
# Run the download; it picks up from the saved last evaluated key when one exists.
download_dynamodb_table_in_chunks()