In [1]:
import logging
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import *

class raw_kafka_to_kudu:
    """Stream raw JSON events from Kafka topics into Kudu tables.

    Instantiating the class runs the whole pipeline: ``__init__`` stores the
    settings and immediately calls :meth:`ejecutar`, which creates the Spark
    session and starts one Structured Streaming query per topic. Every started
    query is kept in ``self.queries`` so the caller can await or stop it.

    Recognised ``params`` keys:
        kafkaBroker     Kafka bootstrap servers passed to the Kafka source.
        kuduMaster      Kudu master address list passed to the Kudu sink.
        checkpointBase  Optional. Directory under which each query gets a
                        ``<base>/<topic>`` checkpoint location.
        sparkPackages   Optional. Value for ``spark.jars.packages``.
    """

    # NOTE(review): these two coordinates target different Scala binary
    # versions (``_2.13`` for the Kafka source, ``_2.12`` for the Kudu sink).
    # Confirm which Scala version the cluster's Spark build uses and override
    # through params['sparkPackages'] if needed; the default is left unchanged.
    DEFAULT_SPARK_PACKAGES = (
        "org.apache.spark:spark-sql-kafka-0-10_2.13:3.4.1,"
        "org.apache.kudu:kudu-spark3_2.12:1.17.0"
    )
    DEFAULT_CHECKPOINT_BASE = "hdfs://namenode:9000/tmp/pyspark/checkpoints"

    def __init__(self, sparkMaster, appName, params=None):
        """Store the configuration and start the pipeline.

        Args:
            sparkMaster: Spark master URL handed to the session builder.
            appName: Spark application name.
            params: Optional dict of extra settings (see class docstring).
                ``None`` is treated as an empty dict.
        """
        params = params or {}

        # base params
        self.appName = appName
        self.sparkMaster = sparkMaster

        # extra params
        self.kafkaBroker = params.get('kafkaBroker')
        self.kuduMaster = params.get('kuduMaster')
        self.checkpointBase = params.get('checkpointBase', self.DEFAULT_CHECKPOINT_BASE)
        self.sparkPackages = params.get('sparkPackages', self.DEFAULT_SPARK_PACKAGES)

        # handles of the streaming queries started by this instance
        self.queries = []

        # execute process
        self.ejecutar()

    def ejecutar(self):
        """Create the Spark session and start the four topic pipelines.

        Raises:
            RuntimeError: if the Spark session could not be created.
        """
        sparkConn = self.createSparkSession()
        if sparkConn is None:
            # createSparkSession already logged the cause; fail here with a
            # clear message instead of an AttributeError on None further down.
            raise RuntimeError(
                "Spark session could not be created; see the log for the underlying error"
            )

        self.avgPriceProcess(sparkConn)
        self.pricesKlinesProcess(sparkConn)
        self.tickerProcess(sparkConn)
        self.tradesProcess(sparkConn)

    def _parseJson(self, raw_df, schema):
        """Cast the Kafka ``value`` to a string, parse it as JSON and expand the fields."""
        return (
            raw_df.selectExpr("CAST(value AS STRING) as json")
            .select(from_json(col("json"), schema).alias("data"))
            .select("data.*")
        )

    def _startStream(self, final_df, topicName, kuduTableName):
        """Start an append-mode query writing each micro-batch of ``final_df`` to Kudu.

        The query is named after the topic and checkpoints under
        ``<checkpointBase>/<topicName>``. The started query is recorded in
        ``self.queries`` and returned.
        """
        checkpoint = f"{self.checkpointBase.rstrip('/')}/{topicName}"
        query = (
            final_df.writeStream
            .queryName(topicName)
            .outputMode("append")
            .foreachBatch(lambda batch_df, batch_id: self.writeToKudu(batch_df, batch_id, kuduTableName))
            .option("checkpointLocation", checkpoint)
            .start()
        )
        self.queries.append(query)
        return query

    def avgPriceProcess(self, sparkConn):
        """Stream topic ``pairs_avg_price`` into ``impala::s_productos.k_pairs_avg_prices``."""
        topicName = 'pairs_avg_price'
        raw_df = self.readFromKafka(sparkConn, topicName)
        outputFields, schema = self.schAvgPrices()

        # columns are renamed positionally, in schema order
        final_df = self._parseJson(raw_df, schema).toDF(*outputFields)

        return self._startStream(final_df, topicName, 'impala::s_productos.k_pairs_avg_prices')

    def pricesKlinesProcess(self, sparkConn):
        """Stream topic ``pairs_prices_klines`` into ``impala::s_productos.k_klines``."""
        topicName = 'pairs_prices_klines'
        raw_df = self.readFromKafka(sparkConn, topicName)
        first_column_names, final_column_names, schema = self.schPricesKlines()

        # Rename the top-level columns with toDF
        renamed_df = self._parseJson(raw_df, schema).toDF(*first_column_names)

        # Pull the nested kline struct's fields up next to the envelope columns
        df_flattened = renamed_df.select(
            "event_type",
            "event_timestamp",
            "currency",
            "kline.*"
        )

        final_df = df_flattened.toDF(*final_column_names)

        return self._startStream(final_df, topicName, 'impala::s_productos.k_klines')

    def tickerProcess(self, sparkConn):
        """Stream topic ``pairs_ticker`` into ``impala::s_productos.k_ticker``."""
        topicName = 'pairs_ticker'
        raw_df = self.readFromKafka(sparkConn, topicName)
        outputFields, schema = self.sch24hrTicker()

        final_df = self._parseJson(raw_df, schema).toDF(*outputFields)

        return self._startStream(final_df, topicName, 'impala::s_productos.k_ticker')

    def tradesProcess(self, sparkConn):
        """Stream topic ``pairs_trades`` into ``impala::s_productos.k_trades``."""
        topicName = 'pairs_trades'
        raw_df = self.readFromKafka(sparkConn, topicName)
        outputFields, schema = self.schTrades()

        final_df = self._parseJson(raw_df, schema).toDF(*outputFields)

        return self._startStream(final_df, topicName, 'impala::s_productos.k_trades')

    def schAvgPrices(self):
        """Return ``(output column names, JSON schema)`` for the average-price events."""
        field_names = ["event_type", "event_timestamp", "currency", "time_interval", "avg_price", "time_stamp"]

        schema = StructType([
            StructField("e", StringType(), True),
            StructField("E", LongType(), True),
            StructField("s", StringType(), True),
            StructField("i", StringType(), True),
            StructField("w", StringType(), True),
            StructField("T", LongType(), True)
        ])

        return field_names, schema

    def schPricesKlines(self):
        """Return ``(envelope names, flattened names, JSON schema)`` for the kline events.

        The first list names the four top-level columns (the last one being the
        nested kline struct); the second names every column after the struct
        has been flattened.
        """
        start_field_names = ["event_type", "event_timestamp", "currency", "kline"]

        final_field_names = [
            "event_type", "event_timestamp", "currency", "kline_start_time", "kline_close_time",
            "kline_symbol", "time_interval", "kline_first_trade_id", "kline_last_trade_id",
            "kline_open_price", "kline_close_price", "kline_high_price", "kline_low_price",
            "kline_volume", "kline_trade_count", "kline_is_closed", "kline_quote_asset_volume",
            "kline_active_buy_volume", "kline_active_buy_quote_volume", "kline_ignore"
        ]

        schema = StructType([
            StructField("e", StringType(), True),
            StructField("E", LongType(), True),
            StructField("s", StringType(), True),
            StructField("k", StructType([
                StructField("t", LongType(), True),
                StructField("T", LongType(), True),
                StructField("s", StringType(), True),
                StructField("i", StringType(), True),
                StructField("f", LongType(), True),
                StructField("L", LongType(), True),
                StructField("o", StringType(), True),
                StructField("c", StringType(), True),
                StructField("h", StringType(), True),
                StructField("l", StringType(), True),
                StructField("v", StringType(), True),
                StructField("n", IntegerType(), True),
                StructField("x", BooleanType(), True),
                StructField("q", StringType(), True),
                StructField("V", StringType(), True),
                StructField("Q", StringType(), True),
                StructField("B", StringType(), True)
            ]))
        ])

        return start_field_names, final_field_names, schema

    def sch24hrTicker(self):
        """Return ``(output column names, JSON schema)`` for the 24hr ticker events.

        NOTE(review): if the producer forwards Binance's 24hr ticker payload,
        ``x``/``c``/``Q`` are previous-close price, last price and last
        quantity, so the names given to those three columns may be shifted.
        Confirm against the producer before renaming anything; the output
        names are what the Kudu write receives as column names.
        """
        field_names = [
            "event_type", "event_timestamp", "currency", "price_change", "price_change_percent",
            "weighted_avg_price", "last_price", "last_qty", "quote_qty",  # quote_qty was added
            "bid_price", "bid_qty", "ask_price", "ask_qty", "open_price",
            "high_price", "low_price", "volume", "quote_volume", "open_time",
            "close_time", "first_trade_id", "last_trade_id", "trade_count"
        ]

        schema = StructType([
            StructField("e", StringType(), True),   # event_type
            StructField("E", LongType(), True),      # event_timestamp
            StructField("s", StringType(), True),     # currency
            StructField("p", StringType(), True),     # price_change
            StructField("P", StringType(), True),     # price_change_percent
            StructField("w", StringType(), True),     # weighted_avg_price
            StructField("x", StringType(), True),     # last_price
            StructField("c", StringType(), True),     # last_qty
            StructField("Q", StringType(), True),     # quote_qty
            StructField("b", StringType(), True),     # bid_price
            StructField("B", StringType(), True),     # bid_qty
            StructField("a", StringType(), True),     # ask_price
            StructField("A", StringType(), True),     # ask_qty
            StructField("o", StringType(), True),     # open_price
            StructField("h", StringType(), True),     # high_price
            StructField("l", StringType(), True),     # low_price
            StructField("v", StringType(), True),     # volume
            StructField("q", StringType(), True),     # quote_volume
            StructField("O", LongType(), True),       # open_time
            StructField("C", LongType(), True),       # close_time
            StructField("F", LongType(), True),       # first_trade_id
            StructField("L", LongType(), True),       # last_trade_id
            StructField("n", LongType(), True)        # trade_count
        ])

        return field_names, schema

    def schTrades(self):
        """Return ``(output column names, JSON schema)`` for the trade events."""
        field_names = ["event_type", "event_timestamp", "currency", "trade_id", "price", "quantity", "trade_time", "is_market_maker", "is_maker"]

        schema = StructType([
            StructField("e", StringType(), True),    # event type
            StructField("E", LongType(), True),       # event timestamp
            StructField("s", StringType(), True),     # symbol
            StructField("t", LongType(), True),       # trade ID
            StructField("p", StringType(), True),     # price
            StructField("q", StringType(), True),     # quantity
            StructField("T", LongType(), True),       # trade time
            StructField("m", BooleanType(), True),    # is market maker
            StructField("M", BooleanType(), True)     # is maker
        ])

        return field_names, schema

    def createSparkSession(self):
        """Build (or reuse) the Spark session for this application.

        Returns:
            The session, or ``None`` when creation failed; the failure is
            logged together with its traceback.
        """
        s_conn = None
        try:
            s_conn = (
                SparkSession.builder
                .appName(self.appName)
                .master(self.sparkMaster)
                .config('spark.executor.memory', '1g')
                .config('spark.driver.memory', '1g')
                .config("spark.executor.cores", "1")
                .config("spark.cores.max", "3")
                .config("spark.jars.packages", self.sparkPackages)
                .getOrCreate()
            )

            # only surface errors from Spark's own logging
            s_conn.sparkContext.setLogLevel("ERROR")
            logging.info("Spark connection created successfully!")
        except Exception as e:
            logging.error(f"Couldn't create the spark session due to exception {e}", exc_info=True)

        return s_conn

    def readFromKafka(self, sparkConn, topicName):
        """Return a streaming DataFrame subscribed to ``topicName``.

        The source starts from the earliest offsets, is capped at 1000 offsets
        per trigger and is configured with ``failOnDataLoss=false``.
        """
        # Read data from the Kafka topic
        df = (
            sparkConn.readStream
            .format("kafka")
            .option("kafka.bootstrap.servers", self.kafkaBroker)
            .option("subscribe", topicName)
            .option("maxOffsetsPerTrigger", 1000)
            .option("startingOffsets", "earliest")
            .option("failOnDataLoss", "false")
            .load()
        )

        return df

    def writeToKudu(self, batch_df, batch_id, kuduTableName):
        """Append one micro-batch to ``kuduTableName``.

        ``batch_id`` is accepted because ``foreachBatch`` supplies it; it is
        not used here.
        """
        batch_df.write \
            .format("kudu") \
            .option("kudu.master", self.kuduMaster) \
            .option("kudu.table", kuduTableName) \
            .mode("append") \
            .save()

