Create a local file 

In [1]:
%%file inflation.txt
Downtown 2.1
Hilltop 4.5

Overwriting inflation.txt


In [2]:
# Print the local file to confirm the contents written by the previous cell.
!cat inflation.txt

Downtown 2.1
Hilltop 4.5

Copy the file to the distributed file system HDFS

In [3]:
!hdfs dfs -copyFromLocal inflation.txt /data/

16/08/03 11:01:43 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


## Load house prices

In [4]:
# Read the house listing as an RDD of text lines (one element per line).
# `sc` is not defined in this notebook -- presumably the SparkContext that
# the PySpark kernel provides; confirm for your environment.
text_RDD = sc.textFile("/data/houses.txt")

In [5]:
def mapper_parse_lines(line):
    """Turn one whitespace-separated record into a (neighborhood, price) pair.

    The neighborhood is taken from the second field and the price from the
    third field (converted to float); the first field is ignored.
    """
    fields = line.split()
    neighborhood = fields[1]
    price = float(fields[2])
    return (neighborhood, price)

In [6]:
# Parse every line into a (neighborhood, price) pair; map() is lazy, so
# nothing is read until an action such as collect() runs.
house_prices_RDD = text_RDD.map(mapper_parse_lines)

In [7]:
# collect() returns the whole RDD to the driver as a Python list -- fine for
# this tiny example, avoid it on large datasets.
house_prices_RDD.collect()

[(u'Downtown', 400000.0), (u'Downtown', 240000.0), (u'Hilltop', 650000.0)]

## Load inflation

In [8]:
# Read the inflation file that was copied to HDFS earlier, one element per line.
inflation_text_RDD = sc.textFile("/data/inflation.txt")

In [9]:
def mapper_parse__inflation_lines(line):
    """Turn one whitespace-separated record into a (neighborhood, inflation) pair.

    The neighborhood is the first field; the inflation figure is the second
    field converted to float.
    """
    fields = line.split()
    neighborhood = fields[0]
    inflation = float(fields[1])
    return (neighborhood, inflation)

In [10]:
# Parse every line into a (neighborhood, inflation) pair, keyed the same way
# as house_prices_RDD so the two can be joined.
inflation_RDD = inflation_text_RDD.map(mapper_parse__inflation_lines)

In [11]:
# Inspect the parsed pairs on the driver.
inflation_RDD.collect()

[(u'Downtown', 2.1), (u'Hilltop', 4.5)]

## join

In [12]:
# Join the two pair RDDs on the neighborhood key; each result element is
# (neighborhood, (price, inflation)).
house_prices_RDD.join(inflation_RDD).collect()

[(u'Downtown', (400000.0, 2.1)),
 (u'Downtown', (240000.0, 2.1)),
 (u'Hilltop', (650000.0, 4.5))]

In [13]:
def mapper_multiply_price_inflation(pair):
    """Apply the inflation percentage to the price of one joined record.

    `pair` is (key, (price, inflation)) where inflation is a percentage
    (e.g. 2.1 means 2.1%). Returns (key, price * (1 + inflation / 100)).
    """
    key = pair[0]
    price_and_inflation = pair[1]
    # Convert the percentage into a multiplicative growth factor.
    growth_factor = 1 + price_and_inflation[1] / 100.
    return (key, price_and_inflation[0] * growth_factor)

In [14]:
# Join prices with inflation, then turn each (neighborhood, (price, inflation))
# record into (neighborhood, inflated price).
house_prices_nextyear_RDD = house_prices_RDD.join(
    inflation_RDD).map(mapper_multiply_price_inflation)

In [15]:
# Values such as 408399.99999999994 in the output are ordinary binary
# floating-point rounding, not a data problem.
house_prices_nextyear_RDD.collect()

[(u'Downtown', 408399.99999999994),
 (u'Downtown', 245039.99999999997),
 (u'Hilltop', 679250.0)]

## reduce

In [16]:
def reducer_sum(a, b):
    """Combine two values with `+` and return the result."""
    total = a + b
    return total

In [17]:
# Add up next year's prices per neighborhood (one output pair per key).
total_nextyear = house_prices_nextyear_RDD.reduceByKey(reducer_sum)

In [18]:
# Bring the per-neighborhood totals back to the driver for display.
total_nextyear.collect()

[(u'Downtown', 653439.9999999999), (u'Hilltop', 679250.0)]

## Exercise

List neighborhood and house price only for the neighborhoods where inflation is low (less than 4%)

(Advanced: for each of those neighborhoods, find the most expensive house)

In [20]:
# %load ./solution_house_price_join.py
def is_inflation_low(pair):
    """Return True when a joined record's inflation is below 4 (percent).

    `pair` is (key, (price, inflation)); only the inflation element is read.
    """
    inflation = pair[1][1]
    return inflation < 4
def reducer_max_price(a, b):
    """Return whichever (price, inflation) value carries the higher price.

    reduceByKey feeds the reducer's result back in as an argument, so the
    reducer must return the same kind of value it receives. Returning a bare
    price instead only works when a key has exactly two records: with three
    or more the next call would index into a float, and with a single record
    the un-reduced (price, inflation) tuple would be emitted as-is.
    """
    return a if a[0] >= b[0] else b


def keep_price(price_inflation):
    """Drop the inflation figure from a (price, inflation) value."""
    return price_inflation[0]


# Keep low-inflation neighborhoods, pick the most expensive house in each,
# then project the value down to the price alone.
(house_prices_RDD.join(inflation_RDD)
    .filter(is_inflation_low)
    .reduceByKey(reducer_max_price)
    .mapValues(keep_price)
    .collect())


[(u'Downtown', 400000.0)]

## Print DAG

In [21]:
# toDebugString() describes the lineage of this RDD: the chain of parent RDDs
# (textFile -> map -> join -> reduceByKey) it is computed from.
print(total_nextyear.toDebugString())

(4) PythonRDD[26] at collect at <ipython-input-18-4646dc69b0ef>:1 []
 |  MapPartitionsRDD[25] at mapPartitions at PythonRDD.scala:374 []
 |  ShuffledRDD[24] at partitionBy at NativeMethodAccessorImpl.java:-2 []
 +-(4) PairwiseRDD[23] at reduceByKey at <ipython-input-17-6168dd502f27>:1 []
    |  PythonRDD[22] at reduceByKey at <ipython-input-17-6168dd502f27>:1 []
    |  MapPartitionsRDD[20] at mapPartitions at PythonRDD.scala:374 []
    |  ShuffledRDD[19] at partitionBy at NativeMethodAccessorImpl.java:-2 []
    +-(4) PairwiseRDD[18] at join at <ipython-input-14-fb2153d68f72>:2 []
       |  PythonRDD[17] at join at <ipython-input-14-fb2153d68f72>:2 []
       |  UnionRDD[16] at union at NativeMethodAccessorImpl.java:-2 []
       |  PythonRDD[14] at RDD at PythonRDD.scala:43 []
       |  MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:-2 []
       |  /data/houses.txt HadoopRDD[0] at textFile at NativeMethodAccessorImpl.java:-2 []
       |  PythonRDD[15] at RDD at PythonRD