## titanic_train.csv 파일을 로드하고, 이를 DataFrame으로 변환

In [0]:
#spark.read.csv() 메소드를 이용하여 csv 파일을 로드하고 DataFrame으로 변환. 
# pandas_df = pd.read_csv('/FileStore/tables/titanic_train.csv', header='infer')
titanic_sdf = spark.read.csv('/FileStore/tables/titanic_train.csv', header=True, inferSchema=True)

# pandas DataFrame을 spark DataFrame으로 부터 생성. 
titanic_pdf = titanic_sdf.select('*').toPandas()
display(titanic_sdf.limit(10))

PassengerId,Survived,Pclass,Name,Sex,Age,SibSp,Parch,Ticket,Fare,Cabin,Embarked
1,0,3,"Braund, Mr. Owen Harris",male,22.0,1,0,A/5 21171,7.25,,S
2,1,1,"Cumings, Mrs. John Bradley (Florence Briggs Thayer)",female,38.0,1,0,PC 17599,71.2833,C85,C
3,1,3,"Heikkinen, Miss. Laina",female,26.0,0,0,STON/O2. 3101282,7.925,,S
4,1,1,"Futrelle, Mrs. Jacques Heath (Lily May Peel)",female,35.0,1,0,113803,53.1,C123,S
5,0,3,"Allen, Mr. William Henry",male,35.0,0,0,373450,8.05,,S
6,0,3,"Moran, Mr. James",male,,0,0,330877,8.4583,,Q
7,0,1,"McCarthy, Mr. Timothy J",male,54.0,0,0,17463,51.8625,E46,S
8,0,3,"Palsson, Master. Gosta Leonard",male,2.0,3,1,349909,21.075,,S
9,1,3,"Johnson, Mrs. Oscar W (Elisabeth Vilhelmina Berg)",female,27.0,0,2,347742,11.1333,,S
10,1,2,"Nasser, Mrs. Nicholas (Adele Achem)",female,14.0,1,0,237736,30.0708,,C


In [0]:
titanic_sdf.printSchema()

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)



## spark DataFrame의 orderBy() 알아보기
- spark DataFrame의 orderBy() 메소드는 1개 이상의 컬럼순으로 정렬할 수 있는 기능. orderBy() 결과는 DataFrame으로 반환.
- 정렬 컬럼은 문자열, 또는 컬럼 형태로 입력할 수 있으며, 정렬 컬럼이 여러개일 경우 개별 컬럼을 인자로 넣거나 list로도 넣을 수 있음.
- 오름차순, 내림차순 구분은 ascending=True/False로 구분
- 정렬 컬럼이 여러개 일때 개별 컬럼별로 서로 다른 정렬 옵션을 적용할 경우(예를 들어 컬럼1은 오름차순, 컬럼2는 내림차순) ascending=[True, False]와 같은 형태로 이용.

In [0]:
# Name 컬럼으로 오름차순으로 정렬 
titanic_pdf_sorted_01 = titanic_pdf.sort_values(by=['Name'], ascending=True)

# Pclass와 Name 컬럼으로 내림차순 정렬
titanic_pdf_sorted_02 = titanic_pdf.sort_values(by=['Pclass', 'Name'], ascending=False)

# Pclass는 오름차순, Name은 내림차순 정렬
titanic_pdf_sorted_03 = titanic_pdf.sort_values(by=['Pclass', 'Name'], ascending=[True, False])
display(titanic_pdf_sorted_01)
display(titanic_pdf_sorted_02)
display(titanic_pdf_sorted_03)

In [0]:
from pyspark.sql.functions import col

# orderBy에 컬럼명을 문자열로 지정. 
print("orderBy에 컬럼명을 문자열로 지정하고 내림 차순 정렬")
titanic_sdf.orderBy("Name", ascending=False).show() # select * from titanic_sdf order by Name desc

