In [1]:
val df = spark.read.json("./people.json")

Intitializing Scala interpreter ...

Spark Web UI available at http://5df2e831444a:4040
SparkContext available as 'sc' (version = 3.1.2, master = local[*], app id = local-1633658133253)
SparkSession available as 'spark'


df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]


In [2]:
df.show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



In [3]:
val wordsRDD = sc.parallelize(Array("a", "b", "c", "d", "a", "a", "b", "b", "c", "d", "d", "d", "d"))
val wordsDF = wordsRDD.toDF()


wordsRDD: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[8] at parallelize at <console>:25
wordsDF: org.apache.spark.sql.DataFrame = [value: string]


In [4]:
wordsDF.show()

+-----+
|value|
+-----+
|    a|
|    b|
|    c|
|    d|
|    a|
|    a|
|    b|
|    b|
|    c|
|    d|
|    d|
|    d|
|    d|
+-----+



In [5]:
val wordsDF = wordsRDD.toDF("word")

wordsDF: org.apache.spark.sql.DataFrame = [word: string]


In [6]:
wordsDF.show()

+----+
|word|
+----+
|   a|
|   b|
|   c|
|   d|
|   a|
|   a|
|   b|
|   b|
|   c|
|   d|
|   d|
|   d|
|   d|
+----+



In [7]:
val peopleRDD = sc.parallelize(
  Seq( ("David", 150),
       ("White", 200),
       ("Paul",  170) )
)

peopleRDD: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[15] at parallelize at <console>:25


In [8]:
val peopleDF = peopleRDD.toDF("name", "salary")

peopleDF: org.apache.spark.sql.DataFrame = [name: string, salary: int]


In [9]:
peopleDF.show()

+-----+------+
| name|salary|
+-----+------+
|David|   150|
|White|   200|
| Paul|   170|
+-----+------+



In [10]:
import org.apache.spark.sql._
import org.apache.spark.sql.types._

// RDD를 Row 로 초기화 
val peopleRDD = sc.parallelize(
  Seq(
       Row("David", 150),
       Row("White", 200),
       Row("Paul",  170)
  )
)

import org.apache.spark.sql._
import org.apache.spark.sql.types._
peopleRDD: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = ParallelCollectionRDD[19] at parallelize at <console>:29


In [11]:
val peopleSchema = new StructType().add(StructField("name",   StringType, true)).add(StructField("salary", IntegerType, true))


peopleSchema: org.apache.spark.sql.types.StructType = StructType(StructField(name,StringType,true), StructField(salary,IntegerType,true))


In [12]:
val peopleDF = spark.createDataFrame(peopleRDD, peopleSchema)


peopleDF: org.apache.spark.sql.DataFrame = [name: string, salary: int]


In [13]:
peopleDF.show()

+-----+------+
| name|salary|
+-----+------+
|David|   150|
|White|   200|
| Paul|   170|
+-----+------+



#  https://wikidocs.net/28556

In [14]:
val seq =   Seq(
       ("David", 150),
       ("White", 200),
       ("Paul",  170)
  )
val peopleDS = seq.toDS()

seq: Seq[(String, Int)] = List((David,150), (White,200), (Paul,170))
peopleDS: org.apache.spark.sql.Dataset[(String, Int)] = [_1: string, _2: int]


In [15]:
peopleDS.show()

+-----+---+
|   _1| _2|
+-----+---+
|David|150|
|White|200|
| Paul|170|
+-----+---+



In [16]:
peopleDS.select("_1").show()

+-----+
|   _1|
+-----+
|David|
|White|
| Paul|
+-----+



In [25]:
case class People(name: String, salary: Int)
val peopleSeq = Seq(
       People("David", 150),
       People("White", 200),
       People("Paul",  170)
)

defined class People
peopleSeq: Seq[People] = List(People(David,150), People(White,200), People(Paul,170))


In [26]:
val peopleDS = peopleSeq.toDS()

peopleDS: org.apache.spark.sql.Dataset[People] = [name: string, salary: int]


In [27]:
peopleDS.show()

+-----+------+
| name|salary|
+-----+------+
|David|   150|
|White|   200|
| Paul|   170|
+-----+------+



