# Loading Data into RDD

Data files are stored at
<pre>
    $ hdfs dfs -ls /user/pmolnar/data/AdventureWorks
    Found 4 items
    -rw-r--r--   3 pmolnar hadoop        466 2017-02-07 22:46 /user/pmolnar/data/AdventureWorks/Customer.csv.gz
    -rw-r--r--   3 pmolnar hadoop      18125 2017-02-07 22:46 /user/pmolnar/data/AdventureWorks/Employees.csv.gz
    -rw-r--r--   3 pmolnar hadoop        603 2017-02-07 22:46 /user/pmolnar/data/AdventureWorks/ItemsOrdered.csv.gz
    -rw-r--r--   3 pmolnar hadoop        404 2017-02-07 22:46 /user/pmolnar/data/AdventureWorks/SalesTerritory.csv.gz
</pre>

In [3]:
val customer_rdd = sc.textFile("/user/pmolnar/data/AdventureWorks/Customer.csv.gz")

In [4]:
println(customer_rdd.take(10).mkString("\n"))

CustomerID,SalesTerritoryID,FirstName,LastName,City,StateName
10101,1,John,Gray,Lynden,Washington
10298,4,Leroy,Brown,Pinetop,Arizona
10299,1,Elroy,Keller,Snoqualmie,Washington
10315,3,Lisa,Jones,Oshkosh,Wisconsin
10325,1,Ginger,Schultz,Pocatello,Idaho
10329,5,Kelly,Mendoza,Kailua,Hawaii
10330,1,Shawn,Dalton,Cannon Beach,Oregon
10338,1,Michael,Howell,Tillamook,Oregon
10339,4,Anthony,Sanchez,Winslow,Arizona


In [5]:
val hdr = customer_rdd.first()

In [7]:
val customer2_rdd = customer_rdd.repartition(4).filter(x => x!=hdr)

In [8]:
println(customer2_rdd.take(10).mkString("\n"))

10101,1,John,Gray,Lynden,Washington
10325,1,Ginger,Schultz,Pocatello,Idaho
10339,4,Anthony,Sanchez,Winslow,Arizona
10419,4,Linda,Sakahara,Nogales,Arizona
10449,4,Isabela,Moore,Yuma,Arizona
10298,4,Leroy,Brown,Pinetop,Arizona
10329,5,Kelly,Mendoza,Kailua,Hawaii
10408,4,Elroy,Cleaver,Globe,Arizona
10429,5,Sarah,Graham,Greensboro,North Carolina
10299,1,Elroy,Keller,Snoqualmie,Washington


In [24]:
val customer3_rdd = customer2_rdd.map(x => x.split(',')).map(x => (x(0).toInt, x(1).toInt, x(2), x(3), x(4), x(5)))

In [25]:
for (t <- customer3_rdd.take(10)) {
    println(t)
}

(10101,1,John,Gray,Lynden,Washington)
(10325,1,Ginger,Schultz,Pocatello,Idaho)
(10339,4,Anthony,Sanchez,Winslow,Arizona)
(10419,4,Linda,Sakahara,Nogales,Arizona)
(10449,4,Isabela,Moore,Yuma,Arizona)
(10298,4,Leroy,Brown,Pinetop,Arizona)
(10329,5,Kelly,Mendoza,Kailua,Hawaii)
(10408,4,Elroy,Cleaver,Globe,Arizona)
(10429,5,Sarah,Graham,Greensboro,North Carolina)
(10299,1,Elroy,Keller,Snoqualmie,Washington)


# RDD Transformations

## map()

In [2]:
val x = sc.parallelize(Array("b", "a", "c"))
val y = x.map(z => (z,1))
println(x.collect().mkString(", "))
println(y.collect().mkString(", "))

b, a, c
(b,1), (a,1), (c,1)


## filter()

In [3]:
val x = sc.parallelize(Array(1,2,3))
val y = x.filter(n => n%2 == 1)
println(x.collect().mkString(", "))
println(y.collect().mkString(", "))

1, 2, 3
1, 3


## flatMap()

In [4]:
val x = sc.parallelize(Array(1,2,3))
val y = x.flatMap(n => Array(n, n*100, 42))
println(x.collect().mkString(", "))
println(y.collect().mkString(", "))

1, 2, 3
1, 100, 42, 2, 200, 42, 3, 300, 42


## groupBy()

In [5]:
val x = sc.parallelize(
Array("John", "Fred", "Anna", "James"))
val y = x.groupBy(w => w.charAt(0))
println(y.collect().mkString(", "))

(A,CompactBuffer(Anna)), (F,CompactBuffer(Fred)), (J,CompactBuffer(John, James))


## groupByKey()

In [6]:
val x = sc.parallelize( Array(('B',5),('B',4),('A',3),('A',2),('A',1)))
val y = x.groupByKey()
println(x.collect().mkString(", "))
println(y.collect().mkString(", "))

(B,5), (B,4), (A,3), (A,2), (A,1)
(A,CompactBuffer(3, 2, 1)), (B,CompactBuffer(5, 4))


###  groupByKey() vs reduceByKey()

In [12]:
val words = Array("one", "two", "two", "three", "three", "three")
val wordPairsRDD = sc.parallelize(words).map(word => (word, 1))

val wordCountsWithReduce = wordPairsRDD.
    reduceByKey(_ + _).
    collect()

val wordCountsWithGroup = wordPairsRDD.
    groupByKey().
    map(t => (t._1, t._2.sum)).
    collect()

## mapPartitions()

