In [31]:
import os
import sys
from time import time

# Get environment variables set from utils/setup.sh
home_dir = os.environ['HOME']
root_dir = os.environ['BD_GitRoot']

# Add utils to the python system path. Guarded so that re-running this cell
# does not append the same directory again and again.
utils_dir = os.path.join(root_dir, 'utils')
if utils_dir not in sys.path:
    sys.path.append(utils_dir)

# Read AWS credentials from 'EC2_VAULT'/Creds.pkl
# (star import kept on purpose: later cells may use other names it exports)
from read_mrjob_creds import *
(key_id, secret_key, s3_bucket, username) = read_credentials()
# Only echo the non-sensitive values. The access key id is intentionally NOT
# printed: cell output is saved with the notebook and ends up wherever the
# notebook is shared or committed.
print('%s %s' % (s3_bucket, username))

from find_waiting_flow import *


s3://yoavfreunddefault/ <AWS access key id redacted> Yoav_Freund


## Data Filter

### Mapper

Keep only the records whose measurement is "TMAX" or "TMIN", and key each one by its station and year.

### Reducer

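Keep only the station-years that have exactly one TMIN and one TMAX record and no more than 50 entries in which either reading is missing. Each output record is keyed by "station year", so the output is grouped by station and then by year.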

In [42]:
%%writefile DataFilter.py

import numpy as np

from mrjob.job import MRJob
from math import isnan
from sys import stderr

# Largest number of missing entries a station-year may have and still be kept
# by the reducer.
THRESHOLD = 50

# Placeholder value used for empty (missing) fields.
nan = float('nan')

# Diagnostics stream. To log to a file instead, use: logfile=open('log','w')
logfile = stderr

class DataFilter(MRJob):
    """Keep station-years that have nearly complete TMIN and TMAX readings.

    Input lines have the form ``station,measurement,year,v1,v2,...``.
    The mapper keeps the TMAX/TMIN lines and keys them by ``"station year"``.
    The reducer emits ``(key, (tmin, tmax))`` for keys that have exactly one
    line of each kind and at most ``THRESHOLD`` positions where either
    reading is missing.
    """

    def convert(self, d):
        """Return the field ``d`` as a float, or NaN if it is empty or blank."""
        d = d.strip()
        return nan if d == '' else float(d)

    def _parse(self, line):
        """Split a raw line into ``(station, measurement, year, data)``.

        The first three fields are returned stripped of surrounding
        whitespace; ``data`` is the untouched remainder of the line.
        Returns None when the line has fewer than four comma-separated
        fields (e.g. a blank line), so callers can skip it instead of
        crashing the whole job on an unpacking error.
        """
        fields = line.strip().split(',', 3)
        if len(fields) != 4:
            return None
        station, measurement, year, data = fields
        return station.strip(), measurement.strip(), year.strip(), data

    def mapper(self, _, line):
        """Yield ``("station year", line)`` for every TMAX or TMIN line."""
        parsed = self._parse(line)
        if parsed is None:
            return
        station, measurement, year, _data = parsed
        if measurement in ('TMAX', 'TMIN'):
            yield (station + ' ' + year, line)

    def reducer(self, identity, lines):
        """Yield ``(identity, (tmin, tmax))`` when the pair is complete enough.

        Nothing is emitted unless the key has exactly two lines, one TMIN
        and one TMAX, with no more than ``THRESHOLD`` missing entries.
        """
        raw = list(lines)
        if len(raw) != 2:
            return

        # Index the two series by measurement name. Two lines of the same
        # kind overwrite each other, leaving the other kind absent below.
        series = {}
        for line in raw:
            parsed = self._parse(line)
            if parsed is None:
                return
            _station, measurement, _year, data = parsed
            series[measurement] = [self.convert(d) for d in data.split(',')]

        tmin = series.get('TMIN')
        tmax = series.get('TMAX')
        if not tmin or not tmax:
            return

        # low + high is NaN as soon as either reading is NaN, so this counts
        # the positions where at least one of the two readings is missing.
        missing = sum(1 for low, high in zip(tmin, tmax) if isnan(low + high))
        if missing <= THRESHOLD:
            yield (identity, (tmin, tmax))

# Entry point: hand control to mrjob when this file is executed as a script.
if __name__ == '__main__':
    DataFilter.run()
    
    

Overwriting DataFilter.py


## Test data filter locally

In [36]:
small_file = '/Users/yoavfreund/BigData/UCSD_BigData/data/weather/F1000.csv'
output_file = 'f_output'
!python DataFilter.py --runner=local $small_file > $output_file

using configs in /Users/yoavfreund/.mrjob.conf
creating tmp directory /var/folders/80/c2kfvdvx5cx570r4vlzqgb840000gq/T/DataFilter.yoavfreund.20150530.152008.739949
writing wrapper script to /var/folders/80/c2kfvdvx5cx570r4vlzqgb840000gq/T/DataFilter.yoavfreund.20150530.152008.739949/setup-wrapper.sh
writing to /var/folders/80/c2kfvdvx5cx570r4vlzqgb840000gq/T/DataFilter.yoavfreund.20150530.152008.739949/step-0-mapper_part-00000
> sh -ex setup-wrapper.sh /Users/yoavfreund/anaconda/bin/python DataFilter.py --step-num=0 --mapper /var/folders/80/c2kfvdvx5cx570r4vlzqgb840000gq/T/DataFilter.yoavfreund.20150530.152008.739949/input_part-00000 > /var/folders/80/c2kfvdvx5cx570r4vlzqgb840000gq/T/DataFilter.yoavfreund.20150530.152008.739949/step-0-mapper_part-00000
writing to /var/folders/80/c2kfvdvx5cx570r4vlzqgb840000gq/T/DataFilter.yoavfreund.20150530.152008.739949/step-0-mapper_part-00001
> sh -ex setup-wrapper.sh /Users/yoavfreund/anaconda/bin/python DataFilter.py --step-num=0 --mapper /var/fo

In [37]:
!wc $output_file
!head -1 $output_file

      79   57828  389634 f_output
"MX000008172 1929"	[[-65.0, -65.0, -73.0, -65.0, -114.0, -106.0, -440.0, -13.0, -73.0, -53.0, -103.0, -64.0, -33.0, -56.0, -45.0, -65.0, -46.0, -76.0, -43.0, -37.0, -14.0, -47.0, -27.0, -43.0, -43.0, -95.0, -92.0, -82.0, -73.0, -63.0, -63.0, -73.0, -76.0, -63.0, -94.0, -83.0, -36.0, -83.0, -76.0, -66.0, -85.0, -93.0, -97.0, 30.0, -65.0, -63.0, -75.0, -63.0, -65.0, -35.0, -2.0, -95.0, -66.0, -69.0, -23.0, -33.0, -66.0, -65.0, -93.0, -93.0, -75.0, -77.0, -96.0, -83.0, -55.0, -63.0, -45.0, -44.0, -34.0, -35.0, -43.0, -63.0, -63.0, -115.0, -14.0, -18.0, -94.0, -28.0, -28.0, -28.0, -63.0, -43.0, -23.0, -13.0, -53.0, -17.0, -13.0, -5.0, 8.0, 8.0, -5.0, 28.0, -13.0, 13.0, 13.0, 53.0, 63.0, 67.0, 3.0, -63.0, -53.0, -53.0, -15.0, 96.0, -53.0, -16.0, -47.0, -13.0, -43.0, -43.0, -33.0, -13.0, -33.0, -24.0, -23.0, 3.0, -13.0, 73.0, 63.0, -3.0, -33.0, -16.0, -16.0, -13.0, -23.0, -23.0, -13.0, 93.0, 83.0, 84.0, 47.0, 3.0, -23.0, 3.0, 33.0, 47.0, -16.0, -26.0, 43.0, 

## Run data filter on EMR

In [38]:
# Look up the EMR job flows reported by find_waiting_flow and use the first one.
from find_waiting_flow import *
flows_dict = find_waiting_flow(key_id, secret_key)
if not flows_dict:
    # Fail with an actionable message instead of a bare KeyError/IndexError
    # from flows_dict[0] when nothing was found.
    raise RuntimeError('No waiting EMR job flow found; start a cluster before running this cell.')
first_flow = flows_dict[0]
flow_id, node = first_flow['flow_id'], first_flow['node']
print('%s %s' % (flow_id, node))

0 j-2V7DNF88G388D ec2-54-81-125-178.compute-1.amazonaws.com WAITING
1 j-2YTN78MP8WQG9 ec2-54-161-53-198.compute-1.amazonaws.com WAITING
2 j-2HYK32Q58967A ec2-54-81-64-157.compute-1.amazonaws.com WAITING
3 j-11HA0APWXXDQN ec2-54-81-217-77.compute-1.amazonaws.com WAITING
4 j-34852Y03UYJ9J ec2-54-224-0-158.compute-1.amazonaws.com WAITING
5 j-3R0H4CX9PUGQZ ec2-54-162-188-199.compute-1.amazonaws.com WAITING
6 j-1FVV0CQW6H8NB ec2-54-144-57-57.compute-1.amazonaws.com WAITING
7 j-23GGEIWFSWTLY ec2-54-162-37-207.compute-1.amazonaws.com WAITING
j-2V7DNF88G388D ec2-54-81-125-178.compute-1.amazonaws.com


In [None]:
input_file = 'hdfs://'+node+':9000/weather.raw_data/ALL.csv'
#input_file = small_file
print 'input file=',input_file
s3addr = 's3://yoavfreunddefault/homework4.1/'
!s3cmd del --recursive $s3addr

!python DataFilter.py -r emr --emr-job-flow-id=$flow_id --output-dir=$s3addr --no-output $input_file



input file= hdfs://ec2-54-81-125-178.compute-1.amazonaws.com:9000/weather.raw_data/ALL.csv
using configs in /Users/yoavfreund/.mrjob.conf
creating tmp directory /var/folders/80/c2kfvdvx5cx570r4vlzqgb840000gq/T/DataFilter.yoavfreund.20150530.153031.164000
Copying non-input files into s3://yoavfreunddefault/tmp/DataFilter.yoavfreund.20150530.153031.164000/files/
Adding our job to existing job flow j-2V7DNF88G388D
Job launched 32.7s ago, status RUNNING: Running step
Job launched 65.4s ago, status RUNNING: Running step


In [53]:
# List whatever is currently stored under s3addr.
!s3cmd ls $s3addr
# NOTE(review): this recursively deletes a hard-coded homework4/ prefix, which
# is not the homework4.1/ prefix that the EMR cell assigns to s3addr. Confirm
# this is the intended target before running.
!s3cmd del --recursive s3://yoavfreunddefault/homework4/


2015-05-30 14:21         0   s3://yoavfreunddefault/homework4/_SUCCESS
2015-05-30 14:20 418711380   s3://yoavfreunddefault/homework4/part-00000
2015-05-30 14:20 419792556   s3://yoavfreunddefault/homework4/part-00001
2015-05-30 14:20 419468737   s3://yoavfreunddefault/homework4/part-00002
2015-05-30 14:20 419514240   s3://yoavfreunddefault/homework4/part-00003
2015-05-30 14:20 419163289   s3://yoavfreunddefault/homework4/part-00004
2015-05-30 14:20 419773634   s3://yoavfreunddefault/homework4/part-00005
2015-05-30 14:20 419667917   s3://yoavfreunddefault/homework4/part-00006
2015-05-30 14:20 418651118   s3://yoavfreunddefault/homework4/part-00007
2015-05-30 14:20 420300496   s3://yoavfreunddefault/homework4/part-00008
ERROR: S3 error: Unknown error


In [52]:
# NOTE(review): output_file2 is a literal file name here (there is no $), and
# no visible cell in this notebook writes it. Confirm this check is still needed.
!wc output_file2

       0       0       0 output_file2


In [21]:
!s3cmd ls $s3addr/*

2015-05-30 14:21         0   s3://yoavfreunddefault/homework4/_SUCCESS
2015-05-30 14:20 418711380   s3://yoavfreunddefault/homework4/part-00000
2015-05-30 14:20 419792556   s3://yoavfreunddefault/homework4/part-00001
2015-05-30 14:20 419468737   s3://yoavfreunddefault/homework4/part-00002
2015-05-30 14:20 419514240   s3://yoavfreunddefault/homework4/part-00003
2015-05-30 14:20 419163289   s3://yoavfreunddefault/homework4/part-00004
2015-05-30 14:20 419773634   s3://yoavfreunddefault/homework4/part-00005
2015-05-30 14:20 419667917   s3://yoavfreunddefault/homework4/part-00006
2015-05-30 14:20 418651118   s3://yoavfreunddefault/homework4/part-00007
2015-05-30 14:20 420300496   s3://yoavfreunddefault/homework4/part-00008
