In [84]:
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import regexp_extract, col
import re

In [89]:
import os 
cwd = os.getcwd()
wine_data_path= os.path.join(cwd, 'running_spark_data/redwine3.csv')
wine_data_path

'/home/jovyan/work/start_spark/start_spark/running_spark_data/redwine3.csv'

In [90]:
file_path = f"file:///{wine_data_path.replace(os.sep,'/')}"
file_path

'file:////home/jovyan/work/start_spark/start_spark/running_spark_data/redwine3.csv'

In [91]:
from pyspark.sql import SparkSession
MAX_MEMORY ='8g'
spark = SparkSession.builder.appName("practice_wine").config('spark.driver.memory',MAX_MEMORY).config('spark.executer.memory',MAX_MEMORY).getOrCreate()

In [92]:
wine_df = spark.read.csv(wine_data_path, header=True, inferSchema=True)
wine_df.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- producer: string (nullable = true)
 |-- nation: string (nullable = true)
 |-- local1: string (nullable = true)
 |-- local2: string (nullable = true)
 |-- local3: string (nullable = true)
 |-- local4: string (nullable = true)
 |-- varieties1: string (nullable = true)
 |-- varieties2: string (nullable = true)
 |-- varieties3: string (nullable = true)
 |-- varieties4: string (nullable = true)
 |-- varieties5: string (nullable = true)
 |-- varieties6: string (nullable = true)
 |-- varieties7: string (nullable = true)
 |-- varieties8: string (nullable = true)
 |-- varieties9: string (nullable = true)
 |-- varieties10: string (nullable = true)
 |-- varieties11: string (nullable = true)
 |-- varieties12: string (nullable = true)
 |-- type: string (nullable = true)
 |-- use: string (nullable = true)
 |-- abv: string (nullable = true)
 |-- degree: string (nullable = true)
 |-- sw

In [93]:
from pyspark.sql.functions import regexp_extract, col

wine_df = wine_df.withColumn('sweet', regexp_extract(col('sweet'), r'\d+', 0).cast('int'))
wine_df = wine_df.withColumn('acidity', regexp_extract(col('acidity'), r'\d+', 0).cast('int'))
wine_df = wine_df.withColumn('body', regexp_extract(col('body'), r'\d+', 0).cast('int'))
wine_df = wine_df.withColumn('tannin', regexp_extract(col('tannin'), r'\d+', 0).cast('int'))

In [94]:
from pyspark.sql.functions import col, sum, desc, expr
from functools import reduce

def filter_by_top_n(df, limit_local1, limit_variety):
    """
    지정된 상위 N개 생산지와 품종으로 데이터프레임을 필터링하는 함수.

    :param df: PySpark DataFrame (wine_df)
    :param limit_local1: 상위 생산지 개수
    :param limit_variety: 상위 품종 개수
    :return: 필터링된 DataFrame과 그 개수
    """
    
    print(f"\n--- 생산지 상위 {limit_local1}개, 품종 상위 {limit_variety}개로 필터링 시작 ---")
    
    # 1. 생산지(local1) 상위 목록 추출
    top_local1_list = [
        row.local1 for row in df.groupBy("local1")
                                 .agg(sum("ml").alias("total_ml"))
                                 .orderBy(desc("total_ml"))
                                 .limit(limit_local1)
                                 .collect()
    ]
    
    # 2. 품종(varieties1 ~ varieties12) 상위 목록 추출
    varieties_cols = [f"varieties{i}" for i in range(1, 13)]
    stack_expr = expr("stack(12, " + ", ".join(f"'{c}', `{c}`" for c in varieties_cols) + ") as (col_name, variety)")
    
    top_variety_df = (
        df.select(col("ml"), stack_expr)
        .groupBy("variety")
        .agg(sum("ml").alias("total_ml"))
        .orderBy(desc("total_ml"))
        .filter(col("variety").isNotNull())
        .limit(limit_variety)
    )
    top_variety_list = [row.variety for row in top_variety_df.collect()]
    
    # 3. 필터링 실행
    variety_filter_condition = reduce(
        lambda x, y: x | y, 
        [col(variety).isin(top_variety_list) for variety in varieties_cols]
    )
    filtered_df = df.filter(
        col("local1").isin(top_local1_list) & 
        variety_filter_condition
    )
    
    # 4. 결과 반환
    filtered_count = filtered_df.count()
    return filtered_df, filtered_count

