## 라이브러리 불러오기
- 버전 확인하자!

In [3]:
import pyspark 
import numpy as np 
import pandas as pd 
import matplotlib as mpl 
import seaborn as sns

print(pyspark.__version__)
print(np.__version__)
print(pd.__version__)
print(mpl.__version__)
print(sns.__version__)

3.2.0
1.23.4
1.5.1
3.6.2
0.12.1


## 환경설정 필요
- 참조 : https://spark.apache.org/docs/latest/configuration.html
- 배워야 할 내용, 네트워크 구성부터 시작, 서버 설정 등등, 욕나옴

In [4]:
sc

In [None]:
sc.stop()

## 현재 서버 사용자 이름 확인

In [6]:
import getpass
username = getpass.getuser()
username

'human'

## SparkSession
- 클러스터에 진입 위한 것

In [7]:
print("spark version:", sc.version)

spark version: 3.2.0


In [8]:
print("python version:", sc.pythonVer)

python version: 3.8


In [10]:
print("Spark Master Name:", sc.master)

Spark Master Name: local[*]


## lambda 함수
- 사용자 정의 함수를 1회성으로 쓴다.

In [35]:
def sum(a, b):
  c = a + b 
  return c

a = 1
b = 2
sum(a, b)

3

In [36]:
# 어떤 숫자가 들어와도 반드시 5를 더하는 함수
def add_five(x):
  y = x + 5
  return y

add_five(100)

105

In [37]:
plus_five = lambda x : x + 5
plus_five(10)

15

In [39]:
# 데이터.map(람다함수 식)
# 람다 표현식 자체를 호출하기
(lambda x: x + 5)(100)

105

- 매개변수가 2개일 때 

In [40]:
# 데이터.map(람다함수 식)
# 람다 표현식 자체를 호출하기
(lambda x: x + 5)(100)

105

### 문제 1
- 값이 여러개 있는 리스트를 정의 
- 각 값에 5를 더하고, 리스트로 반환

In [41]:
num_values = [10, 11, 12]
list(map(add_five, num_values))

[15, 16, 17]

- lambda를 활용

In [42]:
list(map(lambda x: x + 5, range(5)))

[5, 6, 7, 8, 9]

- filter() : 조건식으로 통해 값을 필터링 
- 0-9 값 중에서, 5 미만만 출력

In [43]:
list(range(10))

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

In [44]:
list(filter(lambda x: x < 5, range(10)))

[0, 1, 2, 3, 4]

In [45]:
list(map(lambda x : x + 105, range(5)))

[105, 106, 107, 108, 109]

## RDD
- RDD를 배우자! 
- 새로운 Spark 세션을 만들자. 

In [16]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[1]").appName("my1stSpark").getOrCreate()
spark

- RDD 객체를 생성하자.
- 도움말 참조 : https://spark.apache.org/docs/latest/rdd-programming-guide.html

In [18]:
data = [1, 2, 3, 4, 5]
data

[1, 2, 3, 4, 5]

In [20]:
data = [1, 2, 3, 4, 5]
rdd = spark.sparkContext.parallelize(data)
print(type(rdd))

<class 'pyspark.rdd.RDD'>


In [26]:
num_values = range(1, 101)
rdd = spark.sparkContext.parallelize(num_values)
print(type(rdd))
print(rdd)

<class 'pyspark.rdd.PipelinedRDD'>
PythonRDD[8] at RDD at PythonRDD.scala:53


In [25]:
np_num_values = np.array([1, 2, 3])
rdd = spark.sparkContext.parallelize(np_num_values)
print(type(rdd))
print(rdd)

<class 'pyspark.rdd.RDD'>
ParallelCollectionRDD[6] at readRDDFromFile at PythonRDD.scala:274


In [27]:
str_values = ["A", "B", "C"]
rdd = spark.sparkContext.parallelize(str_values)
print(type(rdd))

<class 'pyspark.rdd.RDD'>


## 데이터 가져오기
- README.md 파일을 가져오기

In [31]:
file_path = 'data/README.md'
fileRDD = spark.sparkContext.textFile(file_path)
print("The File Type is", type(fileRDD))

# Spark Transformation
# filter Spark 글자만 출력한다.
fileRDD_filter = fileRDD.filter(lambda line : 'Spark' in line)
print("The File Type is", type(fileRDD_filter))
print(fileRDD_filter)

# Spark Action
print(fileRDD_filter.count())
for line in fileRDD_filter.take(4):
    print(line)

The File Type is <class 'pyspark.rdd.RDD'>
The File Type is <class 'pyspark.rdd.PipelinedRDD'>
PythonRDD[21] at RDD at PythonRDD.scala:53
19
# Apache Spark
Spark is a unified analytics engine for large-scale data processing. It provides
rich set of higher-level tools including Spark SQL for SQL and DataFrames,
[![PySpark Coverage](https://codecov.io/gh/apache/spark/branch/master/graph/badge.svg)](https://codecov.io/gh/apache/spark)


In [34]:
num_values = range(10)

# RDD 생성 
num_values = spark.sparkContext.parallelize(num_values)

# RDD 트랜스포메이션 생성
cubic_values = num_values.map(lambda x : x ** 3)

# RDD 액션
for num in cubic_values.collect():
    print(num)

0
1
8
27
64
125
216
343
512
729
