In [1]:
import os

# findspark.init() with no argument locates Spark via the SPARK_HOME
# environment variable, so it has to be set before that call.
os.environ.update(SPARK_HOME="/home/hadoop/spark")

import findspark
findspark.init()

import pyspark
from pyspark.sql import SQLContext

# One SparkContext for the whole notebook.
sc = pyspark.SparkContext(appName="tomato")
# Legacy SQL entry point bound to `sc` (SQLContext is deprecated in favour of
# SparkSession since Spark 3.0; kept because other cells may use `sqlContext`).
sqlContext = SQLContext(sc)

24/06/06 18:50:18 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [7]:
from functools import reduce

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import types as T
import os

# Get a Spark session. getOrCreate() reuses an already-running SparkContext
# (e.g. one started in the setup cell) instead of starting a second one.
spark = SparkSession.builder \
    .appName("Tomato Data Analysis") \
    .getOrCreate()

# HDFS directory holding the raw strawberry CSV exports. Keep the trailing "/":
# file names are appended to it directly.
data_path = "hdfs://master01:9000/user/hadoop/strawberry/"

# Source CSV files. Each file must appear only once: a repeated entry is
# unioned twice and double-weights that file's rows in the per-date mean below.
csv_files = [
    "STRAWBERRY_ACIDITY_ENV_20221212.csv",
    "STRAWBERRY_FLOWER_NUM_ENV_20221209.csv",
    "STRAWBERRY_FLOWERLESS_NUM_ENV_20221210.csv",
    "STRAWBERRY_FRUIT_LEN_ENV_20221209.csv",
    "STRAWBERRY_FRUIT_QUALITY_INFO_ENV_20221209.csv",
    "STRAWBERRY_FRUIT_QUANTITY_INFO_ENV_20221209.csv",
    "STRAWBERRY_FRUIT_SETTING_ENV_20221209.csv",
    "STRAWBERRY_FRUIT_WEIGHT_ENV_20221209.csv",
    "STRAWBERRY_FRUIT_WIDTH_ENV_20221209.csv",
    "STRAWBERRY_GROWTH_LENGTH_ENV_20221209.csv",
    "STRAWBERRY_LEAF_INFO_ENV_20221209.csv",
    "STRAWBERRY_LEAF_LEN_ENV_20221203.csv",
    "STRAWBERRY_LEAF_NUM_ENV_20221209.csv",
    "STRAWBERRY_LEAF_WIDTH_ENV_20221209.csv",
    "STRAWBERRY_PETIOLE_LEN_ENV_20221209.csv",
    "STRAWBERRY_PRODUCTION_ENV_20221209.csv",
    "STRAWBERRY_ROOTING_SEASON_ENV_20221203.csv",
    "STRAWBERRY_ROOT_NUM_ENV_20221209.csv",
    "STRAWBERRY_SOIL_SURFACE_LEN_ENV_20221202.csv",
    "STRAWBERRY_STEM_INFO_ENV_20221209.csv",
    "STRAWBERRY_SUGAR_CONTENT_ENV_20221209.csv",
    "STRAWBERRY_THECA_DIAMETER_ENV_20221209.csv"
]
assert len(csv_files) == len(set(csv_files)), "duplicate entry in csv_files"

# Read every file exactly once. inferSchema costs an extra pass over each file,
# so the same frames are reused for both the column scan and the merge.
raw_frames = {
    file: spark.read.csv(data_path + file, header=True, inferSchema=True)
    for file in csv_files
}

# Union of all column names in first-seen order. A set would iterate in an
# order that varies between Python processes (string hash randomization),
# making the output column order unstable from run to run.
seen_columns = list(dict.fromkeys(
    name for frame in raw_frames.values() for name in frame.columns
))
assert "STRG_DT" in seen_columns, "grouping key STRG_DT not found in any file"

# STRG_DT goes first as the grouping key. ZONE_NM is excluded because it is
# dropped from every file; keeping it here would re-add it as an all-null column.
all_columns = ["STRG_DT"] + [
    name for name in seen_columns if name not in ("STRG_DT", "ZONE_NM")
]

# Align every frame to the shared column list in one select: present columns
# are kept, missing ones become string-typed nulls. union() matches columns by
# position, so every frame must list them in exactly this order.
dataframes = []
for frame in raw_frames.values():
    present = set(frame.columns)
    dataframes.append(frame.select([
        F.col(name) if name in present
        else F.lit(None).cast(T.StringType()).alias(name)
        for name in all_columns
    ]))

# Stack all aligned frames into one long frame.
merged_df = reduce(lambda left, right: left.union(right), dataframes)

# Collapse rows sharing a STRG_DT into one row by averaging every other column.
# mean() skips nulls, so each value is averaged only over the files that have it.
# NOTE(review): a column padded with string nulls in some files is presumably
# widened to string by union() and cast back by mean(); any genuinely
# non-numeric column averages to null — confirm all remaining columns are numeric.
agg_exprs = [F.mean(name).alias(name) for name in merged_df.columns if name != "STRG_DT"]

final_df = merged_df.groupBy("STRG_DT").agg(*agg_exprs)
final_df_sorted = final_df.orderBy("STRG_DT")

# Save the merged data as a single CSV part file inside the output directory.
output_path = "hdfs://master01:9000/user/hadoop/test_merge_strawberry_data.csv"
final_df_sorted.coalesce(1).write.csv(output_path, header=True, mode='overwrite')

                                                                                