### DataFrame 

데이터프레임은 관계형 데이터베이스의 테이블에서 컬럼으로 구성된 변경 불가능한 분산 데이터 컬렉션이다   
아파치 스파크 1.0에서 스키마 RDD 가 소개되었고  스키마 RDD가 아파치 스파크 1.3버전에서 데이터프레임 이라는 이름으로 바뀜  
파이썬 Pandas의 데이터프레임 또는 R의 데이터프레임에 익숙한 사람들은   
스파크 데이터프레임은 구조적인 데이터로 작업을 쉽게 해준다.  

파이썬 RDD의 약점   

스파크 API를 파이썬에서 실행하는 것은 자바 JVM 과 py4j사이의 커뮤니케이션 오버헤드가 발생해 느려질 수 있다.  

스파크 1.x 버전에서의 데이터프레임에 익숙하다면  
스파크 2.x 버전에서는 SQLContext 대신 SparkSession을 사용한다는 걸 알 수 있다.  
HiveContex, SQLContext, StreamingContext, SparkContext 등이 SparkSession으로 모두 통합되었다.  (스파크 2.x) 

In [16]:
findspark.find()

'C:\\bigdata\\spark-2.4.5-bin-hadoop2.7'

In [3]:
import findspark, pyspark 
findspark.find()

from pyspark import SparkContext, SparkConf 
from pyspark.sql import SparkSession 
conf = pyspark.SparkConf().setAppName('appName').setMaster('local[2]')
sc = pyspark.SparkContext(conf=conf)
spark = SparkSession(sc) # 데이터프레임은 session 에서 시작한다 

In [18]:
swimmersJSON = spark.read.json('./data/stringJSONRDD.json')

In [19]:
print(type(swimmersJSON)) # 타입 : DATAFRAME 

<class 'pyspark.sql.dataframe.DataFrame'>


In [20]:
stringJSONRDD = sc.parallelize((
    """
      { "id": "123",
    "name": "Katie",
    "age": 19,
    "eyeColor": "brown"
  }""",
   """{
    "id": "234",
    "name": "Michael",
    "age": 22,
    "eyeColor": "green"
  }""", 
  """{
    "id": "345",
    "name": "Simone",
    "age": 23,
    "eyeColor": "blue"
  }
    """
))

In [21]:
dir(swimmersJSON) # 스파크에서 지원되는 함수들 

['__class__',
 '__delattr__',
 '__dict__',
 '__dir__',
 '__doc__',
 '__eq__',
 '__format__',
 '__ge__',
 '__getattr__',
 '__getattribute__',
 '__getitem__',
 '__gt__',
 '__hash__',
 '__init__',
 '__init_subclass__',
 '__le__',
 '__lt__',
 '__module__',
 '__ne__',
 '__new__',
 '__reduce__',
 '__reduce_ex__',
 '__repr__',
 '__setattr__',
 '__sizeof__',
 '__str__',
 '__subclasshook__',
 '__weakref__',
 '_collectAsArrow',
 '_jcols',
 '_jdf',
 '_jmap',
 '_jseq',
 '_lazy_rdd',
 '_repr_html_',
 '_sc',
 '_schema',
 '_sort_cols',
 '_support_repr_html',
 'agg',
 'alias',
 'approxQuantile',
 'cache',
 'checkpoint',
 'coalesce',
 'colRegex',
 'collect',
 'columns',
 'corr',
 'count',
 'cov',
 'createGlobalTempView',
 'createOrReplaceGlobalTempView',
 'createOrReplaceTempView',
 'createTempView',
 'crossJoin',
 'crosstab',
 'cube',
 'describe',
 'distinct',
 'drop',
 'dropDuplicates',
 'drop_duplicates',
 'dropna',
 'dtypes',
 'exceptAll',
 'explain',
 'fillna',
 'filter',
 'first',
 'foreach',
 'f

In [26]:
swimmersJSON = spark.read.json(stringJSONRDD)

In [30]:
swimmersJSON.createOrReplaceTempView('swimmersJSON')

In [31]:
swimmersJSON.show()

+---+--------+---+-------+
|age|eyeColor| id|   name|
+---+--------+---+-------+
| 19|   brown|123|  Katie|
| 22|   green|234|Michael|
| 23|    blue|345| Simone|
+---+--------+---+-------+



In [32]:
spark.sql("select * from swimmersJSON").collect()

[Row(age=19, eyeColor='brown', id='123', name='Katie'),
 Row(age=22, eyeColor='green', id='234', name='Michael'),
 Row(age=23, eyeColor='blue', id='345', name='Simone')]

RDD 를 데이터프레임으로 변경하는 두가지 방법  
리플렉션을 사용해 스키마를 추측하는 방법  
스키마를 직접 코드상에 명시하는 방법  

In [35]:
swimmersJSON.printSchema() # nullable : null 이 있어도 된다....?  

root
 |-- age: long (nullable = true)
 |-- eyeColor: string (nullable = true)
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)



