# 1. Data Validation

In [29]:
# Imports básicos
import os
import json
import time
import datetime
import pandas as pd
from dotenv import load_dotenv
from kafka import KafkaProducer, KafkaConsumer

# PySpark imports
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import GBTClassificationModel

# Carregar variáveis de ambiente
load_dotenv('.env')

# Criar SparkSession
spark = SparkSession.builder.appName("KafkaIntegration").getOrCreate()
base_path = os.getenv('BASE_PATH')

# Carregar modelo treinado
modelo_carregado = GBTClassificationModel.load("modelos/gradient_boosting_model")


In [31]:
# Configurações do Kafka
topic = 'customer-data'
bootstrap_servers = 'localhost:9092'

# Inicializar o produtor Kafka
producer = KafkaProducer(
    bootstrap_servers=bootstrap_servers,
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

# Carregar dados de clientes
base_path = os.getenv('BASE_PATH ')
df_customer_features = spark.read.csv(f"data/customer_features.csv", header=True, inferSchema=True)

df_customer_features.write.mode("Overwrite").parquet("/home/jovyan/code/data-ml")
df_customer_parquet = spark.read.parquet("/home/jovyan/code/data-ml")
# df_customer_features = df_customer_features.limit(10)  # Limitar para exemplo
df_customer_features = df_customer_parquet.limit(10)

# Enviar dados para o tópico Kafka
for row in df_customer_features.collect():
    data = row.asDict()

    # Corrigir campos que são datas
    for k, v in data.items():
        if isinstance(v, datetime.date):
            data[k] = v.isoformat()

    producer.send(topic, value=data)
    print(f"Enviado: {data}")
    time.sleep(0.5)  # Simular streaming

producer.flush()
producer.close()


INFO:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=localhost:9092 <connecting> [IPv6 ('::1', 9092, 0, 0)]>: connecting to localhost:9092 [('::1', 9092, 0, 0) IPv6]
INFO:kafka.conn:Probing node bootstrap-0 broker version
INFO:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=localhost:9092 <connecting> [IPv6 ('::1', 9092, 0, 0)]>: Connection complete.
INFO:kafka.conn:Broker version identified as 2.5.0
INFO:kafka.conn:Set configuration api_version=(2, 5, 0) to skip auto check_version requests on startup
INFO:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=localhost:9092 <connected> [IPv6 ('::1', 9092, 0, 0)]>: Closing connection. 
INFO:kafka.conn:<BrokerConnection node_id=1 host=localhost:9092 <connecting> [IPv6 ('::1', 9092, 0, 0)]>: connecting to localhost:9092 [('::1', 9092, 0, 0) IPv6]
INFO:kafka.conn:<BrokerConnection node_id=1 host=localhost:9092 <connecting> [IPv6 ('::1', 9092, 0, 0)]>: Connection complete.
INFO:kafka.conn:<BrokerConnection node_id=bootstrap-0 ho

Enviado: {'id': 2669945285, 'total_transactions': 1523, 'total_spent': 6320.52, 'avg_spent': 4.15, 'first_purchase': '2012-03-03', 'last_purchase': '2013-04-16', 'unique_categories': 248, 'unique_products': 312}
Enviado: {'id': 2669952782, 'total_transactions': 345, 'total_spent': 2229.0, 'avg_spent': 6.46, 'first_purchase': '2012-03-12', 'last_purchase': '2013-06-16', 'unique_categories': 108, 'unique_products': 145}
Enviado: {'id': 266996275, 'total_transactions': 1258, 'total_spent': 7447.03, 'avg_spent': 5.92, 'first_purchase': '2012-03-02', 'last_purchase': '2013-04-02', 'unique_categories': 191, 'unique_products': 212}
Enviado: {'id': 2670041982, 'total_transactions': 656, 'total_spent': 5417.1, 'avg_spent': 8.26, 'first_purchase': '2012-03-02', 'last_purchase': '2013-06-18', 'unique_categories': 160, 'unique_products': 279}
Enviado: {'id': 267008595, 'total_transactions': 777, 'total_spent': 2728.3, 'avg_spent': 3.51, 'first_purchase': '2012-03-09', 'last_purchase': '2013-06-19'

INFO:kafka.conn:<BrokerConnection node_id=1 host=localhost:9092 <connected> [IPv6 ('::1', 9092, 0, 0)]>: Closing connection. 


In [None]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType
from pyspark.sql.functions import col
from kafka import KafkaConsumer
import json
from pyspark.ml.feature import VectorAssembler
import logging
import time

# Configurar logging para depuração
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Configurações do Kafka
topic = 'customer-data'
bootstrap_servers = 'localhost:9092'

# Inicializar o consumidor Kafka com timeout
try:
    consumer = KafkaConsumer(
        topic,
        bootstrap_servers=bootstrap_servers,
        auto_offset_reset='earliest',
        enable_auto_commit=True,
        value_deserializer=lambda m: json.loads(m.decode('utf-8')),
        consumer_timeout_ms=10000  # Timeout após 10 segundos sem mensagens
    )
except Exception as e:
    logger.error(f"Erro ao inicializar o consumidor Kafka: {e}")
    raise

# Esquema explícito para os dados recebidos, alinhado com o pipeline de treino
schema = StructType([
    StructField("id", StringType(), True),
    StructField("total_transactions", IntegerType(), True),
    StructField("total_spent", DoubleType(), True),
    StructField("avg_spent", DoubleType(), True),
    StructField("first_purchase", StringType(), True),
    StructField("last_purchase", StringType(), True),
    StructField("unique_categories", IntegerType(), True),
    StructField("unique_products", IntegerType(), True),
])

# Colunas de features usadas pelo modelo
feature_cols = [
    "total_transactions",
    "total_spent",
    "avg_spent",
    "unique_categories",
    "unique_products"
]

# Configurar o VectorAssembler
try:
    assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
except Exception as e:
    logger.error(f"Erro ao configurar VectorAssembler: {e}")
    raise

# Lista para acumular mensagens
batch_data = []
start_time = time.time()
timeout_duration = 60  # Timeout após 60 segundos

try:
    # Processar mensagens do Kafka
    for message in consumer:
        try:
            data = message.value
            logger.info(f"Recebido: {data}")

            # Validar os dados recebidos
            required_fields = schema.fieldNames()
            if not all(field in data for field in required_fields):
                logger.warning(f"Mensagem inválida, faltando campos: {data}")
                continue

            batch_data.append(data)

            # Processar em lote (a cada 5 mensagens)
            if len(batch_data) >= 5:
                logger.info(f"Processando lote de {len(batch_data)} mensagens")
                # Criar DataFrame com esquema explícito
                df = spark(batch_data)
                logger.info("DataFrame criado com sucesso")

                # Verificar valores nulos nas colunas de features
                df = df.dropna(subset=feature_cols)
                logger.info("Valores nulos removidos")

                # Montar vetor de features
                df = assembler.transform(df)
                logger.info("Vetor de features criado")

                # Fazer predição com o modelo
                predictions = modelo_carregado.transform(df)
                logger.info("Predição realizada")
                predictions.select("id", "prediction", "probability").show(truncate=False)

                # Limpar o batch
                batch_data = []

            # Verificar timeout
            if time.time() - start_time > timeout_duration:
                logger.info("Timeout atingido, encerrando loop")
                break

        except Exception as e:
            logger.error(f"Erro ao processar mensagem: {e}")
            continue

    # Processar mensagens restantes
    if batch_data:
        logger.info(f"Processando {len(batch_data)} mensagens restantes")
        df = spark.createDataFrame(batch_data, schema=schema)
        df = df.dropna(subset=feature_cols)
        df = assembler.transform(df)
        predictions = modelo_carregado.transform(df)
        predictions.select("id", "prediction", "probability").show(truncate=False)

except Exception as e:
    logger.error(f"Erro geral no processamento do Kafka: {e}")
finally:
    consumer.close()
    logger.info("Consumidor Kafka fechado")

INFO:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=localhost:9092 <connecting> [IPv6 ('::1', 9092, 0, 0)]>: connecting to localhost:9092 [('::1', 9092, 0, 0) IPv6]
INFO:kafka.conn:Probing node bootstrap-0 broker version
INFO:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=localhost:9092 <connecting> [IPv6 ('::1', 9092, 0, 0)]>: Connection complete.
INFO:kafka.conn:Broker version identified as 2.5.0
INFO:kafka.conn:Set configuration api_version=(2, 5, 0) to skip auto check_version requests on startup
INFO:kafka.consumer.subscription_state:Updating subscribed topics to: ('customer-data',)
INFO:kafka.consumer.subscription_state:Updated partition assignment: [TopicPartition(topic='customer-data', partition=0), TopicPartition(topic='customer-data', partition=1), TopicPartition(topic='customer-data', partition=2)]
INFO:kafka.conn:<BrokerConnection node_id=1 host=localhost:9092 <connecting> [IPv6 ('::1', 9092, 0, 0)]>: connecting to localhost:9092 [('::1', 9092, 0, 0) IPv6]
INFO

In [19]:
def safe_float(value, default=0.0):
    try:
        return float(value)
    except (ValueError, TypeError):
        return default

def safe_int(value, default=0):
    try:
        return int(value)
    except (ValueError, TypeError):
        return default

for message in consumer:
    raw_data = message.value
    print(f"Recebido: {raw_data}")

    try:
        data = {
            "id": safe_int(raw_data.get("id")),
            "total_transactions": safe_float(raw_data.get("total_transactions")),
            "total_spent": safe_float(raw_data.get("total_spent")),
            "avg_spent": safe_float(raw_data.get("avg_spent")),
            "first_purchase": raw_data.get("first_purchase", ""),
            "last_purchase": raw_data.get("last_purchase", ""),
            "unique_categories": safe_float(raw_data.get("unique_categories")),
            "unique_products": safe_float(raw_data.get("unique_products"))
        }

        df = spark.createDataFrame([Row(**data)], schema=schema)
        df_features = assembler.transform(df)
        pred = modelo_carregado.transform(df_features)
        pred.select("id", "prediction", "probability").show()

    except Exception as e:
        print(f"Erro ao processar mensagem: {e}")
        print(f"Mensagem com problema: {raw_data}")
        continue

    break  # remove esse break para rodar continuamente


KeyboardInterrupt: 

In [13]:
consumer = KafkaConsumer(
    'customer-data',
    bootstrap_servers='localhost:9092',
    value_deserializer=lambda m: json.loads(m.decode('utf-8')),
    auto_offset_reset='earliest',
    enable_auto_commit=True,
    group_id='spark-consumer-group'
)


In [14]:
for message in consumer:
    data = message.value
    print(f"Recebido: {data}")

    # Garantir que os campos esperados existam e com tipos corretos
    campos = ["id", "total_transactions", "total_spent", "avg_spent",
              "first_purchase", "last_purchase", "unique_categories", "unique_products"]

    for campo in campos:
        if campo not in data:
            data[campo] = None  # ou algum valor padrão

    # Conversão segura
    data["id"] = int(data["id"])
    data["total_transactions"] = float(data["total_transactions"])
    data["total_spent"] = float(data["total_spent"])
    data["avg_spent"] = float(data["avg_spent"])
    data["unique_categories"] = float(data["unique_categories"])
    data["unique_products"] = float(data["unique_products"])

    # Criar DataFrame com schema explícito
    data_filtrado = {k: data[k] for k in schema.fieldNames()}
    df = spark.createDataFrame([data_filtrado], schema=schema)

    # Transformar para features e aplicar modelo
    df = assembler.transform(df)
    pred = modelo_carregado.transform(df)

    # Mostrar resultado
    pred.select("id", "prediction", "probability").show()

    # Simplesmente para parar após 1 iteração no teste
    break


Recebido: {'id': 100007447, 'total_transactions': 1096, 'total_spent': 6644.88, 'avg_spent': 6.06, 'first_purchase': '2012-03-06', 'last_purchase': '2013-04-21', 'unique_categories': 235, 'unique_products': 226}


Traceback (most recent call last):
  File "/opt/conda/envs/vscode_pyspark/lib/python3.11/site-packages/pyspark/serializers.py", line 458, in dumps
    return cloudpickle.dumps(obj, pickle_protocol)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/conda/envs/vscode_pyspark/lib/python3.11/site-packages/pyspark/cloudpickle/cloudpickle_fast.py", line 73, in dumps
    cp.dump(obj)
  File "/opt/conda/envs/vscode_pyspark/lib/python3.11/site-packages/pyspark/cloudpickle/cloudpickle_fast.py", line 602, in dump
    return Pickler.dump(self, obj)
           ^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/conda/envs/vscode_pyspark/lib/python3.11/site-packages/pyspark/cloudpickle/cloudpickle_fast.py", line 692, in reducer_override
    return self._function_reduce(obj)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/conda/envs/vscode_pyspark/lib/python3.11/site-packages/pyspark/cloudpickle/cloudpickle_fast.py", line 565, in _function_reduce
    return self._dynamic_function_reduce(obj)
      

PicklingError: Could not serialize object: IndexError: tuple index out of range

In [10]:
for message in consumer:
    data = message.value
    print(f"Recebido: {data}")

    # Garantir que os dados estejam no formato certo
    for key in ["total_transactions", "total_spent", "avg_spent", "unique_categories", "unique_products"]:
        if key in data and data[key] is not None:
            data[key] = float(data[key])

    df = spark.createDataFrame([data], schema=schema)
    df = assembler.transform(df)

    # Usar o modelo carregado globalmente
    predictions = modelo_carregado.transform(df)
    predictions.select("prediction", "probability").show()

    break  # apenas para teste, pode remover isso depois


Recebido: {'id': 100033247, 'total_transactions': 1596, 'total_spent': 6834.73, 'avg_spent': 4.28, 'first_purchase': '2012-03-04', 'last_purchase': '2013-03-28', 'unique_categories': 265, 'unique_products': 237}


Traceback (most recent call last):
  File "/opt/conda/envs/vscode_pyspark/lib/python3.11/site-packages/pyspark/serializers.py", line 458, in dumps
    return cloudpickle.dumps(obj, pickle_protocol)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/conda/envs/vscode_pyspark/lib/python3.11/site-packages/pyspark/cloudpickle/cloudpickle_fast.py", line 73, in dumps
    cp.dump(obj)
  File "/opt/conda/envs/vscode_pyspark/lib/python3.11/site-packages/pyspark/cloudpickle/cloudpickle_fast.py", line 602, in dump
    return Pickler.dump(self, obj)
           ^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/conda/envs/vscode_pyspark/lib/python3.11/site-packages/pyspark/cloudpickle/cloudpickle_fast.py", line 692, in reducer_override
    return self._function_reduce(obj)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/conda/envs/vscode_pyspark/lib/python3.11/site-packages/pyspark/cloudpickle/cloudpickle_fast.py", line 565, in _function_reduce
    return self._dynamic_function_reduce(obj)
      

PicklingError: Could not serialize object: IndexError: tuple index out of range

In [4]:
from pyspark.sql.types import StructType, StructField, IntegerType, DoubleType, StringType

# Inicializar o consumidor Kafka
consumer = KafkaConsumer(
    topic,
    bootstrap_servers=bootstrap_servers,
    auto_offset_reset='earliest',
    enable_auto_commit=True,
    value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)

# Esquema explícito para os dados recebidos
schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("total_transactions", DoubleType(), True),
    StructField("total_spent", DoubleType(), True),
    StructField("avg_spent", DoubleType(), True),
    StructField("first_purchase", StringType(), True),
    StructField("last_purchase", StringType(), True),
    StructField("unique_categories", DoubleType(), True),
    StructField("unique_products", DoubleType(), True),
])

