# DataFrame

 - - -

이번 notebook에서는 Spark RDD의 sub-type인 DataFrame에 대해 학습을 진행합니다.

* Dataframes are a restricted sub-type of RDDs. 
* Restircing the type allows for more optimization.
* Dataframes store two dimensional data, similar to the type of data stored in a spreadsheet(ex. MS Excel). 
   * Each column in a dataframe can have a different type.
   * Each row contains a `record`.
   
**Spark의 DataFrame은 python pandas의 DataFrame, R의 DataFrame과 매우 유사합니다.**

### pyspark import & SparkContext 생성

In [1]:
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import Row, StructField, StructType, StringType, IntegerType

sc = SparkContext(master="local[*]")
print(sc)

# Just like using Spark requires having a SparkContext, using SQL requires an SQLContext
sqlContext = SQLContext(sc)
print(sqlContext)

<SparkContext master=local[*] appName=pyspark-shell>
<pyspark.sql.context.SQLContext object at 0x7f043c57d9e8>


### Constructing a DataFrame from an RDD of Rows
Each Row defines it's own  fields, the schema is *inferred*.

여기서, ``schema is inferred``는 Spark가 스스로 각 column의 데이터의 유형을 파악하고, 그에 맞는 형식을 제공한다는 의미.

In [2]:
# One way to create a DataFrame is to first define an RDD from a list of Rows 
some_rdd = sc.parallelize([Row(name="John", age=19),
                           Row(name="Smith", age=23),
                           Row(name="Sarah", age=18)])
some_rdd.collect()

[Row(age=19, name='John'),
 Row(age=23, name='Smith'),
 Row(age=18, name='Sarah')]

In [3]:
# The DataFrame is created from the RDD or Rows
# Infer schema from the first row, create a DataFrame and print the schema
some_df = sqlContext.createDataFrame(some_rdd)
some_df.printSchema()

root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)



In [4]:
# A dataframe is an RDD of rows plus information on the schema.
# performing **collect()* on either the RDD or the DataFrame gives the same result.
print(" RDD(some_rdd) = ", type(some_rdd),"\n","DataFrame(some_df) = ", type(some_df), "\n")
print('some_df =',some_df.collect())
print('some_rdd=',some_rdd.collect())

 RDD(some_rdd) =  <class 'pyspark.rdd.RDD'> 
 DataFrame(some_df) =  <class 'pyspark.sql.dataframe.DataFrame'> 

some_df = [Row(age=19, name='John'), Row(age=23, name='Smith'), Row(age=18, name='Sarah')]
some_rdd= [Row(age=19, name='John'), Row(age=23, name='Smith'), Row(age=18, name='Sarah')]


### Defining the Schema explicitly
The advantage of creating a DataFrame using a pre-defined schema allows the content of the RDD to be simple tuples, rather than rows.

Spark가 ``schema``를 추론(inferred)하는 것이 아닌, 사용자가 직접 ``schema를 정의``할 수 있다.

In [5]:
# In this case we create the dataframe from an RDD of tuples (rather than Rows) and provide the schema explicitly
another_rdd = sc.parallelize([("John", 19), ("Smith", 23), ("Sarah", 18)])
# Schema with two fields - person_name and person_age
schema = StructType([StructField("person_name", StringType(), False),
                     StructField("person_age", IntegerType(), False)])

# Create a DataFrame by applying the schema to the RDD and print the schema
another_df = sqlContext.createDataFrame(another_rdd, schema)
another_df.printSchema()

root
 |-- person_name: string (nullable = false)
 |-- person_age: integer (nullable = false)



## Loading DataFrames from disk
There are many methods to load DataFrames from Disk. Here we will discuss three of these methods
1. JSON (on your own)
2. CSV  (on your own)
3. **Parquet**

In addition, there are API's for connecting Spark to an external database. We will not discuss this type of connection in this class.

