In [5]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
.master("local") \
.appName("chapter7") \
.getOrCreate()

24/10/03 15:55:11 WARN Utils: Your hostname, gimsehyeon-ui-MacBookPro.local resolves to a loopback address: 127.0.0.1; using 172.30.1.99 instead (on interface en0)
24/10/03 15:55:11 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/10/03 15:55:12 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [6]:
df = spark.read.format("csv") \
.option("header", "true") \
.option("inferschema", "true") \
.load("./data/retail-data/all/*.csv") \
.coalesce(5)

df.cache()
df.createOrReplaceTempView("dfTable")

                                                                                

In [7]:
df.count()

                                                                                

541909

## 집계 함수

In [8]:
from pyspark.sql.functions import *

# 특정 column을 count

df.select(countDistinct("StockCode")).show()

+-------------------------+
|count(DISTINCT StockCode)|
+-------------------------+
|                     4070|
+-------------------------+



In [9]:
df.select(approx_count_distinct("StockCode", 0.1)).show()

+--------------------------------+
|approx_count_distinct(StockCode)|
+--------------------------------+
|                            3364|
+--------------------------------+



In [10]:
df.select(sum_distinct("Quantity")).show()

+----------------------+
|sum(DISTINCT Quantity)|
+----------------------+
|                 29310|
+----------------------+



In [11]:
# avg

df.select(
    count("Quantity").alias("total_transactions"),
    sum("Quantity").alias("total_purchases"),
    avg("Quantity").alias("avg_purchases"),
    expr("mean(Quantity)").alias("mean_purchases")) \
.selectExpr(
    "total_purchases/total_transactions",
    "avg_purchases",
    "mean_purchases").show()

+--------------------------------------+----------------+----------------+
|(total_purchases / total_transactions)|   avg_purchases|  mean_purchases|
+--------------------------------------+----------------+----------------+
|                      9.55224954743324|9.55224954743324|9.55224954743324|
+--------------------------------------+----------------+----------------+



In [12]:
# 분산과 표준 편차
# 분산: 평균과의 차이를 제곱한 결과의 평균
# 표준편차: 분산의 제곱근


df.select(var_pop("Quantity"), var_samp("Quantity"), stddev_pop("Quantity"), stddev_samp("Quantity")).show()

+-----------------+------------------+--------------------+---------------------+
|var_pop(Quantity)|var_samp(Quantity)|stddev_pop(Quantity)|stddev_samp(Quantity)|
+-----------------+------------------+--------------------+---------------------+
|47559.30364660879| 47559.39140929848|  218.08095663447733|   218.08115785023355|
+-----------------+------------------+--------------------+---------------------+



In [13]:
# 복합 데이터 타입의 집계
# 스파크는 수식을 이용한 집계 뿐만 아니라 복합 데이터 타입을 사용해 집계를 수행할 수 있다. 예를 들어 특정 컬럼의 값을 리스트로 수집하거나 셋 데이터 타입으로 고윳값을 수집할 수 있다. 

df.agg(collect_set("Country"), collect_list("Country")).show()

                                                                                

