In [1]:
%AddDeps org.apache.spark spark-sql-kafka-0-10_2.11 2.4.5 --transitive

Marking org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.5 for download
Obtained 12 files


Download the connector. The _--transitive_ flag is required: without it, the connector's own dependencies (the Kafka client libraries, among others) are not downloaded.
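For comparison, the same dependency could be declared in a build file if this code were compiled outside the notebook. The fragment below is only a sketch for a hypothetical sbt project; the notebook itself uses nothing but `%AddDeps`, and the Scala patch version shown is an assumption chosen to match the `_2.11` suffix of the artifact.

```scala
// build.sbt (hypothetical project; not part of this notebook)
scalaVersion := "2.11.12" // assumed; any 2.11.x matches the _2.11 artifact suffix

libraryDependencies ++= Seq(
  // Spark itself is normally supplied by the cluster at run time
  "org.apache.spark" %% "spark-sql" % "2.4.5" % "provided",
  // Same coordinates as in %AddDeps; sbt resolves transitive dependencies by default
  "org.apache.spark" %% "spark-sql-kafka-0-10" % "2.4.5"
)
```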

In [2]:
import org.apache.spark._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.types._

## Spark session

In [3]:
val spark = SparkSession.builder().appName("MessageProcessor")
            .master("spark://spark:7077").getOrCreate()

spark = org.apache.spark.sql.SparkSession@7a546bb9


org.apache.spark.sql.SparkSession@7a546bb9

## Obtain dataframes from Kafka

In [4]:
val dataframe = spark
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka1:29092")
    .option("subscribe", "Spectra")
    .option("startingOffsets", "earliest")
    .load()

dataframe = [key: binary, value: binary ... 5 more fields]


[key: binary, value: binary ... 5 more fields]

In [5]:
dataframe.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



In [20]:
val output = dataframe
                    .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
                    .as[(String, String)]
                    .writeStream
                    .outputMode("append")
                    .format("memory")
                    .queryName("spectra")
                    .start()

output = org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@1658cbf


org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@1658cbf

## DataFrame transformations

In [21]:
val spectra = spark.sql("select * from spectra")

spectra = [key: string, value: string]


[key: string, value: string]

First I infer the schema of the value column. Its content is JSON, which I will expand into separate columns.

In [29]:
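// Infers the JSON schema from the strings in `column_name` (this runs a batch read
// over that column), parses each string with the inferred schema, and expands the
// resulting struct into top-level columns.
def explode_JSON(table: DataFrame, column_name: String): DataFrame = {
    val JSONschema: StructType = spark.read.json(table.select(column_name).as[String]).schema
    table.withColumn("JSON", from_json(col(column_name), JSONschema)).select("JSON.*")
}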

explode_JSON: (table: org.apache.spark.sql.DataFrame, column_name: String)org.apache.spark.sql.DataFrame
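`explode_JSON` infers the schema by reading the data, which is possible here because `spectra` is a queryable in-memory table. Where the schema has to be known up front, it can be written out by hand instead. The sketch below assumes the message layout that this notebook ends up with: a top-level `dataRow` object whose fourteen fields all arrive as strings. Any other top-level fields are simply not declared. The names `dataRowFields`, `dataRowSchema` and `messageSchema` are illustrative, not part of the original code.

```scala
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Field names as reported by transformedDF.dtypes in this notebook
val dataRowFields = Seq(
  "continuum", "dec", "fiber", "mjd", "name", "plate", "ra",
  "size", "spectraSetOID", "spectrum", "subtype", "type", "z", "zerr"
)

// Every field is declared as a nullable string, matching the inferred types
val dataRowSchema = StructType(
  dataRowFields.map(name => StructField(name, StringType, nullable = true))
)

// Assumed envelope: the payload sits under a single top-level "dataRow" key
val messageSchema = StructType(
  Seq(StructField("dataRow", dataRowSchema, nullable = true))
)
```

A schema built this way can be passed to `from_json` in place of the inferred `JSONschema`.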


The key column is not needed: it only labels messages while they travel through Kafka and is not part of the data.

In [30]:
val transformedDF = explode_JSON(spectra, "value").select("dataRow.*")

transformedDF = [continuum: string, dec: string ... 12 more fields]


[continuum: string, dec: string ... 12 more fields]

In [31]:
transformedDF.dtypes

Array((continuum,StringType), (dec,StringType), (fiber,StringType), (mjd,StringType), (name,StringType), (plate,StringType), (ra,StringType), (size,StringType), (spectraSetOID,StringType), (spectrum,StringType), (subtype,StringType), (type,StringType), (z,StringType), (zerr,StringType))

In [37]:
transformedDF.select("continuum", "dec", "fiber", "mjd", "name", "plate", "ra", "size", "spectraSetOID").show()

+--------------------+-----------+-----+-----+--------------------+-----+---------+----+--------------------+
|           continuum|        dec|fiber|  mjd|                name|plate|       ra|size|       spectraSetOID|
+--------------------+-----------+-----+-----+--------------------+-----+---------+----+--------------------+
|[1.36860629060082...|-0.98491332|    3|51630|SDSS J094736.55-0...|  266|146.90229|3819|ObjectId(5cdd4ad3...|
|                null|       null| null| null|                null| null|     null|null|                    |
+--------------------+-----------+-----+-----+--------------------+-----+---------+----+--------------------+



In [38]:
transformedDF.select("spectrum", "subtype", "type", "z", "zerr").show()

+--------------------+--------------------+------+------------------+--------------------+
|            spectrum|             subtype|  type|                 z|                zerr|
+--------------------+--------------------+------+------------------+--------------------+
|[-3.1290216943920...|BROADLINE        ...|QSO   |0.6524170637130737|8.855115447659045...|
|                null|                null|  null|              null|                null|
+--------------------+--------------------+------+------------------+--------------------+



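Rows in which every value is null must be the result of an error in data transmission, so they are dropped. `spectraSetOID` is left out of the check because in such a row it holds an empty string rather than null.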

In [48]:
val droppedDF = transformedDF.na.drop("all", transformedDF.drop("spectraSetOID").columns)
droppedDF.show()

+--------------------+-----------+-----+-----+--------------------+-----+---------+----+--------------------+--------------------+--------------------+------+------------------+--------------------+
|           continuum|        dec|fiber|  mjd|                name|plate|       ra|size|       spectraSetOID|            spectrum|             subtype|  type|                 z|                zerr|
+--------------------+-----------+-----+-----+--------------------+-----+---------+----+--------------------+--------------------+--------------------+------+------------------+--------------------+
|[1.36860629060082...|-0.98491332|    3|51630|SDSS J094736.55-0...|  266|146.90229|3819|ObjectId(5cdd4ad3...|[-3.1290216943920...|BROADLINE        ...|QSO   |0.6524170637130737|8.855115447659045...|
+--------------------+-----------+-----+-----+--------------------+-----+---------+----+--------------------+--------------------+--------------------+------+------------------+--------------------+



droppedDF = [continuum: string, dec: string ... 12 more fields]


[continuum: string, dec: string ... 12 more fields]
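The effect of restricting `na.drop("all", ...)` to a subset of columns can be seen on a tiny hand-made table. This is a sketch on invented data, not the Kafka stream: the two rows and the local session named `localSpark` are assumptions made for illustration, and inside this notebook the existing `spark` session could be used instead.

```scala
import org.apache.spark.sql.SparkSession

// Local session just for the illustration
val localSpark = SparkSession.builder()
  .master("local[1]")
  .appName("NaDropSketch")
  .getOrCreate()
import localSpark.implicits._

// One complete row and one "empty" row whose spectraSetOID is "" instead of null
val sample = Seq[(Option[String], Option[String], String)](
  (Some("QSO"), Some("0.65"), "ObjectId(a)"),
  (None, None, "")
).toDF("type", "z", "spectraSetOID")

// Checking all columns keeps the empty row, because "" is not null
val checkedAll = sample.na.drop("all")

// Checking every column except spectraSetOID removes it
val checkedSubset = sample.na.drop("all", sample.drop("spectraSetOID").columns)
```

With the `"all"` mode a row is removed only when every listed column is null, which is why the column holding the empty string has to be excluded from the list.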

All columns are represented as strings, so some of them need to be converted to numbers.

In [49]:
droppedDF.dtypes

Array((continuum,StringType), (dec,StringType), (fiber,StringType), (mjd,StringType), (name,StringType), (plate,StringType), (ra,StringType), (size,StringType), (spectraSetOID,StringType), (spectrum,StringType), (subtype,StringType), (type,StringType), (z,StringType), (zerr,StringType))

In [58]:
val stringToArray = udf((b: String) => b.substring(1, b.length()-1).split(","))

stringToArray = UserDefinedFunction(<function1>,ArrayType(StringType,true),Some(List(StringType)))


UserDefinedFunction(<function1>,ArrayType(StringType,true),Some(List(StringType)))
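The string handling inside `stringToArray` can be tried out in plain Scala, without Spark. The helper below is a hypothetical variant, not the notebook's UDF: it assumes the values look like `[1.0,2.0,...]`, converts the elements to `Double` directly, and additionally tolerates surrounding whitespace and an empty list.

```scala
// Hypothetical helper: parse a bracketed, comma-separated list of numbers
def parseNumberList(s: String): Array[Double] = {
  // Drop the enclosing brackets, if present
  val inner = s.trim.stripPrefix("[").stripSuffix("]").trim
  if (inner.isEmpty) Array.empty[Double]
  else inner.split(",").map(_.trim.toDouble)
}

val parsed = parseNumberList("[1.5, -2.0,3]")
println(parsed.mkString(", "))
```

The notebook's UDF stops at an array of strings and leaves the numeric conversion to Spark's `cast`, which turns unparseable elements into null, whereas `toDouble` here throws a `NumberFormatException`.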

In [53]:
val trimString = udf((b: String) => b.trim())

trimString = UserDefinedFunction(<function1>,StringType,Some(List(StringType)))


UserDefinedFunction(<function1>,StringType,Some(List(StringType)))

In [54]:
val castDF = droppedDF
                .withColumn("continuum", stringToArray(col("continuum")).cast(ArrayType(DoubleType, true)))
                .withColumn("dec", col("dec").cast(DoubleType))
                .withColumn("fiber", col("fiber").cast(LongType))
                .withColumn("mjd", col("mjd").cast(LongType))
                .withColumn("plate", col("plate").cast(LongType))
                .withColumn("ra", col("ra").cast(DoubleType))
                .withColumn("size", col("size").cast(LongType))
                .withColumn("spectrum", stringToArray(col("spectrum")).cast(ArrayType(DoubleType, true)))
                .withColumn("subtype", trimString(col("subtype")))
                .withColumn("type", trimString(col("type")))
                .withColumn("z", col("z").cast(DoubleType))
                .withColumn("zerr", col("zerr").cast(DoubleType))

castDF = [continuum: array<double>, dec: double ... 12 more fields]


[continuum: array<double>, dec: double ... 12 more fields]

In [55]:
castDF.show()

+--------------------+-----------+-----+-----+--------------------+-----+---------+----+--------------------+--------------------+---------+----+------------------+--------------------+
|           continuum|        dec|fiber|  mjd|                name|plate|       ra|size|       spectraSetOID|            spectrum|  subtype|type|                 z|                zerr|
+--------------------+-----------+-----+-----+--------------------+-----+---------+----+--------------------+--------------------+---------+----+------------------+--------------------+
|[1.36860629060082...|-0.98491332|    3|51630|SDSS J094736.55-0...|  266|146.90229|3819|ObjectId(5cdd4ad3...|[-3.1290216943920...|BROADLINE| QSO|0.6524170637130737|8.855115447659045E-5|
+--------------------+-----------+-----+-----+--------------------+-----+---------+----+--------------------+--------------------+---------+----+------------------+--------------------+



In [56]:
castDF.dtypes

Array((continuum,ArrayType(DoubleType,true)), (dec,DoubleType), (fiber,LongType), (mjd,LongType), (name,StringType), (plate,LongType), (ra,DoubleType), (size,LongType), (spectraSetOID,StringType), (spectrum,ArrayType(DoubleType,true)), (subtype,StringType), (type,StringType), (z,DoubleType), (zerr,DoubleType))

Data in this form is ready for analysis. To apply the same processing continuously to all incoming data, it would be best to move it into a script.

In [19]:
output.stop()
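One way such a script might look is sketched below. It is not the author's implementation, and several things in it are assumptions: the object name `SpectraStream`, the hand-written schema (a streaming query cannot infer it from the data the way `explode_JSON` does), the console sink used as a placeholder for a real destination, and the swap of both UDFs for the built-in functions `regexp_replace`, `split` and `trim`. The Kafka address and topic are taken from this notebook.

```scala
import org.apache.spark.sql.{Column, DataFrame, SparkSession}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

object SpectraStream {
  // Field names as reported by transformedDF.dtypes; all arrive as strings
  val fields = Seq("continuum", "dec", "fiber", "mjd", "name", "plate", "ra",
    "size", "spectraSetOID", "spectrum", "subtype", "type", "z", "zerr")
  val dataRowSchema = StructType(fields.map(StructField(_, StringType)))
  val messageSchema = StructType(Seq(StructField("dataRow", dataRowSchema)))

  // "[1.0,2.0]" -> array<double>, using built-in functions instead of a UDF
  def toDoubleArray(name: String): Column =
    split(regexp_replace(col(name), "^\\[|\\]$", ""), ",").cast(ArrayType(DoubleType))

  // The notebook's steps as one function: parse, drop empty rows, cast
  def clean(raw: DataFrame): DataFrame = {
    val parsed = raw
      .select(from_json(col("value").cast("string"), messageSchema)
        .getField("dataRow").as("dataRow"))
      .select("dataRow.*")
      .na.drop("all", fields.filterNot(_ == "spectraSetOID"))
      .withColumn("continuum", toDoubleArray("continuum"))
      .withColumn("spectrum", toDoubleArray("spectrum"))
      .withColumn("subtype", trim(col("subtype")))
      .withColumn("type", trim(col("type")))
    val withDoubles = Seq("dec", "ra", "z", "zerr")
      .foldLeft(parsed)((df, c) => df.withColumn(c, col(c).cast(DoubleType)))
    Seq("fiber", "mjd", "plate", "size")
      .foldLeft(withDoubles)((df, c) => df.withColumn(c, col(c).cast(LongType)))
  }

  def main(args: Array[String]): Unit = {
    // The master URL is expected to come from spark-submit
    val spark = SparkSession.builder().appName("MessageProcessor").getOrCreate()
    val raw = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "kafka1:29092")
      .option("subscribe", "Spectra")
      .option("startingOffsets", "earliest")
      .load()
    clean(raw).writeStream
      .outputMode("append")
      .format("console") // placeholder sink, an assumption
      .start()
      .awaitTermination()
  }
}
```

Keeping the transformation in `clean` separate from the Kafka source means it can be exercised on an ordinary batch DataFrame with a `value` column before it is attached to the stream.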