## pyspark.sql.function as f

In [None]:
from pyspark import SparkContext
from pyspark.sql import SQLContext, Row
import pandas as pd

from pyspark.sql.types import DoubleType,IntegerType, StringType
#from pyspark.sql.functions import when, udf, col, regexp_replace
import pyspark.sql.functions as f

In [None]:
sc= SparkContext('local')
sqlCtx= SQLContext(sc)

In [None]:
df= sqlCtx.read.csv('data/crime_in_Seoul.csv', encoding= 'euc-kr',
                  header= True, inferSchema= True)

In [None]:
df.select('관서명').show()

+--------+
|  관서명|
+--------+
|  중부서|
|  종로서|
|남대문서|
|서대문서|
|  혜화서|
|  용산서|
|  성북서|
|동대문서|
|  마포서|
|영등포서|
|  성동서|
|  동작서|
|  광진서|
|  서부서|
|  강북서|
|  금천서|
|  중랑서|
|  강남서|
|  관악서|
|  강서서|
+--------+
only showing top 20 rows



#### .alias('new name')
- 컬럼 명 변경
- withColumn과 차이가 있다.
- df['컬럼 명'].alias('new name')

In [None]:
df.select(df['관서명'].alias('kw')).show()

+--------+
|      kw|
+--------+
|  중부서|
|  종로서|
|남대문서|
|서대문서|
|  혜화서|
|  용산서|
|  성북서|
|동대문서|
|  마포서|
|영등포서|
|  성동서|
|  동작서|
|  광진서|
|  서부서|
|  강북서|
|  금천서|
|  중랑서|
|  강남서|
|  관악서|
|  강서서|
+--------+
only showing top 20 rows



In [None]:
df.select(df['관서명'].alias('kw')).show()

+--------+
|      kw|
+--------+
|  중부서|
|  종로서|
|남대문서|
|서대문서|
|  혜화서|
|  용산서|
|  성북서|
|동대문서|
|  마포서|
|영등포서|
|  성동서|
|  동작서|
|  광진서|
|  서부서|
|  강북서|
|  금천서|
|  중랑서|
|  강남서|
|  관악서|
|  강서서|
+--------+
only showing top 20 rows



### f 함수
```
max, avg, ...
```
- column 객체 반환
- select와 함께 사용

In [None]:
f.max(df['살인 발생'])

Column<b'max(\xec\x82\xb4\xec\x9d\xb8 \xeb\xb0\x9c\xec\x83\x9d)'>

In [None]:
df.select(f.max(df['살인 발생'])).show()

+--------------+
|max(살인 발생)|
+--------------+
|            14|
+--------------+



In [None]:
df.select(f.avg(df['살인 발생'])).show()

+-----------------+
|   avg(살인 발생)|
+-----------------+
|5.258064516129032|
+-----------------+



In [None]:
df.select(f.avg(df['살인 발생']), f.max(df['살인 발생'])).show()

+-----------------+--------------+
|   avg(살인 발생)|max(살인 발생)|
+-----------------+--------------+
|5.258064516129032|            14|
+-----------------+--------------+



In [None]:
df.select(f.avg('살인 발생').alias('평균'), f.sum('살인 발생').alias('총 합')).show()

+-----------------+-----+
|             평균|총 합|
+-----------------+-----+
|5.258064516129032|  163|
+-----------------+-----+



### .agg()
- DataFrame 반환
- 한 컬럼 당 1개씩 반환
```
DataFrame.agg(f func(DataFrame['컬럼'])
DataFrame.agg({'컬럼': 'func', '컬럼': 'func'})
```

In [None]:
df.agg(f.avg(df['살인 발생']))

DataFrame[avg(살인 발생): double]

In [None]:
df.agg({'살인 발생': 'avg', '살인 검거': 'avg'}).show()

+-----------------+-----------------+
|   avg(살인 발생)|   avg(살인 검거)|
+-----------------+-----------------+
|5.258064516129032|4.935483870967742|
+-----------------+-----------------+



#### f.round()

In [None]:
df.agg(f.round(f.avg(df['살인 발생']), 2)).show()

+------------------------+
|round(avg(살인 발생), 2)|
+------------------------+
|                    5.26|
+------------------------+



### Row(key= value...)
- dictionary와 유사
- .asDict( ) 를 통해 dict로 전환 가능

In [None]:
row = Row(a=1, b=2, c=3)
row

Row(a=1, b=2, c=3)

In [None]:
type(row)

pyspark.sql.types.Row

In [None]:
row['a']

1

In [None]:
row.asDict()

{'a': 1, 'b': 2, 'c': 3}

### .rdd
- DataFrame-> 분산 리스트
- action, transformation function 사용 가능

In [None]:
Rdd = df.agg({'살인 발생': 'avg', '살인 검거':'avg'})
Rdd.rdd

MapPartitionsRDD[89] at javaToPython at NativeMethodAccessorImpl.java:0

In [None]:
gdf= df.select(f.round(f.avg(df['살인 발생']),2).alias('평균'),
                f.round(f.max(df['살인 발생']),2).alias('최대'))

In [None]:
Rdd.rdd.collect()

[Row(avg(살인 발생)=5.258064516129032, avg(살인 검거)=4.935483870967742)]

In [None]:
gdf

DataFrame[평균: double, 최대: int]

In [None]:
gdf.rdd.collect()

[Row(평균=5.26, 최대=14)]

In [None]:
gdf.rdd.map(lambda v: [v['평균'], v['최대']]).collect()

[[5.26, 14]]

In [None]:
my= [10,20,30,40,50]
nRdd= sc.parallelize(my)
nRdd

