In [1]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
.master("local") \
.appName("chapter6") \
.getOrCreate()

24/09/26 16:11:39 WARN Utils: Your hostname, gimsehyeon-ui-MacBookPro.local resolves to a loopback address: 127.0.0.1; using 172.30.1.31 instead (on interface en0)
24/09/26 16:11:39 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/09/26 16:11:40 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
df = spark.read.format("csv") \
.option("header", "true") \
.option("inferSchema", "true") \
.load("./data/retail-data/by-day/2010-12-01.csv")

df.printSchema()

[Stage 1:>                                                          (0 + 1) / 1]

root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: timestamp (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- CustomerID: double (nullable = true)
 |-- Country: string (nullable = true)



                                                                                

In [3]:
df.createOrReplaceTempView("dfTable")

## 스파크 데이터 타입으로 변환하기

In [4]:
from pyspark.sql.functions import *

df.select(lit(5), lit("five"), lit(5.0))

DataFrame[5: int, five: string, 5.0: double]

## 불리언 데이터 타입 다루기

In [5]:
df.where(col("InvoiceNo") != 536365) \
.select("InvoiceNo", "Description") \
.show(5, False)

+---------+-----------------------------+
|InvoiceNo|Description                  |
+---------+-----------------------------+
|536366   |HAND WARMER UNION JACK       |
|536366   |HAND WARMER RED POLKA DOT    |
|536367   |ASSORTED COLOUR BIRD ORNAMENT|
|536367   |POPPY'S PLAYHOUSE BEDROOM    |
|536367   |POPPY'S PLAYHOUSE KITCHEN    |
+---------+-----------------------------+
only showing top 5 rows



In [6]:
DOTCodeFilter = col("StockCode") == "DOT"
priceFilter = col("UnitPrice") > 600
descriptFilter = instr(col("Description"), "POSTAGE") >= 1
                       
df.withColumn("isExpensive", DOTCodeFilter & (priceFilter | descriptFilter)) \
    .where("isExpensive") \
    .select("unitPrice", "isExpensive").show(5)

+---------+-----------+
|unitPrice|isExpensive|
+---------+-----------+
|   569.77|       true|
|   607.49|       true|
+---------+-----------+



## 수치형 데이터 타입 다루기 

In [7]:
fabricatedQuantity = pow(col("Quantity") * col("UnitPrice"), 2) + 5
df.select(expr("CustomerId"), fabricatedQuantity.alias("realQuantity")).show(5)

+----------+------------------+
|CustomerId|      realQuantity|
+----------+------------------+
|   17850.0|239.08999999999997|
|   17850.0|          418.7156|
|   17850.0|             489.0|
|   17850.0|          418.7156|
|   17850.0|          418.7156|
+----------+------------------+
only showing top 5 rows



In [8]:
df.stat.corr("Quantity", "UnitPrice")
df.select(corr("Quantity", "UnitPrice")).show()

+-------------------------+
|corr(Quantity, UnitPrice)|
+-------------------------+
|     -0.04112314436835551|
+-------------------------+



## 문자열 데이터 타입 다루기

In [9]:
# initcap 함수는 주어진 문자열에서 공백으로 나뉘는 모든 단어의 첫 글자를 대문자로 변경

df.select(initcap(col("Description"))).show()

+--------------------+
|initcap(Description)|
+--------------------+
|White Hanging Hea...|
| White Metal Lantern|
|Cream Cupid Heart...|
|Knitted Union Fla...|
|Red Woolly Hottie...|
|Set 7 Babushka Ne...|
|Glass Star Froste...|
|Hand Warmer Union...|
|Hand Warmer Red P...|
|Assorted Colour B...|
|Poppy's Playhouse...|
|Poppy's Playhouse...|
|Feltcraft Princes...|
|Ivory Knitted Mug...|
|Box Of 6 Assorted...|
|Box Of Vintage Ji...|
|Box Of Vintage Al...|
|Home Building Blo...|
|Love Building Blo...|
|Recipe Box With M...|
+--------------------+
only showing top 20 rows



In [10]:
# lpad, ltrim, rpad, rtrim, trim 함수를 사용해 문자열 주변의 공백을 제거하거나 추가할 수 있다.

df.select(
    trim(lit("     HELLO      ")).alias("trim")).show(5)

+-----+
| trim|
+-----+
|HELLO|
|HELLO|
|HELLO|
|HELLO|
|HELLO|
+-----+
only showing top 5 rows



## 정규표현식

스파크는 자바 정규 표현식이 가진 강력한 능력을 활용한다. 스파크는 정규 표현식을 위해 `regexp_extract`와 `regexp_replace` 함수를 제공한다. 

In [11]:
regex_string = "BLACK|WHITE|RED|GREEN|BLUE"

df.select(
    regexp_replace(col("Description"), regex_string, "COLOR").alias("Color_Clean"),
    col("Description")).show(5)

+--------------------+--------------------+
|         Color_Clean|         Description|
+--------------------+--------------------+
|COLOR HANGING HEA...|WHITE HANGING HEA...|
| COLOR METAL LANTERN| WHITE METAL LANTERN|
|CREAM CUPID HEART...|CREAM CUPID HEART...|
|KNITTED UNION FLA...|KNITTED UNION FLA...|
|COLOR WOOLLY HOTT...|RED WOOLLY HOTTIE...|
+--------------------+--------------------+
only showing top 5 rows



In [12]:
# 때로는 값 추출 없이 단순히 값의 존재 여부를 확인하고 싶을 때가 있다. 이때 contain 메서드를 사용한다.
# 파이썬에서는 instr 함수를 사용해 값의 존재 여부를 나타낸다.

containsBlack = instr(col("Description"), "BLACK") >= 1
containsWhite = instr(col("Description"), "WHITE") >= 1

df.withColumn("hasSimpleColor", containsBlack | containsWhite) \
.where("hasSimpleColor") \
.select("Description").show(5)

+--------------------+
|         Description|
+--------------------+
|WHITE HANGING HEA...|
| WHITE METAL LANTERN|
|RED WOOLLY HOTTIE...|
|WHITE HANGING HEA...|
| WHITE METAL LANTERN|
+--------------------+
only showing top 5 rows



## 날짜와 타임스탬프 데이터 타입 다루기

In [13]:
dateDF = spark.range(10) \
.withColumn("today", current_date()) \
.withColumn("now", current_timestamp()) \

dateDF.createOrReplaceTempView("dateTable")
dateDF.printSchema()

root
 |-- id: long (nullable = false)
 |-- today: date (nullable = false)
 |-- now: timestamp (nullable = false)



In [14]:
# 위의 만든 DateFrame을 사용해 오늘을 기준으로 5일 전후의 날짜를 구해본다.

dateDF.select(date_sub(col("today"), 5), date_add(col("today"), 5)).show(5)

+------------------+------------------+
|date_sub(today, 5)|date_add(today, 5)|
+------------------+------------------+
|        2024-09-21|        2024-10-01|
|        2024-09-21|        2024-10-01|
|        2024-09-21|        2024-10-01|
|        2024-09-21|        2024-10-01|
|        2024-09-21|        2024-10-01|
+------------------+------------------+
only showing top 5 rows



In [15]:
# 두 날짜의 차이를 구하는 방법도 자주 발생한다. 두 날짜 사이의 일수를 변환하는 datediff 함수를 사용해 이러한 작업을 수행할 수 있다.

dateDF.withColumn("week_ago", date_sub(col("today"), 7)) \
.select(datediff(col("week_ago"), col("today"))).show(1)

+-------------------------+
|datediff(week_ago, today)|
+-------------------------+
|                       -7|
+-------------------------+
only showing top 1 row



In [16]:
spark.range(5).withColumn("date", lit("2024-01-01")) \
.select(to_date(col("date"))).show()

+-------------+
|to_date(date)|
+-------------+
|   2024-01-01|
|   2024-01-01|
|   2024-01-01|
|   2024-01-01|
|   2024-01-01|
+-------------+



지정한 날짜 형식에 맞춰 데이터가 들어온다면 문제가 없다. 하지만 날짜 형식을 지키지 않은 데이터가 들어온다면 디버깅하기 매우 어려워진다. 이를 해결하기 위해 자바의 SimpleDateFormat 표준에 맞춰 날짜 포맷을 지정할 수 있다. 문제를 해결하기 위해 `to_date` 함수와 `to_timestamp` 함수를 사용한다. to_date 함수는 필요에 따라 날짜 포맷을 지정할 수 있지만 to_timestamp 함수는 반드시 날짜 포맷을 지정해야 한다.

## null 값 다루기 

스파크의 `coalesce` 함수는 인수로 지정한 여러 칼럼 중 null이 아닌 첫 번째 값을 반환한다. 모든 컬럼이 null이 아닌 값을 가지는 경우 첫 번째 컬럼의 값을 반환한다.

이외에도 `na.drop()`: null 값 삭제, `na.fill()`: null 값 채우기, `na.replace()`: null 값 치환하기 등을 수행할 수 있다.

In [17]:
df.select(coalesce(col("Description"), col("CustomerId"))).show(5)

+---------------------------------+
|coalesce(Description, CustomerId)|
+---------------------------------+
|             WHITE HANGING HEA...|
|              WHITE METAL LANTERN|
|             CREAM CUPID HEART...|
|             KNITTED UNION FLA...|
|             RED WOOLLY HOTTIE...|
+---------------------------------+
only showing top 5 rows



## 복합 데이터 다루기 

복합 데이터 타입을 사용하면 해결하려는 문제에 더욱 적합한 방식을 데이터를 구성하고 구조화할 수 있다. 복합 데이터는 구조체, 배열, 맵이 있다.

In [18]:
complexDF = df.select(struct("Description", "InvoiceNo").alias("complex"))
complexDF.createOrReplaceTempView("complexDF")

complexDF.select("complex.Description")
complexDF.select(col("complex").getField("Description"))

DataFrame[complex.Description: string]

explode 함수는 배열 타입의 컬럼을 입력받는다. 그리고 입력된 컬럼의 배열값에 포함된 모든 값을 로우로 변환한다. 

In [20]:
df.withColumn("splitted", split(col("Description"), " ")) \
.withColumn("exploded", explode(col("splitted"))) \
.select("Description", "InvoiceNo", "exploded").show(5)

+--------------------+---------+--------+
|         Description|InvoiceNo|exploded|
+--------------------+---------+--------+
|WHITE HANGING HEA...|   536365|   WHITE|
|WHITE HANGING HEA...|   536365| HANGING|
|WHITE HANGING HEA...|   536365|   HEART|
|WHITE HANGING HEA...|   536365| T-LIGHT|
|WHITE HANGING HEA...|   536365|  HOLDER|
+--------------------+---------+--------+
only showing top 5 rows



맵은 map 함수와 컬럼의 키-값 쌍을 이용해 생성한다. 그리고 배열과 동일한 방법으로 값을 선택할 수 있다. 

In [24]:
# key = Description , value = InvoiceNo
df.select(create_map(col("Description"), col("InvoiceNo")).alias("complex_map")) \
.show(5)

+--------------------+
|         complex_map|
+--------------------+
|{WHITE HANGING HE...|
|{WHITE METAL LANT...|
|{CREAM CUPID HEAR...|
|{KNITTED UNION FL...|
|{RED WOOLLY HOTTI...|
+--------------------+
only showing top 5 rows



In [25]:
df.select(create_map(col("Description"), col("InvoiceNo")).alias("complex_map"))\
.selectExpr("complex_map['WHITE METAL LANTERN']").show(5)

+--------------------------------+
|complex_map[WHITE METAL LANTERN]|
+--------------------------------+
|                            NULL|
|                          536365|
|                            NULL|
|                            NULL|
|                            NULL|
+--------------------------------+
only showing top 5 rows



## 사용자 정의 함수

In [28]:
udfExampleDF = spark.range(5).toDF("num")

def power3(double_value):
    return double_value ** 3

power3(2.0)

8.0

함수를 만들고 테스트를 완료했으니 모든 워커 노드에서 생성된 함수를 사용할 수 있도록 스파크에 등록해야 한다. 

스파크는 드라이버에서 함수를 직렬화하고 네트워크를 통해 모든 익스큐터 프로세서로 전달한다. 이 과정은 모든 언어에서 똑같다.

스파크는 워커 노드에 파이썬 프로세스를 실행하고 파이썬이 이해할 수 있는 포맷으로 모든 데이터를 직렬화한다. 

그리고 파이썬 프로세스에 있는 데이터의 로우마다 함수를 실행하고 마지막으로 JVM과 스파크에 처리 결과를 반환한다.

In [30]:
power3udf = udf(power3)

In [31]:
udfExampleDF.select(power3udf(col("num"))).show(5)

[Stage 25:>                                                         (0 + 1) / 1]

+-----------+
|power3(num)|
+-----------+
|          0|
|          1|
|          8|
|         27|
|         64|
+-----------+



                                                                                