## Início

In [14]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from sqlalchemy import create_engine
import datetime as dt
import json
import ast

# Kafka broker and topics used by the streaming job.
KAFKA_BOOTSTRAP_SERVERS = "localhost:9092"
KAFKA_TOPIC_NAME = "test"         # topic the stream subscribes to
KAFKA_TOPIC_SINK_NAME = "sink"    # topic the processed records are written to

# Directory Structured Streaming uses as its checkpoint location.
# Point it at a scratch local directory while debugging.
CHECKPOINT_LOCATION = "/home/jovyan/work/tmp"


## Criação da sessão do Spark

In [15]:
# Versions of the Maven artifacts pulled in at startup.
# NOTE(review): scala_version/spark_version are assumed to match the installed
# Spark build (compare with pyspark.__version__) — TODO confirm when upgrading.
scala_version = '2.12'
spark_version = '3.2.1'
kafka_version = '3.3.1'

# Extra JARs resolved through spark.jars.packages: the Kafka source/sink for
# Structured Streaming, the Kafka client library and the SQLite JDBC driver.
packages = [
    'org.apache.spark:spark-sql-kafka-0-10_' + scala_version + ':' + spark_version,
    'org.apache.kafka:kafka-clients:' + kafka_version,
    'org.xerial:sqlite-jdbc:3.34.0',
]

# Local session using every available core.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("PysparkKafkaStreaming")
    .config("spark.jars.packages", ",".join(packages))
    .getOrCreate()
)

# Keep notebook output readable: only errors from Spark's logger.
spark.sparkContext.setLogLevel("ERROR")

## Leitura do banco de dados

In [16]:
# Load the customer table `tb_cliente` from the SQLite database through JDBC
# (the org.sqlite.JDBC driver comes from the sqlite-jdbc entry in spark.jars.packages).
df_read_db = (
    spark.read
    .format('jdbc')
    .option('driver', 'org.sqlite.JDBC')
    .option('dbtable', 'tb_cliente')
    .option('url', 'jdbc:sqlite:/home/jovyan/work/database/database.db')
    .load()
)

df_read_db.printSchema()

root
 |-- cod_cliente: integer (nullable = true)
 |-- nome: string (nullable = true)
 |-- idade: long (nullable = true)
 |-- gerente_conta: string (nullable = true)
 |-- conta_corrente: long (nullable = true)
 |-- tipo_conta: string (nullable = true)
 |-- score: long (nullable = true)



In [17]:
# Preview the customer table (first 20 rows, long values truncated).
df_read_db.show(n=20, truncate=True)

+-----------+---------+-----+-------------+--------------+----------+-----+
|cod_cliente|     nome|idade|gerente_conta|conta_corrente|tipo_conta|score|
+-----------+---------+-----+-------------+--------------+----------+-----+
|          1|     Joao|   25|Fulano de tal|          1234|     Povao|   50|
|          2|    Maria|   30|Fulano de tal|          4321|    Chefao|   90|
|          3|    Pedro|   32|     Beltrano|          1212|     Ricao|   95|
|          4|Sebastiao|   44|     Beltrano|          3232|     Ricao|   92|
|          5|     Jose|   44|     Beltrano|          3232|     Povao|   30|
|          6|    Joana|   44|Fulano de tal|          3232|     Povao|   35|
+-----------+---------+-----+-------------+--------------+----------+-----+



## Definição da função de enriquecimento

In [18]:
#def funcao_exemplo(df, batchID):
#    if df.count()>0:
#            df.join(df_read_db, on='cod_cliente', how='left') \
#            .select(to_json(struct(col("*"))).alias("value")) \
#            .write \
#            .format("kafka") \
#            .option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP_SERVERS) \
#            .option("topic", KAFKA_TOPIC_SINK_NAME) \
#           .save()
#   return df

def funcao_exemplo(df):
    """Enrich banking operations with customer data and recompute the balance.

    Left-joins ``df`` with the customer table ``df_read_db`` on ``cod_cliente``
    and derives two columns:

    * ``oferta``: "Investimento" when the customer's ``score`` is >= 90,
      otherwise "Emprestimo" (this includes customers missing from the table,
      whose ``score`` is null after the left join).
    * ``saldo``: balance after the operation — ``saldo - valor_op`` for
      "saque", ``saldo + valor_op`` for "deposito"; null for any other
      ``tipo_op`` (no ``otherwise`` branch).

    Args:
        df: operations DataFrame; must contain ``cod_cliente``, ``agencia``,
            ``tipo_op``, ``saldo`` and ``valor_op``.

    Returns:
        DataFrame with columns cod_cliente, nome, idade, agencia,
        conta_corrente, tipo_conta, gerente_conta, score, oferta, saldo —
        the same schema whether or not ``df`` has rows.
    """
    # No emptiness guard: the former `df.count() > 0` check ran an extra Spark
    # job and is not allowed on streaming DataFrames. It also made empty input
    # come back with the *input* schema instead of the enriched one. Joining an
    # empty frame is cheap and yields an empty, correctly-shaped result.
    enriched = df.join(df_read_db, on='cod_cliente', how='left')
    return (
        enriched
        .withColumn("oferta", when(df_read_db.score >= 90, "Investimento").otherwise("Emprestimo"))
        .withColumn(
            "saldo",
            when(df.tipo_op == "saque", df.saldo - df.valor_op)
            .when(df.tipo_op == "deposito", df.saldo + df.valor_op),
        )
        .select("cod_cliente", "nome", "idade", "agencia", "conta_corrente",
                "tipo_conta", "gerente_conta", "score", "oferta", "saldo")
    )

