In [None]:
import os

from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col, window, sum, expr, lit
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType

# ClickHouse connection settings for the `clickhouse` Spark catalog.
# Each one can be overridden through an environment variable so credentials
# are not hard-coded in the notebook; the defaults are the previous literals.
CLICKHOUSE_HOST = os.environ.get("CLICKHOUSE_HOST", "localhost")
CLICKHOUSE_HTTP_PORT = os.environ.get("CLICKHOUSE_HTTP_PORT", "8123")
CLICKHOUSE_USER = os.environ.get("CLICKHOUSE_USER", "default")
CLICKHOUSE_PASSWORD = os.environ.get("CLICKHOUSE_PASSWORD", "")
CLICKHOUSE_DATABASE = os.environ.get("CLICKHOUSE_DATABASE", "default")

# Maven coordinates Spark resolves at startup: the Kafka source and the
# ClickHouse catalog/connector. Joined explicitly rather than relying on
# implicit string-literal concatenation.
SPARK_PACKAGES = ",".join([
    "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0",
    "com.clickhouse.spark:clickhouse-spark-runtime-3.5_2.12:0.10.0",
])

spark = (
    SparkSession.builder
    .appName("CryptoPulseNative")
    .config("spark.jars.packages", SPARK_PACKAGES)
    # Register ClickHouse as a Spark catalog named `clickhouse`.
    .config("spark.sql.catalog.clickhouse", "com.clickhouse.spark.ClickHouseCatalog")
    .config("spark.sql.catalog.clickhouse.host", CLICKHOUSE_HOST)
    .config("spark.sql.catalog.clickhouse.protocol", "http")
    # The connector's catalog key for the HTTP port is `http_port`. The old
    # `port` key was not read and only "worked" because 8123 is the default.
    .config("spark.sql.catalog.clickhouse.http_port", CLICKHOUSE_HTTP_PORT)
    .config("spark.sql.catalog.clickhouse.user", CLICKHOUSE_USER)
    .config("spark.sql.catalog.clickhouse.password", CLICKHOUSE_PASSWORD)
    .config("spark.sql.catalog.clickhouse.database", CLICKHOUSE_DATABASE)
    # Keep shuffle partitions low for this local run (Spark's default is 200).
    .config("spark.sql.shuffle.partitions", "2")
    .getOrCreate()
)

26/02/20 14:38:32 WARN Utils: Your hostname, Amans-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 192.168.29.9 instead (on interface en0)
26/02/20 14:38:32 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Ivy Default Cache set to: /Users/amansatya/.ivy2/cache
The jars for the packages stored in: /Users/amansatya/.ivy2/jars
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
com.clickhouse.spark#clickhouse-spark-runtime-3.5_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-26e6f07d-f574-4511-95d1-dc68ba694ea2;1.0
	confs: [default]


:: loading settings :: url = jar:file:/Users/amansatya/bigdata-tools/spark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.5.0 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.5.0 in central
	found org.apache.kafka#kafka-clients;3.4.1 in central
	found org.lz4#lz4-java;1.8.0 in central
	found org.xerial.snappy#snappy-java;1.1.10.3 in central
	found org.slf4j#slf4j-api;2.0.7 in central
	found org.apache.hadoop#hadoop-client-runtime;3.3.4 in central
	found org.apache.hadoop#hadoop-client-api;3.3.4 in central
	found commons-logging#commons-logging;1.1.3 in central
	found com.google.code.findbugs#jsr305;3.0.0 in central
	found org.apache.commons#commons-pool2;2.11.1 in central
	found com.clickhouse.spark#clickhouse-spark-runtime-3.5_2.12;0.10.0 in central
:: resolution report :: resolve 202ms :: artifacts dl 6ms
	:: modules in use:
	com.clickhouse.spark#clickhouse-spark-runtime-3.5_2.12;0.10.0 from central in [default]
	com.google.code.findbugs#jsr305;3.0.0 from central in [default]
	commons-logging#commons-logging;1.1.3 from central 

In [2]:
# Expected shape of each JSON trade message on the Kafka topic.
# StructField is nullable by default, so every column here may be null.
# NOTE(review): confirm the producer's `timestamp` encoding. Spark's JSON
# reader parses strings such as ISO-8601 into TimestampType, but numeric
# epoch values are taken as seconds, so epoch milliseconds would be misread.
schema = StructType(
    [
        StructField(field_name, field_type)
        for field_name, field_type in (
            ("symbol", StringType()),
            ("price", DoubleType()),
            ("quantity", DoubleType()),
            ("timestamp", TimestampType()),
            ("side", StringType()),
        )
    ]
)

In [3]:
# Streaming source: the `trades_btc` Kafka topic on the local broker.
# With startingOffsets="latest", a fresh query reads only messages that
# arrive after it starts; earlier backlog on the topic is skipped.
raw_stream = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:19092")
    .option("subscribe", "trades_btc")
    .option("startingOffsets", "latest")
    .load()
)

