# NOTE
# This notebook server was started with the command below, so "sc" is already defined as a SparkContext.

*IPYTHON_OPTS="notebook --certfile=~/cert/mycert.pem --keyfile ~/cert/mykey.key" $SPARK_HOME/bin/pyspark --master spark://spark1:7077 --jars $SPARK_HOME/jars/elasticsearch-hadoop-2.2.0.jar*


In [8]:
import os
#switch the working directory to the current user's home directory;
#relative paths used after this cell runs are resolved from there
os.chdir(os.path.expanduser('~'))

In [1]:
#imports and Elasticsearch connection settings
import sys
sys.path.append('../Infrastructure_Capstone')
import os
import math
from shapely.geometry import Polygon
from pyproj import Proj
from elasticsearch import Elasticsearch
from elasticsearch import helpers
from elasticsearch.client import indices
#from dataStorage import upload_to_Elasticsearch
import ConfigParser
from pprint import pprint
from copy import deepcopy
import datetime as dt
from dateutil.parser import parse
import json

#read in the config file
#os.chdir('~/Infrastructure_Capstone')
config = ConfigParser.ConfigParser()
config.read('../Infrastructure_Capstone/config/capstone_config.ini')

ES_url = config.get('ElasticSearch','host')
ES_password = config.get('ElasticSearch','password')
ES_username= config.get('ElasticSearch','username')

print ES_url

169.53.138.84


In [2]:
#WBAN id of the Central Park weather station; add_weather falls back to this
#station when the closest station has no reading for the requested time
central_park_wban = "94728"

#functions to add weather data to grid based on closest weather station. If closest station does not have a reading 
#for that time period, use Central Park station
def get_weather_reading(wban,time):
    """Return the weather observation from station ``wban`` closest to ``time``.

    Searches the ``weather`` index (doc type ``hourly_obs``) for observations
    whose ``weather_WBAN`` matches ``wban`` and whose ``weather_DateTime`` is
    within one hour before (inclusive) or after (exclusive) ``time``, then
    picks the observation nearest in time.

    Args:
        wban: station id matched against the ``weather_WBAN`` field. ``None``
            (no station known) returns ``None`` without querying.
        time: naive ``datetime`` to match. Observation timestamps have their
            tzinfo dropped before comparison, so this must be naive too.

    Returns:
        The ``_source`` dict of the closest observation, or ``None`` when the
        station has no observation in the window.
    """
    #no station to look up (e.g. the caller found no matching zip code shape)
    if wban is None:
        return None

    es = Elasticsearch('http://%s:%s@%s:9200' % (ES_username,ES_password,ES_url))

    window = dt.timedelta(seconds=3600)
    time_format = '%m/%d/%Y %H:%M'

    #both conditions go in one "must" list: repeating the "must" key in a JSON
    #object leaves it to the parser whether the first clause survives
    query = {
        "query": {
            "bool": {
                "must": [
                    {"term": {"weather_WBAN": '%s' % (wban,)}},
                    {"range": {
                        "weather_DateTime": {
                            "gte": (time - window).strftime(time_format),
                            "lt": (time + window).strftime(time_format),
                            "format": "MM/dd/yyyy HH:mm"
                        }
                    }}
                ]
            }
        }
    }

    #walk every observation in the window, keeping the one closest in time
    #(the first one wins on ties)
    best_obs = None
    min_diff = float('inf')
    for obs in helpers.scan(es,query=query,index='weather',doc_type='hourly_obs'):
        obs_time = parse(obs['_source']['weather_DateTime']).replace(tzinfo=None)
        diff = abs((obs_time - time).total_seconds())
        if diff < min_diff:
            best_obs = obs['_source']
            min_diff = diff
    return best_obs

#fields copied verbatim from a weather observation onto the grid row
_WEATHER_COPY_FIELDS = (
    'weather_WetBulbFarenheit',
    'weather_WindSpeed',
    'weather_Visibility',
    'weather_HourlyPrecip',
    'weather_SkyCondition', #Adjust this for just condition?
    'weather_WeatherType',
)

#base weather codes per family; a '-' prefix marks light and '+' marks heavy
_RAIN_CODES = ('RA', 'DZ', 'SH', 'FZRA')
_SNOW_HAIL_ICE_CODES = ('SN', 'SG', 'GS', 'GR', 'PL', 'IC')
_FOG_CODES = ('FG', 'BR', 'HZ')


def _weather_intensity(weather_list, codes, heavy_extra=()):
    """Grade one weather family on a 0-3 scale from the observed tokens.

    Args:
        weather_list: tokens of a space-delimited ``weather_WeatherType`` string.
        codes: base codes of the family, e.g. ``('RA', 'DZ')``.
        heavy_extra: extra tokens that also count as heavy.

    Returns:
        3 if any ``+CODE`` (or ``heavy_extra``) token is present, else 2 if any
        bare ``CODE`` is present, else 1 if any ``-CODE`` is present, else 0.
    """
    if any('+' + code in weather_list for code in codes) or any(token in weather_list for token in heavy_extra):
        return 3 #heavy
    if any(code in weather_list for code in codes):
        return 2 #moderate
    if any('-' + code in weather_list for code in codes):
        return 1 #light
    return 0 #none


