## Google Drive 연동

In [14]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


## JAVA 설치
- JVM 실행 위해서는 JAVA 설치 필수

In [2]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

 ## Spark 설치
 - 기존 : Web Link를 통해서 다운 후, 압축파일 불기
 - 오늘 : 구글 드라이브에서 파일 가져오기

In [3]:
%cd /content/drive/MyDrive/멀티캠퍼스/spark

/content/drive/MyDrive/멀티캠퍼스/spark


In [4]:
!pwd

/content/drive/MyDrive/멀티캠퍼스/spark


In [5]:
!ls

chapter02  chapter03  spark-3.1.1-bin-hadoop2.7.tgz  머신러닝


In [6]:
!cp -r spark-3.1.1-bin-hadoop2.7.tgz /content

In [None]:
%cd /content/

In [8]:
!pwd

/content


In [9]:
!ls

drive  sample_data  spark-3.1.1-bin-hadoop2.7.tgz


In [10]:
!tar xf spark-3.1.1-bin-hadoop2.7.tgz > /dev/null

## 환경변수 설정
- 일반적으로 vi 편집기를 열어서 작업
- 구글코랩 : os 라이브러리 사용해서 환경변수 지정

In [11]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop2.7"

## PySpark 설치
- 무조건 설치 파일에 맞춰서 설치를 해준다!!

In [12]:
!pip install -q pyspark==3.1.1