# Colunas de features
feature_cols = [
    "total_transactions",
    "total_spent",
    "avg_spent",
    "unique_categories",
    "unique_products"
]

assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")

# Processar mensagens do Kafka
for message in consumer:
    data = message.value
    print(f"Recebido: {data}")

    # Converter valores numéricos para float
    for key in ['total_transactions', 'total_spent', 'avg_spent', 'unique_categories', 'unique_products']:
        if key in data and data[key] is not None:
            data[key] = float(data[key])

    # Criar DataFrame com esquema explícito
    df = spark.createDataFrame([data], schema=schema)

    # Montar vetor de features
    df = assembler.transform(df)

    # Fazer predição com o modelo
    predictions = modelo_carregado.transform(df)
    predictions.select("prediction", "probability").show()

    # Pausar após uma iteração (para teste)
    break



Recebido: {'id': 100012115, 'total_transactions': 118, 'total_spent': 553.02, 'avg_spent': 4.69, 'first_purchase': '2012-03-12', 'last_purchase': '2013-04-17', 'unique_categories': 62, 'unique_products': 54}


Traceback (most recent call last):
  File "/opt/conda/envs/vscode_pyspark/lib/python3.11/site-packages/pyspark/serializers.py", line 458, in dumps
    return cloudpickle.dumps(obj, pickle_protocol)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/conda/envs/vscode_pyspark/lib/python3.11/site-packages/pyspark/cloudpickle/cloudpickle_fast.py", line 73, in dumps
    cp.dump(obj)
  File "/opt/conda/envs/vscode_pyspark/lib/python3.11/site-packages/pyspark/cloudpickle/cloudpickle_fast.py", line 602, in dump
    return Pickler.dump(self, obj)
           ^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/conda/envs/vscode_pyspark/lib/python3.11/site-packages/pyspark/cloudpickle/cloudpickle_fast.py", line 692, in reducer_override
    return self._function_reduce(obj)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/conda/envs/vscode_pyspark/lib/python3.11/site-packages/pyspark/cloudpickle/cloudpickle_fast.py", line 565, in _function_reduce
    return self._dynamic_function_reduce(obj)
      

