<a href="https://colab.research.google.com/github/zzuupp/Data-Engineering-BigData-Practice/blob/main/Spark/Spark_Beginner_Tutorial.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Spark : 초보자용 튜토리얼

---

> 아이리스 데이터를 이용하여, 스파크를 어떻게 사용하는지 살펴보기.

* 라이브러리 가져오기
* 스파크 세션 구축
* 데이터 로드
* 데이터 탐색 및 준비
* 피처 엔지니어링
* 데이터 스케일링
* 데이터 분할
* 모델 구축, 훈련 및 평가

install Apache Spark

In [1]:
!pip -q install pyspark

## Import Libraries

In [2]:
import numpy as np
import pandas as pd

#Apache Spark Libraries
import pyspark
from pyspark.sql import SparkSession

# Apache Spark ML CLassifier Libraries
from pyspark.ml.classification import DecisionTreeClassifier, RandomForestClassifier, NaiveBayes

# Apache Spark Evaluation Library
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Apache Spark Feature libraries
from pyspark.ml.feature import StandardScaler, StringIndexer, VectorAssembler, VectorIndexer, OneHotEncoder

# Apache Spark 'DenseVector'
from pyspark.ml.linalg import DenseVector

# Data Split Libraries
import sklearn
from sklearn.model_selection import train_test_split

# Tabulating Data
from tabulate import tabulate

# Garbage
import gc

## Build Spark Session

In [3]:
# Building Spark Session : 스파크 앱을 사용하기 위한 진입점.
# 내가 사용할 spark의 이름 지정.
# 'spark.executor.memory' : 실행 시, 각각의 작업 프로세스 (Executor)에 할당 메모리 크기 설정.
#                           클러스터 모드에서는 여러 Executor가 뜨기 때문에 이 설정이 중요.

# "spark.excutor.cores" : Executor가 사용하는 CPU 코어 수를 설정

# .getOrCreate() : 같은 이름의 SparkSession 이 있다면 그걸 가져오고, 없으면 새로 만들어줘!

# SparkSession : 고수준 API(DataFrame SQL) 등을 다룸. *사용자가 이해하기 쉽게 추상화된 인터페이스
spark = (SparkSession.builder
                            .appName('Apach Spark Beginner Tutorial')
                            .config('spark.executor.memory', '1G')
                            .config("spark.executor.cores", '4')
                            .getOrCreate())

In [4]:
# sparkContext : 스파크 엔진과 직접 연결된 컨트롤 객체
# .setLogLevel : 로그 출력 수준을 설정하는 함수
#               ('INFO') → 로그 수준을 INFO로 맞춰라 (많은 실행 정보가 보임)
spark.sparkContext.setLogLevel('INFO')

## Data Load

In [5]:
!wget -O iris.csv https://raw.githubusercontent.com/uiuc-cse/data-fa14/gh-pages/data/iris.csv

--2025-09-29 06:56:27--  https://raw.githubusercontent.com/uiuc-cse/data-fa14/gh-pages/data/iris.csv
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.108.133, 185.199.109.133, 185.199.110.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.108.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 3716 (3.6K) [text/plain]
Saving to: ‘iris.csv’


2025-09-29 06:56:27 (44.5 MB/s) - ‘iris.csv’ saved [3716/3716]



In [6]:

# .option('header', 'true') : 첫줄을 컬럼명으로 사용(없다면 c0, c1...이런식임)
# .option('inferSchema', 'true') \ # 문자열이 아닌 숫자 / 실수 등을 자동으로 타입 추론.
data = (spark.read.format("csv")
        .option('header', 'true')
        .option('inferSchema', 'true')
        .load('iris.csv'))

# .cache() → 데이터를 메모리에 올려두고 이후 연산 시 재사용 (속도 ↑)
data.cache()

DataFrame[sepal_length: double, sepal_width: double, petal_length: double, petal_width: double, species: string]

## Data Exploration & Preparation

In [7]:
data.count()

150

In [8]:
#Data Type
data.printSchema()

root
 |-- sepal_length: double (nullable = true)
 |-- sepal_width: double (nullable = true)
 |-- petal_length: double (nullable = true)
 |-- petal_width: double (nullable = true)
 |-- species: string (nullable = true)



