In [19]:
from kafka import KafkaConsumer

# Connection settings; later cells reuse `topic_name` and `bootstrap_servers`.
topic_name = 'testout'
group_id = 'test'
bootstrap_servers = ['localhost:9092']

# Offsets are never committed (enable_auto_commit=False) and a group without
# committed offsets starts from the earliest record.
consumer = KafkaConsumer(
    topic_name,
    bootstrap_servers=bootstrap_servers,
    group_id=group_id,
    auto_offset_reset='earliest',
    enable_auto_commit=False,
)

# Print the first four records, then stop. Leaving the loop with an explicit
# `break` (instead of closing the consumer while it is still being iterated)
# does not depend on how the library reacts to a closed consumer, and the
# `finally` guarantees the connection is released even if the loop body raises.
# `message` stays bound to the last record printed; the next cell reads it.
try:
    for i, message in enumerate(consumer):
        print(message.value)
        if i == 3:
            break
finally:
    consumer.close()

b'{"id":"c451a038-9f30-499b-b652-ec877df6f50d","name":"Dr. Jason Mcdowell","email":"lewisjennifer@example.com","date":"2024-06-04T09:32:45.282497","country":"Latvia","company":"Cantrell LLC","job":"Logistics and distribution manager","phone":"(484)200-7378x036","sentence":"Become she bad edge maintain.","number":"70","timestamp":"1.6939533142669601E9"}'
b'{"id":"3e5324e6-0da6-4934-8693-8e5ae46c32d6","name":"Douglas Moyer","email":"julie58@example.org","date":"2024-04-03T23:37:24.041961","country":"Romania","company":"Smith-Lopez","job":"Television camera operator","phone":"001-870-384-8740x260","sentence":"Mrs other more recently born on.","number":"6","timestamp":"1.652820360989759E9"}'
b'{"id":"f5b19dcf-cfb8-4583-a2ed-397fc50e7103","name":"Michael Valdez","email":"bmunoz@example.org","date":"2024-01-01T15:12:43.217212","country":"Malawi","company":"Wagner-Walters","job":"Journalist, broadcasting","phone":"795.951.1204x19741","sentence":"Suffer resource baby trouble country action.","

In [7]:
import json
# `message` is the last record left over from the consumer loop; its payload
# is raw bytes, so turn it into text before handing it to the JSON parser.
a = message.value.decode('utf-8')

# Bare last expression so the notebook renders the parsed record.
json.loads(a)

{'key': '4043ab72-91cd-47dd-9d1b-8c56a5df8e1b',
 'value': {'id': '4043ab72-91cd-47dd-9d1b-8c56a5df8e1b',
  'name': 'Charles Kemp',
  'email': 'abryant@example.net',
  'date': '2024-06-06T04:55:32.828762',
  'country': 'Marshall Islands',
  'company': 'Hernandez, Guerrero and Stevens',
  'job': 'Scientist, audiological',
  'phone': '(681)644-4927',
  'sentence': 'Yourself fear address recent adult.',
  'number': 25}}

In [17]:
from pyspark.sql.types import *
import pandas as pd

