In [1]:
# [+] SparkSession setup: local-mode session named 'udf' (getOrCreate reuses an active one).
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .master('local')
    .appName('udf')
    .getOrCreate()
)

In [15]:
# Sample data: (name, age) rows, built from an ordered name -> age mapping.
people_age = list({
    '이건호': 35,
    '김솔비': 45,
    '정우진': 55,
    '오승준': 25,
    '홍정화': 65,
}.items())


In [34]:
# [+] Schema definition: declare column names AND types explicitly instead of a bare
# list of names (a bare list makes Spark infer the column types from the data).
# StringType / LongType with nullable=True reproduce exactly what inference produced
# for this data (name: string, age: long, both nullable), so downstream output is unchanged.
# NOTE(review): TimestampType and IntegerType are imported but not used in this notebook.
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, IntegerType, LongType
schema = StructType([
    StructField('name', StringType(), True),
    StructField('age', LongType(), True),
])

In [35]:
# [+] Build the DataFrame from the sample rows, applying the schema defined earlier.
df = spark.createDataFrame(people_age, schema=schema)

In [18]:
# [+] Display the DataFrame (first 20 rows, long values truncated — the show() defaults).
df.show(n=20, truncate=True)

+------+---+
|  name|age|
+------+---+
|이건호| 35|
|김솔비| 45|
|정우진| 55|
|오승준| 25|
|홍정화| 65|
+------+---+



In [19]:
# [+] Print the DataFrame schema as a tree (column name, type, nullability).
df.printSchema()

root
 |-- name: string (nullable = true)
 |-- age: long (nullable = true)



In [20]:
# [+] Expose the DataFrame to Spark SQL under the temporary view name 'name_age'.
df.createOrReplaceTempView(name='name_age')

In [21]:
# Import `udf`, used for the annotation (decorator) style of UDF registration.
# NOTE(review): `udf` is not referenced in the cells shown — `age_category` is
# registered via spark.udf.register instead, so this import appears unused here.
from pyspark.sql.functions import udf

In [22]:
# UDF: bucket an age into a coarse category label.
def age_category(age):
    """Map an age to 'young', 'adult' or 'senior'.

    Buckets: age < 35 -> 'young'; 35 <= age <= 59 -> 'adult';
    anything else -> 'senior'.

    Args:
        age: numeric age, or None (Spark hands SQL NULL to a Python UDF as None).

    Returns:
        The category string, or None when ``age`` is None. A NULL age then yields
        a NULL category instead of a TypeError (``None < 35``) that would fail the job.
    """
    if age is None:
        return None
    if age < 35:
        return 'young'
    if age <= 59:  # age >= 35 is already guaranteed by the branch above
        return 'adult'
    return 'senior'

In [23]:
# Register the Python function as a SQL-callable UDF named "age_category".
# returnType="string" spells out the default (StringType) for a plain Python function.
spark.udf.register(name="age_category", f=age_category, returnType="string")

<function __main__.age_category(age)>

In [24]:
# Run the registered UDF from Spark SQL against the 'name_age' view and display the result.
spark.sql(
    "SELECT name, age_category(age) AS age_category "
    "FROM name_age"
).show()

+------+------------+
|  name|age_category|
+------+------------+
|이건호|       adult|
|김솔비|       adult|
|정우진|       adult|
|오승준|       young|
|홍정화|      senior|
+------+------------+

