# Import das bibliotecas

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from random import randint

KAFKA_TOPIC_NAME = "test"
KAFKA_TOPIC_SINK_NAME = "sink"
KAFKA_BOOTSTRAP_SERVERS = "kafka:9092"
# CHECKPOINT_LOCATION = "LOCAL DIRECTORY LOCATION (FOR DEBUGGING PURPOSES)"
CHECKPOINT_LOCATION = "/home/jovyan/work/tmp"


# Criação de sessão spark

In [2]:
scala_version = '2.12'
spark_version = '3.2.1'
kafka_version = '3.3.1'

packages = [
    f'org.apache.spark:spark-sql-kafka-0-10_{scala_version}:{spark_version}',
    f'org.apache.kafka:kafka-clients:{kafka_version}',
    'org.xerial:sqlite-jdbc:3.34.0'
]

spark = (
    SparkSession.builder.appName("PysparkKafkaStreaming")
    .master("local[*]")
    .config("spark.jars.packages", ",".join(packages))
    .getOrCreate()
) 
spark.sparkContext.setLogLevel("ERROR")

# Criação connection com SQLite

In [3]:
from sqlalchemy import create_engine

engine = create_engine('sqlite:////home/jovyan/work/database/database.db', echo=False)


# Criação da tabela clientes

In [4]:
engine.execute('''drop table if exists tb_clientes''')

<sqlalchemy.engine.cursor.LegacyCursorResult at 0x7f45c3f6b730>

In [7]:
engine.execute('''CREATE TABLE if not exists tb_clientes (
        codigo_cliente int null,
        nome varchar(500) null,
        idade int null,
        gerente_conta varchar(500) null,
        conta_corrente varchar(500) null,
        tipo_conta varchar(500) null,
        score int null
    )''')


<sqlalchemy.engine.cursor.LegacyCursorResult at 0x7f45c3f6a050>

# Função para gerar dados

In [8]:
def gen_data(qtd_data):
    datas = []
    for i in range(qtd_data):
        datas.append([
            i,
            f'Nome {i}',
            randint(0, 100),
            f'Gerente {i}',
            f'{i}0{i}00{i}000{i}',
            ['Chefão', 'Ricão', 'Povão'][randint(0,2)],
            randint(0, 1000)
        ])
    return datas


In [9]:
# giving column names of dataframe
columns = ["codigo_cliente", "nome", "idade", "gerente_conta", "conta_corrente", "tipo_conta", "score"]
  
# creating a dataframe
dataframe_to_save = spark.createDataFrame(gen_data(10), columns)
  
# show data frame
dataframe_to_save.show()

+--------------+------+-----+-------------+--------------+----------+-----+
|codigo_cliente|  nome|idade|gerente_conta|conta_corrente|tipo_conta|score|
+--------------+------+-----+-------------+--------------+----------+-----+
|             0|Nome 0|   20|    Gerente 0|    0000000000|     Ricão|  232|
|             1|Nome 1|   57|    Gerente 1|    1010010001|     Povão|  911|
|             2|Nome 2|   45|    Gerente 2|    2020020002|    Chefão|  674|
|             3|Nome 3|   56|    Gerente 3|    3030030003|     Povão|  547|
|             4|Nome 4|   56|    Gerente 4|    4040040004|    Chefão|  674|
|             5|Nome 5|   11|    Gerente 5|    5050050005|    Chefão|  914|
|             6|Nome 6|   44|    Gerente 6|    6060060006|     Povão|  643|
|             7|Nome 7|    7|    Gerente 7|    7070070007|     Ricão|  894|
|             8|Nome 8|   28|    Gerente 8|    8080080008|     Povão|  691|
|             9|Nome 9|    5|    Gerente 9|    9090090009|     Povão|  707|
+-----------

# Gravando no SQLite

In [10]:
dataframe_to_save.write.format('jdbc') \
        .mode("overwrite") \
        .options(driver='org.sqlite.JDBC', dbtable='tb_clientes',
                 url='jdbc:sqlite:/home/jovyan/work/database/database.db') \
        .save()


# Lendo dados do SQLite

In [12]:
dataframe_read_from_sqlite = spark.read.format('jdbc') \
        .options(driver='org.sqlite.JDBC', dbtable='tb_clientes',
                 url='jdbc:sqlite:/home/jovyan/work/database/database.db') \
        .load()

In [13]:
dataframe_read_from_sqlite.show()

+--------------+------+-----+-------------+--------------+----------+-----+
|codigo_cliente|  nome|idade|gerente_conta|conta_corrente|tipo_conta|score|
+--------------+------+-----+-------------+--------------+----------+-----+
|             0|Nome 0|   20|    Gerente 0|    0000000000|     Ricão|  232|
|             1|Nome 1|   57|    Gerente 1|    1010010001|     Povão|  911|
|             6|Nome 6|   44|    Gerente 6|    6060060006|     Povão|  643|
|             7|Nome 7|    7|    Gerente 7|    7070070007|     Ricão|  894|
|             8|Nome 8|   28|    Gerente 8|    8080080008|     Povão|  691|
|             9|Nome 9|    5|    Gerente 9|    9090090009|     Povão|  707|
|             2|Nome 2|   45|    Gerente 2|    2020020002|    Chefão|  674|
|             3|Nome 3|   56|    Gerente 3|    3030030003|     Povão|  547|
|             4|Nome 4|   56|    Gerente 4|    4040040004|    Chefão|  674|
|             5|Nome 5|   11|    Gerente 5|    5050050005|    Chefão|  914|
+-----------

# Função resposável para enriquecer os dados de cliente e publicar no tópico sink

In [14]:
def join_cliente_detalhes(df, batchID):
    if df.count()>0:
            df.join(dataframe_read_from_sqlite, on='codigo_cliente', how='left') \
            .select(to_json(struct(col("*"))).alias("value")) \
            .write \
            .format("kafka") \
            .option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP_SERVERS) \
            .option("topic", KAFKA_TOPIC_SINK_NAME) \
            .save()
    return df

# Consumindo o que esta dentro do tópico test

In [15]:
df_kfk = (
    spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP_SERVERS)
    .option("subscribe", KAFKA_TOPIC_NAME)
    .option("startingOffsets", "latest")
    .load()
)


# Definindo schema da mensagem do tópico test

In [16]:
_schema = (
    StructType()
    .add("codigo_cliente", StringType())
    .add('agencia', StringType())
    .add('valor_operacao', LongType())
    .add('tipo_operacao', StringType())
    .add('data', StringType())
    .add('saldo_conta', LongType())
)

df_base = df_kfk.selectExpr("CAST(key AS STRING)", "CAST(value as STRING)", "timestamp")
df_base = df_base.select(from_json(col("value"), _schema).alias("values"), "timestamp")
df_base = df_base.select("values.*")

# Iniciando consumer kafka do tópico test

In [17]:

stream_final = df_base \
    .writeStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP_SERVERS) \
    .option("topic", KAFKA_TOPIC_SINK_NAME) \
    .option("checkpointLocation", CHECKPOINT_LOCATION) \
    .foreachBatch(join_cliente_detalhes) \
    .outputMode("append") \
    .start()