## Read in the latest weather forecast from Cloudant



In [1]:
// Build the SQL entry point on top of the notebook-provided SparkContext `sc`.
import org.apache.spark.sql.SQLContext
val sqlContext = new SQLContext(sc)

// Bring the implicit conversions into scope; the `.toDF` call further down relies on them.
import sqlContext.implicits._

In [20]:
// Load the Cloudant database "weather" as a DataFrame via the com.cloudant.spark
// data source. The "xxx" values are presumably placeholders for real account
// settings -- replace them before running.
val df = {
  val cloudantSource = sqlContext.read.format("com.cloudant.spark")
  val configuredSource = cloudantSource.
    option("cloudant.host", "xxx").
    option("cloudant.username", "xxx").
    option("cloudant.password", "xxx")
  configuredSource.load("weather")
}

// Ask Spark to cache the DataFrame, since several later cells read from it.
df.cache()

Use dbName=weather, indexName=null, jsonstore.rdd.partitions=5, jsonstore.rdd.maxInPartition=-1, jsonstore.rdd.minInPartition=10, jsonstore.rdd.requestTimeout=100000,jsonstore.rdd.concurrentSave=-1,jsonstore.rdd.bulkSize=1


[_id: string, _rev: string, airport: string, forecasts: array<struct<blurb:string,blurb_author:string,class:string,day:struct<accumulation_phrase:string,alt_daypart_name:string,clds:bigint,day_ind:string,daypart_name:string,fcst_valid:bigint,fcst_valid_local:string,golf_category:string,golf_index:bigint,hi:bigint,icon_code:bigint,icon_extd:bigint,long_daypart_name:string,narrative:string,num:bigint,phrase_12char:string,phrase_22char:string,phrase_32char:string,pop:bigint,pop_phrase:string,precip_type:string,qpf:double,qualifier:string,qualifier_code:string,rh:bigint,shortcast:string,snow_code:string,snow_phrase:string,snow_qpf:double,snow_range:string,subphrase_pt1:string,subphrase_pt2:string,subphrase_pt3:string,temp:bigint,temp_phrase:string,thunder_enum:bigint,thunde...

In [4]:
// Print the nested schema of the loaded documents as a tree, to see which
// fields are available under `forecasts` before writing the SQL query.
df.printSchema()

root
 |-- _id: string (nullable = true)
 |-- _rev: string (nullable = true)
 |-- forecasts: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- blurb: string (nullable = true)
 |    |    |-- blurb_author: string (nullable = true)
 |    |    |-- class: string (nullable = true)
 |    |    |-- day: struct (nullable = true)
 |    |    |    |-- accumulation_phrase: string (nullable = true)
 |    |    |    |-- alt_daypart_name: string (nullable = true)
 |    |    |    |-- clds: long (nullable = true)
 |    |    |    |-- day_ind: string (nullable = true)
 |    |    |    |-- daypart_name: string (nullable = true)
 |    |    |    |-- fcst_valid: long (nullable = true)
 |    |    |    |-- fcst_valid_local: string (nullable = true)
 |    |    |    |-- golf_category: string (nullable = true)
 |    |    |    |-- golf_index: long (nullable = true)
 |    |    |    |-- hi: long (nullable = true)
 |    |    |    |-- icon_code: long (nullable = true)
 |    |    |    |-

In [3]:
// Preview the first rows of the raw documents (one row per city document).
df.show()

+----------+--------------------+--------------------+--------------------+
|       _id|                _rev|           forecasts|            metadata|
+----------+--------------------+--------------------+--------------------+
| Amsterdam|1-08065c0d6139c3c...|List([null,null,f...|[1450300349,en-US...|
|    Athens|1-104084cae4f985a...|List([null,null,f...|[1450300441,en-US...|
| Barcelona|1-20b3a6e72edcba2...|List([null,null,f...|[1450300038,en-US...|
|  Belgrade|1-1ab7dc68b695ed6...|List([null,null,f...|[1450300503,en-US...|
|    Bergen|1-3d529ab9932aae5...|List([null,null,f...|[1450299367,en-US...|
|    Berlin|1-c86342015754d4d...|List([null,null,f...|[1450299703,en-US...|
|Bratislava|1-270a6691df02f71...|List([null,null,f...|[1450299512,en-US...|
|  Brussels|1-8d9b7a564638239...|List([null,null,f...|[1450300780,en-US...|
| Bucharest|1-e4beec7e45e0552...|List([null,null,f...|[1450300702,en-US...|
|  Budapest|1-e06b369545e4d73...|List([null,null,f...|[1450300697,en-US...|
|Copenhagen|

### Perform ETL on Weather Data

In [21]:
// Expose the DataFrame to Spark SQL under the table name "weather";
// the SELECT statement in the next cells reads FROM this name.
df.registerTempTable("weather")

In [22]:
/** One flattened forecast record: a single forecast entry for a single city.
  *
  * The field order is the column order of the DataFrame built from this class.
  *
  * @param dow              day-of-week name (e.g. "Tuesday")
  * @param fcst_valid_local local forecast timestamp; the notebook stores only the
  *                         part before the "T" separator, i.e. the date
  * @param num              forecast entry number taken from the feed
  * @param max_temp         value of the forecast's `max_temp` field
  * @param min_temp         value of the forecast's `min_temp` field
  * @param hi               value of the forecast's `day.hi` field
  * @param wspd             value of the forecast's `day.wspd` field
  * @param wdir_cardinal    value of the forecast's `day.wdir_cardinal` field (e.g. "SW")
  * @param icon_code        value of the forecast's `day.icon_code` field
  * @param precip_type      value of the forecast's `day.precip_type` field (e.g. "rain")
  * @param pop              value of the forecast's `day.pop` field
  * @param city             `_id` of the source weather document (e.g. "Amsterdam")
  * @param airport          `airport` field of the source weather document
  * @param _id              identifier of this record, built as "<city>-<num>"
  */
case class Weather(
  dow: String,
  fcst_valid_local: String,
  num: Long,
  max_temp: Long,
  min_temp: Long,
  hi: Long,
  wspd: Long,
  wdir_cardinal: String,
  icon_code: Long,
  precip_type: String,
  pop: Long,
  city: String,
  airport: String,
  _id: String
) extends java.io.Serializable

In [23]:
// Project only the forecast fields needed downstream. `forecasts` is an array of
// structs, so every `forecasts.*` column in the result is itself a list with one
// element per forecast entry; `_id` and `airport` stay scalar per document.
// Resulting column positions (used by index in the flattening step):
//   0 dow, 1 fcst_valid_local, 2 num, 3 max_temp, 4 min_temp, 5 hi, 6 wspd,
//   7 wdir_cardinal, 8 icon_code, 9 precip_type, 10 pop, 11 _id, 12 airport
val usefulWeatherData = sqlContext.sql("""SELECT forecasts.dow, forecasts.fcst_valid_local, 
forecasts.num, forecasts.max_temp, forecasts.min_temp, forecasts.day.hi, 
forecasts.day.wspd, forecasts.day.wdir_cardinal, forecasts.day.icon_code, 
forecasts.day.precip_type, forecasts.day.pop, _id, airport FROM weather""")


In [24]:
// Preview the projection: each forecast column still holds a whole list per city.
usefulWeatherData.show()

+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+-------+
|                 dow|    fcst_valid_local|                 num|            max_temp|            min_temp|                  hi|                wspd|       wdir_cardinal|           icon_code|         precip_type|                 pop|       _id|airport|
+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+-------+
|List(Monday, Tues...|List(2015-12-21T0...|List(1, 2, 3, 4, ...|List(null, 15, 11...|List(9, 11, 7, 7,...|List(null, 15, 11...|List(null, 42, 34...|List(null, SW, WS...|List(null, 11, 24...|List(null, rain, ...|List(null, 90, 20...| Amsterdam| 

In [33]:
// Flatten each city row (whose forecast columns are parallel lists) into one
// Weather record per forecast entry, then convert the result to a DataFrame.
//
// Column positions follow the SELECT order of `usefulWeatherData`:
//   0 dow, 1 fcst_valid_local, 2 num, 3 max_temp, 4 min_temp, 5 hi, 6 wspd,
//   7 wdir_cardinal, 8 icon_code, 9 precip_type, 10 pop, 11 _id, 12 airport
val formatted = usefulWeatherData.flatMap { r =>
  // Read every column once per row instead of once per field per forecast entry.
  val dows = r.getList[String](0)
  val validLocals = r.getList[String](1)
  val nums = r.getList[Long](2)
  val maxTemps = r.getList[Long](3)
  val minTemps = r.getList[Long](4)
  val his = r.getList[Long](5)
  val wspds = r.getList[Long](6)
  val wdirCardinals = r.getList[String](7)
  val iconCodes = r.getList[Long](8)
  val precipTypes = r.getList[String](9)
  val pops = r.getList[Long](10)
  val city = r.getString(11)
  val airport = r.getString(12)

  // Entry 0 is skipped on purpose: in the sample data its `day.*` values are null.
  // At most entries 1..10 are used, as before, but the upper bound is clamped to
  // the actual list length so that a document with fewer than 11 forecast entries
  // yields fewer records instead of failing with IndexOutOfBoundsException.
  // All lists come from the same `forecasts` array, so they share one length.
  val end = math.min(11, dows.size)
  for (x <- 1 until end) yield Weather(
    dows.get(x),
    validLocals.get(x).split("T")(0), // keep only the date part of the timestamp
    nums.get(x),
    maxTemps.get(x),
    minTemps.get(x),
    his.get(x),
    wspds.get(x),
    wdirCardinals.get(x),
    iconCodes.get(x),
    precipTypes.get(x),
    pops.get(x),
    city,
    airport,
    s"$city-${nums.get(x)}" // record id: "<city>-<num>"
  )
}.toDF

In [34]:
// Preview the flattened result: one row per city and forecast entry.
formatted.show()

+---------+----------------+---+--------+--------+--+----+-------------+---------+-----------+---+---------+-------+------------+
|      dow|fcst_valid_local|num|max_temp|min_temp|hi|wspd|wdir_cardinal|icon_code|precip_type|pop|     city|airport|         _id|
+---------+----------------+---+--------+--------+--+----+-------------+---------+-----------+---+---------+-------+------------+
|  Tuesday|      2015-12-22|  2|      15|      11|15|  42|           SW|       11|       rain| 90|Amsterdam|    AMS| Amsterdam-2|
|Wednesday|      2015-12-23|  3|      11|       7|11|  34|          WSW|       24|       rain| 20|Amsterdam|    AMS| Amsterdam-3|
| Thursday|      2015-12-24|  4|      12|       7|12|  39|            S|       11|       rain| 60|Amsterdam|    AMS| Amsterdam-4|
|   Friday|      2015-12-25|  5|      10|       7|10|  28|           SW|       30|       rain| 10|Amsterdam|    AMS| Amsterdam-5|
| Saturday|      2015-12-26|  6|      12|       7|12|  23|          SSW|       26|       r

In [35]:
// Save the flattened records to the Cloudant database "parsed_weather" through
// the com.cloudant.spark data source. The "xxx" values are presumably
// placeholders for real account settings -- replace them before running.
formatted.write.
  format("com.cloudant.spark").
  option("cloudant.host", "xxx").
  option("cloudant.username", "xxx").
  option("cloudant.password", "xxx").
  save("parsed_weather")

Use dbName=parsed_weather, indexName=null, jsonstore.rdd.partitions=5, jsonstore.rdd.maxInPartition=-1, jsonstore.rdd.minInPartition=10, jsonstore.rdd.requestTimeout=100000,jsonstore.rdd.concurrentSave=-1,jsonstore.rdd.bulkSize=1
