## Create Spark RDD

In [13]:
val columns = Seq("language", "users_account")
val data = Seq(("Java", 20000), ("Python", 100000), ("Scala", 3000))
val rdd = spark.sparkContext.parallelize(data)

columns: Seq[String] = List(language, users_account)
data: Seq[(String, Int)] = List((Java,20000), (Python,100000), (Scala,3000))
rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[9] at parallelize at <console>:38


## Convert Spark RDD to DataFrame

A Spark RDD can be converted to a DataFrame in three ways: calling toDF() on the RDD, passing the RDD to createDataFrame(), or converting it to an RDD[Row] and passing that to createDataFrame() together with a schema.

## Convert RDD to DataFrame – Using toDF()

In [19]:
val dfFromRDD1 = rdd.toDF()
dfFromRDD1.printSchema()

// toDF() has another signature that takes arguments to define column names as shown below.

val dfFromRDD1_1 = rdd.toDF("language", "users_account")
dfFromRDD1_1.printSchema()

root
 |-- _1: string (nullable = true)
 |-- _2: integer (nullable = false)

root
 |-- language: string (nullable = true)
 |-- users_account: integer (nullable = false)



dfFromRDD1: org.apache.spark.sql.DataFrame = [_1: string, _2: int]
dfFromRDD1_1: org.apache.spark.sql.DataFrame = [language: string, users_account: int]
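toDF() is not a method of RDD itself; it becomes available through the implicits of the active SparkSession. spark-shell and most notebook kernels import them automatically, which is why the cell above works without an explicit import. The following is a minimal sketch of the same conversion in a standalone setting. The names `session` and `sampleRdd`, the application name, and the `local[*]` master are illustrative assumptions, not part of the original notebook.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: a local session built for illustration; in spark-shell or a
// notebook, getOrCreate() returns the session that already exists.
val session = SparkSession.builder()
  .appName("rdd-to-df-sketch") // hypothetical application name
  .master("local[*]")          // assumption: local run
  .getOrCreate()

// Brings the implicit conversions that add toDF() to RDDs into scope.
import session.implicits._

val sampleRdd = session.sparkContext.parallelize(
  Seq(("Java", 20000), ("Python", 100000), ("Scala", 3000)))

// Without arguments the columns would be named _1 and _2; here they are named explicitly.
val namedDf = sampleRdd.toDF("language", "users_account")
namedDf.printSchema()
```

Without the `import session.implicits._` line, the call to toDF() would not compile outside the shell.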


## Convert RDD to DataFrame – Using createDataFrame()
The SparkSession class provides a createDataFrame() method that takes an RDD as an argument and returns a DataFrame. Chain it with toDF() to assign names to the columns.

In [31]:
val columns = Seq("language", "users_account")
val dfFromRDD2 = spark.createDataFrame(rdd).toDF(columns: _*)
// The Scala `: _*` type ascription expands the columns Seq into the individual String arguments that toDF() expects

columns: Seq[String] = List(language, users_account)
dfFromRDD2: org.apache.spark.sql.DataFrame = [language: string, users_account: int]
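The `: _*` expansion used above is a general Scala feature for calling methods with variable-length parameters and is not specific to Spark. As a small sketch in plain Scala, the hypothetical helper `describeColumns` below mirrors the shape of a method that accepts `String*`; it is introduced only for illustration.

```scala
// Hypothetical helper with a varargs parameter, similar in shape to toDF(colNames: String*).
def describeColumns(colNames: String*): String = colNames.mkString(", ")

val names = Seq("language", "users_account")

// Passing the names one by one ...
val direct = describeColumns("language", "users_account")

// ... is equivalent to expanding the Seq with `: _*`.
val expanded = describeColumns(names: _*)

println(expanded)           // prints: language, users_account
println(direct == expanded) // prints: true
```

Passing `names` without `: _*` would not compile, because a `Seq[String]` is not itself a `String` argument.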


## Convert RDD[Row] to DataFrame – Using createDataFrame() with a schema

createDataFrame() has another signature that takes an RDD[Row] and a schema that supplies the column names and types. To use it, first convert the “rdd” object from RDD[T] to RDD[Row]. The schema is a StructType built from a sequence of StructField objects, and each StructField takes a column name, a data type, and a nullable flag as arguments.

In [35]:
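import org.apache.spark.sql.Row
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.StringType

// From RDD (using createDataFrame and adding a schema with StructType).
// Every column is declared as StringType here.
val schema = StructType(columns.map(fieldName => StructField(fieldName, StringType, nullable = true)))

// Convert RDD[T] to RDD[Row]. The Int count is converted to a String so that each
// Row matches the all-StringType schema; otherwise the first action on the
// DataFrame fails at runtime with a type mismatch.
val rowRDD = rdd.map(attributes => Row(attributes._1, attributes._2.toString))

val dfFromRDD3 = spark.createDataFrame(rowRDD, schema)

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.StringType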
schema: org.apache.spark.sql.types.StructType = StructType(StructField(language,StringType,true), StructField(users_account,StringType,true))
rowRDD: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[11] at map at <console>:52
dfFromRDD3: org.apache.spark.sql.DataFrame = [language: string, users_account: string]
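The schema above declares both columns as strings. Where the numeric column should keep its integer type, each StructField can be given its own data type. The following is a minimal sketch under that assumption; the names `session`, `pairs`, `typedSchema`, `typedRows`, and `typedDf`, along with the local session settings, are illustrative and not from the original notebook.

```scala
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// Assumption: a local session for illustration; getOrCreate() reuses an existing one.
val session = SparkSession.builder()
  .appName("typed-schema-sketch") // hypothetical application name
  .master("local[*]")
  .getOrCreate()

val pairs = session.sparkContext.parallelize(
  Seq(("Java", 20000), ("Python", 100000), ("Scala", 3000)))

// One StructField per column, each with its own data type.
val typedSchema = StructType(Seq(
  StructField("language", StringType, nullable = true),
  StructField("users_account", IntegerType, nullable = true)
))

// The values in each Row must match the declared types: String, then Int.
val typedRows = pairs.map { case (language, users) => Row(language, users) }

val typedDf = session.createDataFrame(typedRows, typedSchema)
typedDf.printSchema()
```

Because createDataFrame() does not validate the rows against the schema when the DataFrame is defined, a mismatch between the Row values and the declared types only surfaces when an action such as show() runs.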


## Convert Spark RDD to Dataset

In [37]:
val ds = spark.createDataset(rdd)
ds.show(false)

+------+------+
|_1    |_2    |
+------+------+
|Java  |20000 |
|Python|100000|
|Scala |3000  |
+------+------+



ds: org.apache.spark.sql.Dataset[(String, Int)] = [_1: string, _2: int]
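The Dataset above keeps the tuple field names _1 and _2. One common way to get named, typed fields is to map each tuple to a case class before converting. The sketch below assumes a hypothetical case class `LanguageUsers`; the names `session`, `pairs`, `typedDs`, and `popular` and the local session settings are likewise illustrative.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical case class; it should be defined at the top level (not inside a
// method) so that Spark can derive an encoder for it.
case class LanguageUsers(language: String, usersAccount: Int)

// Assumption: a local session for illustration; getOrCreate() reuses an existing one.
val session = SparkSession.builder()
  .appName("rdd-to-dataset-sketch") // hypothetical application name
  .master("local[*]")
  .getOrCreate()

// Provides the encoders and the toDS() conversion.
import session.implicits._

val pairs = session.sparkContext.parallelize(
  Seq(("Java", 20000), ("Python", 100000), ("Scala", 3000)))

// Map each tuple to the case class, then convert the RDD to a Dataset.
val typedDs = pairs.map { case (language, users) => LanguageUsers(language, users) }.toDS()
typedDs.printSchema()

// Fields are now accessible by name with compile-time type checking.
val popular = typedDs.filter(_.usersAccount > 10000).map(_.language).collect()
```

With this approach the Dataset columns take the case class field names, language and usersAccount, instead of _1 and _2.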