# orderBy에 컬럼명을 컬럼형태로 지정.
print("orderBy에 컬럼명을 DataFrame['컬럼명'] 컬럼형태로 오름 차순 정렬")
titanic_sdf.orderBy(titanic_sdf['Name'], ascending=True).show() # select * from titanic_sdf order by Name asc

print('orderBy에 컬럼명을 DataFrame.컬럼명 컬럼형태로 내림 차순 정렬')
titanic_sdf.orderBy(titanic_sdf.Name, ascending=False).show()

print("orderBy에 컬럼명을 col('컬럼명') 컬럼형태로 오름 차순 정렬")
titanic_sdf.orderBy(col('Name'), ascending=True).show()

orderBy에 컬럼명을 문자열로 지정하고 내림 차순 정렬
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+--------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|    Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+--------+-----+--------+
|        869|       0|     3|van Melkebeke, Mr...|  male|null|    0|    0|          345777|     9.5| null|       S|
|        154|       0|     3|van Billiard, Mr....|  male|40.5|    0|    2|        A/5. 851|    14.5| null|       S|
|        362|       0|     2|del Carlo, Mr. Se...|  male|29.0|    1|    0|   SC/PARIS 2167| 27.7208| null|       C|
|        283|       0|     3|de Pelsmaeker, Mr...|  male|16.0|    0|    0|          345778|     9.5| null|       S|
|        287|       1|     3|de Mulder, Mr. Th...|  male|30.0|    0|    0|          345774|     9.5| null|       S|
|        560|       1|     3|de Messema

In [0]:
from pyspark.sql.functions import col

print("orderBy에 여러개의 컬럼명을 문자열로 지정하고 내림 차순 정렬")
titanic_sdf.orderBy("Pclass", "Name", ascending=False).show()
titanic_sdf.orderBy(["Pclass", "Name"], ascending=False).show()

print("orderBy에 여러개의 컬럼명을 컬럼형태로 지정하고 내림 차순 정렬")
titanic_sdf.orderBy(col("Pclass"), col("Name"), ascending=False).show()

orderBy에 여러개의 컬럼명을 문자열로 지정하고 내림 차순 정렬
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|        869|       0|     3|van Melkebeke, Mr...|  male|null|    0|    0|          345777|    9.5| null|       S|
|        154|       0|     3|van Billiard, Mr....|  male|40.5|    0|    2|        A/5. 851|   14.5| null|       S|
|        283|       0|     3|de Pelsmaeker, Mr...|  male|16.0|    0|    0|          345778|    9.5| null|       S|
|        287|       1|     3|de Mulder, Mr. Th...|  male|30.0|    0|    0|          345774|    9.5| null|       S|
|        560|       1|     3|de Messemaeker, M...|female|36.0|    1|    0|          345572|   17.4| null|       S|
|        423|       0|     3|  Zimmerman, 

In [0]:
# orderBy에 여러개의 컬럼명을 지정하고 서로 다른 방식으로 정렬하기
from pyspark.sql.functions import col

print("orderBy에 여러개의 컬럼명을 문자열로 지정하고 서로 다른 방식으로 정렬 ")
titanic_sdf.orderBy('Pclass', 'Name', ascending=[True, False]).show()

print("orderBy에 여러개의 컬럼명을 컬럼형태로 지정하고 서로 다른 방식으로 정렬 ")
titanic_sdf.orderBy(col('Pclass'), col('Name'), ascending=[True, False]).show()

# 개별 컬럼별로 asc(), desc()를 적용. 
titanic_sdf.orderBy(col('Pclass').asc(), col('Name').desc()).show()

orderBy에 여러개의 컬럼명을 문자열로 지정하고 서로 다른 방식으로 정렬 
+-----------+--------+------+--------------------+------+----+-----+-----+--------+--------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|  Ticket|    Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+--------+--------+-----+--------+
|        326|       1|     1|Young, Miss. Mari...|female|36.0|    0|    0|PC 17760|135.6333|  C32|       C|
|        556|       0|     1|  Wright, Mr. George|  male|62.0|    0|    0|  113807|   26.55| null|       S|
|         56|       1|     1|   Woolner, Mr. Hugh|  male|null|    0|    0|   19947|    35.5|  C52|       S|
|        352|       0|     1|Williams-Lambert,...|  male|null|    0|    0|  113510|    35.0| C128|       S|
|        156|       0|     1|Williams, Mr. Cha...|  male|51.0|    0|    1|PC 17597| 61.3792| null|       C|
|        378|       0|     1|Widener, Mr. Harr...|  male|27.0|    0|    2|  113503|   211.5|