+--------------------+---------------------+
|collect_set(Country)|collect_list(Country)|
+--------------------+---------------------+
|[Portugal, Italy,...| [United Kingdom, ...|
+--------------------+---------------------+



## 그룹화

지금까지 DataFrame 수준의 집계만 다뤘다. 하지만 데이터 **그룹** 기반의 집계를 수행하는 경우가 더 많다. 데이터 그룹 기반의 집계는 단일 컬럼의 데이터를 그룹화하고 해당 그룹의 다른 여러 컬럼을 사용해서 계산하기 위해 카테고리형 데이터를 사용한다.

In [14]:
# 표현식을 이용한 그룹화

df.groupBy("InvoiceNo").agg(
    count("Quantity").alias("quan"),
    expr("count(Quantity)")).show()

+---------+----+---------------+
|InvoiceNo|quan|count(Quantity)|
+---------+----+---------------+
|   536596|   6|              6|
|   536938|  14|             14|
|   537252|   1|              1|
|   537691|  20|             20|
|   538041|   1|              1|
|   538184|  26|             26|
|   538517|  53|             53|
|   538879|  19|             19|
|   539275|   6|              6|
|   539630|  12|             12|
|   540499|  24|             24|
|   540540|  22|             22|
|  C540850|   1|              1|
|   540976|  48|             48|
|   541432|   4|              4|
|   541518| 101|            101|
|   541783|  35|             35|
|   542026|   9|              9|
|   542375|   6|              6|
|  C542604|   8|              8|
+---------+----+---------------+
only showing top 20 rows



In [15]:
# 맵을 이용한 그룹화

df.groupBy("InvoiceNo").agg(expr("avg(Quantity)"), expr("stddev_pop(Quantity)")).show()

+---------+------------------+--------------------+
|InvoiceNo|     avg(Quantity)|stddev_pop(Quantity)|
+---------+------------------+--------------------+
|   536596|               1.5|  1.1180339887498947|
|   536938|33.142857142857146|  20.698023172885524|
|   537252|              31.0|                 0.0|
|   537691|              8.15|   5.597097462078001|
|   538041|              30.0|                 0.0|
|   538184|12.076923076923077|   8.142590198943392|
|   538517|3.0377358490566038|  2.3946659604837897|
|   538879|21.157894736842106|  11.811070444356483|
|   539275|              26.0|  12.806248474865697|
|   539630|20.333333333333332|  10.225241100118645|
|   540499|              3.75|  2.6653642652865788|
|   540540|2.1363636363636362|  1.0572457590557278|
|  C540850|              -1.0|                 0.0|
|   540976|10.520833333333334|   6.496760677872902|
|   541432|             12.25|  10.825317547305483|
|   541518| 23.10891089108911|  20.550782784878713|
|   541783|1

## 윈도우 함수

**윈도우 함수**를 집계에 사용할 수도 있다. 윈도우 함수는 데이터의 특정 윈도우를 대상으로 고유의 집계 연산을 수행한다. 데이터의 윈도우는 현재 데이터에 대한 참조를 사용해 정의한다.

윈도우 명세는 함수에 전달될 로우를 결정한다. 표준 group-by 함수와 유사해 보일수도 있으므로 차이점을 알아본다.

group-by 함수를 사용하면 모든 로우 레코드가 단일 그룹으로만 이동한다. 윈도우 함수는 프레임에 입력되는 모든 로우에 대해 결과값을 계산한다. 

프레임은 로우 그룹 기반의 테이블을 의미한다. 각 로우는 하나 이상의 프레임에 할당될 수 있다. 가장 흔하게 사용되는 방법 중 하나는 하루를 나타내는 값의 롤링 평균을 구하는 것이다.

In [29]:
# 주문 일자(InvoiceDate) 컬럼을 변환해 date 컬럼을 만든다. 이 컬럼은 시간 정보를 제외한 날짜 정보만을 가진다.

dfWithDate = df.withColumn("date", to_date(to_timestamp(col("InvoiceDate"), "M/d/yyyy H:mm")))
dfWithDate.createOrReplaceTempView("dfWithDate")

윈도우 함수를 정의하기 위해 첫 번째로 윈도우 명세를 만든다. 여기서 사용하는 `partitionBy` 메서드는 지금까지 사용해온 파티셔닝 스키마의 개념과는 관련이 없으며 그룹을 어떻게 나눌지 결정하는 것과 유사한 개념이다.

`orderBy` 메서드는 파티션의 정렬 방식을 정의한다. 그리고 프레임 명세(`rowsBetween` 구문)는 입력된 로우의 참조를 기반으로 프레임에 로우가 포함될 수 있는지 결정한다. 

In [30]:
from pyspark.sql.window import Window

windowSpec = Window \
.partitionBy("CustomerId", "date") \
.orderBy(desc("Quantity")) \
.rowsBetween(Window.unboundedPreceding, Window.currentRow)

ERROR! Session/line number was not unique in database. History logging moved to new session 4


이제 집계 함수를 사용해 고객을 좀 더 자세히 살펴본다. 여기서는 시간대별 최대 구매 개수를 구하는 예제이다.

In [31]:
maxPurchaseQuantity = max(col("Quantity")).over(windowSpec)

In [32]:
purchaseDenseRank = dense_rank().over(windowSpec)
purchaseRank = rank().over(windowSpec)

In [33]:
dfWithDate.where("CustomerId IS NOT NULL").orderBy("CustomerId")\
.select(
    col("CustomerId"),
    col("date"),
    col("Quantity"),
    purchaseRank.alias("quantityRank"),
    purchaseDenseRank.alias("quantityDenseRank"),
    maxPurchaseQuantity.alias("maxPurchaseQuantity")).show()

                                                                                

+----------+----------+--------+------------+-----------------+-------------------+
|CustomerId|      date|Quantity|quantityRank|quantityDenseRank|maxPurchaseQuantity|
+----------+----------+--------+------------+-----------------+-------------------+
|     12346|2011-01-18|   74215|           1|                1|              74215|
|     12346|2011-01-18|  -74215|           2|                2|              74215|
|     12347|2010-12-07|      36|           1|                1|                 36|
|     12347|2010-12-07|      30|           2|                2|                 36|
|     12347|2010-12-07|      24|           3|                3|                 36|
|     12347|2010-12-07|      12|           4|                4|                 36|
|     12347|2010-12-07|      12|           4|                4|                 36|
|     12347|2010-12-07|      12|           4|                4|                 36|
|     12347|2010-12-07|      12|           4|                4|             

                                                                                

## 그룹화 셋

지금까지는 컬럼의 값을 이용해 여러 컬럼을 집계하는 데 `group-by` 표현식을 사용했다. 때로는 여러 그룹에 걸쳐 집계할 수 있는 무언가가 필요할 수 있다.

**그룹화 셋**이 바로 그 주인공이다. 그룹화 셋은 여러 집계를 결합하는 저수준 기능이다. 그룹화 셋을 이용하면 `group-by` 구문에서 원하는 형태로 집계를 생성할 수 있다.

### 롤업

다양한 컬럼을 그룹화 키로 설정하면 그룹화 키로 설정된 조합 뿐만 아니라 데이터셋에서 볼 수 있는 실제 조합을 모두 살펴볼 수 있다. 

롤업은 `group-by` 스타일의 다양한 연산을 수행할 수 있는 다차원 집계 기능이다.

다음 예제에서는 시간(신규 Date 컬럼)과 공간(Country 컬럼)을 축으로 하는 롤업을 생성한다. 롤업의 결과로 생성된 DataFrame은 모든 날짜의 총합, 날짜별 총합, 날짜별 국가별 총합을 포함한다.

In [34]:
dfNoNull = dfWithDate.na.drop()

In [35]:
rolledUpDF = dfNoNull.rollup("Date", "Country").agg(sum("Quantity")) \
.selectExpr("Date", "Country", "`sum(Quantity)` as total_quantity") \
.orderBy("Date")

rolledUpDF.show()

[Stage 42:>                                                         (0 + 1) / 1]

+----------+--------------+--------------+
|      Date|       Country|total_quantity|
+----------+--------------+--------------+
|      NULL|          NULL|       4906888|
|2010-12-01|United Kingdom|         21167|
|2010-12-01|        France|           449|
|2010-12-01|     Australia|           107|
|2010-12-01|          NULL|         24032|
|2010-12-01|        Norway|          1852|
|2010-12-01|       Germany|           117|
|2010-12-01|          EIRE|           243|
|2010-12-01|   Netherlands|            97|
|2010-12-02|          EIRE|             4|
|2010-12-02|          NULL|         20855|
|2010-12-02|United Kingdom|         20705|
|2010-12-02|       Germany|           146|
|2010-12-03|      Portugal|            65|
|2010-12-03|        Poland|           140|
|2010-12-03|         Spain|           400|
|2010-12-03|       Belgium|           528|
|2010-12-03|        France|           239|
|2010-12-03|          NULL|         11548|
|2010-12-03|   Switzerland|           110|
+----------

                                                                                

### 큐브

큐브는 롤업을 고차원적으로 사용할 수 있게 해준다. 큐브는 요소들을 계층적으로 다루는 대신 모든 차원에 대해 동일한 작업을 수행한다. 즉, 전체 기간에 대해 날짜와 국가별 결과를 얻을 수 있다.

메서드 호출 방식은 롤업과 매우 유사하며 `rollup`메서드 대신 `cube` 메서드를 호출한다.

In [36]:
dfNoNull.cube("Date", "Country").agg(sum(col("Quantity"))) \
.select("Date", "Country", "sum(Quantity)").orderBy("Date").show()

[Stage 45:>                                                         (0 + 1) / 1]

+----+--------------------+-------------+
|Date|             Country|sum(Quantity)|
+----+--------------------+-------------+
|NULL|              France|       109848|
|NULL|               Italy|         7999|
|NULL|             Finland|        10666|
|NULL|           Lithuania|          652|
|NULL|              Poland|         3653|
|NULL|             Iceland|         2458|
|NULL|               Japan|        25218|
|NULL|United Arab Emirates|          982|
|NULL|      United Kingdom|      4008533|
|NULL|              Greece|         1556|
|NULL|      Czech Republic|          592|
|NULL|  European Community|          497|
|NULL|              Sweden|        35637|
|NULL|             Lebanon|          386|
|NULL|              Canada|         2763|
|NULL|           Australia|        83653|
|NULL|             Denmark|         8188|
|NULL|        Saudi Arabia|           75|
|NULL|                NULL|      4906888|
|NULL|           Singapore|         5234|
+----+--------------------+-------

                                                                                

### 피벗

피벗을 사용해 로우를 컬럼으로 변환할 수 있다. 현재 데이터셋에서는 Country 컬럼이 있다. 피벗을 사용해 국가별로 집계 함수를 적용할 수 있으며 쿼리를 사용해 쉽게 결과를 확인할 수 있다.

In [37]:
pivoted = dfWithDate.groupBy("date").pivot("Country").sum()

In [39]:
pivoted.where("date > '2011-12-05'").select("date", "`USA_sum(Quantity)`").show()

+----------+-----------------+
|      date|USA_sum(Quantity)|
+----------+-----------------+
|2011-12-06|             NULL|
|2011-12-09|             NULL|
|2011-12-08|             -196|
|2011-12-07|             NULL|
+----------+-----------------+

