In [1]:
import os

from pyspark.sql import SparkSession

# Start (or reuse, via getOrCreate) a local Spark session; master "local" runs
# Spark with a single worker thread.
spark = (
    SparkSession.builder
    .master("local")
    .appName("ml-pipeline")
    .getOrCreate()
)
spark

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/08/03 09:31:55 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


# 파이프라인 구축
- Pipeline은 여러 개의 개별적인 Transformer 변환 작업과 Estimator 학습 작업을 일련의 프로세스로 연결하여, 간단한 API 호출로 처리할 수 있게 해 준다.
- `stage`라는 단위로 여러 개의 작업을 하나로 묶어서 관리할 수 있다.

In [2]:
# Location of the iris CSV on the local filesystem. Set the IRIS_CSV_PATH
# environment variable to override the default, so the notebook is not tied
# to one machine's directory layout.
iris_filepath = os.environ.get(
    "IRIS_CSV_PATH", "/home/ubuntu/working/spark-examples/data/iris.csv"
)

# Explicit schema (DDL string) matching the types inferSchema produced for this
# file. Declaring it avoids inferSchema's extra pass over the data and pins the
# column types. With header=True the header line is skipped, not parsed as data.
iris_schema = (
    "sepal_length DOUBLE, sepal_width DOUBLE, "
    "petal_length DOUBLE, petal_width DOUBLE, target INT"
)

# abspath lets a relative override resolve against the current working directory
# before being turned into a file:// URI.
iris_sdf = spark.read.csv(
    f"file://{os.path.abspath(iris_filepath)}",
    schema=iris_schema,
    header=True,
)
iris_sdf.show(5)

                                                                                

+------------+-----------+------------+-----------+------+
|sepal_length|sepal_width|petal_length|petal_width|target|
+------------+-----------+------------+-----------+------+
|         5.1|        3.5|         1.4|        0.2|     0|
|         4.9|        3.0|         1.4|        0.2|     0|
|         4.7|        3.2|         1.3|        0.2|     0|
|         4.6|        3.1|         1.5|        0.2|     0|
|         5.0|        3.6|         1.4|        0.2|     0|
+------------+-----------+------------+-----------+------+
only showing top 5 rows



In [3]:
# Hold out roughly 20% of the rows for testing; the fixed seed makes the split
# repeatable across runs (given the same input partitioning).
train_sdf, test_sdf = iris_sdf.randomSplit(weights=[0.8, 0.2], seed=42)

In [4]:
# Cache the training split so later actions (e.g. pipeline.fit) can reuse it
# instead of re-reading the CSV. cache() is lazy: data is stored the first time
# an action evaluates the DataFrame. It returns the same DataFrame object.
train_sdf = train_sdf.cache()
train_sdf

DataFrame[sepal_length: double, sepal_width: double, petal_length: double, petal_width: double, target: int]

In [5]:
# Numeric input columns, in the order they are packed into the feature vector.
iris_columns = [
    "sepal_length",
    "sepal_width",
    "petal_length",
    "petal_width",
]

Pipeline은 개별 변환 및 모델 학습 작업을 각각 하나의 stage로 정의하고, 이를 파이프라인에 순서대로 등록한다.
- `pipeline.fit()` 메소드를 호출하면 등록된 stage들이 순서대로 일괄 수행된다. Transformer stage는 `transform()`으로 데이터를 변환하고, Estimator stage는 `fit()`으로 모델을 학습한다.
- `pipeline.fit()`의 결과는 `PipelineModel`로 반환된다.
- `PipelineModel`로 예측할 때는 `transform()`을 사용한다.

In [6]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler # Transformer
from pyspark.ml.classification import DecisionTreeClassifier # Estimator

In [7]:
# Stage 1 (Transformer): VectorAssembler packs the numeric columns into a
# single vector column named "features".
stage_1 = VectorAssembler(inputCols=iris_columns, outputCol="features")

# Stage 2 (Estimator): decision tree trained on stage 1's output column, with
# depth capped at 3. Reading the column name from stage_1 keeps the two in sync.
stage_2 = DecisionTreeClassifier(
    featuresCol=stage_1.getOutputCol(),
    labelCol="target",
    maxDepth=3,
)

In [8]:
# Stages run in list order: the assembler's output column feeds the classifier.
stages = [
    stage_1,  # VectorAssembler
    stage_2,  # DecisionTreeClassifier
]
stages

[VectorAssembler_1d4d4474ac74, DecisionTreeClassifier_8be3972cdb82]

In [9]:
# Build an (unfitted) Pipeline and register the ordered stages on it;
# setStages returns the Pipeline itself.
pipeline = Pipeline().setStages(stages)
type(pipeline)

pyspark.ml.pipeline.Pipeline

In [10]:
# Fit every stage, in order, on the training split; the result is a PipelineModel.
pipeline_model = pipeline.fit(dataset=train_sdf)
type(pipeline_model)

pyspark.ml.pipeline.PipelineModel

In [11]:
# Run the held-out test split through the fitted pipeline (assembler + tree
# model) and preview the first rows of the prediction output.
predictions = pipeline_model.transform(dataset=test_sdf)
predictions.show(n=5)

+------------+-----------+------------+-----------+------+-----------------+--------------+-------------+----------+
|sepal_length|sepal_width|petal_length|petal_width|target|         features| rawPrediction|  probability|prediction|
+------------+-----------+------------+-----------+------+-----------------+--------------+-------------+----------+
|         4.4|        3.0|         1.3|        0.2|     0|[4.4,3.0,1.3,0.2]|[39.0,0.0,0.0]|[1.0,0.0,0.0]|       0.0|
|         4.6|        3.2|         1.4|        0.2|     0|[4.6,3.2,1.4,0.2]|[39.0,0.0,0.0]|[1.0,0.0,0.0]|       0.0|
|         4.6|        3.6|         1.0|        0.2|     0|[4.6,3.6,1.0,0.2]|[39.0,0.0,0.0]|[1.0,0.0,0.0]|       0.0|
|         4.8|        3.1|         1.6|        0.2|     0|[4.8,3.1,1.6,0.2]|[39.0,0.0,0.0]|[1.0,0.0,0.0]|       0.0|
|         4.9|        3.1|         1.5|        0.1|     0|[4.9,3.1,1.5,0.1]|[39.0,0.0,0.0]|[1.0,0.0,0.0]|       0.0|
+------------+-----------+------------+-----------+------+------

In [12]:
# Shut down the Spark session and its underlying SparkContext; cached data
# (e.g. the cached train_sdf) is released with it.
spark.stop()