# Tutorial: Analyzing Weather Data using Apache Spark


[**GETTING STARTED:**](#Data-Scientist-Workbench)  

&nbsp;&nbsp;[**Data Scientist Workbench**](#Data-Scientist-Workbench)   
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;[IPython Notebook](#IPython-Notebook)  

&nbsp;&nbsp;[**How to use Data Scientist Workbench**](#How-to-use-Data-Scientist-Workbench)  
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;[How do I upload a CSV file onto Data Scientist Workbench?](#How-do-I-upload-a-CSV-file-onto-Data-Scientist-Workbench?)  
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;[How do I change the name of my uploaded CSV file?](#How-do-I-change-the-name-of-my-uploaded-CSV-file?)  
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;[How do I import a file from the Internet - an external URL?](#How-do-I-import-a-file-from-the-Internet---an-external-URL?)  


&nbsp;&nbsp;[**Big Data University**](#Big-Data-University)

&nbsp;&nbsp;[**Apache Spark**](#Apache-Spark)  

&nbsp;&nbsp;[**SparkSQL Dataframes**](#SparkSQL-Dataframes)  
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;[What is Spark SQL and what is a DataFrame?](#What-is-Spark-SQL-and-what-is-a-DataFrame?)  
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;[More information on Spark SQL and DataFrames](#More-information-on-Spark-SQL-and-DataFrames)  

## [**TUTORIAL: Data Preparation with Spark using Weather Data**](#Tutorial:-Data-Preparation-with-Spark-using-Weather-Data)  

&nbsp;&nbsp;[**The Data:**](#Data-source)  
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;[Understanding the Weather Data](#Understanding-the-Weather-Data)  
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;[Data source](#Data-source)  
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;[Get the data](#Get-the-data)  

&nbsp;&nbsp;[**Initial Steps:**](#Import-modules)    
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;[Import modules](#Import-modules)  
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;[Understanding the API](#Understanding-the-API)
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;[Set the API key](#Set-the-API-key)  
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;[Import weather stations dataset](#Import-weather-stations-dataset)  

&nbsp;&nbsp;[**Creating Functions to Process the Data**](#Creating-Functions-to-Process-the-Data)  
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;[[Function] getDates](#[Function]-getDates)  
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;[[Function] saveUrlRDD](#[Function]-saveUrlRDD)  
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;[[Function] preprocess](#[Function]-preprocess)  
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;[[Function] save_csv](#[Function]-save_csv)  


[**Running the scripts**](#Running-the-scripts)   
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;[1. Select start and end date](#1.-Select-start-and-end-date)  
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;[2. Run the scripts](#2.-Run-the-scripts)  
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;[3. Quality-check the results](#3.-Quality-check-the-results)  
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;[4. Check to make sure the output .csv file exists](#4.-Check-to-make-sure-the-output-.csv-file-exists)  

[**Contact the Notebook Authors**](#Contact-the-Notebook-Authors)

# Data Scientist Workbench
The Data Scientist Workbench is built around the IPython Notebook and offers the following capabilities:

1. Runs in the cloud
2. Allows users to tag and organize notebooks and data
3. Allows users to search notebooks
4. Enables users to share and import notebooks

## IPython Notebook

[IPython Notebook](http://ipython.org) is a web-based environment for interactive computing.  IPython Notebook enables you to write and execute code within a "notebook" in your web browser.  You enter code into an input cell, and when you run the cell, the notebook executes the code and prints any output to an output cell.  You can change the code in an input cell and re-run the cell as often as you like.  In this way, the notebook follows a [Read Evaluate Print Loop](http://en.wikipedia.org/wiki/Read–eval–print_loop) paradigm.

But that's not all.  The notebook also supports rendering markup cells (like this one) inline, so you can embed text, [markdown](http://daringfireball.net/projects/markdown/), HTML, images, videos, and even interactive widgets, all within a notebook.

The flow of a notebook is top to bottom, and you can create as many cells as you desire.  The interactive nature and the ability to render text and media make IPython Notebook a powerful environment for working with data, performing analyses, and documenting results.

<div class="alert alert-block alert-info" style="margin-top: 20px">**Note:** In these tutorials, we assume you have a basic familiarity with the [Python Programming Language](https://docs.python.org/3/) and the [IPython Notebook](http://nbviewer.ipython.org/github/ipython/ipython/blob/2.x/examples/Notebook/Index.ipynb). If you need more background information, we recommend these popular websites and notebooks:

<ul>
<li>[Learn Python the Hard Way](http://learnpythonthehardway.org/book/)
<li>[Introduction to the IPython Notebook](http://nbviewer.ipython.org/github/ipython/ipython/blob/master/examples/Notebook/Index.ipynb)
</ul>
</div>

# How to use Data Scientist Workbench

### How do I upload a CSV file onto Data Scientist Workbench?
From your local folder on your computer, **drag the file into this page.**  
When the upload is complete, you will see your file under **"Recent Data"** in the sidebar to the right!

### How do I change the name of my uploaded CSV file?

To change the name:  
1. Click on the "`>`" button to the left of your file in the **"Recent Data"** sidebar on the right.
2. Click on _Rename_ to change the filename, and press _Enter_.

### How do I import a file from the Internet - an external URL?
You can import files from external URLs into your Data Scientist Workbench. Copy the URL into the search bar in the top right corner of the page and press _Enter_; the file is downloaded automatically and appears in the **"Recent Data"** sidebar. The URL must meet the following requirements:

1. It uses the HTTP or HTTPS protocol
1. It points to a file in a supported format:
    * Plain text files
    * CSV format files
    * JSON format files (including notebooks in .ipynb format)

# Big Data University
**Big Data University** ([www.BigDataUniversity.com](http://bigdatauniversity.com)) is an educational resource whose courses are free, video-based, and self-paced.

# Apache Spark

To find out what Apache Spark is, see [http://spark.apache.org/](http://spark.apache.org/). You can learn more about Apache Spark through [**Big Data University**](http://bigdatauniversity.com):

- [**Spark Fundamentals I**](http://bigdatauniversity.com/bdu-wp/bdu-course/spark-fundamentals/)
    - Describe what Spark is all about and know why you would want to use Spark 
    - Use Resilient Distributed Datasets operations 
    - Use Scala, Java, or Python to create and run a Spark application 
    - Create applications using Spark SQL, MLlib, Spark Streaming, and GraphX 
    - Configure, monitor and tune Spark  

- [**Spark Fundamentals II**](http://bigdatauniversity.com/bdu-wp/bdu-course/spark-fundamentals-ii/) 
    - Apache Spark architecture overview 
    - Understanding input, partitioning, and parallelization 
    - Optimizations for efficiently operating on and joining multiple datasets 
    - Understanding how Spark instructions are translated into jobs and what causes multiple stages within a job 
    - Efficiently using Spark’s memory caching for iterative processing 
    - Developing, testing, and debugging Spark applications using SBT, Eclipse, and IntelliJ



# SparkSQL Dataframes

### What is Spark SQL and what is a DataFrame?
Spark SQL is Apache Spark's module for working with structured data. It provides a programming abstraction called DataFrames and can also act as a distributed SQL query engine.   
A DataFrame is a distributed collection of data organized into named columns. It is conceptually equivalent to a table in a relational database or a data frame in Python, but with richer optimizations under the hood. 

### More information on Spark SQL and DataFrames
For more information, see the official guide here: [http://spark.apache.org/docs/latest/sql-programming-guide.html](http://spark.apache.org/docs/latest/sql-programming-guide.html)

# Tutorial: Data Preparation with Spark using Weather Data

## Understanding the Weather Data


### `stations.txt`:
A table containing information on 97 weather stations at international airports throughout the United States.
- **ID**: the station identification code. See 'ghcnd-stations.txt' at ftp://ftp.ncdc.noaa.gov/pub/data/ghcn/daily/ for a complete list of stations and their metadata.
- **LATITUDE**: the latitude of the station (in decimal degrees)
- **LONGITUDE**: the longitude of the station (in decimal degrees)
- **ELEVATION**: the elevation of the station (in meters, missing = -999.9)
- **NAME**: the name of the station

## Data source

Check API documentation

### Get the data

Run the command below to download `stations.txt` directly into Data Scientist Workbench

In [None]:
!wget -O /resources/stations.txt https://ibm.box.com/shared/static/pplb4jcylm728lpen0jw5yfr5myhc78j.txt
print 'You should now see stations.txt under "Recent data" in your sidebar'

### Import modules

In [2]:
import json
import csv
import pandas as pd
import datetime
import time
import requests
import shutil
import os
from pyspark import SparkContext 
from pyspark.sql import SQLContext
from pyspark.sql import functions as F
from pyspark.sql.functions import udf
from pyspark.sql import HiveContext
from datetime import timedelta, date

### Understanding the API

In this tutorial, we send API GET requests to api.weather.com, in the format:

> `http://api.weather.com/v1/geocode/MY_LATITUDE/MY_LONGITUDE/observations/historical.json?apiKey=MY_APIKEY&units=m&startDate=YYYYMMDD&endDate=YYYYMMDD`



Notice in the link that there are parameters that need to be set:
1. **MY_LATITUDE & MY_LONGITUDE:** The latitude and longitude of the weather station. We have prepared a list of weather stations, as described in the next section.
1. **MY_APIKEY:** The API key provided to you for api.weather.com.
1. **units=m:** The units of measure for the returned data, set here to Metric. The available codes are:  
    - “e”: English/Imperial, the default for en-US and en.
    - “h”: Hybrid-UK, the default for en-gb.
    - “m”: Metric, the default for everything else.
1. **startDate and endDate:** The range that you want to retrieve data for. The maximum range between the two dates is 31 days.
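
As an illustration of how these parameters fit together, here is a minimal sketch that assembles the request URL for one station and one date range. The helper name `build_history_url` is hypothetical and not part of any library, and the 31-day limit is interpreted here as "the end date is at most 31 days after the start date", which is an assumption about the wording above.

```python
from datetime import datetime

BASE_URL = 'http://api.weather.com/v1/geocode'  # host and path taken from the format above


def build_history_url(latitude, longitude, api_key, start_date, end_date, units='m'):
    """Return the historical-observations URL for one station and one date range.

    start_date and end_date are YYYYMMDD strings or integers.
    build_history_url is a hypothetical helper written for this sketch.
    """
    start = datetime.strptime(str(start_date), '%Y%m%d').date()
    end = datetime.strptime(str(end_date), '%Y%m%d').date()
    if end < start:
        raise ValueError('end_date is before start_date')
    # Assumption: "maximum range of 31 days" means end - start <= 31 days.
    if (end - start).days > 31:
        raise ValueError('date range exceeds 31 days')
    if units not in ('e', 'h', 'm'):
        raise ValueError("units must be 'e', 'h', or 'm'")
    return ('%s/%s/%s/observations/historical.json'
            '?apiKey=%s&units=%s&startDate=%s&endDate=%s'
            % (BASE_URL, latitude, longitude, api_key, units,
               start.strftime('%Y%m%d'), end.strftime('%Y%m%d')))


# Coordinates of the first station in the stations file; the key is a placeholder.
url = build_history_url('39.8328', '-104.6575', 'MY_APIKEY', 20140101, 20140130)
print(url)
```

Validating the range before sending the request keeps an over-long range from reaching the API at all.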


### Set the API key

In [3]:
apiKey = 'MY_APIKEY'

### Import weather stations dataset

In [4]:
stRDD = sc.textFile('/resources/stations.txt') #international airports only
stRDD = stRDD.map(lambda x: (x[0:11].strip(), #ID (11 characters; must not overlap LATITUDE)
                           x[12:20].strip(), #LATITUDE
                           x[20:30].strip(), #LONGITUDE
                           x[41:71].strip() #NAME
                          ))

stRDD.cache()

PythonRDD[2] at RDD at PythonRDD.scala:43

#### Check contents of stations.txt

In [5]:
stRDD.take(5)

[(u'USW00003017', u'39.8328', u'-104.6575', u'DENVER INTL AP'),
 (u'USW00003102', u'34.0561', u'-117.6003', u'ONTARIO INTL AP'),
 (u'USW00003196', u'31.4208', u'-110.8458', u'NOGALES INTL AP'),
 (u'USW00003822', u'32.1300', u'-81.2100', u'SAVANNAH INTL AP'),
 (u'USW00003856', u'34.6439', u'-86.7861', u'HUNTSVILLE INTL AP')]
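
The fixed-width slicing can be tried on a single line without Spark. The sketch below is only an illustration: the column offsets assume the layout of NOAA's `ghcnd-stations.txt` file, the helper `parse_station_line` is hypothetical, and the sample record is rebuilt from the first station shown above, with the "missing" placeholder as its elevation.

```python
def parse_station_line(line):
    """Split one fixed-width station record into its fields (hypothetical helper).

    Assumed 0-based column slices: ID 0-11, LATITUDE 12-20, LONGITUDE 21-30,
    ELEVATION 31-37, NAME 41-71.
    """
    return {
        'ID': line[0:11].strip(),
        'LATITUDE': float(line[12:20]),
        'LONGITUDE': float(line[21:30]),
        'ELEVATION': float(line[31:37]),
        'NAME': line[41:71].strip(),
    }


# Rebuild a sample record with the assumed column widths. The ID, coordinates,
# and name come from the first station listed above; the elevation is the
# "missing" placeholder and the state code is filled in for illustration.
sample = '{:<11} {:>8} {:>9} {:>6} {:<2} {:<30}'.format(
    'USW00003017', '39.8328', '-104.6575', '-999.9', 'CO', 'DENVER INTL AP')

station = parse_station_line(sample)
print(station)
```

Converting the coordinates to `float` at parse time is a design choice of this sketch; the notebook keeps them as strings because they are only ever pasted into a URL.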

#### How many stations are there in stations.txt?

In [6]:
stRDD.count()

97

<hr>

## Creating Functions to Process the Data

### [Function] getDates 

This function returns a list of `(start, end)` date tuples, one tuple per date range to request from the API.

In [7]:
def getDates(start_date, end_date, delta):
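    '''
    start_date: starting date in format YYYYMMDD
    end_date: end date in format YYYYMMDD
    delta: an integer from 2 to 31; the number of days covered by each range,
        counting both its first and its last day

    Returns a list of (day_from, day_to) tuples. Each range starts on the day
    the previous one ends, and the last range may extend past end_date.
    '''
    from datetime import timedelta, date
    delta = delta -1 #day_to is (delta - 1) days after day_from
    #Convert dates to date format
    s = datetime.date(int(str(start_date)[:4]),int(str(start_date)[4:6]),int(str(start_date)[6:]))
    e = datetime.date(int(str(end_date)[:4]),int(str(end_date)[4:6]),int(str(end_date)[6:]))

    #Set delta (checked after subtracting 1, so the valid input is 2 to 31)
    if (int(delta) > 30 or int(delta) < 1):
        raise ValueError('Error: delta out of range. 2 <= delta <= 31')
    else:
        delta = timedelta(days=int(delta))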
    days= []
    day_from = s
    while (day_from) < e:
        day_to = day_from + delta
        days.append((day_from,day_to))
        print '...for range %s to %s ...' % (day_from.strftime("%Y%m%d"), day_to.strftime("%Y%m%d"))
        day_from += delta
    return days

#### Testing getDates

In [8]:
getDates(20150101,20150630,30)

...for range 20150101 to 20150130 ...
...for range 20150130 to 20150228 ...
...for range 20150228 to 20150329 ...
...for range 20150329 to 20150427 ...
...for range 20150427 to 20150526 ...
...for range 20150526 to 20150624 ...
...for range 20150624 to 20150723 ...


[(datetime.date(2015, 1, 1), datetime.date(2015, 1, 30)),
 (datetime.date(2015, 1, 30), datetime.date(2015, 2, 28)),
 (datetime.date(2015, 2, 28), datetime.date(2015, 3, 29)),
 (datetime.date(2015, 3, 29), datetime.date(2015, 4, 27)),
 (datetime.date(2015, 4, 27), datetime.date(2015, 5, 26)),
 (datetime.date(2015, 5, 26), datetime.date(2015, 6, 24)),
 (datetime.date(2015, 6, 24), datetime.date(2015, 7, 23))]
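
In the output above, each range starts on the day the previous one ends, and the last range runs past the requested end date. If that is not wanted, one possible variant is sketched below. It is a hypothetical alternative, not the function used in the rest of this notebook: `date_windows` and its `days_per_window` parameter are names made up for this example. It produces consecutive, non-overlapping windows and clips the last one to the end date.

```python
from datetime import datetime, timedelta


def date_windows(start_date, end_date, days_per_window=30):
    """Split [start_date, end_date] into consecutive, non-overlapping windows.

    Dates are YYYYMMDD strings or integers. Each window covers at most
    days_per_window calendar days, and the last one is clipped to end_date.
    """
    if days_per_window < 1:
        raise ValueError('days_per_window must be at least 1')
    start = datetime.strptime(str(start_date), '%Y%m%d').date()
    end = datetime.strptime(str(end_date), '%Y%m%d').date()
    span = timedelta(days=days_per_window - 1)  # inclusive window length
    one_day = timedelta(days=1)

    windows = []
    day_from = start
    while day_from <= end:
        day_to = min(day_from + span, end)      # never run past end_date
        windows.append((day_from, day_to))
        day_from = day_to + one_day             # next window starts the day after
    return windows


for day_from, day_to in date_windows(20150101, 20150630, 30):
    print('%s to %s' % (day_from.strftime('%Y%m%d'), day_to.strftime('%Y%m%d')))
```

With windows like these, no day is requested twice and no data outside the requested period is downloaded, at the cost of a possibly very short final window.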

### [Function] saveUrlRDD

This function, `saveUrlRDD`, does the following:
1. Takes a start_date and an end_date as parameters
1. Generates a list of date ranges using `getDates` to feed to the API requests
1. Joins the list of date ranges with the list of stations
1. For each station, sends one API request per date range, using the latitude and longitude of the station
1. Receives the data from the API as JSON and writes it to file using Spark's `saveAsTextFile` command
1. Zips the generated text files to disk for backup
1. Returns a JSON RDD containing all of the daily observations for all of the stations between the start and end date
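
The join in the third step pairs every station with every date range, so the number of API requests is the number of stations multiplied by the number of ranges. A minimal local sketch of that pairing, using `itertools.product` in place of Spark's `cartesian`, might look like this. The two stations come from the stations file; the two date ranges and the helper `request_params` are chosen purely for illustration.

```python
from itertools import product

# Two stations from the stations file, as (ID, latitude, longitude, name) tuples.
stations = [
    ('USW00003017', '39.8328', '-104.6575', 'DENVER INTL AP'),
    ('USW00003102', '34.0561', '-117.6003', 'ONTARIO INTL AP'),
]
# Two illustrative date ranges as (start, end) strings in YYYYMMDD format.
date_ranges = [('20140101', '20140129'), ('20140129', '20140226')]

# Local stand-in for an RDD cartesian product: every station with every range.
station_date_pairs = list(product(stations, date_ranges))


def request_params(pair):
    """Flatten one (station, date range) pair into the values a request needs."""
    (station_id, lat, lon, name), (start, end) = pair
    return {'id': station_id, 'lat': lat, 'lon': lon,
            'startDate': start, 'endDate': end}


params = [request_params(p) for p in station_date_pairs]
print(len(params))  # one entry per API request
```

With 97 stations and two date ranges, the same pairing yields 194 requests.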

In [9]:
def saveUrlRDD(start_date, end_date, delta = None):
    '''
    start_date: date in the format YYYYMMDD. 
        E.g., 20100101
    end_date: date in the format YYYYMMDD. 
        E.g., 20101231
    delta: number of days covered by each API request (2 to 31).
        Default: end_date minus start_date in days, to a maximum of 30.
        E.g., Set delta to 7 to send one request per seven-day range
    '''
    print 'Starting...'
    start = time.time()
    
    if delta is None:
        s = datetime.date(int(str(start_date)[:4]),int(str(start_date)[4:6]),int(str(start_date)[6:]))
        e = datetime.date(int(str(end_date)[:4]),int(str(end_date)[4:6]),int(str(end_date)[6:]))
        delta = abs((e-s).days)
        if delta > 30:
            delta = 30

    #Collect RDD list of dates to retrieve data
    days = getDates(start_date, end_date, delta)
    daysRDD = sc.parallelize(days)
    
    #Combine station list with dates to retrieve data on
    stndateRDD=stRDD.cartesian(daysRDD)


    print '...starting API GET request...'
    
    #Send API request based on station list and dates to retrieve data on
    urlRDD = stndateRDD.map(lambda (s,d):requests.get('http://api.weather.com/v1/geocode/'+
                                          str(s[1]) + '/' + str(s[2]) + 
                                          '/observations/historical.json?apiKey=' + 
                                          apiKey + '&units=m&startDate=' + 
                                          d[0].strftime("%Y%m%d") + '&endDate=' + 
                                          d[1].strftime("%Y%m%d"))\
                                          .json()) 
    #Set filename
    filename= 'urlRDD'+'-Date-'+str(start_date)+'-'+str(end_date)
    outdir = '/resources/'+filename
    
    #Serialize each response to a JSON string. Cache the result so that the
    #API requests are not sent again when the returned RDD is used later.
    jsonRDD = urlRDD.map(lambda x:json.dumps(x)).cache()
    
    if os.path.isdir(outdir):
        print '...deleting existing folder...'
        shutil.rmtree(outdir) 
      
    jsonRDD.saveAsTextFile(outdir)
    
    #zip up urlRDD file contents and save as backup
    #(absolute paths, so the result does not depend on the working directory)
    print '...zipping up the files...'
    shutil.make_archive(outdir, format="zip", root_dir=outdir)
    
    #Check filesize of zip
    filesize = os.stat(outdir+'.zip').st_size/1000 #returns size of zip in kB
    print '...zipped file is [%s kB] at "/resources/%s.zip" ...' % (filesize, filename)
    
    #Remove original text files after zipping
    shutil.rmtree(outdir) 
    
    #Report elapsed time and counts
    print '...saveUrlRDD DONE. Elapsed time: [%s secs].' % str(round((time.time() - start),0))
    print '...#Stations: %s...' % str(stRDD.count())
    print '...#Records:  %s...' % str(stndateRDD.count())
    
    #Return the JSON RDD, which is the input to preprocess
    return jsonRDD

### [Function] preprocess

This function, `preprocess`, does the following:
1. Before the function is defined, we create some user-defined functions (UDFs) that convert timestamps into human-readable years, months, days, and dates in Spark.
1. It takes as its parameter the output of `saveUrlRDD`, a JSON RDD containing all of the daily observations for all of the stations between the start and end date.
1. A Spark SQL context is created, which reads the dataRDD and converts it into a DataFrame that Spark SQL can use.
1. The DataFrame is registered as a table in Spark using `registerTempTable`.
1. The table is queried using `sqlContext.sql`.
1. New columns for year, month, day, and date are created using the previously defined UDFs.
1. The DataFrame is cached to optimize processing speed.
1. For each station and date, the MAX, MIN, and MEAN temperatures are calculated, using `groupBy` and `agg`.
1. The output, a relatively small DataFrame, is exported as a pandas dataframe.
1. `preprocess` returns the pandas dataframe.
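
To make the date conversion in the first step concrete, here is a minimal plain-Python sketch of what such UDFs compute for a single timestamp. It assumes that `valid_time_gmt` holds Unix epoch seconds, and it uses `time.gmtime` so that the result does not depend on the time zone of the machine running it; the helper name `date_parts` is hypothetical.

```python
import time


def date_parts(epoch_seconds):
    """Return year, month, day, and date strings for a Unix timestamp, in GMT."""
    t = time.gmtime(epoch_seconds)
    return {
        'obsYear': time.strftime('%Y', t),
        'obsMonth': time.strftime('%m', t),
        'obsDay': time.strftime('%d', t),
        'obsDate': time.strftime('%Y-%m-%d', t),
    }


parts = date_parts(1388534400)  # 2014-01-01 00:00:00 GMT
print(parts)
```

Note that converting with `time.localtime` instead assigns observations near midnight GMT to the previous or next calendar day, depending on the time zone of the machine.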

In [10]:
# Register user-defined functions as Spark UDFs 
# Note: time.localtime converts using the time zone of the machine running Spark
from pyspark.sql.types import StringType

udf_yearform = udf(lambda x:time.strftime('%Y', time.localtime(x)), StringType())
udf_monthform = udf(lambda x:time.strftime('%m', time.localtime(x)), StringType())
udf_dayform = udf(lambda x:time.strftime('%d', time.localtime(x)), StringType())
udf_dateform = udf(lambda x:time.strftime('%Y-%m-%d', time.localtime(x)), StringType())

def preprocess(dataRDD):
    '''
    Takes a dataRDD of JSON strings and converts it to a Spark SQL DataFrame.
    The data is filtered, aggregated, and returned as a pandas dataframe.
    '''
    start = time.time()
    print '...Starting preprocess...'
    
    
    sqlContext = HiveContext(sc)
    Weather_sdf = sqlContext.jsonRDD(dataRDD)
    
    Weather_sdf.printSchema() #prints the schema itself; returns None
    
    Weather_sdf.registerTempTable("weatherTable")
    Weather_sdf=sqlContext.sql("SELECT metadata.latitude AS LATITUDE,\
                 metadata.longitude AS LONGITUDE,\
                 observation.obs_name,\
                 observation.key,\
                 observation.valid_time_gmt,\
                 observation.temp\
                 FROM weatherTable LATERAL VIEW explode(observations) obstable as observation")
    
    
    # To add new columns
    Weather_sdf=Weather_sdf.withColumn("obsYear", udf_yearform(Weather_sdf['valid_time_gmt']))
    Weather_sdf=Weather_sdf.withColumn("obsMonth", udf_monthform(Weather_sdf['valid_time_gmt']))
    Weather_sdf=Weather_sdf.withColumn("obsDay", udf_dayform(Weather_sdf['valid_time_gmt']))
    Weather_sdf=Weather_sdf.withColumn("obsDate", udf_dateform(Weather_sdf['valid_time_gmt']))
    
    Weather_sdf.cache() #Very important. Speeds up entire pipeline.
    
    time1 = time.time()
    
    print 'PREPROCESS took [' + str(round(time1 - start,1)) + ' seconds] to run.'
    
    Weather_sdf.show(3) #prints the rows itself; returns None
    
    print 'Processing using groupBy and agg...'
    
    # select data for plot
    pdata = Weather_sdf.groupBy(["obs_name","obsDate","LATITUDE","LONGITUDE"])\
    .agg(F.max("temp"), F.min("temp"), F.mean("temp"))
    
    time2 = time.time()
    
    print 'GROUPBY/AGG took [' + str(round(time2 - time1,1)) + ' seconds] to run.'
    
    pdata.cache()
    
    pdata.show(3) #prints the rows itself; returns None
    
    
    # Convert the small aggregated result to a pandas dataframe
    
    print pdata.count()
    pdf = pdata.toPandas()
    
    return pdf

### [Function] save_csv

Create a function to save the result of `preprocess` to a .csv file for data visualization

In [11]:
def save_csv(x, start_date, end_date):
    #x is the pandas dataframe
    filename = 'USweather_' + str(start_date) + '-' + str(end_date)
    print filename
    path = '/resources/' + filename + '.csv'
    #Remove any earlier copy of the output file
    if os.path.isfile(path):
        os.remove(path)
    return x.to_csv(path)
 

<hr>

## Running the scripts

#### 1. Select start and end date

Choose the range in dates that you want to retrieve data for.

In [12]:
#start_date and end_date should be in YYYYMMDD format

start_date = 20140101
end_date = 20140130

#### 2. Run the scripts

**Warning:** this may take several minutes to run. An API request is sent for each of the 97 stations, for the date range. If the date range exceeds 30 days, there will be an API request for each 30-day increment from the start until the end date.

**Estimated combined running time for `saveUrlRDD` and `preprocess`:**  
With date range of
- One month: approx. 2-3 minutes (e.g., 20140101 to 20140131)
- One year: approx. 20 minutes (e.g., 20140101 to 20141231)


Overview of what the script will do:
1. **`saveUrlRDD`** calls **`getDates`** to create an RDD of date ranges to send to the API.  
1. **`saveUrlRDD`** combines the **`getDates`** output with the list of weather stations in **stations.txt**. 
1. **`saveUrlRDD`** sends an API request for each station, for each of the date ranges generated by **`getDates`**, saves the resulting RDD as text files, and zips them up.  
1. **`saveUrlRDD`** returns an RDD of the results in JSON format.  
1. **`preprocess`** takes the JSON RDD output of **`saveUrlRDD`** and converts it to a Spark SQL DataFrame.
1. The dataframe now consists of rows for each date for each station.
1. The dataframe is aggregated by station and by date, and the following information is returned:  
  - **obs_name**: name of the weather station
  - **obsDate**: the observation date in _YYYY-MM-DD_ format
  - **LATITUDE**: latitude of the station
  - **LONGITUDE**: longitude of the station
  - **MAX(temp)**: maximum temperature
  - **MIN(temp)**: minimum temperature
  - **AVG(temp)**: average temperature
1. The output of **`preprocess`** is a pandas dataframe, which **`save_csv`** saves to disk at _`/resources/USweather_YYYYMMDD-YYYYMMDD.csv`_.
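
The aggregation step can be illustrated on a handful of rows in plain Python. This is only a sketch of the idea behind grouping by station and date and then taking the maximum, minimum, and mean; the station names and temperatures below are made up for the example and are not API results.

```python
from collections import defaultdict

# Made-up observations for illustration only: (obs_name, obsDate, temp).
observations = [
    ('Station A', '2014-01-01', 3),
    ('Station A', '2014-01-01', -1),
    ('Station A', '2014-01-02', 5),
    ('Station B', '2014-01-01', 10),
    ('Station B', '2014-01-01', 14),
]

# Group the temperatures by (station, date).
temps_by_key = defaultdict(list)
for name, obs_date, temp in observations:
    temps_by_key[(name, obs_date)].append(temp)

# Reduce each group to its maximum, minimum, and mean.
summary = {}
for key, temps in temps_by_key.items():
    summary[key] = {
        'MAX(temp)': max(temps),
        'MIN(temp)': min(temps),
        'AVG(temp)': sum(temps) / float(len(temps)),
    }

for key in sorted(summary):
    print('%s %s' % (key, summary[key]))
```

Each key in `summary` corresponds to one row of the aggregated dataframe: several observations per station and day collapse into a single row.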

### Running `saveUrlRDD`

In [13]:
#Set start time
start = time.time() 


#Run saveUrlRDD
print 'Running saveUrlRDD from ' +str(start_date)+ ' to ' + str(end_date)
result_saveUrlRDD = saveUrlRDD(start_date, end_date)


#Set end time
end = time.time()

print 'TOTAL ELAPSED TIME : ' + str(end-start) + 'seconds'

Running saveUrlRDD from 20140101 to 20140130
Starting...
...for range 20140101 to 20140129 ...
...for range 20140129 to 20140226 ...
...starting API GET request...
...zipping up the files...
...zipped file is [7014 kB] at "/resources/urlRDD-Date-20140101-20140130.zip" ...
...saveUrlRDD DONE. Elapsed time: [173.0 secs].
...#Stations: 97...
...#Records:  194...
TOTAL ELAPSED TIME : 173.155771971seconds


### Running `preprocess`

In [14]:
#Set start time
start = time.time() 


#Run preprocess on the result of saveUrlRDD
pdf = preprocess(result_saveUrlRDD) 


#Set end time
end = time.time()

print 'TOTAL ELAPSED TIME : ' + str(end-start) + 'seconds'

...Starting preprocess...
root
 |-- metadata: struct (nullable = true)
 |    |-- expire_time_gmt: long (nullable = true)
 |    |-- language: string (nullable = true)
 |    |-- latitude: double (nullable = true)
 |    |-- longitude: double (nullable = true)
 |    |-- status_code: long (nullable = true)
 |    |-- transaction_id: string (nullable = true)
 |    |-- units: string (nullable = true)
 |    |-- version: string (nullable = true)
 |-- observations: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- blunt_phrase: string (nullable = true)
 |    |    |-- class: string (nullable = true)
 |    |    |-- day_ind: string (nullable = true)
 |    |    |-- dewPt: long (nullable = true)
 |    |    |-- expire_time_gmt: long (nullable = true)
 |    |    |-- feels_like: long (nullable = true)
 |    |    |-- gust: long (nullable = true)
 |    |    |-- heat_index: long (nullable = true)
 |    |    |-- icon_extd: long (nullable = true)
 |    |    |-- key: string

### Running `save_csv`

In [15]:
#save to CSV file
save_csv(pdf, start_date, end_date)

pdf.head()

USweather_20140101-20140130


Unnamed: 0,obs_name,obsDate,LATITUDE,LONGITUDE,MAX(temp),MIN(temp),AVG(temp)
0,Niagara Falls,2014-02-04,43.1,-78.94,-4,-11,-7.96
1,Fargo,2014-02-20,46.92,-96.81,4,-11,-3.678571
2,Philadelphia,2014-02-15,39.86,-75.23,3,1,1.612245
3,Houlton,2014-02-26,46.12,-67.79,-7,-24,-13.08
4,Boston,2014-02-09,42.36,-71.01,-1,-6,-3.4375


### Did you notice some files being created during the scripts?
1. Initiating the Spark context automatically generates some files, such as 'derby.log'. These files can be safely ignored.
2. A folder containing a number of 'part' files is created by Spark's `.saveAsTextFile` command. It holds an RDD of all of the weather station observations. Once the function `saveUrlRDD` has zipped this folder, it deletes the folder of part files to keep the disk clean. 
3. A `.csv` file is created. This file becomes the input for the Data Visualization notebook.

<br>

#### 3. Quality-check the results

In [16]:
check = pd.read_csv('/resources/USweather_'+str(start_date)+'-'+str(end_date)+'.csv')
print check.head(5)
print check[check['obs_name'] == 'Phoenix']

   Unnamed: 0       obs_name     obsDate  LATITUDE  LONGITUDE  MAX(temp)  \
0           0  Niagara Falls  2014-02-04     43.10     -78.94         -4   
1           1          Fargo  2014-02-20     46.92     -96.81          4   
2           2   Philadelphia  2014-02-15     39.86     -75.23          3   
3           3        Houlton  2014-02-26     46.12     -67.79         -7   
4           4         Boston  2014-02-09     42.36     -71.01         -1   

   MIN(temp)  AVG(temp)  
0        -11  -7.960000  
1        -11  -3.678571  
2          1   1.612245  
3        -24 -13.080000  
4         -6  -3.437500  

[5 rows x 8 columns]
      Unnamed: 0 obs_name     obsDate  LATITUDE  LONGITUDE  MAX(temp)  \
92            92  Phoenix  2014-01-03     33.42       -112         23   
296          296  Phoenix  2014-01-14     33.42       -112         23   
354          354  Phoenix  2014-01-30     33.42       -112         24   
381          381  Phoenix  2014-02-10     33.42       -112         26   


#### 4. Check to make sure the output .csv file exists

You should see the file under 'Recent Data' in the sidebar. Here is another way to confirm its existence:

In [17]:
#Search for text files that start with 'USweather_'
!ls /resources/USweather_*.csv

/resources/USweather_20140101-20140130.csv


<br>
<hr>

### Contact the Notebook Authors

1. **[Polong Lin](https://ca.linkedin.com/in/polonglin), Data Scientist, IBM.** polong[at]ca.ibm.com
1. **[Saeed Aghabozorgi](https://ca.linkedin.com/in/saeedaghabozorgi), Data Scientist, IBM.** saeed[at]ca.ibm.com  