In [9]:
#Display recodes
data.show(5)

+------------+-----------+------------+-----------+-------+
|sepal_length|sepal_width|petal_length|petal_width|species|
+------------+-----------+------------+-----------+-------+
|         5.1|        3.5|         1.4|        0.2| setosa|
|         4.9|        3.0|         1.4|        0.2| setosa|
|         4.7|        3.2|         1.3|        0.2| setosa|
|         4.6|        3.1|         1.5|        0.2| setosa|
|         5.0|        3.6|         1.4|        0.2| setosa|
+------------+-----------+------------+-----------+-------+
only showing top 5 rows



In [10]:
#Record per Species
data.groupBy('species').count().show()

+----------+-----+
|   species|count|
+----------+-----+
| virginica|   50|
|versicolor|   50|
|    setosa|   50|
+----------+-----+



In [11]:
# Dataset Summary Stats

# stddev : 표준편차.
#     ex : sepal_length의 평균 값은 5.84cm인데 ± 0.828 범위내에 분포

data.describe().show()

+-------+------------------+-------------------+------------------+------------------+---------+
|summary|      sepal_length|        sepal_width|      petal_length|       petal_width|  species|
+-------+------------------+-------------------+------------------+------------------+---------+
|  count|               150|                150|               150|               150|      150|
|   mean| 5.843333333333335| 3.0540000000000007|3.7586666666666693|1.1986666666666672|     NULL|
| stddev|0.8280661279778637|0.43359431136217375| 1.764420419952262|0.7631607417008414|     NULL|
|    min|               4.3|                2.0|               1.0|               0.1|   setosa|
|    max|               7.9|                4.4|               6.9|               2.5|virginica|
+-------+------------------+-------------------+------------------+------------------+---------+



예측을 시작하기 위해, Species 즉 Label 컬럼은 숫자 값이어야 한다 (모델은 문자열을 싫어한다!).


이를 달성하기 위해 우리는 Species 컬럼에 문자열 인덱싱(String Indexing) 을 적용할 것이다.

In [12]:
#String Indexing the Species column
SIndexer = StringIndexer(inputCol='species', outputCol = 'species_indx')

# .fit(data) 위에서 지정 규칙을 학습만 함 (아직 변환 x)
# .transform(data) = 변환 규칙을 적용해서 새 컬럼 생성 (하지만 진짜 계산은 Action 때 실행).
# 즉 실제 계산은 이 시점에서도 안일어남, **실행 계획(Execution Plan)**을 DAG(Directed Acyclic Graph)로 저장한 것.
data = SIndexer.fit(data).transform(data)

# 비로소 실행.
data.show(5)

+------------+-----------+------------+-----------+-------+------------+
|sepal_length|sepal_width|petal_length|petal_width|species|species_indx|
+------------+-----------+------------+-----------+-------+------------+
|         5.1|        3.5|         1.4|        0.2| setosa|         0.0|
|         4.9|        3.0|         1.4|        0.2| setosa|         0.0|
|         4.7|        3.2|         1.3|        0.2| setosa|         0.0|
|         4.6|        3.1|         1.5|        0.2| setosa|         0.0|
|         5.0|        3.6|         1.4|        0.2| setosa|         0.0|
+------------+-----------+------------+-----------+-------+------------+
only showing top 5 rows



즉, **Spark**는 **스키마 기반 (SQL 테이블 느낌)**, Pandas는 딕셔너리 기반 (유연)이라 생긴 차이

## Feature Engineering

---

The Spark model needs two columns: “label” and “features” and we are not going to do much feature engineering because we want to focus on the mechanics of training the model in Spark. So, creating a seperate dataframe with re-ordered columns, then defining an input data using Dense Vector. A Dense Vector is a local vector that is backed by a double array that represents its entry values. In other words, it's used to store arrays of values for use in PySpark.



스파크에서 모델을 학습하려면 “label” 과 “features” 두 컬럼이 꼭 필요하다.

이번에는 모델 학습 과정을 익히는 데 집중할 거라 복잡한 피처 엔지니어링은 따로 하지 않는다.

그래서 컬럼 순서를 재배치하여 별도의 데이터프레임을 생성 후, 입력 데이터는 Dense Vector라는 형태로 정의.

