# **스파크 colab에 환경 구현**

In [1]:
# jdk 툴 설치
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
# apache spark download - 하둡 버전에 따라 spakr 와 하둡 변경 (밑에 tgz 파일 설치 및 pip install)
!wget -q http://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz
# 명령어로 spark - hadoop 파일 압축 풀기
!tar xf spark-3.1.1-bin-hadoop3.2.tgz
# spark 설치
!pip install -q findspark

In [2]:
import os
# spark 환경변수 설정 - java(jdk) ,spark(spark) 2개다 설정해줘야함
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop3.2"

In [3]:
#import spark 말고 findspark로 spark파일을 못찾는경우에 빠르게 설치 가능
import findspark
findspark.init()

In [4]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
spark.conf.set("spark.sql.repl.eagerEval.enabled", True) # Property used to format output tables better
spark

# **연습창 (spark)**

In [5]:
from pyspark import SparkContext
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

In [6]:
tichers = sc.parallelize({
    (1, ('Google', 'GOOGL', 'USA')),
    (2, ('Netflix', 'NFLX', 'USA')),
    (3, ('Amazon', 'AMZN', 'USA')),
    (4, ('Tesla', 'TSLA', 'USA')),
    (5, ('Samsung', '005930', 'Korea')),
    (6, ('KaKao', '035720', 'Korea'))
})

prices = sc.parallelize([
    (1, (2984, 'USD')),
    (2, (645, 'USD')),
    (3, (3518, 'USD')),
    (4, (1222, 'USD')),
    (5, (70600, 'USD')),
    (6, (125000, 'USD')),
])

만약 위 코드에서 미국의 2000불 이상의 주식만 가져오기 위한 방법은 3가지로 생각해볼 수 있다.

- Inner Join
 
- Filter by Coutry

- Filter by Currency


In [7]:
# CASE 1: join 먼저, filter 나중에
tickerPrice = tichers.join(prices)
tickerPrice.filter(lambda x: x[1][0][2] == 'USA' and x[1][1][0] > 2000).collect()
'''
[(1, (('Google', 'GOOGL', 'USA'), (2984, 'USD'))), 3, (('Amazon', 'AMZN', 'USA'), (3518, 'USD')))]
'''

# CASE 2: filter 먼저, join 나중에
filteredTicker = tichers.filter(lambda x: x[1][2] == 'USA')
filteredTicker = prices.filter(lambda x: x[1][0] > 2000)
filteredTicker.join(tickerPrice).collect()

[(6, ((125000, 'USD'), (('KaKao', '035720', 'Korea'), (125000, 'USD')))),
 (1, ((2984, 'USD'), (('Google', 'GOOGL', 'USA'), (2984, 'USD')))),
 (3, ((3518, 'USD'), (('Amazon', 'AMZN', 'USA'), (3518, 'USD')))),
 (5, ((70600, 'USD'), (('Samsung', '005930', 'Korea'), (70600, 'USD'))))]

두 가지의 경우 같은 결과를 낳지만 퍼포먼스 자체는 두 번째 케이스가 좋다.

연산에 대하여 일일이 신경쓰기란 까다롭다.

네트워크 연산 성능에 대하여 만약 데이터가 구조화되어 있다면 자동으로 최적화가 가능하다
.

구조화된 데이터란 정형, 비정형, 반정형데이터를 뜻한다.

# **정형(Structured), 비정형(Unstructured), 반정형(Semi structured)**


정형: 행과 열이 있고 데이터 타입이 스키마인 데이터이다.

데이터 베이스

비정형: 자유 형식으로 정리가 되지 않은 파일이다.

로그 파일
이미지

반정형: 행과 열이 있는 데이터이다.

CSV
JSON
XML

# **정형 데이터와 RDD**

RDD에서는 데이터의 구조를 모르기 때문에 데이터를 다루는 것을 개발자에게 의존할 수 밖에 없다.

map, flatMap, filter 등을 통해 유저가 만든 함수를 수행한다.