In [28]:
peopleDS.select("salary").show()

+------+
|salary|
+------+
|   150|
|   200|
|   170|
+------+



In [31]:
import org.apache.spark.sql._
import org.apache.spark.sql.types._


import org.apache.spark.sql._
import org.apache.spark.sql.types._


In [32]:

val peopleRDD = sc.textFile("./people.txt")


peopleRDD: org.apache.spark.rdd.RDD[String] = ./people.txt MapPartitionsRDD[24] at textFile at <console>:37


In [33]:
val peopleSchema = new StructType().add(StructField("name",   StringType, true)).add(StructField("age", IntegerType, true))


peopleSchema: org.apache.spark.sql.types.StructType = StructType(StructField(name,StringType,true), StructField(age,IntegerType,true))


In [34]:
val sepPeopleRdd = peopleRDD.map(line => line.split(",")).map(x => Row(x(0), x(1).trim.toInt))


sepPeopleRdd: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[26] at map at <console>:38


In [35]:
val peopleDF = spark.createDataFrame(sepPeopleRdd, peopleSchema)


peopleDF: org.apache.spark.sql.DataFrame = [name: string, age: int]


In [36]:
peopleDF.show()


+----+---+
|name|age|
+----+---+
|   A| 29|
|   B| 30|
|   C| 19|
|   D| 15|
|   F| 20|
+----+---+



In [38]:
case class People(name: String, age: Long)


defined class People


In [39]:
val peopleDS = peopleDF.as[People]


peopleDS: org.apache.spark.sql.Dataset[People] = [name: string, age: int]


In [40]:
peopleDS.show()

+----+---+
|name|age|
+----+---+
|   A| 29|
|   B| 30|
|   C| 19|
|   D| 15|
|   F| 20|
+----+---+



In [41]:
case class People(name: String, age: Long)


defined class People


In [42]:
val peopleDF = spark.read.json("./people.json")


peopleDF: org.apache.spark.sql.DataFrame = [age: bigint, name: string]


In [43]:
val peopleDS = peopleDF.as[People]


peopleDS: org.apache.spark.sql.Dataset[People] = [age: bigint, name: string]


In [44]:
peopleDS.show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



In [45]:
case class People(name: String, age: Long)


defined class People


In [46]:
val peopleDF = spark.read.json("./people.json")


peopleDF: org.apache.spark.sql.DataFrame = [age: bigint, name: string]


In [47]:
val peopleDS = peopleDF.as[People]

peopleDS: org.apache.spark.sql.Dataset[People] = [age: bigint, name: string]


In [48]:
peopleDS.printSchema()

root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)



In [49]:
peopleDS.show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



In [50]:
peopleDS.select("name").show()

+-------+
|   name|
+-------+
|Michael|
|   Andy|
| Justin|
+-------+



In [51]:
peopleDS.select($"name", $"age" + 1).show()

+-------+---------+
|   name|(age + 1)|
+-------+---------+
|Michael|     null|
|   Andy|       31|
| Justin|       20|
+-------+---------+



In [52]:
peopleDS.filter("age is not null").show()

+---+------+
|age|  name|
+---+------+
| 30|  Andy|
| 19|Justin|
+---+------+



In [53]:
peopleDS.select($"name", $"age").filter($"age" > 20).show()

+----+---+
|name|age|
+----+---+
|Andy| 30|
+----+---+



In [54]:
peopleDS.select($"name", $"age").filter("age > 20").show()

+----+---+
|name|age|
+----+---+
|Andy| 30|
+----+---+



In [56]:
peopleDS.filter("age is not null").map(p=>p.name+","+p.age).show()

org.apache.spark.SparkException:  Job aborted due to stage failure: Task 0 in stage 24.0 failed 1 times, most recent failure: Lost task 0.0 in stage 24.0 (TID 32) (5df2e831444a executor driver): java.lang.ClassCastException: class $iw cannot be cast to class $iw ($iw is in unnamed module of loader org.apache.spark.repl.ExecutorClassLoader @20e2b45d; $iw is in unnamed module of loader scala.tools.nsc.interpreter.IMain$TranslatingClassLoader @37c27db9)