if __name__ == "__main__":
    # base params
    # NOTE(review): the master host looks like a Docker container ID, which
    # would change when the container is recreated -- confirm it is stable.
    sparkMaster = 'spark://e5616452f8d5:7077'
    appName = 'KafkaToKuduRaw1'

    # extra params
    extraParams = {
        'kafkaBroker': 'kafka-broker:29092',
        'kuduMaster': 'kudu-master-1:7051,kudu-master-2:7151,kudu-master-3:7251',
    }

    # Constructing the object runs the pipeline: __init__ calls ejecutar().
    app = raw_kafka_to_kudu(sparkMaster, appName, params=extraParams)


:: loading settings :: url = jar:file:/opt/spark/jars/ivy-2.5.0.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
org.apache.spark#spark-sql-kafka-0-10_2.13 added as a dependency
org.apache.kudu#kudu-spark3_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-a2871d72-0818-4f44-b855-09ca5a12eb14;1.0
	confs: [default]
	found org.apache.spark#spark-sql-kafka-0-10_2.13;3.4.1 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.13;3.4.1 in central
	found org.apache.kafka#kafka-clients;3.3.2 in central
	found org.lz4#lz4-java;1.8.0 in central
	found org.xerial.snappy#snappy-java;1.1.10.1 in central
	found org.slf4j#slf4j-api;2.0.6 in central
	found org.apache.hadoop#hadoop-client-runtime;3.3.4 in central
	found org.apache.hadoop#hadoop-client-api;3.3.4 in central
	found commons-logging#commons-logging;1.1.3 in central
	found com.google.code.findbugs#jsr305;3.0.0 in central
	found org.scala-lang.modules#scala-parallel-collections_2.13;1.0.4 in central


                                                                                

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import *