## Enriquecimento

In [19]:
# Hand-made sample operations, with the same fields as the Kafka message
# schema, to exercise funcao_exemplo without a running stream.
message_data = [
    {'cod_cliente': cod, 'agencia': agencia, 'valor_op': valor, 'tipo_op': tipo,
     'data': str(dt.date.today()), 'saldo': saldo}
    for cod, agencia, valor, tipo, saldo in [
        (1, 27, 100, 'saque', 666),
        (2, 223, 500, 'deposito', 5000),
        (3, 100, 500, 'deposito', 5000),
        (4, 27, 1000, 'saque', 10000),
        (5, 105, 300, 'deposito', 5000),
    ]
]

# Every field is nullable (StructField's default).
_schema = StructType([
    StructField("cod_cliente", IntegerType()),
    StructField('agencia', IntegerType()),
    StructField('valor_op', IntegerType()),
    StructField('tipo_op', StringType()),
    StructField('data', StringType()),
    StructField('saldo', IntegerType()),
])

df = spark.createDataFrame(message_data, _schema)
df2 = funcao_exemplo(df)
df2.show()

+-----------+---------+-----+-------+--------------+----------+-------------+-----+------------+-----+
|cod_cliente|     nome|idade|agencia|conta_corrente|tipo_conta|gerente_conta|score|      oferta|saldo|
+-----------+---------+-----+-------+--------------+----------+-------------+-----+------------+-----+
|          1|     Joao|   25|     27|          1234|     Povao|Fulano de tal|   50|  Emprestimo|  566|
|          2|    Maria|   30|    223|          4321|    Chefao|Fulano de tal|   90|Investimento| 5500|
|          3|    Pedro|   32|    100|          1212|     Ricao|     Beltrano|   95|Investimento| 5500|
|          4|Sebastiao|   44|     27|          3232|     Ricao|     Beltrano|   92|Investimento| 9000|
|          5|     Jose|   44|    105|          3232|     Povao|     Beltrano|   30|  Emprestimo| 5300|
+-----------+---------+-----+-------+--------------+----------+-------------+-----+------------+-----+



## Leitura do Kafka

In [None]:
# Streaming source: subscribe to KAFKA_TOPIC_NAME, reading only messages
# that arrive after the query starts ("latest").
df_kfk = (
    spark.readStream
    .format("kafka")
    .options(**{
        "kafka.bootstrap.servers": KAFKA_BOOTSTRAP_SERVERS,
        "subscribe": KAFKA_TOPIC_NAME,
        "startingOffsets": "latest",
    })
    .load()
)

In [None]:
# Expected JSON layout of each Kafka message value (all fields nullable).
_schema = StructType([
    StructField("cod_cliente", IntegerType()),
    StructField('agencia', IntegerType()),
    StructField('valor_op', IntegerType()),
    StructField('tipo_op', StringType()),
    StructField('data', StringType()),
    StructField('saldo', IntegerType()),
])

# Decode the binary key/value to strings, parse the value as JSON with
# `_schema`, then flatten the parsed struct into top-level columns.
# Note: the final select keeps only the parsed fields (key/timestamp are dropped).
df_base = (
    df_kfk
    .selectExpr("CAST(key AS STRING)", "CAST(value as STRING)", "timestamp")
    .select(from_json(col("value"), _schema).alias("values"), "timestamp")
    .select("values.*")
)

## Escrita no tópico sink

In [None]:
# Write every parsed record to KAFKA_TOPIC_SINK_NAME as a JSON string.
# The Kafka sink requires a `value` column (string or binary); `df_base` only
# has the parsed fields, so they are packed back into JSON first.
# A parenthesised chain is used instead of backslash continuations: a trailing
# space after `\` or a commented-out line inside the chain is a SyntaxError.
# NOTE(review): the enrichment in `funcao_exemplo` is not applied here. It could
# be inserted before the serialisation only if it runs no actions (e.g.
# `count()`) on the streaming DataFrame. `.foreachBatch(...)` replaces the sink
# rather than combining with `.format("kafka")`.
stream_final = (
    df_base
    .select(to_json(struct(col("*"))).alias("value"))
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP_SERVERS)
    .option("topic", KAFKA_TOPIC_SINK_NAME)
    .option("checkpointLocation", CHECKPOINT_LOCATION)
    .outputMode("append")
    .start()
)

In [None]:
# Current status of the running streaming query, as reported by StreamingQuery.status.
stream_final.status

In [None]:
# Exception that terminated the query, if any — expected to print "None"
# while the query is healthy (per the StreamingQuery.exception() docs).
teste = stream_final.exception()
print(str(teste))

In [None]:
# Stop the streaming query. stop() returns None, so its result is not assigned:
# assigning it to `teste` used to wipe out the exception captured in the cell above.
stream_final.stop()

In [None]:
# Block this cell until the streaming query terminates (no timeout).
# NOTE(review): under "Run All" this runs after the stop() cell, so it returns
# right away; run it before stopping to keep the stream alive in the foreground.
stream_final.awaitTermination(timeout=None)