In [2]:
#Importar as bibliotecas

from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql import SparkSession

In [3]:
# Build (or reuse) the Spark session used by every cell below.
# `spark.jars.packages` pulls in the Kafka SQL source and the MongoDB Spark
# connector; the master is set to local mode using all available cores.
sp = (
    SparkSession.builder
    .appName("spark")
    .config(
        "spark.jars.packages",
        "org.apache.spark:spark-sql-kafka-0-10_2.12:3.4.1,"
        "org.mongodb.spark:mongo-spark-connector_2.12:10.1.1",
    )
    .config("spark.master", "local[*]")
    .getOrCreate()
)

:: loading settings :: url = jar:file:/home/myuser/spark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/myuser/.ivy2/cache
The jars for the packages stored in: /home/myuser/.ivy2/jars
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
org.mongodb.spark#mongo-spark-connector_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-3817edfd-c28a-40fb-a593-ba3b4ff1df06;1.0
	confs: [default]
	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.4.1 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.4.1 in central
	found org.apache.kafka#kafka-clients;3.3.2 in central
lz4#lz4-java;1.8.0 in central0-10_2.12;3.4.1 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.4.1 in central
	found org.apache.kafka#kafka-clients;3.3.2 in central

	found org.xerial.snappy#snappy-java;1.1.10.1 in central
	found org.slf4j#slf4j-api;2.0.6 in central
	found org.apache.hadoop#hadoop-client-runtime;3.3.4 in central
	found org.apache.hadoop#hadoop-client-api;3.3.4 in centralund org.apache.sp

In [4]:
# Streaming DataFrame that points at the Kafka broker and the topic to consume,
# starting from the earliest available offsets.
# NOTE(review): `df` is not referenced by the other cells visible here; a later
# cell builds `df_kafka` on the same topic with "latest" offsets - confirm
# whether this stream is still needed.
df = (
    sp.readStream
    .format("kafka")
    .options(**{
        "kafka.bootstrap.servers": "spark-master:9092",
        "subscribe": "topico-mongo.spark-streaming.atletas",
        "startingOffsets": "earliest",
    })
    .load()
)

In [5]:
# Batch step: read the athletes CSV from HDFS with an explicit schema and keep
# a subset of its columns. This cell only reads and previews the data; it does
# not write anything to MongoDB.
from pyspark.sql.types import *

print("--- INICIANDO: Carga de Dados em Lote (Batch) com colunas selecionadas ---")

# 1. Full schema of the CSV, declared as (column name, Spark type) pairs.
#    Every column is nullable.
athletes_schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in (
        ("code", IntegerType()),
        ("current", BooleanType()),
        ("name", StringType()),
        ("name_short", StringType()),
        ("name_tv", StringType()),
        ("gender", StringType()),
        ("function", StringType()),
        ("country_code", StringType()),
        ("country", StringType()),
        ("country_long", StringType()),
        ("nationality", StringType()),
        ("nationality_long", StringType()),
        ("nationality_code", StringType()),
        ("height", DoubleType()),
        ("weight", DoubleType()),
        ("disciplines", StringType()),
        ("events", StringType()),
        ("birth_date", StringType()),
        ("birth_place", StringType()),
        ("birth_country", StringType()),
        ("residence_place", StringType()),
        ("residence_country", StringType()),
        ("nickname", StringType()),
        ("hobbies", StringType()),
        ("occupation", StringType()),
        ("education", StringType()),
        ("family", StringType()),
        ("lang", StringType()),
        ("coach", StringType()),
        ("reason", StringType()),
        ("hero", StringType()),
        ("influence", StringType()),
        ("philosophy", StringType()),
        ("sporting_relatives", StringType()),
        ("ritual", StringType()),
        ("other_sports", StringType()),
    )
])

# 2. Read the CSV file from HDFS using the full schema (first line is a header).
df_completo = (
    sp.read
    .format("csv")
    .option("header", "true")
    .schema(athletes_schema)
    .load("hdfs://spark-master:9000/user/myuser/data/athletes.csv")
)

# 3. Keep only the columns of interest.
df_selecionado = df_completo.select(
    "name", "nickname", "nationality", "disciplines",
    "height", "weight",
    "birth_date", "birth_place", "birth_country",
    "residence_place", "residence_country",
)

# 4. (Optional) Preview the first rows of the reduced DataFrame.
print("Dados com colunas selecionadas:")
df_selecionado.show(10, truncate=False)

--- INICIANDO: Carga de Dados em Lote (Batch) com colunas selecionadas ---
Dados com colunas selecionadas:


                                                                                