# Spark master URL the exploratory session attaches to.
spark_master = "spark://e5616452f8d5:7077"

# Build (or reuse) the session used by the cells below.
# NOTE(review): the two package coordinates target different Scala binary
# versions (_2.13 vs _2.12) -- confirm which one the cluster's Spark build uses.
spark = (
    SparkSession.builder
    .appName("prueba_kafka")
    .master(spark_master)
    .config("spark.executor.memory", "1g")
    .config("spark.driver.memory", "1g")
    .config("spark.executor.cores", "1")
    .config("spark.cores.max", "2")
    .config(
        "spark.jars.packages",
        "org.apache.spark:spark-sql-kafka-0-10_2.13:3.4.1,org.apache.kudu:kudu-spark3_2.12:1.17.0",
    )
    .getOrCreate()
)

# Only surface errors from Spark's own logging.
spark.sparkContext.setLogLevel("ERROR")

# Show where the Spark web UI for this session is served.
print(spark.sparkContext.uiWebUrl)

In [2]:
def schAvgPrices():
    """Describe the average-price JSON payload.

    Returns:
        A ``(field_names, schema)`` pair: the output column names, and the
        ``StructType`` used to parse the JSON. Both follow the same order, so
        the names can be applied positionally.
    """
    # (JSON key, output column name, Spark type)
    columns = [
        ("e", "event_type", StringType),
        ("E", "event_timestamp", LongType),
        ("s", "currency", StringType),
        ("i", "time_interval", StringType),
        ("w", "avg_price", StringType),
        ("T", "time_stamp", LongType),
    ]

    field_names = [target for _, target, _ in columns]
    schema = StructType([StructField(key, dtype(), True) for key, _, dtype in columns])

    return field_names, schema

