### Reading and writing RDDs with SparkContext
- Read
    - sc.textFile("path")
- Save
    - rdd.saveAsTextFile("path")

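### Reading and writing DataFrames with SparkSession

- Read
    - spark.read.format("format").option("key", "value").load("path")
- Save
    - df.write.format("format").save("path")
        - The path names a directory, not a file; Spark chooses the file names

- Format
    - text: plain text file
        - Produces a DataFrame with a single column
        - The column's data type is StringType()
    - csv
    - json
    - parquet
- Mode
    - Overwrite: removes the existing files in the directory and writes again
    - Append: adds files to the directory
    - If no mode is specified, a new directory is created
        - An error is raised if the directory already exists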
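
The modes listed above can be sketched as a small helper. `save_df` and `SAVE_MODES` are hypothetical names introduced for this illustration, not part of Spark; the helper only forwards to `DataFrameWriter.format`, `.mode`, and `.save`. Spark's default mode is `errorifexists` (alias `error`), which is the "error if the directory exists" behavior described above, and `ignore` is a further mode that skips the write when the target already exists.

```python
# Save modes accepted by DataFrameWriter.mode().
SAVE_MODES = {
    "overwrite": "replace whatever is already in the target directory",
    "append": "add new part files next to the existing ones",
    "ignore": "do nothing if the target directory already exists",
    "error": "alias of errorifexists",
    "errorifexists": "raise an error if the target directory exists (default)",
}


def save_df(df, path, fmt="csv", mode="errorifexists"):
    """Write df to the directory `path` (hypothetical helper, not a Spark API)."""
    mode = mode.lower()
    if mode not in SAVE_MODES:
        raise ValueError(f"unknown save mode: {mode!r}")
    # `path` names a directory; Spark picks the part-file names itself.
    df.write.format(fmt).mode(mode).save(path)
```

For example, `save_df(df, "some/output/dir", fmt="json", mode="append")` would add JSON part files to an existing directory, where `df` is any DataFrame and the path is a placeholder.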

### Loading a DataFrame

#### Defining a schema explicitly and using it

    from pyspark.sql.types import StructType, StructField, StringType, IntegerType

    # StructField(name, dataType, nullable)
    mySchema = StructType([StructField("menuId", StringType(), False),
                           StructField("name", StringType(), True),
                           StructField("price", IntegerType(), True),
                           StructField("kcal", IntegerType(), True)])

    dessertDF3 = spark.read \
                      .format("csv") \
                      .schema(mySchema) \
                      .load("file:///C:/SparkStreaming/dessert-menu.txt")

In [1]:
import findspark

In [2]:
findspark.init()

In [3]:
import pyspark

In [4]:
import pyspark.sql

In [5]:
import pyspark.sql.functions as F

In [7]:
spark = pyspark.sql.SparkSession \
.builder \
.master("local") \
.getOrCreate()

In [8]:
dessertMenuDF = spark.read \
.option("inferSchema", "true") \
.option("header", "true") \
.format("csv") \
.load("C://spark/data/dessert-menu.csv")

In [9]:
dessertMenuDF.show()

+------+---------------+-----+----+
|menuId|           name|price|kcal|
+------+---------------+-----+----+
|   D-0|  초콜릿 파르페| 4900| 420|
|   D-1|    푸딩 파르페| 5300| 380|
|   D-2|    딸기 파르페| 5200| 320|
|   D-3|       판나코타| 4200| 283|
|   D-4|      치즈 무스| 5800| 288|
|   D-5|       아포가토| 3000| 248|
|   D-6|       티라미스| 6000| 251|
|   D-7|    녹차 파르페| 4500| 380|
|   D-8|  바닐라 젤라또| 3600| 131|
|   D-9|  카라멜 팬케익| 3900| 440|
|  D-10|    크림 안미츠| 5000| 250|
|  D-11|  고구마 파르페| 6500| 650|
|  D-12|      녹차 빙수| 3800| 320|
|  D-13|  초코 크레이프| 3700| 300|
|  D-14|바나나 크레이프| 3300| 220|
|  D-15|  커스터드 푸딩| 2000| 120|
|  D-16|    초코 토르테| 3300| 220|
|  D-17|    치즈 수플레| 2200| 160|
|  D-18|    호박 타르트| 3400| 230|
|  D-19|      캬라멜 롤| 3700| 230|
+------+---------------+-----+----+
only showing top 20 rows



In [15]:
dessertOrderDF = spark.read \
.option("inferSchema", "true") \
.option("header", "False") \
.format("csv") \
.load("C://spark/data/dessert-order.csv")

In [16]:
dessertOrderDF.show()

+-----+----+---+
|  _c0| _c1|_c2|
+-----+----+---+
|SID-0| D-0|  2|
|SID-0| D-3|  1|
|SID-1|D-10|  4|
|SID-2| D-5|  1|
|SID-2| D-8|  1|
|SID-2|D-20|  1|
+-----+----+---+



In [17]:
dessertOrderDF = dessertOrderDF \
.withColumnRenamed("_c0", "orderId") \
.withColumnRenamed("_c1", "menuId") \
.withColumnRenamed("_c2", "number")

In [18]:
# 1. Total amount per order

dessertOrderDF.join(dessertMenuDF, dessertOrderDF.menuId == dessertMenuDF.menuId) \
.groupBy(dessertOrderDF.orderId) \
.agg(F.sum(dessertOrderDF.number * dessertMenuDF.price).alias('orderTotal')) \
.show()

+-------+----------+
|orderId|orderTotal|
+-------+----------+
|  SID-0|     14000|
|  SID-1|     20000|
|  SID-2|     10600|
+-------+----------+



In [20]:
orderTotalDF = dessertOrderDF.join(dessertMenuDF, dessertOrderDF.menuId == dessertMenuDF.menuId) \
.groupBy(dessertOrderDF.orderId) \
.agg(F.sum(dessertOrderDF.number * dessertMenuDF.price).alias('orderTotal'))

#### DataFrame write
- Number of files produced
    - Several files are written, typically one per partition
    - Set the number of files with coalesce(n)
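
One way to check how many files a write produced is to count the `part-*` files in the output directory. The sketch below assumes the output sits on the local filesystem; `count_part_files` is a hypothetical helper, not a Spark API. Spark typically also leaves a `_SUCCESS` marker and hidden `.crc` checksum files in the directory, which this helper does not count.

```python
from pathlib import Path


def count_part_files(output_dir):
    """Count the data files (part-*) that a DataFrame write left in output_dir."""
    # Marker and checksum files such as _SUCCESS or .part-*.crc
    # do not match the "part-*" pattern, so they are skipped.
    return sum(1 for p in Path(output_dir).glob("part-*") if p.is_file())
```

After a write that used `coalesce(1)` with `overwrite` mode, the count would be expected to be 1; without `coalesce`, it generally follows the number of partitions of the DataFrame.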

In [22]:
#orderTotalDF.write.format("csv").mode("overwrite").save("C://spark/data/df")

In [None]:
# orderTotalDF.write.format("json").mode("append").save("C://spark/data/df")

In [28]:
orderTotalDF.coalesce(1).write.format("json").mode("overwrite").save("C://spark/data/df")

In [29]:
orderTotalDFread = spark.read.format("json").load("C://spark/data/df")

In [30]:
orderTotalDFread.show()

+-------+----------+
|orderId|orderTotal|
+-------+----------+
|  SID-0|     14000|
|  SID-1|     20000|
|  SID-2|     10600|
+-------+----------+



In [21]:
# Total sales

dessertOrderDF.join(dessertMenuDF, dessertOrderDF.menuId == dessertMenuDF.menuId ) \
.agg(F.sum(dessertOrderDF.number * dessertMenuDF.price).alias('orderTotal')) \
.show()

+----------+
|orderTotal|
+----------+
|     44600|
+----------+



### UDF (User Defined Function)

- Use a user-written function when transforming DataFrame data
    - Using a UDF with the DataFrame's own methods
        - from pyspark.sql.functions import udf
        - udfName = udf(function)

- Using a UDF in SparkSession SQL
    - spark.udf.register("udfName", function)
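
One detail worth noting: when no return type is given, `udf` and `spark.udf.register` treat the Python function's result as a string (`StringType`). The sketch below passes `IntegerType()` explicitly so that the resulting column is numeric. `order_level`, `make_order_level_udf`, the SQL name `order_level`, and the thresholds (20000 and 30000) are assumptions made for this illustration. PySpark is imported inside the helper so that the plain function can be tried without a Spark session.

```python
def order_level(total):
    """Map an order total to a level: 3 (>= 30000), 2 (>= 20000), else 1."""
    if total >= 30000:
        return 3
    if total >= 20000:
        return 2
    return 1


def make_order_level_udf(spark=None):
    """Wrap order_level as a Spark UDF that returns an integer column."""
    # Imported here so order_level itself stays usable without PySpark.
    from pyspark.sql import functions as F
    from pyspark.sql.types import IntegerType

    if spark is not None:
        # Also expose the function to spark.sql() under an assumed SQL name.
        spark.udf.register("order_level", order_level, IntegerType())
    return F.udf(order_level, IntegerType())
```

A possible use is `df.select("orderTotal", make_order_level_udf()("orderTotal").alias("orderLevel"))`, where `df` is any DataFrame with a numeric `orderTotal` column. With the integer return type, the new column can be summed or compared numerically without a cast.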

In [32]:
orderTotalDF.show()

+-------+----------+
|orderId|orderTotal|
+-------+----------+
|  SID-0|     14000|
|  SID-1|     20000|
|  SID-2|     10600|
+-------+----------+



In [48]:
def orderLevel(sum):
    if (sum >= 30000):
        level = 3
    elif (sum >= 20000):
        level = 2
    else:
        level = 1
    return(level)

In [49]:
orderLevel(10000)

1

In [50]:
orderLevel_udf = F.udf(orderLevel)

In [51]:
orderTotalDF.select(F.col("orderId"), F.col("orderTotal"), orderLevel_udf("orderTotal").alias("orderLevel")).show()

+-------+----------+----------+
|orderId|orderTotal|orderLevel|
+-------+----------+----------+
|  SID-0|     14000|         1|
|  SID-1|     20000|         2|
|  SID-2|     10600|         1|
+-------+----------+----------+



In [52]:
orderTotalDF.select(F.col("orderId"), F.col("orderTotal"), orderLevel_udf("orderTotal").alias("orderLevel")) \
.groupBy(F.col("orderLevel")) \
.agg(F.count(F.col("orderId"))).show()

+----------+--------------+
|orderLevel|count(orderId)|
+----------+--------------+
|         1|             2|
|         2|             1|
+----------+--------------+



In [53]:
spark.udf.register("orderSumLevel", orderLevel)

<function __main__.orderLevel(sum)>

In [54]:
orderTotalDF.createOrReplaceTempView("OrderTotal")

In [55]:
spark.sql(""" SELECT orderId, orderTotal
              FROM OrderTotal""").show()

+-------+----------+
|orderId|orderTotal|
+-------+----------+
|  SID-0|     14000|
|  SID-1|     20000|
|  SID-2|     10600|
+-------+----------+



In [56]:
spark.sql(""" SELECT orderId, orderTotal, orderSumLevel(orderTotal)
              FROM OrderTotal""").show()

+-------+----------+-------------------------+
|orderId|orderTotal|orderSumLevel(orderTotal)|
+-------+----------+-------------------------+
|  SID-0|     14000|                        1|
|  SID-1|     20000|                        2|
|  SID-2|     10600|                        1|
+-------+----------+-------------------------+



### Spark transformation functions

In [57]:
sc = spark.sparkContext

In [58]:
inputData = sc.textFile("C://spark/README.md")

In [59]:
inputData.count()

108

In [62]:
sampledInput = inputData.sample(False, 0.1, seed=0)

In [63]:
sampledInput.count()

7

In [64]:
sampledInput.collect()

['Spark is built using [Apache Maven](https://maven.apache.org/).',
 '(You do not need to do this if you downloaded a pre-built package.)',
 '',
 '',
 'There is also a Kubernetes integration test, see resource-managers/kubernetes/integration-tests/README.md',
 'Please refer to the [Configuration Guide](https://spark.apache.org/docs/latest/configuration.html)',
 'Please review the [Contribution to Spark guide](https://spark.apache.org/contributing.html)']

In [65]:
inputData.takeSample(False, 10, seed=0)

['',
 'Spark also comes with several sample programs in the `examples` directory.',
 '',
 '',
 'Spark is built using [Apache Maven](https://maven.apache.org/).',
 '',
 '["Building Spark"](https://spark.apache.org/docs/latest/building-spark.html).',
 '',
 'There is also a Kubernetes integration test, see resource-managers/kubernetes/integration-tests/README.md',
 'Many of the example programs print usage help if no params are given.']

In [None]:
# sampledInput.saveAsTextFile("C://spark/data/dataSample")

### RDD transformations

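- union(otherDataset)
    - Union of the two RDDs
    - Not a true set union, however: duplicates are kept (apply distinct() to remove them)
- intersection(otherDataset)
    - Intersection
- subtract(otherDataset)
    - Difference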

In [67]:
allRDD = sc.parallelize(range(0,10))

In [68]:
allRDD.collect()

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

In [69]:
evenRDD = sc.parallelize([0, 2, 4, 6, 8])

In [70]:
oddRDD = sc.parallelize([1, 3, 5, 7, 9])

In [73]:
allRDD.union(evenRDD).collect()

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 2, 4, 6, 8]

In [74]:
allRDD.union(evenRDD).distinct().collect()

[0, 2, 4, 6, 8, 1, 3, 5, 7, 9]
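
The cells above only exercise `union`. A minimal sketch that also runs `intersection` and `subtract` on the same data follows; `set_style_operations` is a hypothetical helper that expects the notebook's `sc` as its argument. The results are sorted because the element order after a shuffle is not guaranteed.

```python
def set_style_operations(sc):
    """Run union, distinct, intersection, and subtract; return sorted lists."""
    all_rdd = sc.parallelize(range(0, 10))
    even_rdd = sc.parallelize([0, 2, 4, 6, 8])
    return {
        # union keeps duplicates, so the even numbers appear twice.
        "union": sorted(all_rdd.union(even_rdd).collect()),
        # distinct() turns the result into a true set union.
        "union_distinct": sorted(all_rdd.union(even_rdd).distinct().collect()),
        # Elements present in both RDDs.
        "intersection": sorted(all_rdd.intersection(even_rdd).collect()),
        # Elements of all_rdd that do not occur in even_rdd.
        "subtract": sorted(all_rdd.subtract(even_rdd).collect()),
    }
```

Calling `set_style_operations(sc)` should give the even numbers for `"intersection"` and the odd numbers 1, 3, 5, 7, 9 for `"subtract"`.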

#### reduce(func)
- Returns a single value rather than an RDD
- An action, not a transformation

In [75]:
allRDD.reduce(lambda x, y: x+y)

45

#### Actions on numeric RDDs
- max()
- min()
- sum()
- variance()
- stdev()
- stats()

In [76]:
allRDD.stats()

(count: 10, mean: 4.5, stdev: 2.8722813232690143, max: 9.0, min: 0.0)

### Pair RDD transformations
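- reduceByKey
- sortByKey
- keys
    - Creates an RDD consisting of the keys only
- values
    - Creates an RDD consisting of the values only
- keyBy
    - Builds (key, element) pairs using a function that computes the key from each element
- groupByKey()
    - Takes (K, V) pairs as input and
      produces an RDD of each key with the collection of its values
    - Where reduceByKey can be used,
      it is best to avoid groupByKey if possible.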
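
The advice on `groupByKey` can be illustrated by computing the same per-key count both ways. This is a sketch only; `count_per_key` is a hypothetical helper that takes any pair RDD of `(key, number)` tuples. The usual reason for preferring `reduceByKey` is that it combines values inside each partition before the shuffle, whereas `groupByKey` sends every pair across the network first.

```python
def count_per_key(pairs_rdd):
    """Sum the values per key in two ways and return both results as dicts."""
    # reduceByKey merges values within each partition before shuffling.
    reduced = pairs_rdd.reduceByKey(lambda a, b: a + b)
    # groupByKey shuffles every (key, value) pair; the sum happens afterwards.
    grouped = pairs_rdd.groupByKey().mapValues(lambda values: sum(values))
    return dict(reduced.collect()), dict(grouped.collect())
```

For instance, `count_per_key(sc.parallelize([("spark", 1), ("apache", 1), ("spark", 1)]))` should return two equal dictionaries, with `"spark"` mapped to 2 and `"apache"` mapped to 1.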

In [80]:
lines = sc.textFile("C://spark/README.md")

In [87]:
import re

words = lines.flatMap(lambda line: re.split('\\W+', line)) \
             .filter(lambda word: len(word) > 0) \
             .map(lambda word: word.lower())

In [88]:
words.take(3)

['apache', 'spark', 'spark']

In [89]:
words.countByValue()

defaultdict(int,
            {'apache': 11,
             'spark': 38,
             'is': 7,
             'a': 10,
             'unified': 1,
             'analytics': 1,
             'engine': 2,
             'for': 15,
             'large': 1,
             'scale': 1,
             'data': 2,
             'processing': 3,
             'it': 2,
             'provides': 1,
             'high': 1,
             'level': 2,
             'apis': 1,
             'in': 5,
             'scala': 4,
             'java': 1,
             'python': 4,
             'and': 11,
             'r': 1,
             'an': 4,
             'optimized': 1,
             'that': 2,
             'supports': 2,
             'general': 2,
             'computation': 1,
             'graphs': 1,
             'analysis': 1,
             'also': 5,
             'rich': 1,
             'set': 2,
             'of': 5,
             'higher': 1,
             'tools': 4,
             'including': 4,
             'sql': 2,


In [91]:
wordAndOne = words.map(lambda word: (word,1))

In [92]:
wordAndOne.countByKey()

defaultdict(int,
            {'apache': 11,
             'spark': 38,
             'is': 7,
             'a': 10,
             'unified': 1,
             'analytics': 1,
             'engine': 2,
             'for': 15,
             'large': 1,
             'scale': 1,
             'data': 2,
             'processing': 3,
             'it': 2,
             'provides': 1,
             'high': 1,
             'level': 2,
             'apis': 1,
             'in': 5,
             'scala': 4,
             'java': 1,
             'python': 4,
             'and': 11,
             'r': 1,
             'an': 4,
             'optimized': 1,
             'that': 2,
             'supports': 2,
             'general': 2,
             'computation': 1,
             'graphs': 1,
             'analysis': 1,
             'also': 5,
             'rich': 1,
             'set': 2,
             'of': 5,
             'higher': 1,
             'tools': 4,
             'including': 4,
             'sql': 2,


In [93]:
wordCount = wordAndOne.reduceByKey(lambda v1, v2: v1 + v2)

In [94]:
wordCount.take(5)

[('apache', 11), ('spark', 38), ('is', 7), ('a', 10), ('unified', 1)]

In [95]:
wordCount.mapValues(lambda v: v+v).take(5)

[('apache', 22), ('spark', 76), ('is', 14), ('a', 20), ('unified', 2)]