# 사용 예시
# 데이터프레임 'wine_df'는 미리 선언되어 있어야 합니다.



# 예시 3: 생산지 상위 30개, 품종 상위 25개
filtered_df, count_27_30 = filter_by_top_n(wine_df, 27, 30)
print(f"생산지 27개, 품종 30개로 필터링된 데이터 수: {count_27_30}개")


--- 생산지 상위 27개, 품종 상위 30개로 필터링 시작 ---
생산지 27개, 품종 30개로 필터링된 데이터 수: 4313개


In [95]:
wine_df=filtered_df

In [96]:
from pyspark.sql.functions import udf, col
from pyspark.sql.types import FloatType
import re

# UDF(사용자 정의 함수) 생성
# '12~13' 형태의 문자열을 받아 평균값(12.5)을 반환하는 함수입니다.
@udf(FloatType())
def calculate_average_abv(abv_str):
    if abv_str is None:
        return None
    
    # 숫자만 추출하기 위해 ~ 기호로 분리
    parts = abv_str.split('~')
    
    try:
        if len(parts) == 2:
            # '12~13'인 경우: (12+13)/2
            return (float(parts[0]) + float(parts[1])) / 2
        elif len(parts) == 1:
            # '12'인 경우: 12
            return float(parts[0])
        else:
            # 예상치 못한 형식인 경우: None 반환
            return None
    except (ValueError, IndexError):
        # 숫자로 변환할 수 없는 경우 None 반환
        return None

# 'final_clean_df'에 새로운 'numeric_abv' 컬럼 추가
# 기존 abv 컬럼은 유지하고, 변환된 값을 새로운 컬럼에 저장합니다.
final_clean_df_with_numeric_abv = wine_df.withColumn(
    "numeric_abv", 
    calculate_average_abv(col("abv"))
)

# 결과 확인 (원하는 만큼의 행을 보여줍니다)
final_clean_df_with_numeric_abv.select("abv", "numeric_abv").show(10)
final_clean_df_with_numeric_abv.printSchema()

+-----+-----------+
|  abv|numeric_abv|
+-----+-----------+
|14~15|       14.5|
|14~15|       14.5|
|13~14|       13.5|
|13~14|       13.5|
|13~14|       13.5|
|12~13|       12.5|
|12~13|       12.5|
| 13.5|       13.5|
|13~14|       13.5|
|13~14|       13.5|
+-----+-----------+
only showing top 10 rows

root
 |-- _c0: integer (nullable = true)
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- producer: string (nullable = true)
 |-- nation: string (nullable = true)
 |-- local1: string (nullable = true)
 |-- local2: string (nullable = true)
 |-- local3: string (nullable = true)
 |-- local4: string (nullable = true)
 |-- varieties1: string (nullable = true)
 |-- varieties2: string (nullable = true)
 |-- varieties3: string (nullable = true)
 |-- varieties4: string (nullable = true)
 |-- varieties5: string (nullable = true)
 |-- varieties6: string (nullable = true)
 |-- varieties7: string (nullable = true)
 |-- varieties8: string (nullable = true)
 |-- varieties9:

In [97]:
# 'collect()'를 사용하지 않고 변수에 바로 할당
wine_df = final_clean_df_with_numeric_abv

# 이제 wine_df를 사용하면 numeric_abv 컬럼이 포함된 데이터프레임으로 작업할 수 있습니다.
wine_df.show(5)
wine_df.printSchema()

+---+------+--------------------+--------+------+---------------+--------------------+------+------+------------------+------------------+----------+--------------+--------------+------------+----------+----------+----------+-----------+-----------+-----------+----+-----+-----+------+-----+-------+----+------+------+----+---+-----------+
|_c0|    id|                name|producer|nation|         local1|              local2|local3|local4|        varieties1|        varieties2|varieties3|    varieties4|    varieties5|  varieties6|varieties7|varieties8|varieties9|varieties10|varieties11|varieties12|type|  use|  abv|degree|sweet|acidity|body|tannin| price|year| ml|numeric_abv|
+---+------+--------------------+--------+------+---------------+--------------------+------+------+------------------+------------------+----------+--------------+--------------+------------+----------+----------+----------+-----------+-----------+-----------+----+-----+-----+------+-----+-------+----+------+------+--

In [98]:
# 'collect()'를 사용하지 않고 변수에 바로 할당
wine_df = final_clean_df_with_numeric_abv