하지만 정형 데이터에서는 데이터의 구조를 이미 알고 있으므로 어떤 테스크를 수행할 것인지 정의만 하면 된다.

최적화도 자동으로 진행된다.

Spark SQL

Spark SQL은 구조화된 데이터를 다룰 수 있게 해준다.

유저가 일일이 함수를 정의하는 일 없이 작업을 수행할 수 있고 자동으로 연산이 최적화된다.

# **스파크(Spark) SQL**
# 목적
스파크 프로그래밍 내부에서 관계형 처리를 할 수 있다.

스키마 정보를 이용해 자동으로 최적화를 할 수 있다.

외부 데이터셋을 쉽게 사용할 수 있다.
# 소개
스파크 위에 구현된 하나의 패키지이다.

3개의 주요 API가 존재한다.

SQL

DataFrame

Datasets

2개의 백엔드 컴포넌트로 최적화를 진행한다.

Catalyst: 쿼리 최적화 엔진

Tungsten: 시리얼라이저(용량)
# 데이터 프레임(DataFrame)
스파크 코어(Core)에 RDD가 있다면 스파크 SQL에는 데이터 프레임이 있다.

데이터 프레임은 테이블 데이터셋이다.

개념적으로는 RDD에 스키마가 적용된 것이라 볼 수 있다.

**데이터 프레임 생성**

RDD에서 스키마를 정의한 다음 변형 하는 방법과 CSV, JSON 등의 데이터를 받아오는 방법이 있다.

**RDD로 데이터 프레임 생성**

스키마를 자동으로 유추하여 데이터 프레임을 만들거나, 스키마를 사용자가 정의하는 방법이 있다.

**createOrReplaceTempView**

데이터 프레임을 하나의 데이터 베이스 테이블 처럼 사용하려면 createOrReplaceTempView()함수로 temporary view를 만들어줘야한다

In [None]:
data.creatOrReplaceTempView('mobility_data') # 닉네임 지정
spark.sql('SELECT pickup_datetime FROM mobility_data LIMIT 5').show()


# **스파크 세션(SparkSession)**

스파크 코어에 스파크 컨텍스트가 있었다면 스파크 SQL엔 스파크 세션이 있다.

파이썬에서 스파크 SQL을 사용하기 위한 방법이며 스파크 세션으로 불러오는 데이터는 데이터 프레임이다.

In [None]:
spark = SparkSession.builder.appName("test-app").getOrCreate()


위와 같은 코드로 스파크 세션을 만들어 줄 것이다.

SQL문 뿐만 아니라 함수를 사용해서도 가능하다.

데이터 프레임을 RDD로 변환하여 사용할 수도 있지만(rdd = df.rdd.map(tuple)), RDD를 덜 사용하는 쪽이 좋다.

**스파크에서 사용할 수 있는 SQL문**

하이브 쿼리 언어(Hive Query Language)와 거의 동일하다.

Select

From

Where

Count

Having

Group By

Order By

Sort By

Distinct

Join

**데이터 프레임의 이점**

위에서 RDD를 덜 사용하는 편이 좋다고 했는데, 그 이유는 MLlib이나 스파크 스트리밍(Spark Streaming)과 같은 다른 스파크 모듈과 사용하기엔 데이터 프레임이 좋기 때문이다.

개발하기에도 편하고 최적화도 알아서 된다.

**데이터셋(Datasets)**

타입이 있는 데이터프레임이며 파이스파크에선 크게 신경쓰지 않아도 된다.

In [8]:
from pyspark.sql import SparkSession


# 스파크 세션 생성
spark = SparkSession.builder.master("local").appName("learn-sql").getOrCreate()


# 주식 데이터 생성
stocks = [
    ('Google', 'GOOGL', 'USA', 2984, 'USD'), 
    ('Netflix', 'NFLX', 'USA', 645, 'USD'),
    ('Amazon', 'AMZN', 'USA', 3518, 'USD'),
    ('Tesla', 'TSLA', 'USA', 1222, 'USD'),
    ('Tencent', '0700', 'Hong Kong', 483, 'HKD'),
    ('Toyota', '7203', 'Japan', 2006, 'JPY'),
    ('Samsung', '005930', 'Korea', 70600, 'KRW'),
    ('Kakao', '035720', 'Korea', 125000, 'KRW'),
]


