### Create a dataframe on raw_metrics table

In [1]:
# Read the Cassandra table metrics.raw_metrics into a DataFrame through the
# Spark-Cassandra connector data source.
df = (
    sqlContext.read
    .format("org.apache.spark.sql.cassandra")
    .option("keyspace", "metrics")
    .option("table", "raw_metrics")
    .load()
)

### Explain the query plan and view some data

In [2]:
# Print the DataFrame's column names, types and nullability as a tree.
df.printSchema()

root
 |-- device_id: string (nullable = true)
 |-- metric_time: timestamp (nullable = true)
 |-- metric_name: string (nullable = true)
 |-- metric_value: decimal(38,18) (nullable = true)



In [3]:
# Print the physical plan Spark will use to evaluate this DataFrame.
df.explain()

== Physical Plan ==
Scan org.apache.spark.sql.cassandra.CassandraSourceRelation@636b6ef0[device_id#0,metric_time#1,metric_name#2,metric_value#3]


In [4]:
# Print a sample of rows as a text table; long cell values are shown truncated.
df.show()

+---------+--------------------+-----------+--------------------+
|device_id|         metric_time|metric_name|        metric_value|
+---------+--------------------+-----------+--------------------+
|        6|2016-04-05 18:35:...|        KwH|1.766843615010000000|
|        6|2016-04-05 18:35:...|        KwH|1.738139198460000000|
|        6|2016-04-05 18:35:...|        KwH|1.436533017840000000|
|        6|2016-04-05 18:34:...|        KwH|1.208331618880000000|
|        6|2016-04-05 18:34:...|        KwH|0.331787741071000000|
|        6|2016-04-05 18:34:...|        KwH|0.525922436202000000|
|        6|2016-04-05 18:33:...|        KwH|0.602361949655000000|
|        6|2016-04-05 18:33:...|        KwH|0.187636462810000000|
|        6|2016-04-05 18:33:...|        KwH|1.168336551980000000|
|        6|2016-04-05 18:32:...|        KwH|1.456318619910000000|
|        6|2016-04-05 18:32:...|        KwH|1.423352235120000000|
|        6|2016-04-05 18:32:...|        KwH|1.784004004300000000|
|        6

In [8]:
# List the distinct device ids present in the raw metrics.
(
    df.select(df["device_id"])
    .distinct()
    .show()
)

+---------+
|device_id|
+---------+
|       50|
|       51|
|       52|
|       53|
|       54|
|       55|
|       56|
|       57|
|       58|
|       59|
|        1|
|        2|
|        3|
|        4|
|       60|
|        5|
|       61|
|        6|
|       62|
|        7|
+---------+
only showing top 20 rows



In [9]:
# Count how many raw metric rows each device has.
(
    df.groupBy(df["device_id"])
    .count()
    .show()
)

+---------+-----+
|device_id|count|
+---------+-----+
|       50|  149|
|       51|  149|
|       52|  149|
|       53|  149|
|       54|  149|
|       55|  149|
|       56|  149|
|       57|  149|
|       58|  149|
|       59|  149|
|        1|  149|
|        2|  149|
|        3|  149|
|       60|  149|
|        4|  149|
|        5|  149|
|       61|  149|
|        6|  149|
|       62|  149|
|        7|  149|
+---------+-----+
only showing top 20 rows



In [10]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
import datetime
from pyspark.sql import functions as func
from pyspark.sql.functions import col
from pyspark.sql.functions import lit

### set time variables for date filtering

In [11]:
# Imported under an alias because the name `time` is bound below to the
# current datetime, so the stdlib module must not occupy that name.
import time as _time

# Reference "now" for this run (naive datetime in local time).
time = datetime.datetime.now()
# Whole seconds since the Unix epoch for `time`. mktime() on the local time
# tuple replaces strftime("%s"), which is an undocumented, platform-specific
# directive and is not available on every platform.
epochtime = int(_time.mktime(time.timetuple()))
# Start of the filtering window: 86400 seconds (24 hours) before `time`.
start_time = epochtime - 86400
# The window start as a naive local datetime, used to filter on metric_time.
compare_time = datetime.datetime.fromtimestamp(start_time)

### create a dataframe from the raw metrics

In [12]:
# Load metrics.raw_metrics from Cassandra as the input of the daily rollup.
rawmetrics = (
    sqlContext.read
    .format("org.apache.spark.sql.cassandra")
    .options(**{"table": "raw_metrics", "keyspace": "metrics"})
    .load()
)

### Filter metrics to those from the last 24 hours

In [8]:
# Keep only the rows whose metric_time is later than compare_time, then
# preview the result.
last_day = rawmetrics.filter(rawmetrics["metric_time"] > compare_time)
last_day.show()

+---------+--------------------+-----------+------------+
|device_id|         metric_time|metric_name|metric_value|
+---------+--------------------+-----------+------------+
|        1|2016-03-31 23:43:...|        KWH|   1.5783356|
|        1|2016-03-31 23:28:...|        KWH|   1.0391227|
|        1|2016-03-31 23:13:...|        KWH|  0.26933274|
|        1|2016-03-31 22:58:...|        KWH|  0.10949156|
|        1|2016-03-31 22:43:...|        KWH|   1.5273823|
|        1|2016-03-31 22:28:...|        KWH|  0.10674351|
|        1|2016-03-31 22:13:...|        KWH|   1.7821758|
|        1|2016-03-31 21:58:...|        KWH|   0.9134149|
|        1|2016-03-31 21:43:...|        KWH|   1.3860309|
|        1|2016-03-31 21:28:...|        KWH|   1.4682286|
|        1|2016-03-31 21:13:...|        KWH|   0.8666165|
|        1|2016-03-31 20:58:...|        KWH|   1.0810951|
|        1|2016-03-31 20:43:...|        KWH|   1.4882127|
|        1|2016-03-31 20:28:...|        KWH|  0.43541336|
|        1|201

### Compute per-device aggregates (average, maximum, minimum)

In [9]:
# Per-device average, maximum and minimum of metric_value over the filtered
# rows; each result has the columns device_id plus one aliased aggregate.
averages = last_day.groupBy('device_id').agg(
    func.avg(last_day['metric_value']).alias('metric_avg')
)
maximums = last_day.groupBy('device_id').agg(
    func.max(last_day['metric_value']).alias('metric_max')
)
minimums = last_day.groupBy('device_id').agg(
    func.min(last_day['metric_value']).alias('metric_min')
)

### rename id columns for uniqueness

In [10]:
# Give each aggregate frame its own name for the device_id key column so the
# key columns stay distinguishable once the frames are joined.
averages_a, maximums_a, minimums_a = [
    frame.withColumnRenamed("device_id", key_name)
    for frame, key_name in (
        (averages, "id"),
        (maximums, "maxid"),
        (minimums, "minid"),
    )
]

### join the tables above

In [11]:
# Combine the three per-device aggregates into one row per device.
# Both joins use the renamed frames (id / maxid / minid) so every join key is
# uniquely named and the renamed minimums frame is actually used.
temp = averages_a.join(maximums_a, averages_a.id == maximums_a.maxid)
aggs = temp.join(minimums_a, temp.id == minimums_a.minid)\
    .select('id', 'metric_min', 'metric_max', 'metric_avg')

### Add the metric_day and metric_name columns to match the Cassandra table format

In [13]:
# Append the run timestamp as metric_day and the constant "KWH" as
# metric_name, then rename the key column back to device_id for writing.
addday = aggs.select("*", lit(time).alias("metric_day"))
addname = addday.select("*", lit("KWH").alias("metric_name"))
inserts = addname.withColumnRenamed("id", "device_id")

### Save to a new table

In [14]:
# Append the rollup rows to the Cassandra table metrics.daily_rollups.
(
    inserts.write
    .format("org.apache.spark.sql.cassandra")
    .options(table="daily_rollups", keyspace="metrics")
    .mode("append")
    .save()
)