def schPricesKlines():
    """Describe the kline JSON payload, whose ``k`` key holds a nested struct.

    Returns:
        ``(start_field_names, final_field_names, schema)``:
        the names for the four top-level columns (the last being the nested
        struct), the names for every column once the struct is flattened, and
        the ``StructType`` used to parse the JSON.
    """
    def _nullable_fields(specs):
        # Build one nullable StructField per (JSON key, Spark type) pair.
        return [StructField(key, dtype(), True) for key, dtype in specs]

    envelope_specs = [("e", StringType), ("E", LongType), ("s", StringType)]
    kline_specs = [
        ("t", LongType), ("T", LongType), ("s", StringType), ("i", StringType),
        ("f", LongType), ("L", LongType),
        ("o", StringType), ("c", StringType), ("h", StringType), ("l", StringType),
        ("v", StringType), ("n", IntegerType), ("x", BooleanType),
        ("q", StringType), ("V", StringType), ("Q", StringType), ("B", StringType),
    ]

    kline_struct = StructType(_nullable_fields(kline_specs))
    schema = StructType(_nullable_fields(envelope_specs) + [StructField("k", kline_struct)])

    start_field_names = ["event_type", "event_timestamp", "currency", "kline"]

    # Envelope columns first, then one name per kline field in schema order.
    final_field_names = start_field_names[:3] + [
        "kline_start_time", "kline_close_time", "kline_symbol", "time_interval",
        "kline_first_trade_id", "kline_last_trade_id",
        "kline_open_price", "kline_close_price", "kline_high_price", "kline_low_price",
        "kline_volume", "kline_trade_count", "kline_is_closed",
        "kline_quote_asset_volume", "kline_active_buy_volume",
        "kline_active_buy_quote_volume", "kline_ignore",
    ]

    return start_field_names, final_field_names, schema