In [0]:
# orderBy()와 동일한 메소드로 sort()를 제공. 
titanic_sdf.sort(col('Pclass').asc(), col('Name').desc()).show()

+-----------+--------+------+--------------------+------+----+-----+-----+--------+--------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|  Ticket|    Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+--------+--------+-----+--------+
|        326|       1|     1|Young, Miss. Mari...|female|36.0|    0|    0|PC 17760|135.6333|  C32|       C|
|        556|       0|     1|  Wright, Mr. George|  male|62.0|    0|    0|  113807|   26.55| null|       S|
|         56|       1|     1|   Woolner, Mr. Hugh|  male|null|    0|    0|   19947|    35.5|  C52|       S|
|        352|       0|     1|Williams-Lambert,...|  male|null|    0|    0|  113510|    35.0| C128|       S|
|        156|       0|     1|Williams, Mr. Cha...|  male|51.0|    0|    1|PC 17597| 61.3792| null|       C|
|        378|       0|     1|Widener, Mr. Harr...|  male|27.0|    0|    2|  113503|   211.5|  C82|       C|
|        857|       1|     1

In [0]:
# select Pclass, Name from titanic_sdf order by Pclass asc, Name desc
titanic_sdf.select(col('Pclass'), col('Name')).orderBy(col('Pclass').asc(), col('Name').desc()).show()

#select Pclass, Name from (select * from titanic_sdf order by Pclass asc, Name desc)
titanic_sdf.orderBy(col('Pclass').asc(), col('Name').desc()).select(col('Pclass'), col('Name')).show()

+------+--------------------+
|Pclass|                Name|
+------+--------------------+
|     1|Young, Miss. Mari...|
|     1|  Wright, Mr. George|
|     1|   Woolner, Mr. Hugh|
|     1|Williams-Lambert,...|
|     1|Williams, Mr. Cha...|
|     1|Widener, Mr. Harr...|
|     1|Wick, Mrs. George...|
|     1|Wick, Miss. Mary ...|
|     1|White, Mr. Richar...|
|     1|White, Mr. Perciv...|
|     1|     Weir, Col. John|
|     1|Warren, Mrs. Fran...|
|     1|    Ward, Miss. Anna|
|     1|Walker, Mr. Willi...|
|     1|Van der hoef, Mr....|
|     1|Uruchurtu, Don. M...|
|     1|Thorne, Mrs. Gert...|
|     1|Thayer, Mrs. John...|
|     1|Thayer, Mr. John ...|
|     1|Thayer, Mr. John ...|
+------+--------------------+
only showing top 20 rows

+------+--------------------+
|Pclass|                Name|
+------+--------------------+
|     1|Young, Miss. Mari...|
|     1|  Wright, Mr. George|
|     1|   Woolner, Mr. Hugh|
|     1|Williams-Lambert,...|
|     1|Williams, Mr. Cha...|
|     1|Widene

## spark DataFrame에 aggregation(집계) 메소드 적용하기
- pandas DataFrame은 DataFrame 객체에서 aggregation 메소드를 많이 가질 수 있음(DataFrame.count(), DataFrame.max())
- pandas DataFrame은 DataFrame 객체에 aggregation 메소드를 적용 시 DataFrame에 속한 전체 컬럼들에 모두 aggregation 메소드를 적용
- spark DataFrame은 DataFrame 객체에서 aggregation 메소드를 별로 가지고 있지 않음. count() 메소드 정도...
- spark DataFrame에 aggregation 메소드를 적용 시에는 pyspark.sql.functions 모듈의 max, min, sum 등의 함수를 이용해야함.
- pandas와 spark에서 agrregation 방식이 다르고, 맞게 사용하지 않을 시 에러가 나기 때문에 주의해서 사용

