In [1]:
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler, StandardScaler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import RegressionEvaluator
import numpy as np
import pandas as pd

MAX_MEMORY="7g"
spark = SparkSession.builder.appName("lotte")\
                .config("spark.executor.memory", MAX_MEMORY)\
                .config("spark.driver.memory", MAX_MEMORY)\
                .getOrCreate()

df=pd.read_csv("cluster.csv",encoding='cp949')

In [2]:
df.head()

Unnamed: 0.1,Unnamed: 0,고객ID,거주지,cluster,A_상반기_가구/인테리어_변화,B_상반기_가구/인테리어_변화,A_상반기_디지털/가전_변화,B_상반기_디지털/가전_변화,A_상반기_생활/건강_변화,B_상반기_생활/건강_변화,...,B_하반기_스포츠/레저_변화,A_하반기_식품_변화,B_하반기_식품_변화,A_하반기_패션의류_변화,B_하반기_패션의류_변화,A_하반기_패션잡화_변화,B_하반기_패션잡화_변화,A_하반기_화장품/미용_변화,B_하반기_화장품/미용_변화,구매감소
0,0,4,서울시,6,-100.0,-100.0,0.0,0.0,315.398707,95.736677,...,0.0,38.165866,-33.128021,-36.252868,229.12844,-6.124604,9.197635,-97.745811,0.0,True
1,1,6,강원도,6,458.833333,0.0,0.0,0.0,-68.156313,-67.096377,...,0.0,-66.721886,15.417325,5.471029,-16.732026,353.0,-25.0,-80.937404,-57.507673,False
2,2,7,서울시,6,-7.225352,0.0,0.0,0.0,1220.0,175.584462,...,0.0,12.152325,-99.622139,-33.755887,0.0,-89.187595,-100.0,934.444444,0.0,False
3,3,9,경기도,6,0.0,0.0,310.762724,0.0,392.873684,0.0,...,0.0,149.433975,0.0,-53.825702,0.0,54.685786,0.0,370.517802,0.0,False
4,4,10,서울시,6,0.0,0.0,0.0,0.0,-100.0,0.0,...,0.0,-62.386286,-55.704908,754.358974,0.0,162.5,0.0,-71.832669,375.86,True


In [5]:
df=df.drop(['Unnamed: 0','고객ID'],axis=1)

In [6]:
df_1=df.astype({"구매감소":'str'})
df_1['구매감소']=df_1['구매감소'].map({
    "True":"1",
    "False":"0"},na_action=None)

In [8]:
df=df_1.astype({"구매감소":'float'})

In [9]:
sdf=spark.createDataFrame(df)

In [10]:
train_df, test_df=sdf.randomSplit([0.8,0.2],seed=1)
print(train_df.count())
print(test_df.count())

12656
3242


In [12]:
toy_df=train_df.sample(False,.1,seed=261)

In [13]:
data_dir="/home/jovyan/work"
train_df.write.format("parquet").save(f"{data_dir}/train/")
test_df.write.format("parquet").save(f"{data_dir}/test/")
toy_df.write.format("parquet").save(f"{data_dir}/toy/")

In [14]:
data_dir="/home/jovyan/work"
train_df = spark.read.parquet(f"{data_dir}/train/")
test_df = spark.read.parquet(f"{data_dir}/test/")
toy_df = spark.read.parquet(f"{data_dir}/toy/")

In [19]:
#one-hot Encoding

#카테고리형 피쳐들
cat_feats = [
    "거주지",
    "cluster",
    "구매감소"
]

#파이프라인 스테이지
stages = []

#카테고리 피쳐 프리프로세싱
for c in cat_feats:
    cat_indexer = StringIndexer(inputCol=c, outputCol= c + "_idx").setHandleInvalid("keep")
    onehot_encoder = OneHotEncoder(inputCols=[cat_indexer.getOutputCol()], outputCols=[c + "_onehot"])
    stages += [cat_indexer, onehot_encoder]

In [20]:
#피처 정규화