def _apply_observation(update, observation):
    """Copy the weather fields of ``observation`` into ``update`` and add the derived intensity levels."""
    for field in _WEATHER_COPY_FIELDS:
        update[field] = observation[field]

    weather_list = observation['weather_WeatherType'].split(' ') #space delimited list
    update['weather_Rain'] = _weather_intensity(weather_list, _RAIN_CODES)
    update['weather_SnowHailIce'] = _weather_intensity(weather_list, _SNOW_HAIL_ICE_CODES)
    #'FG+' is accepted as heavy fog in addition to the '+'-prefixed codes
    update['weather_Fog'] = _weather_intensity(weather_list, _FOG_CODES, heavy_extra=('FG+',))


def _fill_missing_weather(update):
    """Mark every weather field of ``update`` as missing (99999 for numbers, 'NA' for strings)."""
    for field in ('weather_WetBulbFarenheit', 'weather_WindSpeed', 'weather_Visibility',
                  'weather_HourlyPrecip', 'weather_Rain', 'weather_SnowHailIce', 'weather_Fog'):
        update[field] = 99999
    update['weather_SkyCondition'] = 'NA'
    update['weather_WeatherType'] = 'NA'


def add_weather(row):
    """Attach weather fields to one grid row.

    The weather station is taken from the ``closest_weather_stations`` field of
    the largest-area shape in ``nyc_zip_codes`` whose ``ZCTA5CE10`` starts with
    the row's zip code. If that station has no reading near the row's time,
    the Central Park station is used; if that has none either, the weather
    fields are filled with the missing markers 99999 / 'NA'.

    Args:
        row: ``(key, val)`` pair where ``val`` is a dict holding at least
            ``grid_fullDate`` and ``grid_zipcode``.

    Returns:
        ``(key, update)`` where ``update`` is a deep copy of ``val`` with the
        ``weather_*`` fields added. ``val`` itself is not modified.
    """
    key,val = row

    es_url = 'http://%s:%s@%s:9200' % (ES_username,ES_password,ES_url)
    es = Elasticsearch(es_url)

    #shape coordinates are projected to EPSG:2263 before the areas are compared
    proj = Proj(init='epsg:2263')
    update = deepcopy(val)
    #NOTE(review): the UTC offset is dropped, not converted -- confirm that the
    #weather timestamps use the same local clock
    date_hour = parse(val['grid_fullDate']).replace(tzinfo=None)

    query = {
        "query": {
            "bool": {
                "must": {
                    "wildcard": { "ZCTA5CE10" : "%s*" % val['grid_zipcode'] }
                }
            }
        }
    }

    #use the largest matching zip code shape to represent the grid area
    max_area = 0
    wban = None
    for shape in helpers.scan(es,query=query,index='nyc_zip_codes',doc_type='zip_codes'):
        coords = [proj(lng,lat) for lng,lat in shape['_source']['coords']['coordinates'][0]]
        poly = Polygon(coords)
        if poly.area > max_area:
            max_area = poly.area
            wban = shape['_source']['closest_weather_stations']

    #closest station first, Central Park as the fallback
    observation = get_weather_reading(wban,date_hour)
    if not observation:
        observation = get_weather_reading(central_park_wban,date_hour)

    if observation:
        _apply_observation(update, observation)
    else:
        #no weather data from either station
        _fill_missing_weather(update)

    return (key,update)

In [3]:
#print sc to confirm that the pyspark launcher defined a SparkContext in this notebook
print sc

<pyspark.context.SparkContext object at 0x7faf140beb50>


In [4]:
#settings handed to the EsOutputFormat writer when the enriched rows are saved:
#connection details come from the config file read above.
#NOTE(review): "es.resource" is presumably the target index/type and
#"es.mapping.id" the row field used as the document id -- confirm against the
#elasticsearch-hadoop configuration reference
es_write_conf = {
        "es.nodes" : ES_url,
        "es.port" : "9200",
        "es.net.http.auth.user" : ES_username, 
        "es.net.http.auth.pass" : ES_password,
        "es.resource" : "dataframe_plus_weather/rows",
        "es.mapping.id" : "grid_id"
    } 

In [5]:
#build an RDD from the "dataframe/rows" Elasticsearch resource through the
#elasticsearch-hadoop input format; each element is a (key, dict) pair, as the
#sample row below shows
grid_rdd = sc.newAPIHadoopRDD(
    inputFormatClass="org.elasticsearch.hadoop.mr.EsInputFormat",
    keyClass="org.apache.hadoop.io.NullWritable", 
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable", 
    conf={ "es.resource" : "dataframe/rows", "es.nodes" : ES_url, 
          "es.net.http.auth.user" : ES_username, 
          "es.net.http.auth.pass" : ES_password })

#peek at one row to check the structure add_weather will receive
grid_rdd.first()

(u'2013-02-06T13:00:00_10306',
 {u'grid_day': 6,
  u'grid_dayOfWeek': 3,
  u'grid_fullDate': u'2013-02-06T12:00:00-06:00',
  u'grid_hourOfDay': 13,
  u'grid_id': u'2013-02-06T13:00:00_10306',
  u'grid_isAccident': 0,
  u'grid_month': 2,
  u'grid_zipcode': 10306})

In [6]:
#apply add_weather to every (key, row) pair
#NOTE(review): add_weather opens Elasticsearch clients for each row it processes;
#consider mapPartitions if this turns out to be slow
updated_rdd = grid_rdd.map(add_weather)

In [7]:
#write the enriched rows to Elasticsearch using the settings in es_write_conf
#NOTE(review): path='-' looks like a placeholder, since the destination is taken
#from es_write_conf -- confirm against the elasticsearch-hadoop docs
updated_rdd.saveAsNewAPIHadoopFile(
            path='-', 
            outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
            keyClass="org.apache.hadoop.io.NullWritable", 
            valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable", 
            conf=es_write_conf)
    