In [28]:
import boto3
import time
import concurrent.futures


# Notebook configuration: the named AWS profile used to build the boto3 session,
# and the DynamoDB table that the scan / batch-get cells below read from.
aws_profile = 'feed-staging-admin'
table_name = 'staging-v2-community-feed-features'


In [13]:
# Item-count threshold at which the table scan stops fetching further pages.
limit = 10000

In [14]:
session = boto3.Session(profile_name=aws_profile)

# Create a DynamoDB service client
dynamodb = session.client('dynamodb')

# Scan the table page by page, collecting at most `limit` items.
items = []
page_size = 1000  # Upper bound on items evaluated per Scan call; lower it to reduce read throughput
scan_kwargs = {'TableName': table_name}

while len(items) < limit:
    # Never ask for more than is still needed, so the total cannot overshoot `limit`.
    scan_kwargs['Limit'] = min(page_size, limit - len(items))
    response = dynamodb.scan(**scan_kwargs)
    items.extend(response.get('Items', []))

    # DynamoDB omits LastEvaluatedKey once the final page has been returned.
    last_evaluated_key = response.get('LastEvaluatedKey')
    if not last_evaluated_key:
        break
    scan_kwargs['ExclusiveStartKey'] = last_evaluated_key

In [15]:
# Number of items collected by the scan.
len(items)

2695

In [16]:
# Peek at the first scanned item and show the string ("S") value of its "sk" attribute.
item = items[0]
item["sk"]["S"]

'FEATURE_NAME#CFMediaAddedCommentSignal'

In [17]:
# Distinct "sk" string values seen across all scanned items.
sks = {record["sk"]["S"] for record in items}
sks

{'FEATURE_NAME#CFMediaAddedCommentSignal',
 'FEATURE_NAME#CFMediaAddedReactionSignal',
 'FEATURE_NAME#CFMediaDwellTimeSignal',
 'FEATURE_NAME#CFMediaTappedTakenBySignal',
 'FEATURE_NAME#CFMediaViewedSignal',
 'FEATURE_NAMECFMediaAddedCommentSignal',
 'FEATURE_NAMECFMediaAddedReactionSignal',
 'FEATURE_NAMECFMediaDwellTimeSignal',
 'FEATURE_NAMECFMediaViewedSignal'}

In [18]:
# "pk" string value of every scanned item, in scan order (duplicates kept).
pks = [
    record["pk"]["S"]
    for record in items
]
pks[:5]

['MEDIA_ID#01HQ8J9GP57HBQF4ZKZ0DWHJMP',
 'MEDIA_ID#01GZJYSEJWDHAN6ACVJ1WZ66YA',
 'MEDIA_ID#01GZJYSEJWDHAN6ACVJ1WZ66YA',
 'MEDIA_ID#01GZJYSEJWDHAN6ACVJ1WZ66YA',
 'USER_ID#80b92820-cb18-41aa-9131-c1c5922d2f13']

In [19]:
# Unique partition keys that mention MEDIA_ID, excluding any that mention CROSS.
mediaIds = {
    pk
    for pk in pks
    if "MEDIA_ID" in pk and "CROSS" not in pk
}
len(mediaIds), list(mediaIds)[:5]

(400,
 ['MEDIA_ID#01H2B7FZW98XBZVX3YNW2E3Q6A',
  'MEDIA_ID#01GZJYP8710GWK65TXRPK4KRCD',
  'MEDIA_ID#01HQ63A0324XGVVGXH9494887T',
  'MEDIA_ID#01HQ639G3R4GHQH0W3EZYWEGY8',
  'MEDIA_ID01GTP5R224XKCM29V0P1M2S7F4'])

In [20]:
# Unique partition keys that mention TAKEN_BY, excluding any that mention CROSS.
takenByIds = {
    pk
    for pk in pks
    if "TAKEN_BY" in pk and "CROSS" not in pk
}
len(takenByIds), list(takenByIds)[:5]

(35,
 ['TAKEN_BY_ID#12c954c0-2c46-4e65-9bea-49d0c91e18e6',
  'TAKEN_BY_ID#db195dfd-87a4-4544-b15a-7795c1488be1',
  'TAKEN_BY_ID#b9ee36f5-be85-46f8-802f-a2b72de2ccbb',
  'TAKEN_BY_ID#8e44af8b-de96-49cf-8683-b94ac6bd149a',
  'TAKEN_BY_ID#ef842ed5-b89f-4143-a64d-159037ef321e'])

