이를 위해 pyspark와 Py4J 패키지를 설치한다. Py4J 패키지는 파이썬 프로그램이 자바 가상 머신(JVM)상의 오브젝트들에 접근할 수 있게 해준다. 여기서는 Local Standalone Spark를 사용한다.

**Spark Session 생성**

In [1]:
# findspark.init() puts the local Spark installation's Python libraries on
# sys.path, so it has to run before the first pyspark import.
import findspark

findspark.init()

from pyspark.sql import SparkSession

# Reuse the active session if there is one; otherwise create a new session
# named "Python Spark UDF".
spark = (
    SparkSession.builder
    .appName("Python Spark UDF")
    .getOrCreate()
)

### Dataframe/SQL에 UDF 사용해보기 #1

In [2]:
# Column names for the sample DataFrame; both columns hold strings.
columns = ["Seqno","Name"]
data = [("1", "john jones"),
    ("2", "tracey smith"),
    ("3", "amy sanders")]

# Build the DataFrame from the tuples above and print it without
# truncating cell values.
df = spark.createDataFrame(data=data, schema=columns)
df.show(truncate=False)

+-----+------------+
|Seqno|Name        |
+-----+------------+
|1    |john jones  |
|2    |tracey smith|
|3    |amy sanders |
+-----+------------+



In [3]:
import pyspark.sql.functions as F
from pyspark.sql.types import *

# Upper-case a string column with a lambda UDF. No return type is passed, so
# F.udf uses its default (StringType). Spark hands a SQL NULL to the lambda
# as None; it is returned unchanged instead of raising AttributeError.
upperUDF = F.udf(lambda z: z.upper() if z is not None else None)

df.withColumn("Curated Name", upperUDF("Name")) \
  .show(truncate=False)

+-----+------------+------------+
|Seqno|Name        |Curated Name|
+-----+------------+------------+
|1    |john jones  |JOHN JONES  |
|2    |tracey smith|TRACEY SMITH|
|3    |amy sanders |AMY SANDERS |
+-----+------------+------------+



In [4]:
def upper_udf(s):
    """Return ``s`` converted to upper case.

    Args:
        s: The string to convert, or ``None`` (which is how Spark passes a
            SQL NULL to a Python UDF).

    Returns:
        The upper-cased string, or ``None`` when ``s`` is ``None``.
    """
    # NULL in, NULL out: avoids AttributeError on rows with a missing value.
    if s is None:
        return None
    return s.upper()

In [5]:
# Wrap the named function upper_udf as a UDF. StringType is already the
# default return type of F.udf; it is spelled out here explicitly.
upperUDF = F.udf(upper_udf, StringType())   

# Add the upper-cased name as a new column and print without truncation.
df.withColumn("Curated Name", upperUDF("Name")) \
   .show(truncate=False)

+-----+------------+------------+
|Seqno|Name        |Curated Name|
+-----+------------+------------+
|1    |john jones  |JOHN JONES  |
|2    |tracey smith|TRACEY SMITH|
|3    |amy sanders |AMY SANDERS |
+-----+------------+------------+



In [6]:
# Same UDF used inside select(); alias() names the derived column.
df.select("Name", upperUDF("Name").alias("Curated Name")).show()

+------------+------------+
|        Name|Curated Name|
+------------+------------+
|  john jones|  JOHN JONES|
|tracey smith|TRACEY SMITH|
| amy sanders| AMY SANDERS|
+------------+------------+



In [7]:
from pyspark.sql.functions import pandas_udf
import pandas as pd

# Vectorized (pandas) variant of the upper-case UDF
@pandas_udf(StringType())
def upper_udf_f(s: pd.Series) -> pd.Series:
    """Upper-case every string in the batch ``s`` and return the new Series."""
    uppercased = s.str.upper()
    return uppercased

In [8]:
# Register the pandas UDF upper_udf_f defined above under the SQL name
# "upper_udf". register() also returns a handle, which is kept in upperUDF.

upperUDF = spark.udf.register("upper_udf", upper_udf_f)
spark.sql("SELECT upper_udf('aBcD')").show()

+---------------+
|upper_udf(aBcD)|
+---------------+
|           ABCD|
+---------------+



In [9]:
# The handle returned by register() is applied to a DataFrame column here.
df.select("name", upperUDF("name")).show()

+------------+-----------------+
|        name|upper_udf_f(name)|
+------------+-----------------+
|  john jones|       JOHN JONES|
|tracey smith|     TRACEY SMITH|
| amy sanders|      AMY SANDERS|
+------------+-----------------+



