# UDF(User Defined Function)
- 사용자 정의 함수
- 스파크 데이터프레임에서 사용이 가능, SQL에서도 사용이 가능
- 사용자가 만든 함수를 Worker의 Task에서 사용 가능하도록

In [1]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("udf").getOrCreate()
spark

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/03/29 11:20:20 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
transactions = [
    ('찹쌀탕수육+짜장2', '2021-11-07 13:20:00', 22000, 'KRW'),
    ('등심탕수육+크림새우+짜장면', '2021-10-24 11:19:00', 21500, 'KRW'),
    ('월남 쌈 2인 세트', '2021-07-25 11:12:40', 42000, 'KRW'),
    ('콩국수+열무비빔국수', '2021-07-10 08:20:00', 21250, 'KRW'),
    ('장어소금+고추장구이', '2021-07-01 05:36:00', 68700, 'KRW'),
    ('족발', '2020-08-19 19:04:00', 32000, 'KRW'),
]

schema = ["name", "datetime", "price", "currency"]

In [5]:
df= spark.createDataFrame(data=transactions, schema=schema)
df

DataFrame[name: string, datetime: string, price: bigint, currency: string]

In [6]:
df.show()

                                                                                

+--------------------------+-------------------+-----+--------+
|                      name|           datetime|price|currency|
+--------------------------+-------------------+-----+--------+
|          찹쌀탕수육+짜장2|2021-11-07 13:20:00|22000|     KRW|
|등심탕수육+크림새우+짜장면|2021-10-24 11:19:00|21500|     KRW|
|          월남 쌈 2인 세트|2021-07-25 11:12:40|42000|     KRW|
|       콩국수+열무비빔국수|2021-07-10 08:20:00|21250|     KRW|
|       장어소금+고추장구이|2021-07-01 05:36:00|68700|     KRW|
|                      족발|2020-08-19 19:04:00|32000|     KRW|
+--------------------------+-------------------+-----+--------+



In [8]:
df.createOrReplaceTempView("transactions")

In [10]:
query= """
SELECT *
FROM transactions
"""
spark.sql(query).show()

+--------------------------+-------------------+-----+--------+
|                      name|           datetime|price|currency|
+--------------------------+-------------------+-----+--------+
|          찹쌀탕수육+짜장2|2021-11-07 13:20:00|22000|     KRW|
|등심탕수육+크림새우+짜장면|2021-10-24 11:19:00|21500|     KRW|
|          월남 쌈 2인 세트|2021-07-25 11:12:40|42000|     KRW|
|       콩국수+열무비빔국수|2021-07-10 08:20:00|21250|     KRW|
|       장어소금+고추장구이|2021-07-01 05:36:00|68700|     KRW|
|                      족발|2020-08-19 19:04:00|32000|     KRW|
+--------------------------+-------------------+-----+--------+



- UDF는 분산 병렬 처리 환경에서 사용할 수 있는 함수 만들 때 사용한다.(Worker에서 작동하는 함수)
- 리턴 타입을 따로 지정하지 않으면 기본적으로 String을 리턴

In [11]:
# 반드시 리턴 있어야함
def squared(n):
    return n * n

In [12]:
from pyspark.sql.types import LongType

# register("Worker에서 사용할 함수의 이름", 마스터(클라이언트)에 정의된 함수의 이름, 리턴 타입)
spark.udf.register("squared", squared, LongType()) # 이름은 함수이름과 동일하게 하면 됨

<function __main__.squared(n)>

In [13]:
query = """
SELECT price, squared(price)
FROM transactions

"""
spark.sql(query).show()

                                                                                

+-----+--------------+
|price|squared(price)|
+-----+--------------+
|22000|     484000000|
|21500|     462250000|
|42000|    1764000000|
|21250|     451562500|
|68700|    4719690000|
|32000|    1024000000|
+-----+--------------+



In [15]:
def read_number(n):
    units = ["", "십", "백", "천", "만"]
    nums = '일이삼사오육칠팔구'
    result = []
    i = 0
    while n > 0:
        n, r = divmod(n, 10)
        if r > 0:
            result.append(nums[r-1]+units[i])
        i += 1
    return "".join(reversed(result))

In [16]:
read_number(29990)

'이만구천구백구십'

In [18]:
spark.udf.register("read_number", read_number) # 리턴 타입을 지정하지 않으면 자동으로 문자열 타입으로 지정

<function __main__.read_number(n)>

In [20]:
query="""
SELECT price, read_number(price)
FROM transactions
"""
spark.sql(query).show()

+-----+------------------+
|price|read_number(price)|
+-----+------------------+
|22000|          이만이천|
|21500|      이만일천오백|
|42000|          사만이천|
|21250|  이만일천이백오십|
|68700|      육만팔천칠백|
|32000|          삼만이천|
+-----+------------------+



# 데이터 프레임에서 사용할 udf 만들기

In [21]:
filepath = "/home/ubuntu/working/spark/data/titanic_train.csv"

# inferSchema : 컬럼 타입을 자동 추론
titanic_sdf = spark.read.csv(filepath, inferSchema=True, header=True)
titanic_sdf.show()

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| null|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| null|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05| null|       S|
|          6|       0|     3|    Moran, Mr. James|  male|null|    0|    0|      

In [22]:
import pyspark.sql.functions as F

avg_age = titanic_sdf.select(F.avg(F.col('Age')))
avg_age_row = avg_age.head()
avg_age_value = avg_age.head()[0]

# Spark DataFrame의 fillna()에 인자로 Dict를 입력하여 여러개의 컬럼들에 대해서 결측치 값을 입력할 수 있게 만들어줌.
titanic_sdf_filled = titanic_sdf.fillna({'Age': avg_age_value,
                                         'Cabin': 'C000',
                                         'Embarked': 'S'})

In [23]:
# 나이의 카테고리를 구하는 함수를 정의
def get_category(age):
    cat = ''

    if age <= 5: cat = 'Baby'
    elif age <= 12: cat = 'Child'
    elif age <= 18: cat = 'Teenager'
    elif age <= 25: cat = 'Student'
    elif age <= 35: cat = 'Young Adult'
    elif age <= 60: cat = 'Adult'
    else : cat = 'Elderly'

    return cat

In [27]:
# 데이터프레임에서 API에서 udf 사용하기
import pyspark.sql.functions as F
from pyspark.sql.types import StringType

udf_get_category = F.udf(lambda x : get_category(x), StringType())
udf_get_category

<function __main__.<lambda>(x)>

In [28]:
titanic_sdf_filled.withColumn("AgeCategory", udf_get_category(F.col("Age"))).select("Age", "AgeCategory").show()

+-----------------+-----------+
|              Age|AgeCategory|
+-----------------+-----------+
|             22.0|    Student|
|             38.0|      Adult|
|             26.0|Young Adult|
|             35.0|Young Adult|
|             35.0|Young Adult|
|29.69911764705882|Young Adult|
|             54.0|      Adult|
|              2.0|       Baby|
|             27.0|Young Adult|
|             14.0|   Teenager|
|              4.0|       Baby|
|             58.0|      Adult|
|             20.0|    Student|
|             39.0|      Adult|
|             14.0|   Teenager|
|             55.0|      Adult|
|              2.0|       Baby|
|29.69911764705882|Young Adult|
|             31.0|Young Adult|
|29.69911764705882|Young Adult|
+-----------------+-----------+
only showing top 20 rows



In [29]:
spark.stop()