In [51]:
# 스키마 생성
# 컬럼의 이름만 입력하고 데이터 타입은 정하지 않는다.
stockSchema = ["name", "ticker", "country", "price", "currency"]


# 데이터 프레임 생성
df = spark.createDataFrame(data=stocks, schema=stockSchema)


# 데이터 타입 확인
df.dtypes
'''
[('name', 'string'),
 ('ticker', 'string'),
 ('country', 'string'),
 ('price', 'bigint'),
 ('currency', 'string')]
'''


# 데이터 프레임 확인
df.show()


+-------+------+---------+------+--------+
|   name|ticker|  country| price|currency|
+-------+------+---------+------+--------+
| Google| GOOGL|      USA|  2984|     USD|
|Netflix|  NFLX|      USA|   645|     USD|
| Amazon|  AMZN|      USA|  3518|     USD|
|  Tesla|  TSLA|      USA|  1222|     USD|
|Tencent|  0700|Hong Kong|   483|     HKD|
| Toyota|  7203|    Japan|  2006|     JPY|
|Samsung|005930|    Korea| 70600|     KRW|
|  Kakao|035720|    Korea|125000|     KRW|
+-------+------+---------+------+--------+



In [24]:
# TempView에 등록을 하여야 사용할 수 있다.
df.createOrReplaceTempView("stocks")

In [28]:
spark.sql("SELECT name FROM stocks").show()

+-------+
|   name|
+-------+
| Google|
|Netflix|
| Amazon|
|  Tesla|
|Tencent|
| Toyota|
|Samsung|
|  Kakao|
+-------+



In [29]:
spark.sql("SELECT name, price FROM stocks").show()

+-------+------+
|   name| price|
+-------+------+
| Google|  2984|
|Netflix|   645|
| Amazon|  3518|
|  Tesla|  1222|
|Tencent|   483|
| Toyota|  2006|
|Samsung| 70600|
|  Kakao|125000|
+-------+------+



In [30]:
spark.sql("SELECT name, price FROM stocks WHERE country = 'Korea'").show(), spark.sql("SELECT name, price FROM stocks WHERE price > 2000").show()

+-------+------+
|   name| price|
+-------+------+
|Samsung| 70600|
|  Kakao|125000|
+-------+------+

+-------+------+
|   name| price|
+-------+------+
| Google|  2984|
| Amazon|  3518|
| Toyota|  2006|
|Samsung| 70600|
|  Kakao|125000|
+-------+------+



(None, None)

In [31]:
spark.sql("SELECT name, price FROM stocks WHERE price > 2000 and country = 'USA'").show()

+------+-----+
|  name|price|
+------+-----+
|Google| 2984|
|Amazon| 3518|
+------+-----+



In [32]:
spark.sql("SELECT name, price FROM stocks WHERE country LIKE 'U%'").show() # U로 시작

+-------+-----+
|   name|price|
+-------+-----+
| Google| 2984|
|Netflix|  645|
| Amazon| 3518|
|  Tesla| 1222|
+-------+-----+



In [33]:
spark.sql("SELECT name, price, currency FROM stocks \
WHERE currency = 'USD' AND \
price > (SELECT price FROM stocks WHERE NAME = 'Tesla')").show()

+------+-----+--------+
|  name|price|currency|
+------+-----+--------+
|Google| 2984|     USD|
|Amazon| 3518|     USD|
+------+-----+--------+



In [34]:
spark.sql("SELECT sum(price) FROM stocks WHERE country = 'Korea'").show()

+----------+
|sum(price)|
+----------+
|    195600|
+----------+



In [35]:
spark.sql("SELECT mean(price) FROM stocks WHERE country = 'Korea'").show()