def sch24hrTicker():
    """Describe the 24hr ticker JSON payload.

    Returns:
        A ``(field_names, schema)`` pair: the output column names, and the
        ``StructType`` used to parse the JSON, both in the same order.

    NOTE(review): if the producer forwards Binance's 24hr ticker payload,
    ``x``/``c``/``Q`` are previous-close price, last price and last quantity,
    so the names given to those three columns may be shifted. Confirm against
    the producer before renaming anything that downstream tables rely on.
    """
    # (JSON key, output column name, Spark type)
    columns = [
        ("e", "event_type", StringType),
        ("E", "event_timestamp", LongType),
        ("s", "currency", StringType),
        ("p", "price_change", StringType),
        ("P", "price_change_percent", StringType),
        ("w", "weighted_avg_price", StringType),
        ("x", "last_price", StringType),
        ("c", "last_qty", StringType),
        ("Q", "quote_qty", StringType),  # quote_qty was added
        ("b", "bid_price", StringType),
        ("B", "bid_qty", StringType),
        ("a", "ask_price", StringType),
        ("A", "ask_qty", StringType),
        ("o", "open_price", StringType),
        ("h", "high_price", StringType),
        ("l", "low_price", StringType),
        ("v", "volume", StringType),
        ("q", "quote_volume", StringType),
        ("O", "open_time", LongType),
        ("C", "close_time", LongType),
        ("F", "first_trade_id", LongType),
        ("L", "last_trade_id", LongType),
        ("n", "trade_count", LongType),
    ]

    field_names = [target for _, target, _ in columns]
    schema = StructType([StructField(key, dtype(), True) for key, _, dtype in columns])

    return field_names, schema