ParallelCollectionRDD[158] at readRDDFromFile at PythonRDD.scala:262

In [None]:
nRdd.map(lambda v: v+1).collect()

[11, 21, 31, 41, 51]

In [None]:
ddf= df.select('관서명', '살인 발생')

In [None]:
for r in ddf.head(5):
    print(r['관서명'], r['살인 발생'])

중부서 2
종로서 3
남대문서 1
서대문서 2
혜화서 3


In [None]:
ddf.rdd.collect()

[Row(관서명='중부서', 살인 발생=2),
 Row(관서명='종로서', 살인 발생=3),
 Row(관서명='남대문서', 살인 발생=1),
 Row(관서명='서대문서', 살인 발생=2),
 Row(관서명='혜화서', 살인 발생=3),
 Row(관서명='용산서', 살인 발생=5),
 Row(관서명='성북서', 살인 발생=2),
 Row(관서명='동대문서', 살인 발생=5),
 Row(관서명='마포서', 살인 발생=8),
 Row(관서명='영등포서', 살인 발생=14),
 Row(관서명='성동서', 살인 발생=4),
 Row(관서명='동작서', 살인 발생=5),
 Row(관서명='광진서', 살인 발생=4),
 Row(관서명='서부서', 살인 발생=2),
 Row(관서명='강북서', 살인 발생=7),
 Row(관서명='금천서', 살인 발생=3),
 Row(관서명='중랑서', 살인 발생=13),
 Row(관서명='강남서', 살인 발생=3),
 Row(관서명='관악서', 살인 발생=9),
 Row(관서명='강서서', 살인 발생=7),
 Row(관서명='강동서', 살인 발생=4),
 Row(관서명='종암서', 살인 발생=3),
 Row(관서명='구로서', 살인 발생=8),
 Row(관서명='서초서', 살인 발생=7),
 Row(관서명='양천서', 살인 발생=3),
 Row(관서명='송파서', 살인 발생=11),
 Row(관서명='노원서', 살인 발생=10),
 Row(관서명='방배서', 살인 발생=1),
 Row(관서명='은평서', 살인 발생=1),
 Row(관서명='도봉서', 살인 발생=3),
 Row(관서명='수서서', 살인 발생=10)]

In [None]:
ddf.rdd.map(lambda v: v['살인 발생']).sum()

163

### .groupBy()
- group 객체 반환
- 숫자 데이터만 집계

In [None]:
df= sqlCtx.read.csv('data/grade.csv', header= True, inferSchema= True)

In [None]:
df.show()

+-----+----+-----+----+----+
| 학년|과목| 결과|중간|기말|
+-----+----+-----+----+----+
|1학년|국어| 좋음|  80|  90|
|1학년|국어| 나쁨|  50|  40|
|1학년|국어| 나쁨|  20|  50|
|1학년|수학| 좋음|  83|  95|
|1학년|수학| 좋음|  93|  86|
|2학년|국어| 나쁨|  44|  65|
|2학년|국어| 좋음|  95|  98|
|2학년|수학| 좋음|  96|  99|
|2학년|수학| 나쁨|  57|  69|
+-----+----+-----+----+----+



In [None]:
df.groupBy('학년')

<pyspark.sql.group.GroupedData at 0x1560a8a2bc8>

In [None]:
df.groupBy('학년').max()

DataFrame[학년: string, max(중간): int, max(기말): int]

In [None]:
df.groupBy('학년').max().show()

+-----+---------+---------+
| 학년|max(중간)|max(기말)|
+-----+---------+---------+
|2학년|       96|       99|
|1학년|       93|       95|
+-----+---------+---------+



In [None]:
df.groupBy('학년').avg().show()

+-----+---------+---------+
| 학년|avg(중간)|avg(기말)|
+-----+---------+---------+
|2학년|     73.0|    82.75|
|1학년|     65.2|     72.2|
+-----+---------+---------+



In [None]:
df.groupBy('학년', '과목').avg().orderBy('학년').show()

+-----+----+---------+---------+
| 학년|과목|avg(중간)|avg(기말)|
+-----+----+---------+---------+
|1학년|국어|     50.0|     60.0|
|1학년|수학|     88.0|     90.5|
|2학년|수학|     76.5|     84.0|
|2학년|국어|     69.5|     81.5|
+-----+----+---------+---------+



In [None]:
df.groupBy('학년').agg(f.sum('중간'), f.sum('기말')).show()

+-----+---------+---------+
| 학년|sum(중간)|sum(기말)|
+-----+---------+---------+
|2학년|      292|      331|
|1학년|      326|      361|
+-----+---------+---------+



In [None]:
df.groupBy('학년').agg(f.sum('중간').alias('중간 합')
                     , f.sum('기말').alias('기말 합')).show()

+-----+-------+-------+
| 학년|중간 합|기말 합|
+-----+-------+-------+
|2학년|    292|    331|
|1학년|    326|    361|
+-----+-------+-------+



In [None]:
df.fillna({'중간': 10}).show()

+-----+----+-----+----+----+
| 학년|과목| 결과|중간|기말|
+-----+----+-----+----+----+
|1학년|국어| 좋음|  80|  90|
|1학년|국어| 나쁨|  50|  40|
|1학년|국어| 나쁨|  20|  50|
|1학년|수학| 좋음|  83|  95|
|1학년|수학| 좋음|  93|  86|
|2학년|국어| 나쁨|  44|  65|
|2학년|국어| 좋음|  95|  98|
|2학년|수학| 좋음|  96|  99|
|2학년|수학| 나쁨|  57|  69|
+-----+----+-----+----+----+



## 메모리 회수

In [None]:
sc.stop()