In [4]:
# Decode each Kafka record:
#   1. read its value bytes as a JSON string and parse it with `schema`;
#   2. flatten the parsed struct into top-level columns;
#   3. add the notional value of each trade (price * quantity).
trades = (
    raw_stream
    .select(from_json(col("value").cast("string"), schema).alias("data"))
    .select("data.*")
    .withColumn("trade_value", col("price") * col("quantity"))
)

In [5]:
# Windowing settings for the pulse metrics (Spark interval strings).
WINDOW_DURATION = "1 second"       # width of each tumbling window
WATERMARK_DELAY = "2 seconds"      # max event-time lateness before a trade is dropped
DISPLAY_TIMEZONE = "Asia/Kolkata"  # zone the window bounds are converted into

# A row is a buy only when `side` is exactly 'BUY'. Every other value,
# including null, falls into the "otherwise" branches below.
# NOTE(review): confirm the producer emits upper-case 'BUY'/'SELL' only.
is_buy = F.col("side") == "BUY"

pulse_analytics = (
    trades
    .withWatermark("timestamp", WATERMARK_DELAY)
    .groupBy(F.window(F.col("timestamp"), WINDOW_DURATION))
    .agg(
        # Volume-weighted average price: sum(price * quantity) / sum(quantity).
        (F.sum("trade_value") / F.sum("quantity")).alias("vwap"),
        # Signed volume: buys add their quantity, everything else subtracts it.
        F.sum(
            F.when(is_buy, F.col("quantity")).otherwise(-F.col("quantity"))
        ).alias("net_delta"),
        # Percentage of the window's volume that came from buys.
        (
            F.sum(F.when(is_buy, F.col("quantity")).otherwise(0))
            / F.sum("quantity")
            * 100
        ).alias("buy_pressure_pct"),
        F.sum("quantity").alias("total_volume"),
        F.count(F.lit(1)).alias("trade_count"),
    )
    .select(
        # Shift the window bounds from UTC into DISPLAY_TIMEZONE.
        # NOTE(review): from_utc_timestamp treats the input as a UTC wall-clock
        # value and moves it by the zone offset. spark.sql.session.timeZone
        # defaults to the JVM's local zone, so unless the session runs in UTC
        # the stored values may end up offset twice. Confirm against the rows
        # landing in ClickHouse.
        F.from_utc_timestamp(F.col("window.start"), DISPLAY_TIMEZONE).alias("window_start"),
        F.from_utc_timestamp(F.col("window.end"), DISPLAY_TIMEZONE).alias("window_end"),
        "vwap",
        "net_delta",
        "buy_pressure_pct",
        "total_volume",
        "trade_count",
    )
)


In [6]:
def write_to_clickhouse_native(df, epoch_id, table="clickhouse.default.pulse_metrics"):
    """Append one streaming micro-batch to a ClickHouse table.

    Intended as the ``foreachBatch`` callback of the streaming query, which
    calls it with the micro-batch DataFrame and its batch id.

    The write goes through the ``clickhouse`` catalog registered on the
    SparkSession (``spark.sql.catalog.clickhouse.*``). Host, port and
    credentials therefore come from that single configuration instead of
    being duplicated here.

    Args:
        df: DataFrame holding the rows emitted by this micro-batch.
        epoch_id: Batch id supplied by Spark; not used by this writer.
        table: Fully qualified ``catalog.database.table`` identifier to append
            to. Defaults to ``pulse_metrics`` in the ``default`` database.
    """
    # NOTE(review): plain appends can leave several rows per window in two
    # cases: Spark replays a batch after a failure, or an update-mode query
    # emits the same window again in a later batch. Confirm the target table
    # deduplicates (e.g. ReplacingMergeTree keyed on window_start), or
    # dedupe using epoch_id.
    df.writeTo(table).append()

# Start the streaming query. Each micro-batch of updated window rows is passed
# to write_to_clickhouse_native; processingTime "0 seconds" starts the next
# batch as soon as the previous one finishes.
# NOTE(review): no checkpointLocation is set, so Spark uses a temporary
# checkpoint directory and progress is not kept across restarts.
# NOTE(review): outputMode("update") re-emits a window every time it changes,
# so an append-only sink receives partial rows for the same window; switch to
# "append" if each window should be written once after the watermark passes.
query = (
    pulse_analytics.writeStream
    .outputMode("update")
    .foreachBatch(write_to_clickhouse_native)
    .trigger(processingTime="0 seconds")
    .start()
)

26/02/20 14:38:35 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /private/var/folders/cf/vwq978qs05zcc0dt_n0lyss40000gn/T/temporary-7f441366-d92e-4d94-9375-ce002140abca. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
26/02/20 14:38:35 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


26/02/20 14:38:35 WARN AdminClientConfig: These configurations '[key.deserializer, value.deserializer, enable.auto.commit, max.poll.records, auto.offset.reset]' were supplied but are not used yet.