+-----------+
|mean(price)|
+-----------+
|    97800.0|
+-----------+



# **데이터 프레임(DataFrame)**
데이터 프레임은 관계형 데이터셋(RDD + Relation)이다.

RDD가 함수형 API를 가졌다면 데이터 프레임은 선언형 API이다.

스키마를 가졌기 때문에 자동으로 최적화가 가능하다.

타입이 없다.(데이터 프레임 내부적으로 타입을 관제하지 않는다.)

# **데이터 프레임의 특징**
데이터 프레임은 RDD의 확장판이다.

RDD와 같이 지연 실행(Lazy Execution)된다.
분산 저장된다.

불변(immutabel) 데이터이다.

열(row) 객체가 있다.

SQL 쿼리를 직접 바로 실행할 수 있다.

스키마를 가질 수 있고, 이를 통해 성능을 더욱 최적화 할 수 있다.

CSV, JSON, Hive 등으로 읽어오거나 변환이 가능하다.


# **데이터 프레임의 스키마 확인**
**dtypes**
내부 스키마를 볼 수 있다.

In [45]:
df.dtypes

[('_1', 'bigint'), ('_2', 'struct<_1:bigint,_2:string>')]

**show**()
테이블 형태로 데이터를 출력하며 첫 20개의 열만 전시한다.

디버깅할 때 유용하게 쓰인다.

In [46]:
df.show()

+---+-------------+
| _1|           _2|
+---+-------------+
|  1|  {2984, USD}|
|  2|   {645, USD}|
|  3|  {3518, USD}|
|  4|  {1222, USD}|
|  5| {70600, USD}|
|  6|{125000, USD}|
+---+-------------+



**printSchema**()
스키마를 트리 형태로 볼 수 있다.
중첩된 스키마라면 이 방법이 편하다.

In [47]:
df.printSchema()

root
 |-- _1: long (nullable = true)
 |-- _2: struct (nullable = true)
 |    |-- _1: long (nullable = true)
 |    |-- _2: string (nullable = true)



**복잡한 데이터 타입**

ArrayType: 변수 타입

MapType: 파이썬의 딕셔너리와 같은 형태

StructType: 오브젝트

**데이터 프레임의 작업**

SQL과 비슷한 작업이 가능하다.

Select

Where

Limit

OrderBy

GroupBy

Join

**Select**
사용자가 원하는 컬럼이나 데이터를 추출하는데 사용한다.

In [49]:
df.select('*').collect()

[Row(_1=1, _2=Row(_1=2984, _2='USD')),
 Row(_1=2, _2=Row(_1=645, _2='USD')),
 Row(_1=3, _2=Row(_1=3518, _2='USD')),
 Row(_1=4, _2=Row(_1=1222, _2='USD')),
 Row(_1=5, _2=Row(_1=70600, _2='USD')),
 Row(_1=6, _2=Row(_1=125000, _2='USD'))]

**Agg**
Aggregate의 약자로, 그룹핑 후 데이터를 하나로 합치는 작업이다.

In [53]:
df.agg({'price': 'max'}).collect()

from pyspark.sql import functions as F
df.agg(F.min(df.price)).collect()

[Row(min(price)=483)]

**GropBy**
사용자가 지정한 컬럼을 기준으로 데이터를 그룹핑하는 작업이다.

In [54]:
df.groupBy('currency').agg({'price': 'mean'}).collect()

[Row(currency='KRW', avg(price)=97800.0),
 Row(currency='JPY', avg(price)=2006.0),
 Row(currency='HKD', avg(price)=483.0),
 Row(currency='USD', avg(price)=2092.25)]

In [55]:
df.groupBy([df.currency, df.price]).count().collect()

[Row(currency='USD', price=1222, count=1),
 Row(currency='USD', price=3518, count=1),
 Row(currency='HKD', price=483, count=1),
 Row(currency='USD', price=645, count=1),
 Row(currency='KRW', price=70600, count=1),
 Row(currency='JPY', price=2006, count=1),
 Row(currency='USD', price=2984, count=1),
 Row(currency='KRW', price=125000, count=1)]

