## Captura de datos de cambios para DynamoDB Streams

DynamoDB Streams captura una secuencia en orden cronológico de las modificaciones de los elementos en una tabla de DynamoDB y almacena esta información en un log durante un máximo de 24 horas. Las aplicaciones pueden obtener acceso a este registro y ver los elementos de datos tal y como se encontraban antes y después de la modificación, prácticamente en tiempo real.

El cifrado en reposo cifra los datos en DynamoDB streams. Para obtener más información, consulte [Cifrado en reposo en DynamoDB](https://docs.aws.amazon.com/es_es/amazondynamodb/latest/developerguide/EncryptionAtRest.html).

Una transmisión de DynamoDB es un flujo ordenado de información sobre los cambios que se realizan en los elementos de una tabla de DynamoDB. Cuando se habilita una transmisión en una tabla, DynamoDB obtiene información sobre cada modificación de los elementos de datos de esa tabla.

Cada vez que una aplicación crea, actualiza o elimina elementos en la tabla, DynamoDB Streams escribe un registro de transmisión con los atributos de clave principal de los elementos modificados. Un registro de transmisión contiene información sobre una modificación de los datos de un solo elemento de una tabla de DynamoDB. Puede configurar la secuencia de tal forma que sus registros capturen información adicional; por ejemplo, las imágenes de "antes" y "después" de los elementos modificados.

DynamoDB Streams ayuda a garantizar lo siguiente:

* Cada registro de secuencia aparece una única vez en la secuencia.
* Para cada elemento que se modifica de una tabla de DynamoDB, los registros de transmisión aparecen en el mismo orden en que se han realizado las modificaciones del elemento.

DynamoDB Streams escribe los registros de transmisión prácticamente en tiempo real, para que pueda crear aplicaciones que consuman estas transmisiones y adopten medidas en función de su contenido.

In [3]:
import boto3
from botocore.exceptions import ClientError
import pandas as pd
import base64
from spdynamodb import DynamoTable
from datetime import datetime
import json
import time
from decimal import Decimal

In [4]:
#dt = DynamoTable(profile_name='my-profile')
dt=DynamoTable()
try:
    dt.select_table('FooBarTable')
    print(dt)
except:
    dt.create_table(
        table_name='FooBarTable',
        partition_key='PK',
        partition_key_type='S',
        sort_key='SK',
        sort_key_type='S',
    )

Table info:
 - Table name: FooBarTable
 - Table arn: arn:aws:dynamodb:us-east-1:810855290601:table/FooBarTable
 - Table creation: 2024-08-13T14:11:40
 - Key schema: [{'AttributeName': 'PK', 'KeyType': 'HASH'}, {'AttributeName': 'SK', 'KeyType': 'RANGE'}]
 - Attribute definitions: [{'AttributeName': 'PK', 'AttributeType': 'S'}, {'AttributeName': 'SK', 'AttributeType': 'S'}]
 - Table class: STANDARD
 - Point-in-time recovery status: DISABLED
 - Delete protection: False
 - Stream enabled: OFF



In [5]:
dt.status_stream

'OFF'

In [6]:
dt.status_stream = 'ON'

DynamoDB streams turned on successfully.


In [7]:
dt.status_stream

'NEW_AND_OLD_IMAGES'

In [6]:
dt.status_stream = 'KEYS_ONLY'

DynamoDB streams are already NEW_AND_OLD_IMAGES. You must turn them off before changing the stream view type.


In [67]:
dt.status_stream = 'OFF'

In [None]:
dt.status_stream = 'KEYS_ONLY'

In [None]:
dt.status_stream

In [None]:
dt.status_stream = 'KEYS_ONLY'

In [8]:
dt.stream_arn

'arn:aws:dynamodb:us-east-1:810855290601:table/FooBarTable/stream/2024-08-13T17:13:16.358'

### Uso de Kinesis Data Streams para capturar cambios en DynamoDB

Puede utilizar Amazon Kinesis Data Streams para capturar cambios en Amazon DynamoDB.

Kinesis Data Streams captura modificaciones a nivel de elemento en cualquier tabla de DynamoDB y las replica en una secuencia de datos de Kinesis. Sus aplicaciones pueden acceder a este flujo de datos y ver los cambios a nivel de elemento casi en tiempo real. Puede capturar y almacenar continuamente terabytes de datos por hora. Puede aprovechar el tiempo de retención de datos más prolongado y, con la capacidad de distribución ramificada mejorada, puede llegar simultáneamente a dos o más aplicaciones descendentes. Otros beneficios incluyen una mayor transparencia de auditoría y seguridad.

Kinesis Data Streams también le da acceso a Amazon Kinesis Data Firehose y Amazon Kinesis Data Analytics. Estos servicios pueden ayudarle a crear aplicaciones que alimenten paneles en tiempo real, generen alertas, apliquen precios y publicidad dinámicos y apliquen análisis de datos sofisticados y algoritmos de machine learning.

In [9]:
kinesis_client = boto3.client('kinesis', region_name='us-east-1')
# Create kinesis data stream
try:
    response = kinesis_client.create_stream(
        StreamName='FooBarStream',
        ShardCount=1
    )
    print("Stream created successfully.")
    # Get kinesis data stream arn
    stream_arn = kinesis_client.describe_stream(StreamName='FooBarStream')['StreamDescription']['StreamARN']
except ClientError as e:
    if e.response['Error']['Code'] == 'ResourceInUseException':
        print('Stream already exists')
    else:
        print("Unexpected error: %s" % e)

Stream created successfully.


In [10]:
dt.table.meta.client.enable_kinesis_streaming_destination(
    TableName=dt.table_name,
    StreamArn=stream_arn
)

{'TableName': 'FooBarTable',
 'StreamArn': 'arn:aws:kinesis:us-east-1:810855290601:stream/FooBarStream',
 'DestinationStatus': 'ENABLING',
 'EnableKinesisStreamingConfiguration': {},
 'ResponseMetadata': {'RequestId': '2T1P1KBP2VPKS2LHA3NV935VU3VV4KQNSO5AEMVJF66Q9ASUAAJG',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'server': 'Server',
   'date': 'Tue, 13 Aug 2024 17:14:48 GMT',
   'content-type': 'application/x-amz-json-1.0',
   'content-length': '172',
   'connection': 'keep-alive',
   'x-amzn-requestid': '2T1P1KBP2VPKS2LHA3NV935VU3VV4KQNSO5AEMVJF66Q9ASUAAJG',
   'x-amz-crc32': '2392602840'},
  'RetryAttempts': 0}}

