In [1]:
import os

from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col, current_timestamp, to_json, struct, expr
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, FloatType, TimestampType

In [2]:
# Create (or reuse) the Spark session, pulling in the Kafka connector package
# so that the "kafka" streaming source/sink formats are available.
spark = (
    SparkSession.builder
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0")
    .getOrCreate()
)

In [3]:
# Schema used to parse the JSON payload of each ride message.
# Every field is nullable; the order below is the order of the resulting struct.
schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in [
        ("id", StringType()),
        ("user_id", IntegerType()),
        ("driver_id", IntegerType()),
        ("source", StringType()),
        ("destination", StringType()),
        ("price", DoubleType()),
        ("time_traveled", DoubleType()),
        ("distance_traveled", DoubleType()),
        ("uber_black", IntegerType()),
        ("time_message", TimestampType()),
    ]
])

In [4]:
# Kafka topic the raw ride events are read from.
topic_read = "topic-ingestion"
# Kafka bootstrap server(s). Can be overridden with the KAFKA_BOOTSTRAP_SERVERS
# environment variable so the notebook is not tied to one machine's address;
# an unset or empty variable falls back to the original hard-coded broker.
ip_kafka = os.environ.get("KAFKA_BOOTSTRAP_SERVERS") or "192.168.1.6:9092"

In [5]:
# Streaming read of the ingestion topic. The Kafka record value is cast to a
# string and parsed with `schema`; the parsed struct is exposed as column "data".
df_rides = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", ip_kafka)
    .option("subscribe", topic_read)
    .load()
    .select(
        from_json(col("value").cast("string"), schema).alias("data")
    )
)

In [6]:
# Register the parsed ride stream as the temp view "df_rides" so it can be queried with spark.sql.
df_rides.createOrReplaceTempView("df_rides")

In [17]:
# Static (batch) driver table read from parquet and marked for caching.
df_driver = spark.read.parquet("/home/jovyan/work/lake/drivers").cache()
# Make the driver table available to spark.sql under the name "df_driver".
df_driver.createOrReplaceTempView("df_driver")
# Row count is the cell's displayed output; as an action it also triggers the read.
df_driver.count()

30055

In [18]:
# Print the first five rows of the driver table as a quick visual check.
df_driver.show(5)

+---+--------------------+---+----------------+----+
| id|                name|age|             car|rate|
+---+--------------------+---+----------------+----+
|  1|John Hernandez Me...| 42|Chevrolet Blazer|4.51|
|  2|Haley Patterson H...| 59|Volkswagen Atlas|4.48|
|  3|  Jill Burke Stewart| 40|    BMW 3 Series|4.53|
|  4|    Maria Lewis Bass| 21|Dodge Challenger|4.55|
|  5|  Austin Miles Drake| 36|    Nissan Versa|4.46|
+---+--------------------+---+----------------+----+
only showing top 5 rows



In [19]:
# Flatten the parsed "data" struct of the df_rides view into top-level columns
# and derive `rate`: 20% of the price when uber_black = 1, otherwise 25% of the
# price, rounded to two decimals in both cases.
df_rides_01 = spark.sql("""

    select
        a.data.id as id,
        a.data.user_id as user_id,
        a.data.driver_id as driver_id,
        a.data.source as source,
        a.data.destination as destination,
        a.data.price as price,
        a.data.time_traveled as time_traveled,
        a.data.distance_traveled as distance_traveled,
        a.data.uber_black as uber_black,
        a.data.time_message as time_message,

        case when a.data.uber_black = 1 then round(a.data.price * 0.20 ,2)
            else round(a.data.price * 0.25,2) end as rate
        
    from
        df_rides a
        """)


# Register the flattened stream so the next query can reference it by name.
df_rides_01.createOrReplaceTempView("df_rides_01")

In [20]:
# Enrich each ride with the driver's name by left-joining the streaming rides
# (df_rides_01) to the static driver table on driver_id = id. Because it is a
# left join, rides without a matching driver are kept with a null driver_name.
# Note that driver_id itself is not part of the selected output columns.
df_rides_02 = spark.sql("""
    select
        a.id as id,
        b.name as driver_name,
        a.user_id,
        a.source as source,
        a.destination as destination,
        a.price as price,
        a.time_traveled as time_traveled,
        a.distance_traveled as distance_traveled,
        a.uber_black as uber_black,
        a.time_message as time_message,
        a.rate

    from
        df_rides_01 a
    left join
        df_driver b
    on (a.driver_id = b.id)

""")

# Register the enriched stream as the temp view "df_rides_02".
df_rides_02.createOrReplaceTempView("df_rides_02")

In [21]:
# Kafka topic name passed as the sink "topic" option when the enriched rides are written out.
topic_write = "topic-processed"

In [None]:
# Bundle the enriched ride columns into one struct so they can be serialised together.
struct_col = expr(
    "struct(id, driver_name, user_id, source, destination, price, time_traveled, distance_traveled, rate, uber_black, time_message) as data"
)