[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m212.3/212.3 MB[0m [31m2.8 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m198.6/198.6 kB[0m [31m30.7 MB/s[0m eta [36m0:00:00[0m
[?25h  Building wheel for pyspark (setup.py) ... [?25l[?25hdone


처음 시작할 땐 sparksession을 만들어줘야한다.

In [13]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('mulCamp28').config('spark.ui.port','4050').getOrCreate()
spark

## CSV파일 불러오기
- CSV 포맷으로 파일 읽어서 데이터프레임에 저장함
- 스키마 추론, 쉼표로 구분된 컬럼 이름이 제공되는 헤더가 있음 지정

In [15]:
df = spark.read.csv("/content/drive/MyDrive/멀티캠퍼스/spark/multifpp/fppML.csv", header=True, inferSchema=True)
df.show()  # 데이터 확인

+----+---+---+----+------+----------+-------------+-------+---------+-----------+---------+-----------+------+---------+--------+--------+-----------+-------+--------------+------------+------------+------------+--------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-----------------------+
|  년| 월| 일|요일|공휴일|터미널이름|가구/인테리어|   기타|도서/음반|디지털/가전|생활/건강|스포츠/레저|  식품|출산/육아|패션의류|패션잡화|화장품/미용|총 인구|20대 미만 비율|20-30대 비율|40-50대 비율|60-70대 비율|80대 이상 비율|소득2천만원주민비율|소득3천만원주민비율|소득4천만원주민비율|소득5천만원주민비율|소득6천만원주민비율|소득7천만원주민비율|소득7천만원이상주민비율|
+----+---+---+----+------+----------+-------------+-------+---------+-----------+---------+-----------+------+---------+--------+--------+-----------+-------+--------------+------------+------------+------------+--------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-----------------------+
|2022|  1|  1|   6|     1|   

### 머신러닝 라이브러리 불러오기

In [17]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator

### 범주형 변수 인코딩

In [None]:
# 범주형 변수 인코딩
indexer = StringIndexer(inputCol="터미널이름", outputCol="터미널이름_index")
df = indexer.fit(df).transform(df)
encoder = OneHotEncoder(inputCols=["터미널이름_index"], outputCols=["터미널이름_encoded"])
df = encoder.fit(df).transform(df)

###  RandomForestRegressor 모델을 학습

In [23]:
# 종속 변수 리스트
dependent_variables = [
    '가구/인테리어', '기타', '도서/음반', '디지털/가전',
    '생활/건강', '스포츠/레저', '식품', '출산/육아',
    '패션의류', '패션잡화', '화장품/미용'
]

# 수치형 컬럼 선택 (StringType 및 인덱스 컬럼 제외)
numeric_columns = [field for (field, dataType) in df.dtypes if dataType != 'string' and field not in dependent_variables and not field.endswith('_index')]

# 인코딩된 범주형 컬럼을 추가
numeric_columns.append("터미널이름_encoded")

# 피처 벡터 생성
assembler = VectorAssembler(inputCols=numeric_columns, outputCol="features")
df = assembler.transform(df)



# 각 종속 변수에 대한 모델 학습 및 평가
for column in dependent_variables:
    # 데이터를 학습 및 테스트 세트로 분리
    train_data, test_data = df.randomSplit([0.8, 0.2], seed=42)

    # 모델 초기화
    rf = RandomForestRegressor(featuresCol="features", labelCol=column)

    # 모델 학습
    rf_model = rf.fit(train_data)

    # 예측 수행
    predictions = rf_model.transform(test_data)

    # 평가
    evaluator = RegressionEvaluator(labelCol=column, predictionCol="prediction", metricName="rmse")
    rmse = evaluator.evaluate(predictions)
    print(f"Root Mean Squared Error (RMSE) for {column}: {rmse}")



Root Mean Squared Error (RMSE) for 가구/인테리어: 214.31080884259126
Root Mean Squared Error (RMSE) for 기타: 1979.89491121395
Root Mean Squared Error (RMSE) for 도서/음반: 403.996592819785
Root Mean Squared Error (RMSE) for 디지털/가전: 749.7940492810993
Root Mean Squared Error (RMSE) for 생활/건강: 769.4653751045702
Root Mean Squared Error (RMSE) for 스포츠/레저: 141.7726405620895
Root Mean Squared Error (RMSE) for 식품: 1077.3120337539417
Root Mean Squared Error (RMSE) for 출산/육아: 122.64828294182104
Root Mean Squared Error (RMSE) for 패션의류: 1283.9520115192236
Root Mean Squared Error (RMSE) for 패션잡화: 515.021630174649
Root Mean Squared Error (RMSE) for 화장품/미용: 497.73193275650635


### 피쳐 중요도 출력

In [25]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator

# 범주형 변수 인코딩 및 원-핫 인코딩, 피처 벡터 생성 등 이전 단계를 수행한 후...

# 피처 컬럼 리스트 생성 (종속 변수 및 범주형 변수 인덱스 컬럼 제외)
feature_columns = [c for c in df.columns if c not in dependent_variables and c not in ['터미널이름', '터미널이름_index']]

# 각 종속 변수에 대한 모델 학습 및 평가
for column in dependent_variables:
    # 데이터를 학습 및 테스트 세트로 분리
    train_data, test_data = df.randomSplit([0.8, 0.2], seed=42)

    # 모델 초기화
    rf = RandomForestRegressor(featuresCol="features", labelCol=column)

    # 모델 학습
    rf_model = rf.fit(train_data)

    # 예측 수행
    predictions = rf_model.transform(test_data)

    # 평가
    evaluator = RegressionEvaluator(labelCol=column, predictionCol="prediction", metricName="rmse")
    rmse = evaluator.evaluate(predictions)
    print(f"Root Mean Squared Error (RMSE) for {column}: {rmse}")

    # 피처 중요도 출력
    importances = rf_model.featureImportances
    feature_importance_info = [(feature_columns[i], importances[i]) for i in range(len(feature_columns)) if importances[i] > 0]
    feature_importance_info_sorted = sorted(feature_importance_info, key=lambda x: x[1], reverse=True)

    print(f"Feature importances for {column}:")
    for feature, importance in feature_importance_info_sorted:
        print(f"  - {feature}: {importance}")


Root Mean Squared Error (RMSE) for 가구/인테리어: 214.31080884259126
Feature importances for 가구/인테리어:
  - 요일: 0.42940552615889815
  - 공휴일: 0.171497314524067
  - 터미널이름_indexed: 0.14351550457846732
  - 월: 0.1428127873843345
  - 80대 이상 비율: 0.01661281501074096
  - features: 0.012079171498273814
  - 소득5천만원주민비율: 0.007913930930144277
  - 소득7천만원주민비율: 0.007206028351747903
  - 40-50대 비율: 0.006135204191257659
  - 일: 0.0050002540761665245
  - 소득4천만원주민비율: 0.0049695272075805826
  - 60-70대 비율: 0.002816004195884257
  - 20-30대 비율: 0.0028140256180227254
  - 총 인구: 0.0012930856777228473
  - 20대 미만 비율: 0.0008541616831862291
  - 소득6천만원주민비율: 0.0003172012288460019
  - 소득7천만원이상주민비율: 0.00030477394108234664
  - 소득3천만원주민비율: 1.3788696567704683e-05
  - 소득2천만원주민비율: 9.704560798393978e-06
Root Mean Squared Error (RMSE) for 기타: 1979.89491121395
Feature importances for 기타:
  - 월: 0.5537764573045878
  - 요일: 0.18684153604566972
  - 공휴일: 0.07788520967399908
  - 터미널이름_indexed: 0.04842208990439072
  - 일: 0.045664594914151646
  - 6

## Spark 종료

In [42]:
spark.stop()