# 스파크 SQL과 아파치 하이브

## 사용자 정의 함수(UDF)

### 스파크 SQL UDF

In [2]:
from pyspark.sql.types import LongType
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('chapter5').getOrCreate()

23/09/08 10:13:00 WARN Utils: Your hostname, minseok-VirtualBox resolves to a loopback address: 127.0.1.1; using 10.0.2.15 instead (on interface enp0s3)
23/09/08 10:13:00 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/09/08 10:13:02 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [6]:
# 큐브 함수 생성
def cubed(s):
    return s * s * s

# UDF로 등록
spark.udf.register('cubed', cubed, LongType())

# 임시 뷰 생성
spark.range(1, 9).createOrReplaceTempView('udf_test')
# spark.range(1, 9)
# DataFrame[id: bigint]
# spark.range(1, 9).show()
# +---+
# | id|
# +---+
# |  1|
# |  2|
# |  3|
# |  4|
# |  5|
# |  6|
# |  7|
# |  8|
# +---+

In [7]:
# 함수 사용
spark.sql("SELECT id, cubed(id) AS id_cubed FROM udf_test").show()

[Stage 1:>                                                          (0 + 2) / 2]

+---+--------+
| id|id_cubed|
+---+--------+
|  1|       1|
|  2|       8|
|  3|      27|
|  4|      64|
|  5|     125|
|  6|     216|
|  7|     343|
|  8|     512|
+---+--------+



                                                                                

### 판다스 UDF로 파이스파크 UDF 속도 향상 및 배포
파이스파크 UDF 사용은 성능이 느리다는 단점이 있다. <br>
이 문제를 해결하기 위해 판다스 UDF가 도입되었다. 판다스 UDF는 아파치 애로우를 사용하여 데이터를 전송하고 판다스는 해당 데이터로 작업을 한다.pandas_udf 키워드를 데코레이터로 사용하여 판다스 UDF를 정의하거나 함수 자체를 래핑할 수 있다. 한 번에 한 행씩 처리하는 파이썬 UDF에 비해 최대 100배가지 성능을 향상 시킬 수 있는 벡터화된 연산을 허용한다.<br>
판다스 UDF는 파이썬 3.6 이상 기반의 아파치 스파크 3.0에서 판다스 UDF 및 판다스 함수 API로 분할되었다.

- 판다스 UDF: 아파치 스파크 3.0에서 판다스 UDF는 pandas.Series, pandas.DataFrame, Tuple 및 Iterator와 같은 파이썬 유형 힌트로 판다스 UDF 유형을 유추한다. 현재 판다스 UDF에서는 시리즈와 시리즈, 시리즈 반복자와 시리즈 반복자, 다중 시리즈 반복자와 시리즈 반복자, 시리즈와 스칼라(단일값)을 파이썬 유형 힌트로 지원한다.
- 판다스 함수 API: 판다스 함수 API를 사용하면 파이스파크 데이터 프레임에 입력과 출력이 모두 판다스 인스턴스인 로컬 파이썬 함수를 직접 적용할 수 있다. 스파크 3.0의 경우 판다스 함수 API는 그룹화된 맵, 맵, 공동 그룹화된 맵을 지원한다.

In [10]:
import pandas as pd
from pyspark.sql.functions import col, pandas_udf

In [11]:
from pyspark.sql.types import LongType

In [14]:
# # 큐브 함수 선언
# def cubed(a: pd.Series) -> pd.Series:
#     return a * a * a

# # 큐브 함수에 대한 판다스 UDF 생성
# cubed_udf = pandas_udf(cubed, returnType=LongType())

# 오류 발생
# PyArrow >= 1.0.0 must be installed; however, it was not found.

In [19]:
# pip install pyarrow

In [16]:
# 큐브 함수 선언
# 판다스 UDF를 만들기 위한 일반적인 판다스 함수
def cubed(a: pd.Series) -> pd.Series:
    return a * a * a

# 큐브 함수에 대한 판다스 UDF 생성
cubed_udf = pandas_udf(cubed, returnType=LongType())

큐브 계산을 위해 간단한 판다스 시리즈로 로컬 함수 cubed()를 적용해보자

In [17]:
# 판다스 시리즈 생성
x = pd.Series([1, 2, 3])