class DataUtil:
    """Helpers that sample one record from a Kafka topic and derive a column schema from it."""

    def __init__(self) -> None:
        pass

    @staticmethod
    def _get_one_kafka_record(topic_name, bootstrap_servers, group_id=None,
                              timeout_ms=10000, max_records=10):
        """Return the first non-empty record value of a topic as a UTF-8 string.

        Args:
            topic_name: Topic to read from.
            bootstrap_servers: Broker address list passed to ``KafkaConsumer``.
            group_id: Consumer group; defaults to ``'read_one_record'``.
            timeout_ms: How long iteration may wait for a record before giving
                up (forwarded as ``consumer_timeout_ms``). Without it the
                consumer iterator blocks indefinitely on an empty topic.
            max_records: Maximum number of records to inspect while skipping
                records whose value is ``None``.

        Returns:
            The decoded record value, or ``None`` when no usable record was
            received (in which case ``"Not get"`` is printed).
        """
        if not group_id:
            group_id = 'read_one_record'

        consumer = KafkaConsumer(
            topic_name,
            bootstrap_servers=bootstrap_servers,
            group_id=group_id,
            auto_offset_reset='earliest',
            enable_auto_commit=False,
            consumer_timeout_ms=timeout_ms)
        try:
            for seen, record in enumerate(consumer, start=1):
                # Records yielded by the iterator are never None themselves;
                # what can be missing is the payload.
                if record.value is not None:
                    return record.value.decode('utf-8')
                if seen >= max_records:
                    break
            print("Not get")
            return None
        finally:
            consumer.close()

    @staticmethod
    def _classify_dtype(dtype):
        """Map a pandas column dtype to one of: 'int', 'float', 'bool', 'datetime', 'string'."""
        if dtype == 'int64':
            return 'int'
        if dtype == 'float64':
            return 'float'
        if dtype == 'bool':
            return 'bool'
        if pd.api.types.is_datetime64_any_dtype(dtype):
            return 'datetime'
        return 'string'

    @staticmethod
    def _infer_kafka_data_schema(input_topic, bootstrap_servers, group_id=None, return_engine='flink'):
        """Infer a column schema from a single record of ``input_topic``.

        The record is parsed as JSON. If it is an object with a ``'value'``
        key, that entry is used as the payload; otherwise the whole record is
        treated as the payload. The payload is flattened with
        ``pd.json_normalize`` and each resulting column dtype is mapped to an
        engine type.

        Args:
            input_topic: Topic to sample.
            bootstrap_servers: Broker address list.
            group_id: Optional consumer group forwarded to the record reader.
            return_engine: ``'flink'`` returns a ``{column: SQL type name}``
                dict; any other value returns a Spark ``StructType`` with
                nullable fields.

        Returns:
            The schema in the requested form, or ``None`` if no record could
            be read from the topic.
        """
        # todo: for spark and pyflink schema is different, change it.
        kafka_record = DataUtil._get_one_kafka_record(input_topic, bootstrap_servers, group_id=group_id)
        if not kafka_record:
            print("Couldn't get one record from kafka topic: {}".format(input_topic))
            return None

        record_json = json.loads(kafka_record)
        # Accept both the {'key': ..., 'value': {...}} envelope and flat records.
        if isinstance(record_json, dict) and 'value' in record_json:
            value_json = record_json['value']
        else:
            value_json = record_json

        df = pd.json_normalize(value_json)
        categories = {col: DataUtil._classify_dtype(dtype)
                      for col, dtype in zip(df.columns, df.dtypes)}

        if return_engine == 'flink':
            flink_types = {
                'int': "INT",
                'float': "DOUBLE",
                'bool': "BOOLEAN",
                'datetime': "TIMESTAMP",
            }
            return {col: flink_types.get(kind, "STRING") for col, kind in categories.items()}

        # Spark: built lazily so the Flink path does not need pyspark names.
        # float64 maps to DoubleType (64-bit) to agree with the Flink DOUBLE
        # mapping; FloatType is single precision and would lose precision.
        spark_types = {
            'int': IntegerType,
            'float': DoubleType,
            'bool': BooleanType,
            'datetime': TimestampType,
        }
        return StructType([
            StructField(col, spark_types.get(kind, StringType)(), True)
            for col, kind in categories.items()
        ])

# Reuse the topic and broker list defined in the consumer cell; the previous
# `bootstrap_servers = bootstrap_servers` self-assignment was a no-op.
input_topic = topic_name

# Bare last expression so the notebook renders the inferred Flink schema.
DataUtil._infer_kafka_data_schema(input_topic, bootstrap_servers, return_engine='flink')


{'id': 'STRING',
 'name': 'STRING',
 'email': 'STRING',
 'date': 'STRING',
 'country': 'STRING',
 'company': 'STRING',
 'job': 'STRING',
 'phone': 'STRING',
 'sentence': 'STRING',
 'number': 'INT',
 'timestamp': 'DOUBLE'}