#Situation
Data growing faster than your desktop computer can handle.

Know of Spark. Think that adding nodes is an easier way to handle data growth than to buy a faster computer.

Want to test Spark to show your boss why it'll work for your company.

#Local machine
Find a blog that helps you to get started on your local machine.

Test out data frames to see how the language works.

[Blog Link](http://ramhiser.com/2015/02/01/configuring-ipython-notebook-support-for-pyspark/)

###RDD
Definition: 
- A Resilient Distributed Dataset (RDD), the basic abstraction in Spark. Represents an immutable, partitioned collection of elements that can be operated on in parallel.
- Human terms:
    - An RDD is lazy evaluation that allows Spark to determine the best way to complete the request.
    - Spark executes the set of instructions on the RDD when an action is called.


Action
- Example actions:
    - count()
    - collect()
    - take()
    - first()


- [link to Docs on RDD](https://spark.apache.org/docs/1.5.0/programming-guide.html#rdd-operations)

In [4]:
nodes = 1

# partitions - the number of "groupings" Spark cuts the dataset into
# Recommended - 2-4 partitions per node
partitions = nodes * 4
training = sc.textFile('inmar/weather/2007_small.csv', partitions)
training = training.map(lambda x: x.split(','))
training

PythonRDD[8] at RDD at PythonRDD.scala:43

In [3]:
training.take(3)

[[u'CA002303986', u'20070101', u'TMAX', u'-130', u'', u'', u'G', u''],
 [u'CA002303986', u'20070101', u'TMIN', u'-220', u'', u'', u'G', u''],
 [u'CA002303986', u'20070101', u'PRCP', u'0', u'T', u'', u'G', u'']]

###Creating a dataframe

In [17]:
# sc is an existing SparkContext.
from pyspark.sql import Row

###Read in the data

In [18]:
# Load a text file and convert each line to a Row.
rows = sc.textFile('inmar/weather/2007_small.csv')
parts = rows.map(lambda l: l.split(","))

###Set the Schema

In [19]:
# Set the schema for the data
schema_weather = parts.map(lambda p: Row(station=str(p[0]), date=str(p[1]), element=str(p[2]), 
                                         value=float(p[3]), measurement=str(p[4]), quality=str(p[5]), 
                                         source= str(p[5]), hour = str(p[7])))

###Create the dataframe

In [20]:
# Infer the schema, and register the DataFrame as a table.
weather = sqlContext.createDataFrame(schema_weather)

###Take a look at the first ten rows

In [21]:
weather.show(10)

+--------+-------+-----+-----------+-------+------+-----------+------+
|    date|element| hour|measurement|quality|source|    station| value|
+--------+-------+-----+-----------+-------+------+-----------+------+
|20070101|   TMAX|     |           |       |      |CA002303986|-130.0|
|20070101|   TMIN|     |           |       |      |CA002303986|-220.0|
|20070101|   PRCP|     |          T|       |      |CA002303986|   0.0|
|20070101|   PRCP|     |           |       |      |ASN00037003|   0.0|
|20070101|   TMAX|     |           |       |      |NOE00133566|  66.0|
|20070101|   TMIN|     |           |       |      |NOE00133566|  34.0|
|20070101|   PRCP|     |           |       |      |NOE00133566| 151.0|
|20070101|   TMAX|800.0|           |       |      |USC00242347|  89.0|
|20070101|   TMIN|800.0|           |       |      |USC00242347| -39.0|
|20070101|   PRCP|800.0|          P|       |      |USC00242347|   0.0|
+--------+-------+-----+-----------+-------+------+-----------+------+
only s

###How many rows are there in the df

In [22]:
weather.count()

49999

###Unique values in the Elements column

In [23]:
weather[['element']].distinct().show()

+-------+
|element|
+-------+
|   TMAX|
|   TMIN|
|   PRCP|
+-------+



###Max temp look like

In [24]:
t_max = weather[weather['element'] == 'TMAX']
t_max[['value']].describe().show()

+-------+------------------+
|summary|             value|
+-------+------------------+
|  count|             13482|
|   mean| 73.83859961430055|
| stddev|118.50547311680688|
|    min|            -994.0|
|    max|            1283.0|
+-------+------------------+



###Filtering - TMAX and Station: USC00114078

In [29]:
one_station_tmax = weather[(weather['element'] == 'TMAX') & (weather['station'] == 'USC00114078')]
one_station_tmax.show()

+--------+-------+------+-----------+-------+------+-----------+-----+
|    date|element|  hour|measurement|quality|source|    station|value|
+--------+-------+------+-----------+-------+------+-----------+-----+
|20070101|   TMAX|1800.0|           |       |      |USC00114078|128.0|
|20070102|   TMAX|1800.0|           |       |      |USC00114078| 83.0|
+--------+-------+------+-----------+-------+------+-----------+-----+



###Read that 1 unit TMAX is 0.1 degrees celcius

In [26]:
from pyspark.sql import functions as Func
one_station_tmax = one_station_tmax.withColumn('reset_value', Func.lit(0.1))
one_station_tmax = one_station_tmax.withColumn('tmax_celc', one_station_tmax.reset_value * one_station_tmax.value)

In [27]:
one_station_tmax.show(5)

+--------+-------+------+-----------+-------+------+-----------+-----+-----------+---------+
|    date|element|  hour|measurement|quality|source|    station|value|reset_value|tmax_celc|
+--------+-------+------+-----------+-------+------+-----------+-----+-----------+---------+
|20070101|   TMAX|1800.0|           |       |      |USC00114078|128.0|        0.1|     12.8|
|20070102|   TMAX|1800.0|           |       |      |USC00114078| 83.0|        0.1|      8.3|
+--------+-------+------+-----------+-------+------+-----------+-----+-----------+---------+



###Look at min, max, avg by day

In [5]:
one_station_tmax

NameError: name 'one_station_tmax' is not defined

In [28]:
one_station_tmax.groupby('date').agg(Func.max('value'),Func.min('value'), Func.mean('value')).show()

+--------+----------+----------+----------+
|    date|max(value)|min(value)|avg(value)|
+--------+----------+----------+----------+
|20070101|     128.0|     128.0|     128.0|
|20070102|      83.0|      83.0|      83.0|
+--------+----------+----------+----------+



#Google Cloud
Find that Google Cloud offers the first two months free.  So you can do something real with MLlib.

- bdutil
    - https://cloud.google.com/hadoop/setting-up-a-hadoop-cluster

###Spin up 4 worker nodes

###Set up ports to allow Spark UI
- Viewed through port 4040
- Create tag for ports
    - gcloud compute firewall-rules create default-allow-8080 --description="Incoming http 8080 allowed." —allow="tcp:4040" --allow="tcp:8080" --allow="tcp:8081" --target-tags="http-8080-server" 
- Need to open ports  
    - gcloud compute instances add-tags hadoop-m --zone us-central1-a --tags http-8080-server 

###Run logistic regression