# 이제 wine_df를 사용하면 numeric_abv 컬럼이 포함된 데이터프레임으로 작업할 수 있습니다.
wine_df.show(5)
wine_df.printSchema()

+---+------+--------------------+--------+------+---------------+--------------------+------+------+------------------+------------------+----------+--------------+--------------+------------+----------+----------+----------+-----------+-----------+-----------+----+-----+-----+------+-----+-------+----+------+------+----+---+-----------+
|_c0|    id|                name|producer|nation|         local1|              local2|local3|local4|        varieties1|        varieties2|varieties3|    varieties4|    varieties5|  varieties6|varieties7|varieties8|varieties9|varieties10|varieties11|varieties12|type|  use|  abv|degree|sweet|acidity|body|tannin| price|year| ml|numeric_abv|
+---+------+--------------------+--------+------+---------------+--------------------+------+------+------------------+------------------+----------+--------------+--------------+------------+----------+----------+----------+-----------+-----------+-----------+----+-----+-----+------+-----+-------+----+------+------+--

In [99]:
wine_df.createOrReplaceTempView('wines')

In [275]:
query = """
SELECT 
    id,
    nation,
    local1,
    varieties1,
    numeric_abv,
    sweet,
    acidity,
    body,
    tannin,
    year,
    price
FROM
    wines

"""

In [276]:
wine_df = spark.sql(query)

In [277]:
wine_df.count()

4313

In [278]:
stages = []

In [279]:
train_df, test_df = wine_df.randomSplit([0.8,0.2], seed=5)

In [280]:
from pyspark.sql.functions import concat, col, lit, log
train_df = train_df.withColumn("local_varieties", concat(col("local1"), lit("_"), col("varieties1")))

In [281]:
train_df = train_df.withColumn("log_price", log(col("price")))

In [282]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, StandardScaler
from pyspark.ml import Pipeline

cat_features = ['local1', 'varieties1', 'nation']
num_features = ['numeric_abv', 'sweet', 'acidity', 'body', 'tannin','year']
stages = []

# 1. 범주형 특성 전처리: StringIndexer -> OneHotEncoder
for cat in cat_features:
    cat_index = StringIndexer(inputCol=cat, outputCol=cat + '_idx').setHandleInvalid('keep')
    onehot_encode = OneHotEncoder(inputCols=[cat_index.getOutputCol()], outputCols=[cat + '_onehot'])
    stages += [cat_index, onehot_encode]

# 2. 숫자형 특성 전처리: 모든 숫자 컬럼을 하나의 벡터로 통합 후 표준화
num_assembler = VectorAssembler(inputCols=num_features, outputCol="num_features_vector", handleInvalid="keep" )
num_scaler = StandardScaler(inputCol=num_assembler.getOutputCol(), outputCol="scaled_num_features")
stages += [num_assembler, num_scaler]

# 3. 최종 통합: 범주형 원-핫 벡터 + 표준화된 숫자형 벡터를 하나의 'feature_vector'로 합치기
assembler_inputs = [cat + '_onehot' for cat in cat_features] + ['scaled_num_features']
final_assembler = VectorAssembler(inputCols=assembler_inputs, outputCol='feature_vector')
stages.append(final_assembler)

# 4. 파이프라인 구축
pipeline = Pipeline(stages=stages)

# 'train_df' 데이터프레임으로 파이프라인 학습 및 변환
fitted_pipeline = pipeline.fit(train_df)
vtrain_df = fitted_pipeline.transform(train_df)

# 결과 확인
vtrain_df.select('feature_vector').show(5, truncate=False)

