In [20]:
## Sample code to send messages to Kafka
from kafka import KafkaProducer

# Connection settings for the demo producer.
bootstrap_servers = 'kafka:29092'
topic = 'test_t'

# One XML document per film id. The payloads differ only in the id, so they
# are generated from a single template instead of being copy-pasted.
# The prolog is written as a well-formed XML declaration (closing quote and
# "?>" included) so that consumers can actually parse the document.
message_template = (
    '<?xml version="1.0"?>'
    '<filmes><filme id="{film_id}">'
    '<titulo>O XML veste prada</titulo>'
    '<resumo>O filme mostra a elegância da XML na representação de dados '
    'estruturados e semi estruturados.</resumo>'
    '<genero>Aventura</genero><genero>Documentário</genero>'
    '<elenco><ator>Mark UPlanguage</ator><ator>Mary well-Formed</ator>'
    '<ator>Sedna D. Atabase</ator></elenco>'
    '</filme></filmes>'
)
messages = [message_template.format(film_id=film_id) for film_id in (4, 5, 6)]

# Number of times the whole batch of messages is sent.
count = 1

# Reuse the configured address instead of repeating the literal.
producer = KafkaProducer(bootstrap_servers=bootstrap_servers)
try:
    for _ in range(count):
        for message in messages:
            # Kafka payloads are bytes; the XML text is sent as UTF-8.
            producer.send(topic, message.encode('utf-8'))
    # send() is asynchronous: block until every buffered record is delivered.
    producer.flush()
finally:
    # Release the network resources even if a send fails.
    producer.close()

In [21]:
## Using KafkaConsumer and kafka-python
## Access docker shell and execute pip install kafka-python

from kafka import KafkaConsumer
import pandas as pd
import itertools
import time


In [22]:
# Convert a Kafka consumer record into a flat dict (the payload is decoded as UTF-8)
def record_to_dict(record):
    """Flatten a consumer record into a dict of plain ``str``/``int`` values.

    The record's ``value`` is decoded from UTF-8 bytes; every other field is
    read from the attribute of the same name and coerced with ``str`` or
    ``int``. Because ``str`` is applied unconditionally, a ``None`` key,
    headers or checksum becomes the text ``'None'``.

    Args:
        record: object exposing ``topic``, ``partition``, ``offset``,
            ``timestamp``, ``timestamp_type``, ``key``, ``value``,
            ``headers``, ``checksum`` and the three ``serialized_*_size``
            attributes.

    Returns:
        dict: one entry per field listed above, in that order.
    """
    # Decode first so an undecodable payload fails before anything else runs.
    payload_text = record.value.decode('utf-8')

    # (output key, coercion) pairs, in the order the keys appear in the result.
    layout = (
        ("topic", str),
        ("partition", int),
        ("offset", int),
        ("timestamp", int),
        ("timestamp_type", int),
        ("key", str),
        ("value", str),
        ("headers", str),
        ("checksum", str),
        ("serialized_key_size", int),
        ("serialized_value_size", int),
        ("serialized_header_size", int),
    )

    row = {}
    for field, coerce in layout:
        # "value" comes from the decoded text, everything else from the record.
        raw = payload_text if field == "value" else getattr(record, field)
        row[field] = coerce(raw)
    return row

In [23]:
# Convert the data to a Spark DataFrame, keep only 'offset', 'timestamp', 'key', 'value', and append the result to a parquet file
from pyspark.sql import SparkSession

def cleanup_dataframe(data, parquet_path="/tmp/parquet_file.parquet"):
    """Trim the records to the persisted columns and append them to parquet.

    The input is converted to a Spark DataFrame, reduced to the ``offset``,
    ``timestamp``, ``key`` and ``value`` columns, and appended to the parquet
    dataset at ``parquet_path``.

    Args:
        data: input accepted by ``spark.createDataFrame``; the consumer in
            this notebook passes a pandas DataFrame built from
            ``record_to_dict`` rows.
        parquet_path: destination of the parquet dataset. Defaults to the
            location that was previously hard-coded.

    Returns:
        The Spark DataFrame that was written, holding only the four selected
        columns. (Previously the function returned ``None`` even though its
        caller assigns the result.)
    """
    spark = SparkSession.builder.appName('pandasToSparkDF').getOrCreate()
    selected_columns = ["offset", "timestamp", "key", "value"]
    spark_df = spark.createDataFrame(data).select(selected_columns)
    # 'append' keeps the rows written by earlier batches.
    spark_df.write.mode('append').parquet(parquet_path)
    return spark_df

    


