In [1]:
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming._
import org.apache.spark.sql.types._
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._
import java.sql.Timestamp

//dummpy data for test
val someDF = Seq(("LOLL",287.33,17,2, Timestamp.valueOf("2020-06-29 15:26:48"))).toDF("t", "p","x","s","dt")
someDF.printSchema

Intitializing Scala interpreter ...

Spark Web UI available at http://Umer:4040
SparkContext available as 'sc' (version = 2.4.4, master = local[*], app id = local-1593600087231)
SparkSession available as 'spark'


root
 |-- t: string (nullable = true)
 |-- p: double (nullable = false)
 |-- x: integer (nullable = false)
 |-- s: integer (nullable = false)
 |-- dt: timestamp (nullable = true)



import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming._
import org.apache.spark.sql.types._
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._
import java.sql.Timestamp
someDF: org.apache.spark.sql.DataFrame = [t: string, p: double ... 3 more fields]


In [2]:
//reading data from spark and extracting required data
val df = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "stockData")
  .load()
df.printSchema

val selectDF = df.select(get_json_object(($"value").cast("string"),"$.data.T").alias("ticker"),
                         get_json_object(($"value").cast("string"),"$.data.p").alias("price"),
                         get_json_object(($"value").cast("string"),"$.data.x").alias("exchange_id"),
                         get_json_object(($"value").cast("string"),"$.data.s").alias("trade_size"),
                         get_json_object(($"value").cast("string"),"$.data.t").alias("date_time"))
//drop all rows with null values
val temp = selectDF.na.drop()
// if null removes properly
if(temp.count == selectDF.count){
    println("error null rows in the dataset")
}
//function to re-affirm dataType
def checkData(df: DataFrame): DataFrame = {
    var tmpDf = df.schema("date_time").dataType match {
        case StringType => df.withColumn("date_time", ((col("date_time").cast("Long"))/1000000000).cast("timestamp"))
        case LongType => df.withColumn("date_time",(col("date_time")/1000000000).cast("timestamp"))
        case _ => df
    }
    tmpDf = tmpDf.schema("price").dataType match {
        case StringType => tmpDf.withColumn("price",col("price").cast("Double"))
        case _ => tmpDf
    }
    tmpDf = tmpDf.schema("exchange_id").dataType match {
        case StringType => tmpDf.withColumn("exchange_id",col("exchange_id").cast("Int"))
        case _ => tmpDf
    }
    tmpDf = tmpDf.schema("trade_size").dataType match {
        case StringType => tmpDf.withColumn("trade_size",col("trade_size").cast("Int"))
        case _ => tmpDf
    }
    return tmpDf.toDF
}

val temp2 = checkData(temp)
temp2.printSchema

//testing with dummy data
assert(someDF.schema("dt").dataType == temp2.schema("date_time").dataType)
assert(someDF.schema("t").dataType == temp2.schema("ticker").dataType)
assert(someDF.schema("p").dataType == temp2.schema("price").dataType)
assert(someDF.schema("x").dataType == temp2.schema("exchange_id").dataType)
assert(someDF.schema("s").dataType == temp2.schema("trade_size").dataType)

temp2.sort(desc("date_time")).show

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)

root
 |-- ticker: string (nullable = true)
 |-- price: double (nullable = true)
 |-- exchange_id: integer (nullable = true)
 |-- trade_size: integer (nullable = true)
 |-- date_time: timestamp (nullable = true)

+------+--------+-----------+----------+--------------------+
|ticker|   price|exchange_id|trade_size|           date_time|
+------+--------+-----------+----------+--------------------+
|  MSFT|  201.25|          3|         9|2020-06-30 17:01:...|
|  TSLA| 1079.94|         15|        57|2020-06-30 17:01:...|
|  TSLA| 1080.17|         17|        98|2020-06-30 17:01:...|
|  TSLA| 1080.08|         17|         2|2020-06-30 17:01:...|
|  TSLA| 1079.76|         17|         2|2020-06-30 17:01:...|
|  TSLA| 