In [10]:
# Expose df to SQL as the temporary view "test", then call the registered
# UDF by its SQL name.
df.createOrReplaceTempView("test")
spark.sql("""
    SELECT name, upper_udf(name) `Curated Name` FROM test
""").show()

+------------+------------+
|        name|Curated Name|
+------------+------------+
|  john jones|  JOHN JONES|
|tracey smith|TRACEY SMITH|
| amy sanders| AMY SANDERS|
+------------+------------+



### Dataframe/SQL에 UDF 사용해보기 #2

In [11]:
# Two sample rows with integer columns a and b.
data = [
    {"a": 1, "b": 2},
    {"a": 5, "b": 5}
]

df = spark.createDataFrame(data)
# Declare LongType explicitly: without a return type F.udf defaults to
# StringType, which would turn the sum column "c" into a string.
df.withColumn("c", F.udf(lambda x, y: x + y, LongType())("a", "b")).show()

+---+---+---+
|  a|  b|  c|
+---+---+---+
|  1|  2|  3|
|  5|  5| 10|
+---+---+---+



In [12]:
def plus(x, y):
    """Return ``x + y``, propagating SQL NULL.

    Args:
        x: Left operand, or ``None``.
        y: Right operand, or ``None``.

    Returns:
        ``x + y``, or ``None`` when either operand is ``None`` (Spark passes
        a SQL NULL to a Python UDF as ``None``).
    """
    # NULL + anything is NULL in SQL; without this guard a NULL operand
    # raises TypeError inside the UDF.
    if x is None or y is None:
        return None
    return x + y

# Register plus() for SQL with an explicit LongType result; register()
# defaults to StringType, which would return the sum as a string.
plusUDF = spark.udf.register("plus", plus, LongType())
spark.sql("SELECT plus(1, 2) sum").show()

+---+
|sum|
+---+
|  3|
+---+



In [13]:
# The handle returned by register() also works in the DataFrame API.
df.withColumn("p", plusUDF("a", "b")).show()

+---+---+---+
|  a|  b|  p|
+---+---+---+
|  1|  2|  3|
|  5|  5| 10|
+---+---+---+



In [14]:
# Re-point the temporary view "test" at the current df (columns a and b),
# then call the registered UDF from SQL.
df.createOrReplaceTempView("test")
spark.sql("SELECT a, b, plus(a, b) p FROM test").show()

+---+---+---+
|  a|  b|  p|
+---+---+---+
|  1|  2|  3|
|  5|  5| 10|
+---+---+---+



### Dataframe에 UDAF 사용해보기

In [15]:
from pyspark.sql.functions import pandas_udf
import pandas as pd

# Series-to-scalar pandas UDF, used as an aggregate function
@pandas_udf(FloatType())
def average_udf_f(v: pd.Series) -> float:
    """Return the arithmetic mean of the values in ``v``."""
    mean_value = v.mean()
    return mean_value

# Register the aggregate under the SQL name "average_udf" and run it over
# column a of the temporary view "test".
averageUDF = spark.udf.register('average_udf', average_udf_f)
spark.sql('SELECT average_udf(a) FROM test').show()

+--------------+
|average_udf(a)|
+--------------+
|           3.0|
+--------------+



In [16]:
# Aggregate the whole DataFrame with the pandas UDF.
# NOTE(review): the result column is aliased "count" although it holds the
# mean of column b.
df.agg(averageUDF("b").alias("count")).show()

+-----+
|count|
+-----+
|  3.5|
+-----+



### DataFrame에 explode 사용해보기

In [17]:
# Sample rows: (name, list of known languages, dict of properties).
# The rows include None elements, empty strings, a row whose list and dict
# are both None, and a row with an empty dict.
arrayData = [
        ('James',['Java','Scala'],{'hair':'black','eye':'brown'}),
        ('Michael',['Spark','Java',None],{'hair':'brown','eye':None}),
        ('Robert',['CSharp',''],{'hair':'red','eye':''}),
        ('Washington',None,None),
        ('Jefferson',['1','2'],{})]

df = spark.createDataFrame(data=arrayData, schema = ['name','knownLanguages','properties'])
df.show()