+-----------------------------------------------------------------------------------------------------------------------------------------------------------+
|feature_vector                                                                                                                                             |
+-----------------------------------------------------------------------------------------------------------------------------------------------------------+
|(79,[6,27,67,73,74,75,76,77,78],[1.0,1.0,1.0,18.842783331592937,4.738568725394357,6.61519058363259,6.323813752752317,5.249496841501109,425.8773378589119]) |
|(79,[6,27,67,73,74,75,76,77,78],[1.0,1.0,1.0,18.842783331592937,4.738568725394357,4.961392937724443,5.059051002201853,5.249496841501109,426.3002547783348])|
|(79,[1,29,64,73,74,75,76,77,78],[1.0,1.0,1.0,17.54328103286239,4.738568725394357,4.961392937724443,2.5295255011009266,3.937122631125832,425.2429624797775])|
|(79,[9,27,64,73,74,75,76,77,78],[1.0,1.0,1.0,17.543

In [283]:
from pyspark.ml import Pipeline
pipeline = Pipeline(stages=stages)
fitted_transform = pipeline.fit(train_df)
vtrain_df = fitted_transform.transform(train_df)
vtrain_df.printSchema()

root
 |-- id: integer (nullable = true)
 |-- nation: string (nullable = true)
 |-- local1: string (nullable = true)
 |-- varieties1: string (nullable = true)
 |-- numeric_abv: float (nullable = true)
 |-- sweet: integer (nullable = true)
 |-- acidity: integer (nullable = true)
 |-- body: integer (nullable = true)
 |-- tannin: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- price: integer (nullable = true)
 |-- local_varieties: string (nullable = true)
 |-- log_price: double (nullable = true)
 |-- local1_idx: double (nullable = false)
 |-- local1_onehot: vector (nullable = true)
 |-- varieties1_idx: double (nullable = false)
 |-- varieties1_onehot: vector (nullable = true)
 |-- nation_idx: double (nullable = false)
 |-- nation_onehot: vector (nullable = true)
 |-- num_features_vector: vector (nullable = true)
 |-- scaled_num_features: vector (nullable = true)
 |-- feature_vector: vector (nullable = true)



In [284]:
vtrain_df.select('feature_vector').show()

+--------------------+
|      feature_vector|
+--------------------+
|(79,[6,27,67,73,7...|
|(79,[6,27,67,73,7...|
|(79,[1,29,64,73,7...|
|(79,[9,27,64,73,7...|
|(79,[9,29,64,73,7...|
|(79,[1,27,64,73,7...|
|(79,[1,29,64,73,7...|
|(79,[1,29,64,73,7...|
|(79,[1,27,64,73,7...|
|(79,[1,27,64,73,7...|
|(79,[1,27,64,73,7...|
|(79,[1,27,64,73,7...|
|(79,[1,27,64,73,7...|
|(79,[1,27,64,73,7...|
|(79,[6,36,67,73,7...|
|(79,[10,27,67,73,...|
|(79,[10,36,67,73,...|
|(79,[11,27,67,73,...|
|(79,[6,36,67,73,7...|
|(79,[0,27,66,73,7...|
+--------------------+
only showing top 20 rows



In [285]:
from pyspark.ml.regression import LinearRegression
lr = LinearRegression( maxIter=50, solver='normal', labelCol='log_price', featuresCol='feature_vector')

In [286]:
model = lr.fit(vtrain_df)

In [287]:
#테스트데이터 변환
vtest_df = fitted_transform.transform(test_df)
#테스트데이터로 예측
pred=model.transform(vtest_df)

In [288]:
pred.cache()

DataFrame[id: int, nation: string, local1: string, varieties1: string, numeric_abv: float, sweet: int, acidity: int, body: int, tannin: int, year: int, price: int, local1_idx: double, local1_onehot: vector, varieties1_idx: double, varieties1_onehot: vector, nation_idx: double, nation_onehot: vector, num_features_vector: vector, scaled_num_features: vector, feature_vector: vector, prediction: double]

In [269]:
pred.select('price','prediction').show(100)

+-------+------------------+
|  price|        prediction|
+-------+------------------+
| 250000|11.564127549579332|
|  32000|10.676691132268902|
|  98000|11.596781713708836|
|  50000|10.582721678861919|
|  30000|10.611048584959754|
| 130000| 12.37494297403019|
| 230000|12.866311105532374|
|  14000|   9.8694852340638|
|  20000| 9.608655888054141|
|  20000| 9.540900832061492|
|  70000|10.973244407433581|
|  80000|11.235996674237027|
|  45000|11.371729534413209|
|  45000|10.968069658717669|
|  21000|  10.4835861977227|
|  17000| 9.480817816963432|
|  25000|10.402923167134704|
| 240000|12.462779570784168|
| 500000|12.878777673622112|
|  36000|11.754418819770384|
|  42000|11.512841978542738|
|  82000|11.698791800033476|
|  64000|11.347803504849113|
| 500000|11.700833776162932|
| 145000| 11.99377947134037|
| 350000|11.811948680452984|
|  55000|10.724837098305708|
| 100000|11.640987502767452|
|  24000|11.577319392707379|
| 103000|12.067879762854098|
|  24000|10.575947693096612|
|  24800| 10.4

In [270]:
from pyspark.sql.functions import exp, col

#    'prediction' 컬럼에는 log(price) 값이 들어있습니다.
predictions = model.transform(vtrain_df)

predictions_with_actual_price = predictions.withColumn(
    "predicted_price", exp(col("prediction"))
)

# 3. 원본 가격('price')과 변환된 예측 가격('predicted_price')을 비교합니다.
predictions_with_actual_price.select('price', 'predicted_price').show(100)

+-------+------------------+
|  price|   predicted_price|
+-------+------------------+
| 220000|133745.55448520146|
| 110000| 76956.07605978107|
|  21000| 35863.00831890071|
|  19000|20024.709446152854|
|  19000| 36009.61842251612|
|  32000| 39834.93875593972|
|  42000| 19584.33820559214|
|  85000| 58302.31215336572|
|  50000| 92556.37245663983|
|  50000|62464.955015978194|
| 140000|108747.25570204889|
|  80000|  53556.0353123658|
| 120000|158734.66100435582|
| 130000|108217.34383057151|
| 450000|175359.46391172236|
|  22000|23714.511476594882|
|  22000|23620.149586948606|
| 100000| 57359.41894063797|
| 100000| 76649.86183462487|
|  55000|268298.83704738016|
|  59000|139194.35881657578|
|  70000|118088.45215290408|
|  70000| 96992.33702761288|
|  70000| 94595.55599597152|
|  22500|49681.969465775204|
|  36000|50605.017860820066|
|  34000| 41056.96490289258|
|  45000| 51419.25275617721|
|  26000| 34745.08623851022|
|  45000|43333.926635697564|
|  45000| 42616.59025806644|
|  17000|14345

In [271]:
model.summary.r2, model.summary.rootMeanSquaredError

(0.642213770799891, 0.5741125275680323)

In [272]:
# 추가로 임포트할 RandomForestRegressor
from pyspark.ml.regression import RandomForestRegressor

In [234]:
# 1. RandomForestRegressor 모델 객체 생성
# labelCol에는 로그 변환된 'log_price' 컬럼을 사용합니다.
rf = RandomForestRegressor(featuresCol='feature_vector', labelCol='log_price')

# 2. 파이프라인에 RandomForestRegressor 추가
# 기존 파이프라인의 LinearRegression을 RandomForestRegressor로 교체합니다.
stages_for_rf = stages + [rf]  # stages는 이전 답변에서 만들었던 리스트입니다.
rf_pipeline = Pipeline(stages=stages_for_rf)

# 3. 파이프라인 학습
rf_model = rf_pipeline.fit(train_df)

# 4. 테스트 데이터셋에 적용 및 예측
predictions = rf_model.transform(test_df)

# 5. 예측값(로그 스케일)을 실제 가격으로 변환
predict = predictions.withColumn(
    "predicted_price", exp(col("prediction"))
)

# 6. 결과 확인
predict.select('price', 'predicted_price').show(10)

+------+------------------+
| price|   predicted_price|
+------+------------------+
|250000| 85987.25275210818|
| 32000|44939.715379450936|
| 98000| 95560.97855535509|
| 50000| 78713.07741211768|
| 30000|49239.157473770996|
|130000|166277.96643124262|
|230000|101116.61488612968|
| 14000|38477.163722884936|
| 20000| 42893.38708016147|
| 20000| 42893.38708016147|
+------+------------------+
only showing top 10 rows



In [235]:
from pyspark.ml.evaluation import RegressionEvaluator
evaluator = RegressionEvaluator(
    labelCol="log_price", predictionCol="prediction", metricName="r2"
)

In [236]:
# 1. 모델이 예측한 predictions 데이터프레임에 'price' 컬럼을 로그 변환하여 'log_price' 컬럼을 추가합니다.
predictions_with_log_price = predictions.withColumn("log_price", log(col("price")))

# 2. RegressionEvaluator를 사용하여 R²를 계산합니다.
#    이제 predictions_with_log_price에는 'log_price' 컬럼이 존재합니다.
evaluator = RegressionEvaluator(
    labelCol="log_price", predictionCol="prediction", metricName="r2"
)

r2_score = evaluator.evaluate(predictions_with_log_price)

print(f"Random Forest 모델의 R-squared 값: {r2_score}")

Random Forest 모델의 R-squared 값: 0.5333010249555261
