## Boto3 client and Kinesis Data Stream creation

In [None]:
import boto3
import json
import time
import random

# Shared Kinesis client used by every helper below. No region or credentials
# are passed here, so boto3 has to resolve them from its own configuration.
kinesis_client = boto3.client(service_name='kinesis')

# Function to create a Kinesis Data Stream
def create_kinesis_stream(stream_name, shard_count=2):
    """Request creation of a Kinesis Data Stream and return the API response.

    Args:
        stream_name: Name of the stream to create.
        shard_count: Number of shards for the stream. Defaults to 2, the
            value that was previously hard-coded, so existing callers that
            pass only ``stream_name`` behave exactly as before.

    Returns:
        The response returned by ``kinesis_client.create_stream``.

    Raises:
        Whatever ``kinesis_client.create_stream`` raises; nothing is caught
        here.
    """
    # NOTE(review): per the AWS documentation stream creation is
    # asynchronous, so the stream may not be usable the moment this
    # returns -- confirm that callers wait before writing to it.
    return kinesis_client.create_stream(
        StreamName=stream_name,
        ShardCount=shard_count,
    )



## Producer: writing records to the Kinesis Data Stream

In [None]:
def put_weather_data(city, temperature, humidity):
    """Publish one weather reading to the stream named by ``stream_name``.

    The reading is stamped with the current Unix time in whole seconds,
    serialized as JSON, and sent with the city as the partition key. On
    success a confirmation line is printed. Any exception raised along the
    way is caught and printed instead of propagating, so this function
    always returns ``None``.

    Args:
        city: City name; stored in the record and used as the partition key.
        temperature: Temperature value stored in the record.
        humidity: Humidity value stored in the record.
    """
    try:
        data = dict(
            city=city,
            temperature=temperature,
            humidity=humidity,
            timestamp=int(time.time()),
        )
        serialized = json.dumps(data)

        # ``stream_name`` is read from module scope, not passed in.
        response = kinesis_client.put_record(
            StreamName=stream_name,
            Data=serialized,
            PartitionKey=city,
        )
    except Exception as e:
        print(f"Error: {e}")
    else:
        # Kept under the same best-effort handler as the call itself so a
        # malformed response is reported rather than raised.
        try:
            print(f"Weather data written to Kinesis. SequenceNumber: {response['SequenceNumber']}, ShardId: {response['ShardId']}, Data: {data}")
        except Exception as e:
            print(f"Error: {e}")

## Consumer: reading records from the Kinesis Data Stream

In [None]:

def get_shard_iterator(shard_id='shardId-000000000001', iterator_type='TRIM_HORIZON'):
    """Return a shard iterator for one shard of the stream ``stream_name``.

    Args:
        shard_id: ID of the shard to read. Defaults to
            ``'shardId-000000000001'``, the value that was previously
            hard-coded. The stream in this notebook is created with two
            shards, so pass the other shard's ID to read the records that
            were routed there.
        iterator_type: Value passed as ``ShardIteratorType``. Defaults to
            ``'TRIM_HORIZON'``, as before.

    Returns:
        The ``ShardIterator`` value from the ``get_shard_iterator`` response.

    Raises:
        Whatever ``kinesis_client.get_shard_iterator`` raises, plus
        ``KeyError`` if the response has no ``'ShardIterator'`` entry.
    """
    # ``stream_name`` is read from module scope, not passed in.
    response = kinesis_client.get_shard_iterator(
        StreamName=stream_name,
        ShardId=shard_id,
        ShardIteratorType=iterator_type,
    )
    return response['ShardIterator']

def get_records(shard_iterator):
    """Fetch one batch of at most 10 records for the given shard iterator.

    Args:
        shard_iterator: Iterator value to pass to ``get_records``.

    Returns:
        A ``(records, next_iterator)`` tuple. ``records`` is the response's
        ``'Records'`` entry (an empty list when absent) and
        ``next_iterator`` is its ``'NextShardIterator'`` entry (``None``
        when absent).
    """
    batch = kinesis_client.get_records(ShardIterator=shard_iterator, Limit=10)
    fetched = batch.get('Records', [])
    following_iterator = batch.get('NextShardIterator')
    return fetched, following_iterator

In [None]:
def process_weather_data(data):
    """Print a single received weather record to standard output.

    Args:
        data: The record to display; it is interpolated into the message
            as-is.
    """
    message = f"Received Weather Data: {data}"
    print(message)

## main() - Kinesis Data Stream

In [None]:
# Main function
if __name__ == "__main__":
    # Name of the Kinesis Data Stream. The producer and consumer helpers read
    # this as a module-level global, so it must stay bound at this level.
    stream_name = 'paravx1-datastream'  # Change to your desired stream name

    # Request creation of the stream and show the raw create_stream response.
    weather_stream = create_kinesis_stream(stream_name)
    print(weather_stream)

    # create_stream only starts provisioning. Block until the stream reports
    # ACTIVE so that the producer and consumer steps that follow do not hit
    # a stream that is not ready yet.
    kinesis_client.get_waiter('stream_exists').wait(StreamName=stream_name)


## To add weather data to Kinesis Data Stream

In [None]:
    # Pool of cities; every synthetic reading picks one at random.
    cities = ["New York", "Los Angeles", "Chicago", "Houston", "Miami"]

    # Send five readings, pausing two seconds after each one.
    for _ in range(5):
        # Tuple elements are evaluated left to right, so the random draws
        # happen in the order city, temperature, humidity.
        city, temperature, humidity = (
            random.choice(cities),
            round(random.uniform(10, 30), 2),
            round(random.uniform(40, 80), 2),
        )

        put_weather_data(city, temperature, humidity)
        time.sleep(2)  # Simulating periodic data transmission

## Reading records from the Kinesis Data Stream

In [None]:
    # Begin with the iterator handed back by get_shard_iterator().
    shard_iterator = get_shard_iterator()

    # NOTE(review): the loop ends only when get_records() returns no next
    # iterator. For a shard that stays open this presumably never happens,
    # so the cell may poll until interrupted -- confirm that is intended.
    while True:
        records, next_shard_iterator = get_records(shard_iterator)

        # Decode and hand off every record of this batch, in order.
        for record in records:
            data = json.loads(record['Data'])
            process_weather_data(data)

        if next_shard_iterator:
            # More may follow: advance the iterator, then pause before the
            # next poll.
            shard_iterator = next_shard_iterator
            time.sleep(2)  # Add a delay between reads (adjust as needed)
        else:
            print("No more records in the shard. Exiting.")
            break

## To Describe Kinesis Data Stream

In [None]:
# Shell escape: ask the AWS CLI for a summary of the 'paravx1-datastream' stream.
!aws kinesis describe-stream-summary --stream-name 'paravx1-datastream'

## To delete Kinesis Data Stream

In [None]:
# Shell escape: delete the 'paravx1-datastream' stream through the AWS CLI.
!aws kinesis delete-stream --stream-name 'paravx1-datastream'