+----------+-------------------+--------------------+
|      name|     knownLanguages|          properties|
+----------+-------------------+--------------------+
|     James|      [Java, Scala]|{eye -> brown, ha...|
|   Michael|[Spark, Java, null]|{eye -> null, hai...|
|    Robert|         [CSharp, ]|{eye -> , hair ->...|
|Washington|               null|                null|
| Jefferson|             [1, 2]|                  {}|
+----------+-------------------+--------------------+



In [18]:
# Split the knownLanguages array into one output row per element.
# The exploded column gets the default name "col". Washington, whose array
# is null, does not appear in the output below.
from pyspark.sql.functions import explode
df2 = df.select(df.name,explode(df.knownLanguages))
df2.printSchema()
df2.show()

root
 |-- name: string (nullable = true)
 |-- col: string (nullable = true)

+---------+------+
|     name|   col|
+---------+------+
|    James|  Java|
|    James| Scala|
|  Michael| Spark|
|  Michael|  Java|
|  Michael|  null|
|   Robert|CSharp|
|   Robert|      |
|Jefferson|     1|
|Jefferson|     2|
+---------+------+



### 하나의 레코드에서 다수의 레코드를 만들어내는 예제 (Order to 1+ Items)

In [20]:
# Download the sample orders file; it is saved as orders.csv in the current working directory.
!wget https://s3-geospatial.s3.us-west-2.amazonaws.com/orders.csv

--2023-01-24 06:03:34--  https://s3-geospatial.s3.us-west-2.amazonaws.com/orders.csv
Resolving s3-geospatial.s3.us-west-2.amazonaws.com (s3-geospatial.s3.us-west-2.amazonaws.com)... 52.218.176.185, 52.92.180.18, 3.5.80.192, ...
Connecting to s3-geospatial.s3.us-west-2.amazonaws.com (s3-geospatial.s3.us-west-2.amazonaws.com)|52.218.176.185|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 89951 (88K) [text/csv]
Saving to: ‘orders.csv’


2023-01-24 06:03:35 (653 KB/s) - ‘orders.csv’ saved [89951/89951]



In [21]:
# Peek at the file: a header line, then tab-separated order_id and items (a JSON array string).
!head -5 orders.csv

order_id	items
860196503764	[{"name": "DAILY SPF", "quantity": 1, "id": 1883727790094}]
860292645076	[{"name": "DAILY SPF \u2014 Bundle Set", "quantity": 1, "id": 1883875377166}]
860320956628	[{"name": "DAILY SPF", "quantity": 1, "id": 1883919974414}]
860321513684	[{"name": "DAILY SPF", "quantity": 1, "id": 1883920793614}]


### Spark로 해보기

In [19]:
from pyspark.sql.functions import *
from pyspark.sql.types import StructType, StructField, StringType, ArrayType, LongType

# Read the tab-separated file that the wget cell saved into the current
# working directory (it is not under ./data); the first line is the header.
order = spark.read.options(delimiter="\t", header=True).csv("orders.csv")

In [20]:
# With no arguments show() prints only the first 20 rows and truncates long cells.
order.show()

+------------+--------------------+
|    order_id|               items|
+------------+--------------------+
|860196503764|[{"name": "DAILY ...|
|860292645076|[{"name": "DAILY ...|
|860320956628|[{"name": "DAILY ...|
|860321513684|[{"name": "DAILY ...|
|862930665684|[{"name": "DAILY ...|
|862975819988|[{"name": "DAILY ...|
|862985191636|[{"name": "DAILY ...|
|870939295956|[{"name": "DAILY ...|
|880188063956|[{"name": "DAILY ...|
|933014601940|[{"name": "DAILY ...|
|934065930452|[{"name": "DAILY ...|
|938210722004|[{"name": "DAILY ...|
|944748331220|[{"name": "DAILY ...|
|862843896020|[{"name": "DAILY ...|
|862959763668|[{"name": "DAILY ...|
|870966558932|[{"name": "DAILY ...|
|887936647380|[{"name": "DAILY ...|
|908426477780|[{"name": "DAILY ...|
|921300107476|[{"name": "DAILY ...|
|932229710036|[{"name": "DAILY ...|
+------------+--------------------+
only showing top 20 rows



In [21]:
# Both columns were read as strings; items is still a raw JSON string here.
order.printSchema()

root
 |-- order_id: string (nullable = true)
 |-- items: string (nullable = true)



In [23]:
# Try it with the DataFrame API.
# Schema of the JSON array stored in the "items" column: each element is an
# object with name, id and quantity.
# NOTE(review): id is declared as a string although the sample rows show
# numeric ids -- confirm this is intended.
struct = ArrayType(
    StructType([
        StructField("name", StringType()),
        StructField("id", StringType()),
        StructField("quantity", LongType())
    ])
)

In [24]:
# Parse the JSON string with the schema above, then explode the resulting
# array so that every item becomes its own row in the new "item" column.
order.withColumn("item", explode(from_json("items", struct))).show(truncate=False)

+------------+-----------------------------------------------------------------------------+------------------------------------------+
|order_id    |items                                                                        |item                                      |
+------------+-----------------------------------------------------------------------------+------------------------------------------+
|860196503764|[{"name": "DAILY SPF", "quantity": 1, "id": 1883727790094}]                  |{DAILY SPF, 1883727790094, 1}             |
|860292645076|[{"name": "DAILY SPF \u2014 Bundle Set", "quantity": 1, "id": 1883875377166}]|{DAILY SPF — Bundle Set, 1883875377166, 1}|
|860320956628|[{"name": "DAILY SPF", "quantity": 1, "id": 1883919974414}]                  |{DAILY SPF, 1883919974414, 1}             |
|860321513684|[{"name": "DAILY SPF", "quantity": 1, "id": 1883920793614}]                  |{DAILY SPF, 1883920793614, 1}             |
|862930665684|[{"name": "DAILY SPF", "quantity":

In [25]:
# Keep the exploded result and drop the raw JSON column, leaving order_id and item.
order_items = order.withColumn("item", explode(from_json("items", struct))).drop("items")

In [26]:
# Print only the first 5 rows.
order_items.show(5)

+------------+--------------------+
|    order_id|                item|
+------------+--------------------+
|860196503764|{DAILY SPF, 18837...|
|860292645076|{DAILY SPF — Bund...|
|860320956628|{DAILY SPF, 18839...|
|860321513684|{DAILY SPF, 18839...|
|862930665684|{DAILY SPF, 18879...|
+------------+--------------------+
only showing top 5 rows



In [27]:
# item is now a struct with the name, id and quantity fields.
order_items.printSchema()

root
 |-- order_id: string (nullable = true)
 |-- item: struct (nullable = true)
 |    |-- name: string (nullable = true)
 |    |-- id: string (nullable = true)
 |    |-- quantity: long (nullable = true)



In [28]:
# Make the exploded rows queryable from SQL as "order_items".
order_items.createOrReplaceTempView("order_items")

In [30]:
# Average item quantity per order, highest first (GROUP BY 1 / ORDER BY 2
# refer to positions in the select list), using the registered pandas
# aggregate average_udf.
# NOTE(review): CAST(... AS decimal) gives no precision/scale; Spark's
# default is DECIMAL(10,0), so any fractional part of the average is lost --
# confirm this is intended.
spark.sql("""
    SELECT order_id, CAST(average_udf(item.quantity) as decimal) avg_count
    FROM order_items 
    GROUP BY 1 
    ORDER BY 2 DESC""").show(5)

+-------------+---------+
|     order_id|avg_count|
+-------------+---------+
|1816674631892|      500|
|1821860430036|      300|
|2186043064532|      208|
|2118824558804|      200|
|2143034474708|      200|
+-------------+---------+
only showing top 5 rows



In [31]:
# Look up the individual item quantities of the top order from the previous result.
spark.sql("""SELECT item.quantity FROM order_items WHERE order_id = '1816674631892'""").show()

+--------+
|quantity|
+--------+
|     500|
+--------+



In [32]:
# List the tables known to the session; the temporary views created above show up here.
spark.catalog.listTables()

[Table(name='order_items', catalog=None, namespace=[], description=None, tableType='TEMPORARY', isTemporary=True),
 Table(name='test', catalog=None, namespace=[], description=None, tableType='TEMPORARY', isTemporary=True)]

In [33]:
# The catalog lists Spark's native functions as well as the registered UDFs;
# print the name of each one.
for fn in spark.catalog.listFunctions():
    print(fn.name)

!
!=
%
&
*
+
-
/
<
<=
<=>
<>
=
==
>
>=
^
abs
acos
acosh
add_months
aes_decrypt
aes_encrypt
aggregate
and
any
any_value
approx_count_distinct
approx_percentile
array
array_agg
array_append
array_compact
array_contains
array_distinct
array_except
array_insert
array_intersect
array_join
array_max
array_min
array_position
array_remove
array_repeat
array_size
array_sort
array_union
arrays_overlap
arrays_zip
ascii
asin
asinh
assert_true
atan
atan2
atanh
avg
base64
between
bigint
bin
binary
bit_and
bit_count
bit_get
bit_length
bit_or
bit_xor
bool_and
bool_or
boolean
bround
btrim
cardinality
case
cast
cbrt
ceil
ceiling
char
char_length
character_length
chr
coalesce
collect_list
collect_set
concat
concat_ws
contains
conv
convert_timezone
corr
cos
cosh
cot
count
count_if
count_min_sketch
covar_pop
covar_samp
crc32
csc
cume_dist
curdate
current_catalog
current_database
current_date
current_schema
current_timestamp
current_timezone
current_user
date
date_add
date_diff
date_format
date_from_unix_da