# Apache Spark. Create an RDD with Scala

> "Create an RDD with Scala"

- toc: true
- branch: master
- badges: false
- comments: false
- author: Alexandros Giavaras
- categories: [spark, scala, big-data, data-engineering, data-analysis]

## <a name="overview"></a> Overview

In this post we will see how to create a Spark RDD within a Scala application. Although modern applications will most likely use the ```DataFrame``` and/or ```Dataset``` APIs, the RDD is the data structure that lies underneath both and is therefore still useful to know. As we will see, there are various methods to create an RDD in Spark. The following example is taken from <a href="https://sparkbyexamples.com/apache-spark-rdd/how-to-create-an-rdd-using-parallelize/">Spark by {Examples}</a>.


## Create Spark RDD with Scala

A Resilient Distributed Dataset, or RDD for short, is the fundamental data structure of Spark and underlies both the ```DataFrame``` and ```Dataset``` data structures. An RDD is an immutable collection of objects that can be distributed across a cluster of computers. An RDD is divided into a number of partitions so that the nodes of a Spark cluster can process them independently and in parallel.

There are two main methods available in Spark to create an RDD: 

- ```SparkContext.parallelize``` method
- Read from a file

The first method is illustrated in the example below.

```
package train.spark

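import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

object CreateRDD {
  def main(args: Array[String]): Unit = {

    // No master is set here, so it must be supplied at launch,
    // for example via spark-submit --master
    val conf = new SparkConf().setAppName("Hello Scala Spark")
    val sc = new SparkContext(conf)

    // Distribute a local collection across the cluster
    val data = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
    val rdd = sc.parallelize(data)

    // foreach runs on the executors, partition by partition
    rdd.foreach(println)

    println("Number of Partitions: " + rdd.getNumPartitions)
    println("Action: First element: " + rdd.first())

    // Release the resources held by the context
    sc.stop()
  }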
}
```

Running the application produces something like the following:

```
3
6
1
8
9
2
7
4
5
10
Number of Partitions: 4
Action: First element: 1
```

Note that the order of the printed numbers may differ between runs: ```foreach``` processes the partitions in parallel, so the order depends on which thread writes to the standard output first.
Note also that the application has to create a ```SparkContext``` before it can create an RDD.
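
If a deterministic print order matters, one option is to bring the elements back to the driver with ```collect()``` before printing. The following is a minimal sketch and not part of the original example; the object name ```CollectBeforePrint``` and the ```local[4]``` master are assumptions made for illustration, and ```collect()``` is only sensible for data small enough to fit in the driver's memory.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object CollectBeforePrint {
  def main(args: Array[String]): Unit = {
    // "local[4]" runs Spark in-process with four worker threads (assumption for this sketch)
    val conf = new SparkConf().setAppName("Collect before print").setMaster("local[4]")
    val sc = new SparkContext(conf)

    val rdd = sc.parallelize(1 to 10)

    // collect() returns the elements to the driver in partition order,
    // so the numbers are printed in their original order
    rdd.collect().foreach(println)

    // glom() turns each partition into an array, which shows how the data was split
    rdd.glom().collect().zipWithIndex.foreach { case (part, idx) =>
      val elems = part.mkString(", ")
      println(s"Partition $idx: $elems")
    }

    sc.stop()
  }
}
```

Using ```glom()``` here is just a convenient way to inspect the partitioning of a small RDD.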

---
**Remark**

Creating a ```SparkContext``` is not necessary when we use the Spark shell (```spark-shell```), as one such object is already created for us and is available as ```sc```.

---

The second method is to read a file from disk, as shown in the snippet below.

```
package train.spark

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

object CreateRDDFile {
  def main(args: Array[String]): Unit = {

    // No master is set here, so it must be supplied at launch,
    // for example via spark-submit --master
    val conf = new SparkConf().setAppName("Hello Scala Spark")
    val sc = new SparkContext(conf)

    // Should be some file on your system
    val csvFile = "/home/alex/qi3/learn_scala/scripts/spark/data/train.csv"

    // Each line of the file becomes one String element of the RDD
    val csvRDD = sc.textFile(csvFile)

    println("Number of Partitions: " + csvRDD.getNumPartitions)
    println("Action: First element: " + csvRDD.first())

    // Release the resources held by the context
    sc.stop()
  }
}
```

Upon executing this code, we get:

```
Number of Partitions: 2
Action: First element: #Duplicate: 0, Delete: 1, Normal-1: 2, TUF: 3, Normal-2: 4
```

However, we are interested in converting the contents of the file into floating point numbers so that we can feed them to a machine learning algorithm. To do this, we can use the ```map()``` function to convert the ```RDD[String]``` into an ```RDD[Array[Double]]```:

```
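import scala.util.{Failure, Success, Try}

// Split each line on commas and try to parse the first three fields as Doubles.
// Lines that cannot be parsed (e.g. the header) are replaced by a sentinel row.
val doubleRDD = csvRDD.map(line => line.split(","))
                      .map(arrString => Try(Array(arrString(0).toDouble, arrString(1).toDouble, arrString(2).toDouble)))
                      .map {
                        case Success(res) => res
                        case Failure(_)   => Array(-100.0, -100.0, -100.0)
                      }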
```
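
The parsing step can also be factored into a plain function, which makes it possible to test it without a Spark cluster. The sketch below is only an illustration: ```LineParser```, ```parseLine``` and ```sentinel``` are hypothetical names, and it assumes, as above, that only the first three comma-separated fields are of interest.

```scala
import scala.util.Try

object LineParser {
  // Row returned when a line cannot be parsed (same sentinel value as above).
  // Defined as a method so that every caller gets a fresh array.
  def sentinel: Array[Double] = Array(-100.0, -100.0, -100.0)

  // Parse the first three comma-separated fields of a line as Doubles.
  // Non-numeric fields or lines with fewer than three fields yield the sentinel row.
  def parseLine(line: String): Array[Double] = {
    val fields = line.split(",")
    Try(Array(fields(0).toDouble, fields(1).toDouble, fields(2).toDouble))
      .getOrElse(sentinel)
  }
}
```

With this helper in scope, the conversion reduces to ```csvRDD.map(LineParser.parseLine)```.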

We can also use a schema in order to let Spark know the type of the data but this requires that we use a ```DataFrame``` instead and not an RDD.
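
As a rough sketch of the schema-based approach, the snippet below reads a CSV file into a ```DataFrame``` with an explicit schema. The object name ```CsvWithSchema```, the column names ```x```, ```y```, ```z``` and the assumption that the first three columns are numeric are all hypothetical; adjust them to the actual file. The ```comment``` option of the CSV reader is used to skip lines that start with ```#```, such as the header line shown earlier.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.types.{DoubleType, StructField, StructType}

object CsvWithSchema {
  // Hypothetical column names; the real file may call for different ones
  val schema: StructType = StructType(Seq(
    StructField("x", DoubleType, nullable = true),
    StructField("y", DoubleType, nullable = true),
    StructField("z", DoubleType, nullable = true)
  ))

  // Read the file at `path` using the schema above instead of parsing strings by hand
  def read(spark: SparkSession, path: String): DataFrame =
    spark.read
      .schema(schema)
      .option("comment", "#") // skip lines starting with '#'
      .csv(path)
}
```

Given a ```SparkSession``` named ```spark```, calling ```CsvWithSchema.read(spark, csvFile).rdd``` gives access to the underlying ```RDD[Row]``` if one is still needed.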

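Note also that Spark splits the data into partitions and distributes them across the cluster. In the run above, ```textFile``` produced two partitions; by default it asks for a minimum of ```min(defaultParallelism, 2)``` partitions. The minimum number of partitions can be specified when creating the RDD, as shown below.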

```
// Should be some file on your system
val csvFile = "/home/alex/qi3/learn_scala/scripts/spark/data/train.csv" 

// request at least four partitions for the data
val csvRDD = sc.textFile(csvFile, 4)
```
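
The same idea applies to ```parallelize```, whose optional second argument sets the number of partitions, and an existing RDD can be repartitioned later. The following is a minimal sketch; the object name ```PartitionCount``` and the ```local[2]``` master are assumptions made for illustration.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object PartitionCount {
  def main(args: Array[String]): Unit = {
    // "local[2]" runs Spark in-process with two worker threads (assumption for this sketch)
    val conf = new SparkConf().setAppName("Partition count").setMaster("local[2]")
    val sc = new SparkContext(conf)

    // The second argument of parallelize is the number of partitions to create
    val rdd = sc.parallelize(1 to 100, 8)
    println("Number of Partitions: " + rdd.getNumPartitions) // prints 8

    // repartition returns a new RDD with the requested number of partitions (it shuffles the data)
    val repartitioned = rdd.repartition(4)
    println("After repartition: " + repartitioned.getNumPartitions) // prints 4

    // coalesce reduces the number of partitions without a full shuffle
    val coalesced = rdd.coalesce(2)
    println("After coalesce: " + coalesced.getNumPartitions) // prints 2

    sc.stop()
  }
}
```

Since RDDs are immutable, ```repartition``` and ```coalesce``` leave the original ```rdd``` with its eight partitions.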

## Other methods

As an aside, an RDD can also be created from external data sources such as:

- JDBC
- Cassandra
- HBase
- Elasticsearch

## References

1. <a href="https://spark.apache.org/docs/latest/rdd-programming-guide.html">RDD Programming Guide</a>