In [2]:
import os
import uuid
import random
import ast
import base64
import json

import boto3
import pandas as pd

# Region name configured on the default boto3 session; printed so the reader
# can confirm which region the notebook is talking to.
# NOTE(review): presumably this is None when no default region is configured — confirm.
REGION = boto3.session.Session().region_name
print(f"AWS Region = {REGION}")

AWS Region = us-east-1


## Load the data from local storage

In [3]:
# Load True.csv from the "Fake and Real News Dataset" directory (path is relative
# to the notebook's working directory). Later cells read its `text` column.
df = pd.read_csv("Fake and Real News Dataset/True.csv")

In [4]:
def get_record(df):
    """
    Endlessly yield one randomly chosen article as an encoded record.

    Each iteration draws a single value at random from the ``text`` column of
    ``df``, wraps it in a one-key dict ``{"text": ...}`` and encodes the
    ``str()`` of that dict as UTF-8 bytes.

    Note that the payload is the Python-literal repr of the dict (single
    quotes), not JSON.

    Parameters
    ----------
    df : pandas.DataFrame
        Source frame; must contain a ``text`` column.

    Yields
    ------
    bytes
        UTF-8 encoding of ``str({"text": <article text>})``.

    Raises
    ------
    ValueError
        Raised by pandas when there is no non-missing text to sample from.
    """
    # Only the text column is needed, so select it once instead of converting a
    # whole sampled row to a dict on every iteration. Missing values are dropped
    # because they cannot be serialised as a text payload.
    texts = df["text"].dropna()
    while True:
        text = texts.sample(1).iloc[0]
        yield str({"text": text}).encode('utf-8')

# Create the generator once; every next(record_generator) call produces one
# freshly sampled, bytes-encoded record from df.
record_generator = get_record(df)

### Descriptive statistics of the word count per article in the real-news data

In [5]:
# Word count per article (whitespace-separated tokens), summarised with describe().
# The vectorised .str accessor replaces the per-row lambda and propagates missing
# values as NaN instead of raising on them.
df["text"].str.split().str.len().describe()

count    21417.000000
mean       385.640099
std        274.006204
min          0.000000
25%        148.000000
50%        359.000000
75%        525.000000
max       5172.000000
Name: text, dtype: float64

## Put randomly sampled records into the Kinesis data stream

In [6]:
# Kinesis stream that the records are written to.
STREAM_NAME = "news-stream"

# Pool of 512 random hex strings; one is picked at random as the partition key
# of each record that gets sent.
PARTITION_KEYS = [uuid.uuid4().hex for _ in range(512)]

In [32]:
# Kinesis client created from the default boto3 configuration.
kinesis_client = boto3.client('kinesis')

# Number of shard entries reported for the stream. STREAM_NAME is used (instead
# of repeating the literal) so this check always targets the stream written to.
# NOTE(review): per the DescribeStream documentation the shard list is paginated
# and may include closed shards, so this can differ from the number of open
# shards — confirm before using it for capacity planning.
len(kinesis_client.describe_stream(StreamName=STREAM_NAME)['StreamDescription']['Shards'])

62

### Run the next cell only once, then comment it out, if you want to send the same records every time
Otherwise, re-run it to draw a fresh random batch of records before each send.

In [34]:
# Number of records in the batch.
count = 5

# Build the batch: each entry pairs one freshly sampled record (bytes) with a
# partition key picked at random from the pre-generated pool.
put_records = [
    {
        'Data': next(record_generator),
        'PartitionKey': random.choice(PARTITION_KEYS),
    }
    for _ in range(count)
]

# Check that the target stream is listed for this account before sending.
assert STREAM_NAME in kinesis_client.list_streams()['StreamNames']
len(put_records)

5

In [35]:
# Send the whole batch to the stream in a single PutRecords request.
params = {
    'StreamName': STREAM_NAME,
    'Records': put_records,
}
response = kinesis_client.put_records(**params)

# PutRecords can answer HTTP 200 even when some records were rejected; the
# per-request failure count is reported in FailedRecordCount, so check it
# explicitly instead of relying on the status code alone.
assert response['FailedRecordCount'] == 0, (
    f"{response['FailedRecordCount']} of {len(put_records)} records were rejected by Kinesis"
)
response['ResponseMetadata']

{'RequestId': 'f83febb4-cf86-e7f3-a500-7cc6ec84cd77',
 'HTTPStatusCode': 200,
 'HTTPHeaders': {'x-amzn-requestid': 'f83febb4-cf86-e7f3-a500-7cc6ec84cd77',
  'x-amz-id-2': '6jfsAK5bFtvQbfoYVJs7Hlv+9m2d9Awjbiihxxhboxj2IgWPbcvuBHwZW9xQlVr3vO4/uomQSh34vj6UZSndbm+ZgjBSSA45',
  'date': 'Sat, 29 Jul 2023 22:39:46 GMT',
  'content-type': 'application/x-amz-json-1.1',
  'content-length': '590'},
 'RetryAttempts': 0}

In [None]:
# Stream capacity (noted for reference):
# Write capacity: 16 MiB/second and 16,000 records/second
# Read capacity: 32 MiB/second