In [1]:
sc

In [2]:
sc.parallelize([1,2,3,4,5]).collect()

[1, 2, 3, 4, 5]

Create an RDD of JSON strings.  
RDD creation is lazy, so nothing is computed until spark.read.json reads the data.

In [3]:
stringJsonRdd = sc.parallelize((
    """{"id": "123", "name": "Katie", "age": 19, "eyeColor": "brown"}""",
    """{"id": "234", "name": "Michael", "age": 22, "eyeColor": "green"}""",
    """{"id": "345", "name": "Simone", "age": 29, "eyeColor": "blue"}"""
))

In [4]:
# Create the DataFrame
swimmerJson = spark.read.json(stringJsonRdd)

Create a temporary view of the swimmerJson DataFrame.

In [5]:
swimmerJson.createOrReplaceTempView("swimmerJson")

DataFrame API 1. show(n)  
Prints the first n rows. The default is n = 20.  

In [6]:
swimmerJson.show()

+---+--------+---+-------+
|age|eyeColor| id|   name|
+---+--------+---+-------+
| 19|   brown|123|  Katie|
| 22|   green|234|Michael|
| 29|    blue|345| Simone|
+---+--------+---+-------+



spark is the SparkSession that the pyspark shell creates automatically.  
collect() returns every row to the driver as a list of Row objects.  

In [7]:
spark.sql("select * from swimmerJson").collect()

[Row(age=19, eyeColor='brown', id='123', name='Katie'),
 Row(age=22, eyeColor='green', id='234', name='Michael'),
 Row(age=29, eyeColor='blue', id='345', name='Simone')]

Converting an existing RDD to a DataFrame, method 1.  
Inferring the schema using reflection  

Spark SQL can convert an RDD of Row objects to a DataFrame.  
The data types are inferred automatically by sampling the data.

In [8]:
swimmerJson.printSchema()

root
 |-- age: long (nullable = true)
 |-- eyeColor: string (nullable = true)
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)



Converting an existing RDD to a DataFrame, method 2.  
Specifying the schema explicitly

In [9]:
from pyspark.sql.types import *

In [10]:
stringCsvRdd = sc.parallelize([
    (123, 'Katie', 19, 'brown'),
    (234, 'Michael', 23, 'green'),
    (345, 'Simone', 29, 'blue')
])

StructField(name, dataType, nullable)

In [11]:
schema = StructType([
    StructField("id", LongType(), True),
    StructField("name", StringType(), True),
    StructField("age", LongType(), True),
    StructField("eyeColor", StringType(), True)
])

In [12]:
# Create a DataFrame by applying the schema defined above to the RDD
swimmers = spark.createDataFrame(stringCsvRdd, schema)

In [13]:
swimmers.createOrReplaceTempView("swimmers")

In [14]:
swimmers.printSchema()

root
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- age: long (nullable = true)
 |-- eyeColor: string (nullable = true)



### Querying with the DataFrame API

In [15]:
swimmers.count()  # Number of rows in the DataFrame

3

In [17]:
# select: the columns to return
# filter: the row condition
swimmers.select("id", "age").filter("age=23").show()

+---+---+
| id|age|
+---+---+
|234| 23|
+---+---+



In [18]:
swimmers.select(swimmers.id, swimmers.age).filter(swimmers.age == 23).show()

+---+---+
| id|age|
+---+---+
|234| 23|
+---+---+



In [19]:
# Rows whose eyeColor starts with 'b'
swimmers.select("name", "eyeColor").filter("eyeColor like 'b%'").show()

+------+--------+
|  name|eyeColor|
+------+--------+
| Katie|   brown|
|Simone|    blue|
+------+--------+



### Querying with SQL

Get the row count with a SQL statement.

In [20]:
spark.sql("select count(1) from swimmers").show()

+--------+
|count(1)|
+--------+
|       3|
+--------+



In [21]:
spark.sql("select id, age from swimmers where age = 23").show()

+---+---+
| id|age|
+---+---+
|234| 23|
+---+---+



In [22]:
spark.sql("select id, eyeColor from swimmers where eyeColor like 'b%'").show()

+---+--------+
| id|eyeColor|
+---+--------+
|123|   brown|
|345|    blue|
+---+--------+



### DataFrame Scenario

In [1]:
flightFilePath = "file:///home/an/nb-workspace/flight-data/departuredelays.csv"
airportFilePath = "file:///home/an/nb-workspace/flight-data/airport-codes-na.txt"

In [2]:
airports = spark.read.csv(airportFilePath, header='true', inferSchema='true', sep='\t')

In [3]:
airports.createOrReplaceTempView("airports")

In [4]:
flightPerf = spark.read.csv(flightFilePath, header='true')

In [5]:
flightPerf.createOrReplaceTempView("FlightPerformance")

In [6]:
flightPerf.cache()

DataFrame[date: string, delay: string, distance: string, origin: string, destination: string]

In [8]:
spark.sql("""
select a.City, f.origin, sum(f.delay) as Delays
from FlightPerformance f
join airports a
on a.IATA = f.origin
where a.State = 'WA'
group by a.City, f.origin
order by sum(f.delay) desc
""").show()

+-------+------+--------+
|   City|origin|  Delays|
+-------+------+--------+
|Seattle|   SEA|159086.0|
|Spokane|   GEG| 12404.0|
|  Pasco|   PSC|   949.0|
+-------+------+--------+



In [12]:
spark.sql("""
select a.State, sum(f.delay) as Delays
from FlightPerformance f
join airports a
on a.IATA = f.origin
where a.Country = 'USA'
group by a.State
""").show()

+-----+---------+
|State|   Delays|
+-----+---------+
|   SC|  80666.0|
|   AZ| 401793.0|
|   LA| 199136.0|
|   MN| 256811.0|
|   NJ| 452791.0|
|   OR| 109333.0|
|   VA|  98016.0|
| null| 397237.0|
|   RI|  30760.0|
|   WY|  15365.0|
|   KY|  61156.0|
|   NH|  20474.0|
|   MI| 366486.0|
|   NV| 474208.0|
|   WI| 152311.0|
|   ID|  22932.0|
|   CA|1891919.0|
|   CT|  54662.0|
|   NE|  59376.0|
|   MT|  19271.0|
+-----+---------+
only showing top 20 rows

