
7. Spark and Cassandra Exercises for KillrWeather data

Jurgis Pods edited this page Dec 1, 2015 · 5 revisions

Using Spark Shell

The following assumes you have installed DataStax Enterprise (DSE).

See the DataStax Enterprise documentation for installation details.

From the <DSE home dir>/bin directory, start the Spark Shell:

./dse spark

Try out some commands:

:showSchema isd_weather_data

:showSchema isd_weather_data raw_weather_data

val tableRDD = sc.cassandraTable("isd_weather_data", "raw_weather_data")

tableRDD.count

val firstRow = tableRDD.first

val tempRDD = sc.cassandraTable("isd_weather_data", "raw_weather_data").select("hour", "temperature").where("wsid = ? AND year = ? AND month = ? AND day = ?", "724940:23234", 2008, 6, 1)

tempRDD.collect().foreach(println)

val singleWeatherStation: Array[Double] = sc.cassandraTable[Double]("isd_weather_data", "raw_weather_data").select("temperature").where("wsid = ?", "725030:14732").collect()

import org.apache.spark.util.StatCounter

println(StatCounter(singleWeatherStation))
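StatCounter prints the count, mean, standard deviation, max and min of the values. As a rough illustration of what those five numbers are, here is a plain-Scala sketch (no Spark; `Summary` and `summarize` are hypothetical names) that computes them over a local sequence. It uses the population standard deviation (dividing by n), which is what StatCounter's `stdev` reports; `sampleStdev` is the n-1 variant.

```scala
// Hypothetical names; plain Scala, no Spark required.
final case class Summary(count: Long, mean: Double, stdev: Double, max: Double, min: Double)

object StatsSketch {
  // The same five statistics StatCounter prints, computed over a local Seq
  def summarize(xs: Seq[Double]): Summary = {
    require(xs.nonEmpty, "need at least one value")
    val n = xs.size
    val mean = xs.sum / n
    // Population variance (divide by n), matching StatCounter.stdev
    val variance = xs.map(x => (x - mean) * (x - mean)).sum / n
    val max = xs.reduce((a, b) => math.max(a, b))
    val min = xs.reduce((a, b) => math.min(a, b))
    Summary(n.toLong, mean, math.sqrt(variance), max, min)
  }

  def main(args: Array[String]): Unit =
    // Made-up temperatures for illustration, not real station data
    println(summarize(Seq(10.0, 12.0, 14.0, 16.0)))
}
```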

Saving data back to Cassandra

From cqlsh (the CQL shell), look at the existing readings for station 724957:23213:

select year, month, day, hour, temperature from isd_weather_data.raw_weather_data where wsid='724957:23213';

Back in the Spark shell, insert some new data:

case class RawWeather(wsid: String, year: Int, month: Int, day: Int, hour: Int, temperature: Double)

val collection = sc.parallelize(Seq(RawWeather("724957:23213", 2015, 7, 8, 8, 16), RawWeather("724957:23213", 2015, 7, 8, 7, 14)))

collection.saveToCassandra("isd_weather_data", "raw_weather_data", SomeColumns("wsid", "year", "month", "day", "hour", "temperature"))
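The connector maps case-class fields to table columns by name, so keeping every field name identical to its column name (for example lowercase `day`) is the safe choice. A plain-Scala sketch that lists fields with no identically named column; it assumes Scala 2.13+ (for `productElementNames`), uses a stricter case-sensitive match than the connector may apply, and `ColumnCheck` and `MisnamedRawWeather` are hypothetical.

```scala
// Hypothetical helper; needs Scala 2.13+ for Product.productElementNames.
final case class RawWeather(wsid: String, year: Int, month: Int, day: Int, hour: Int, temperature: Double)
// Same shape, but with a capitalized Day field
final case class MisnamedRawWeather(wsid: String, year: Int, month: Int, Day: Int, hour: Int, temperature: Double)

object ColumnCheck {
  // The columns passed to SomeColumns in the saveToCassandra call
  val columns: Seq[String] = Seq("wsid", "year", "month", "day", "hour", "temperature")

  // Field names that have no identically named (case-sensitive) column
  def unmatchedFields(row: Product): Seq[String] =
    row.productElementNames.filterNot(name => columns.contains(name)).toSeq

  def main(args: Array[String]): Unit = {
    println(unmatchedFields(RawWeather("724957:23213", 2015, 7, 8, 8, 16.0)))
    println(unmatchedFields(MisnamedRawWeather("724957:23213", 2015, 7, 8, 8, 16.0)))
  }
}
```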

Back in cqlsh, verify that the new rows have been added:

select year, month, day, hour, temperature from isd_weather_data.raw_weather_data where wsid='724957:23213';

Spark SQL

Start the Spark SQL shell with ./dse spark-sql

Switch to the keyspace:

use isd_weather_data;

List all the tables in the keyspace. Cassandra tables are mapped to Spark SQL tables automatically:

show tables;

Describe the table definition:

describe raw_weather_data;

Run a simple count:

select count(*) from raw_weather_data;

How about the daily high (max) and low (min) temperatures for June?

SELECT wsid, year, month, day, max(temperature) high, min(temperature) low
FROM raw_weather_data
WHERE month = 6
GROUP BY wsid, year, month, day;

The 0.0 readings are artifacts; let's filter them out:

SELECT wsid, year, month, day, max(temperature) high, min(temperature) low
FROM raw_weather_data
WHERE month = 6
AND temperature != 0.0
GROUP BY wsid, year, month, day;
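To make the semantics of the two GROUP BY queries concrete, here is a plain-Scala model over an in-memory collection (no Spark; `Reading`, `highLow` and the sample rows are hypothetical). The `dropZeros` flag switches between the first query and the filtered one.

```scala
// Hypothetical types and sample data; plain Scala, no Spark.
final case class Reading(wsid: String, year: Int, month: Int, day: Int, hour: Int, temperature: Double)

object DailyHighLow {
  type DayKey = (String, Int, Int, Int) // (wsid, year, month, day)

  // Daily (high, low) per station for one month; dropZeros mirrors "AND temperature != 0.0"
  def highLow(rows: Seq[Reading], month: Int, dropZeros: Boolean): Map[DayKey, (Double, Double)] =
    rows
      .filter(r => r.month == month)                   // WHERE month = ...
      .filter(r => !dropZeros || r.temperature != 0.0) // optional 0.0 filter
      .groupBy(r => (r.wsid, r.year, r.month, r.day))  // GROUP BY wsid, year, month, day
      .map { case (key, rs) =>
        val temps = rs.map(_.temperature)
        val high = temps.reduce((a, b) => math.max(a, b)) // max(temperature) high
        val low = temps.reduce((a, b) => math.min(a, b))  // min(temperature) low
        (key, (high, low))
      }

  def main(args: Array[String]): Unit = {
    val rows = Seq(
      Reading("A", 2008, 6, 1, 0, 18.5),
      Reading("A", 2008, 6, 1, 1, 0.0),
      Reading("A", 2008, 6, 1, 2, 24.0)
    )
    println(highLow(rows, 6, dropZeros = false)) // low is the 0.0 artifact
    println(highLow(rows, 6, dropZeros = true))  // low is 18.5
  }
}
```

Because the 0.0 filter is applied before grouping, as in the SQL WHERE clause, a day whose readings are all 0.0 disappears from the result entirely.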

Now let's do a join!

SELECT ws.name, raw.temperature 
FROM raw_weather_data raw 
JOIN weather_station ws 
ON raw.wsid=ws.id 
WHERE raw.wsid = '722950:23174' 
AND raw.year = 2013 AND raw.month = 6 AND raw.day = 1;
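The join pairs each reading with the station whose `id` equals the reading's `wsid`. A plain-Scala sketch of the same inner join, using a lookup map keyed by station id (`Station`, `Temp`, `stationTemps` and the station name are hypothetical; Spark SQL picks its own join strategy):

```scala
// Hypothetical types and sample data; plain Scala, no Spark.
final case class Station(id: String, name: String)
final case class Temp(wsid: String, year: Int, month: Int, day: Int, temperature: Double)

object JoinSketch {
  // (station name, temperature) for one station and day, like the SQL join
  def stationTemps(raw: Seq[Temp], stations: Seq[Station],
                   wsid: String, year: Int, month: Int, day: Int): Seq[(String, Double)] = {
    // Index stations by id so each reading finds its station with one lookup
    val byId: Map[String, Station] = stations.map(s => s.id -> s).toMap
    for {
      r <- raw
      if r.wsid == wsid && r.year == year && r.month == month && r.day == day
      // Inner join: a reading whose wsid has no matching station is dropped
      ws <- byId.get(r.wsid).toSeq
    } yield (ws.name, r.temperature)
  }

  def main(args: Array[String]): Unit = {
    val stations = Seq(Station("722950:23174", "Example Station"))
    val raw = Seq(Temp("722950:23174", 2013, 6, 1, 20.0), Temp("722950:23174", 2013, 6, 2, 21.0))
    println(stationTemps(raw, stations, "722950:23174", 2013, 6, 1))
  }
}
```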

Just for fun, try out this query. What is it finding?

SELECT weather_station.id,
       weather_station.call_sign,
       weather_station.country_code,
       weather_station.name,
       ranked_temp.avg_temp
FROM (
  SELECT wsid, year, month, day, daily.avg_temp,
         dense_rank() OVER (PARTITION BY wsid ORDER BY avg_temp DESC) AS rank
  FROM (
    SELECT wsid, year, month, day, avg(temperature) AS avg_temp
    FROM raw_weather_data
    GROUP BY wsid, year, month, day
  ) daily
) ranked_temp
JOIN weather_station ON ranked_temp.wsid = weather_station.id
WHERE rank <= 5;
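One way to read the query: for each station, average the temperature per day, rank the days by that average from hottest to coolest with `dense_rank`, keep ranks 1 to 5, and attach the station details. The sketch below models only the ranking part in plain Scala (no Spark, and the join to `weather_station` is left out; `Obs` and `topDays` are hypothetical). With `dense_rank`, tied averages share a rank and no rank number is skipped, so a station can return more than five days.

```scala
// Hypothetical types; plain Scala, no Spark.
final case class Obs(wsid: String, year: Int, month: Int, day: Int, temperature: Double)

object TopDays {
  type Day = (Int, Int, Int) // (year, month, day)

  // wsid -> (day, daily average, dense rank), keeping ranks <= n
  def topDays(obs: Seq[Obs], n: Int): Map[String, Seq[(Day, Double, Int)]] =
    obs.groupBy(_.wsid).map { case (wsid, rows) =>
      // Inner subquery: avg(temperature) grouped by wsid, year, month, day
      val daily: Seq[(Day, Double)] =
        rows.groupBy(r => (r.year, r.month, r.day)).toSeq.map { case (d, rs) =>
          (d, rs.map(_.temperature).sum / rs.size)
        }
      // dense_rank() ordered by avg_temp DESC: equal averages share a rank, no gaps
      val distinctAvgs = daily.map(_._2).distinct.sortWith(_ > _)
      val rankOf: Map[Double, Int] =
        distinctAvgs.zipWithIndex.map { case (avg, i) => (avg, i + 1) }.toMap
      val ranked = daily
        .map { case (d, avg) => (d, avg, rankOf(avg)) }
        .filter(_._3 <= n)            // WHERE rank <= n
        .sortBy(t => (t._3, t._1))    // by rank, then by date
      (wsid, ranked)
    }

  def main(args: Array[String]): Unit = {
    val obs = Seq(Obs("A", 2013, 6, 1, 20.0), Obs("A", 2013, 6, 1, 22.0), Obs("A", 2013, 6, 2, 25.0))
    println(topDays(obs, 5))
  }
}
```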