In [17]:
val x = sc.parallelize(Array(1,2,3), 2)

def f(i:Iterator[Int])={
    (i.sum,42).productIterator
}

val y = x.mapPartitions(f)
// glom() flattens elements on the same partition
val xOut = x.glom().collect()
val yOut = y.glom().collect()

## mapPartitionWithIndex

In [18]:
val x = sc.parallelize(Array(1,2,3), 2)

def f(partitionIndex:Int, i:Iterator[Int]) = {
    (partitionIndex, i.sum).productIterator
}

val y = x.mapPartitionsWithIndex(f)

// glom() flattens elements on the same partition 
val xOut = x.glom().collect()
val yOut = y.glom().collect()

## sample()

In [19]:
val x = sc.parallelize(Array(1, 2, 3, 4, 5))
val y = x.sample(false, 0.4)

// omitting seed will yield different output
println(y.collect().mkString(", "))

1, 3, 4


## union()

In [23]:
val x = sc.parallelize(Array(1,2,3), 2)
val y = sc.parallelize(Array(3,4), 1)
val z = x.union(y)
val zOut = z.glom().collect()
zOut

Array(Array(1), Array(2, 3), Array(3, 4))

Array(Array(1), Array(2, 3), Array(3, 4))

## join()

In [24]:
val x = sc.parallelize(Array(("a", 1), ("b", 2)))
val y = sc.parallelize(Array(("a", 3), ("a", 4), ("b", 5)))

val z = x.join(y)
println(z.collect().mkString(", "))

(a,(1,3)), (a,(1,4)), (b,(2,5))


## distinct()

In [25]:
val x = sc.parallelize(Array(1,2,3,3,4))
val y = x.distinct()
println(y.collect().mkString(", "))

1, 2, 3, 4


## coalesce()

In [None]:
val x = sc.parallelize(Array(1, 2, 3, 4, 5), 3)
val y = x.coalesce(2)
val xOut = x.glom().collect()
val yOut = y.glom().collect()

xOut

In [30]:
yOut

Array(Array(1), Array(2, 3, 4, 5))

## keyBy()

In [32]:
val x = sc.parallelize(
Array("John", "Fred", "Anna", "James"))
val y = x.keyBy(w => w.charAt(0))
println(y.collect().mkString(", "))

(J,John), (F,Fred), (A,Anna), (J,James)


## partitionBy()

In [33]:
import org.apache.spark.Partitioner

val x = sc.parallelize(Array(('J',"James"),('F',"Fred"),
                            ('A',"Anna"),('J',"John")), 3)
                            
val y = x.partitionBy(new Partitioner() {
    val numPartitions = 2
    def getPartition(k:Any) = {
        if (k.asInstanceOf[Char] < 'H') 0 else 1
    }
})

val yOut = y.glom().collect()
yOut

Array(Array((F,Fred), (A,Anna)), Array((J,James), (J,John)))

## zip()

In [34]:
val x = sc.parallelize(Array(1,2,3)) 
val y = x.map(n=>n*n)
val z = x.zip(y)
println(z.collect().mkString(", "))

(1,1), (2,4), (3,9)


# RDD Actions

## get number of partitions

In [36]:
val x = sc.parallelize(Array(1,2,3), 2)
val y = x.partitions.size
val xOut = x.glom().collect()
println(y)

2


## collect()

In [37]:
val x = sc.parallelize(Array(1,2,3), 2)
val y = x.collect()
val xOut = x.glom().collect()
println(y)

[I@5fa59152


## reduce()

In [40]:
val x = sc.parallelize(Array(1,2,3,4))
val y = x.reduce((a,b) => a+b)
println(x.collect.mkString(", "))
println(y)

1, 2, 3, 4
10


## aggregate()

In [44]:
def seqOp = (data:(Array[Int], Int), item:Int) =>
    (data._1 :+ item, data._2 + item)
def combOp = (d1:(Array[Int], Int), d2:(Array[Int], Int)) =>
    (d1._1.union(d2._1), d1._2 + d2._2)
    
val x = sc.parallelize(Array(1,2,3,4))
val y = x.aggregate((Array[Int](), 0))(seqOp, combOp)


In [45]:
y

(Array(3, 4, 1, 2),10)

## max()

In [46]:
val x = sc.parallelize(Array(2,4,1))
val y = x.max
println(x.collect().mkString(", "))
println(y)

2, 4, 1
4


## sum()

In [47]:
val x = sc.parallelize(Array(2,4,1))
val y = x.sum
println(x.collect().mkString(", "))
println(y)

2, 4, 1
7.0


## mean()

In [48]:
val x = sc.parallelize(Array(2,4,1))
val y = x.mean
println(x.collect().mkString(", "))
println(y)

2, 4, 1
2.3333333333333335


## stdev()

In [49]:
val x = sc.parallelize(Array(2,4,1))
val y = x.stdev
println(x.collect().mkString(", "))
println(y)

2, 4, 1
1.247219128924647


## countByKey()

In [50]:
val x = sc.parallelize(Array(('J',"James"),('F',"Fred"), ('A',"Anna"),('J',"John")))
val y = x.countByKey()
println(y)

Map(A -> 1, F -> 1, J -> 2)


# Saving RDD

## saveAsTextFile()
Make sure output file does not exist

In [53]:
val x = sc.parallelize(Array(2,4,1))
x.saveAsTextFile("/user/pmolnar/demo")
val y = sc.textFile("/user/pmolnar/demo")
println(y.collect().mkString(", "))

2, 4, 1
