# Quickstart : DataFrame
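A PySpark DataFrame is implemented on top of RDDs. When Spark transforms data, it does not compute the transformation immediately; it only plans how to compute it later. Computation starts only when an action such as `collect()` is called.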

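**Starting a PySpark application**

A PySpark application starts by creating a `SparkSession`, the entry point of PySpark. `SparkSession.builder.getOrCreate()` returns the existing session if there is one and otherwise creates a new one.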

In [1]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()



### DataFrame Creation
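A PySpark DataFrame can be created with `pyspark.sql.SparkSession.createDataFrame`. The input is typically one of these:
- a list of lists, tuples, dictionaries, or `pyspark.sql.Row`s
- a pandas DataFrame
- an RDD consisting of such a list

The optional `schema` argument specifies the DataFrame's schema. When it is omitted, PySpark infers the schema from the data.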

In [2]:
from datetime import datetime, date
import pandas as pd
from pyspark.sql import Row

# Create a PySpark DataFrame from a list of rows
df_row = spark.createDataFrame([
    Row(a=1, b=2., c='string1', d=date(2000, 1, 1), e=datetime(2000, 1, 1, 12, 0)),
    Row(a=2, b=3., c='string2', d=date(2000, 2, 1), e=datetime(2000, 1, 2, 12, 0)),
    Row(a=4, b=5., c='string3', d=date(2000, 3, 1), e=datetime(2000, 1, 3, 12, 0))
])

# Create a PySpark DataFrame with an explicit schema.
df_explicit = spark.createDataFrame([
    (1, 2., 'string1', date(2000, 1, 1), datetime(2000, 1, 1, 12, 0)),
    (2, 3., 'string2', date(2000, 2, 1), datetime(2000, 1, 2, 12, 0)),
    (3, 4., 'string3', date(2000, 3, 1), datetime(2000, 1, 3, 12, 0))
], schema='a long, b double, c string, d date, e timestamp')

# Create a PySpark DataFrame from a pandas DataFrame
df_temp = pd.DataFrame({
    'a': [1, 2, 3],
    'b': [2., 3., 4.],
    'c': ['string1', 'string2', 'string3'],
    'd': [date(2000, 1, 1), date(2000, 2, 1), date(2000, 3, 1)],
    'e': [datetime(2000, 1, 1, 12, 0), datetime(2000, 1, 2, 12, 0), datetime(2000, 1, 3, 12, 0)]
})
df_pandas = spark.createDataFrame(df_temp)

# Create a PySpark DataFrame from an RDD consisting of a list of tuples.
rdd = spark.sparkContext.parallelize([
    (1, 2., 'string1', date(2000, 1, 1), datetime(2000, 1, 1, 12, 0)),
    (2, 3., 'string2', date(2000, 2, 1), datetime(2000, 1, 2, 12, 0)),
    (3, 4., 'string3', date(2000, 3, 1), datetime(2000, 1, 3, 12, 0))
])
df_rdd = spark.createDataFrame(rdd, schema=['a', 'b', 'c', 'd', 'e'])

df_row.show()
df_rdd.printSchema()

+---+---+-------+----------+-------------------+
|  a|  b|      c|         d|                  e|
+---+---+-------+----------+-------------------+
|  1|2.0|string1|2000-01-01|2000-01-01 12:00:00|
|  2|3.0|string2|2000-02-01|2000-01-02 12:00:00|
|  4|5.0|string3|2000-03-01|2000-01-03 12:00:00|
+---+---+-------+----------+-------------------+

root
 |-- a: long (nullable = true)
 |-- b: double (nullable = true)
 |-- c: string (nullable = true)
 |-- d: date (nullable = true)
 |-- e: timestamp (nullable = true)



### Viewing Data

In [4]:
# Top n rows
df_row.show(2)

+---+---+-------+----------+-------------------+
|  a|  b|      c|         d|                  e|
+---+---+-------+----------+-------------------+
|  1|2.0|string1|2000-01-01|2000-01-01 12:00:00|
|  2|3.0|string2|2000-02-01|2000-01-02 12:00:00|
+---+---+-------+----------+-------------------+
only showing top 2 rows



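In notebooks such as Jupyter, you can enable eager evaluation of PySpark DataFrames with the `spark.sql.repl.eagerEval.enabled` setting. A DataFrame is then rendered as a table just by evaluating it. The number of rows to display is controlled by `spark.sql.repl.eagerEval.maxNumRows`.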

In [14]:
spark.conf.set('spark.sql.repl.eagerEval.enabled', True)
df_row

| a | b | c | d | e |
|---|---|---|---|---|
| 1 | 2.0 | string1 | 2000-01-01 | 2000-01-01 12:00:00 |
| 2 | 3.0 | string2 | 2000-02-01 | 2000-01-02 12:00:00 |
| 4 | 5.0 | string3 | 2000-03-01 | 2000-01-03 12:00:00 |


In [17]:
# Show vertically
df_row.show(1, vertical=True)

-RECORD 0------------------
 a   | 1                   
 b   | 2.0                 
 c   | string1             
 d   | 2000-01-01          
 e   | 2000-01-01 12:00:00 
only showing top 1 row



In [20]:
# Show columns
df_row.columns

['a', 'b', 'c', 'd', 'e']

In [21]:
# Show the summary
df_row.select('a', 'b', 'c').describe().show()

+-------+------------------+------------------+-------+
|summary|                 a|                 b|      c|
+-------+------------------+------------------+-------+
|  count|                 3|                 3|      3|
|   mean|2.3333333333333335|3.3333333333333335|   null|
| stddev|1.5275252316519468|1.5275252316519468|   null|
|    min|                 1|               2.0|string1|
|    max|                 4|               5.0|string3|
+-------+------------------+------------------+-------+



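`DataFrame.collect()` collects the distributed data to the driver side as local data in Python. Because all of the data is gathered on the driver, it can throw an out-of-memory error when the dataset is too large to fit there.   
To avoid this, use `DataFrame.take()` or `DataFrame.tail()`, which return only a given number of rows.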

In [6]:
df_row.collect()

[Row(a=1, b=2.0, c='string1', d=datetime.date(2000, 1, 1), e=datetime.datetime(2000, 1, 1, 12, 0)),
 Row(a=2, b=3.0, c='string2', d=datetime.date(2000, 2, 1), e=datetime.datetime(2000, 1, 2, 12, 0)),
 Row(a=4, b=5.0, c='string3', d=datetime.date(2000, 3, 1), e=datetime.datetime(2000, 1, 3, 12, 0))]

In [7]:
df_row.take(1)

[Row(a=1, b=2.0, c='string1', d=datetime.date(2000, 1, 1), e=datetime.datetime(2000, 1, 1, 12, 0))]

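A PySpark DataFrame can also be converted back to a pandas DataFrame with `DataFrame.toPandas()`, which lets you use the pandas API. `toPandas()` also collects all of the data to the driver, so the out-of-memory risk still applies.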

In [8]:
df_row.toPandas()

|   | a | b | c | d | e |
|---|---|---|---|---|---|
| 0 | 1 | 2.0 | string1 | 2000-01-01 | 2000-01-01 12:00:00 |
| 1 | 2 | 3.0 | string2 | 2000-02-01 | 2000-01-02 12:00:00 |
| 2 | 4 | 5.0 | string3 | 2000-03-01 | 2000-01-03 12:00:00 |


### Selecting and Accessing Data
A PySpark DataFrame is lazily evaluated. Simply selecting a column does not trigger any computation; it only returns a `Column` instance.   
In fact, most column-wise operations return `Column`s.   

In [12]:
print(df_row.a)

from pyspark.sql import Column
from pyspark.sql.functions import upper

type(df_row.c) == type(upper(df_row.c)) == type(df_row.c.isNull())

Column<'a'>


True

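These `Column`s can be used to select columns from a DataFrame, add new columns, and filter rows.   
- e.g., `DataFrame.select()` takes `Column` instances and returns another DataFrame. `DataFrame.withColumn()` adds a column computed from a `Column`, and `DataFrame.filter()` keeps the rows where a boolean `Column` is true.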

In [13]:
df_row.select(df_row.c).show()
df_row.withColumn('upper_c', upper(df_row.c)).show()
df_row.filter(df_row.a==1).show()

+-------+
|      c|
+-------+
|string1|
|string2|
|string3|
+-------+

+---+---+-------+----------+-------------------+-------+
|  a|  b|      c|         d|                  e|upper_c|
+---+---+-------+----------+-------------------+-------+
|  1|2.0|string1|2000-01-01|2000-01-01 12:00:00|STRING1|
|  2|3.0|string2|2000-02-01|2000-01-02 12:00:00|STRING2|
|  4|5.0|string3|2000-03-01|2000-01-03 12:00:00|STRING3|
+---+---+-------+----------+-------------------+-------+

+---+---+-------+----------+-------------------+
|  a|  b|      c|         d|                  e|
+---+---+-------+----------+-------------------+
|  1|2.0|string1|2000-01-01|2000-01-01 12:00:00|
+---+---+-------+----------+-------------------+



### Applying a Function
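PySpark supports various UDFs and APIs that let users run Python native functions on DataFrames. Examples include pandas UDFs (`pandas_udf`) and pandas function APIs such as `mapInPandas`.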

In [15]:
# pandas UDFs let you use the pandas Series API directly
# inside a Python native function
import pandas as pd
from pyspark.sql.functions import pandas_udf

@pandas_udf('long')
def pd_plus_one(series:pd.Series) -> pd.Series:
    return series+1
df_row.select(pd_plus_one(df_row.a)).show()

+--------------+
|pd_plus_one(a)|
+--------------+
|             2|
|             3|
|             5|
+--------------+




In [56]:
# mapInPandas lets you use the pandas DataFrame API directly,
# without restrictions such as the result having to keep the input length
def pd_filter_func(iterator):
    for pd_df in iterator:
        yield pd_df[pd_df.a==1]

df_row.mapInPandas(pd_filter_func, schema=df_row.schema).show()

+---+---+-------+----------+-------------------+
|  a|  b|      c|         d|                  e|
+---+---+-------+----------+-------------------+
|  1|2.0|string1|2000-01-01|2000-01-01 12:00:00|
+---+---+-------+----------+-------------------+



### Grouping Data
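PySpark DataFrame also provides a way of handling grouped data using the common split-apply-combine strategy. It groups the data by a condition, applies a function to each group, and then combines the results back into a DataFrame.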

In [4]:
df = spark.createDataFrame([
    ['red', 'banana', 1, 10], ['blue', 'banana', 2, 20], ['red', 'carrot', 3, 30],
    ['blue', 'grape', 4, 40], ['red', 'carrot', 5, 50], ['black', 'carrot', 6, 60],
    ['red', 'banana', 7, 70], ['red', 'grape', 8, 80]], schema=['color', 'fruit', 'v1', 'v2']
)
df.show()

+-----+------+---+---+
|color| fruit| v1| v2|
+-----+------+---+---+
|  red|banana|  1| 10|
| blue|banana|  2| 20|
|  red|carrot|  3| 30|
| blue| grape|  4| 40|
|  red|carrot|  5| 50|
|black|carrot|  6| 60|
|  red|banana|  7| 70|
|  red| grape|  8| 80|
+-----+------+---+---+



In [5]:
# Group by color and apply the `avg()` function to each resulting group.
df.groupby('color').avg().show()

+-----+-------+-------+
|color|avg(v1)|avg(v2)|
+-----+-------+-------+
|  red|    4.8|   48.0|
|black|    6.0|   60.0|
| blue|    3.0|   30.0|
+-----+-------+-------+



In [13]:
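# Apply a Python native function to each group by using the pandas API.
# Despite its name, plus_mean subtracts the group mean from v1 (centers v1 per group).
# v1 stays `long` per df.schema, so the fractional part is dropped in the output.
def plus_mean(pd_df):
    return pd_df.assign(v1=pd_df.v1-pd_df.v1.mean())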

df.groupby('color').applyInPandas(plus_mean, schema=df.schema).show()

+-----+------+---+---+
|color| fruit| v1| v2|
+-----+------+---+---+
|  red|banana| -3| 10|
|  red|carrot| -1| 30|
|  red|carrot|  0| 50|
|  red|banana|  2| 70|
|  red| grape|  3| 80|
|black|carrot|  0| 60|
| blue|banana| -1| 20|
| blue| grape|  1| 40|
+-----+------+---+---+



***pandas.DataFrame.assign()***   
https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.assign.html
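`assign()` returns a new DataFrame with the given columns added. A keyword that matches an existing column (like `v1` in `plus_mean`) overwrites that column.
```python
dddd = pd.DataFrame([['red', 'banana', 1, 10], ['blue', 'banana', 2, 20]])
dddd.assign(v3=dddd[2]-dddd[2].mean())
```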

***GroupedData.applyInPandas(ftn, schema)***    
https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.GroupedData.applyInPandas.html
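`applyInPandas` hands each group to the function as an ordinary pandas DataFrame. That means the function can be checked with pandas alone before it goes to Spark. This minimal sketch reuses the body of `plus_mean` on the `blue` group from the data above; the group is typed in by hand here.

```python
import pandas as pd

def plus_mean(pd_df: pd.DataFrame) -> pd.DataFrame:
    # Same body as the Spark cell: subtract the group mean from v1.
    return pd_df.assign(v1=pd_df.v1 - pd_df.v1.mean())

# The 'blue' group, as applyInPandas would pass it to the function.
blue = pd.DataFrame({
    "color": ["blue", "blue"],
    "fruit": ["banana", "grape"],
    "v1": [2, 4],
    "v2": [20, 40],
})
centered = plus_mean(blue)
print(centered)
```

Locally, `v1` comes back as floats (`-1.0`, `1.0`). Spark then converts the returned columns to the types declared in `schema`.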

In [32]:
# Co-group two DataFrames and apply a function to each pair of groups.
df1 = spark.createDataFrame([(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],
    ('time', 'id', 'v1'))
df2 = spark.createDataFrame([(20000101, 1, 'x'), (20000101, 2, 'y')],
    ('time', 'id', 'v2'))

def asof_join(l, r):
    return pd.merge_asof(l, r, on='time', by='id')

df1.groupby('id').cogroup(df2.groupby('id')).applyInPandas(asof_join,
                                                          schema='time int, id int, v1 double, v2 string').show()


+--------+---+---+---+
|    time| id| v1| v2|
+--------+---+---+---+
|20000101|  1|1.0|  x|
|20000102|  1|3.0|  x|
|20000101|  2|2.0|  y|
|20000102|  2|4.0|  y|
+--------+---+---+---+
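What `asof_join` does to each pair of groups can be reproduced in plain pandas. This sketch uses the `id == 1` groups of `df1` and `df2`, typed in by hand. For each left row, `merge_asof` picks the last right row whose `time` is less than or equal to the left `time` (the default `direction='backward'`). Only rows with the same `by` key are matched, and both inputs must be sorted on the `on` column.

```python
import pandas as pd

# The id == 1 group from each side, as cogroup(...).applyInPandas would pass them.
left = pd.DataFrame({"time": [20000101, 20000102], "id": [1, 1], "v1": [1.0, 3.0]})
right = pd.DataFrame({"time": [20000101], "id": [1], "v2": ["x"]})

# Backward as-of match on `time`, exact match on `id`.
merged = pd.merge_asof(left, right, on="time", by="id")
print(merged)
```

This is why time `20000102` still gets `v2 = x` in the Spark output above: the closest earlier right row is the one at `20000101`.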



In [39]:
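# Same data, roles swapped: cogroup by 'time' and as-of match on 'id'.
# df2 has no rows for time 20000102, so v2 is null for that group.
def asof_join2(l, r):
    return pd.merge_asof(l, r, on='id', by='time')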

df1.groupby('time').cogroup(df2.groupby('time')).applyInPandas(asof_join2,
                                                          schema='time int, id int, v1 double, v2 string').show()


+--------+---+---+----+
|    time| id| v1|  v2|
+--------+---+---+----+
|20000102|  1|3.0|null|
|20000102|  2|4.0|null|
|20000101|  1|1.0|   x|
|20000101|  2|2.0|   y|
+--------+---+---+----+



In [42]:
df1.show()

+--------+---+---+
|    time| id| v1|
+--------+---+---+
|20000101|  1|1.0|
|20000101|  2|2.0|
|20000102|  1|3.0|
|20000102|  2|4.0|
+--------+---+---+



In [43]:
df2.show()

+--------+---+---+
|    time| id| v2|
+--------+---+---+
|20000101|  1|  x|
|20000101|  2|  y|
+--------+---+---+



### Getting Data in/out
CSV is straightforward and easy to use. Parquet and ORC are efficient, compact file formats that are faster to read and write.   
Many other data sources are available in PySpark, such as JDBC, text, binaryFile, and Avro.

In [45]:
# csv
df.write.csv('foo.csv', header = True)
spark.read.csv('foo.csv', header=True).show()

+-----+------+---+---+
|color| fruit| v1| v2|
+-----+------+---+---+
|black|carrot|  6| 60|
| blue|banana|  2| 20|
|  red|banana|  1| 10|
| blue| grape|  4| 40|
|  red|banana|  7| 70|
|  red|carrot|  3| 30|
|  red|carrot|  5| 50|
|  red| grape|  8| 80|
+-----+------+---+---+



In [47]:
# Parquet
df.write.parquet('bar.parquet')
spark.read.parquet('bar.parquet').show()



+-----+------+---+---+
|color| fruit| v1| v2|
+-----+------+---+---+
|black|carrot|  6| 60|
| blue|banana|  2| 20|
| blue| grape|  4| 40|
|  red|banana|  1| 10|
|  red|carrot|  3| 30|
|  red|carrot|  5| 50|
|  red|banana|  7| 70|
|  red| grape|  8| 80|
+-----+------+---+---+



In [48]:
# ORC
df.write.orc('zoo.orc')
spark.read.orc('zoo.orc').show()

+-----+------+---+---+
|color| fruit| v1| v2|
+-----+------+---+---+
|  red|banana|  7| 70|
|  red| grape|  8| 80|
|black|carrot|  6| 60|
| blue|banana|  2| 20|
|  red|carrot|  5| 50|
|  red|banana|  1| 10|
| blue| grape|  4| 40|
|  red|carrot|  3| 30|
+-----+------+---+---+



### Working with SQL
DataFrames and Spark SQL share the same execution engine, so they can be used interchangeably and seamlessly.

In [50]:
# Register the DataFrame as a table and run a SQL
df.createOrReplaceTempView('tableA')
spark.sql("select count(*) from tableA").show()

+--------+
|count(1)|
+--------+
|       8|
+--------+



In [52]:
# UDFs can be registered and invoked in SQL out of the box
from pyspark.sql.functions import pandas_udf

@pandas_udf("integer")
def add_one(s:pd.Series)->pd.Series:
    return s+1

spark.udf.register('add_one', add_one)
spark.sql('select add_one(v1) from tableA').show()

+-----------+
|add_one(v1)|
+-----------+
|          2|
|          3|
|          4|
|          5|
|          6|
|          7|
|          8|
|          9|
+-----------+



In [53]:
# SQL expressions can directly be mixed and used as PySpark columns
from pyspark.sql.functions import expr

df.selectExpr('add_one(v1)').show()
df.select(expr('count(*)')>0).show()

+-----------+
|add_one(v1)|
+-----------+
|          2|
|          3|
|          4|
|          5|
|          6|
|          7|
|          8|
|          9|
+-----------+

+--------------+
|(count(1) > 0)|
+--------------+
|          true|
+--------------+