### Loading dataframes from JSON files
[JSON](http://www.json.org/)은 ``속성-값 pair`` 또는 ``키-값 pair``으로 이루어진 데이터 오브젝트를 전달하기 위해 인간이 읽을 수 있는 텍스트를 사용하는 개방형 표준 포맷이다. 특히, 인터넷에서 자료를 주고 받을 때 그 자료를 표현하는 방법으로 알려져 있다. 자료의 종류에 큰 제한은 없으며, 특히 컴퓨터 프로그램의 변수값을 표현하는 데 적합하다. **JSON can also be used to store tabular data and can be easily loaded into a dataframe.**

**( .json 예시 )**

![json예시](https://wallees.files.wordpress.com/2018/04/593aa-screen2bshot2b2018-04-172bat2b4-56-002bpm.png?w=400&h=186)

In [6]:
import urllib.request
# people.json(예제파일 다운로드)
f = urllib.request.urlretrieve ("https://docs.google.com/uc?export=download&id=1TZyM7Gfc6XWLot-L36TDV-JwySgHxGv4", "people.json")
data_file = "./people.json"

# Create a DataFrame from the file(s) pointed to by path
people = sqlContext.read.json(data_file)
print('people is a',type(people))

# The inferred schema can be visualized using the printSchema() method.
people.show()
people.printSchema()

people is a <class 'pyspark.sql.dataframe.DataFrame'>
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+

root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)



### Excercise 1 : Loading ``csv`` files into dataframes (30 point)

- - -
**task**

아래에 제시되는 두 가지 방법을 이용하여 ``csv``을 dataframe으로 변환하고(schema infered), ``schema``를 직접 설정하여 dataframe으로 변환합니다(**두 가지 방법 중 선택**)

* 1. pandas의 ``read_csv``를 이용하여 ``csv``파일을 불러온 뒤, SQLContext의 ``createDataFrame``을 이용하여 dataframe으로 변환합니다.(10 point)

* 2. SQLContext의 ``read.csv``를 이용하여 ``csv``를 dataframe으로 변환합니다. (10 point)

* 3. ``schema``를 직접 설정하여 dataframe으로 변환(task1 또는 task2의 방법을 이용) (10 point)

**위의 방법 외에도 ``csv``를 불러오는 방법은 많습니다. 하지만, Excercise 1 에서는 task 1, task 2 방법으로 한정합니다.**

**!!csv to dataframe 방법 [참고 주소](https://stackoverflow.com/questions/29936156/get-csv-to-spark-dataframe)**

- - -
**출력 예시**

* show()와 printSchema()를 이용하여 결과를 출력합니다(task 1, task 2 모두)

* ex) 

```
# task 1 == pandas -> createDataFrame

task1.show(3)
task1.printSchema()

# task 2 == SQLContext -> DataFrame

task2.show(3)
task2.printSchema()

# task 3
task3.show(3)
task3.printSchema()

### task3.printSchema() 예시
root
 |-- batter_id: integer (nullable = true)
 |-- batter_name: string (nullable = true)
 |-- date: string (nullable = true)
 |-- opposing_team: string (nullable = true)
 |-- avg: double (nullable = true)
 |-- AB: integer (nullable = true)
 |-- R: integer (nullable = true)
 |-- H: integer (nullable = true)
 |-- 2B: integer (nullable = true)
 |-- 3B: integer (nullable = true)
 |-- HR: integer (nullable = true)
 |-- RBI: integer (nullable = true)
 |-- CS: integer (nullable = true)
 |-- BB: integer (nullable = true)
 |-- HBP: integer (nullable = true)
 |-- SO: integer (nullable = true)
 |-- GDP: integer (nullable = true)
 |-- avg2: double (nullable = true)
 |-- year: string (nullable = true)
```  

In [7]:
import urllib.request
# people.json(예제파일 다운로드)
f = urllib.request.urlretrieve ("https://docs.google.com/uc?export=download&id=1QHSuh61Ng8JQ7JkAQfDZgLzd5auC9KAP", "Regular_Season_Batter_Day_by_Day.csv")
data_file = "./Regular_Season_Batter_Day_by_Day.csv"
    
!ls

Data				  Lab_HW2_upload.ipynb
ex2_coalesce			  people.json
ex2_raw_partition		  pre.csv
ex2_repartition			  regular.csv
HW1_20155138_안춘모.ipynb	  Regular_Season_Batter_Day_by_Day.csv
HW3_upload_V1.ipynb		  save
HW5_20155138_안춘모.ipynb	  users.parquet
kddcup.data_10_percent_corrected


**task**
* 1. pandas의 ``read_csv``를 이용하여 ``csv``파일을 불러온 뒤, SQLContext의 ``createDataFrame``을 이용하여 dataframe으로 변환합니다.(10 point)

In [8]:
# 1-1 답안 작성
import pandas as pd

pandas_df = pd.read_csv('Regular_Season_Batter_Day_by_Day.csv')
task1_DF = sqlContext.createDataFrame(pandas_df) 
# csv파일을 읽어와 pandas의 dataframe과 스키마를 출력한다.

# output
task1_DF.show(3)
task1_DF.printSchema()

+---------+-----------+----+-------------+-----+---+---+---+---+---+---+---+---+---+---+---+---+---+-------------------+----+
|batter_id|batter_name|date|opposing_team| avg1| AB|  R|  H| 2B| 3B| HR|RBI| SB| CS| BB|HBP| SO|GDP|               avg2|year|
+---------+-----------+----+-------------+-----+---+---+---+---+---+---+---+---+---+---+---+---+---+-------------------+----+
|        0|   가르시아|3.24|           NC|0.333|  3|  1|  1|  0|  0|  0|  0|  0|  0|  1|  0|  1|  0|              0.333|2018|
|        0|   가르시아|3.25|           NC|0.000|  4|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  1|  0|0.14300000000000002|2018|
|        0|   가르시아|3.27|         넥센|0.200|  5|  0|  1|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|0.16699999999999998|2018|
+---------+-----------+----+-------------+-----+---+---+---+---+---+---+---+---+---+---+---+---+---+-------------------+----+
only showing top 3 rows

root
 |-- batter_id: long (nullable = true)
 |-- batter_name: string (nullable = true)
 |-- date: double (n

**task**
* 2. SQLContext의 ``read.csv``를 이용하여 ``csv``를 dataframe으로 변환합니다. (10 point)

In [9]:
# 1-2 답안 작성
task2_DF = sqlContext.read.csv('Regular_Season_Batter_Day_by_Day.csv') #csv파일을 읽어 csv파일을 스키마와 dataframe형태로 출력
# output
task2_DF.show(3)
task2_DF.printSchema()

+---------+-----------+----+-------------+-----+---+---+---+---+---+----+----+----+----+----+----+----+----+-------------------+----+
|      _c0|        _c1| _c2|          _c3|  _c4|_c5|_c6|_c7|_c8|_c9|_c10|_c11|_c12|_c13|_c14|_c15|_c16|_c17|               _c18|_c19|
+---------+-----------+----+-------------+-----+---+---+---+---+---+----+----+----+----+----+----+----+----+-------------------+----+
|batter_id|batter_name|date|opposing_team| avg1| AB|  R|  H| 2B| 3B|  HR| RBI|  SB|  CS|  BB| HBP|  SO| GDP|               avg2|year|
|        0|   가르시아|3.24|           NC|0.333|  3|  1|  1|  0|  0|   0|   0|   0|   0|   1|   0|   1|   0|0.33299999999999996|2018|
|        0|   가르시아|3.25|           NC|0.000|  4|  0|  0|  0|  0|   0|   0|   0|   0|   0|   0|   1|   0|0.14300000000000002|2018|
+---------+-----------+----+-------------+-----+---+---+---+---+---+----+----+----+----+----+----+----+----+-------------------+----+
only showing top 3 rows

root
 |-- _c0: string (nullable = true)
 |-- 

**task**
* 3. ``schema``를 직접 설정하여 dataframe으로 변환(task1 또는 task2의 방법을 이용) (10 point)

In [10]:
# 1-3 답안 작성
from pyspark.sql.types import Row, StructField, StructType, StringType, IntegerType, DoubleType, LongType
task3_schema = StructType([StructField("better_id", LongType(), True),
                          StructField("better_name", StringType(),True),
                          StructField("date", DoubleType(), True),
                          StructField("opposing_team",StringType(),True),
                          StructField("avg1",StringType(),True),
                          StructField("AB",LongType(),True),
                          StructField("R",LongType(),True),
                          StructField("H",LongType(),True),
                          StructField("2B",LongType(),True),
                          StructField("3B",LongType(),True),
                          StructField("HR",LongType(),True),
                          StructField("RBI",LongType(),True),
                          StructField("SB",LongType(),True),
                          StructField("CS",LongType(),True),
                          StructField("BB",LongType(),True),
                          StructField("HBP",LongType(),True),
                          StructField("SO",LongType(),True),
                          StructField("GDP",LongType(),True),
                          StructField("avg2",DoubleType(),True),
                          StructField("year",LongType(),True),])
# 스키마를 생성하는데 타입을 맞춰서 선언 하고, null값을 의미하는 세번째 인자를 허용한다는 뜻에서 True로 설정한다.

# output
task3_DF = sqlContext.read.csv(data_file,task3_schema) # data_file에서 csv파일을 읽어와 새로만든 스키마에 적용한다.
task3_DF.printSchema()
task3_DF.show(3) # dataframe형태로 3줄 출력

root
 |-- better_id: long (nullable = true)
 |-- better_name: string (nullable = true)
 |-- date: double (nullable = true)
 |-- opposing_team: string (nullable = true)
 |-- avg1: string (nullable = true)
 |-- AB: long (nullable = true)
 |-- R: long (nullable = true)
 |-- H: long (nullable = true)
 |-- 2B: long (nullable = true)
 |-- 3B: long (nullable = true)
 |-- HR: long (nullable = true)
 |-- RBI: long (nullable = true)
 |-- SB: long (nullable = true)
 |-- CS: long (nullable = true)
 |-- BB: long (nullable = true)
 |-- HBP: long (nullable = true)
 |-- SO: long (nullable = true)
 |-- GDP: long (nullable = true)
 |-- avg2: double (nullable = true)
 |-- year: long (nullable = true)

+---------+-----------+----+-------------+-----+----+----+----+----+----+----+----+----+----+----+----+----+----+-------------------+----+
|better_id|better_name|date|opposing_team| avg1|  AB|   R|   H|  2B|  3B|  HR| RBI|  SB|  CS|  BB| HBP|  SO| GDP|               avg2|year|
+---------+-----------+----+--

### Loading dataframes from Parquet
[Parquet](https://en.wikipedia.org/wiki/Apache_Parquet)은 ``중첩된 데이터를 효율적으로 저장할 수 있는 컬럼 기준 저장 포맷``

**자세한 설명은 아래의 주소를 참고하세요**

* http://engineering.vcnc.co.kr/2018/05/parquet-and-spark/

* https://medium.com/ssense-tech/csv-vs-parquet-vs-avro-choosing-the-right-tool-for-the-right-job-79c9f56914a8

* http://parquet.apache.org/



**( Parquet 예시 )**

![json예시](http://engineering.vcnc.co.kr/images/2018/05/parquet-data-io-philadelphia-2013-8-638.jpg)

In [11]:
import urllib.request

f = urllib.request.urlretrieve ("https://docs.google.com/uc?export=download&id=1FKoN6JB34LIvYF571xHrLyVFduj5n-Kj", "users.parquet")
data_file = "./users.parquet"

df = sqlContext.read.load(data_file)
df.show()

+------+--------------+----------------+
|  name|favorite_color|favorite_numbers|
+------+--------------+----------------+
|Alyssa|          null|  [3, 9, 15, 20]|
|   Ben|           red|              []|
+------+--------------+----------------+



## Lets have a look at a real-world dataframe

This dataframe is a small part from a large dataframe (15GB) which stores meteorological data from stations around the world. We will read the dataframe from a zipped parquet file.

- - -

``Spark가 parquet format을 load 하는 메소드는 1줄``

``parquet가 어떤 형태로 되어있는지 반드시 확인할 것!``

```
df = sqlContext.read.load(weather_parquet)
```

In [12]:
from os.path import split,join,exists
from os import mkdir,getcwd,remove
from glob import glob

# create directory if needed

notebook_dir=getcwd()
print(notebook_dir, type(notebook_dir))
data_dir=join(notebook_dir,'Data')
weather_dir=join(data_dir,'Weather')
print(data_dir)
print(weather_dir)

/home/jovyan/work <class 'str'>
/home/jovyan/work/Data
/home/jovyan/work/Data/Weather


In [13]:
# Initializing the directory
if exists(data_dir):
    print('directory',data_dir,'already exists')
else:
    print('making',data_dir)
    mkdir(data_dir)

if exists(weather_dir):
    print('directory',weather_dir,'already exists')
else:
    print('making',weather_dir)
    mkdir(weather_dir)

file_index='NY'
zip_file='%s.tgz'%(file_index)
print(zip_file[:-3])

# For linux
old_files='%s/%s*'%(weather_dir,zip_file[:-3])
print(glob(old_files))
for f in glob(old_files):
    print('removing',f)
#   For Linux
    !rm -rf {f}

directory /home/jovyan/work/Data already exists
directory /home/jovyan/work/Data/Weather already exists
NY.
['/home/jovyan/work/Data/Weather/NY.parquet', '/home/jovyan/work/Data/Weather/NY.tgz']
removing /home/jovyan/work/Data/Weather/NY.parquet
removing /home/jovyan/work/Data/Weather/NY.tgz


In [14]:
!pip install googledrivedownloader



In [15]:
from google_drive_downloader import GoogleDriveDownloader as gdd
import tarfile

gdd.download_file_from_google_drive(file_id='1hAHV6vC6FvVgrYnoN-lR-IfH488-H121',
                                   dest_path = 'Data/Weather/NY.tgz')

!ls -lh $weather_dir/$zip_file

#extracting the parquet file
#!tar zxvf {weather_dir}/{zip_file} -C {weather_dir}


tf = tarfile.open(join(weather_dir,zip_file), mode="r")
tf.extractall(weather_dir)

Downloading 1hAHV6vC6FvVgrYnoN-lR-IfH488-H121 into Data/Weather/NY.tgz... Done.
-rwxr-xr-x 1 root root 64M Oct 28 16:31 /home/jovyan/work/Data/Weather/NY.tgz


In [16]:
weather_parquet = join(weather_dir, zip_file[:-3]+'parquet')
print(weather_parquet)
df = sqlContext.read.load(weather_parquet)
df.printSchema()
df.show(1)

/home/jovyan/work/Data/Weather/NY.parquet
root
 |-- Station: string (nullable = true)
 |-- Measurement: string (nullable = true)
 |-- Year: long (nullable = true)
 |-- Values: binary (nullable = true)
 |-- dist_coast: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- elevation: double (nullable = true)
 |-- state: string (nullable = true)
 |-- name: string (nullable = true)

+-----------+-----------+----+--------------------+-----------------+--------------+------------------+-----------------+-----+-----------------+
|    Station|Measurement|Year|              Values|       dist_coast|      latitude|         longitude|        elevation|state|             name|
+-----------+-----------+----+--------------------+-----------------+--------------+------------------+-----------------+-----+-----------------+
|USW00094704|   PRCP_s20|1945|[00 00 00 00 00 0...|361.8320007324219|42.57080078125|-77.71330261230469|208.8000030517578|  

## Writing DataFrames to CSV, JSON, PARQUET

생성된 DataFrame을 ``.write.[csv/json/parquet]("폴더이름")`` 를 통해서 write 가능



설정된 ``폴더이름``에 ``DataFrame이 파티션(분할)되어 지정한 format으로 저장됨``.

```
df.write.csv("폴더이름")
df.write.json("폴더이름")
df.write.parquet("폴더이름")
```

In [17]:
if exists("save"):
    print('save directory already exists')
else:
    mkdir("save")
    df.write.csv("save/csv_test")
    df.write.json("save/json_test")
    df.write.parquet("save/parquet_test")

!ls save # save 폴더 확인

save directory already exists
csv_test  json_test  parquet_test


### Excercise 2 : ``coalesce `` or ``repartition``!! (30 point)

Spark는 기본적으로 RDD or DataFrame or 등등.. 을 생성할 때, Spark Core 개수만큼 파티션을 설정한다. 예를 들어 어떠한 RDD를 생성할 때, Spark Core가 7개라면 7개의 파티션으로 처리한다. 단, DataFrame의 size에 따라 자동변경 될 수도 있다.

``coalesce``와 ``repartition``는 무엇인가....?

``coalesce``와 ``repartition`` 파티션 개수를 줄이거나 늘리는 데 사용한다. 
```
#coalesce 
coalesce (numPartitions: Int, shuffle: Boolean = false)
```

자세한 설명은 [여기1](https://thebook.io/006908/part01/ch04/02/03/02/), [여기2](https://knight76.tistory.com/entry/scala-spark%EC%97%90%EC%84%9C-partition-%EC%A4%84%EC%9D%B4%EA%B8%B0-repartition-coalesce), [여기3](https://m.blog.naver.com/PostView.nhn?blogId=8x8x8x8x8x8&logNo=220740234992&proxyReferer=https%3A%2F%2Fwww.google.com%2F)을 클릭하세요.

또한, Spark의 Shuffling에 대해 이해를 해야 합니다. [여기4](https://swalloow.github.io/spark-shuffling)

- - -
**task**

``coalesce``와 ``repartition``을 이용하여 Exercise 1에서 생성된 DataFrame의 파티션을 수정하여 ``parquet``로 저장합니다.

* 1. [Dataframe].rdd.getNumPartitions() 으로 partition 값을 확인합니다.

* 2. **partition을 수정하지 않고** DataFrame을 parquet로 저장후 ``!ls``를 이용하여 저장된 parquet 파일 목록을 출력합니다. (5 point)

* 3. ``repartition``을 이용하여 DataFrame의 **partition을 2에서 10으로 증가**시킵니다. **partition이 증가된 DataFrame을 parquet로 저장** 후 ``!ls``를 이용하여 저장된 parquet 파일 목록을 출력합니다. (10 point)

* * 4. coalesce를 이용하여 partition의 수를 1로 감소 시킵니다. **partition이 감소된 DataFrame을 parquet로 저장** 후 ``!ls``를 이용하여 저장된 parquet 파일 목록을 출력합니다. (10 pt)

* 5. 첨부된 링크를 이용하여 ``coalesce``와 ``repartition``의 **차이점을 2 ~ 3 문장으로 서술하세요.** (5 point)
- - -
**출력 예시(세부 값(주소)은 다를 수 있음)**
```
# task 4
part-00000-0900cf86-0c0f-47e8-b6ce-d3c6ad359e45-c000.snappy.parquet
part-00001-0900cf86-0c0f-47e8-b6ce-d3c6ad359e45-c000.snappy.parquet
part-00002-0900cf86-0c0f-47e8-b6ce-d3c6ad359e45-c000.snappy.parquet
part-00003-0900cf86-0c0f-47e8-b6ce-d3c6ad359e45-c000.snappy.parquet
part-00004-0900cf86-0c0f-47e8-b6ce-d3c6ad359e45-c000.snappy.parquet
_SUCCESS
```  

**task**
- 1. [Dataframe].rdd.getNumPartitions() 으로 partition 값을 확인합니다.

``#output``
```
Ex1_df partition number :  4
```

In [18]:
# task 2 -1 답안 작성
Ex1_df = sqlContext.read.load(data_file) # data_file의 파티션의 개수를 출력한다.

# output
print("Ex1_df partition number : ", Ex1_df.rdd.getNumPartitions())

Ex1_df partition number :  1


**task**

* 2. **partition을 수정하지 않고** DataFrame을 parquet로 저장후 ``!ls``를 이용하여 저장된 parquet 파일 목록을 출력합니다. (5 point)

``#output``
```
part-00000-27992d24-df2c-4c67-849b-098a4727d5b0-c000.snappy.parquet
part-00001-27992d24-df2c-4c67-849b-098a4727d5b0-c000.snappy.parquet
part-00002-27992d24-df2c-4c67-849b-098a4727d5b0-c000.snappy.parquet
part-00003-27992d24-df2c-4c67-849b-098a4727d5b0-c000.snappy.parquet
_SUCCESS
```

In [20]:
# task 2-2 답안 작성
Ex1_df.write.parquet("ex2_raw_partition") #ex2_raw_partition을 parquet파일로 저장한다.

# output
!ls ex2_raw_partition

part-00000-fe33c956-b9d5-4445-a1e8-e5718e47d669-c000.snappy.parquet  _SUCCESS


**task**

* 3. ``repartition``을 이용하여 DataFrame의 **partition을 4에서 10으로 증가**시킵니다. **partition이 증가된 DataFrame을 parquet로 저장** 후 ``!ls``를 이용하여 저장된 parquet 파일 목록을 출력합니다. (10 point)

``#output``
```
part-00000-1a9b7918-f4f6-4789-ab8d-798cbb9e1feb-c000.snappy.parquet
part-00001-1a9b7918-f4f6-4789-ab8d-798cbb9e1feb-c000.snappy.parquet
part-00002-1a9b7918-f4f6-4789-ab8d-798cbb9e1feb-c000.snappy.parquet
part-00003-1a9b7918-f4f6-4789-ab8d-798cbb9e1feb-c000.snappy.parquet
part-00004-1a9b7918-f4f6-4789-ab8d-798cbb9e1feb-c000.snappy.parquet
part-00005-1a9b7918-f4f6-4789-ab8d-798cbb9e1feb-c000.snappy.parquet
part-00006-1a9b7918-f4f6-4789-ab8d-798cbb9e1feb-c000.snappy.parquet
part-00007-1a9b7918-f4f6-4789-ab8d-798cbb9e1feb-c000.snappy.parquet
part-00008-1a9b7918-f4f6-4789-ab8d-798cbb9e1feb-c000.snappy.parquet
part-00009-1a9b7918-f4f6-4789-ab8d-798cbb9e1feb-c000.snappy.parquet
_SUCCESS
```

In [21]:
# task 2-3 답안 작성
Ex1_df.repartition(10).write.parquet("ex2_repartition") 
# "ex2_repartition을 repartition(10)을 이용해 파티션의 개수를 증가시킨 후 parquet형태로 저장한다."

# output
!ls ex2_repartition

part-00000-c4614c07-eab4-4f59-b1bd-2f03e84063ad-c000.snappy.parquet
part-00001-c4614c07-eab4-4f59-b1bd-2f03e84063ad-c000.snappy.parquet
part-00002-c4614c07-eab4-4f59-b1bd-2f03e84063ad-c000.snappy.parquet
_SUCCESS


**task**

* 4. coalesce를 이용하여 partition의 수를 1로 감소 시킵니다. **partition이 감소된 DataFrame을 parquet로 저장** 후 ``!ls``를 이용하여 저장된 parquet 파일 목록을 출력합니다. (10 point)

``#output``
```
part-00000-01b5e415-ddcc-4d69-9478-4e2db153f103-c000.snappy.parquet  _SUCCESS
```

In [23]:
# task 2-4 답안 작성
Ex1_df.coalesce(1).write.parquet("ex2_coalesce") 
#10개로 늘렸던(실제로는 컴퓨터의 사양에 의해 3개 밖에 출력 안됨)파티션의 수를 coalsece를 사용해 파티션의 개수를 감소시킨 후 
#write를 사용해 parquet형태로 저장한다.

# output
!ls ex2_coalesce

part-00000-38931ca9-3047-42dc-a5f8-728ab7065ef4-c000.snappy.parquet  _SUCCESS


### task 2-5 답안작성 (5 point)
#### coalsece는 partition의 개수를 감소시키는 역할을 하고, repartitiond은 partition의 개수를 증가시키는데 사용된다.

## DataFrame Operations
```
df = sqlContext.read.load(weather_parquet)
df2 = sqlContext\
.read.csv("Regular_Season_Batter_Day_by_Day.csv", header=True)

```

위에서 사용했던 DataFrame을 다시 사용합니다.

In [24]:
df = sqlContext.read.load(weather_parquet)
df2 = sqlContext\
.read.csv("Regular_Season_Batter_Day_by_Day.csv", header=True)

df.show(3)

+-----------+-----------+----+--------------------+-----------------+--------------+------------------+-----------------+-----+-----------------+
|    Station|Measurement|Year|              Values|       dist_coast|      latitude|         longitude|        elevation|state|             name|
+-----------+-----------+----+--------------------+-----------------+--------------+------------------+-----------------+-----+-----------------+
|USW00094704|   PRCP_s20|1945|[00 00 00 00 00 0...|361.8320007324219|42.57080078125|-77.71330261230469|208.8000030517578|   NY|DANSVILLE MUNI AP|
|USW00094704|   PRCP_s20|1946|[99 46 52 46 0B 4...|361.8320007324219|42.57080078125|-77.71330261230469|208.8000030517578|   NY|DANSVILLE MUNI AP|
|USW00094704|   PRCP_s20|1947|[79 4C 75 4C 8F 4...|361.8320007324219|42.57080078125|-77.71330261230469|208.8000030517578|   NY|DANSVILLE MUNI AP|
+-----------+-----------+----+--------------------+-----------------+--------------+------------------+-----------------+---

### 1. columns & dtypes

``columns`` : DataFrame의 column명 반환

``dtypes``  : DataFrame의 column명 및 데이터 유형 반환

In [25]:
print(df.columns, "\n")   # columns를 이용한 DataFrame의 column 명 추출
print(df.dtypes)          # dtypes를 이용한 column 명 및 type 확인

['Station', 'Measurement', 'Year', 'Values', 'dist_coast', 'latitude', 'longitude', 'elevation', 'state', 'name'] 

[('Station', 'string'), ('Measurement', 'string'), ('Year', 'bigint'), ('Values', 'binary'), ('dist_coast', 'double'), ('latitude', 'double'), ('longitude', 'double'), ('elevation', 'double'), ('state', 'string'), ('name', 'string')]


### 2. show, select, drop, filter

``show(n = 20, truncate = True or False)`` : 처음 n행 출력. truncate는 정렬 여부

``select(col)``  : DataFrame의 지정된 column 반환

``drop(col)``    : DataFrame의 지정된 column 삭제

``filter(condition)`` : True or False로 평가되는 condition 인수에 의한 DataFrame 반환

``distinct()`` : 모든 column의 모든 값이 동일한 row 제거

In [26]:
# show
df.show(10) # n = 10, truncate=True

+-----------+-----------+----+--------------------+-----------------+--------------+------------------+-----------------+-----+-----------------+
|    Station|Measurement|Year|              Values|       dist_coast|      latitude|         longitude|        elevation|state|             name|
+-----------+-----------+----+--------------------+-----------------+--------------+------------------+-----------------+-----+-----------------+
|USW00094704|   PRCP_s20|1945|[00 00 00 00 00 0...|361.8320007324219|42.57080078125|-77.71330261230469|208.8000030517578|   NY|DANSVILLE MUNI AP|
|USW00094704|   PRCP_s20|1946|[99 46 52 46 0B 4...|361.8320007324219|42.57080078125|-77.71330261230469|208.8000030517578|   NY|DANSVILLE MUNI AP|
|USW00094704|   PRCP_s20|1947|[79 4C 75 4C 8F 4...|361.8320007324219|42.57080078125|-77.71330261230469|208.8000030517578|   NY|DANSVILLE MUNI AP|
|USW00094704|   PRCP_s20|1948|[72 48 7A 48 85 4...|361.8320007324219|42.57080078125|-77.71330261230469|208.8000030517578|   

In [27]:
# select(col)
df.select("Station", "Measurement", "Year", "Values", "state").show(5)

+-----------+-----------+----+--------------------+-----+
|    Station|Measurement|Year|              Values|state|
+-----------+-----------+----+--------------------+-----+
|USW00094704|   PRCP_s20|1945|[00 00 00 00 00 0...|   NY|
|USW00094704|   PRCP_s20|1946|[99 46 52 46 0B 4...|   NY|
|USW00094704|   PRCP_s20|1947|[79 4C 75 4C 8F 4...|   NY|
|USW00094704|   PRCP_s20|1948|[72 48 7A 48 85 4...|   NY|
|USW00094704|   PRCP_s20|1949|[BB 49 BC 49 BD 4...|   NY|
+-----------+-----------+----+--------------------+-----+
only showing top 5 rows



In [28]:
# drop(col)
df.drop("state").show(5)

+-----------+-----------+----+--------------------+-----------------+--------------+------------------+-----------------+-----------------+
|    Station|Measurement|Year|              Values|       dist_coast|      latitude|         longitude|        elevation|             name|
+-----------+-----------+----+--------------------+-----------------+--------------+------------------+-----------------+-----------------+
|USW00094704|   PRCP_s20|1945|[00 00 00 00 00 0...|361.8320007324219|42.57080078125|-77.71330261230469|208.8000030517578|DANSVILLE MUNI AP|
|USW00094704|   PRCP_s20|1946|[99 46 52 46 0B 4...|361.8320007324219|42.57080078125|-77.71330261230469|208.8000030517578|DANSVILLE MUNI AP|
|USW00094704|   PRCP_s20|1947|[79 4C 75 4C 8F 4...|361.8320007324219|42.57080078125|-77.71330261230469|208.8000030517578|DANSVILLE MUNI AP|
|USW00094704|   PRCP_s20|1948|[72 48 7A 48 85 4...|361.8320007324219|42.57080078125|-77.71330261230469|208.8000030517578|DANSVILLE MUNI AP|
|USW00094704|   PRCP

### 4. Aggregations

* **Aggregation** can be used, in combination with built-in sparkSQL functions 
to compute statistics of a dataframe.

* computation will be fast thanks to combined optimzations with database operations.

* A partial list : `count(), approx_count_distinct(), avg(), max(), min()`

* Of these, the interesting one is `approx_count_distinct()` which uses sampling to get an approximate count fast. (`approximation`(근사치)를 이용해 빠른 결과값 도출 가능)

* pyspark에서 이용 가능한 모든 Aggregations function은 [여기서 확인 하시오!!!](http://spark.apache.org/docs/2.2.0/api/python/pyspark.sql.html#module-pyspark.sql.functions)

```
from pyspark.sql.functions import [사용하고자 하는 function]
```

```

아래의 예제부터는 DataFrame에서 특정 column을 선택할 때, col() 사용하여 진행하겠습니다.

```

In [29]:
df = sqlContext.read.load(weather_parquet)
df.printSchema()

root
 |-- Station: string (nullable = true)
 |-- Measurement: string (nullable = true)
 |-- Year: long (nullable = true)
 |-- Values: binary (nullable = true)
 |-- dist_coast: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- elevation: double (nullable = true)
 |-- state: string (nullable = true)
 |-- name: string (nullable = true)



#### (1) Count, CountDistinct, approx_count_distinct

**전체 row의 수 count**

In [30]:
from pyspark.sql.functions import col, count, countDistinct, approx_count_distinct

df.select(count(col("Station"))).show()

+--------------+
|count(Station)|
+--------------+
|        168398|
+--------------+



``countDistinct``는 고유한(중복없이) row의 수를 반환합니다


In [31]:
df.select(countDistinct(col("Station"))).show()

+-----------------------+
|count(DISTINCT Station)|
+-----------------------+
|                    343|
+-----------------------+



approx_count_distinct는 row의 근사치를 반환합니다.

* approx_count_distinct는 최대 추정 오류율(maximum estimation error)의 값을 인자로 받는데, 자신이 오류를 어디까지 허용하는지에 따라 값을 주면됩니다.

* count_distinct를 통해 얻는 실제값과의 차이는 최대 추정 오류의 수치에 따라 상이합니다.

* **하지만, 연산은 count_distinct보다 더 빠르게 결과를 반환합니다. 즉, 대규모 데이터셋을 파악할 때 사용될 수 있습니다.!!!**

In [32]:
df.select(approx_count_distinct(col("Station"), 0.2)).show()

+------------------------------+
|approx_count_distinct(Station)|
+------------------------------+
|                           330|
+------------------------------+



#### (2) first, last
첫 번째 row와 마지막 row

In [33]:
from pyspark.sql.functions import first, last

df.select(first(col("Station")), last(col("Station"))).show()

+---------------------+--------------------+
|first(Station, false)|last(Station, false)|
+---------------------+--------------------+
|          USW00094704|         USC00307664|
+---------------------+--------------------+



#### (3) min, max

In [34]:
from pyspark.sql.functions import min, max

df.select(min(col("year")), max(col("year"))).show()

+---------+---------+
|min(year)|max(year)|
+---------+---------+
|     1871|     2013|
+---------+---------+



#### (4) sum, sumDistinct

``sumDistinct``는 고유값(중복 없는)의 sum!!

In [35]:
from pyspark.sql.functions import sum, sumDistinct

df.select(sum(col("dist_coast"))).show()

df.select(sumDistinct(col("dist_coast"))).show()

+-------------------+
|    sum(dist_coast)|
+-------------------+
|4.138962684120101E7|
+-------------------+

+------------------------+
|sum(DISTINCT dist_coast)|
+------------------------+
|       76862.93162703142|
+------------------------+



#### (5) avg, alias

``alias``는 SQL에서 ``as``와 동일한 기능을 합니다. 집계된 column을 재활용하기 위해 이름을 설정한다고 생각하시면 됩니다.

In [36]:
from pyspark.sql.functions import avg

df.select(avg(col("year")), avg(col("latitude"))).show()

# alias를 사용하게 되면...
df.select(avg(col("year")).alias("year_avg"), 
          avg(col("latitude")).alias("latitude_avg")).show()

+------------------+----------------+
|         avg(year)|   avg(latitude)|
+------------------+----------------+
|1963.4289124573927|42.6842968505041|
+------------------+----------------+

+------------------+----------------+
|          year_avg|    latitude_avg|
+------------------+----------------+
|1963.4289124573927|42.6842968505041|
+------------------+----------------+



#### (6) agg and groupby
The method `.agg(spec)` computes a summary for each group as specified in `spec`
The method `.groupby(col)` groups rows according the value of the column `col`.  

``groupy``로 원하는 **column의 데이터를 그룹화**하고, ``agg``를 통해 여러 aggregation을 동시에 진행한다.

``agg의 사용 에시``
```
[DataFrame].agg({'colum1':'[aggregation]', 'coulum2':'[aggregation]', ...})
```

In [37]:
df.agg({'year':'avg', 'latitude':'max', 'dist_coast':'min'}).show()

+-------------------+------------------+-----------------+
|    min(dist_coast)|         avg(year)|    max(latitude)|
+-------------------+------------------+-----------------+
|0.04799420014023781|1963.4289124573927|44.93579864501953|
+-------------------+------------------+-----------------+



In [38]:
df.groupby(col('Measurement')).agg({'year': 'min', 'state':'count'}).show()

+-----------+---------+------------+
|Measurement|min(year)|count(state)|
+-----------+---------+------------+
|   TMIN_s20|     1873|       13442|
|       TMIN|     1873|       13442|
|   SNOW_s20|     1884|       15629|
|       TOBS|     1876|       10956|
|   SNWD_s20|     1888|       14617|
|   PRCP_s20|     1871|       16118|
|   TOBS_s20|     1876|       10956|
|       TMAX|     1873|       13437|
|       SNOW|     1884|       15629|
|   TMAX_s20|     1873|       13437|
|       SNWD|     1888|       14617|
|       PRCP|     1871|       16118|
+-----------+---------+------------+



In [39]:
df.groupby(col('station')).agg({'year': 'min'}).show()

+-----------+---------+
|    station|min(year)|
+-----------+---------+
|USC00303955|     1992|
|USW00093732|     1958|
|USW00014786|     1945|
|USC00300621|     1950|
|USC00301387|     1926|
|USC00305426|     1896|
|USC00306659|     1898|
|USC00303124|     1971|
|USC00303983|     1950|
|USC00300343|     1895|
|USC00305441|     1973|
|USC00303050|     1948|
|USC00300360|     1907|
|USW00004742|     1956|
|USC00301401|     1902|
|USC00306817|     1893|
|USC00308104|     1901|
|USC00305769|     1985|
|USC00303889|     1926|
|USC00306019|     1942|
+-----------+---------+
only showing top 20 rows



In [40]:
df.groupby('state').agg({'latitude':'mean', 'longitude':'mean'}).show()

+-----+----------------+-----------------+
|state|   avg(latitude)|   avg(longitude)|
+-----+----------------+-----------------+
|   NY|42.6842968505041|-75.4551864389521|
+-----+----------------+-----------------+



#### (7) describe()

The method `df.describe()` computes five statistics for each column of the dataframe `df`.

    The statistics are: **count, mean, std, min,max**

R의 summary와 같은 기능. 각 column의 통계치를 간단하게 구할 수 있다.

In [41]:
df.describe().show()    # 전체 column

+-------+-----------+-----------+------------------+-------------------+------------------+------------------+------------------+------+---------------+
|summary|    Station|Measurement|              Year|         dist_coast|          latitude|         longitude|         elevation| state|           name|
+-------+-----------+-----------+------------------+-------------------+------------------+------------------+------------------+------+---------------+
|  count|     168398|     168398|            168398|             168398|            168398|            168398|            168398|168398|         168398|
|   mean|       null|       null|1963.4289124573927| 245.78455113006692|  42.6842968505041| -75.4551864389521| 245.2899639266881|  null|           null|
| stddev|       null|       null|30.586766032145405| 129.97112783972682|1.0492530244970353|1.7907915903419898| 189.6934270109707|  null|           null|
|    min|USC00300015|       PRCP|              1871|0.04799420014023781| 39.799999

In [42]:
df.describe().select(col('summary'),col('station'),
                    col('year')).show() # select를 이용한 year column 선택

+-------+-----------+------------------+
|summary|    station|              year|
+-------+-----------+------------------+
|  count|     168398|            168398|
|   mean|       null|1963.4289124573927|
| stddev|       null|30.586766032145405|
|    min|USC00300015|              1871|
|    max|USW00094794|              2013|
+-------+-----------+------------------+



### 5. Join

**왼쪽과 오른쪽**의 데이터셋에 있는 ``하나 이상의 key(키)값``을 비교하고 왼쪽과 오른쪽 데이터셋의 결합 여부를 결정하는 ``조인 표현식(join expression)``의 평가 결과에 따라 두 개의 데이터셋를 조인합니다.

- - -

**조인 타입**

```
- 내부 조인(inner join) : 왼쪽과 오른쪽 데이터셋에 키가 있는 row를 유지
- 외부 조인(outer join) : 왼쪽이나 오른쪽 데이터셋에 키가 있는 row를 유지
- 왼쪽 외부 조인(left outer join) : 왼쪽 데이터셋에 키가 있는 row를 유지
- 오른쪽 외부 조인(right outer join) : 오른쪽 데이터셋에 키가 있는 row를 유지
- 왼쪽 세미 조인(left semi join) : 왼쪽 데이터셋의 키가 오른쪽 데이터셋에 있는 경우에는 키가 일치하는 왼쪽 데이터셋만 유지
- 왼쪽 안티 조인(left anti join) : 왼쪽 데이터셋의 키가 오른쪽 데이터셋에 없는 경우에는 키가 일치하지 않는 왼쪽 데이터셋만 유지
- 자연 조인(natural join) : 두 데이터셋에서 동일한 이름을 가진 column을 암시적(implicit)으로 결합하는 조인
```


In [43]:
#### 예제 수행을 위한 DataFrame 생성.... 실제로는 DataFrame을 만드는 경우는 거의 없다...!

person_RDD = sc.parallelize([Row(ID=0, NAME="Bill Chambers",GRADUATE_PROGRAM=0,
                                 spark_status = [100]),
                             Row(ID=1, NAME="Matei Zaharia",GRADUATE_PROGRAM=1,
                                 spark_status = [500, 250, 100]),
                             Row(ID=2, NAME="Michael Armbrust",GRADUATE_PROGRAM=1,
                                 spark_status = [250, 100])
                            ])
graduateProgram_RDD = sc.parallelize([Row(ID=0, DEGREE="Masters",
                                      DEPARTMENT="School of Information",
                                      SCHOLL = "UC Berkeley"),
                                  Row(ID=2, DEGREE="Masters",
                                      DEPARTMENT="EECS",
                                      SCHOLL = "UC Berkeley"),
                                  Row(ID=1, DEGREE="PH.D",
                                      DEPARTMENT="EECS",
                                      SCHOLL = "UC Berkeley"),
                            ])
spark_status_RDD = sc.parallelize([Row(ID=500, status="Vice President"),
                                   Row(ID=250, status="PMC Member"),
                                   Row(ID=100, status="Contributor")
                                  ])


person_DF = sqlContext.createDataFrame(person_RDD)
graduateProgram_DF = sqlContext.createDataFrame(graduateProgram_RDD)
sparkStatus_DF = sqlContext.createDataFrame(spark_status_RDD)

In [44]:
print("person's schema")
person_DF.printSchema()
print("graduateProgram's schema")
graduateProgram_DF.printSchema()
print("sparkStatus's schema")
sparkStatus_DF.printSchema()

person's schema
root
 |-- GRADUATE_PROGRAM: long (nullable = true)
 |-- ID: long (nullable = true)
 |-- NAME: string (nullable = true)
 |-- spark_status: array (nullable = true)
 |    |-- element: long (containsNull = true)

graduateProgram's schema
root
 |-- DEGREE: string (nullable = true)
 |-- DEPARTMENT: string (nullable = true)
 |-- ID: long (nullable = true)
 |-- SCHOLL: string (nullable = true)

sparkStatus's schema
root
 |-- ID: long (nullable = true)
 |-- status: string (nullable = true)



### (1) inner join

In [45]:
# joinExpression 생성. 키값 설정!
join_ex = person_DF["GRADUATE_PROGRAM"] == graduateProgram_DF["ID"]

In [46]:
person_DF.join(graduateProgram_DF, join_ex).show()

+----------------+---+----------------+---------------+-------+--------------------+---+-----------+
|GRADUATE_PROGRAM| ID|            NAME|   spark_status| DEGREE|          DEPARTMENT| ID|     SCHOLL|
+----------------+---+----------------+---------------+-------+--------------------+---+-----------+
|               0|  0|   Bill Chambers|          [100]|Masters|School of Informa...|  0|UC Berkeley|
|               1|  1|   Matei Zaharia|[500, 250, 100]|   PH.D|                EECS|  1|UC Berkeley|
|               1|  2|Michael Armbrust|     [250, 100]|   PH.D|                EECS|  1|UC Berkeley|
+----------------+---+----------------+---------------+-------+--------------------+---+-----------+



### (2) outer join

In [47]:
person_DF.join(graduateProgram_DF, join_ex, "outer").show()

+----------------+----+----------------+---------------+-------+--------------------+---+-----------+
|GRADUATE_PROGRAM|  ID|            NAME|   spark_status| DEGREE|          DEPARTMENT| ID|     SCHOLL|
+----------------+----+----------------+---------------+-------+--------------------+---+-----------+
|               0|   0|   Bill Chambers|          [100]|Masters|School of Informa...|  0|UC Berkeley|
|               1|   1|   Matei Zaharia|[500, 250, 100]|   PH.D|                EECS|  1|UC Berkeley|
|               1|   2|Michael Armbrust|     [250, 100]|   PH.D|                EECS|  1|UC Berkeley|
|            null|null|            null|           null|Masters|                EECS|  2|UC Berkeley|
+----------------+----+----------------+---------------+-------+--------------------+---+-----------+



### (3) left outer join

In [48]:
graduateProgram_DF.join(person_DF, join_ex, "left_outer").show()

+-------+--------------------+---+-----------+----------------+----+----------------+---------------+
| DEGREE|          DEPARTMENT| ID|     SCHOLL|GRADUATE_PROGRAM|  ID|            NAME|   spark_status|
+-------+--------------------+---+-----------+----------------+----+----------------+---------------+
|Masters|School of Informa...|  0|UC Berkeley|               0|   0|   Bill Chambers|          [100]|
|   PH.D|                EECS|  1|UC Berkeley|               1|   1|   Matei Zaharia|[500, 250, 100]|
|   PH.D|                EECS|  1|UC Berkeley|               1|   2|Michael Armbrust|     [250, 100]|
|Masters|                EECS|  2|UC Berkeley|            null|null|            null|           null|
+-------+--------------------+---+-----------+----------------+----+----------------+---------------+



### (4) left semi join

In [49]:
graduateProgram_DF.join(person_DF, join_ex, "left_semi").show()

+-------+--------------------+---+-----------+
| DEGREE|          DEPARTMENT| ID|     SCHOLL|
+-------+--------------------+---+-----------+
|Masters|School of Informa...|  0|UC Berkeley|
|   PH.D|                EECS|  1|UC Berkeley|
+-------+--------------------+---+-----------+



### (5) left anti join

In [50]:
graduateProgram_DF.join(person_DF, join_ex, "left_anti").show()

+-------+----------+---+-----------+
| DEGREE|DEPARTMENT| ID|     SCHOLL|
+-------+----------+---+-----------+
|Masters|      EECS|  2|UC Berkeley|
+-------+----------+---+-----------+



## 6. ★★UDF(사용자 정의 함수)..★

파이썬과 외부 라이브러리를 사용해서 사용자가 원하는 형태로 transformation 할 수 있다.

**UDF는 하나 이상의 column을 입력으로 받고, 사용자 정의에 따라 return**

In [51]:
udfExamDF = sqlContext\
.createDataFrame(sc.parallelize([Row(num1 = x, num2 = x*x) for x in range(5)]))
udfExamDF.show()

+----+----+
|num1|num2|
+----+----+
|   0|   0|
|   1|   1|
|   2|   4|
|   3|   9|
|   4|  16|
+----+----+



In [52]:
from pyspark.sql.functions import udf
from pyspark.sql.functions import col

def power3(double_value):
    return double_value ** 3

def power10(double_value):
    return double_value ** 10

power3udf = udf(power3)  # python 함수를 udf로 등록
power10udf = udf(power10)  # python 함수를 udf로 등록

In [53]:
udfExamDF.select(power3udf(col('num1')), col('num2')).show()

+------------+----+
|power3(num1)|num2|
+------------+----+
|           0|   0|
|           1|   1|
|           8|   4|
|          27|   9|
|          64|  16|
+------------+----+



In [54]:
udfExamDF.select(power3udf(col('num1')),power10udf(col('num2'))).show()

+------------+-------------+
|power3(num1)|power10(num2)|
+------------+-------------+
|           0|            0|
|           1|            1|
|           8|      1048576|
|          27|   3486784401|
|          64|1099511627776|
+------------+-------------+



In [55]:
udfExamDF.select('num1', 'num2',power3udf('num1'), power10udf('num1'), 
                power3udf('num2'), power10udf('num2')).show()

+----+----+------------+-------------+------------+-------------+
|num1|num2|power3(num1)|power10(num1)|power3(num2)|power10(num2)|
+----+----+------------+-------------+------------+-------------+
|   0|   0|           0|            0|           0|            0|
|   1|   1|           1|            1|           1|            1|
|   2|   4|           8|         1024|          64|      1048576|
|   3|   9|          27|        59049|         729|   3486784401|
|   4|  16|          64|      1048576|        4096|1099511627776|
+----+----+------------+-------------+------------+-------------+



## Exercise 3 -

### DataFrame을 사용하여 HW4 Exercise 4 다시 풀기! (40 point)

- - -
 
다음 데이터에 대하여 다음 과제를 수행하세요.

- regular.csv : KBO에서 활약한 타자들의 역대 정규시즌 성적을 포함하여 몸무게, 키 ,생년월일 등의 기본정보
- pre.csv : KBO에서 활약한 타자들의 **역대 시범경기(정규시즌 직전에 여는 연습경기)** 성적

**위의 두 데이터는 모두 `,`로 구분되어 있습니다.**

 - **데이터의 자세한 설명은 다음의 링크를 참조해주세요.([여기를 눌러서 12. 데이터 설명 참고](https://dacon.io/cpt6/62885))**
 - 또한 regular.csv와 pre.csv를 직접 열어서 데이터가 어떻게 저장되어 있는지 확인해주세요.

★[column type 변경 참고](https://stackoverflow.com/questions/52871560/how-to-typecast-spark-dataframe-columns-using-pyspark)★

★[column name 변경 참고](https://docs.microsoft.com/en-us/dotnet/api/microsoft.spark.sql.dataframe.withcolumnrenamed?view=spark-dotnet)★

- - -
**task**

- 1. pandas 또는 .read.csv를 이용하여 ``regular.csv``와 ``pre.csv``를 각각의 DataFrame으로 만듭니다.


- 2. 생성된 각각의 DataFrame에서 **타자 이름(batter_name), 타수(AB), 안타(H)을 ``.select(col)``를 이용하여 Transformation**합니다. **단, 새로운 DataFrame의 각 column type은 string, flot 또는 double로 변환합니다.** (10 point)


- 3. task2에서 생성된 DataFrame 각각에 ``groupby``, ``agg``, ``udf`` 또는 ``임의의 함수를 자유롭게 적용``하여 **정규 시즌과 시범경기의 평균 타율을 구한 후 선수 이름(column 1), 평균 타율(column 2)로 구성된 DataFrame으로 Transformation 합니다(regular, pre 모두).** (10 point)


- 4. task3의 각 DataFrame(regular, pre)를 **batter_name**을 기준으로 ``join``하여,**``역대 정규시즌 평균 타율이 역대 시범경기 평균 타율보다 높은``, 선수 이름(column 1)과 해당 선수의 역대 정규시즌 평균타율(column 2)로 구성된 DataFrame으로 Transformation 합니다.**(10 point)


- 5. task4에서 생성된 DataFrame에 ``내림차순``을 적용하여 **상위 10명의 선수의 이름과 역대 정규시즌 평균타율 출력합니다. 단, column 명을 출력 예시와 같게 변경할 것.** (10 point)

---

**DataFrame join 할 때...★**

    - DataFrame의 기준이 되는 column 명(키값)이 서로 같으면 오류가 발생합니다. 

    - 따라서 서로 다른 column 명을 사용해야 됩니다(alias 또는 withColumnRenamed를 사용하여 column 명 변경)

    - JoinExpression을 잘 활용하시면 됩니다.

**task**
- 1. pandas 또는 .read.csv를 이용하여 ``regular.csv``와 ``pre.csv``를 각각의 DataFrame으로 만듭니다.

In [56]:
# pre.csv download
from pyspark.sql.functions import col, desc
import urllib.request
import re

f = urllib.request\
.urlretrieve ("https://docs.google.com/uc?export=download&id=1t3icaDgI5KeNEwNmaWFOYGYQtdY8NOMm",
              "regular.csv")
f = urllib.request\
.urlretrieve ("https://docs.google.com/uc?export=download&id=1g4r8tCCocVwCg6pTWaeioMdmtETYo_cf",
              "pre.csv")

reg_df = sqlContext.read.csv("regular.csv", header = True)
pre_df = sqlContext.read.csv("pre.csv", header = True)

reg_df.show(2)
pre_df.show(2)

+---------+-----------+----+----+-----+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+-----+-----+---+-------------+----------------+----------------+--------------------------------+---------------+-----+
|batter_id|batter_name|year|team|  avg|  G| AB|  R|  H| 2B| 3B| HR| TB|RBI| SB| CS| BB|HBP| SO|GDP|  SLG|  OBP|  E|height/weight|       year_born|        position|                          career|starting_salary|  OPS|
+---------+-----------+----+----+-----+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+-----+-----+---+-------------+----------------+----------------+--------------------------------+---------------+-----+
|        0|   가르시아|2018|  LG|0.339| 50|183| 27| 62|  9|  0|  8| 95| 34|  5|  0|  9|  8| 25|  3|0.519|0.383|  9|   177cm/93kg|1985년 04월 12일|내야수(우투우타)|          쿠바 Ciego de Avila...|           null|0.902|
|        1|     강경학|2011|한화|  0.0|  2|  1|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  1|  0|  0.0|  0.0|  1|   180cm/72kg|1992년 08월 11일|내야

**task**

- 2. 생성된 각각의 DataFrame에서 **타자 이름(batter_name), 타수(AB), 안타(H)을 ``.select(col)``를 이용하여 Transformation**합니다. **단, 새로운 DataFrame의 각 column type은 string, float 또는 double로 변환합니다.** (10 point)

``` # output```
```
★★regular DataFrame★★
+-----------+-----+----+
|batter_name|   AB|   H|
+-----------+-----+----+
|   가르시아|183.0|62.0|
|     강경학|  1.0| 0.0|
|     강경학| 86.0|19.0|
+-----------+-----+----+
only showing top 3 rows

root
 |-- batter_name: string (nullable = true)
 |-- AB: double (nullable = true)
 |-- H: double (nullable = true)

★★pre DataFrame★★
+-----------+----+---+
|batter_name|  AB|  H|
+-----------+----+---+
|   가르시아|20.0|7.0|
|     강경학| 2.0|0.0|
|     강경학| 0.0|0.0|
+-----------+----+---+
only showing top 3 rows
```

In [57]:
# 5-2 답안 작성 
reg_task2 = reg_df.select(reg_df.batter_name.cast("string"),reg_df.AB.cast("float"),reg_df.H.cast("float")) 
pre_task2 = pre_df.select(pre_df.batter_name.cast("string"),pre_df.AB.cast("float"),pre_df.H.cast("float")) 
#.select()를 사용해 새로운 DataFrame으로 Transform하고, batter_name , AB, H를 각각 string형 float형, float형으로 변환한다.
# 형 변환 시 cast("type")을 사용해 형변환을 한다.

# output
print("★★regular DataFrame★★")
reg_task2.show(3)
reg_task2.printSchema()
print("★★pre DataFrame★★")
pre_task2.show(3)

★★regular DataFrame★★
+-----------+-----+----+
|batter_name|   AB|   H|
+-----------+-----+----+
|   가르시아|183.0|62.0|
|     강경학|  1.0| 0.0|
|     강경학| 86.0|19.0|
+-----------+-----+----+
only showing top 3 rows

root
 |-- batter_name: string (nullable = true)
 |-- AB: float (nullable = true)
 |-- H: float (nullable = true)

★★pre DataFrame★★
+-----------+----+---+
|batter_name|  AB|  H|
+-----------+----+---+
|   가르시아|20.0|7.0|
|     강경학| 2.0|0.0|
|     강경학| 0.0|0.0|
+-----------+----+---+
only showing top 3 rows



**task**
- 3. task2에서 생성된 각각의 DataFrame에 ``groupby``, ``agg``, ``udf`` 또는 ``임의의 함수를 자유롭게 적용``하여 **역대 정규 시즌과 시범경기의 평균 타율을 구한 후 선수 이름(column 1), 평균 타율(column 2)로 구성된 DataFrame으로 Transformation 합니다(regular, pre 모두). 단, 각각의 DataFrame의 batter_name(선수 이름)과 avg(평균 타율) column의 이름을 regular/pre_batter, regular/pre_avg로 변경할 것(``alias`` 또는 ``withColumnRenamed`` 사용),** (10 point)

``` # output ```
```
★★regular DataFrame★★
+--------------+-------------------+
|regular_batter|        regular_avg|
+--------------+-------------------+
|        김하성| 0.2879359095193214|
|        도태훈|0.20454545454545456|
|        이종범| 0.2965346534653465|
+--------------+-------------------+
only showing top 3 rows

★★pre DataFrame★★
+----------+-------------------+
|pre_batter|            pre_avg|
+----------+-------------------+
|    김하성|0.19327731092436976|
|    도태훈|0.22727272727272727|
|    정상호|0.22674418604651161|
+----------+-------------------+
only showing top 3 rows
```

In [61]:
# 5-3 답안 작성
from pyspark.sql.functions import udf
def avgHit(AB_sum, H_sum):
    if AB_sum == 0:
        return 0
    else:   
        avg = H_sum/AB_sum
    return avg
# 타수를 안타로 나눠 평균타율을 구하는 함수 작성, 0인 값이 있을 수 있어 타수가 0이면 안타도 0이기 때문에 타수가 0이면 0을 리턴하는 예외처리를 함
    
avgHit_udf = udf(avgHit) # 파이썬 함수를 udf함수로 등록
reg_task3 = reg_task2.groupby("batter_name").agg({"AB":"sum", "H":"sum"}).select(col("batter_name").alias("regular_batter"),
                                                                                 avgHit_udf("sum(AB)","sum(H)").alias("regular_avg"))                                                                                
pre_task3 = pre_task2.groupby("batter_name").agg({"AB":"sum", "H":"sum"}).select(col("batter_name").alias("pre_batter"),
                                                                                 avgHit_udf("sum(AB)","sum(H)").alias("pre_avg")) 
#타자(batter_name)을 gropby를 사용해 묶고, agg를 사용해 타수(AB)와 안타(H)의 합을 계산 후 selet로 위에서 작성한 avgHit_udf함수에 
#적용하여 colume이 batter_name인 것과 sum(AB), sum(H)인 것을 뽑아내고, 
#alias함수를 사용해 batter_name을 regular/pre_batter로, sum(AB),sum(H)를 regular/pre_avg로 재명명 한다.
# output
print("★★regular DataFrame★★")
reg_task3.show(3)
print("★★pre DataFrame★★")
pre_task3.show(3)

★★regular DataFrame★★
+--------------+-------------------+
|regular_batter|        regular_avg|
+--------------+-------------------+
|        김하성| 0.2879359095193214|
|        도태훈|0.20454545454545456|
|        이종범| 0.2965346534653465|
+--------------+-------------------+
only showing top 3 rows

★★pre DataFrame★★
+----------+-------------------+
|pre_batter|            pre_avg|
+----------+-------------------+
|    김하성|0.19327731092436976|
|    도태훈|0.22727272727272727|
|    정상호|0.22674418604651161|
+----------+-------------------+
only showing top 3 rows



**task**
- 4. task3의 각 DataFrame(regular, pre)을 **batter_name**을 기준으로 ``join``하여,**``역대 정규시즌 평균 타율이 역대 시범경기 평균 타율보다 높은``, 선수 이름(column 1)과 해당 선수의 역대 정규시즌 평균타율(column 2)로 구성된 DataFrame으로 Transformation 합니다.**(10 point)

``` # Output ```
```
★★regular and pre joined★★
+--------------+-------------------+
|regular_batter|        regular_avg|
+--------------+-------------------+
|        김하성| 0.2879359095193214|
|        정상호|0.24957264957264957|
|      스크럭스| 0.2771855010660981|
|        지석훈| 0.2260519247985676|
|        박경수| 0.2566744730679157|
|        박정음|0.27440633245382584|
|        박병호| 0.2893900889453621|
+--------------+-------------------+
only showing top 7 rows
```

In [87]:
# 5-4 답안 작성
joinEx = reg_task3["regular_batter"] == pre_task3["pre_batter"] #joinexpresion
mask = task4["regular_avg"] > task4["pre_avg"]
task4 = reg_task3.join(pre_task3, joinEx)[mask].select(col("regular_batter"),col("regular_avg"))
# reg_task3와 pre_task3를 regular_battar와 pre_battar의 이름이 같은(같은 key를 가진)것을 기준으로 조인 한 후 regular_avg가 pre_avg보다
# 큰 경우를 추출 한 후 select를 이용해 ragular_battat와 regular_avg를 추출한다.
# output
print("★★regular and pre joined★★")
task4.show(7)

★★regular and pre joined★★
+--------------+-------------------+
|regular_batter|        regular_avg|
+--------------+-------------------+
|        김하성| 0.2879359095193214|
|        정상호|0.24957264957264957|
|      스크럭스| 0.2771855010660981|
|        지석훈| 0.2260519247985676|
|        박경수| 0.2566744730679157|
|        박정음|0.27440633245382584|
|        박병호| 0.2893900889453621|
+--------------+-------------------+
only showing top 7 rows



**task**
- 5. task4에서 생성된 DataFrame에 ``내림차순``을 적용하여 **상위 10명의 선수의 이름과 역대 정규시즌 평균타율 출력합니다. 단, column 명을 출력 예시와 같게 변경할 것.** (10 point)

``` # Output ```
```
+---------+-----------------------+
|선수 이름|역대 정규시즌 평균 타율|
+---------+-----------------------+
|   장승현|    0.38461538461538464|
|   전병우|    0.36363636363636365|
|   이정후|    0.33827893175074186|
|   박건우|    0.33410538506079906|
|   김태진|     0.3333333333333333|
|   구자욱|    0.33191489361702126|
|   손아섭|    0.32515082171832743|
|   김태균|     0.3247439180537772|
|   박민우|    0.32383536861148804|
|   김현수|     0.3226377517149812|
+---------+-----------------------+
only showing top 10 rows
```

In [110]:
# 5-5 답안 작성
task5 = task4.sort(desc('regular_avg')) # take4의 Dataframe의 regular_avg를 내림차순으로 정렬한다.
task5 = task5.select(col("regular_batter").alias("선수 이름"),col("regular_avg").alias("역대 정규시즌 평균 타율"))
#그런 후 alias를 사용하여 colume의 이름을 재명명한다.
# output
task5.show(10)

+---------+-----------------------+
|선수 이름|역대 정규시즌 평균 타율|
+---------+-----------------------+
|   장승현|    0.38461538461538464|
|   전병우|    0.36363636363636365|
|   이정후|    0.33827893175074186|
|   박건우|    0.33410538506079906|
|   김태진|     0.3333333333333333|
|   구자욱|    0.33191489361702126|
|   손아섭|    0.32515082171832743|
|   김태균|     0.3247439180537772|
|   박민우|    0.32383536861148804|
|   김현수|     0.3226377517149812|
+---------+-----------------------+
only showing top 10 rows



## *수고! ㅋ*