In [8]:
from pyspark.sql import SparkSession
from pyspark import SparkFiles
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.ml.feature import VectorAssembler

#  SparkSession 생성
spark = SparkSession.builder \
    .appName("adam") \
    .getOrCreate()

# 데이터를 원격 URL에서 다운로드하기 위해 addFile()을 사용하여 데이터 파일을 Spark 클러스터 노드로 복사
url = "https://raw.githubusercontent.com/pkmklong/Breast-Cancer-Wisconsin-Diagnostic-DataSet/master/data.csv"
spark.sparkContext.addFile(url)

# SparkFiles.get()를 사용하여 복사된 데이터 파일을 읽고, DataFrame으로 로드 
df = spark.read.csv(SparkFiles.get("data.csv"), header=True, inferSchema=True)
df.show(2)
df.printSchema()

# DataFrame의 컬럼명 변경
#id' 및 'diagnosis' 컬럼 외 'feature_1', 'feature_2', ... 형식으로 변경
columns = ['id', 'diagnosis'] + [f'feature_{i}' for i in range(1, 32)]
data = df.toDF(*columns)

# 'diagnosis' 컬럼의 'M'을 1로, 'B' (양성)을 0으로 정수 매핑 후, 'diagnosis' 컬럼은 제거
data = data.withColumn("label", (data["diagnosis"] == "M").cast("integer")).drop("diagnosis")

# 'feature_1'부터 'feature_24'까지의 컬럼만 독립변수로 선택하여 이를 하나의 피처 벡터 컬럼으로 변환 후 'features' 컬럼에 저장
feature_columns = [f'feature_{i}' for i in range(1, 25)]
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
data = assembler.transform(data)

# 학습, 테스트셋 분리
train_data, test_data = data.randomSplit([0.8, 0.2], seed=20230921)

+------+---------+-----------+------------+--------------+---------+---------------+----------------+--------------+-------------------+-------------+----------------------+---------+----------+------------+-------+-------------+--------------+------------+-----------------+-----------+--------------------+------------+-------------+---------------+----------+----------------+-----------------+---------------+--------------------+--------------+-----------------------+----+
|    id|diagnosis|radius_mean|texture_mean|perimeter_mean|area_mean|smoothness_mean|compactness_mean|concavity_mean|concave points_mean|symmetry_mean|fractal_dimension_mean|radius_se|texture_se|perimeter_se|area_se|smoothness_se|compactness_se|concavity_se|concave points_se|symmetry_se|fractal_dimension_se|radius_worst|texture_worst|perimeter_worst|area_worst|smoothness_worst|compactness_worst|concavity_worst|concave points_worst|symmetry_worst|fractal_dimension_worst|_c32|
+------+---------+-----------+------------

In [9]:
dt = DecisionTreeClassifier(featuresCol="features", labelCol="label")
print(dt.explainParams())

cacheNodeIds: If false, the algorithm will pass trees to executors to match instances with nodes. If true, the algorithm will cache node IDs for each instance. Caching can speed up training of deeper trees. Users can set how often should the cache be checkpointed or disable it by setting checkpointInterval. (default: False)
checkpointInterval: set checkpoint interval (>= 1) or disable checkpoint (-1). E.g. 10 means that the cache will get checkpointed every 10 iterations. Note: this setting will be ignored if the checkpoint directory is not set in the SparkContext. (default: 10)
featuresCol: features column name. (default: features, current: features)
impurity: Criterion used for information gain calculation (case-insensitive). Supported options: entropy, gini (default: gini)
labelCol: label column name. (default: label, current: label)
leafCol: Leaf indices column name. Predicted leaf index of each instance in each tree by preorder. (default: )
maxBins: Max number of bins for discreti

In [10]:
model = dt.fit(train_data)

In [11]:
# prediction 진행
predictions = model.transform(test_data)

# AUC-ROC는 BinaryClassificationEvaluator 에서 측정 가능
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction", labelCol="label")
auc = evaluator.evaluate(predictions)

# Accuracy, Precision, and Recall는 MulticlassClassificationEvaluator에서 측정 가능
multi_evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction")
accuracy = multi_evaluator.evaluate(predictions, {multi_evaluator.metricName: "accuracy"})
precision = multi_evaluator.evaluate(predictions, {multi_evaluator.metricName: "weightedPrecision"})
recall = multi_evaluator.evaluate(predictions, {multi_evaluator.metricName: "weightedRecall"})

print(f"AUC-ROC: {auc:.4f}")
print(f"Accuracy: {accuracy:.4f}")
print(f"Precision: {precision:.4f}")
print(f"Recall: {recall:.4f}")

AUC-ROC: 0.8834
Accuracy: 0.9344
Precision: 0.9376
Recall: 0.9344