# 로컬 판다스 데이터를 실행하는 pandas_udf에 대한 함수
print(cubed(x))

0     1
1     8
2    27
dtype: int64


스파크 데이터 프레임으로 전환해보자. 이 함수를 다음과 같이 벡터화된 스파크 UDF로 실행할 수 있다.

In [18]:
# 스파크 데이터 프레임 생성, 'spark'는 기존의 sparkSession과 같다.
df = spark.range(1, 4)

# 벡터화된 스파크 UDF를 함수로 실행
df.select('id', cubed_udf(col('id'))).show()



+---+---------+
| id|cubed(id)|
+---+---------+
|  1|        1|
|  2|        8|
|  3|       27|
+---+---------+



                                                                                

자세한 내용은 판다스 사용자 정의 함수 문저를 참고<br>
https://docs.databricks.com/spark/latest/spark-sql/udf-python-pandas.html

### 판다스 UDF, 판다스 함수 API에 대한 상세한 내용

Pandas UDF와 Pandas함수 API의 차이점

In [27]:
# Pandas UDF

import pandas as pd
from pyspark.sql.functions import pandas_udf, log2, col

@pandas_udf('long')
def pandas_plus_one(s:pd.Series) -> pd.Series:
    return s + 1

# pandas_plus_one("id") is identically treated as _a SQL expression_ internally.
# Namely, you can combine with other columns, functions and expressions.
spark.range(10).select(
    pandas_plus_one(col("id") - 1) + log2("id") + 1).show()
# log2() 함수는 밑이 2인 로그함수임

+------------------+
|          LOG2(id)|
+------------------+
|              null|
|               0.0|
|               1.0|
| 1.584962500721156|
|               2.0|
| 2.321928094887362|
| 2.584962500721156|
| 2.807354922057604|
|               3.0|
|3.1699250014423126|
+------------------+



In [28]:
# Pandas Function API

from typing import Iterator
import pandas as pd


def pandas_plus_one(iterator: Iterator[pd.DataFrame]) -> Iterator[pd.DataFrame]:
    return map(lambda v: v + 1, iterator)
# 위 함수는 일반 파이썬 함수임


# pandas_plus_one is just a regular Python function, and mapInPandas is
# logically treated as _a separate SQL query plan_ instead of a SQL expression. 
# Therefore, direct interactions with other expressions are impossible.
spark.range(10).mapInPandas(pandas_plus_one, schema="id long").show()

# 함수가 실행되는 원리는 아직 잘 모르겠음

                                                                                

+---+
| id|
+---+
|  1|
|  2|
|  3|
|  4|
|  5|
|  6|
|  7|
|  8|
|  9|
| 10|
+---+



또한, 판다스 UDF는 파이썬 타입 힌트가 필요하지만, 판다스 함수 API의 타입 힌트는 현재 선택 사항이다.

현재 Pandas UDF에서 지원되는 파이썬 유형 힌트

- Series to Series
- Iterator of Series to Iterator of Series
- Iterator of Multiple Series to Iterator of Series
- Series to Scalar (a single value)

타입 힌트는 모든 경우에 pandas.Series를 사용해야 한다. 그러나 입력 또는 출력 유형 힌트에 pandas.DataFrame을 대신 사용해야 하는 한 가지 변형이 있다. 입력 또는 출력 열이 StructType인 경우이다. 아래 예제를 살펴보자.

In [29]:
import pandas as pd
from pyspark.sql.functions import pandas_udf


df = spark.createDataFrame(
    [[1, "a string", ("a nested string",)]],
    "long_col long, string_col string, struct_col struct<col1 string>")

df

DataFrame[long_col: bigint, string_col: string, struct_col: struct<col1:string>]

In [30]:
df.show()

                                                                                

+--------+----------+-----------------+
|long_col|string_col|       struct_col|
+--------+----------+-----------------+
|       1|  a string|{a nested string}|
+--------+----------+-----------------+



In [31]:
@pandas_udf("col1 string, col2 long")
def pandas_plus_len(
        s1: pd.Series, s2: pd.Series, pdf: pd.DataFrame) -> pd.DataFrame:
    # Regular columns are series and the struct column is a DataFrame.
    # 타입 힌트 사용시 스파크 데이터프레임의 일반 칼럼은 Series로 사용하고 structtype인 칼럼은 DataFrame으로 사용된다.
    # pdf: pd.DataFrame이므로 pdf는 structtype인 struct_col 칼럼을 의미한다.
    pdf['col2'] = s1 + s2.str.len() 
    return pdf  # the struct column expects a DataFrame to return

