### DataFrame에 UDAF 사용해보기

In [0]:
# Two sample rows, each with integer columns "a" and "b".
data = [dict(a=1, b=2), dict(a=5, b=5)]

# Column names and types are inferred from the dictionaries.
df = spark.createDataFrame(data)

In [0]:
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import FloatType
import pandas as pd

# Series-to-scalar (aggregate) pandas UDF.
# The return type is declared as "double" rather than FloatType(): pandas
# computes the mean as a 64-bit float, and narrowing it to Spark's 32-bit
# float loses precision for large or fractional results.
@pandas_udf("double")
def average(v: pd.Series) -> float:
    """Return the arithmetic mean of the values in ``v``.

    Args:
        v: The values of the aggregated column, delivered as a pandas Series.

    Returns:
        The mean of ``v`` as computed by ``pandas.Series.mean``.
    """
    return v.mean()

# Make the UDF callable from SQL under the name "average" and keep the
# returned handle for use outside SQL.
averageUDF = spark.udf.register('average', average)

# Expose the DataFrame to SQL as the temporary view "test".
df.createOrReplaceTempView("test")

# Aggregate column b with the UDF and display the result as column c.
(
    spark
    .sql('SELECT average(b) c FROM test')
    .show()
)



+---+
|  c|
+---+
|3.5|
+---+



In [0]:
# Same aggregation through the DataFrame API, using the handle returned by
# spark.udf.register instead of a SQL string.
(
    df
    .agg(averageUDF("b").alias("c"))
    .show()
)

+---+
|  c|
+---+
|3.5|
+---+



### 하나의 레코드에서 다수의 레코드를 만들어내는 예제 (Order to 1+ Items)

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import StructType, StructField, StringType, ArrayType, LongType

# Load the tab-delimited orders file; the first line holds the column names.
order = (
    spark.read
    .options(delimiter='\t', header="true")
    .csv("s3a://s3-geospatial/orders.csv")
)

In [0]:
# Display the loaded rows without truncating long cell values, so the full
# JSON text in the "items" column stays visible.
order.show(truncate=False)