In [0]:
print('#### pandas dataframe count() aggregation ####')
print(titanic_pdf.count())

print('#### pandas dataframe max() aggregation ####')
print(titanic_pdf.max())

print('#### pandas dataframe count() aggregation 결과 type ####')
print(type(titanic_pdf.count()))

#### pandas dataframe count() aggregation ####
PassengerId    891
Survived       891
Pclass         891
Name           891
Sex            891
Age            714
SibSp          891
Parch          891
Ticket         891
Fare           891
Cabin          204
Embarked       889
dtype: int64
#### pandas dataframe max() aggregation ####
PassengerId                            891
Survived                                 1
Pclass                                   3
Name           van Melkebeke, Mr. Philemon
Sex                                   male
Age                                   80.0
SibSp                                    8
Parch                                    6
Ticket                           WE/P 5735
Fare                              512.3292
dtype: object
#### pandas dataframe count() aggregation 결과 type ####
<class 'pandas.core.series.Series'>


  print(titanic_pdf.max())


In [0]:
print(titanic_pdf[['Pclass', 'Age']].max())

Pclass     3.0
Age       80.0
dtype: float64


In [0]:
# spark DataFrame에 count() aggregation을 적용하면 DataFrame의 Record 건수 반환. 
print('count 결과:', titanic_sdf.count()) # select count(*) from titanic_sdf

count 결과: 891


In [0]:
# 하지만 count() 가 아닌 다른 aggregation 함수를 DataFrame에 적용하면 오류 발생.이는 SQL과 유사
titanic_sdf.max() # select max() from titanic_sdf 와 같은 SQL을 구문 오류. 

