In [9]:
import boto3

In [10]:
sts = boto3.client("sts")
sts.get_caller_identity()

{'UserId': 'AIDA5TABDYKVBZOBMC6BL',
 'Account': '934157730474',
 'Arn': 'arn:aws:iam::934157730474:user/jay',
 'ResponseMetadata': {'RequestId': 'cbc0ed2c-f0a0-48ba-9184-b1e5eceb9144',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': 'cbc0ed2c-f0a0-48ba-9184-b1e5eceb9144',
   'content-type': 'text/xml',
   'content-length': '400',
   'date': 'Wed, 22 Feb 2023 21:38:08 GMT'},
  'RetryAttempts': 0}}

In [11]:
import logging
from botocore.exceptions import ClientError
import json
logger = logging.getLogger(__name__)

class KinesisStream:
    """Encapsulates a Kinesis stream."""
    def __init__(self, kinesis_client):
        """
        :param kinesis_client: A Boto3 Kinesis client.
        """
        self.kinesis_client = kinesis_client
        self.name = None
        self.details = None
        self.stream_exists_waiter = kinesis_client.get_waiter('stream_exists')

    def set_name(self, name):
        self.name = name
    
    def _clear(self):
        """
        Clears property data of the stream object.
        """
        self.name = None
        self.details = None

    def arn(self):
        """
        Gets the Amazon Resource Name (ARN) of the stream.
        """
        return self.details['StreamARN']
    
    def create(self, name, chard_count=None, wait_until_exists=True):
        """
        Creates a stream.

        :param name: The name of the stream.
        :param wait_until_exists: When True, waits until the service reports that
                                  the stream exists, then queries for its metadata.
        """
        
        if chard_count: 
            config = {'StreamName':name, 
                      'ShardCount':chard_count,
                      'StreamModeDetails':{'StreamMode': 'PROVISIONED'}
                     }
        else:
            config = {'StreamName':name, 
                      'StreamModeDetails':{'StreamMode': 'ON_DEMAND'}
                     }
        
        try:
           
            self.kinesis_client.create_stream(**config)
            self.name = name
            logger.info("Created stream %s.", name)
            if wait_until_exists:
                logger.info("Waiting until exists.")
                self.stream_exists_waiter.wait(StreamName=name)
                self.describe(name)
        except ClientError as e:
            print(e)
            logger.exception("Couldn't create stream %s.", name)
            raise
            
    def delete(self):
        """
        Deletes a stream.
        """
        try:
            self.kinesis_client.delete_stream(StreamName=self.name)
            self._clear()
            logger.info("Deleted stream %s.", self.name)
        except ClientError:
            logger.exception("Couldn't delete stream %s.", self.name)
            raise
            
    def describe(self, name):
        """
        Gets metadata about a stream.

        :param name: The name of the stream.
        :return: Metadata about the stream.
        """
        try:
            response = self.kinesis_client.describe_stream(StreamName=name)
            self.name = name
            self.details = response['StreamDescription']
            print(response)
            logger.info("Got stream %s.", name)
        except ClientError:
            logger.exception("Couldn't get %s.", name)
            raise
        else:
            return self.details
        
        
    def put_record(self, data, partition_key):
        """
        Puts data into the stream. The data is formatted as JSON before it is passed
        to the stream.

        :param data: The data to put in the stream.
        :param partition_key: The partition key to use for the data.
        :return: Metadata about the record, including its shard ID and sequence number.
        """
        try:
            response = self.kinesis_client.put_record(
                StreamName=self.name,
                Data=json.dumps(data),
                PartitionKey=partition_key)
            logger.info("Put record in stream %s.", self.name)
        except ClientError:
            logger.exception("Couldn't put record in stream %s.", self.name)
            raise
        else:
            return response
        
        
    def get_records(self, max_records):
        """
        Gets records from the stream. This function is a generator that first gets
        a shard iterator for the stream, then uses the shard iterator to get records
        in batches from the stream. Each batch of records is yielded back to the
        caller until the specified maximum number of records has been retrieved.

        :param max_records: The maximum number of records to retrieve.
        :return: Yields the current batch of retrieved records.
        """
        try:
            response = self.kinesis_client.get_shard_iterator(
                StreamName=self.name, ShardId=self.details['Shards'][0]['ShardId'],
                ShardIteratorType='LATEST')
            shard_iter = response['ShardIterator']
            record_count = 0
            print(shard_iter)
            while record_count < max_records:
                response = self.kinesis_client.get_records(
                    ShardIterator=shard_iter, Limit=10)
                shard_iter = response['NextShardIterator']
                records = response['Records']
                logger.info("Got %s records.", len(records))
                record_count += len(records)
                yield records
                
        except ClientError:
            logger.exception("Couldn't get records from stream %s.", self.name)
            raise

        