df.select(pandas_plus_len("long_col", "string_col", "struct_col")).show()

                                                                                

+-------------------------------------------------+
|pandas_plus_len(long_col, string_col, struct_col)|
+-------------------------------------------------+
|                             {a nested string, 9}|
+-------------------------------------------------+



                                                                                

지원되는 네 가지 유형 힌트에 대해 살펴본다

- Series to Series <br>

함수가 하나 이상의 pandas.Series를 취할 것으로 예상하고 하나의 pandas.Series를 출력한다.

In [37]:
import pandas as pd
from pyspark.sql.functions import pandas_udf       

@pandas_udf('long')
# @pandas_udf('long')은
# cubed_udf = pandas_udf(cubed, returnType=LongType()) 과정을 축약한 과정으로 예상된다.
def pandas_plus_one(s: pd.Series) -> pd.Series:
    return s + 1

spark.range(10).select(pandas_plus_one("id")).show()



+-------------------+
|pandas_plus_one(id)|
+-------------------+
|                  1|
|                  2|
|                  3|
|                  4|
|                  5|
|                  6|
|                  7|
|                  8|
|                  9|
|                 10|
+-------------------+



                                                                                

- Iterator of Series to Iterator of Series <br>

pandas.Series의 이터레이터를 받아 출력한다. 전체 출력의 길이는 전체 입력의 길이와 같아야 한다. 전체 입력과 출력의 길이가 같으면 입력 이터레이터에서 데이터를 미리 가져올 수 있다. 함수는 단일 열을 입력으로 받는다.

In [38]:
from typing import Iterator
import pandas as pd
from pyspark.sql.functions import pandas_udf       

@pandas_udf('long')
def pandas_plus_one(iterator: Iterator[pd.Series]) -> Iterator[pd.Series]:
    return map(lambda s: s + 1, iterator)
    # map은 파이썬 시간에 배운 함수
    # iterator 각 요소를 함수로 처리한 후 반환

spark.range(10).select(pandas_plus_one("id")).show()

[Stage 20:>                                                         (0 + 2) / 2]

+-------------------+
|pandas_plus_one(id)|
+-------------------+
|                  1|
|                  2|
|                  3|
|                  4|
|                  5|
|                  6|
|                  7|
|                  8|
|                  9|
|                 10|
+-------------------+



                                                                                

- Iterator of Multiple Series to Iterator of Series <br>

위 경우와 유사한 특성과 제한을 갖는다. <br>
It is also useful when to use some states and when to prefetch the input data. (이해할 수 없는 말) <br>
전체 출력의 길이가 입력의 길이와 같아야 한다. 위 경우와 달리 여러 열을 입력으로 받는다.

In [42]:
from typing import Iterator, Tuple
import pandas as pd
from pyspark.sql.functions import pandas_udf       

@pandas_udf("long")
def multiply_two(
        iterator: Iterator[Tuple[pd.Series, pd.Series]]) -> Iterator[pd.Series]:
    # return (a * b for a, b in iterator)
    # 이번엔 return 형식이 바뀌었네, iterator이기만 하면 되는건가?
    return [a * b for a, b in iterator]
    # 같은 결과가 나왔다

spark.range(10).select(multiply_two("id", "id")).show()

+--------------------+
|multiply_two(id, id)|
+--------------------+
|                   0|
|                   1|
|                   4|
|                   9|
|                  16|
|                  25|
|                  36|
|                  49|
|                  64|
|                  81|
+--------------------+



                                                                                

- Series to Scalar <br>

반환된 Scalar는 파이썬의 int, float 또는 numpy의 numpy.int64, numpy.float64 중 하나일 수 있다.

In [43]:
import pandas as pd
from pyspark.sql.functions import pandas_udf
from pyspark.sql import Window

df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], ("id", "v"))

@pandas_udf("double")
def pandas_mean(v: pd.Series) -> float:
    return v.sum()

df.select(pandas_mean(df['v'])).show()
df.groupby("id").agg(pandas_mean(df['v'])).show()
df.select(pandas_mean(df['v']).over(Window.partitionBy('id'))).show()

                                                                                