df: org.apache.spark.sql.DataFrame = [key: binary, value: binary ... 5 more fields]
selectDF: org.apache.spark.sql.DataFrame = [ticker: string, price: string ... 3 more fields]
temp: org.apache.spark.sql.DataFrame = [ticker: string, price: string ... 3 more fields]
checkData: (df: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame
temp2: org.apache.spark.sql.DataFrame = [ticker: string, price: double ... 3 more fields]


In [3]:
val APPL = temp2.filter("ticker == 'AAPL'").sort(desc("date_time")).show

+------+-------+-----------+----------+--------------------+
|ticker|  price|exchange_id|trade_size|           date_time|
+------+-------+-----------+----------+--------------------+
|  AAPL|  365.5|          2|        92|2020-06-30 17:01:...|
|  AAPL|  365.5|          2|         8|2020-06-30 17:01:...|
|  AAPL| 364.83|          3|         6|2020-06-30 16:53:...|
|  AAPL|364.795|         15|         1|2020-06-30 16:53:...|
|  AAPL| 364.81|         15|       100|2020-06-30 16:53:...|
|  AAPL| 364.81|         15|       100|2020-06-30 16:53:...|
|  AAPL|364.735|         15|       165|2020-06-30 16:53:...|
|  AAPL|364.735|         15|       200|2020-06-30 16:53:...|
|  AAPL| 364.72|         15|         3|2020-06-30 16:53:...|
|  AAPL| 364.72|         15|        10|2020-06-30 16:53:...|
|  AAPL| 364.73|         15|         1|2020-06-30 16:53:...|
|  AAPL|364.755|         15|       100|2020-06-30 16:53:...|
|  AAPL|364.755|         15|       100|2020-06-30 16:53:...|
|  AAPL|364.755|        

APPL: Unit = ()


In [4]:
val TSLA = temp2.filter("ticker == 'TSLA'").sort(desc("date_time")).show

+------+--------+-----------+----------+--------------------+
|ticker|   price|exchange_id|trade_size|           date_time|
+------+--------+-----------+----------+--------------------+
|  TSLA| 1079.94|         15|        57|2020-06-30 17:01:...|
|  TSLA| 1080.08|         17|         2|2020-06-30 17:01:...|
|  TSLA| 1080.17|         17|        98|2020-06-30 17:01:...|
|  TSLA| 1079.76|         17|         2|2020-06-30 17:01:...|
|  TSLA| 1079.76|         17|         2|2020-06-30 17:01:...|
|  TSLA| 1079.77|         17|         2|2020-06-30 17:01:...|
|  TSLA|1080.365|         15|        20|2020-06-30 17:01:...|
|  TSLA|1080.365|         15|         1|2020-06-30 17:01:...|
|  TSLA| 1080.19|         17|         1|2020-06-30 17:01:...|
|  TSLA|1080.365|         15|        20|2020-06-30 17:01:...|
|  TSLA| 1080.55|         17|       100|2020-06-30 17:01:...|
|  TSLA| 1080.54|         17|        30|2020-06-30 17:01:...|
|  TSLA| 1080.54|         17|        70|2020-06-30 17:01:...|
|  TSLA|

TSLA: Unit = ()


In [5]:
val MSFT = temp2.filter("ticker == 'MSFT'").sort(desc("date_time")).show

+------+-------+-----------+----------+--------------------+
|ticker|  price|exchange_id|trade_size|           date_time|
+------+-------+-----------+----------+--------------------+
|  MSFT| 201.25|          3|         9|2020-06-30 17:01:...|
|  MSFT|201.325|         15|       100|2020-06-30 17:01:...|
|  MSFT|201.145|         15|        20|2020-06-30 16:53:...|
|  MSFT| 201.16|         15|         3|2020-06-30 16:53:...|
|  MSFT| 201.16|         15|       500|2020-06-30 16:53:...|
|  MSFT| 201.15|         15|       500|2020-06-30 16:53:...|
|  MSFT| 201.15|         15|       100|2020-06-30 16:53:...|
|  MSFT|201.115|         15|       150|2020-06-30 16:53:...|
|  MSFT|201.135|         15|      7100|2020-06-30 16:53:...|
|  MSFT| 201.16|         15|       100|2020-06-30 16:53:...|
|  MSFT|201.165|         15|         7|2020-06-30 16:53:...|
|  MSFT|201.185|         15|         2|2020-06-30 16:53:...|
|  MSFT|201.185|         15|        10|2020-06-30 16:53:...|
|  MSFT|  201.4|        

MSFT: Unit = ()