PicklingError: Could not serialize object: IndexError: tuple index out of range

In [37]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType
from pyspark.sql.functions import col, to_date, datediff, lit
from kafka import KafkaConsumer
import json
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import GBTClassificationModel
import logging
import time

# Configurar logging para depuração
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Configurações do Kafka
topic = 'customer-data'
bootstrap_servers = 'localhost:9092'

# Inicializar o consumidor Kafka com timeout
try:
    consumer = KafkaConsumer(
        topic,
        bootstrap_servers=bootstrap_servers,
        auto_offset_reset='earliest',
        enable_auto_commit=True,
        value_deserializer=lambda m: json.loads(m.decode('utf-8')),
        consumer_timeout_ms=10000  # Timeout após 10 segundos sem mensagens
    )
except Exception as e:
    logger.error(f"Erro ao inicializar o consumidor Kafka: {e}")
    raise

# Esquema explícito para os dados recebidos
schema = StructType([
    StructField("id", StringType(), True),
    StructField("total_transactions", IntegerType(), True),
    StructField("total_spent", DoubleType(), True),
    StructField("avg_spent", DoubleType(), True),
    StructField("first_purchase", StringType(), True),
    StructField("last_purchase", StringType(), True),
    StructField("unique_categories", IntegerType(), True),
    StructField("unique_products", IntegerType(), True),
])