[0;31m---------------------------------------------------------------------------[0m
[0;31mAttributeError[0m                            Traceback (most recent call last)
File [0;32m<command-418230295219943>:4[0m
[1;32m      1[0m [38;5;66;03m# 하지만 count() 가 아닌 다른 aggregation 함수를 DataFrame에 적용하면 오류 발생.이는 SQL과 유사[39;00m
[1;32m      2[0m [38;5;66;03m# 이는 count() aggregation 함수가 가진 특수성. 다른 aggregation 함수들은 어떤 컬럼을 aggregation 할지[39;00m
[1;32m      3[0m [38;5;66;03m# count()외의 다른 aggregation 함수, 예를 들어 max(), min()등은 pyspark.sql.functions 모듈에 별도로 [39;00m
[0;32m----> 4[0m [43mtitanic_sdf[49m[38;5;241;43m.[39;49m[43mmax[49m()

File [0;32m/databricks/spark/python/pyspark/instrumentation_utils.py:48[0m, in [0;36m_wrap_function.<locals>.wrapper[0;34m(*args, **kwargs)[0m
[1;32m     46[0m start [38;5;241m=[39m time[38;5;241m.[39mperf_counter()
[1;32m     47[0m [38;5;28;01mtry[39;00m:
[0;32m---> 48[0m     res [38;5;241m=[39m [43mfunc[49m[43m([49m[38;5;

In [0]:
from pyspark.sql.functions import max, sum, min # agrregation함수를 import해서 사용해야함 
# spark DataFrame에 count()를 제외하고 max(), min(), sum(), avg()와 같은 aggregate 메소드를 바로 호출
titanic_sdf_max = titanic_sdf.select(max('Age')) # select max(Age) from titanic_sdf
print(titanic_sdf_max.show())
print(type(titanic_sdf_max)) # max() aggregation은 단 한개의 값을 반환하지만 DataFrame으로 반환. 

+--------+
|max(Age)|
+--------+
|    80.0|
+--------+

None
<class 'pyspark.sql.dataframe.DataFrame'>


## spark DataFrame의 groupBy() 알아 보기
- pandas DataFrame의 groupby(by='group_by_컬럼명') 수행 시 group_by_컬럼명 레벨로 group by 된 DataFrameGroupBy 객체 반환하고 여기에 aggregation 메소드 적용.
- spark DataFrame도 groupBy('group_by_컬럼명') 수행 시 group_by_컬럼명 레벨로 group by 된 GroupedData 객체 반환하고 여기에 aggregation 메소드 적용.
- pandas DataFrameGroupBy 객체에 agg() 메소드를 이용하여 서로 다른 컬럼에 서로 다른 aggregation 함수 적용 가능
- spark GroupedData 객체도 agg() 메소드를 이용하여 서로 다른 컬럼에 서로 다른 aggregation 함수 적용 가능
- spark groupBy()는 pandas groupby()의 특징과 SQL의 특징을 함께 가짐.

In [0]:
# pandas DataFrame에 groupby()메소드 호출 시 DataFrameGroupBy 객체 반환. 
titanic_pdf_groupby = titanic_pdf.groupby(by='Pclass')
print('pandas DataFrame의 groupby() 적용 결과 type:', type(titanic_pdf_groupby))

# Group by 된 pandas DataFrameGroupBy 객체에 count()를 적용 시 group by 된 컬럼값 레벨로 모든 컬럼들에 적용
print('\n#### group by 레벨로 모든 컬럼에 count 적용 #### ')
print(titanic_pdf.groupby(by='Pclass').count())
print('\n#### group by 레벨로 특정 컬럼에 aggregation 적용 #### ')

# Group by 된 pandas DataFrameGroupBy 객체에 특정 컬럼에 aggregation 을 적용하려면 해당 컬럼을 ['컬럼'].max()
print(titanic_pdf.groupby(by='Pclass')['Age'].max()) # select max(Age) from titanic_pdf group by Pcl

# pandas DataFrameGroupBy 객체에 여러 컬럼에 동일 aggregation 을 적용하려면 해당 컬럼들을 [['컬럼명1','컬럼명2']].max()
print('\n#### group by 레벨로 여러 컬럼에 동일 aggregation 적용 #### ')
print(titanic_pdf.groupby(by='Pclass')[['Age', 'Fare']].max()) # select max(Age), max(Fare) from tit

# Group by 된 DataFrameGroupBy 객체에 서로 다른 컬럼에 서로 다른 aggregation 함수를 적용하려면 agg()
# agg()메소드 내부에 인자는 dictionary 형태로 적용 컬럼명과 적용 aggregation 함수 기재
print('\n#### group by 레벨로 여러개의 aggregation 함수를 서로 다른 컬럼에 적용 #### ')
agg_format = {'Age':'max', 'SibSp':'sum', 'Fare':'mean'}
print(titanic_pdf.groupby(by='Pclass').agg(agg_format))


pandas DataFrame의 groupby() 적용 결과 type: <class 'pandas.core.groupby.generic.DataFrameGroupBy'>

#### group by 레벨로 모든 컬럼에 count 적용 #### 
        PassengerId  Survived  Name  Sex  Age  SibSp  Parch  Ticket  Fare  \
Pclass                                                                      
1               216       216   216  216  186    216    216     216   216   
2               184       184   184  184  173    184    184     184   184   
3               491       491   491  491  355    491    491     491   491   

        Cabin  Embarked  
Pclass                   
1         176       214  
2          16       184  
3          12       491  

#### group by 레벨로 특정 컬럼에 aggregation 적용 #### 
Pclass
1    80.0
2    70.0
3    74.0
Name: Age, dtype: float64

#### group by 레벨로 여러 컬럼에 동일 aggregation 적용 #### 
         Age      Fare
Pclass                
1       80.0  512.3292
2       70.0   73.5000
3       74.0   69.5500

#### group by 레벨로 여러개의 aggregation 함수를 서로 다른 컬럼에 적용 #### 
         Age  

In [0]:
# pandas DataFrame의 value_counts()는 Series에 적용시 해당 series내의 값 별로 건수를 구함. 
print(titanic_pdf['Pclass'].value_counts())

3    491
1    216
2    184
Name: Pclass, dtype: int64


In [0]:
# pandas 의 value_counts()의 대응될 수 있는 groupBy() 메소드. Spark DataFrame에 groupBy() 적용 시 Gr
# GroupedData Object에 count()외에 min(), max(), avg(), sum() 등 다양한 aggregation 메소드를 호출하여
titanic_sdf.groupBy('Pclass').count().show() # select pclass, count(*) from titanic_sdf group by pcl
print('spark DataFrame groupBy type:', type(titanic_sdf.groupBy('Pclass')))
print('spark GroupedData의 aggregation 메소드 적용 결과 type:', titanic_sdf.groupBy('Pclass').count())

+------+-----+
|Pclass|count|
+------+-----+
|     1|  216|
|     3|  491|
|     2|  184|
+------+-----+

spark DataFrame groupBy type: <class 'pyspark.sql.group.GroupedData'>
spark GroupedData의 aggregation 메소드 적용 결과 type: DataFrame[Pclass: int, count: bigint]


In [0]:
# spark DataFrame의 orderBy()메소드를 적용하여 group by 결과 건수 descending 으로 정렬 
titanic_sdf.groupBy('Pclass').count().orderBy('count', ascending=False).show()

+------+-----+
|Pclass|count|
+------+-----+
|     3|  491|
|     1|  216|
|     2|  184|
+------+-----+



In [0]:
#GroupedData 에 count()가 아니고 다른 aggregation 메소드를 적용 시 pandas DataFrame의 groupby와 유사
titanic_sdf.groupBy('Pclass').max().show()

+------+----------------+-------------+-----------+--------+----------+----------+---------+
|Pclass|max(PassengerId)|max(Survived)|max(Pclass)|max(Age)|max(SibSp)|max(Parch)|max(Fare)|
+------+----------------+-------------+-----------+--------+----------+----------+---------+
|     1|             890|            1|          1|    80.0|         3|         4| 512.3292|
|     3|             891|            1|          3|    74.0|         8|         6|    69.55|
|     2|             887|            1|          2|    70.0|         3|         3|     73.5|
+------+----------------+-------------+-----------+--------+----------+----------+---------+



In [0]:
# group by 레벨로 특정 컬럼에 aggregation 적용. max('컬럼명')과 같이 aggregation 메소드 내부에 인자로
titanic_sdf.groupBy('Pclass').max('Age').show() # select max(Age) from titainic_sdf group by Pclass

#GroupedData에서 aggregation 메소드 호출 시 오직 문자열 컬럼명만 가능. 컬럼형 인자 입력은 오류 발생.
titanic_sdf.groupBy('Pclass').max(col('Age')).show()

+------+--------+
|Pclass|max(Age)|
+------+--------+
|     1|    80.0|
|     3|    74.0|
|     2|    70.0|
+------+--------+



[0;31m---------------------------------------------------------------------------[0m
[0;31mTypeError[0m                                 Traceback (most recent call last)
File [0;32m<command-418230295219953>:5[0m
[1;32m      2[0m titanic_sdf[38;5;241m.[39mgroupBy([38;5;124m'[39m[38;5;124mPclass[39m[38;5;124m'[39m)[38;5;241m.[39mmax([38;5;124m'[39m[38;5;124mAge[39m[38;5;124m'[39m)[38;5;241m.[39mshow() [38;5;66;03m# select max(Age) from titainic_sdf group by Pclass[39;00m
[1;32m      4[0m [38;5;66;03m#GroupedData에서 aggregation 메소드 호출 시 오직 문자열 컬럼명만 가능. 컬럼형 인자 입력은 오류 발생.[39;00m
[0;32m----> 5[0m titanic_sdf[38;5;241m.[39mgroupBy([38;5;124m'[39m[38;5;124mPclass[39m[38;5;124m'[39m)[38;5;241m.[39mmax(col([38;5;124m'[39m[38;5;124mAge[39m[38;5;124m'[39m))[38;5;241m.[39mshow()

File [0;32m/databricks/spark/python/pyspark/instrumentation_utils.py:48[0m, in [0;36m_wrap_function.<locals>.wrapper[0;34m(*args, **kwargs)[0m
[1;32m     46[0m star

In [0]:
# 여러 컬럼으로 Group by 규정할 때 개별 컬럼명을 입력하거나, list 형태로 입력 가능. 
titanic_sdf.groupBy('Pclass', 'Sex').max('Age').show() # select max(Age) from titanic_sdf group by Pclass, Sex
titanic_sdf.groupBy(['Pclass', 'Sex']).max('Age').show()

+------+------+--------+
|Pclass|   Sex|max(Age)|
+------+------+--------+
|     2|female|    57.0|
|     3|  male|    74.0|
|     1|  male|    80.0|
|     3|female|    63.0|
|     1|female|    63.0|
|     2|  male|    70.0|
+------+------+--------+

+------+------+--------+
|Pclass|   Sex|max(Age)|
+------+------+--------+
|     2|female|    57.0|
|     3|  male|    74.0|
|     1|  male|    80.0|
|     3|female|    63.0|
|     1|female|    63.0|
|     2|  male|    70.0|
+------+------+--------+



In [0]:
### 여러개의 aggregation 함수를 적용할 경우는 agg()메소드 내에서 개별 aggregation 함수를 명시 해야함
from pyspark.sql.functions import max, avg, sum, min

# select max(age), min(age), sum(age), avg(age) from titanic_sdf group by pclass
titanic_sdf.groupBy('Pclass').agg(max('Age'), min('Age'), sum('Age'), avg('Age')).show() 

+------+--------+--------+--------+------------------+
|Pclass|max(Age)|min(Age)|sum(Age)|          avg(Age)|
+------+--------+--------+--------+------------------+
|     1|    80.0|    0.92| 7111.42|38.233440860215055|
|     3|    74.0|    0.42| 8924.92| 25.14061971830986|
|     2|    70.0|    0.67| 5168.83| 29.87763005780347|
+------+--------+--------+--------+------------------+



In [0]:
#아래와 같이 개별 aggregation 함수 결과 컬럼에 별도의 컬럼명을 alias('새로운 컬럼명')을 활용하여 부여
# agg() 메소드 내에서 aggregation 함수 적용 시에는 col('컬럼명')과 같은 컬럼형으로 컬럼명을 지정해도
# select max(age) as max_age, min(age) as min_age, sum(age) as sum_age, avg(age) as avg_age from tit

titanic_sdf.groupBy('Pclass').agg(
 max(col('Age')).alias('max_age'), min('Age').alias('min_age'), \
 sum('Age').alias('sum_age'), avg('Age').alias('avg_age') \
 ).show()

+------+-------+-------+-------+------------------+
|Pclass|max_age|min_age|sum_age|           avg_age|
+------+-------+-------+-------+------------------+
|     1|   80.0|   0.92|7111.42|38.233440860215055|
|     3|   74.0|   0.42|8924.92| 25.14061971830986|
|     2|   70.0|   0.67|5168.83| 29.87763005780347|
+------+-------+-------+-------+------------------+



In [0]:
# 아래와 같이 filter()를 적용하여 group by의 aggregation 결과 값을 기준으로 filtering 적용할 수 있음

titanic_sdf.groupBy('Pclass').agg(max(col('Age')).alias('max_age'), min('Age').alias('min_age') , \
 sum('Age').alias('sum_age'), avg('Age').alias('avg_age') \
 ).filter(col('max_age') > 70).show()

+------+-------+-------+-------+------------------+
|Pclass|max_age|min_age|sum_age|           avg_age|
+------+-------+-------+-------+------------------+
|     1|   80.0|   0.92|7111.42|38.233440860215055|
|     3|   74.0|   0.42|8924.92| 25.14061971830986|
+------+-------+-------+-------+------------------+

