# 스파크 실습 환경
> 스파크의 실습을 위한 환경 구성이 정상인지 확인

## 1. 스파크 세션 생성
> 세션 생성에 오류(ERROR) 없이 생성되는지 확인

In [1]:
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark.sql.types import *
from IPython.display import display, display_pretty, clear_output, JSON

spark = (
    SparkSession
    .builder
    .config("spark.sql.session.timeZone", "Asia/Seoul")
    .getOrCreate()
)

# 노트북에서 테이블 형태로 데이터 프레임 출력을 위한 설정을 합니다
spark.conf.set("spark.sql.repl.eagerEval.enabled", True) # display enabled
spark.conf.set("spark.sql.repl.eagerEval.truncate", 100) # display output columns size

# 공통 데이터 위치
home_jovyan = "/home/jovyan"
work_data = f"{home_jovyan}/work/data"
work_dir=!pwd
work_dir = work_dir[0]

# 로컬 환경 최적화
spark.conf.set("spark.sql.shuffle.partitions", 5) # the number of partitions to use when shuffling data for joins or aggregations.
spark.conf.set("spark.sql.streaming.forceDeleteTempCheckpointLocation", "true")

24/08/18 04:28:23 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


## 2. 스파크 버전 생성
> Spark 3.1.2 버전 출력이 정상적으로 나오는 지 확인

In [2]:
# !which python
!/opt/conda/bin/python --version
print("spark.version: {}".format((spark.version)))

Python 3.9.6
spark.version: 3.1.2


## 3 PySpark 관련 팁

### 여러 줄의 코드 작성
* python 코드의 경우 괄호로 (python) 묶으면 이스케이핑(\) 하지 않아도 됩니다
* sql 문의 경우  """sql""" 으로 묶으면 이스케이핑(\)하지 않아도 됩니다

### 데이터 출력 함수
* DataFrame.show() - 기본 제공 함수이며, show(n=limit) 통하여 최대 출력을 직접 조정할 수 있으나, 한글 출력 시에 표가 깨지는 경우가 있습니다
* display(DataFrame) - Ipython 함수이며, limit 출력을 위해서는 limit 를 걸어야 하지만, 한글 출력에도 깨지지 않습니다


In [3]:
## 파이썬 코드 여러 줄 작성
json = (
    spark
    .read
    .json(f"{work_data}/tmp/simple.json")
    .limit(2)
)

## 스파크 SQL 여러 줄 작성
json.createOrReplaceTempView("simple")
spark.sql("""
    select * 
    from simple
""")

json.printSchema()
emp_id = json.select("emp_id")

## 표준 데이터 출력함수
json.show()
emp_id.show()

## 노트북 출력함수 
display(json)
display(emp_id)

                                                                                

root
 |-- emp_id: long (nullable = true)
 |-- emp_name: string (nullable = true)

+------+--------+
|emp_id|emp_name|
+------+--------+
|     1|엘지전자|
|     2|엘지화학|
+------+--------+

+------+
|emp_id|
+------+
|     1|
|     2|
+------+



emp_id,emp_name
1,엘지전자
2,엘지화학


emp_id
1
2


In [4]:
spark.sql("show tables").show()

+--------+---------+-----------+
|database|tableName|isTemporary|
+--------+---------+-----------+
|        |   simple|       true|
+--------+---------+-----------+



### 컨테이너 정보 확인
> 컨테이너 내부에 존재하는 파일 등에 대해 직접 접근이 가능합니다 

In [4]:
strings = spark.read.text(f"{home_jovyan}/requirements.txt")
strings.show(5, truncate=False)
count = strings.count()
print("count of word is {}".format(count))

strings.printSchema()

+--------+
|value   |
+--------+
|boto3   |
|scrapy  |
|selenium|
|mrjob   |
|pyspark |
+--------+
only showing top 5 rows

count of word is 9
root
 |-- value: string (nullable = true)



## 4 데이터 읽기
> 아무런 옵션을 주지 않는 경우 스파크가 알아서 컬럼 이름과 데이터 타입을 (string) 지정합니다

In [8]:
log_access = spark.read.csv(f"{work_data}/log_access.csv")
log_access.printSchema()
log_access.show()

root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)

+----------+-----+------+
|       _c0|  _c1|   _c2|
+----------+-----+------+
|    a_time|a_uid|  a_id|
|1603645200|    1| login|
|1603647200|    1|logout|
|1603649200|    2| login|
|1603650200|    2|logout|
|1603653200|    2| login|
|1603657200|    3| login|
|1603659200|    3|logout|
|1603660200|    4| login|
|1603664200|    4|logout|
|1603664500|    4| login|
|1603666500|    5| login|
|1603669500|    5|logout|
|1603670500|    6| login|
|1603673500|    7| login|
|1603674500|    8| login|
|1603675500|    9| login|
+----------+-----+------+