+------------------------+----------+-----------+-------------+------+------+-----------------------------------+-----------+-------------+---------------+-----------------+
|name                    |nickname  |nationality|disciplines  |height|weight|birth_date                         |birth_place|birth_country|residence_place|residence_country|
+------------------------+----------+-----------+-------------+------+------+-----------------------------------+-----------+-------------+---------------+-----------------+
|ALEKSANYAN Artur        |White Bear|Armenia    |['Wrestling']|0.0   |0.0   |1991-10-21                         |GYUMRI     |Armenia      |GYUMRI         |Armenia          |
|AMOYAN Malkhas          |NULL      |Armenia    |['Wrestling']|0.0   |0.0   |1999-01-22                         |YEREVAN    |Armenia      |YEREVAN        |Armenia          |
|GALSTYAN Slavik         |NULL      |Armenia    |['Wrestling']|0.0   |0.0   |1996-12-21                         |NULL       |NULL 

In [6]:
import csv
from pymongo import MongoClient

# Connect to MongoDB and select the database/collection that the rest of the
# notebook writes to (`collection` is reused by later cells).
client = MongoClient("mongodb://mongo:27017")
db = client["spark-streaming"]
collection = db["atletas"]

# Read the local CSV *before* touching the collection, so that a missing or
# unreadable file does not leave the collection wiped with nothing to reload.
# csv.DictReader yields one dict per row, keyed by the header line; all values
# are strings.
with open("athletes.csv", newline='', encoding="utf-8") as f:
    reader = csv.DictReader(f, delimiter=',')
    documentos = list(reader)

# Replace the collection contents with the rows just read.
collection.delete_many({})
# insert_many rejects an empty list, so skip the call when the CSV has no rows.
if documentos:
    collection.insert_many(documentos)

print(f"{len(documentos)} documentos inseridos com sucesso no MongoDB")

11113 documentos inseridos com sucesso no MongoDB


## Inserindo VIA JSON

In [18]:
# Schema of the JSON file, declared as (column name, Spark type) pairs.
# Every column is nullable.
schema_json = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in (
        ("name", StringType()),
        ("nickname", StringType()),
        ("nationality", StringType()),
        ("disciplines", StringType()),
        ("height", DoubleType()),
        ("weight", DoubleType()),
        ("birth_date", StringType()),
        ("birth_place", StringType()),
        ("birth_country", StringType()),
        ("residence_place", StringType()),
        ("residence_country", StringType()),
    )
])

# Read the multi-line JSON document from HDFS with the schema above.
df_json = (
    sp.read
    .format("json")
    .option("multiLine", True)
    .schema(schema_json)
    .load("hdfs://spark-master:9000/user/myuser/data/atletas_ficticios_30.json")
)

# NOTE(review): the message below announces an insert, but this cell only reads
# and previews the data; the actual MongoDB insert is done in a separate cell.
print("Adicionando dados do JSON na coleção 'atletas'...")
df_json.show(5)
df_json.printSchema()

Adicionando dados do JSON na coleção 'atletas'...
+-----------------+------------+-----------+-------------+------+------+----------+--------------+-------------+---------------+-----------------+
|             name|    nickname|nationality|  disciplines|height|weight|birth_date|   birth_place|birth_country|residence_place|residence_country|
+-----------------+------------+-----------+-------------+------+------+----------+--------------+-------------+---------------+-----------------+
|      ALINE SILVA|  Aline Silv|     BRAZIL|     HANDBALL|  1.62| 55.47|1992-02-07|       GOIANIA|       BRAZIL|         MANAUS|           BRAZIL|
|    MARIANA SILVA|Mariana Silv|     BRAZIL|   BASKETBALL|  1.96|100.49|1997-06-04|  PORTO ALEGRE|       BRAZIL| RIO DE JANEIRO|           BRAZIL|
|   CAMILA MARTINS|    Camila M|     BRAZIL|   VOLLEYBALL|  1.67| 90.29|1993-02-04|        RECIFE|       BRAZIL|   PORTO ALEGRE|           BRAZIL|
|CARLOS NASCIMENTO|    Carlos N|     BRAZIL|WEIGHTLIFTING|  1.68| 55

In [23]:
import json

# Save the JSON-sourced rows to MongoDB.
# Rows are serialised to JSON strings and collected on the driver, then parsed
# back into dicts and inserted in fixed-size batches.
json_data = df_json.toJSON().collect()
batch_size = 100
for start in range(0, len(json_data), batch_size):
    chunk = json_data[start:start + batch_size]
    collection.insert_many(list(map(json.loads, chunk)))

print(f"Dados inseridos! Total: {len(json_data)} registros")

