# PART 3 - Batch Analysis of Historical NEXRAD Data with Hadoop MapReduce
Credits: [Purdue University](https://github.com/stephenlienharrell/WeatherPipe)

Radar data requires a lot of storage and computation to process. This tool simplifies starting up an EMR cluster and running a Hadoop MapReduce job of your choice.

The [NEXRAD data](https://aws.amazon.com/noaa-big-data/nexrad/) is a public dataset hosted on Amazon Web Services (AWS). It contains historical records from 160 NEXRAD radars across the US, from 1990 to today.

![](image/hadoop.jpg)

### Hadoop MapReduce Weather Pipeline:
1. Query the desired time period and radar station
2. Compile the MapReduce jar
3. Search the archived NEXRAD data on S3
4. Configure and start up the EMR cluster
5. Run the MapReduce job
6. Map with the user-provided mapping function
7. Reduce with the user-provided reducing function
8. Write the output NetCDF files back to S3 for visualization
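The S3 search step amounts to listing objects by key prefix. The public NEXRAD Level II archive is commonly organized by `YYYY/MM/DD/STATION/`; assuming that layout (WeatherPipe's own search logic may differ), the prefixes covering a date range can be generated as in this sketch. `NexradPrefixes` is a hypothetical helper, not part of WeatherPipe.

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;

public class NexradPrefixes {
    // Assumed archive layout: YYYY/MM/DD/STATION/ (one folder per station per day).
    private static final DateTimeFormatter DAY_PATH = DateTimeFormatter.ofPattern("yyyy/MM/dd");

    /** Returns one S3 key prefix per calendar day in [start, end], inclusive. */
    public static List<String> dailyPrefixes(LocalDate start, LocalDate end, String station) {
        if (end.isBefore(start)) {
            throw new IllegalArgumentException("end date is before start date");
        }
        List<String> prefixes = new ArrayList<>();
        for (LocalDate day = start; !day.isAfter(end); day = day.plusDays(1)) {
            prefixes.add(day.format(DAY_PATH) + "/" + station + "/");
        }
        return prefixes;
    }

    public static void main(String[] args) {
        // Same station and day as the example WeatherPipe run in this notebook.
        for (String p : dailyPrefixes(LocalDate.of(2010, 1, 1), LocalDate.of(2010, 1, 1), "KIND")) {
            System.out.println(p); // 2010/01/01/KIND/
        }
    }
}
```

Each prefix would then be passed to an S3 object listing, and the returned keys filtered by the exact start and end times.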

### Download WeatherPipe tool

```
! wget https://github.com/stephenlienharrell/WeatherPipe/archive/v0.2.tar.gz
```

```
--2016-03-05 12:34:12--  https://github.com/stephenlienharrell/WeatherPipe/archive/v0.2.tar.gz
Resolving github.com... 192.30.252.128
Connecting to github.com|192.30.252.128|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://codeload.github.com/stephenlienharrell/WeatherPipe/tar.gz/v0.2 [following]
--2016-03-05 12:34:12--  https://codeload.github.com/stephenlienharrell/WeatherPipe/tar.gz/v0.2
Resolving codeload.github.com... 192.30.252.162
Connecting to codeload.github.com|192.30.252.162|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 25779 (25K) [application/x-gzip]
Saving to: 'v0.2.tar.gz'

2016-03-05 12:34:12 (243 KB/s) - 'v0.2.tar.gz' saved [25779/25779]
```
### Extract file

```
! tar -xvzf v0.2.tar.gz
```

```
x WeatherPipe-0.2/
x WeatherPipe-0.2/.gitmodules
x WeatherPipe-0.2/LICENSE
x WeatherPipe-0.2/README.md
x WeatherPipe-0.2/WeatherPipe/
x WeatherPipe-0.2/WeatherPipe/.gitignore
x WeatherPipe-0.2/WeatherPipe/build.gradle
x WeatherPipe-0.2/WeatherPipe/src/
x WeatherPipe-0.2/WeatherPipe/src/main/
x WeatherPipe-0.2/WeatherPipe/src/main/dist/
x WeatherPipe-0.2/WeatherPipe/src/main/dist/ExampleAnalyses/
x WeatherPipe-0.2/WeatherPipe/src/main/dist/ExampleAnalyses/ResearcherMapReduceAnalysisArrayAverageExample.java
x WeatherPipe-0.2/WeatherPipe/src/main/dist/ExampleAnalyses/ResearcherMapReduceAnalysisEmpty.java
x WeatherPipe-0.2/WeatherPipe/src/main/dist/ExampleAnalyses/ResearcherMapReduceAnalysisReflectivityAverageWithNetcdfOutput.java
x WeatherPipe-0.2/WeatherPipe/src/main/dist/ExampleAnalyses/ResearcherMapReduceAnalysisScalarAverageExample.java
x WeatherPipe-0.2/WeatherPipe/src/main/dist/WeatherPipe.ini
x WeatherPipe-0.2/WeatherPipe/src/main/dist/WeatherPipeMapReduce/
x Weath
```

### Set up a Working Environment

```
! rm v0.2.tar.gz
# Build with Gradle first; the cp below expects build/distributions/WeatherPipe.tar
! cd WeatherPipe-0.2/WeatherPipe && gradle build
! mkdir WeatherPipe-0.2/WeatherPipe/working
! cp WeatherPipe-0.2/WeatherPipe/build/distributions/WeatherPipe.tar WeatherPipe-0.2/WeatherPipe/working
```

### Extract Required .jar Files

```
! tar xfv WeatherPipe-0.2/WeatherPipe/working/WeatherPipe.tar
```

```
x WeatherPipe/
x WeatherPipe/ExampleAnalyses/
x WeatherPipe/ExampleAnalyses/ResearcherMapReduceAnalysisArrayAverageExample.java
x WeatherPipe/ExampleAnalyses/ResearcherMapReduceAnalysisEmpty.java
x WeatherPipe/ExampleAnalyses/ResearcherMapReduceAnalysisReflectivityAverageWithNetcdfOutput.java
x WeatherPipe/ExampleAnalyses/ResearcherMapReduceAnalysisScalarAverageExample.java
x WeatherPipe/lib/
x WeatherPipe/lib/log4j.properties
x WeatherPipe/WeatherPipe.ini
x WeatherPipe/WeatherPipeMapReduce/
x WeatherPipe/WeatherPipeMapReduce/build.gradle
x WeatherPipe/WeatherPipeMapReduce/src/
x WeatherPipe/WeatherPipeMapReduce/src/main/
x WeatherPipe/WeatherPipeMapReduce/src/main/java/
x WeatherPipe/WeatherPipeMapReduce/src/main/java/edu/
x WeatherPipe/WeatherPipeMapReduce/src/main/java/edu/purdue/
x WeatherPipe/WeatherPipeMapReduce/src/main/java/edu/purdue/eaps/
x WeatherPipe/WeatherPipeMapReduce/src/main/java/edu/purdue/eaps/weatherpipe/
x WeatherPipe/WeatherPipeMapReduce/src/main/java/edu/purdue/e
```

## User Input: Construct a MapReduce Analysis of Your Choice
### Mapper: Extract Reflectivity Variable

```
! cat JavaMapReduce/Mapper.java
```

```java
// Fragment of the analysis class; relies on ucar.nc2.NetcdfFile, ucar.nc2.Variable,
// ucar.ma2.Array, java.util.Arrays and java.io.IOException being imported.
protected double[] mapAnalyze(NetcdfFile nexradNetCDF) {
    int i;
    Array dataArray;
    double[] retArray = null;
    byte[] bytes;

    // SHAPE is scanR=7, radialR=360, gateR=1336

    Variable reflectivity = nexradNetCDF.findVariable("Reflectivity");
    if (reflectivity == null) return null;
    System.out.println("reflectivity shape - " + Arrays.toString(reflectivity.getShape()));

    try {
        dataArray = reflectivity.read();
    } catch (IOException e) {
        e.printStackTrace();
        return null;
    }

    // Cast assumes Reflectivity is stored as bytes; these are the raw stored
    // values (no scale_factor/add_offset is applied here).
    bytes = (byte[]) dataArray.copyTo1DJavaArray();

    retArray = new double[bytes.length];
    for (i = 0; i < bytes.length; i++) {
        retArray[i] = (double) bytes[i];
    }

    return retArray;
}

// Fields used by reduceAnalyze() in the reducer.
double[] runningSumsArray = null;
int numberOfDataPoints = 0;
```
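The mapper widens each stored byte with `(double) bytes[i]`. Java's `byte` is signed, so any stored value above 127 comes out negative. If the `Reflectivity` variable is meant to be read as unsigned (netCDF's `_Unsigned = "true"` attribute convention), `Byte.toUnsignedInt` keeps the 0 to 255 range instead. The standalone sketch below compares the two; `ByteConversion` is a hypothetical class, not part of WeatherPipe, and whether your files are unsigned or also packed with `scale_factor`/`add_offset` should be checked against the variable's attributes (e.g., with `ncdump -h`).

```java
import java.util.Arrays;

public class ByteConversion {
    /** Widens each byte as the mapper does: Java bytes are signed (-128 to 127). */
    public static double[] signedToDouble(byte[] raw) {
        double[] out = new double[raw.length];
        for (int i = 0; i < raw.length; i++) {
            out[i] = (double) raw[i];
        }
        return out;
    }

    /** Reads each byte as unsigned (0 to 255), matching the _Unsigned convention. */
    public static double[] unsignedToDouble(byte[] raw) {
        double[] out = new double[raw.length];
        for (int i = 0; i < raw.length; i++) {
            out[i] = Byte.toUnsignedInt(raw[i]);
        }
        return out;
    }

    public static void main(String[] args) {
        byte[] raw = {(byte) 10, (byte) 127, (byte) 200};
        System.out.println(Arrays.toString(signedToDouble(raw)));   // [10.0, 127.0, -56.0]
        System.out.println(Arrays.toString(unsignedToDouble(raw))); // [10.0, 127.0, 200.0]
    }
}
```

Switching conversions changes the values the reducer averages, so the result would no longer be comparable with the average shown at the end of this notebook.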

### Reducer: Reflectivity Average

In [37]:
# Compute average
! cat JavaMapReduce/Reducer.java

protected double[] reduceAnalyze(double[] input) {
		if(input == null) return null;
		if(runningSumsArray != null && input.length != runningSumsArray.length) return null;
		
		if(runningSumsArray == null) runningSumsArray = new double[input.length];
		double[] averageArray = new double[runningSumsArray.length];
		
		numberOfDataPoints++;

		for(int i = 0; i < input.length; i++) {
			runningSumsArray[i] += input[i];
			averageArray[i] = runningSumsArray[i]/numberOfDataPoints;
		}
		
		
		return averageArray;
	}
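Because `reduceAnalyze` returns the average of everything seen so far, only the array returned by the last call is the full mean; earlier return values are partial averages. A small local check, independent of Hadoop, can confirm that this running update matches a direct element-wise mean. `RunningAverageCheck` is a hypothetical class that copies the reducer's update rule.

```java
import java.util.Arrays;

public class RunningAverageCheck {
    private double[] runningSums = null;
    private int count = 0;

    /** Same update rule as reduceAnalyze: add input to the sums, return sums / count. */
    public double[] update(double[] input) {
        if (input == null) return null;
        if (runningSums != null && input.length != runningSums.length) return null;
        if (runningSums == null) runningSums = new double[input.length];
        count++;
        double[] avg = new double[input.length];
        for (int i = 0; i < input.length; i++) {
            runningSums[i] += input[i];
            avg[i] = runningSums[i] / count;
        }
        return avg;
    }

    /** Direct element-wise mean over all inputs, for comparison. */
    public static double[] batchMean(double[][] inputs) {
        double[] mean = new double[inputs[0].length];
        for (double[] row : inputs) {
            for (int i = 0; i < row.length; i++) mean[i] += row[i];
        }
        for (int i = 0; i < mean.length; i++) mean[i] /= inputs.length;
        return mean;
    }

    public static void main(String[] args) {
        double[][] scans = {{1, 2}, {3, 4}, {5, 9}};
        RunningAverageCheck reducer = new RunningAverageCheck();
        double[] last = null;
        for (double[] scan : scans) last = reducer.update(scan);
        System.out.println(Arrays.toString(last));             // [3.0, 5.0]
        System.out.println(Arrays.toString(batchMean(scans))); // [3.0, 5.0]
    }
}
```

Keeping running sums means memory stays proportional to one radar volume no matter how many files are reduced; the cost is one new output array per call.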

### Add Customized Functions to WeatherPipe

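Copy the analysis you want to run over `ResearcherMapReduceAnalysis.java` in the WeatherPipeMapReduce source tree. This example uses the bundled reflectivity average with NetCDF output:

```
! cp WeatherPipe/ExampleAnalyses/ResearcherMapReduceAnalysisReflectivityAverageWithNetcdfOutput.java WeatherPipe/WeatherPipeMapReduce/src/main/java/edu/purdue/eaps/weatherpipe/weatherpipemapreduce/ResearcherMapReduceAnalysis.java
```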

## Query and Run MapReduce Job in One Line
### Analysis and EMR Flags
- `-s`, `--start_time`: start boundary for the NEXRAD data search. Date format is `dd/MM/yyyy HH:mm:ss`.
- `-e`, `--end_time`: end boundary for the NEXRAD data search. Date format is `dd/MM/yyyy HH:mm:ss`.
- `-st`, `--station`: radar station abbreviation, e.g. `"KIND"`.
- `-b`, `--bucket_name`: S3 bucket for input and output data. Auto-generated if not given.
- `-i`, `--instance_count`: number of instances to run the analysis on. Default is 1.
- `-t`, `--instance_type`: instance type for the EMR job. Default is `c3.xlarge`.
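Because the search window is given day first (`dd/MM/yyyy HH:mm:ss`, so `05/03/2016` is 5 March), it can be worth validating `-s` and `-e` locally before paying for an EMR cluster. This sketch uses `java.time`; `SearchWindow` is a hypothetical helper, and WeatherPipe's own argument parsing may behave differently.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;

public class SearchWindow {
    // Pattern taken from the flag descriptions: day/month/year, 24-hour clock.
    private static final DateTimeFormatter FLAG_FORMAT = DateTimeFormatter.ofPattern("dd/MM/yyyy HH:mm:ss");

    /** Parses a -s/-e value; throws DateTimeParseException if it does not match the pattern. */
    public static LocalDateTime parse(String value) {
        return LocalDateTime.parse(value, FLAG_FORMAT);
    }

    /** True if start is strictly before end. */
    public static boolean isValidWindow(String start, String end) {
        return parse(start).isBefore(parse(end));
    }

    public static void main(String[] args) {
        String start = "01/01/2010 12:30:00";
        String end = "01/01/2010 23:00:00";
        System.out.println(parse(start));              // 2010-01-01T12:30
        System.out.println(isValidWindow(start, end)); // true
        try {
            parse("2010-01-01 12:30:00");
        } catch (DateTimeParseException e) {
            System.out.println("rejected: wrong format");
        }
    }
}
```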


```
! WeatherPipe/bin/WeatherPipe -s "01/01/2010 12:30:00" -e "01/01/2010 23:00:00" -st KIND -b data-eng-project -i 1 -t c3.2xlarge
```

```
Attempting to build Map Reduce with
gradle: /usr/local/bin/gradle
build directory: /Users/marvinbertin/Github/Weather-Data-Eng-Pipeline/WeatherPipe/WeatherPipeMapReduce

Download https://repo1.maven.org/maven2/org/apache/logging/log4j/log4j-core/2.4/log4j-core-2.4.pom
Download https://repo1.maven.org/maven2/org/apache/logging/log4j/log4j/2.4/log4j-2.4.pom
Download https://artifacts.unidata.ucar.edu/content/repositories/unidata-releases/edu/ucar/netcdf4/4.6.3/netcdf4-4.6.3.pom
Download https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-mapreduce-client-core/2.4.0/hadoop-mapreduce-client-core-2.4.0.pom
Download https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-mapreduce-client/2.4.0/hadoop-mapreduce-client-2.4.0.pom
Download https://repo1.maven.org/maven2/org/apache/logging/log4j/log4j-api/2.4/log4j-api-2.4.pom
Download https://artifacts.unidata.ucar.edu/content/repositories/unidata-releases/edu/ucar/cdm/4.6.3/cdm-4.6.3.pom
Download https://repo1.maven.org/maven2/org/apache/ha
```

### Output Directory

```
! ls WeatherPipeJob2016-03-05T20.46.157/
```

```
2016-03-05T20.46.157_raw_map_reduce_output
job_setup
jsonOutputFile.json
logs
netcdfOut
```

### Average Reflectivity

```
! cat WeatherPipeJob2016-03-05T20.46.157/jsonOutputFile.json
```

```
{"average": [9.32856374]}
```

### Check Your S3 Bucket for the Complete Outputs