Dense Vector는 내부적으로 double 배열을 기반으로 값을 담는 로컬 벡터인데,

쉽게 말해 PySpark에서 여러 값들을 묶어 저장하고 모델에 넣을 때 쓰이는 자료 구조라고 보면 된다

In [13]:
# creating a seperate dataframe with re-ordered columns
df = data.select('species_indx', 'sepal_length', 'sepal_width', 'petal_length', 'petal_width')

# Inspect the dataframe
df.show(5)

+------------+------------+-----------+------------+-----------+
|species_indx|sepal_length|sepal_width|petal_length|petal_width|
+------------+------------+-----------+------------+-----------+
|         0.0|         5.1|        3.5|         1.4|        0.2|
|         0.0|         4.9|        3.0|         1.4|        0.2|
|         0.0|         4.7|        3.2|         1.3|        0.2|
|         0.0|         4.6|        3.1|         1.5|        0.2|
|         0.0|         5.0|        3.6|         1.4|        0.2|
+------------+------------+-----------+------------+-----------+
only showing top 5 rows



Note: Observe that the species column which is our label (aka Target) is now at beginning of the dataframe



In [14]:
# Define the `input_data` as Dense Vector
input_data = df.rdd.map(lambda x: (x[0], DenseVector(x[1:])))


"""
***
🔎 DenseVector란?
Spark ML에서 쓰이는 특수한 벡터 자료구조.
내부적으로 double 배열 기반.
보통 features 컬럼이 DenseVector 형태여야 모델에 넣을 수 있어요


***
from pyspark.ml.linalg import DenseVector

v = DenseVector([1.0, 2.0, 3.0])
print(v)
# [1.0,2.0,3.0]
print(type(v))
# <class 'pyspark.ml.linalg.DenseVector'>


***
df.rdd로 바꾸면 DataFrame의 각 Row가 파이썬 튜플 비슷한 구조로 바뀜.
row = (0.0, 5.1, 3.5, 1.4, 0.2)  # RDD의 한 행

row[0]     # 0.0 → species_indx (label)
row[1:]    # (5.1, 3.5, 1.4, 0.2) → features

"""

"\n***\n🔎 DenseVector란?\nSpark ML에서 쓰이는 특수한 벡터 자료구조.\n내부적으로 double 배열 기반.\n보통 features 컬럼이 DenseVector 형태여야 모델에 넣을 수 있어요\n\n\n***\nfrom pyspark.ml.linalg import DenseVector\n\nv = DenseVector([1.0, 2.0, 3.0])\nprint(v)\n# [1.0,2.0,3.0]\nprint(type(v))\n# <class 'pyspark.ml.linalg.DenseVector'>\n\n\n***\ndf.rdd로 바꾸면 DataFrame의 각 Row가 파이썬 튜플 비슷한 구조로 바뀜.\nrow = (0.0, 5.1, 3.5, 1.4, 0.2)  # RDD의 한 행\n\nrow[0]     # 0.0 → species_indx (label)\nrow[1:]    # (5.1, 3.5, 1.4, 0.2) → features\n\n"

Note: Observe the definition of the Dense Vector.


So,when we create a new indexed dataframe(below) the machine understands


that the first column is a Label (Target) and the remaining columns are Features.

In [15]:
# Creating a new Indexed DataFrame
df_indx = spark.createDataFrame(input_data, ['label', 'features'])

In [16]:
df_indx.show(5)

+-----+-----------------+
|label|         features|
+-----+-----------------+
|  0.0|[5.1,3.5,1.4,0.2]|
|  0.0|[4.9,3.0,1.4,0.2]|
|  0.0|[4.7,3.2,1.3,0.2]|
|  0.0|[4.6,3.1,1.5,0.2]|
|  0.0|[5.0,3.6,1.4,0.2]|
+-----+-----------------+
only showing top 5 rows



## Data Scaling


In [17]:
# Initialize Standard Scaler
stdScaler = StandardScaler(inputCol="features", outputCol="features_scaled")

# Fit and transform the data
scaler = stdScaler.fit(df_indx)

# Transform the dataframe
df_scaled = scaler.transform(df_indx)

