In [2]:
import time
import random
import boto3

In [17]:
client = boto3.client("kinesis", region_name="eu-central-1")

STREAM_NAME = "vbo-aws-de-demo-ds"

# Create stream

In [24]:
client.create_stream(
    StreamName=STREAM_NAME,
    # ShardCount=2, only PROVISIONED mode
    StreamModeDetails={
          'StreamMode': 'ON_DEMAND'
      }
)

{'ResponseMetadata': {'RequestId': 'd97018a6-7147-bf2a-8440-5e46c528c29e',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': 'd97018a6-7147-bf2a-8440-5e46c528c29e',
   'x-amz-id-2': 'oQrNnqJeE7bahjhcrYrAgNpHpauFSKq+phnjW1bcBILUjwjqlfL1nwULoHEdIzePRxS2ZSUENnTft2gUVMRLNTGTLRrTOmOW',
   'date': 'Tue, 18 Jul 2023 07:52:00 GMT',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '0'},
  'RetryAttempts': 0}}

In [25]:
stream_description = client.describe_stream(StreamName=STREAM_NAME)
stream_description

{'StreamDescription': {'StreamName': 'vbo-aws-de-demo-ds',
  'StreamARN': 'arn:aws:kinesis:eu-central-1:152100380489:stream/vbo-aws-de-demo-ds',
  'StreamStatus': 'ACTIVE',
  'StreamModeDetails': {'StreamMode': 'ON_DEMAND'},
  'Shards': [{'ShardId': 'shardId-000000000000',
    'HashKeyRange': {'StartingHashKey': '0',
     'EndingHashKey': '85070591730234615865843651857942052863'},
    'SequenceNumberRange': {'StartingSequenceNumber': '49642750538171707563930981033457045185650116393439330306'}},
   {'ShardId': 'shardId-000000000001',
    'HashKeyRange': {'StartingHashKey': '85070591730234615865843651857942052864',
     'EndingHashKey': '170141183460469231731687303715884105727'},
    'SequenceNumberRange': {'StartingSequenceNumber': '49642750538194008309129511656598580903922764754945310738'}},
   {'ShardId': 'shardId-000000000002',
    'HashKeyRange': {'StartingHashKey': '170141183460469231731687303715884105728',
     'EndingHashKey': '255211775190703847597530955573826158591'},
    'Sequ

In [26]:
stream_arn = stream_description['StreamDescription']['StreamARN']
stream_arn

'arn:aws:kinesis:eu-central-1:152100380489:stream/vbo-aws-de-demo-ds'

In [29]:
shard_ids = [ shard['ShardId'] for shard in stream_description['StreamDescription']['Shards'] ]
shard_ids

['shardId-000000000000',
 'shardId-000000000001',
 'shardId-000000000002',
 'shardId-000000000003']

In [30]:
import pandas as pd

In [31]:
df = pd.read_csv("https://raw.githubusercontent.com/erkansirin78/datasets/master/tr_il_plaka_kod.csv")
df.head()

Unnamed: 0,plaka,il
0,1,Adana
1,2,Adıyaman
2,3,Afyonkarahisar
3,4,Ağrı
4,5,Amasya


# Start producing

In [34]:
try:
    for idx, city in df.iterrows():
        time.sleep(2)
        data = city['il'].encode("utf-8")
        print(f"Sending {data.decode('utf-8')}")
        response = client.put_record(StreamName=STREAM_NAME, Data=data, 
                                     PartitionKey=str(city['plaka']))
        print(response['ShardId'])
except KeyboardInterrupt:
    print("Finishing due to keyboard interrupt")

Sending Adana
shardId-000000000003
Sending Adıyaman
shardId-000000000003
Sending Afyonkarahisar
shardId-000000000003
Sending Ağrı
shardId-000000000002
Sending Amasya
shardId-000000000003
Sending Ankara
shardId-000000000000
Sending Antalya
shardId-000000000002
Sending Artvin
shardId-000000000003
Sending Aydın
shardId-000000000001
Sending Balıkesir
shardId-000000000003
Sending Bilecik
shardId-000000000001
Sending Bingöl
shardId-000000000003
Sending Bitlis
shardId-000000000003
Sending Bolu
shardId-000000000002
Sending Burdur
shardId-000000000002
Sending Bursa
shardId-000000000003
Sending Çanakkale
shardId-000000000001
Sending Çankırı
shardId-000000000001
Sending Çorum
shardId-000000000000
Sending Denizli
shardId-000000000002
Sending Diyarbakır
shardId-000000000000
Sending Edirne
shardId-000000000002
Sending Elâzığ
shardId-000000000000
Sending Erzincan
shardId-000000000000
Sending Erzurum
shardId-000000000002
Sending Eskişehir
shardId-000000000001
Sending Gaziantep
shardId-000000000000
Sen

In [15]:
client.delete_stream(StreamARN=stream_arn)

{'ResponseMetadata': {'RequestId': 'efd13487-ddb7-7677-b2e1-728ffe77d573',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': 'efd13487-ddb7-7677-b2e1-728ffe77d573',
   'x-amz-id-2': 'Cg1BMa9RUZuLBsLFSXQAzfuqi/DYxM401fK+pHXJaTKj1KEBTeXS+K5TcxSqHjmTXQzpOp06cDyoFaGlJXCHGHSM9FXlKUQa',
   'date': 'Tue, 18 Jul 2023 07:48:24 GMT',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '0'},
  'RetryAttempts': 0}}

In [16]:
client.list_streams()

{'StreamNames': ['vbo-aws-de-demo-ds'],
 'HasMoreStreams': False,
 'StreamSummaries': [{'StreamName': 'vbo-aws-de-demo-ds',
   'StreamARN': 'arn:aws:kinesis:eu-central-1:152100380489:stream/vbo-aws-de-demo-ds',
   'StreamStatus': 'DELETING',
   'StreamModeDetails': {'StreamMode': 'ON_DEMAND'},
   'StreamCreationTimestamp': datetime.datetime(2023, 7, 18, 7, 45, 9, tzinfo=tzlocal())}],
 'ResponseMetadata': {'RequestId': 'dc057b56-f62f-8ef3-8135-3d5fe612178b',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': 'dc057b56-f62f-8ef3-8135-3d5fe612178b',
   'x-amz-id-2': 'CxG9rp/1JiQcapGWeeBCxNVBw5qh0OfR0V3yIlSiotJdOt9VbDj457eKCEUjx8gCZshEh9FWA3Szy3zMPIj4QfnBZ1iFm8ts',
   'date': 'Tue, 18 Jul 2023 07:48:25 GMT',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '312'},
  'RetryAttempts': 0}}