In [3]:
!pip install pyspark==3.4.0


[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip available: [0m[31;49m22.2.2[0m[39;49m -> [0m[32;49m23.3.2[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m


In [17]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType, TimestampType
import datetime
import os

ModuleNotFoundError: No module named 'kafka.gerador_menssagens_kafka'

In [None]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType, TimestampType
import datetime
import os

class StreamingPipeline:
    """Structured Streaming job: Kafka sales events -> windowed KPIs per city -> Parquet.

    The pipeline reads JSON sale events from a Kafka topic, keeps only the
    'Eletrônicos' product category, aggregates revenue and units sold per
    10-minute window and city, and appends the result to a partitioned
    Parquet dataset.
    """

    def __init__(self):
        """Set the Kafka connection settings and the output/checkpoint locations."""
        self.kafka_bootstrap_servers = 'localhost:9092'
        self.kafka_topic = 'topic_streaming_data'

        # Everything before the first 'stream_pipeline' in the working directory is
        # used as the parent folder. os.path.join keeps the path well-formed even
        # when the working directory does not contain 'stream_pipeline' at all
        # (plain string concatenation would glue the two parts together).
        base_dir = os.getcwd().split('stream_pipeline')[0]
        self.output_path = os.path.join(
            base_dir, "stream_pipeline/output/topic_streaming_data/data_eletronicos_kpi"
        )
        self.checkpoint_path = os.path.join(
            base_dir, "stream_pipeline/checkpoint_path/topic_streaming_data/data_eletronicos_kpi"
        )

    def get_spark_session(self):
        """Create (or reuse) the local Spark session with the Kafka connector attached.

        Returns:
            The SparkSession produced by ``getOrCreate()``.
        """
        # NOTE(review): machine-specific JDK location; adjust for other environments.
        os.environ["JAVA_HOME"] = "/usr/local/opt/openjdk@11"

        spark = SparkSession.builder \
                .config("spark.driver.host", "localhost") \
                .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.4.0") \
                .appName("StreamingPipeline") \
                .master("local[2]") \
                .getOrCreate()
        return spark

    @staticmethod
    def _event_schema():
        """Return the schema used to parse the JSON payload of each Kafka record."""
        return StructType([
            StructField('vendedor', StructType([
                StructField('nome', StringType()),
                StructField('codigo', StringType()),
                StructField('filial', StringType())
            ])),
            StructField('filial', StructType([
                StructField('nome', StringType()),
                StructField('estado', StringType()),
                StructField('cidade', StringType())
            ])),
            StructField('produto', StructType([
                StructField('codigo', StringType()),
                StructField('nome', StringType()),
                StructField('preço', DoubleType()),
                StructField('categoria', StringType()),
                StructField('cor', StringType())
            ])),
            StructField('quantidade', IntegerType()),
            StructField('ordem_id', IntegerType()),
            StructField('timestamp', StringType())
        ])

    def read_kafka(self, spark):
        """Open the Kafka stream, parse the JSON payload and keep 'Eletrônicos' sales.

        Args:
            spark: SparkSession used to create the streaming reader.

        Returns:
            A streaming DataFrame with the columns ``produto_nome``, ``preço``,
            ``quantidade``, ``filial_nome``, ``cidade``, ``ordem_id`` and
            ``timestamp`` (the Kafka record timestamp).
        """
        streaming_df = spark.readStream \
                            .format("kafka") \
                            .option("kafka.bootstrap.servers", self.kafka_bootstrap_servers) \
                            .option("subscribe", self.kafka_topic) \
                            .option("startingOffsets", "earliest") \
                            .option("failOnDataLoss", "false") \
                            .load()

        json_schema = self._event_schema()

        # Kafka delivers `value` as bytes: decode it, then parse it into a `data` struct.
        streaming_df = streaming_df.withColumn("value", F.decode(F.col("value"), 'UTF-8'))
        streaming_df = streaming_df.select(F.from_json(F.col('value').cast('string'), json_schema).alias('data'),'timestamp')
        # Derived from the payload's own timestamp; it is not selected below, and the
        # windowing in calcular_metricas uses the Kafka record `timestamp` instead.
        streaming_df = streaming_df.withColumn("timestamp_venda", F.from_unixtime("data.timestamp"))

        # Product and branch both expose a `nome` field; alias them so the result
        # does not carry two columns with the same name.
        filter_df = streaming_df.filter(F.col("data.produto.categoria") == 'Eletrônicos')\
                                .select(
                                    F.col('data.produto.nome').alias('produto_nome'),
                                    'data.produto.preço',
                                    'data.quantidade',
                                    F.col('data.filial.nome').alias('filial_nome'),
                                    'data.filial.cidade',
                                    'data.ordem_id',
                                    'timestamp'
                                )

        return filter_df

    def calcular_metricas(self, df):
        """Aggregate revenue and units sold per 10-minute window and city.

        Args:
            df: Streaming DataFrame with ``timestamp``, ``cidade``, ``preço``
                and ``quantidade`` columns.

        Returns:
            A DataFrame with ``window``, ``cidade``, ``total`` (sum of
            price * quantity), ``quantidade_vendida`` and
            ``valor_medio_de_venda`` (total / quantidade_vendida).
        """
        aggregated_df = df.withWatermark("timestamp", "10 minutes")\
                        .groupBy(F.window(F.col("timestamp"), "10 minutes"), "cidade")\
                        .agg(
                            F.sum(F.col("preço") * F.col("quantidade")).alias("total"),
                            F.sum("quantidade").alias("quantidade_vendida"),
                        )

        aggregated_df = aggregated_df.withColumn("valor_medio_de_venda", F.col("total")/F.col("quantidade_vendida"))
        return aggregated_df

    def save_data(self, df):
        """Start the streaming write of the aggregated KPIs to partitioned Parquet.

        Args:
            df: Aggregated streaming DataFrame containing a ``window`` struct column.

        Returns:
            The started StreamingQuery, so the caller can monitor it or wait on it.
        """
        # Partition columns are taken from the start of each aggregation window.
        df = df.withColumn("year", F.year(F.col("window.start")))
        df = df.withColumn("month",F.month(F.col("window.start")))
        df = df.withColumn("day", F.dayofmonth(F.col("window.start")))
        df = df.withColumn("hour", F.hour(F.col("window.start")))

        query = df.writeStream \
                            .outputMode("append") \
                            .format("parquet") \
                            .option("path", self.output_path) \
                            .option("checkpointLocation", self.checkpoint_path) \
                            .partitionBy("year", "month", "day", "hour") \
                            .trigger(processingTime='10 minute') \
                            .start()
        return query

    def run(self):
        """Build the pipeline once and block until the streaming query terminates."""
        spark = self.get_spark_session()
        df = self.read_kafka(spark)
        df = self.calcular_metricas(df)
        query = self.save_data(df)
        # A streaming query keeps running by itself; start it once and wait on it
        # rather than re-creating it in a loop against the same checkpoint.
        query.awaitTermination()

In [3]:
# Point the JVM launcher at the JDK 11 installation before the Spark session is created.
os.environ.update({"JAVA_HOME": "/usr/local/opt/openjdk@11"})

In [4]:
# Build (or reuse) the local two-thread Spark session used by the cells below.
spark = (
    SparkSession.builder
    .config("spark.driver.host", "localhost")
    # Kafka source connector, resolved through Ivy when the session starts.
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.4.0")
    .appName("StreamingPipeline")
    .master("local[2]")
    .getOrCreate()
)

:: loading settings :: url = jar:file:/Users/luiz.oliveira/projects/n_projects/lib/python3.7/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /Users/luiz.oliveira/.ivy2/cache
The jars for the packages stored in: /Users/luiz.oliveira/.ivy2/jars
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-37d90c30-c27c-4af9-b5f1-b4f166492c20;1.0
	confs: [default]
	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.4.0 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.4.0 in central
	found org.apache.kafka#kafka-clients;3.3.2 in central
	found org.lz4#lz4-java;1.8.0 in central
	found org.xerial.snappy#snappy-java;1.1.9.1 in central
	found org.slf4j#slf4j-api;2.0.6 in central
	found org.apache.hadoop#hadoop-client-runtime;3.3.4 in central
	found org.apache.hadoop#hadoop-client-api;3.3.4 in central
	found commons-logging#commons-logging;1.1.3 in central
	found com.google.code.findbugs#jsr305;3.0.0 in central
	found org.apache.commons#commons-pool2;2.11.1 in central
:: resolution report :: resolve 466ms :: artifa

In [5]:
# Kafka connection settings consumed by the streaming reader.
kafka_topic = "topic_streaming_data"
kafka_bootstrap_servers = "localhost:9092"

In [6]:
# Subscribe to the Kafka topic from its earliest offset; missing offsets do not fail the query.
streaming_df = (
    spark.readStream
    .format("kafka")
    .options(**{
        "kafka.bootstrap.servers": kafka_bootstrap_servers,
        "subscribe": kafka_topic,
        "startingOffsets": "earliest",
        "failOnDataLoss": "false",
    })
    .load()
)

In [7]:
# Inspect the raw Kafka source schema: `key`/`value` are binary, alongside record metadata.
streaming_df.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



In [8]:
# Schema of the JSON payload carried in each Kafka record's `value`.
json_schema = StructType([
    # Seller and branch are flat structs of string fields.
    StructField('vendedor', StructType([
        StructField(campo, StringType()) for campo in ('nome', 'codigo', 'filial')
    ])),
    StructField('filial', StructType([
        StructField(campo, StringType()) for campo in ('nome', 'estado', 'cidade')
    ])),
    # Product mixes strings with a numeric price.
    StructField('produto', StructType([
        StructField(campo, tipo) for campo, tipo in (
            ('codigo', StringType()),
            ('nome', StringType()),
            ('preço', DoubleType()),
            ('categoria', StringType()),
            ('cor', StringType()),
        )
    ])),
    StructField('quantidade', IntegerType()),
    StructField('ordem_id', IntegerType()),
    # The event's own timestamp is parsed as a plain string.
    StructField('timestamp', StringType()),
])

In [9]:
# Decode the binary payload, parse it into a `data` struct (keeping the Kafka record
# timestamp) and derive `timestamp_venda` from the payload's own timestamp field.
streaming_df = (
    streaming_df
    .withColumn("value", F.decode(F.col("value"), 'UTF-8'))
    .select(
        F.from_json(F.col('value').cast('string'), json_schema).alias('data'),
        'timestamp',
    )
    .withColumn("timestamp_venda", F.from_unixtime("data.timestamp"))
)

In [10]:
# Schema after parsing: the JSON struct `data`, the Kafka `timestamp` and the derived `timestamp_venda`.
streaming_df.printSchema()

root
 |-- data: struct (nullable = true)
 |    |-- vendedor: struct (nullable = true)
 |    |    |-- nome: string (nullable = true)
 |    |    |-- codigo: string (nullable = true)
 |    |    |-- filial: string (nullable = true)
 |    |-- filial: struct (nullable = true)
 |    |    |-- nome: string (nullable = true)
 |    |    |-- estado: string (nullable = true)
 |    |    |-- cidade: string (nullable = true)
 |    |-- produto: struct (nullable = true)
 |    |    |-- codigo: string (nullable = true)
 |    |    |-- nome: string (nullable = true)
 |    |    |-- preço: double (nullable = true)
 |    |    |-- categoria: string (nullable = true)
 |    |    |-- cor: string (nullable = true)
 |    |-- quantidade: integer (nullable = true)
 |    |-- ordem_id: integer (nullable = true)
 |    |-- timestamp: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestamp_venda: string (nullable = true)



In [11]:
# Keep only sales in the 'Eletrônicos' category and flatten the fields needed downstream.
# Both the product and the branch contribute a column called `nome`.
filtered_df = (
    streaming_df
    .filter(F.col("data.produto.categoria") == 'Eletrônicos')
    .select(
        'data.produto.nome',
        'data.produto.preço',
        'data.quantidade',
        'data.filial.nome',
        'data.filial.cidade',
        'data.ordem_id',
        'timestamp',
    )
)

In [12]:
# Flattened schema; note that `nome` appears twice (product name and branch name),
# so selecting `nome` by name from this frame would presumably be ambiguous.
filtered_df.printSchema()

root
 |-- nome: string (nullable = true)
 |-- preço: double (nullable = true)
 |-- quantidade: integer (nullable = true)
 |-- nome: string (nullable = true)
 |-- cidade: string (nullable = true)
 |-- ordem_id: integer (nullable = true)
 |-- timestamp: timestamp (nullable = true)



In [13]:
# Per 10-minute window and city: revenue (price * quantity), units sold,
# and the resulting average value per unit.
aggregated_df = (
    filtered_df
    .withWatermark("timestamp", "10 minutes")
    .groupBy(F.window(F.col("timestamp"), "10 minutes"), "cidade")
    .agg(
        F.sum(F.col("preço") * F.col("quantidade")).alias("total"),
        F.sum("quantidade").alias("quantidade_vendida"),
    )
    .withColumn("valor_medio_de_venda", F.col("total") / F.col("quantidade_vendida"))
)

In [14]:
# Schema of the windowed aggregate: the `window` struct, `cidade` and the three metrics.
aggregated_df.printSchema()

root
 |-- window: struct (nullable = false)
 |    |-- start: timestamp (nullable = true)
 |    |-- end: timestamp (nullable = true)
 |-- cidade: string (nullable = true)
 |-- total: double (nullable = true)
 |-- quantidade_vendida: long (nullable = true)
 |-- valor_medio_de_venda: double (nullable = true)



In [15]:
# Derive the partition columns from the start of each aggregation window.
aggregated_df = (
    aggregated_df
    .withColumn("year", F.year("window.start"))
    .withColumn("month", F.month("window.start"))
    .withColumn("day", F.dayofmonth("window.start"))
    .withColumn("hour", F.hour("window.start"))
)

In [16]:
# Both locations live under the folder that precedes 'stream_pipeline' in the working
# directory; they differ only in the sub-folder ("output" vs "checkpoint_path").
output_path, checkpoint_path = (
    os.getcwd().split('stream_pipeline')[0] + f"stream_pipeline/{subpasta}/topic_streaming_data/data_29"
    for subpasta in ("output", "checkpoint_path")
)

In [None]:
# Start the streaming write: append finalized windows to Parquet, partitioned by the
# window's year/month/day/hour, firing a micro-batch every minute.
query = (
    aggregated_df.writeStream
    .outputMode("append")
    .format("parquet")
    .option("path", output_path)
    .option("checkpointLocation", checkpoint_path)
    .partitionBy("year", "month", "day", "hour")
    .trigger(processingTime='1 minute')
    .start()
)

24/01/04 16:26:05 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


24/01/04 16:26:05 WARN AdminClientConfig: These configurations '[key.deserializer, value.deserializer, enable.auto.commit, max.poll.records, auto.offset.reset]' were supplied but are not used yet.
                                                                                

In [119]:
# Snapshot of the running query: status message, data availability and trigger activity.
print(query.status)

{'message': 'Waiting for next trigger', 'isDataAvailable': True, 'isTriggerActive': False}


In [30]:
# Show the kernel's working directory; the output and checkpoint paths above are built
# from the part of it that precedes 'stream_pipeline'.
os.getcwd()

'/Users/luiz.oliveira/eng_data/stream_pipeline/pipeline'

In [121]:
# Read back the Parquet dataset written by an earlier run (the `data_28` output folder).
# `.parquet()` already selects the Parquet source, so no `.format(...)` call is needed.
# The location is derived from the working directory in the same way as the write paths,
# which keeps the notebook free of user-specific absolute paths.
df = spark.read.parquet(
    os.getcwd().split('stream_pipeline')[0] + "stream_pipeline/output/topic_streaming_data/data_28"
)

In [35]:
# NOTE(review): the frame displayed by `df.show()` in this notebook has no `value` column,
# and this cell carries a much earlier execution count than the Parquet read — presumably
# a leftover from inspecting raw Kafka records. Confirm and remove; it would likely fail
# on a fresh Restart & Run All.
df = df.withColumn("value_2", F.decode(F.col("value"), 'UTF-8'))

In [122]:
# Display the rows read back from Parquet without truncating wide values such as `window`.
df.show(truncate=False)

+------------------------------------------+--------------+--------+----+-----+---+----+
|window                                    |cidade        |total   |year|month|day|hour|
+------------------------------------------+--------------+--------+----+-----+---+----+
|{2024-01-04 15:30:00, 2024-01-04 15:40:00}|Rio de Janeiro|57000.0 |2024|1    |4  |15  |
|{2024-01-04 15:30:00, 2024-01-04 15:40:00}|Belo Horizonte|56500.0 |2024|1    |4  |15  |
|{2024-01-04 15:20:00, 2024-01-04 15:30:00}|Rio de Janeiro|32500.0 |2024|1    |4  |15  |
|{2024-01-04 15:20:00, 2024-01-04 15:30:00}|Belo Horizonte|45500.0 |2024|1    |4  |15  |
|{2024-01-04 15:50:00, 2024-01-04 16:00:00}|Belo Horizonte|53000.0 |2024|1    |4  |15  |
|{2024-01-04 15:50:00, 2024-01-04 16:00:00}|Florianópolis |64500.0 |2024|1    |4  |15  |
|{2024-01-04 15:20:00, 2024-01-04 15:30:00}|Florianópolis |35500.0 |2024|1    |4  |15  |
|{2024-01-04 15:50:00, 2024-01-04 16:00:00}|Rio de Janeiro|74500.0 |2024|1    |4  |15  |
|{2024-01-04 16:00:00

In [62]:
# NOTE(review): selects a nested `data.vendedor.nome` field, which exists on the parsed
# Kafka frame but not on the aggregated Parquet frame shown by `df.show()` — presumably
# `df` pointed at a different frame when this ran. Confirm which frame is intended.
df.select("data.vendedor.nome").show()

+--------+
|    nome|
+--------+
|    José|
|  Carlos|
|   Pedro|
|   Maria|
|Fernanda|
|    José|
|    João|
|   Pedro|
| Mariana|
| Mariana|
|     Ana|
|   Pedro|
|    João|
|    José|
|   Paulo|
|    João|
|   Pedro|
|     Ana|
| Juliana|
|    João|
+--------+
only showing top 20 rows