# Colunas de features usadas pelo modelo
feature_cols = [
    "total_transactions",
    "total_spent",
    "avg_spent",
    "unique_categories",
    "unique_products"
]

# Configurar o VectorAssembler
try:
    assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
except Exception as e:
    logger.error(f"Erro ao configurar VectorAssembler: {e}")
    raise

# Data de referência para cálculo de dias
reference_date = to_date(lit("2025-05-29"))

# Lista para acumular mensagens
batch_data = []
start_time = time.time()
timeout_duration = 60  # Timeout após 60 segundos

try:
    # Processar mensagens do Kafka
    for message in consumer:
        try:
            data = message.value
            logger.info(f"Recebido: {data}")

            # Validar os dados recebidos
            required_fields = schema.fieldNames()
            if not all(field in data for field in required_fields):
                logger.warning(f"Mensagem inválida, faltando campos: {data}")
                continue

            batch_data.append(data)

            # Processar em lote (a cada 5 mensagens)
            if len(batch_data) >= 5:
                logger.info(f"Processando lote de {len(batch_data)} mensagens")
                # Criar DataFrame com esquema explícito
                df = spark.createDataFrame(batch_data, schema=schema)
                logger.info("DataFrame criado com sucesso")

                # Pré-processamento
                df = df.withColumn("days_since_first_purchase", datediff(reference_date, to_date(col("first_purchase")))) \
                       .withColumn("days_since_last_purchase", datediff(reference_date, to_date(col("last_purchase"))))
                logger.info("Colunas de dias calculadas")

                # Verificar valores nulos
                df = df.dropna(subset=feature_cols)
                logger.info("Valores nulos removidos")

                # Montar vetor de features
                df = assembler.transform(df)
                logger.info("Vetor de features criado")

                # Fazer predição com o modelo
                predictions = modelo_carregado.transform(df)
                logger.info("Predição realizada")
                predictions.select("id", "prediction", "probability").show(truncate=False)

                # Limpar o batch
                batch_data = []

            # Verificar timeout
            if time.time() - start_time > timeout_duration:
                logger.info("Timeout atingido, encerrando loop")
                break

        except Exception as e:
            logger.error(f"Erro ao processar mensagem: {e}")
            continue

    # Processar mensagens restantes
    if batch_data:
        logger.info(f"Processando {len(batch_data)} mensagens restantes")
        df = spark.createDataFrame(batch_data, schema=schema)
        df = df.withColumn("days_since_first_purchase", datediff(reference_date, to_date(col("first_purchase")))) \
               .withColumn("days_since_last_purchase", datediff(reference_date, to_date(col("last_purchase"))))
        df = df.dropna(subset=feature_cols)
        df = assembler.transform(df)
        predictions = modelo_carregado.transform(df)
        predictions.select("id", "prediction", "probability").show(truncate=False)

except Exception as e:
    logger.error(f"Erro geral no processamento do Kafka: {e}")
finally:
    consumer.close()
    logger.info("Consumidor Kafka fechado")

INFO:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=localhost:9092 <connecting> [IPv6 ('::1', 9092, 0, 0)]>: connecting to localhost:9092 [('::1', 9092, 0, 0) IPv6]
INFO:kafka.conn:Probing node bootstrap-0 broker version
INFO:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=localhost:9092 <connecting> [IPv6 ('::1', 9092, 0, 0)]>: Connection complete.
INFO:kafka.conn:Broker version identified as 2.5.0
INFO:kafka.conn:Set configuration api_version=(2, 5, 0) to skip auto check_version requests on startup
INFO:kafka.consumer.subscription_state:Updating subscribed topics to: ('customer-data',)
INFO:kafka.consumer.subscription_state:Updated partition assignment: [TopicPartition(topic='customer-data', partition=0), TopicPartition(topic='customer-data', partition=1), TopicPartition(topic='customer-data', partition=2)]
INFO:kafka.conn:<BrokerConnection node_id=1 host=localhost:9092 <connecting> [IPv6 ('::1', 9092, 0, 0)]>: connecting to localhost:9092 [('::1', 9092, 0, 0) IPv6]
INFO

