Create a local file on the driver only; it is not accessible to the Spark cluster.

In [None]:
%%file inflation.txt
Downtown 2.1
Hilltop 4.5

In [None]:
!cat inflation.txt

Copy the file to the distributed file system HDFS

In [None]:
!hdfs dfs -copyFromLocal inflation.txt /data/

Verify that the file is replicated 3 times

In [None]:
!hdfs fsck /data/inflation.txt -files -blocks -locations -racks  

## Load house prices

In [None]:
# Load the house price file as an RDD of text lines (each line is parsed later
# by mapper_parse_lines).
# NOTE(review): `sc` is not defined in this notebook; presumably the SparkContext
# provided by the kernel, with /data/ resolving to HDFS -- confirm.
text_RDD = sc.textFile("/data/houses.txt")

In [None]:
def mapper_parse_lines(line):
    """Turn one line of the house price file into a (neighborhood, price) pair.

    The line is split on whitespace; the second field is taken as the
    neighborhood name and the third field is converted to a float price.
    """
    fields = line.split()
    neighborhood = fields[1]
    price = float(fields[2])
    return (neighborhood, price)

In [None]:
# Parse every text line into a (neighborhood, price) pair.
house_prices_RDD = text_RDD.map(mapper_parse_lines)

In [None]:
house_prices_RDD.collect()

## Load inflation

In [None]:
# Load the inflation file that was copied to /data/ earlier as an RDD of text
# lines (each line is parsed later by mapper_parse__inflation_lines).
inflation_text_RDD = sc.textFile("/data/inflation.txt")

In [None]:
def mapper_parse__inflation_lines(line):
    """Turn one line of the inflation file into a (neighborhood, inflation) pair.

    The line is split on whitespace; the first field is the neighborhood
    name and the second is converted to a float inflation value.
    """
    fields = line.split()
    neighborhood, inflation = fields[0], float(fields[1])
    return (neighborhood, inflation)

In [None]:
# Parse every text line into a (neighborhood, inflation) pair.
inflation_RDD = inflation_text_RDD.map(mapper_parse__inflation_lines)

In [None]:
inflation_RDD.collect()

## Join

In [None]:
house_prices_RDD.join(inflation_RDD).collect()

In [None]:
def mapper_multiply_price_inflation(pair):
    """Apply the inflation percentage to the price of one joined record.

    `pair` is indexed as (key, (price, inflation)), where inflation is a
    percentage. Returns (key, price * (1 + inflation / 100)).
    """
    key = pair[0]
    price_and_inflation = pair[1]
    growth_factor = 1 + price_and_inflation[1] / 100.
    return (key, price_and_inflation[0] * growth_factor)

In [None]:
# Join prices with inflation on the neighborhood key, then turn every joined
# record into a (neighborhood, inflated price) pair.
house_prices_nextyear_RDD = (
    house_prices_RDD
    .join(inflation_RDD)
    .map(mapper_multiply_price_inflation)
)

In [None]:
house_prices_nextyear_RDD.collect()

## Reduce

In [None]:
def reducer_sum(a, b):
    """Combine two values with `+`; passed to reduceByKey to total values per key."""
    total = a + b
    return total

In [None]:
# Add up the next-year prices that share the same neighborhood key.
total_nextyear = house_prices_nextyear_RDD.reduceByKey(reducer_sum)

In [None]:
total_nextyear.collect()

## Exercise

List neighborhood and house price only for the neighborhoods where inflation is low (less than 4%)

(Advanced: for each of those neighborhoods, find the most expensive house)

In [None]:
# Pair every house price with the inflation of its neighborhood; the filter
# function used on this RDD indexes records as (neighborhood, (price, inflation)).
prices_inflation_RDD = house_prices_RDD.join(inflation_RDD)

In [None]:
prices_inflation_RDD.collect()

In [None]:
def has_low_inflation(pair):
    """Return True when a (neighborhood, (price, inflation)) record has inflation below 4."""
    inflation = pair[1][1]
    return inflation < 4

In [None]:
has_low_inflation((u'Downtown', (400000.0, 2.1)))

In [None]:
has_low_inflation((u'Hilltop', (650000.0, 4.5)))

In [None]:
# Keep only the joined records whose inflation is below 4 and show them.
prices_inflation_RDD.filter(has_low_inflation).collect()

In [None]:
# %load solution_house_price_join.py
def is_inflation_low(pair):
    """Tell whether a joined (neighborhood, (price, inflation)) record has inflation under 4."""
    price_and_inflation = pair[1]
    return price_and_inflation[1] < 4
def reducer_max_price(a, b):
    """Return whichever (price, inflation) record has the higher price.

    Both arguments are values of the joined RDD, i.e. (price, inflation)
    tuples. The result is again such a tuple, so the function can be applied
    repeatedly when a neighborhood has more than two houses; returning only
    the bare price would make the next call fail on `a[0]`, and would leave
    single-house neighborhoods with a differently shaped value.

    On equal prices the first record is kept.
    """
    return a if a[0] >= b[0] else b
# Join prices with inflation, keep the low-inflation neighborhoods, reduce the
# records of each neighborhood with reducer_max_price and show the result.
(
    house_prices_RDD
    .join(inflation_RDD)
    .filter(is_inflation_low)
    .reduceByKey(reducer_max_price)
    .collect()
)


## Print DAG

In [None]:
# Print the debug description of how total_nextyear is derived (its DAG).
print(total_nextyear.toDebugString())

## Cache

In [None]:
%time house_prices_nextyear_RDD.reduceByKey(max).collect()

In [None]:
%time house_prices_nextyear_RDD.reduceByKey(min).collect()

In [None]:
# Ask Spark to cache this RDD so the timed reductions that follow can reuse it
# instead of recomputing the join and map.
# NOTE(review): presumably the cache is only filled by the first action run
# after this call -- confirm against the Spark persistence docs.
house_prices_nextyear_RDD.cache()

In [None]:
%time house_prices_nextyear_RDD.reduceByKey(max).collect()

In [None]:
%time house_prices_nextyear_RDD.reduceByKey(min).collect()