In [1]:
from hops import hdfs
from pyspark.sql import functions as F
from pyspark.sql import Window
from pyspark.sql.types import LongType
spark

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
7,application_1559150799864_0015,pyspark,idle,Link,Link,✔


SparkSession available as 'spark'.
<pyspark.sql.session.SparkSession object at 0x7fe57285dfd0>

In [2]:
df = spark \
    .read \
    .format("parquet") \
    .load(hdfs.project_path() + "Resources/iot-benchmarks/data/topic-lwm2m-3303-temperature") \
    .filter(F.col('endpointClientName')=='node-latency-4-1')

df.cache().count()

1155

In [3]:
df.printSchema()

root
 |-- measurement: struct (nullable = true)
 |    |-- timestamp: long (nullable = false)
 |    |-- endpointClientName: string (nullable = false)
 |    |-- instanceId: integer (nullable = false)
 |    |-- gatewayId: integer (nullable = false)
 |    |-- ipsoObject: struct (nullable = false)
 |    |    |-- sensorValue: double (nullable = false)
 |    |    |-- minMeasuredValue: double (nullable = true)
 |    |    |-- maxMeasuredValue: double (nullable = true)
 |    |    |-- minRangeValue: double (nullable = true)
 |    |    |-- maxRangeValue: double (nullable = true)
 |    |    |-- sensorUnits: string (nullable = true)
 |    |    |-- resetMinAndMaxMeasuredValues: boolean (nullable = true)
 |-- kafkaTimestamp: timestamp (nullable = true)
 |-- endpointClientName: string (nullable = true)

In [4]:
timeFmt = "yyyy-MM-dd HH:mm:ss.SSS"
sensDf = df \
    .select(F.to_timestamp(F.col('measurement.timestamp')/1000).alias('measurementTmp'), \
            'kafkaTimestamp', \
            (F.col('measurement.timestamp')).alias('measurementTmpLong'), \
            'endpointClientName') \
    .withColumn('kafkaTmpLong', F.substring_index('kafkaTimestamp', '.', -1).cast('float')) \
    .withColumn('kafkaTmpLong', F.when(F.col('kafkaTmpLong') < 100, F.col('kafkaTmpLong')*10).otherwise(F.col('kafkaTmpLong')).cast('long')) \
    .withColumn('kafkaTmpLong', (F.unix_timestamp('kafkaTimestamp', format=timeFmt)*1000 + F.col('kafkaTmpLong'))) \
    .withColumn("diffMilliseconds", (F.col('kafkaTmpLong') - F.col('measurementTmpLong')))
    

In [12]:
sensDf.show(5, False)

+-----------------------+-----------------------+------------------+------------------+-------------+----------------+
|measurementTmp         |kafkaTimestamp         |measurementTmpLong|endpointClientName|kafkaTmpLong |diffMilliseconds|
+-----------------------+-----------------------+------------------+------------------+-------------+----------------+
|2019-05-31 14:15:59.818|2019-05-31 14:15:59.844|1559312159818     |node-latency-4-1  |1559312159844|26              |
|2019-05-31 14:16:01.818|2019-05-31 14:16:01.841|1559312161818     |node-latency-4-1  |1559312161841|23              |
|2019-05-31 14:16:03.821|2019-05-31 14:16:03.84 |1559312163821     |node-latency-4-1  |1559312163840|19              |
|2019-05-31 14:16:05.819|2019-05-31 14:16:05.84 |1559312165819     |node-latency-4-1  |1559312165840|21              |
|2019-05-31 14:16:07.818|2019-05-31 14:16:07.839|1559312167818     |node-latency-4-1  |1559312167839|21              |
+-----------------------+-----------------------

In [13]:
sensDf.printSchema()

root
 |-- measurementTmp: timestamp (nullable = true)
 |-- kafkaTimestamp: timestamp (nullable = true)
 |-- measurementTmpLong: long (nullable = true)
 |-- endpointClientName: string (nullable = true)
 |-- kafkaTmpLong: long (nullable = true)
 |-- diffMilliseconds: long (nullable = true)

In [14]:
sensDf.agg(F.avg(F.col("diffMilliseconds"))).show()

+---------------------+
|avg(diffMilliseconds)|
+---------------------+
|   12.146574154379879|
+---------------------+

In [8]:
sensDf.sort(F.col('diffMilliseconds').desc()).show(10, False)

+-----------------------+-----------------------+------------------+------------------+-------------+----------------+
|measurementTmp         |kafkaTimestamp         |measurementTmpLong|endpointClientName|kafkaTmpLong |diffMilliseconds|
+-----------------------+-----------------------+------------------+------------------+-------------+----------------+
|2019-05-31 14:15:55.837|2019-05-31 14:15:56.857|1559312155837     |node-latency-4-1  |1559312156857|1020            |
|2019-05-31 14:15:57.818|2019-05-31 14:15:58.07 |1559312157818     |node-latency-4-1  |1559312158070|252             |
|2019-05-31 14:36:37.819|2019-05-31 14:36:37.86 |1559313397819     |node-latency-4-1  |1559313397860|41              |
|2019-05-31 14:26:59.819|2019-05-31 14:26:59.857|1559312819819     |node-latency-4-1  |1559312819857|38              |
|2019-05-31 14:27:45.819|2019-05-31 14:27:45.855|1559312865819     |node-latency-4-1  |1559312865855|36              |
|2019-05-31 14:19:51.819|2019-05-31 14:19:51.854

In [9]:
sensDf = sensDf.where(F.col('diffMilliseconds') < 200)
sensDf.count()

1153

In [10]:
%%spark -o sensDf

In [11]:
%%local
sensDf





VBox(children=(HBox(children=(HTML(value='Type:'), Button(description='Table', layout=Layout(width='70px'), st…

Output()