In [9]:
def on_send_success(metadata):
    print(f'Published to Kafka. Topic: {metadata.topic}, Partition: {metadata.partition}, Offset: {metadata.offset}')

def on_send_error(excp):
    print(f'Error publishing: {excp}')

def kafka_producer(topic='shoppers', batch_size=1000, max_rows=10000):
    test_df = spark.read.parquet(f"{base_path}-ml/test_table.parquet").toPandas()
    
    if max_rows > 0 and len(test_df) > max_rows:
        test_df = test_df.iloc[:max_rows]
    
    producer = KafkaProducer(
        bootstrap_servers='localhost:9092',
        value_serializer=lambda x: json.dumps(x).encode('utf-8'),
        api_version=(3, 9),
        batch_size=16384,
        linger_ms=10
    )
    
    for _, row in test_df.iterrows():
        json_data = row.to_dict()
        try:
            producer.send(topic, value=json_data).add_callback(on_send_success).add_errback(on_send_error)
            if _ % batch_size == 0:
                producer.flush()
                print(f'Sent batch of {_+1} rows')
        except Exception as e:
            print(f'Error sending row: {e}')
    
    producer.flush()
    producer.close()
    print("Producer finished.")

kafka_producer(batch_size=1000, max_rows=10000)

Published to Kafka. Topic: shoppers, Partition: 0, Offset: 20072
Sent batch of 1 rows
Published to Kafka. Topic: shoppers, Partition: 0, Offset: 20073
Published to Kafka. Topic: shoppers, Partition: 0, Offset: 20074
Published to Kafka. Topic: shoppers, Partition: 0, Offset: 20075
Published to Kafka. Topic: shoppers, Partition: 0, Offset: 20076
Published to Kafka. Topic: shoppers, Partition: 0, Offset: 20077
Published to Kafka. Topic: shoppers, Partition: 0, Offset: 20078
Published to Kafka. Topic: shoppers, Partition: 0, Offset: 20079
Published to Kafka. Topic: shoppers, Partition: 0, Offset: 20080
Published to Kafka. Topic: shoppers, Partition: 0, Offset: 20081
Published to Kafka. Topic: shoppers, Partition: 0, Offset: 20082
Published to Kafka. Topic: shoppers, Partition: 0, Offset: 20083
Published to Kafka. Topic: shoppers, Partition: 0, Offset: 20084
Published to Kafka. Topic: shoppers, Partition: 0, Offset: 20085
Published to Kafka. Topic: shoppers, Partition: 0, Offset: 20086
Publ

In [11]:
def process_stream(spark, brokers='localhost:9092', topic='shoppers'):
    try:
        # Define schema
        schema = StructType([
            StructField("id", StringType(), True),
            StructField("offer", StringType(), True),
            StructField("total_transactions", IntegerType(), True),
            StructField("total_spent", DoubleType(), True),
            StructField("avg_spent", DoubleType(), True),
            StructField("unique_categories", IntegerType(), True),
            StructField("unique_products", IntegerType(), True)
        ])
        
        # Read streaming data
        df_stream = spark.readStream \
            .format("kafka") \
            .option("kafka.bootstrap.servers", brokers) \
            .option("subscribe", topic) \
            .option("startingOffsets", "earliest") \
            .load() \
            .selectExpr("CAST(value AS STRING)") \
            .select(from_json(col("value"), schema).alias("data")) \
            .select("data.*") \
            .na.fill(0, ["total_transactions", "total_spent", "avg_spent", "unique_categories", "unique_products"])
        
        # Assemble features
        feature_cols = [
            "total_transactions", "total_spent", "avg_spent", "unique_categories", "unique_products"
        ]
        assembler = VectorAssembler(
            inputCols=feature_cols,
            outputCol="features",
            handleInvalid="skip"
        )
        df_transformed = assembler.transform(df_stream)
        
        # Apply ML model
        model_path = f"{base_path}-ml/model-B/rf/model_rf"
        best_model = PipelineModel.load(model_path)
        predictions_stream = best_model.transform(df_transformed)
        
        # Write to console
        query = predictions_stream.select(
            col("id"),
            col("probability").cast("string").alias("repeater_probability"),
            when(col("prediction") == 1, "Likely to repeat").otherwise("Unlikely to repeat").alias("repeat_likelihood")
        ).writeStream \
            .outputMode("append") \
            .format("console") \
            .trigger(processingTime='10 seconds') \
            .option("truncate", False) \
            .start()
        
        # Write to CSV
        query_file = predictions_stream.select(
            col("id"),
            col("probability").cast("string").alias("repeater_probability"),
            when(col("prediction") == 1, "Likely to repeat").otherwise("Unlikely to repeat").alias("repeat_likelihood")
        ).writeStream \
            .outputMode("append") \
            .format("csv") \
            .option("path", f"{base_path}-ml/submission_stream") \
            .option("checkpointLocation", f"{base_path}-ml/checkpoint") \
            .option("header", True) \
            .trigger(processingTime='10 seconds') \
            .start()
        
        return query, query_file
    except Exception as e:
        print(f"Stream failed: {e}")
        raise

query, query_file = process_stream(spark)