# Serialise the struct to a JSON string in a single column named "value".
json_df = df_rides_02.select(to_json(struct_col).alias("value"))

# Configure the Kafka sink step by step, then start the streaming query.
rides_sink_writer = json_df.writeStream.outputMode("append").format("kafka")
rides_sink_writer = rides_sink_writer.options(**{
    "kafka.bootstrap.servers": ip_kafka,
    "topic": topic_write,
    "checkpointLocation": "/home/jovyan/work/lake/checkpoint",
})
query = rides_sink_writer.start()

# Block this cell until the streaming query terminates.
query.awaitTermination()

In [None]:
# Debug sink: print the enriched ride stream (df_rides_02) to the console.
query = (
    df_rides_02
    .writeStream
    .outputMode("append")
    .format("console")
    # A checkpoint directory stores the progress of exactly one streaming
    # query. The bare "checkpoint" path is also used by the topic-gold Kafka
    # query in this notebook, so this query gets a directory of its own.
    .option("checkpointLocation", "checkpoint_console_rides")
    .start()
)

# Block until the query is stopped.
query.awaitTermination()

In [None]:
# Schema of the JSON messages on "topic-data" (input for the model features).
# FloatType comes from pyspark.sql.types and must be imported in the imports cell.
df_rides_schema = StructType([
  StructField("id", IntegerType()),
  StructField("travel_time", IntegerType()),
  StructField("distance", FloatType()),
  StructField("mode", StringType())
])

# NOTE: this cell rebinds `df_rides`, the "df_rides" temp view and `df_rides_01`
# that were defined earlier in the notebook for the ingestion topic. Cells below
# this one see the "topic-data" versions.
df_rides = (
    spark.readStream
    .format("kafka")
    # Reuse the shared broker setting instead of repeating the address literal.
    .option("kafka.bootstrap.servers", ip_kafka)
    .option("subscribe", "topic-data")
    .load()
    .select(from_json(col("value").cast("string"), df_rides_schema).alias("data"))
)

df_rides.createOrReplaceTempView("df_rides")

# Flatten the parsed struct and encode `mode` as two 0/1 indicator columns
# (one for 'UberX', one for 'UberConfort'); any other mode yields 0 in both.
df_rides_01 = spark.sql("""

    select
        a.data.id as id,
        a.data.travel_time as travel_time,
        a.data.distance as distance,
        case when a.data.mode = 'UberX' then 1 else 0 end as dummy_uberx,
        case when a.data.mode = 'UberConfort' then 1 else 0 end as dummy_uberconfort

    from
        df_rides a
        """)

# All feature columns serialised as one JSON string column named "value".
outputDF = df_rides_01.select(to_json(struct("*")).alias("value"))
# Alias kept for the schema-inspection cell further down.
transformed_df = df_rides_01



In [None]:
# Bundle the model-feature columns into one struct so they can be serialised together.
struct_col = expr("struct(id, travel_time, distance, dummy_uberx, dummy_uberconfort) as data")

# Add a JSON column containing all of the information.
json_df = df_rides_01.select(to_json(struct_col).alias("value"))

query = (
    json_df
    .writeStream
    .outputMode("append")  # may be "append", "complete" or "update" depending on the use case
    .format("kafka")
    # Reuse the shared broker setting instead of repeating the address literal.
    .option("kafka.bootstrap.servers", ip_kafka)
    .option("topic", "topic-gold")  # output Kafka topic
    # A checkpoint directory stores the progress of exactly one streaming
    # query. The bare "checkpoint" path is also used by the console query in
    # this notebook, so this query gets a directory of its own.
    .option("checkpointLocation", "checkpoint_topic_gold")
    .start()
)

# Block until the streaming query terminates.
query.awaitTermination()

In [None]:
# Display the Spark version of the active session.
spark.version

In [None]:
# Print the schema of outputDF (a single "value" column produced by to_json).
outputDF.printSchema()

In [None]:
# Sum data.time per data.zone from the view "df_zone".
# NOTE(review): no cell in the visible notebook registers a "df_zone" view, so
# this fails on a fresh kernel — confirm where the view is meant to come from.
spark.sql("SELECT data.zone, SUM(data.time) as total_time FROM df_zone GROUP BY 1")

In [None]:
# Print the schema of df_zone_02.
# NOTE(review): df_zone_02 is not defined in any visible cell — confirm its origin.
df_zone_02.printSchema()

In [None]:
# Apply a 1-minute watermark on the "timestamp" column of df_rides_demand.
# NOTE(review): df_rides_demand is not defined in any visible cell, and the
# result is not assigned to a name, so it is only shown as the cell output.
df_rides_demand.withWatermark("timestamp", "1 minute")

In [None]:
# Print the schema of transformed_df (bound to df_rides_01 in an earlier cell).
transformed_df.printSchema()