**Join**
다른 데이터 프레임과 사용자가 지정한 컬럼을 기준으로 합치는 작업이다.


In [56]:
df.join(earningsDF, 'name').select(df.name, earningsDF.eps).collect()

[Row(name='Kakao', eps=705.0),
 Row(name='Samsung', eps=1780.0),
 Row(name='Tesla', eps=1.8600000143051147),
 Row(name='Google', eps=27.989999771118164),
 Row(name='Tencent', eps=11.010000228881836),
 Row(name='Toyota', eps=224.82000732421875),
 Row(name='Netflix', eps=2.559999942779541),
 Row(name='Amazon', eps=6.119999885559082)]

In [36]:
earnings = [
    ('Google', 27.99, 'USD'), 
    ('Netflix', 2.56, 'USD'),
    ('Amazon', 6.12, 'USD'),
    ('Tesla', 1.86, 'USD'),
    ('Tencent', 11.01, 'HKD'),
    ('Toyota', 224.82, 'JPY'),
    ('Samsung', 1780., 'KRW'),
    ('Kakao', 705., 'KRW')
]

from pyspark.sql.types import StringType, FloatType, StructType, StructField


# 직접 스키마 타입 설정
earningsSchema = StructType([
    StructField("name", StringType(), True),
    StructField("eps", FloatType(), True),
    StructField("currency", StringType(), True),
])


# 데이터 프레임 생성
earningsDF = spark.createDataFrame(data=earnings, schema=earningsSchema)


earningsDF.dtypes


earningsDF.createOrReplaceTempView("earnings")

In [37]:
earningsDF.select("*").show()

+-------+------+--------+
|   name|   eps|currency|
+-------+------+--------+
| Google| 27.99|     USD|
|Netflix|  2.56|     USD|
| Amazon|  6.12|     USD|
|  Tesla|  1.86|     USD|
|Tencent| 11.01|     HKD|
| Toyota|224.82|     JPY|
|Samsung|1780.0|     KRW|
|  Kakao| 705.0|     KRW|
+-------+------+--------+



In [38]:
spark.sql("SELECT * FROM stocks JOIN earnings ON stocks.name = earnings.name").show()

+-------+------+---------+------+--------+-------+------+--------+
|   name|ticker|  country| price|currency|   name|   eps|currency|
+-------+------+---------+------+--------+-------+------+--------+
|  Kakao|035720|    Korea|125000|     KRW|  Kakao| 705.0|     KRW|
|Samsung|005930|    Korea| 70600|     KRW|Samsung|1780.0|     KRW|
|  Tesla|  TSLA|      USA|  1222|     USD|  Tesla|  1.86|     USD|
| Google| GOOGL|      USA|  2984|     USD| Google| 27.99|     USD|
|Tencent|  0700|Hong Kong|   483|     HKD|Tencent| 11.01|     HKD|
| Toyota|  7203|    Japan|  2006|     JPY| Toyota|224.82|     JPY|
|Netflix|  NFLX|      USA|   645|     USD|Netflix|  2.56|     USD|
| Amazon|  AMZN|      USA|  3518|     USD| Amazon|  6.12|     USD|
+-------+------+---------+------+--------+-------+------+--------+



In [39]:
spark.sql("SELECT stocks.name, (stocks.price / earnings.eps) FROM stocks JOIN earnings ON stocks.name = earnings.name").show()

+-------+---------------------------------------------+
|   name|(CAST(price AS DOUBLE) / CAST(eps AS DOUBLE))|
+-------+---------------------------------------------+
|  Kakao|                            177.3049645390071|
|Samsung|                           39.662921348314605|
|  Tesla|                             656.989242258975|
| Google|                            106.6095042658442|
|Tencent|                            43.86920889728746|
| Toyota|                            8.922693419839167|
|Netflix|                            251.9531306315913|
| Amazon|                            574.8366120563447|
+-------+---------------------------------------------+