Stream failed: An error occurred while calling o514.partitions.
: org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: file:/home/jovyan/code/data-ml/model-B/rf/model_rf/metadata
	at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:304)
	at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:244)
	at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:332)
	at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:208)
	at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:291)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
	at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:291)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:287)
	at org.apache.spark.api.java.JavaRDDLike.partitions(Ja

Py4JJavaError: An error occurred while calling o514.partitions.
: org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: file:/home/jovyan/code/data-ml/model-B/rf/model_rf/metadata
	at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:304)
	at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:244)
	at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:332)
	at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:208)
	at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:291)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
	at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:291)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:287)
	at org.apache.spark.api.java.JavaRDDLike.partitions(JavaRDDLike.scala:61)
	at org.apache.spark.api.java.JavaRDDLike.partitions$(JavaRDDLike.scala:61)
	at org.apache.spark.api.java.AbstractJavaRDDLike.partitions(JavaRDDLike.scala:45)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: java.io.IOException: Input path does not exist: file:/home/jovyan/code/data-ml/model-B/rf/model_rf/metadata
	at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:278)
	... 25 more


In [None]:
def show_streaming_status(spark, query, query_file):
    import time
    sleeptime = 5
    maxiterations = 10
    for i in range(maxiterations):
        time.sleep(sleeptime)
        print(f'Iteration {i+1}/{maxiterations} - Streaming Status:')
        print(f'Query Active: {query.isActive}')
        print(f'Query Status: {query.status}')
        print(f'File Query Active: {query_file.isActive}')
        print(f'File Status: {query_file.status}')
    query.stop()
    query_file.stop()
    print("Streaming stopped.")

show_streaming_status(spark, query, query_file)

NameError: name 'query' is not defined

# 2. DATA INGESTION

In [None]:
# # Data to read - offers.csv
# data_dir_offers = f'{base_path}/offers.csv.gz'
# data_file_offers = data_dir_offers

# ! head $data_file_offers

# # Data to read - sampleSubmission.csv
# data_dir_sampleSubmission = f'{base_path}/sampleSubmission.csv.gz'
# data_file_sampleSubmission = data_dir_sampleSubmission

# ! head $data_file_sampleSubmission

# # Data to read - testHistory.csv
# data_dir_testHistory = f'{base_path}/testHistory.csv.gz'
# data_file_testHistory = data_dir_testHistory

# ! head $data_file_testHistory

# # Data to read - trainHistory.csv
# data_dir_trainHistory = f'{base_path}/trainHistory.csv.gz'
# data_file_trainHistory = data_dir_trainHistory

# ! head $data_file_trainHistory

# # Data to read - transactions.csv
# data_dir_transactions = f'{base_path}/transactions.csv.gz'
# data_file_transactions = data_dir_transactions

# ! head $data_file_transactions;

�}ESoffers.csv ���n�0��=Q�����u�0`K� ���������Y)�^__��|��|?�����p���\������x���=�|{�������fW�q.g�brjm��Q7� )��u�X�t)XG&fa�O�$j9�(���׶#A��䜘�Bw��
,d�2�3;�b4[&� ̞E��f�l<�Ȉ���D�"��#u���V҃��Y�@���ʎ��W;XKU�C�6� ���5j�T���g��̛�ܮA�U=@AV0',%�W��M�=Sp!��^�����9�{�h�Ȭ��o�F#F�|l�W�Yխ�X���#
�H�l*j#����A�ø�8f���Br��Eh�%C�-��r���	��+���+C���	RL�5B���r�V  head: cannot open 'data/sampleSubmission.csv.gz' for reading: No such file or directory
�.��testHistory.csv |�[�$5��YK!�r�c/qb�"���G�4��N��������?����?����o����������o�,MWǥ����e�����!H��C�fԃ�C��������	�C�!;/��"xk�oԅH�K�Z�n@�E�.� D� #��k��V}�QT�4^@߀
�+`��:� "׽E����B,��V��
| 4�����	PQ%��������^�ߧ��]Z�s��Kch�/Ľě��E_�p1���VG���Y9^�ᅐ��z| ��}�%\�NJ� ���(}�1T�=��t��_\�p�K�J��8 �|~��x�K
����i�@�T���مC���5PR�%	� 4���^9DzF|�n�ЦQk���ȯԤ]���]L����+���#Q^��TT�E�e����1����(���n�G:�Rp�u�8�IE�X~�0�����5��,���'䪷��#Ǚa����ն��Gζ*Y���|�ɾm+���C{���R���A!c- EĊT�4��

**Offers**

In [None]:
# # Reading data - offers.csv
# df_offers = spark.read.csv(
#         data_file_offers, 
#         header=True, sep=',', inferSchema=True
#     )

**TESTHISTORY**

In [None]:
# # Reading data - testHistory.csv
# df_testHistory = spark.read.csv(
#         data_file_testHistory, 
#         header=True, sep=',', inferSchema=True
#     )

**TRAINHISTORY**

In [None]:
# # Reading data - trainHistory.csv
# df_trainHistory = spark.read.csv(
#         data_file_trainHistory, 
#         header=True, sep=',', inferSchema=True
#     )

**TRANSACTIONS**

In [None]:
# df_transactions = spark.read.csv(
#     data_file_transactions, 
#     header=True, sep=',', inferSchema=False
# ).sample(fraction=0.001, seed=42).limit(1000000)

**KAFKA-CLUSTER-CREATE-TOPIC**

In [None]:
# from kafka.admin import KafkaAdminClient, NewTopic

# my_topic = 'shoppers'
# try:
#     admin = KafkaAdminClient(bootstrap_servers='localhost:9092')
#     topic = NewTopic(name=my_topic, num_partitions=1, replication_factor=1)
#     admin.create_topics([topic])
#     print(f'DEBUG: Topic {my_topic} successfully created.')
# except Exception as e:
#     print(f'Error creating topic.\n{e}')

**KAFKA-CLUSTER-INFO**

In [None]:
# from kafka import KafkaAdminClient
# from kafka.cluster import ClusterMetadata