In [39]:
from pyspark.sql.types import *

In [40]:
stringCSVRDD = sc.parallelize([
    (123, 'Katie', 19, 'brown'),
    (234, 'Michael', 22, 'green'), 
    (345, 'Simone', 23, 'blue')
])

In [41]:
shcemaString = "id name age eyeColor"

In [42]:
schema = StructType([
    StructField("id", LongType(), True), 
    StructField("name", StringType(), True), 
    StructField("age", LongType(), True), 
    StructField("eyeColor", StringType(), True)
])

In [43]:
swimmers = spark.createDataFrame(stringCSVRDD, schema)

In [44]:
swimmers.createGlobalTempView("swimmers")

In [45]:
swimmers.printSchema()

root
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- age: long (nullable = true)
 |-- eyeColor: string (nullable = true)



## Querying with SQL 

In [1]:
# DataBricks notebook에서만 지원되는 방법
%sql 
--Query all data
select * from swimmers

SyntaxError: invalid syntax (<ipython-input-1-868af9d47b03>, line 3)

In [4]:
spark.sql("select * from swimmers").show()

AnalysisException: 'Table or view not found: swimmers; line 1 pos 14'

In [None]:
spark.sql("select count(*) from swimmers").show()

In [None]:
spark.sql("select id, age from swimmers").show()

In [None]:
spark.sql("select id, age from swimmers where age=22").show()

In [None]:
# spark.sql("select name, eyeColor from swimmers where eyeColor like \'b%\'").show()
spark.sql("select name, eyeColor from swimmers where eyeColor like 'b%' ").show()

### Querying with the DataFrame API 

In [54]:
display(swimmers)

DataFrame[id: bigint, name: string, age: bigint, eyeColor: string]

In [55]:
swimmers.count()

3

In [57]:
swimmers.select("id", "age").show()

+---+---+
| id|age|
+---+---+
|123| 19|
|234| 22|
|345| 23|
+---+---+



In [58]:
swimmers.select("id", "age").filter("age = 22").show()

+---+---+
| id|age|
+---+---+
|234| 22|
+---+---+



In [59]:
swimmers.select("name", "eyeColor").filter("eyeColor like 'b%'").show()

+------+--------+
|  name|eyeColor|
+------+--------+
| Katie|   brown|
|Simone|    blue|
+------+--------+



스파크 SQL과 데이터프레임으로 작업할 때 잊으면 안되는 중요한 점  
: CSV, JSON을 비롯한 다양한 데이터 포맷을 작업하기에는 쉽지만,  
  가장 일반적인 포맷은 파케이(parquet) 파일 포맷이다.    
  스파크에서는 읽기/쓰기 모두 지원하며, 원본데이터의 스키마 정보를 잘 보존해준다. 

In [None]:
# On-time Flight Performance (http://bit.ly/2cck5hw)
# DataFrame Queries 
flightPerfFilePath = ''

In [None]:
airports = spark.read.csv(airportsFilePath, header='true', interSchema='true', sep='\')

spark.read.csv()  
첫번째 파라미터 : 데이터 파일 경로   
두번째 파라미터 : 첫번째 줄이 헤더인지 여부   
세번째 파라미터 : 스파크가 리플렉션으로 데이터 타입 추측 여부   
네번째 파라미터 : 구분자 지정

In [None]:
airports.createOrReplaceTempView('airports')

In [None]:
flightPerf = spark.read.csv(flightPerfFilePath, header='true')

In [None]:
flightPerf.createOrReplaceTempView("FlightPerformance")

In [None]:
flightPerf.cache()

In [None]:
spark.read.format("com.databricks.spark.csv")\
                .option("header", "true")\
                .load(airportsFilePath)

In [None]:
# 워싱턴에 위치해 있는 공항
# spark.sql("select * from airports where State = 'WA'").show()
airports.select("*").filter("State = 'WA'").show()

In [None]:
# 워싱턴에서 출발하는 비행기 중에서 지연이 된 경로 검색
spark.sql("select a.City, f.origin, sum(f.delay) as Delays \
            from FlightPerformance f \
            join airports a on a.IATA = f.origin \
            where a.State = 'WA' \
            group by a.City, f.origin \
            order by sum(f.delay) desc").show()

In [None]:
# 미국의 주별 지연 정보 검색
spark.sql("select a.State, sum(f.delay) as Delays \
            from FlightPerformance f \
            join airports a on a.IATA = f.origin \
            where a.Country = 'USA' \
            group by a.State").show()

데이터프레임은 스칼라나 자바에서 프로그래머가 정의한 사용자 클래스에 의해 타입을 확실히 명시해야 한는 JVM 객체이다. (중요!!)
이는 타입에 대한 견고함이 적은 파이스파크에서는 테이터프레임 API를 지원하지 않는 경우가 있다.
파이스파크에서 사용 가능하지 않은 데이터프레임 API의 부분은 RDD로 변환하거나 UDF를 사용해야 가능해진다.

(http://bit.ly/2dbfoFT)