+--------------+
|pandas_mean(v)|
+--------------+
|          21.0|
+--------------+



                                                                                

+---+--------------+
| id|pandas_mean(v)|
+---+--------------+
|  1|           3.0|
|  2|          18.0|
+---+--------------+





+----------------------------------------------------------------------------------------------+
|pandas_mean(v) OVER (PARTITION BY id ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)|
+----------------------------------------------------------------------------------------------+
|                                                                                           3.0|
|                                                                                           3.0|
|                                                                                          18.0|
|                                                                                          18.0|
|                                                                                          18.0|
+----------------------------------------------------------------------------------------------+



                                                                                

스파크 3.0에서 지원되는 Pandas 함수 API
- grouped map
- map
- co-grouped map <br>

the grouped map Pandas UDF는 group map Pandas Function API로 분류된다. <br>
Pandas 함수 API의 파이썬 타입 힌트는 선택 사항이다.

- Grouped Map <br>

Pandas 함수 API에서 grouped map은 그룹화된 데이터 프레임(df..groupby(...))에서 applyInPandas이다. <br>
이 함수는 각 그룹을 함수안에 각 pandas.DataFrame에 매핑한다.(It maps each group to each pandas.DataFrame in the function.) <br>
출력이 입력과 동일한 길이일 필요는 없다.

In [44]:
import pandas as pd

df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], ("id", "v"))

def subtract_mean(pdf: pd.DataFrame) -> pd.DataFrame:
    # 선택사항이지만 사용한 모습. 파이선 타입 힌트 사용을 권장
    v = pdf.v
    return pdf.assign(v= v - v.mean())
    # help(pd.DataFrame.assign)
    # Returns a new object with all original columns in addition to new ones.
    # Existing columns that are re-assigned will be overwritten.
    # 이미 'v' 칼럼이 존재하므로 'v'의 칼럼값을 바꾸는 함수가 된다. 마치 withColumn처럼
    # 바꾸는 방식은 v - v.mean()

df.groupby("id").applyInPandas(subtract_mean, schema=df.schema).show()
# groupby와 groupBy는 같은 함수임

[Stage 36:>                                                         (0 + 1) / 1]

+---+----+
| id|   v|
+---+----+
|  1|-0.5|
|  1| 0.5|
|  2|-3.0|
|  2|-1.0|
|  2| 4.0|
+---+----+



                                                                                

- Map <br>

Map Pandas 함수 API는 데이터 프레임의 mapInPandas이다. 이 함수는 각 파티션의 모든 배치를 매핑하고 각각을 변환한다. 이 함수는 pandas.DataFrame의 이터레이터를 받아 pandas.DataFrame의 이터레이터를 출력한다. 출력 길이가 입력 크기와 일치할 필요는 없다.

In [51]:
from typing import Iterator
import pandas as pd

df = spark.createDataFrame([(1, 21), (2, 30)], ("id", "age"))

def pandas_filter(iterator: Iterator[pd.DataFrame]) -> Iterator[pd.DataFrame]:
    for pdf in iterator:
        yield pdf[pdf.id == 1]

df.mapInPandas(pandas_filter, schema=df.schema).show()

                                                                                

+---+---+
| id|age|
+---+---+
|  1| 21|
+---+---+



- Co-grouped Map <br>

grouped map과 유사하게 각 그룹을 함수안에 각 pandas.DataFrame에 매핑하지만 공통 키로 다른 DataFrame과 그룹화한 다음 함수가 각 코그룹에 적용된다. 마찬가지로 출력 길이에는 제한이 없다.

In [52]:
import pandas as pd

df1 = spark.createDataFrame(
    [(1201, 1, 1.0), (1201, 2, 2.0), (1202, 1, 3.0), (1202, 2, 4.0)],
    ("time", "id", "v1"))
df2 = spark.createDataFrame(
    [(1201, 1, "x"), (1201, 2, "y")], ("time", "id", "v2"))

def asof_join(left: pd.DataFrame, right: pd.DataFrame) -> pd.DataFrame:
    return pd.merge_asof(left, right, on="time", by="id")

df1.groupby("id").cogroup(
    df2.groupby("id")
).applyInPandas(asof_join, "time int, id int, v1 double, v2 string").show()

