In [None]:
!pip install boto3 awscli

In [2]:
from google.colab import userdata

In [3]:
import boto3
import json
import time

In [4]:
# Configuracao das credenciais da AWS (substitua pelos seus valores) - Utilizando o Google Colab faca a criacao da credencial com os nomes aws_access_key_id e aws_secret_access_key.
aws_access_key_id = userdata.get('aws_access_key_id')
aws_secret_access_key = userdata.get('aws_secret_access_key')
region_name = 'us-east-2'

In [7]:
# Inicializacao dos clientes do Kinesis e Firehose
kinesis_client = boto3.client(
    'kinesis',
    aws_access_key_id=aws_access_key_id,
    aws_secret_access_key=aws_secret_access_key,
    region_name=region_name
)

firehose_client = boto3.client(
    'firehose',
    aws_access_key_id=aws_access_key_id,
    aws_secret_access_key=aws_secret_access_key,
    region_name=region_name
)

In [8]:
# Nome do stream do Kinesis Firehose
delivery_stream_name = 'eda-kdf-market'  # Substitua pelo nome desejado
# Nome do seu Kinesis Data Stream
stream_name = 'eda-kds-market' # Substitua pelo nome desejado

In [None]:
# Criacao do stream do Kinesis Firehose (se nao existir)
firehose_client.create_delivery_stream(
    DeliveryStreamName = delivery_stream_name,
    DeliveryStreamType = 'DirectPut',
    S3DestinationConfiguration = {
        'RoleARN': 'arn:aws:iam::763308243640:role/eda-kdf-firehose-access',  # Substitua pelos seus valores - Acesso do Firehose ao S3 e AWS Lambda
        'BucketARN': 'arn:aws:s3:::eda-landing-zone-us-east-2-763308243640',  # Substitua pelos seus valores - Nome do bucket que armazera os dados originados do Firehose
        'Prefix': 'dados_firehose/eda_kdf_market/',  # Substitua pelo prefixo desejado - Nome do "diretprio" no bucket do s3 que vai armazenar os dados processados
        'BufferingHints': {
            'SizeInMBs': 1,
            'IntervalInSeconds': 60
        },
        'CompressionFormat': 'UNCOMPRESSED',
        'ErrorOutputPrefix': 'erros/'
    }
)
print(f"Stream do Kinesis Firehose '{delivery_stream_name}' criado com sucesso.")

# Aguardar a criacao do stream (pode levar alguns minutos)

In [10]:
# Funcao para processar os registros do Kinesis
def process_records(records):
    for record in records:
        # Decodifica os dados do registro
        data = record['Data'].decode('utf-8')

        # Converte os dados para um dicionario Python (se necessario)
        try:
            data_dict = json.loads(data)
        except:
            print("Erro ao converter dados para JSON:", data)
            continue

        # Aqui voce pode realizar o processamento dos dados,
        # como filtragem, transformacao, etc.

        # Envia os dados para o Firehose
        response = firehose_client.put_record(
            DeliveryStreamName=delivery_stream_name,
            Record={
                'Data': json.dumps(data_dict)
            }
        )

        # Imprime a resposta do Firehose (opcional)
        print(response)

In [None]:
# Loop principal para consumir dados do Kinesis
while True:
    # Obtem os shards do Kinesis Data Stream
    response = kinesis_client.describe_stream(StreamName=stream_name)
    shards = response['StreamDescription']['Shards']

    # Itera sobre os shards
    for shard in shards:
        shard_id = shard['ShardId']

        # Obtem o iterator para o shard
        shard_iterator = kinesis_client.get_shard_iterator(
            StreamName=stream_name,
            ShardId=shard_id,
            ShardIteratorType='TRIM_HORIZON'  # LATEST para ler a partir da inicializacao do KDF ou 'TRIM_HORIZON' para ler desde o inicio
        )['ShardIterator']

        # Loop para consumir registros do shard
        while True:
            # Obtem os registros do shard
            records_response = kinesis_client.get_records(
                ShardIterator=shard_iterator,
                Limit=100  # Numero maximo de registros a serem retornados
            )

            # Processa os registros
            process_records(records_response['Records'])

            # Verifica se ha mais registros a serem lidos
            if 'NextShardIterator' in records_response:
                shard_iterator = records_response['NextShardIterator']
            else:
                break

            # Aguarda um curto periodo antes de ler mais registros (opcional)
            time.sleep(1)