In [1]:
# Run the Spark setup script inside this notebook's namespace (-i), so names it defines stay visible here.
# NOTE(review): presumably this creates the SparkContext `sc` used by later cells — confirm.
%run -i /opt/setup_spark.py

Create a local file on the driver only; this file is not accessible to the Spark cluster.

In [2]:
# Write the two sample "<neighborhood> <inflation>" lines to a local file.
# Use printf rather than echo: printf expands `\n` the same way in every POSIX shell,
# whereas bash's builtin echo leaves it literal by default (producing a single,
# unparseable line). The trailing `\n` matches echo's implicit final newline (25 bytes).
!printf 'Downtown 2.1\nHilltop 4.5\n' > inflation.txt

In [3]:
# Show the local file to confirm it holds one "<neighborhood> <inflation>" pair per line.
!cat inflation.txt

Downtown 2.1
Hilltop 4.5


Copy the file to HDFS, the distributed file system, so that the cluster nodes can read it.

In [4]:
# Remove any previous copy in HDFS so the upload below starts clean.
# -f: do not report an error when inflation.txt is not in HDFS yet, so this
# cell also succeeds on a fresh cluster (Restart & Run All).
!hdfs dfs -rm -f inflation.txt

2015-11-17 00:37:53,881 INFO  [main] fs.TrashPolicyDefault (TrashPolicyDefault.java:initialize(92)) - Namenode trash configuration: Deletion interval = 0 minutes, Emptier interval = 0 minutes.
Deleted inflation.txt


In [5]:
# Upload the local file to HDFS. No destination is given, so it lands in the HDFS
# home directory (/user/root/inflation.txt, as the fsck check below shows).
!hdfs dfs -copyFromLocal inflation.txt

Verify that the file is replicated 3 times, with one replica on each of the nodes `10.128.1.6`, `10.128.1.7` and `10.128.1.8`.

In [6]:
# Report the file's blocks and which data nodes/racks hold each replica.
# NOTE(review): the /user/root path assumes the notebook runs as root (the fsck
# output says "started by root"); adjust it when running as another user.
!hdfs fsck /user/root/inflation.txt -files -blocks -locations -racks  

Connecting to namenode via http://sparkcluster-2:50070
FSCK started by root (auth:SIMPLE) from /10.128.1.5 for path /user/root/inflation.txt at Tue Nov 17 00:37:59 UTC 2015
/user/root/inflation.txt 25 bytes, 1 block(s):  OK
0. BP-545221491-10.128.1.5-1447628910426:blk_1073741834_1010 len=25 repl=3 [/default-rack/10.128.1.8:50010, /default-rack/10.128.1.7:50010, /default-rack/10.128.1.6:50010]

Status: HEALTHY
 Total size:	25 B
 Total dirs:	0
 Total files:	1
 Total symlinks:		0
 Total blocks (validated):	1 (avg. block size 25 B)
 Minimally replicated blocks:	1 (100.0 %)
 Over-replicated blocks:	0 (0.0 %)
 Under-replicated blocks:	0 (0.0 %)
 Mis-replicated blocks:		0 (0.0 %)
 Default replication factor:	3
 Average block replication:	3.0
 Corrupt blocks:		0
 Missing replicas:		0 (0.0 %)
 Number of data-nodes:		3
 Number of racks:		1
FSCK ended at Tue Nov 17 00:37:59 UTC 2015 in 1 milliseconds


The filesystem under path '/user/root/inflation.txt' is HEALTHY


## Load house prices

In [7]:
# Read houses.txt as an RDD of text lines (one element per line).
# NOTE(review): houses.txt is never created in this notebook; presumably it was
# uploaded to the HDFS home directory beforehand, like inflation.txt — confirm.
text_RDD = sc.textFile("houses.txt")

In [8]:
def mapper_parse_lines(line):
    """Parse one line of houses.txt into a (neighborhood, price) pair.

    The line is split on whitespace: the second field is the neighborhood and
    the third is the price, converted to float. The first field and any fields
    after the third are ignored.
    NOTE(review): the first field is presumably a house id — confirm against houses.txt.
    """
    fields = line.split()
    neighborhood, price_text = fields[1], fields[2]
    return neighborhood, float(price_text)

In [9]:
# Parse every line into a (neighborhood, price) pair. map is a transformation,
# so nothing is computed until an action such as collect() below.
house_prices_RDD = text_RDD.map(mapper_parse_lines)

In [10]:
# collect() brings every element back to the driver; fine here because the data is tiny.
house_prices_RDD.collect()

[(u'Downtown', 400000.0), (u'Downtown', 240000.0), (u'Hilltop', 650000.0)]

## Load inflation

In [11]:
# Read the inflation.txt uploaded to HDFS above as an RDD of text lines.
inflation_text_RDD = sc.textFile("inflation.txt")

In [12]:
def mapper_parse__inflation_lines(line):
    """Parse one line of inflation.txt into a (neighborhood, inflation) pair.

    The line is split on whitespace: the first field is the neighborhood and
    the second is the inflation rate, converted to float. Any further fields
    are ignored.
    """
    fields = line.split()
    neighborhood, rate_text = fields[0], fields[1]
    return neighborhood, float(rate_text)

In [13]:
# Parse every line into a (neighborhood, inflation) pair.
inflation_RDD = inflation_text_RDD.map(mapper_parse__inflation_lines)

In [14]:
# Inspect the parsed inflation pairs on the driver.
inflation_RDD.collect()

[(u'Downtown', 2.1), (u'Hilltop', 4.5)]

## Join

In [15]:
# Join on the neighborhood key: each record becomes (neighborhood, (price, inflation)).
house_prices_RDD.join(inflation_RDD).collect()

[(u'Downtown', (400000.0, 2.1)),
 (u'Downtown', (240000.0, 2.1)),
 (u'Hilltop', (650000.0, 4.5))]

In [16]:
def mapper_multiply_price_inflation(pair):
    """Project a house price one year ahead using its neighborhood's inflation.

    `pair` is one record of house_prices_RDD.join(inflation_RDD), i.e.
    (neighborhood, (price, inflation_percent)). The inflation value is a
    percentage, so the result is
    (neighborhood, price * (1 + inflation_percent / 100)).
    """
    neighborhood = pair[0]
    price, inflation_percent = pair[1][0], pair[1][1]
    # `100.` keeps the division floating-point under Python 2 as well.
    growth_factor = 1 + inflation_percent / 100.
    return (neighborhood, price * growth_factor)

In [17]:
# Join prices with inflation, then turn each record into (neighborhood, next-year price).
house_prices_nextyear_RDD = house_prices_RDD.join(inflation_RDD).map(mapper_multiply_price_inflation)

In [18]:
# One (neighborhood, next-year price) pair per house.
house_prices_nextyear_RDD.collect()

[(u'Downtown', 408399.99999999994),
 (u'Downtown', 245039.99999999997),
 (u'Hilltop', 679250.0)]

## Reduce

In [19]:
def reducer_sum(a, b):
    """Combine two values that share a key by adding them (for use with reduceByKey)."""
    total = a + b
    return total

In [20]:
# Sum next-year prices per neighborhood. reduceByKey introduces a shuffle (the
# ShuffledRDD stage in the DAG printed below).
total_nextyear = house_prices_nextyear_RDD.reduceByKey(reducer_sum)

In [21]:
# One (neighborhood, total next-year price) pair per neighborhood.
total_nextyear.collect()

[(u'Downtown', 653439.9999999999), (u'Hilltop', 679250.0)]

## Print DAG

In [22]:
# Print the RDD lineage (DAG) behind total_nextyear.
# PySpark's toDebugString() returns bytes under Python 3 (a str under Python 2).
# Printing bytes would show one b'...' line with escaped "\n", so decode the
# result when needed to keep the readable multi-line output on both versions.
debug_string = total_nextyear.toDebugString()
if isinstance(debug_string, bytes):
    debug_string = debug_string.decode("utf-8")
print(debug_string)

(2) PythonRDD[26] at collect at <ipython-input-21-4646dc69b0ef>:1 []
 |  MapPartitionsRDD[25] at mapPartitions at PythonRDD.scala:374 []
 |  ShuffledRDD[24] at partitionBy at NativeMethodAccessorImpl.java:-2 []
 +-(2) PairwiseRDD[23] at reduceByKey at <ipython-input-20-6168dd502f27>:1 []
    |  PythonRDD[22] at reduceByKey at <ipython-input-20-6168dd502f27>:1 []
    |  MapPartitionsRDD[20] at mapPartitions at PythonRDD.scala:374 []
    |  ShuffledRDD[19] at partitionBy at NativeMethodAccessorImpl.java:-2 []
    +-(2) PairwiseRDD[18] at join at <ipython-input-17-983588daccd8>:1 []
       |  PythonRDD[17] at join at <ipython-input-17-983588daccd8>:1 []
       |  UnionRDD[16] at union at NativeMethodAccessorImpl.java:-2 []
       |  PythonRDD[14] at RDD at PythonRDD.scala:43 []
       |  MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:-2 []
       |  houses.txt HadoopRDD[0] at textFile at NativeMethodAccessorImpl.java:-2 []
       |  PythonRDD[15] at RDD at PythonRDD.scal

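## Cache

Time the same two actions before and after calling `cache()`. `cache()` is lazy: the RDD is only stored in memory when the next action computes it. The first timed run after `cache()` therefore still does the full join. With such a tiny dataset, per-job overhead dominates, so the wall times barely change.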

In [23]:
# Baseline before cache(): this action recomputes the join and map from the input files.
%time house_prices_nextyear_RDD.reduceByKey(max).collect()

CPU times: user 17.2 ms, sys: 3.61 ms, total: 20.8 ms
Wall time: 327 ms


[(u'Downtown', 408399.99999999994), (u'Hilltop', 679250.0)]

In [24]:
# Second baseline: without caching, the join and map are recomputed again for this action.
%time house_prices_nextyear_RDD.reduceByKey(min).collect()

CPU times: user 25.8 ms, sys: 4.11 ms, total: 29.9 ms
Wall time: 339 ms


[(u'Downtown', 245039.99999999997), (u'Hilltop', 679250.0)]

In [25]:
# Mark the RDD for in-memory caching. cache() is lazy: nothing is stored until the
# next action computes the RDD, so the first timed run below still pays for the join.
house_prices_nextyear_RDD.cache()

PythonRDD[21] at collect at <ipython-input-18-0f19c536a678>:1

In [26]:
# First action after cache(): computes the RDD and fills the cache as a side effect.
%time house_prices_nextyear_RDD.reduceByKey(max).collect()

CPU times: user 23 ms, sys: 0 ns, total: 23 ms
Wall time: 412 ms


[(u'Downtown', 408399.99999999994), (u'Hilltop', 679250.0)]

In [27]:
# Second action after cache(): should read the cached partitions instead of redoing the join.
%time house_prices_nextyear_RDD.reduceByKey(min).collect()

CPU times: user 5.07 ms, sys: 10.4 ms, total: 15.5 ms
Wall time: 349 ms


[(u'Downtown', 245039.99999999997), (u'Hilltop', 679250.0)]