In [18]:
df_scaled.show(5)

+-----+-----------------+--------------------+
|label|         features|     features_scaled|
+-----+-----------------+--------------------+
|  0.0|[5.1,3.5,1.4,0.2]|[6.15892840883878...|
|  0.0|[4.9,3.0,1.4,0.2]|[5.9174018045706,...|
|  0.0|[4.7,3.2,1.3,0.2]|[5.67587520030241...|
|  0.0|[4.6,3.1,1.5,0.2]|[5.55511189816831...|
|  0.0|[5.0,3.6,1.4,0.2]|[6.03816510670469...|
+-----+-----------------+--------------------+
only showing top 5 rows



In [19]:
# Dropping the Features column
df_scaled = df_scaled.drop('features')

In [20]:
df_scaled.show(5)

+-----+--------------------+
|label|     features_scaled|
+-----+--------------------+
|  0.0|[6.15892840883878...|
|  0.0|[5.9174018045706,...|
|  0.0|[5.67587520030241...|
|  0.0|[5.55511189816831...|
|  0.0|[6.03816510670469...|
+-----+--------------------+
only showing top 5 rows



## Data Split
---
Just like always, before building a model we shall split our scaled dataset into training & test sets.

 Training Dataset = 90% Test Dataset = 10%

In [21]:
train_data, test_data = df_scaled.randomSplit([0.9, 0.1], seed = 12345)

In [22]:
train_data.show(5)

+-----+--------------------+
|label|     features_scaled|
+-----+--------------------+
|  0.0|[5.19282199176603...|
|  0.0|[5.31358529390013...|
|  0.0|[5.31358529390013...|
|  0.0|[5.31358529390013...|
|  0.0|[5.43434859603422...|
+-----+--------------------+
only showing top 5 rows



## Build, Train & Evaluate Model
---

In this step we will create multiple models, train them on our scaled dataset and then compare their accuracy.



In [23]:
model = ['Decision Tree', 'Random Forest', 'Naive Bayes']
model_results = []

#### Decision Tree Classifier

In [24]:
# Decision Tree Classifier
dtc = DecisionTreeClassifier(labelCol = 'label', featuresCol = 'features_scaled')
dtc_model = dtc.fit(train_data)
dtc_pred = dtc_model.transform(test_data)

# Evaluate the Model
evaluator = MulticlassClassificationEvaluator(labelCol = 'label',
                                              predictionCol = 'prediction',
                                              metricName = 'accuracy')

dtc_acc = evaluator.evaluate(dtc_pred)

# print Decision Tree Classifier Accuracy = {:.2%}.format(dtc_acc)
model_results.extend([model[0], '{:.2%}'.format(dtc_acc)])


#### Random Forest Classifier

In [25]:
rfc = RandomForestClassifier(labelCol = 'label', featuresCol = 'features_scaled', numTrees= 10)

rfc_model = rfc.fit(train_data)

rfc_pred = rfc_model.transform(test_data)

# Evaluate the Model
evaluator = MulticlassClassificationEvaluator(labelCol='label',
                                              predictionCol='prediction',
                                              metricName= 'accuracy')

rfc_acc = evaluator.evaluate(rfc_pred)

model_results.extend([[model[1], '{:.2%}'.format(rfc_acc)]])

#### Naive Bayes Classifier


In [26]:
nbc = NaiveBayes(smoothing= 1.0,
                 modelType = 'multinomial',
                 labelCol= 'label',
                 featuresCol = 'features_scaled')

nbc_model = nbc.fit(train_data)

nbc_pred = nbc_model.transform(test_data)

# Evaluate the Model
evaluator = MulticlassClassificationEvaluator(labelCol='label',
                                              predictionCol='prediction',
                                              metricName= 'accuracy')

nbc_acc = evaluator.evaluate(nbc_pred)
model_results.extend([[model[2], '{:.2%}'.format(nbc_acc)]])

In [27]:
gc.collect()

442

In [28]:
print(tabulate([model_results], headers = ['Model', 'Accuracy']))

                       Model                         Accuracy
-------------  ------  ----------------------------  --------------------------
Decision Tree  90.91%  ['Random Forest', '100.00%']  ['Naive Bayes', '100.00%']