# try:
#     admin = KafkaAdminClient(bootstrap_servers='localhost:9092')
#     print(f'List of topics in the cluster: {admin.list_topics()}')
#     print(f'List of consumer groups known to the cluster: {admin.list_consumer_groups()}')
# except Exception:
#     print(f'Error connecting to cluster')

# clusterMetadata = ClusterMetadata(bootstrap_servers=['localhost:9092'])
# print(f'All brokers metadata: {clusterMetadata.brokers()}')
# print(f'Partitions for topic shoppers: {clusterMetadata.partitions_for_topic("shoppers")}')
# print(f'Topics: {clusterMetadata.topics()}')

**KAFKA-PRODUCER**

In [None]:
# import os, time, json, datetime, csv
# import pandas as pd
# from kafka import KafkaProducer

# topic = 'shoppers'
# inputfile = './data/validated_testHistory.csv'  # Use test data for streaming
# chunksize = 10000
# sleeptime = 0.6

# def datetime_converter(dt):
#     if isinstance(dt, datetime.datetime):
#         return dt.__str__()

# def on_send_success(metadata):
#     print(f'Published to Topic: {metadata.topic}, Partition: {metadata.partition}, Offset: {metadata.offset}')

# def on_send_error(excp):
#     print(f'Error: {excp}')

# kafka_producer = KafkaProducer(
#     bootstrap_servers='localhost:9092',
#     api_version=(3, 9),
#     value_serializer=lambda x: json.dumps(x).encode('utf-8')
# )

# # Load and join test data
# test_df = pd.read_csv(inputfile)
# offers_df = pd.read_csv('./data/validated_offers.csv')
# transaction_agg = pd.read_parquet('./data/transaction_features.parquet')
# df_joined = test_df.merge(offers_df, on='offer', how='left').merge(transaction_agg, on='id', how='left')

# for _, row in df_joined.iterrows():
#     json_data = json.dumps(row.to_dict(), default=datetime_converter)
#     print(f'Sending to Kafka: {json_data}')
#     try:
#         kafka_producer.send(topic, value=json_data).add_callback(on_send_success).add_errback(on_send_error)
#     except Exception as e:
#         print(f'Error: {e}')
#     time.sleep(sleeptime)

# kafka_producer.flush()
# kafka_producer.close()

**KAFKA-CONSUMER**

In [None]:
# from kafka import KafkaConsumer

# topic = 'shoppers'
# kafka_consumer = KafkaConsumer(
#     bootstrap_servers='localhost:9092',
#     api_version=(3, 9),
#     auto_offset_reset='earliest',
#     enable_auto_commit=False
# )

# kafka_consumer.subscribe([topic])
# print(f"Listening on topic: {topic}")

# for msg in kafka_consumer:
#     print(f"Received: [{msg.topic}:{msg.partition}:{msg.offset}] value={msg.value.decode('utf-8')}")

**APP_SPARK**

In [None]:
# import time

# # Importing critical functions that deal with data stream
# from data_streaming import ( spark_initialize, data_stream_spark, 
#             show_status, show_tables, show_sink_table, get_table_dataframe )

# brokers = 'localhost:9092'
# topic = 'books-amazon'
# table = 'bookstable'

# # Showing results of data stream processing
# def show_spark_results(spark, table):
#     df = get_table_dataframe(spark, table)
#     # cols_interest = ['timestamp','Asin','Group','Format','Title','Author','Publisher']
    
#     sleeptime = 0.8
#     maxiterations = 30
#     top_authors = 20
#     top_publishers = 20

#     # Iterative update
#     for i in range(maxiterations):
#         time.sleep(sleeptime)
#         print(f'Processing...  Iteration {i} with in-between delay of {sleeptime} second(s)')
#         print(f'Number of records processed so far: {df.count()}.')

#         #df.select(cols_interest).show(truncate=False)
#         #df.show(5, truncate=False)
#         #show_sink_table(spark, table)

#         print('Aggregated information as it stands (top 20):')
#         df.groupBy('Author').count().orderBy('count', ascending=False).limit(top_authors).show(truncate=False)
#         df.groupBy('Publisher').count().orderBy('count', ascending=False).limit(top_publishers).show(truncate=False)
#         #df.groupBy('Group', 'Format').count().show(truncate=False)
        

# # Execution

# spark = spark_initialize()
# query = data_stream_spark(spark, brokers, topic, table)

# show_status(spark, query)
# show_tables(spark)
# show_sink_table(spark, table)
# show_spark_results(spark, table)


**APP_STREAMLIT**

In [None]:
# import streamlit as st
# import time

# # Importing critical functions that deal with data stream (Spark/Kafka side)
# from data_streaming import ( spark_initialize, data_stream_spark, 
#                 show_tables, show_status, get_table_dataframe )

# # Caching the function that will access the running 
# # Spark/Kafka data query (a DataFrame)
# @st.cache_resource
# def get_data():
#     return get_table_dataframe(st.session_state.spark, st.session_state.table)

# # Showing results of data stream processing, 
# # as long as there is a SparkSession running
# def results():

#     if 'spark' not in st.session_state:
#         return
    
#     status_text = st.empty()
#     progress_bar = st.progress(0)
#     placeholder = st.empty()
#     sleeptime = 0.8
#     maxiterations = 30
#     top_authors = 20
#     top_publishers = 20

#     # Iterative update
#     for i in range(maxiterations):
#         time.sleep(sleeptime)
#         # getting data at this point in time
#         df = get_data()
#         count = df.count()
#         status_text.warning(f'Processing...  Iteration {i} with in-between delay of {sleeptime} second(s). Messages/records processed so far: {count}.')
#         cols1 = ['Author'] 
#         df_author = df.groupBy(cols1).count().orderBy('count', ascending=False).limit(top_authors).toPandas()
#         cols2 = ['Publisher']
#         df_publisher = df.groupBy(cols2).count().orderBy('count', ascending=False).limit(top_publishers).toPandas()
#         print(df_publisher)

