In [1]:
import json
import os
from io import StringIO
from json import dumps, loads

import boto3
import pandas as pd
from kafka import KafkaConsumer

In [2]:
# AWS S3 Configuration
# Each setting can be supplied through an environment variable so the notebook
# does not have to be edited (and your bucket/key are not committed with it).
# The empty-string defaults keep the original "fill in the blank" behaviour.
s3_client = boto3.client('s3')
bucket_name = os.environ.get('S3_BUCKET_NAME', '')  # your bucket name
s3_csv_key = os.environ.get('S3_CSV_KEY', '')  # object key of the CSV file inside the bucket

In [3]:
# Kafka Configuration
# Topic and broker list can be supplied through environment variables; the
# defaults reproduce the original placeholders ('' topic, local broker).
kafka_topic = os.environ.get('KAFKA_TOPIC', '')  # your topic name
# Comma-separated list, e.g. "host1:9092,host2:9092"; blank entries are ignored.
kafka_bootstrap_servers = [
    server.strip()
    for server in os.environ.get('KAFKA_BOOTSTRAP_SERVERS', 'localhost:9092').split(',')
    if server.strip()
]
consumer = KafkaConsumer(
    kafka_topic,
    bootstrap_servers=kafka_bootstrap_servers,  # your broker address(es)
    # Each message value is UTF-8 JSON; decode it into a Python object.
    value_deserializer=lambda x: loads(x.decode('utf-8')))

In [4]:
# Transform one raw price message into a row of the output table
def transform_data(data):
    """Flatten a Bitcoin price message into a single table row.

    Args:
        data: Parsed message dict. Only ``data['time']['updated']`` and
            ``data['bpi'][code]['rate_float']`` for ``code`` in ``USD`` and
            ``THB`` are read.

    Returns:
        dict with ``date`` and ``time`` (split out of the parsed update
        timestamp) followed by ``usd_rate`` and ``thb_rate`` as floats.
    """
    updated_at = pd.to_datetime(data['time']['updated'])
    rates = data['bpi']
    return dict(
        date=updated_at.date(),
        time=updated_at.time(),
        usd_rate=float(rates['USD']['rate_float']),
        thb_rate=float(rates['THB']['rate_float']),
    )

In [5]:
# Load the existing CSV from S3, or start empty if it does not exist yet
def read_existing_data_from_s3():
    """Return the CSV stored at ``s3_csv_key`` in ``bucket_name`` as a DataFrame.

    An empty DataFrame is returned only when there is genuinely nothing to
    append to:
      * the object does not exist yet (first run), or
      * the object exists but contains no CSV data at all.

    Every other failure is re-raised. This includes network errors, missing
    credentials, access denied and a malformed CSV. The caller writes back
    "existing rows + new row" to the same key, so treating such an error as
    "no existing data" would overwrite and destroy the accumulated history.

    NOTE: if the credentials lack s3:ListBucket, S3 reports a missing key as
    AccessDenied instead of NoSuchKey; that case is raised here rather than
    being treated as a first run.
    """
    try:
        response = s3_client.get_object(Bucket=bucket_name, Key=s3_csv_key)
    except s3_client.exceptions.NoSuchKey:
        # First run: nothing has been written to this key yet.
        return pd.DataFrame()
    try:
        return pd.read_csv(response['Body'])
    except pd.errors.EmptyDataError:
        # Object exists but is empty (no header, no rows).
        return pd.DataFrame()

In [None]:
# Upload a DataFrame to S3 as the CSV object at s3_csv_key
def write_data_to_s3(df):
    """Serialise ``df`` to CSV (without the index) and upload it to S3.

    The object at ``s3_csv_key`` in ``bucket_name`` is replaced by the full
    contents of ``df``; callers pass the combined old + new rows.
    """
    # With no path argument, to_csv returns the CSV text as a str.
    csv_text = df.to_csv(index=False)
    s3_client.put_object(Body=csv_text, Bucket=bucket_name, Key=s3_csv_key)
    print("Data appended to CSV in S3")

In [7]:
# Combine the stored rows with one newly received message and upload the result
def append_to_csv_s3(data):
    """Append one message, as a transformed row, to the CSV stored in S3.

    The whole existing object is read, the new row built by ``transform_data``
    is concatenated after it (index renumbered), and everything is written
    back. This is a full read-modify-write on every message.
    """
    new_row = pd.DataFrame([transform_data(data)])
    combined = pd.concat([read_existing_data_from_s3(), new_row], ignore_index=True)
    write_data_to_s3(combined)

In [None]:
# Stream messages from Kafka and append each one to the CSV in S3.
# Iterating the consumer waits for new messages and does not end on its own
# with the configuration above, so stop this cell by interrupting the kernel.
for message in consumer:
    bitcoin_data = message.value  # already decoded by the consumer's value_deserializer
    print("Received data:", bitcoin_data)
    append_to_csv_s3(bitcoin_data)

Received data: {'time': {'updated': 'Nov 23, 2023 04:54:00 UTC', 'updatedISO': '2023-11-23T04:54:00+00:00', 'updateduk': 'Nov 23, 2023 at 04:54 GMT'}, 'disclaimer': 'This data was produced from the CoinDesk Bitcoin Price Index (USD). Non-USD currency data converted using hourly conversion rate from openexchangerates.org', 'bpi': {'USD': {'code': 'USD', 'rate': '37,346.9455', 'description': 'United States Dollar', 'rate_float': 37346.9455}, 'THB': {'code': 'THB', 'rate': '1,345,983.9509', 'description': 'Thai Baht', 'rate_float': 1345983.9509}}}
         date      time    usd_rate      thb_rate
0  2023-11-23  04:54:00  37346.9455  1.345984e+06
Data appended to CSV in S3
Received data: {'time': {'updated': 'Nov 23, 2023 04:55:00 UTC', 'updatedISO': '2023-11-23T04:55:00+00:00', 'updateduk': 'Nov 23, 2023 at 04:55 GMT'}, 'disclaimer': 'This data was produced from the CoinDesk Bitcoin Price Index (USD). Non-USD currency data converted using hourly conversion rate from openexchangerates.org'