# Average House Price

In [1]:
%%writefile AvgPrice.java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

import java.io.*;
import java.io.IOException;
import java.util.*;
import java.nio.charset.StandardCharsets;


public class AvgPrice 
{

    public static class Map extends Mapper<Object, Text, Text, DoubleWritable> 
    {
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException 
        {
            try 
            { 
                if(value.toString().contains("Address")) // remove header
                    return;
                else 
                {
                    String[] data = value.toString().split(",");
                    String zipcode = data[3];
                    DoubleWritable price = new DoubleWritable(Double.parseDouble(data[4]));

                    context.write(new Text(zipcode), price);
                }
            } 
            catch (Exception e) 
            {
                e.printStackTrace();
            }
        }
    }

    public static class Reduce extends Reducer<Text, DoubleWritable, Text, DoubleWritable>
    {
        public void reduce(Text key, Iterable<DoubleWritable> values, Context context) throws IOException, InterruptedException 
        {
            double sum = 0;
            int num_of_prices = 0;

            for(DoubleWritable value : values)
            {
                sum += value.get();
                num_of_prices++;
            }

            context.write(key, new DoubleWritable((double) sum / num_of_prices));
        }
    }


    public static void main(String[] args) throws Exception
    {
        // set the paths of the input and output directories in the HDFS
        Path input_dir = new Path("../data");
        Path output_dir = new Path("average_prices");

        // in case the output directory already exists, delete it
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        if(fs.exists(output_dir))
            fs.delete(output_dir, true);

        // configure the MapReduce job
        Job AvgPrice_job = Job.getInstance(conf, "Average Price");
        AvgPrice_job.setJarByClass(AvgPrice.class);
        AvgPrice_job.setMapperClass(Map.class);
        AvgPrice_job.setCombinerClass(Reduce.class);
        AvgPrice_job.setReducerClass(Reduce.class);    
        AvgPrice_job.setMapOutputKeyClass(Text.class);
        AvgPrice_job.setMapOutputValueClass(DoubleWritable.class);
        AvgPrice_job.setOutputKeyClass(Text.class);
        AvgPrice_job.setOutputValueClass(DoubleWritable.class);
        FileInputFormat.addInputPath(AvgPrice_job, input_dir);
        FileOutputFormat.setOutputPath(AvgPrice_job, output_dir);
        AvgPrice_job.waitForCompletion(true);
    }
}

Writing AvgPrice.java


In [2]:
%env PATH=/usr/lib/jvm/java-8-openjdk-amd64/bin:/home/ubuntu/bin:/home/ubuntu/bin:/home/ubuntu/miniconda3/bin:/home/ubuntu/miniconda3/condabin:/usr/lib/jvm/java-8-openjdk-amd64/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/usr/games:/usr/local/games:/snap/bin
%env HADOOP_CLASSPATH=/usr/lib/jvm/java-8-openjdk-amd64/lib/tools.jar

env: PATH=/usr/lib/jvm/java-8-openjdk-amd64/bin:/home/ubuntu/bin:/home/ubuntu/bin:/home/ubuntu/miniconda3/bin:/home/ubuntu/miniconda3/condabin:/usr/lib/jvm/java-8-openjdk-amd64/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/usr/games:/usr/local/games:/snap/bin
env: HADOOP_CLASSPATH=/usr/lib/jvm/java-8-openjdk-amd64/lib/tools.jar


In [3]:
!echo $HADOOP_HOME

/home/ubuntu/hadoop-3.3.1


In [4]:
!$HADOOP_HOME/bin/hadoop com.sun.tools.javac.Main AvgPrice.java

In [5]:
!jar cf AvgPrice.jar AvgPrice*.class

In [10]:
!$HADOOP_HOME/bin/hadoop jar AvgPrice.jar AvgPrice

head: cannot open '10' for reading: No such file or directory
2022-03-21 10:40:34,591 INFO impl.MetricsConfig: Loaded properties from hadoop-metrics2.properties
2022-03-21 10:40:34,673 INFO impl.MetricsSystemImpl: Scheduled Metric snapshot period at 10 second(s).
2022-03-21 10:40:34,673 INFO impl.MetricsSystemImpl: JobTracker metrics system started
2022-03-21 10:40:34,733 WARN mapreduce.JobResourceUploader: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
2022-03-21 10:40:34,789 INFO input.FileInputFormat: Total input files to process : 1
2022-03-21 10:40:34,814 INFO mapreduce.JobSubmitter: number of splits:1
2022-03-21 10:40:34,943 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_local1481753162_0001
2022-03-21 10:40:34,943 INFO mapreduce.JobSubmitter: Executing with tokens: []
2022-03-21 10:40:35,054 INFO mapreduce.Job: The url to track the job: http://localhost:8080/
2022-03-21 10:4

In [11]:
!$HADOOP_HOME/bin/hadoop fs -cat average_prices/part-r-00000

19044	30580.25
21076	27875.0
28202	44750.0
60173	43300.0
90670	56277.5