In [21]:
# Unique partition keys that mention USER_ID, excluding any that mention CROSS.
userIds = {
    pk
    for pk in pks
    if "USER_ID" in pk and "CROSS" not in pk
}
len(userIds), list(userIds)[:5]

(89,
 ['USER_ID#09622a3d-ca7c-45c0-b27d-b103a587c600',
  'USER_ID#ff2e8247-ad77-4f67-9c6d-e21a9f26526a',
  'USER_ID#dc415d7a-8863-4a68-a5a7-82a6cdefdc1e',
  'USER_ID#8ba12d31-b71f-4ec6-8a51-6cf90433de52',
  'USER_ID#076a42a2-69d4-4809-9910-3b1d42ca78f5'])

In [23]:
# Build one candidate primary key for every (media pk, sk) combination.
keys_to_get = [
    {"pk": {"S": media_pk}, "sk": {"S": sort_key}}
    for media_pk in mediaIds
    for sort_key in sks
]
print(len(keys_to_get))

3600


In [27]:
%%timeit
request_items = {
    table_name: {
        'Keys': keys_to_get[:100]
    }
}
response = dynamodb.batch_get_item(RequestItems=request_items)
# Print the retrieved items
items = response['Responses'][table_name]
total_cnt = len(items)
print("Total count: ", total_cnt)

Total count:  20


In [29]:
def chunk_keys(keys, chunk_size=100):
    """Lazily split ``keys`` into consecutive slices of at most ``chunk_size`` elements.

    Args:
        keys: A sliceable sequence that supports ``len()``.
        chunk_size: Maximum length of each yielded slice (default 100).

    Yields:
        Slices of ``keys`` in order; the final slice may be shorter.
    """
    offsets = range(0, len(keys), chunk_size)
    yield from (keys[start:start + chunk_size] for start in offsets)
def process_batch(keys_chunk, max_attempts=5):
    """Fetch one chunk of keys with BatchGetItem, retrying any unprocessed keys.

    A single BatchGetItem response may be partial: keys the service did not
    process are handed back under ``UnprocessedKeys`` and must be requested
    again, otherwise items are silently dropped from the result.

    Args:
        keys_chunk: List of DynamoDB primary-key dicts to fetch from ``table_name``.
        max_attempts: Maximum number of BatchGetItem calls to make for this chunk.

    Returns:
        List of the items returned for ``table_name`` across all attempts.

    Raises:
        RuntimeError: If keys are still unprocessed after ``max_attempts`` calls.
    """
    # Nothing to fetch; avoid sending a request with an empty key list.
    if not keys_chunk:
        return []

    pending = {table_name: {'Keys': keys_chunk}}
    fetched = []
    for attempt in range(max_attempts):
        response = dynamodb.batch_get_item(RequestItems=pending)
        fetched.extend(response.get('Responses', {}).get(table_name, []))

        # UnprocessedKeys has the same shape as RequestItems, so it can be resent as-is.
        pending = response.get('UnprocessedKeys') or {}
        if not pending:
            return fetched

        # Back off exponentially (capped at 1s) before retrying the leftovers.
        if attempt < max_attempts - 1:
            time.sleep(min(0.05 * 2 ** attempt, 1.0))

    raise RuntimeError(
        "batch_get_item left unprocessed keys after %d attempts" % max_attempts
    )



In [31]:
%%timeit
keys_chunks = list(chunk_keys(keys_to_get))
with concurrent.futures.ThreadPoolExecutor() as executor:
    # Schedule the process_batch function to be called for each chunk of keys
    future_to_chunk = {executor.submit(process_batch, chunk): chunk for chunk in keys_chunks}
    
    # Initialize a list to hold all retrieved items
    all_items = []
    
    # Process the results as they are completed
    for future in concurrent.futures.as_completed(future_to_chunk):
        chunk_items = future.result()
        all_items.extend(chunk_items)

# Now all_items contains the items retrieved from all batches
total_cnt = len(all_items)
print("Total count: ", total_cnt)

Total count:  729