def schTrades():
    """Describe the trade JSON payload.

    Returns:
        A ``(field_names, schema)`` pair: the output column names, and the
        ``StructType`` used to parse the JSON, both in the same order.
    """
    # (JSON key, output column name, Spark type)
    columns = [
        ("e", "event_type", StringType),
        ("E", "event_timestamp", LongType),
        ("s", "currency", StringType),         # symbol
        ("t", "trade_id", LongType),
        ("p", "price", StringType),
        ("q", "quantity", StringType),
        ("T", "trade_time", LongType),
        ("m", "is_market_maker", BooleanType),
        ("M", "is_maker", BooleanType),
    ]

    field_names = [target for _, target, _ in columns]
    schema = StructType([StructField(key, dtype(), True) for key, _, dtype in columns])

    return field_names, schema

In [3]:
# Topics
# Topic name
# ####
# pairs_avg_price -- OK 
# pairs_prices_klines -- OK
# pairs_ticker --OK
# pairs_trades -- OK

In [4]:
# Name of the Kafka topic to consume
kafka_topic = "pairs_ticker"

# Streaming read from that topic: start at the earliest offsets, take at most
# 300 offsets per trigger, and run with failOnDataLoss disabled.
df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka-broker:29092")
    .option("subscribe", kafka_topic)
    .option("maxOffsetsPerTrigger", 300)
    .option("startingOffsets", "earliest")
    .option("failOnDataLoss", "false")
    .load()
)

In [5]:
# Output column names and JSON schema for the ticker payload.
final_column_names, schema = sch24hrTicker()

# Cast the Kafka value to a string, parse it with the schema, and promote the
# parsed struct's fields to top-level columns.
json_df = (
    df.selectExpr("CAST(value AS STRING) as json")
    .select(from_json(col("json"), schema).alias("data"))
    .select("data.*")
)

# Positional rename: the names list follows the schema's field order.
renamed_df = json_df.toDF(*final_column_names)


In [6]:
# renamed_df.show(10)

In [7]:
# Function that writes one micro-batch to Kudu
def write_to_kudu(
    batch_df,
    batch_id,
    kudu_table="impala::s_productos.k_ticker",
    kudu_masters="kudu-master-1:7051,kudu-master-2:7151,kudu-master-3:7251",
):
    """Append ``batch_df`` to a Kudu table.

    The two-positional-argument call made by ``foreachBatch`` keeps writing to
    the ticker table on the default masters; other callers can bind a
    different target, e.g. with ``functools.partial``.

    Args:
        batch_df: DataFrame holding the micro-batch rows.
        batch_id: Micro-batch identifier supplied by ``foreachBatch``; unused.
        kudu_table: Name of the Kudu table to insert into.
        kudu_masters: Comma-separated Kudu master addresses.
    """
    writer = batch_df.write.format("kudu")
    writer = writer.option("kudu.master", kudu_masters).option("kudu.table", kudu_table)
    writer.mode("append").save()

In [None]:
# Use foreachBatch to insert each micro-batch into Kudu; progress is
# checkpointed under the given location.
query = (
    renamed_df.writeStream
    .outputMode("append")
    .foreachBatch(write_to_kudu)
    .option("checkpointLocation", "hdfs://namenode:9000/tmp/pyspark/checkpoints/pairs_ticker")
    .start()
)

In [None]:
# Wait for the query to finish
query.awaitTermination()

In [2]:
# spark.stop()