kinesisClieant = boto3.client("kinesis", region_name="us-east-1")          
kinesisStream = KinesisStream(kinesisClieant)

In [5]:
kinesisStream.describe('stream1')

In [6]:
kinesisStream.describe('stream1')

{'StreamDescription': {'StreamName': 'stream1', 'StreamARN': 'arn:aws:kinesis:us-east-1:934157730474:stream/stream1', 'StreamStatus': 'ACTIVE', 'StreamModeDetails': {'StreamMode': 'PROVISIONED'}, 'Shards': [{'ShardId': 'shardId-000000000000', 'HashKeyRange': {'StartingHashKey': '0', 'EndingHashKey': '340282366920938463463374607431768211455'}, 'SequenceNumberRange': {'StartingSequenceNumber': '49638266352139984731879337046679361028812958104742264834'}}], 'HasMoreShards': False, 'RetentionPeriodHours': 24, 'StreamCreationTimestamp': datetime.datetime(2023, 2, 22, 15, 55, 53, tzinfo=tzlocal()), 'EnhancedMonitoring': [{'ShardLevelMetrics': []}], 'EncryptionType': 'NONE'}, 'ResponseMetadata': {'RequestId': 'cf50eb78-4ca1-046e-9320-60fc4f86a19d', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': 'cf50eb78-4ca1-046e-9320-60fc4f86a19d', 'x-amz-id-2': 'OlvyTLy0pFYAaVTddrnumhQxZys6ORItH0y7vM414y9qc5dw+lxcZ6QYAU68joY/EevuMBYVsykvj8YMIGqpdK9vQVu4Vb2r', 'date': 'Wed, 22 Feb 2023 21:29:40 G

{'StreamName': 'stream1',
 'StreamARN': 'arn:aws:kinesis:us-east-1:934157730474:stream/stream1',
 'StreamStatus': 'ACTIVE',
 'StreamModeDetails': {'StreamMode': 'PROVISIONED'},
 'Shards': [{'ShardId': 'shardId-000000000000',
   'HashKeyRange': {'StartingHashKey': '0',
    'EndingHashKey': '340282366920938463463374607431768211455'},
   'SequenceNumberRange': {'StartingSequenceNumber': '49638266352139984731879337046679361028812958104742264834'}}],
 'HasMoreShards': False,
 'RetentionPeriodHours': 24,
 'StreamCreationTimestamp': datetime.datetime(2023, 2, 22, 15, 55, 53, tzinfo=tzlocal()),
 'EnhancedMonitoring': [{'ShardLevelMetrics': []}],
 'EncryptionType': 'NONE'}

In [7]:
kinesisStream.put_record({'name':'x','price':100}, 'id_100')

{'ShardId': 'shardId-000000000000',
 'SequenceNumber': '49638266352139984731879337088123755976841816827983036418',
 'ResponseMetadata': {'RequestId': 'd0052b83-e031-c6f4-8c75-a0231787da1a',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': 'd0052b83-e031-c6f4-8c75-a0231787da1a',
   'x-amz-id-2': '7/yqHK84s37djAUXtd3Zh8+RcMCP9I+K8b7cIXeSj8HU9pwyU5scj8n6m/ro0o9X5qNvDmLUqF0q4vouruvnjB0+T68kG49p',
   'date': 'Wed, 22 Feb 2023 21:30:08 GMT',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '110'},
  'RetryAttempts': 0}}

In [62]:
for x in kinesisStream.get_records(20):
    print(x)

AAAAAAAAAAF0CQTXG7eTLgwjnb6xv/Qg1OTBBJ8/5bU6IGqIeGlSFAvjDr9jD/OGmPcORDyFlQlqLiZv3JnF+qHgr/z8zUSnSrBq0A1A+dLy24/oVbZ7SkTkSFb/MeI5nnTnuyelTOWMxGay3415YJLu3Wx9CWZyVysRMjMAo4UKasYWlYO7RW/r0pPlYO3ZEULYgVEDOChVJcQF26feS8bPNA56PuLtGqSxfrWjZZhK2+uNhFockw==
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]


KeyboardInterrupt: 

In [8]:
def get_data():
    return {
        'EVENT_TIME': datetime.datetime.now().isoformat(),
        'TICKER': random.choice(['AAPL', 'AMZN', 'MSFT', 'INTC', 'TBV']),
        'PRICE': round(random.random() * 100, 2)}