Dados inseridos! Total: 30 registros
-------------------------------------------
Batch: 3
-------------------------------------------
+----+-------+-------------+----------+-------+------+--------+------------+-------+------------+-----------+----------------+----------------+------+------+-----------+------+----------+------------+-------------+---------------+-----------------+------------+-------+----------+---------+------+----+-----+------+----+---------+----------+------------------+------+------------+
|code|current|name         |name_short|name_tv|gender|function|country_code|country|country_long|nationality|nationality_long|nationality_code|height|weight|disciplines|events|birth_date|birth_place |birth_country|residence_place|residence_country|nickname    |hobbies|occupation|education|family|lang|coach|reason|hero|influence|philosophy|sporting_relatives|ritual|other_sports|
+----+-------+-------------+----------+-------+------+--------+------------+-------+------------+-------

In [7]:
# ==============================================================================
# PART 2: REAL-TIME PROCESSING (STREAMING - Kafka to MongoDB)
# ==============================================================================

print("\n--- INICIANDO PARTE 2: Processamento em Tempo Real (Streaming) ---")

# Streaming source: the Kafka topic that receives the Debezium change events.
# "latest" means only records published after the query starts are consumed.
df_kafka = (
    sp.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "spark-master:9092")
    .option("subscribe", "topico-mongo.spark-streaming.atletas")
    .option("startingOffsets", "latest")
    .load()
)



--- INICIANDO PARTE 2: Processamento em Tempo Real (Streaming) ---


In [8]:
# Schema for the Debezium change-event envelope. Only the parts used here are
# declared: `payload.after` (kept as a raw JSON string and parsed in a second
# step) and `payload.op`.
debezium_schema = StructType([
    StructField("payload", StructType([
        StructField("after", StringType(), True),
        StructField("op", StringType(), True),
    ]), True)
])

# Transform the Kafka records into athlete rows:
#   1. decode the Kafka value as a string and parse the Debezium envelope;
#   2. keep only the `after` document;
#   3. drop records with no `after` (e.g. delete events or tombstone records,
#      which carry no post-change document) - otherwise they would be expanded
#      into rows where every column is NULL;
#   4. parse `after` with the athletes schema and flatten it into columns.
dx = (
    df_kafka
    .select(from_json(df_kafka.value.cast("string"), debezium_schema).alias("data"))
    .select("data.payload.after")
    .where(col("after").isNotNull())
    .withColumn("athlete_data", from_json(col("after"), athletes_schema))
    .select("athlete_data.*")
)



25/07/20 20:19:33 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


In [9]:
# Start the streaming query over the processed stream in append mode.
# NOTE(review): the message below mentions a destination collection, but the
# sink configured here is "console" - the rows are only printed, not saved to
# MongoDB. Confirm which behaviour is intended.
print("Aguardando novos dados do Kafka para processar e salvar na coleção de destino...")
ds = (
    dx.writeStream
    .format("console")
    .outputMode("append")
    .option("truncate", False)
    .start()
)

25/07/20 20:19:33 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-fcc78760-3677-4e8e-a20e-a101336f1d93. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.


Aguardando novos dados do Kafka para processar e salvar na coleção de destino...


25/07/20 20:19:33 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
25/07/20 20:19:35 WARN AdminClientConfig: These configurations '[key.deserializer, value.deserializer, enable.auto.commit, max.poll.records, auto.offset.reset]' were supplied but are not used yet.
                                                                                r, value.deserializer, enable.auto.commit, max.poll.records, auto.offset.reset]' were supplied but are not used yet.
[Stage 1:>                                                          (0 + 1) / 1]r, value.deserializer, enable.auto.commit, max.poll.records, auto.offset.reset]' were supplied but are not used yet.


-------------------------------------------
Batch: 0
-------------------------------------------
+----+-------+---------------------+--------------+---------------------+------+--------+------------+-------+------------+-----------+----------------+----------------+------+------+---------------------------------+-------------------------------------------------------------+----------+--------------------+-------------+---------------+-----------------+-----------------------+------------------------------------------------------------+--------------------+---------------------------------------------------------------------------------------------------------------------------------------+------------------------------------------------------------------------------------------------------------------------------+----------------------+------------------------------------------------------------------+-------------------------------------------------------------------------------------

                                                                                

-------------------------------------------
Batch: 1
-------------------------------------------
+----+-------+--------------------------+--------------+--------------------------+------+--------+------------+-------+------------+-----------+----------------+----------------+------+------+---------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------+-----------------+-------------+-------------------+-----------------+------------+------------------------------------------------------------------+----------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------------------