#Numerical Features
num_feats = [
    'A_상반기_가구/인테리어_변화','B_상반기_가구/인테리어_변화', 'A_상반기_디지털/가전_변화', 'B_상반기_디지털/가전_변화',
    'A_상반기_생활/건강_변화', 'B_상반기_생활/건강_변화', 'A_상반기_스포츠/레저_변화',
    'B_상반기_스포츠/레저_변화', 'A_상반기_식품_변화', 'B_상반기_식품_변화', 'A_상반기_패션의류_변화',
    'B_상반기_패션의류_변화', 'A_상반기_패션잡화_변화', 'B_상반기_패션잡화_변화', 'A_상반기_화장품/미용_변화',
    'B_상반기_화장품/미용_변화', 'A_하반기_가구/인테리어_변화', 'B_하반기_가구/인테리어_변화',
    'A_하반기_디지털/가전_변화', 'B_하반기_디지털/가전_변화', 'A_하반기_생활/건강_변화',
    'B_하반기_생활/건강_변화', 'A_하반기_스포츠/레저_변화', 'B_하반기_스포츠/레저_변화', 'A_하반기_식품_변화',
    'B_하반기_식품_변화', 'A_하반기_패션의류_변화', 'B_하반기_패션의류_변화', 'A_하반기_패션잡화_변화',
    'B_하반기_패션잡화_변화', 'A_하반기_화장품/미용_변화', 'B_하반기_화장품/미용_변화'
]

#Vector assembler
for n in num_feats:
    num_assembler = VectorAssembler(inputCols=[n], outputCol= n + "_vecotr")
    num_scaler = StandardScaler(inputCol=num_assembler.getOutputCol(), outputCol= n + "_scaled")
    stages += [num_assembler, num_scaler]

In [21]:
#categorical + Numeric features

assembler_inputs = [c + "_onehot" for c in cat_feats] + [n + "_scaled" for n in num_feats]
assembler = VectorAssembler(inputCols=assembler_inputs, outputCol="feature_vector")
stages += [assembler]

In [22]:
from pyspark.ml import Pipeline

transform_stages = stages
pipeline = Pipeline(stages=transform_stages)
fitted_transformer = pipeline.fit(train_df)

In [23]:
vtrain_df = fitted_transformer.transform(train_df)

In [24]:
from pyspark.ml.regression import LinearRegression
lr = LinearRegression(maxIter=10, 
                      regParam=0.1,
                      elasticNetParam=0.8,
                      solver="normal", 
                      labelCol="구매감소",
                      featuresCol="feature_vector")

In [25]:
model = lr.fit(vtrain_df)


In [26]:
vtest_df = fitted_transformer.transform(test_df)


In [27]:
predictions = model.transform(vtest_df)


In [28]:
predictions.cache()


DataFrame[거주지: string, cluster: bigint, A_상반기_가구/인테리어_변화: double, B_상반기_가구/인테리어_변화: double, A_상반기_디지털/가전_변화: double, B_상반기_디지털/가전_변화: double, A_상반기_생활/건강_변화: double, B_상반기_생활/건강_변화: double, A_상반기_스포츠/레저_변화: double, B_상반기_스포츠/레저_변화: double, A_상반기_식품_변화: double, B_상반기_식품_변화: double, A_상반기_패션의류_변화: double, B_상반기_패션의류_변화: double, A_상반기_패션잡화_변화: double, B_상반기_패션잡화_변화: double, A_상반기_화장품/미용_변화: double, B_상반기_화장품/미용_변화: double, A_하반기_가구/인테리어_변화: double, B_하반기_가구/인테리어_변화: double, A_하반기_디지털/가전_변화: double, B_하반기_디지털/가전_변화: double, A_하반기_생활/건강_변화: double, B_하반기_생활/건강_변화: double, A_하반기_스포츠/레저_변화: double, B_하반기_스포츠/레저_변화: double, A_하반기_식품_변화: double, B_하반기_식품_변화: double, A_하반기_패션의류_변화: double, B_하반기_패션의류_변화: double, A_하반기_패션잡화_변화: double, B_하반기_패션잡화_변화: double, A_하반기_화장품/미용_변화: double, B_하반기_화장품/미용_변화: double, 구매감소: double, 거주지_idx: double, 거주지_onehot: vector, cluster_idx: double, cluster_onehot: vector, 구매감소_idx: double, 구매감소_onehot: vector, A_상반기_가구/인테리어_변화_vecotr: vector, A_상반기_가구/인테리어_변화_scaled:

In [29]:
model.summary.rootMeanSquaredError


0.08822398273236283

In [30]:
model.summary.r2


0.9684577731532277