#         with placeholder.container():

#             # Each chart in one column, so two columns required
#             fig_col1, fig_col2 = st.columns(2)
#             with fig_col1:
#                 st.markdown('### Author')
#                 st.markdown(f'**Counting of books by author - Top {top_authors}**')
#                 st.bar_chart(data=df_author, y='count', x=cols1[0], horizontal=True)
#             with fig_col2:
#                 st.markdown('### Publisher')
#                 st.markdown(f'**Counting of books by publisher - Top {top_publishers}**')
#                 st.bar_chart(data=df_publisher, y='count', x=cols2[0], horizontal=True)

#             # Show the related dataframes
#             st.markdown('### Detailed tables view')
#             st.markdown('**Author**')
#             st.dataframe(df_author)
#             st.markdown('**Publisher**')
#             st.dataframe(df_publisher)
    
#         progress_bar.progress(i)
  
#     progress_bar.empty()
#     status_text.success(f'Final results are shown after processing {count} messages/records.')

# # Page to hold results
# def page_results():
#     st.empty()
#     st.header(':one: Data stream processing')
#     st.subheader('Results')
#     results()
    
# # Page to hold information about the app
# def page_about():
#     st.empty()
#     st.header(':two: About')
#     st.subheader('Lab class handout #6')
#     st.write('Data streaming with Apache Spark and Apache Kafka')
#     st.badge('Streamlit version', icon='ℹ️', color='blue')
    
# # Entry point
# def main():
    
#     # Page config
#     st.set_page_config(
#         page_title = 'Books data streaming',
#         initial_sidebar_state = 'expanded',
#         layout = 'wide'
#     )
#     # App title
#     st.title('Books data streaming')
#     st.divider()
#     with st.sidebar:
#         st.empty()
#         st.header('Algoritmos para Big Data')

#     brokers = 'localhost:9092'
#     topic = 'books-amazon'
#     table = 'bookstable'

#     # As code is running everytime the user interacts with, 
#     # we must make sure that the spark side only starts once

#     if 'spark' not in st.session_state:
#         spark = spark_initialize()
#         query = data_stream_spark(spark, brokers, topic, table)
#         st.session_state.spark = spark
#         st.session_state.table = table
#         # just to check in the terminal
#         show_status(spark, query)
#         show_tables(spark)
        
#     pages = [ st.Page(page_results, title='Results'),
#               st.Page(page_about, title='About'),
#             ]
#     pg = st.navigation(pages)
#     pg.run()

# # Execution
# if __name__ == "__main__":
#     main()


**DATA_STREAMING**

In [None]:
# import os, sys, time, json
# import pyspark
# from pyspark.sql import SparkSession, DataFrame
# import pyspark.sql.functions as F
# import pyspark.sql.types as T

# def spark_initialize() -> SparkSession:
#     scala_version = '2.12'
#     spark_version = '3.3.1'
#     packages = [
#         f'org.apache.spark:spark-sql-kafka-0-10_{scala_version}:{spark_version}',
#         f'org.apache.spark:spark-token-provider-kafka-0-10_{scala_version}:{spark_version}',
#         f'org.apache.spark:spark-streaming-kafka-0-10_{scala_version}:{spark_version}',
#         'org.apache.kafka:kafka-clients:3.3.1',
#         'org.apache.commons:commons-pool2:2.8.0'
#     ]
#     spark = SparkSession.builder\
#         .appName('Streaming')\
#         .config('spark.jars.packages', ','.join(packages))\
#         .getOrCreate()
#     spark.sparkContext.setLogLevel("ERROR")
#     return spark

# def data_stream_spark(spark, brokers, topic, table) -> DataFrame:
#     df = spark.readStream \
#         .format("kafka") \
#         .option("kafka.bootstrap.servers", brokers) \
#         .option("subscribe", topic) \
#         .option("includeHeaders", "true") \
#         .option("startingOffsets", "earliest") \
#         .load()

#     spark.sql(f'drop table if exists {table}')
#     query = df.writeStream \
#         .queryName(f'{table}') \
#         .outputMode("append") \
#         .format("memory") \
#         .start()
#     return query

# def show_status(spark, query):
#     print(f'Active: {spark.streams.active[0].isActive}.')
#     print(f'Status: {query.status}.')

# def show_tables(spark):
#     spark.sql("show tables").show(truncate=False)

# def show_sink_table(spark, table):
#     spark.sql(f'select * from {table}').show(truncate=False)

# def get_table_dataframe(spark, table):
#     df_kafka = spark.sql(f'select CAST(value AS STRING), topic, timestamp from {table}')
#     schema = T.StructType([
#         T.StructField("id", T.StringType(), True),
#         T.StructField("offer", T.StringType(), True),
#         T.StructField("category", T.StringType(), True),
#         T.StructField("quantity", T.DoubleType(), True),
#         T.StructField("avg_purchase", T.DoubleType(), True),
#         T.StructField("transaction_count", T.DoubleType(), True),
#         T.StructField("unique_categories", T.DoubleType(), True)
#     ])
#     df_kafka = df_kafka.withColumn('jsonvalue', F.from_json(F.col('value'), schema)) \
#         .select(
#             F.col('jsonvalue.id').alias('id'),
#             F.col('jsonvalue.offer').alias('offer'),
#             F.col('jsonvalue.category').alias('category'),
#             F.col('jsonvalue.quantity').alias('quantity'),
#             F.col('jsonvalue.avg_purchase').alias('avg_purchase'),
#             F.col('jsonvalue.transaction_count').alias('transaction_count'),
#             F.col('jsonvalue.unique_categories').alias('unique_categories'),
#             F.col('timestamp')
#         )
#     return df_kafka