> 첫 번째 라인에 헤더가 포함되어 있는 경우 아래와 같이 header option 을 지정하면 컬럼 명을 가져올 수 있습니다

In [9]:
log_access = spark.read.option("header", "true").csv(f"{work_data}/log_access.csv")
log_access.printSchema()
log_access.show()

root
 |-- a_time: string (nullable = true)
 |-- a_uid: string (nullable = true)
 |-- a_id: string (nullable = true)

+----------+-----+------+
|    a_time|a_uid|  a_id|
+----------+-----+------+
|1603645200|    1| login|
|1603647200|    1|logout|
|1603649200|    2| login|
|1603650200|    2|logout|
|1603653200|    2| login|
|1603657200|    3| login|
|1603659200|    3|logout|
|1603660200|    4| login|
|1603664200|    4|logout|
|1603664500|    4| login|
|1603666500|    5| login|
|1603669500|    5|logout|
|1603670500|    6| login|
|1603673500|    7| login|
|1603674500|    8| login|
|1603675500|    9| login|
+----------+-----+------+



## 5. 스파크 애플리케이션

| 구분 | 설명 | 기타 |
|---|---|---|
| Application | 스파크 프레임워크를 통해 빌드한 프로그램. 전체 작업을 관리하는 Driver 와 Executors 상에서 수행되는 프로그램으로 구분합니다 | - |
| SparkSession | 스파크의 모든 기능을 사용하기 위해 생성하는 객체 | - |
| Job | 하나의 액션(save, collect 등)을 수행하기 위해 여러개의 타스크로 구성된 병렬처리 단위 | DAG 혹은 Spark Execution Plan |
| Stage | 하나의 잡은 다수의 스테이지라는 것으로 구성되며, 하나의 스테이지는 다수의 타스크 들로 구성됩니다 | - |
| Task | 스파크 익스큐터에 보내지는 하나의 작업 단위 | 하나의 Core 혹은 Partition 단위의 작업 |

## 6. 스파크 UI
> Default 포트는 4040 이므로 http://localhost:4040 에 접속

## 7. 파이썬 vs 스파크

### 파이썬을 이용하여 1에서 100까지 더하는 함수를 계산합니다

In [10]:
result = 0
for number in range(1, 101, 1): result += number
print(result)

# 파이썬 3.0 에서는 reduce 함수를 사용할 수 있습니다
from functools import reduce 
reduce(lambda x, y: x + y, range(101))

5050


5050

### 스파크 Structured API 를 통해서 1 ~ 100 까지 더하는 함수를 구합니다.

In [11]:
from operator import add  # 파이썬의 operator 의 add 함수를 그대로 사용합니다.
sc = spark.sparkContext
parallels = sc.parallelize((range(1, 101, 1))).reduce(add)  # 1 ~ 101 이전까지 1씩 증가하는 숫자를 분산객체인 RDD를 반드시 생성해야 여러 노드의 메모리에 객체가 생성됩니다.
print(parallels)

x = sc.parallelize((range(1, 101, 1))).reduce(lambda x,y: x+y)  # 파이썬 람다함수를 이용해서 익명함수를 직접 생성해서 전달해도 결과는 동일합니다
print(x)

5050
5050


### 기타 날짜 관련 함수
> bigint 값인 날짜는 아래와 같이 from_unixtime 및 to_timestamp 함수를 통해 변환할 수 있습니다.

In [30]:
from pyspark.sql.functions import unix_timestamp, from_unixtime, to_timestamp, to_date
timestamp = df.select(
    "Arrival_Time",
    to_timestamp(from_unixtime(col('Arrival_Time') / lit(1000)), 'yyyy-MM-dd HH:mm:ss').alias('String_Datetime'),
    to_date(from_unixtime(col('Arrival_Time') / lit(1000)), 'yyyy-MM-dd HH:mm:ss').alias('String_Date')
)
timestamp.show(5)

+-------------+-------------------+-----------+
| Arrival_Time|    String_Datetime|String_Date|
+-------------+-------------------+-----------+
|1424686735175|2015-02-23 19:18:55| 2015-02-23|
|1424686735377|2015-02-23 19:18:55| 2015-02-23|
|1424686735577|2015-02-23 19:18:55| 2015-02-23|
|1424686735776|2015-02-23 19:18:55| 2015-02-23|
|1424686735979|2015-02-23 19:18:55| 2015-02-23|
+-------------+-------------------+-----------+
only showing top 5 rows

