In [1]:
# [+] SparkSession 설정
from pyspark.sql import SparkSession
spark = SparkSession.builder.master('local').appName('trip_data').getOrCreate()

## MLlib 예제: Logistic Regression
+ ```pyspark.ml.linalg```: 선형대수(linear algebra) 관련 패키지
    + ```Vectors```: 벡터 데이터 타입
+ ```pyspark.ml.classification```: 분류 모델 관련 패키지
    + ```LogisticRegression```: 가장 기본적인 분류 모델

In [2]:
# Vectors, LogisticRegression 임포트
from pyspark.ml.linalg import Vectors
from pyspark.ml.classification import LogisticRegression

In [14]:
"""
    데이터 정의: 레이블과 특징
    - dense(): 모든 값을 저장하는 벡터 생성
    - sparse(): 0이 아닌 값만 저장하는 벡터 생성
"""

train_data = [
    (1.0, Vectors.dense([0.0, 1.1, 0.1])),
    (0.0, Vectors.dense([2.0, 1.0, -1.0])),
    (0.0, Vectors.dense([2.0, 1.3, 1.0])),
    (1.0, Vectors.dense([0.0, 1.2, -0.5])),
]

In [15]:
# [+] 스키마 정의
schema = ['table','features']

In [16]:
# [+] DataFrame 생성
train_df = spark.createDataFrame(train_data,schema=schema)

In [17]:
# [+] DataFrame 출력
train_df.show()

+-----+--------------+
|table|      features|
+-----+--------------+
|  1.0| [0.0,1.1,0.1]|
|  0.0|[2.0,1.0,-1.0]|
|  0.0| [2.0,1.3,1.0]|
|  1.0|[0.0,1.2,-0.5]|
+-----+--------------+



In [18]:
"""
    LogisticRegression (Estimator) 생성
    - maxIter: 최대 학습 횟수 매개변수
    - regParam: 정규화 매개변수
"""

lr = LogisticRegression(maxIter=30, regParam=0.01)

In [20]:
# 모델 학습
model = lr.fit(train_df)

IllegalArgumentException: label does not exist. Available: table, features

In [None]:
model

In [21]:
# 모델 테스트
test_data = [
    (1.0, Vectors.dense([-1.0, 1.5, 1.3])),
    (0.0, Vectors.dense([3.0, 2.0, -0.1])),
    (1.0, Vectors.dense([0.0, 2.2, -1.5])),
]

In [None]:
# [+] DataFrame 생성
test_df =

In [None]:
# 모델 예측
predictions = 

In [None]:
# 예측결과 출력
predictions.show()

## LogisticRegression 파이프라인 구현하기
+ 예측 문제: Spark에 대한 텍스트 인지 아닌지를 분류
+ ```Pipeline```: 머신러닝 과정에 대한 파이프라인 정의
+ ```HashingTF```: 컬럼을 용어빈도(term-frequency) 컬럼으로 변환하는 Transformer
+ ```Tokenizer```: 컬럼의 텍스트를 여러 개의 단어로 분할하는 Transformer

In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import HashingTF, Tokenizer

In [None]:
# 훈련 데이터에 대한 DataFrame 생성
train_df = spark.createDataFrame([
    (0, "a b c d e spark", 1.0),
    (1, "b d", 0.0),
    (2, "spark f g h", 1.0),
    (3, "hadoop mapreduce", 0.0)
], ["id", "text", "label"])

In [None]:
"""
    머신러닝 파이프라인 과정: Tokenizer -> HashingTF -> LinearRegression
"""

# Tokenizer 객체 생성
tokenizer = Tokenizer(inputCol="text", outputCol="words")

In [None]:
# HashingTF 객체 생성
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")

In [None]:
"""
    [+] LogisticRegression 객체 생성
    - maxIter = 10
    - regParam = 0.001
"""

lr = 

In [None]:
# [+] Pipeline 객체 생성
pipeline = 

In [None]:
# [+] Pipeline 실행 -> 모델 생성
model = 

In [None]:
# 테스트 데이터에 대한 DataFrame 생성
test_df = ss.createDataFrame([
    (4, "spark i j k"),
    (5, "l m n"),
    (6, "spark hadoop spark"),
    (7, "apache hadoop")
], ["id", "text"])

In [None]:
# [+] 테스트 데이터에 대한 예측
predictions = 

In [None]:
predictions.show()