[Stage 43:>                                                         (0 + 1) / 1]

+----+---+---+---+
|time| id| v1| v2|
+----+---+---+---+
|1201|  1|1.0|  x|
|1202|  1|3.0|  x|
|1201|  2|2.0|  y|
|1202|  2|4.0|  y|
+----+---+---+---+



                                                                                

# 외부 데이터 소스

## JDBC 및 SQL 데이터베이스
스파크 SQL에는 JDBC를 사용하여 다른 데이터베이스에서 데이터를 읽을 수 있는 데이터 소스 API가 포함되어 있다. <br>
시작하려면 JDBC 데이터 소스에 대한 JDBC 드라이버를 지정해야 하며 스파크 클래스 경로에 있어야 한다. $SPARK_HOME 폴더에서 다음과 같은 명령을 실행한다. <br>
`./bin/spark-shell --driver-class-path $database.jar --jars $database.jar` <br>
데이터 소스 API를 사용하여 원격 데이터베이스의 테이블을 데이터 프레임 또는 스파크 SQL 임시 뷰로 로드할 수 있다. 사용자는 데이터 소스 옵션에서 JDBC 연결 속성을 지정할 수 있다. <br>

[스파크가 지원하는 더 많은 일반적인 연결 속성](https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html#data-source-option)

### 파티셔닝의 중요성
모든 데이터가 하나의 드라이버 연결을 통해 처리되므로 성능을 크게 저하시킬 수 있을 뿐 아니라 소스 시스템의 리소스를 포화 상태로 만들 수 있다.<br>

파티셔닝 연결 속성 <br>
|속성명|설명|
|-|-|
|numPartitions|테이블 읽기 및 쓰기에서 병렬 처리를 위해 사용할 수 있는 최대 파티션 수이다.|
|partitionColumn|파티션을 결정하는데 사용되는 칼럼이다. 숫자, 날짜 또는 타임스탬프 칼럼이어야 한다.|
|lowerBound|파티션을 나눌 때 설정할 partitionColumn의 최솟값을 의미한다.|
|upperBound|파티션을 나눌 때 설정할 partitionColumn의 최댓값을 의미한다.|

> 예시
> numPartitions: 10
> lowerBound: 1000
> upperBound: 10000
>
> 위와 같은 경우 파티션 크기는 1,000이 되고 10개의 파티션이 생성된다
> 다음 10개의 쿼리를 실행하는 것과 동일하다.
>
> SELECT * FROM table WHERE partitionColumn BETWEEN 1000 AND 2000
> SELECT * FROM table WHERE partitionColumn BETWEEN 2000 AND 3000
> ...
> SELECT * FROM table WHERE partitionColumn BETWEEN 9000 AND 10000

## MySQL
MySQL 데이터베이스에 연결하려면 메이븐 또는 [MySQL](https://mvnrepository.com/artifact/mysql/mysql-connector-java)에서 JBDC jar를 빌드하거나 다운로드한 후에(후자가 더 쉽다.) 클래스 경로에 추가한다. 그런 다음 해당 jar를 지정하여 스파크 셸 또는 pyspark을 시작한다.

`./bin/pyspark --jars mysql-connector-java_8.0.16-bin.jar`

In [None]:
# 로드 함수를 사용하여 JDBC 소스로부터 데이터를 로드
jdbcDF = (spark
          .read
          .format('jdbc')
          .option('url', 'jdbc:mysql://[DBSERVER]:3306/[DATABASE]')
          .option('driver', 'com.mysql.jdbcDriver')
          .option('dbtable', '[TABLENAME]')
          .option('user', '[USERNAME]')
          .option('password', '[PASSWORD')
          .load())

# 저장 함수를 사용하여 JDBC 소스에 데이터를 저장
(jdbcDF
    .write
    .format('jdbc')
    .option('url', 'jdbc:mysql://[DBSERVER]:3306/[DATABASE]')
    .option('driver', 'com.mysql.jdbcDriver')
    .option('dbtable', '[TABLENAME]')
    .option('user', '[USERNAME]')
    .option('password', '[PASSWORD')
    .save())

### PostgreSQL, 애저 코스모스 DB, MS SQL 서버도 비슷

### 기타 외부 데이터 소스
- 아파치 카산드라
- 스노우플레이크
- 몽고DB