In [24]:
## Consumer function: retrieves messages from a Kafka topic.
## It takes the topic name as its parameter.

def consumer(topic_name, bootstrap_servers='kafka:29092',
             group_id='my-ssdgroupss', idle_timeout_s=10):
    """Drain a Kafka topic in batches until it goes quiet.

    Subscribes to ``topic_name``, polls for records, converts each polled
    batch with ``record_to_dict`` into a pandas DataFrame and hands it to
    ``cleanup_dataframe``. Offsets are committed after each batch has been
    handed off successfully. The loop ends once no record has arrived for
    more than ``idle_timeout_s`` seconds, at which point the total number of
    records received is printed.

    Args:
        topic_name: name of the topic to subscribe to.
        bootstrap_servers: Kafka bootstrap address.
        group_id: consumer group used for offset tracking.
        idle_timeout_s: seconds without new records before stopping; also
            used as the timeout of a single poll.

    Returns:
        None.

    A ``KeyboardInterrupt`` stops the loop and prints "Interrupted". Any
    other exception propagates after the consumer has been closed.
    """
    # Build the client outside the try block: if construction fails there is
    # nothing to close, and the finally clause must not touch an unbound name.
    kafka_consumer = KafkaConsumer(
        bootstrap_servers=bootstrap_servers,
        group_id=group_id,
        auto_offset_reset="latest",
        # Must be the boolean False: the string "false" is truthy and would
        # leave auto-commit switched on.
        enable_auto_commit=False,
        max_poll_records=100,
    )
    total_messages = 0

    try:
        kafka_consumer.subscribe([topic_name])

        # Keep track of when we last received a message.
        last_message_time = time.monotonic()

        while True:
            batches = kafka_consumer.poll(timeout_ms=int(idle_timeout_s * 1000))
            # poll() groups records per partition; flatten them into one list.
            records = list(itertools.chain.from_iterable(batches.values()))
            if records:
                rows = [record_to_dict(record) for record in records]
                cleanup_dataframe(pd.DataFrame(rows))
                # Commit only after the batch was handed off successfully, so
                # a failure above does not mark these records as consumed.
                kafka_consumer.commit()
                total_messages += len(records)
                # Reset the last message time.
                last_message_time = time.monotonic()

            # Stop consuming once the topic has been idle for too long.
            if time.monotonic() - last_message_time > idle_timeout_s:
                print(f'Total messages received:{total_messages}')
                break

    except KeyboardInterrupt:
        print("Interrupted")

    finally:
        # Offsets are committed per batch above; here we only release the
        # connection.
        kafka_consumer.close()

In [27]:
## Run the consumer as a one-off batch read; the topic name is the required argument
consumer('test_t')

Total messages received:15


In [30]:
## Function to read parquet file
def read_parquet(path):
    """Load a parquet dataset with Spark, display it, and return it.

    Args:
        path: location of the parquet data to read.

    Returns:
        The Spark DataFrame read from ``path``.
    """
    session = (
        SparkSession.builder
        .appName("Read Parquet")
        .getOrCreate()
    )
    frame = session.read.parquet(path)
    # show() prints a preview of the rows as a side effect.
    frame.show()
    return frame

In [31]:
# Read back the parquet output (same path cleanup_dataframe writes to) and display its rows
parquet_path = "/tmp/parquet_file.parquet"
df = read_parquet(parquet_path)

+-------+-------------+----+--------------------+
| offset|    timestamp| key|               value|
+-------+-------------+----+--------------------+
|3038918|1686786509679|None|<?xml version="1....|
|3038907|1686786509678|None|<?xml version="1....|
|3038906|1686786509678|None|<?xml version="1....|
|3038932|1686786540870|None|<?xml version="1....|
|3038904|1686786471982|None|<?xml version="1....|
|3038933|1686786540870|None|<?xml version="1....|
|3038913|1686786509679|None|<?xml version="1....|
|3038922|1686786540870|None|<?xml version="1....|
|3038927|1686786540870|None|<?xml version="1....|
|3038908|1686786509678|None|<?xml version="1....|
|3038926|1686786540870|None|<?xml version="1....|
|3038905|1686786471982|None|<?xml version="1....|
|3038928|1686786540870|None|<?xml version="1....|
|3038917|1686786509679|None|<?xml version="1....|
|3038912|1686786509679|None|<?xml version="1....|
|3038911|1686786509678|None|<?xml version="1....|
|3038916|1686786509679|None|<?xml version="1....|