In [14]:
response = dt.table.meta.client.describe_kinesis_streaming_destination(
    TableName=dt.table_name,
)
status = response['KinesisDataStreamDestinations'][0]['DestinationStatus']
if status == 'ACTIVE':
    print('Kinesis data stream destination is active')
else:
    print(f'Kinesis data stream destination is not active: {status}')

Kinesis data stream destination is active


In [32]:
for i in range(1, 50):
    dt.table.update_item(
        Key={
            'PK': f'ORDER#10{i}',
            'SK': f'ORDER#10{i}'
        },
        UpdateExpression='SET #attr = :val, #attr2 = :val2',
        ExpressionAttributeNames={
            '#attr': 'status',
            '#attr2': 'totalItems'
        },
        ExpressionAttributeValues={
            ':val': 'processing',
            ':val2': i
        }
    )

In [45]:
# Get records from kinisis data stream
response = kinesis_client.describe_stream(StreamName='FooBarStream')
shard_id = response['StreamDescription']['Shards'][0]['ShardId']
shard_iterator = kinesis_client.get_shard_iterator(StreamName='FooBarStream', ShardId=shard_id, ShardIteratorType='TRIM_HORIZON')
shard_iterator = shard_iterator['ShardIterator']
response = kinesis_client.get_records(ShardIterator=shard_iterator)  #Limit=10
print("Number of records: ", len(response['Records']))

Number of records:  147


In [64]:
def process_record(record):
    data = json.loads(record['Data'].decode('utf-8'))
    record_data = data['dynamodb']
    timestamp = record['ApproximateArrivalTimestamp'].strftime('%Y-%m-%d %H:%M:%S')
    # Compare new and old images	
    print('Timestamp: {}'.format(timestamp))
    print(record_data)
    
    print('Item Key: {}'.format(record_data['Keys']))
    # Get the item attributes
    

In [65]:
process_record(response['Records'][-1])

Timestamp: 2024-08-13 14:22:27
{'ApproximateCreationDateTime': 1723569746754, 'Keys': {'PK': {'S': 'ORDER#1036'}, 'SK': {'S': 'ORDER#10{i}'}}, 'OldImage': {'SK': {'S': 'ORDER#10{i}'}, 'PK': {'S': 'ORDER#1036'}, 'totalItems': {'N': '36'}, 'status': {'S': 'processing'}}, 'SizeBytes': 78}
Item Key: {'PK': {'S': 'ORDER#1036'}, 'SK': {'S': 'ORDER#10{i}'}}