+------------+-----------------------------------------------------------------------------+
|order_id    |items                                                                        |
+------------+-----------------------------------------------------------------------------+
|860196503764|[{"name": "DAILY SPF", "quantity": 1, "id": 1883727790094}]                  |
|860292645076|[{"name": "DAILY SPF \u2014 Bundle Set", "quantity": 1, "id": 1883875377166}]|
|860320956628|[{"name": "DAILY SPF", "quantity": 1, "id": 1883919974414}]                  |
|860321513684|[{"name": "DAILY SPF", "quantity": 1, "id": 1883920793614}]                  |
|862930665684|[{"name": "DAILY SPF", "quantity": 1, "id": 1887913672718}]                  |
|862975819988|[{"name": "DAILY SPF", "quantity": 1, "id": 1887985827854}]                  |
|862985191636|[{"name": "DAILY SPF \u2014 Bundle Set", "quantity": 1, "id": 1887999164430}]|
|870939295956|[{"name": "DAILY SPF", "quantity": 1, "id": 190014226433

In [0]:
# Print the column names and types of the loaded orders DataFrame.
order.printSchema()

root
 |-- order_id: string (nullable = true)
 |-- items: string (nullable = true)



In [0]:
# Try it with the DataFrame API.
# Schema of the JSON stored in the "items" column: an array of structs with
# the fields name, id and quantity.
schema = ArrayType(
    StructType()
    .add("name", StringType())
    .add("id", StringType())
    .add("quantity", LongType())
)

items 필드의 내용(JSON)을 앞서 정의한 schema에 맞게 파싱하기

In [0]:
# Parse the JSON text in "items" into a new column "item_struct" and inspect
# the resulting schema.
(
    order
    .withColumn("item_struct", from_json("items", schema))
    .printSchema()
)

root
 |-- order_id: string (nullable = true)
 |-- items: string (nullable = true)
 |-- item_struct: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- id: string (nullable = true)
 |    |    |-- quantity: long (nullable = true)



In [0]:
# Explode the parsed array so that each element becomes its own row; the new
# "item" column then holds a single struct instead of an array.
(
    order
    .withColumn("item", explode(from_json("items", schema)))
    .printSchema()
)

root
 |-- order_id: string (nullable = true)
 |-- items: string (nullable = true)
 |-- item: struct (nullable = true)
 |    |-- name: string (nullable = true)
 |    |-- id: string (nullable = true)
 |    |-- quantity: long (nullable = true)



앞에서 만들어진 DataFrame의 items 필드를 제거하기

In [0]:
# One row per ordered item; the raw JSON column is dropped once it has been
# parsed into the "item" struct.
order_items = (
    order
    .withColumn("item", explode(from_json("items", schema)))
    .drop("items")
)

In [0]:
# Preview the first five exploded rows without truncating the struct values.
order_items.show(n=5, truncate=False)

+------------+------------------------------------------+
|order_id    |item                                      |
+------------+------------------------------------------+
|860196503764|{DAILY SPF, 1883727790094, 1}             |
|860292645076|{DAILY SPF — Bundle Set, 1883875377166, 1}|
|860320956628|{DAILY SPF, 1883919974414, 1}             |
|860321513684|{DAILY SPF, 1883920793614, 1}             |
|862930665684|{DAILY SPF, 1887913672718, 1}             |
+------------+------------------------------------------+
only showing top 5 rows



In [0]:
# Print the schema of the exploded DataFrame (order_id plus the item struct).
order_items.printSchema()

root
 |-- order_id: string (nullable = true)
 |-- item: struct (nullable = true)
 |    |-- name: string (nullable = true)
 |    |-- id: string (nullable = true)
 |    |-- quantity: long (nullable = true)



In [0]:
# Register order_items as a temporary view so it can be queried with spark.sql.
order_items.createOrReplaceTempView("order_items")

In [0]:
# Average item quantity per order, computed with the `average` UDF that was
# registered for SQL; show the 5 orders with the highest average.
# GROUP BY 1 / ORDER BY 2 refer to the select-list positions (order_id, avg_count).
# NOTE(review): CAST(... AS decimal) gives no precision/scale; Spark's default
# DECIMAL is (10, 0), so the fractional part of the average is not kept —
# confirm this is intended.
spark.sql("""
    SELECT order_id, CAST(average(item.quantity) as decimal) avg_count
    FROM order_items 
    GROUP BY 1 
    ORDER BY 2 DESC""").show(5)

+-------------+---------+
|     order_id|avg_count|
+-------------+---------+
|1816674631892|      500|
|1821860430036|      300|
|2186043064532|      208|
|2118824558804|      200|
|2143034474708|      200|
+-------------+---------+
only showing top 5 rows



In [0]:
# Spot-check a single order: list the quantity of every item row it has.
(
    spark
    .sql("""SELECT item.quantity FROM order_items WHERE order_id = '1816674631892'""")
    .show()
)

+--------+
|quantity|
+--------+
|     500|
+--------+



In [0]:
# List the tables and views known to the session catalog; as the last
# expression in the cell, the returned list is shown as the cell's output.
spark.catalog.listTables()

Out[31]: [Table(name='order_items', catalog=None, namespace=[], description=None, tableType='TEMPORARY', isTemporary=True),
 Table(name='test', catalog=None, namespace=[], description=None, tableType='TEMPORARY', isTemporary=True)]

In [0]:
# Print the name of every function known to the session catalog, one per line.
for f in spark.catalog.listFunctions():
    print(f.name)

!
!=
%
&
*
+
-
/
<
<=
<=>
<>
=
==
>
>=
^
abs
acos
acosh
add_months
aes_decrypt
aes_encrypt
aggregate
and
any
any_value
approx_count_distinct
approx_percentile
approx_top_k
array
array_agg
array_append
array_compact
array_contains
array_distinct
array_except
array_intersect
array_join
array_max
array_min
array_position
array_remove
array_repeat
array_size
array_sort
array_union
arrays_overlap
arrays_zip
ascii
asin
asinh
assert_true
atan
atan2
atanh
avg
base64
between
bigint
bin
binary
bit_and
bit_count
bit_get
bit_length
bit_or
bit_reverse
bit_xor
bool_and
bool_or
boolean
bround
btrim
cardinality
case
cast
cbrt
ceil
ceiling
char
char_length
character_length
charindex
chr
cloud_files
coalesce
collect_list
collect_set
concat
concat_ws
contains
conv
corr
cos
cosh
cot
count
count_if
count_min_sketch
covar_pop
covar_samp
crc32
csc
cume_dist
curdate
current_catalog
current_database
current_date
current_metastore
current_oauth_custom_identity_claim
current_schema
current_timestamp
current_time