# 데이터 품질 테스트

## 0. Spark Session 생성

In [1]:
#pip install pyspark==3.1.2

Note: you may need to restart the kernel to use updated packages.


In [2]:
import pyspark
print(pyspark.__version__)

3.1.2


In [8]:
from pyspark.sql import SparkSession

# SparkSession 생성
spark = SparkSession.builder \
    .appName("HDFS File Reading") \
    .config("spark.hadoop.fs.defaultFS", "hdfs://localhost:9000") \
    .getOrCreate()

In [4]:
spark.stop()

## 1. JSON, WAV 파일 DataFrame으로 변환(테스트, 파일 1개)

### 1. JSON 파일 DataFrame으로 변환

In [5]:
# HDFS에서 JSON 파일 읽기
file_path = "hdfs://localhost:9000/shared_data/label_data/1.Car/1.horn_of_car/1.car_horn_1.json"

In [9]:
file_path

'hdfs://localhost:9000/shared_data/label_data/1.Car/1.horn_of_car/1.car_horn_1.json'

In [10]:
df_test = spark.read.json(file_path, multiLine= True)

[Stage 0:>                                                          (0 + 1) / 1]                                                                                

In [11]:
df_test.printSchema()

root
 |-- annotations: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- area: struct (nullable = true)
 |    |    |    |-- end: double (nullable = true)
 |    |    |    |-- start: double (nullable = true)
 |    |    |-- categories: struct (nullable = true)
 |    |    |    |-- category_01: string (nullable = true)
 |    |    |    |-- category_02: string (nullable = true)
 |    |    |    |-- category_03: string (nullable = true)
 |    |    |-- decibel: long (nullable = true)
 |    |    |-- labelName: string (nullable = true)
 |    |    |-- soundQuality: string (nullable = true)
 |    |    |-- subCategory: string (nullable = true)
 |-- audio: struct (nullable = true)
 |    |-- bitRate: string (nullable = true)
 |    |-- duration: double (nullable = true)
 |    |-- fileFormat: string (nullable = true)
 |    |-- fileName: string (nullable = true)
 |    |-- fileSize: long (nullable = true)
 |    |-- recodingType: string (nullable = true)
 |    |-- sample

In [26]:
df_test.show()

+--------------------+--------------------+-----------------------------+--------------------+--------------------+
|         annotations|               audio|                  environment|                info|             license|
+--------------------+--------------------+-----------------------------+--------------------+--------------------+
|[{{4.88, 3.45}, {...|{705kbps, 9.2, wa...|{갤럭시탭S6, 자연적, 제작,...|{IMR, 2021-09-04,...|{CC 0, https://ww...|
+--------------------+--------------------+-----------------------------+--------------------+--------------------+



In [27]:
from pyspark.sql.functions import col, explode

# annotations 배열을 개별 행으로 변환
df_flattened = df_test.withColumn("annotation", explode(col("annotations")))

# 구조체 내부의 필드를 개별 컬럼으로 변환
df_flattened = df_flattened.select(
    # annotation 내부 필드
    col("annotation.area.start").alias("area_start"),
    col("annotation.area.end").alias("area_end"),
    col("annotation.categories.category_01").alias("category_01"),
    col("annotation.categories.category_02").alias("category_02"),
    col("annotation.categories.category_03").alias("category_03"),
    col("annotation.decibel").alias("decibel"),
    col("annotation.labelName").alias("labelName"),
    col("annotation.soundQuality").alias("soundQuality"),
    col("annotation.subCategory").alias("subCategory"),

    # audio 내부 필드
    col("audio.bitRate").alias("bitRate"),
    col("audio.duration").alias("duration"),
    col("audio.fileFormat").alias("fileFormat"),
    col("audio.fileName").alias("fileName"),
    col("audio.fileSize").alias("fileSize"),
    col("audio.recodingType").alias("recodingType"),
    col("audio.sampleRate").alias("sampleRate"),

    # environment 내부 필드
    col("environment.acqDevice").alias("acqDevice"),
    col("environment.acqMethod").alias("acqMethod"),
    col("environment.acqType").alias("acqType"),
    col("environment.areaUse").alias("areaUse"),
    col("environment.dayNight").alias("dayNight"),
    col("environment.direction").alias("direction"),
    col("environment.distance").alias("distance"),
    col("environment.district").alias("district"),
    col("environment.gps.latitude").alias("latitude"),
    col("environment.gps.longitude").alias("longitude"),
    col("environment.micClass").alias("micClass"),
    col("environment.obstacle").alias("obstacle"),
    col("environment.place").alias("place"),
    col("environment.recordingTime").alias("recordingTime"),
    col("environment.urban").alias("urban"),
    col("environment.weather").alias("weather"),

    # info 내부 필드
    col("info.contributor").alias("contributor"),
    col("info.dateCreated").alias("dateCreated"),
    col("info.description").alias("description"),
    col("info.uri").alias("uri"),
    col("info.version").alias("version"),
    col("info.year").alias("year"),

    # license 내부 필드
    col("license.name").alias("license_name"),
    col("license.url").alias("license_url")
)

# 결과 출력 (줄임 없이)
df_flattened.show(truncate=False)

25/02/14 13:25:37 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


+----------+--------+-----------+-----------+-----------+-------+----------------+------------+-----------+-------+--------+----------+--------------+--------+------------+----------+----------+---------+-------+--------+--------+---------+--------+--------+--------+---------+--------+--------+------+-------------+----------+-------+-----------+-----------+----------------+-----------------------+-------+----+------------+-----------------------+
|area_start|area_end|category_01|category_02|category_03|decibel|labelName       |soundQuality|subCategory|bitRate|duration|fileFormat|fileName      |fileSize|recodingType|sampleRate|acqDevice |acqMethod|acqType|areaUse |dayNight|direction|distance|district|latitude|longitude|micClass|obstacle|place |recordingTime|urban     |weather|contributor|dateCreated|description     |uri                    |version|year|license_name|license_url            |
+----------+--------+-----------+-----------+-----------+-------+----------------+------------+---

In [28]:
display(df_flattened.toPandas())

Unnamed: 0,area_start,area_end,category_01,category_02,category_03,decibel,labelName,soundQuality,subCategory,bitRate,...,urban,weather,contributor,dateCreated,description,uri,version,year,license_name,license_url
0,3.45,4.88,교통소음,자동차,차량경적,72,1.자동차_1_1.wav,정상,소형차경적,705kbps,...,서울특별시,맑음,IMR,2021-09-04,도시 소리 데이터,https://www.aihub.or.kr,1.0,2021,CC 0,https://www.aihub.or.kr


### 2. WAV 파일 DataFrame으로 변환

In [12]:
import pyspark
from pyspark.sql import SparkSession
import io
from scipy.io import wavfile
import librosa
import numpy as np
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, FloatType

# HDFS 경로 설정
hdfs_path = "hdfs://localhost:9000/shared_data/raw_data/1.Car/1.horn_of_car/1.car_horn_9_1.wav"

# HDFS에서 WAV 파일 읽기 (binaryFile 포맷 사용)
binary_df = spark.read.format("binaryFile").load(hdfs_path)

# 바이너리 데이터 추출
binary_data = binary_df.select("content").collect()[0][0]

# 바이너리 데이터를 메모리 파일로 변환
audio_bytes = io.BytesIO(binary_data)

# scipy로 WAV 파일 읽기
sr, audio = wavfile.read(audio_bytes)
print("WAV 파일 샘플링 레이트 (scipy):", sr)

# librosa를 사용해 WAV 파일을 리샘플링
audio_librosa, sr_librosa = librosa.load(audio_bytes, sr=None)
print("librosa로 처리한 샘플링 레이트:", sr_librosa)

# MFCC 추출
mfcc = librosa.feature.mfcc(y=audio_librosa, sr=sr_librosa, n_mfcc=13)  # 13개의 MFCC 특징 추출

# MFCC 데이터 프레임으로 변환
# 각 MFCC 값을 배열로 변환
mfcc_list = mfcc.T.tolist()  # MFCC는 2D 배열이므로 이를 각 행별로 리스트로 변환

# Spark DataFrame으로 변환
df_mfcc = spark.createDataFrame([(i, *mfcc_list[i]) for i in range(len(mfcc_list))], 
                                ['index'] + [f'mfcc_{i+1}' for i in range(13)])

df_mfcc.show()

WAV 파일 샘플링 레이트 (scipy): 44100
librosa로 처리한 샘플링 레이트: 44100


25/02/17 14:27:10 ERROR Executor: Exception in task 0.0 in stage 2.0 (TID 2)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/opt/spark/python/lib/pyspark.zip/pyspark/worker.py", line 477, in main
    ("%d.%d" % sys.version_info[:2], version))
Exception: Python in worker has different version 3.6 than that in driver 3.8, PySpark cannot run with different minor versions. Please check environment variables PYSPARK_PYTHON and PYSPARK_DRIVER_PYTHON are correctly set.

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:517)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:652)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:635)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:470)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$11

Py4JJavaError: An error occurred while calling o92.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2.0 failed 1 times, most recent failure: Lost task 0.0 in stage 2.0 (TID 2) (ip-172-31-11-83.ap-northeast-3.compute.internal executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/opt/spark/python/lib/pyspark.zip/pyspark/worker.py", line 477, in main
    ("%d.%d" % sys.version_info[:2], version))
Exception: Python in worker has different version 3.6 than that in driver 3.8, PySpark cannot run with different minor versions. Please check environment variables PYSPARK_PYTHON and PYSPARK_DRIVER_PYTHON are correctly set.

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:517)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:652)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:635)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:470)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:489)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:755)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:345)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2258)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2207)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2206)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2206)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1079)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1079)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1079)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2445)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2387)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2376)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:868)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2196)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2217)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2236)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:472)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:425)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:47)
	at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3696)
	at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:2722)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3687)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3685)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:2722)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:2929)
	at org.apache.spark.sql.Dataset.getRows(Dataset.scala:301)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:338)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/opt/spark/python/lib/pyspark.zip/pyspark/worker.py", line 477, in main
    ("%d.%d" % sys.version_info[:2], version))
Exception: Python in worker has different version 3.6 than that in driver 3.8, PySpark cannot run with different minor versions. Please check environment variables PYSPARK_PYTHON and PYSPARK_DRIVER_PYTHON are correctly set.

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:517)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:652)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:635)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:470)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:489)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:755)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:345)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more


In [14]:
display(df_mfcc.toPandas())

Unnamed: 0,index,mfcc_1,mfcc_2,mfcc_3,mfcc_4,mfcc_5,mfcc_6,mfcc_7,mfcc_8,mfcc_9,mfcc_10,mfcc_11,mfcc_12,mfcc_13
0,0,-238.612213,133.487045,-1.284238,22.370865,22.564083,6.518252,-3.192569,1.113762,-7.715192,-3.464986,2.922217,7.885314,1.067608
1,1,-234.819626,152.233673,-10.859320,34.342796,19.500042,10.937912,1.760601,13.675949,-4.060371,-0.560500,-1.439846,-0.002022,-2.775231
2,2,-254.145233,165.326401,-27.593834,47.074913,6.318587,17.216387,-1.539557,20.285751,-3.986101,4.778692,-5.949762,-1.714797,-5.803444
3,3,-253.305832,159.803238,-33.335686,42.717648,4.379756,20.070423,1.405201,16.080278,-5.679544,3.018893,-3.229465,5.645140,-6.073312
4,4,-245.418915,153.193115,-32.850662,44.121162,0.510546,17.596771,-4.350383,5.619840,-7.632173,-1.984774,-6.154417,4.396658,-13.112883
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
915,915,-224.675110,166.235565,-21.254143,35.928074,9.500803,16.222107,4.120435,23.599682,7.030793,9.776043,4.211914,3.731602,0.523003
916,916,-216.340363,174.400177,-18.497841,32.879936,12.142467,19.131794,0.880060,14.683958,7.002939,8.431215,0.941998,4.359944,-2.096164
917,917,-220.770462,165.227661,-12.930012,33.392899,8.737799,18.818878,-3.677454,12.477207,5.245459,8.977969,-4.575911,-3.837185,-8.276455
918,918,-220.862579,161.087646,-15.604593,41.479973,11.880829,19.706432,-1.011476,15.012062,2.033504,9.690787,-2.105361,0.087690,-10.177952


In [18]:
import pyspark
from pyspark.sql import SparkSession
import io
from scipy.io import wavfile
import librosa
import numpy as np
import os  # os 모듈을 추가하여 경로에서 파일 이름만 추출
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, FloatType

# HDFS 경로 설정
hdfs_path = "hdfs://localhost:9000/shared_data/raw_data/1.Car/1.horn_of_car/1.car_horn_9_1.wav"

# HDFS에서 WAV 파일 읽기 (binaryFile 포맷 사용)
binary_df = spark.read.format("binaryFile").load(hdfs_path)

# 바이너리 데이터 추출
binary_data = binary_df.select("content").collect()[0][0]
file_path = binary_df.select("path").collect()[0][0]  # 전체 경로를 추출

# 파일 이름만 추출 (HDFS 경로에서 파일 이름만 분리)
file_name = os.path.basename(file_path)

# 바이너리 데이터를 메모리 파일로 변환
audio_bytes = io.BytesIO(binary_data)

# scipy로 WAV 파일 읽기
sr, audio = wavfile.read(audio_bytes)
print("WAV 파일 샘플링 레이트 (scipy):", sr)

# librosa를 사용해 WAV 파일을 리샘플링
audio_librosa, sr_librosa = librosa.load(audio_bytes, sr=None)
print("librosa로 처리한 샘플링 레이트:", sr_librosa)

# MFCC 추출
mfcc = librosa.feature.mfcc(y=audio_librosa, sr=sr_librosa, n_mfcc=13)  # 13개의 MFCC 특징 추출

# MFCC 평균 계산
mfcc_mean = np.mean(mfcc, axis=1).astype(float)  # (13, ) 크기의 평균 MFCC 값 배열, numpy float32에서 float로 변환

# 파일 이름과 평균 MFCC 값을 DataFrame에 넣기
df_mfcc_mean = spark.createDataFrame([(file_name, *mfcc_mean.tolist())],  # numpy 배열을 리스트로 변환
                                     ['file_name'] + [f'mfcc_{i+1}' for i in range(13)])

df_mfcc_mean.show()


WAV 파일 샘플링 레이트 (scipy): 44100
librosa로 처리한 샘플링 레이트: 44100
+------------------+-------------------+-----------------+-------------------+------------------+-----------------+------------------+------------------+-----------------+-----------------+-----------------+-------------------+-----------------+------------------+
|         file_name|             mfcc_1|           mfcc_2|             mfcc_3|            mfcc_4|           mfcc_5|            mfcc_6|            mfcc_7|           mfcc_8|           mfcc_9|          mfcc_10|            mfcc_11|          mfcc_12|           mfcc_13|
+------------------+-------------------+-----------------+-------------------+------------------+-----------------+------------------+------------------+-----------------+-----------------+-----------------+-------------------+-----------------+------------------+
|1.car_horn_9_1.wav|-251.55934143066406|173.8914794921875|-30.198183059692383|45.578636169433594|8.823283195495605|17.660017013549805|-4.3403968811

In [19]:
display(df_mfcc_mean.toPandas())

Unnamed: 0,file_name,mfcc_1,mfcc_2,mfcc_3,mfcc_4,mfcc_5,mfcc_6,mfcc_7,mfcc_8,mfcc_9,mfcc_10,mfcc_11,mfcc_12,mfcc_13
0,1.car_horn_9_1.wav,-251.559341,173.891479,-30.198183,45.578636,8.823283,17.660017,-4.340397,10.337169,4.226048,1.150458,-1.714232,2.508801,-1.459536


## 2. 데이터 전처리 - JSON

### 1. Json 파일 로드 및 DataFrame으로 변환

In [4]:
hdfs_path = "hdfs://localhost:9000/shared_data"

# JSON 파일만 불러오기
df = spark.read.option("recursiveFileLookup", "true") \
               .option("pathGlobFilter", "*.json") \
               .json(hdfs_path, multiLine=True)

df.show(5)

25/02/17 12:11:33 WARN SharedInMemoryCache: Evicting cached table partition metadata from memory due to size constraints (spark.sql.hive.filesourcePartitionFileCacheSize = 262144000 bytes). This may impact query planning performance.
25/02/17 12:11:48 WARN TaskSetManager: Stage 0 contains a task of very large size (2196 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

+--------------------+--------------------+-----------------------------+--------------------+--------------------+
|         annotations|               audio|                  environment|                info|             license|
+--------------------+--------------------+-----------------------------+--------------------+--------------------+
|[{{7.55, 3.85}, {...|{1411kbps, 9.548,...|{갤럭시탭S6, 자연적, 제작,...|{IMR, 2021-09-04,...|{CC 0, https://ww...|
|[{{8.06, 2.45}, {...|{1411kbps, 10.064...|  {갤럭시S6Tab, 자연적, 제...|{IMR, 2021-09-04,...|{CC 0, https://ww...|
|[{{8.59, 4.02}, {...|{1411kbps, 11.066...|  {갤럭시S6Tab, 자연적, 제...|{IMR, 2021-09-04,...|{CC 0, https://ww...|
|[{{16.425, 2.0}, ...|{1411kbps, 18.425...|    {갤럭시TabS6Lite, 자연...|{IMR, 2021-09-04,...|{CC 0, https://ww...|
|[{{23.86, 7.44}, ...|{1411kbps, 28.611...|{갤럭시탭S6, 자연적, 제작,...|{IMR, 2021-09-04,...|{CC 0, https://ww...|
+--------------------+--------------------+-----------------------------+--------------------+---------------

### 2. JSON 데이터 구조 변환 (Flattening)

In [5]:
from pyspark.sql.functions import col

def flatten_json(df):
    """
    중첩된 JSON 구조를 평탄화하는 함수
    """
    # annotations 배열 내 요소들을 별도 컬럼으로 분리
    df = df.select(
        col("annotations.area.start").alias("area_start"),
        col("annotations.area.end").alias("area_end"),
        col("annotations.categories.category_01").alias("category_01"),
        col("annotations.categories.category_02").alias("category_02"),
        col("annotations.categories.category_03").alias("category_03"),
        col("annotations.decibel").alias("decibel"),
        col("annotations.labelName").alias("labelName"),
        col("annotations.soundQuality").alias("soundQuality"),
        col("annotations.subCategory").alias("subCategory"),
        col("audio.bitRate").alias("bitRate"),
        col("audio.duration").alias("duration"),
        col("audio.fileFormat").alias("fileFormat"),
        col("audio.fileName").alias("fileName"),
        col("audio.fileSize").alias("fileSize"),
        col("audio.recodingType").alias("recodingType"),
        col("audio.sampleRate").alias("sampleRate"),
        col("environment.acqDevice").alias("acqDevice"),
        col("environment.acqMethod").alias("acqMethod"),
        col("environment.acqType").alias("acqType"),
        col("environment.areaUse").alias("areaUse"),
        col("environment.dayNight").alias("dayNight"),
        col("environment.direction").alias("direction"),
        col("environment.distance").alias("distance"),
        col("environment.district").alias("district"),
        col("environment.gps.latitude").alias("latitude"),
        col("environment.gps.longitude").alias("longitude"),
        col("environment.micClass").alias("micClass"),
        col("environment.obstacle").alias("obstacle"),
        col("environment.place").alias("place"),
        col("environment.recordingTime").alias("recordingTime"),
        col("environment.urban").alias("urban"),
        col("environment.weather").alias("weather"),
        col("info.contributor").alias("contributor"),
        col("info.dateCreated").alias("dateCreated"),
        col("info.description").alias("description"),
        col("info.uri").alias("uri"),
        col("info.version").alias("version"),
        col("info.year").alias("year"),
        col("license.name").alias("license_name"),
        col("license.url").alias("license_url"),
    )
    return df

# Flatten 적용
df_flattened = flatten_json(df)
df_flattened.show(5, truncate=False)

25/02/17 12:59:52 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


+----------+--------+-----------+------------+--------------+-------+-------------------------+------------+--------------+--------+--------+----------+---------------------+--------+------------+----------+---------------+---------+-------+------------+--------+---------+--------+----------+--------+---------+--------+--------+------+-------------+----------+-------+-----------+-----------+----------------+-----------------------+-------+----+------------+-----------------------+
|area_start|area_end|category_01|category_02 |category_03   |decibel|labelName                |soundQuality|subCategory   |bitRate |duration|fileFormat|fileName             |fileSize|recodingType|sampleRate|acqDevice      |acqMethod|acqType|areaUse     |dayNight|direction|distance|district  |latitude|longitude|micClass|obstacle|place |recordingTime|urban     |weather|contributor|dateCreated|description     |uri                    |version|year|license_name|license_url            |
+----------+--------+-------

In [31]:
import pandas as pd

# 전체 열, 행 표시 설정
# pd.set_option("display.max_columns", None)  # 모든 컬럼 출력
# pd.set_option("display.max_rows", None)  # 모든 행 출력
# pd.set_option("display.width", 200)  # 출력 너비 조정

# DataFrame을 Pandas로 변환하여 출력
display(df_flattened.toPandas().head(5))

                                                                                

Unnamed: 0,area_start,area_end,category_01,category_02,category_03,decibel,labelName,soundQuality,subCategory,bitRate,...,urban,weather,contributor,dateCreated,description,uri,version,year,license_name,license_url
0,[3.85],[7.55],[교통소음],[이륜자동차],[이륜차주행음],[71],[2.이륜자동차_1867_1.wav],[노이즈],[이륜차주행음],1411kbps,...,서울특별시,맑음,IMR,2021-09-04,도시 소리 데이터,https://www.aihub.or.kr,1.0,2021,CC 0,https://www.aihub.or.kr
1,[2.45],[8.06],[교통소음],[이륜자동차],[이륜차주행음],[120],[2.이륜자동차_2117_1.wav],[정상],[이륜차주행음],1411kbps,...,서울특별시,맑음,IMR,2021-09-04,도시 소리 데이터,https://www.aihub.or.kr,1.0,2021,CC 0,https://www.aihub.or.kr
2,[4.02],[8.59],[교통소음],[이륜자동차],[이륜차주행음],[125],[2.이륜자동차_1964_1.wav],[정상],[이륜차주행음],1411kbps,...,서울특별시,맑음,IMR,2021-09-04,도시 소리 데이터,https://www.aihub.or.kr,1.0,2021,CC 0,https://www.aihub.or.kr
3,[2.0],[16.425],[교통소음],[이륜자동차],[이륜차주행음],[108],[2.이륜자동차_2263_1.wav],[노이즈],[이륜차주행음],1411kbps,...,광주광역시,흐림,IMR,2021-09-04,도시 소리 데이터,https://www.aihub.or.kr,1.0,2021,CC 0,https://www.aihub.or.kr
4,[7.44],[23.86],[교통소음],[이륜자동차],[이륜차주행음],[92],[2.이륜자동차_2392_1.wav],[정상],[이륜차주행음],1411kbps,...,서울특별시,맑음,IMR,2021-09-04,도시 소리 데이터,https://www.aihub.or.kr,1.0,2021,CC 0,https://www.aihub.or.kr


### 3. 병렬 처리 및 성능 최적화

In [32]:
df_flattened = df_flattened.cache()  # 캐싱하여 속도 향상
df_flattened = df_flattened.repartition(4)  # 4개의 파티션으로 병렬 처리

### 4. CSV 또는 Parquet으로 저장

In [33]:
# CSV로 저장
#df_flattened.write.mode("overwrite").option("header", "true").csv("hdfs://localhost:9000/output/csv_data")

# Parquet으로 저장 (속도 빠름)
#df_flattened.write.mode("overwrite").parquet("hdfs://localhost:9000/output/parquet_data")

## 3. 데이터 전처리 - WAV

In [20]:
pip install hdfs

Defaulting to user installation because normal site-packages is not writeable
Note: you may need to restart the kernel to use updated packages.


In [12]:
import librosa
import os
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import col


# HDFS 내 모든 .wav 파일 찾기
def find_all_wav_files(hdfs_dir):
    hdfs_files = []
    result = spark._jvm.org.apache.hadoop.fs.FileSystem.get(
        spark._jsc.hadoopConfiguration()
    ).listStatus(spark._jvm.org.apache.hadoop.fs.Path(hdfs_dir))

    for file_status in result:
        file_path = file_status.getPath().toString()
        if file_status.isDirectory():  # 디렉터리라면 재귀 탐색
            hdfs_files.extend(find_all_wav_files(file_path))
        elif file_path.endswith(".wav"):  # .wav 파일이면 추가
            hdfs_files.append(file_path)

    return hdfs_files

# WAV 파일에서 MFCC 특징 추출
def extract_mfcc_from_wav(wav_path):
    try:
        y, sr = librosa.load(wav_path, sr=None)
        mfccs = librosa.feature.mfcc(y=y, sr=sr, n_mfcc=13)
        mfccs_mean = mfccs.mean(axis=1)  # 평균값을 사용하여 특징 벡터 생성
        return mfccs_mean.tolist()
    except Exception as e:
        print(f"파일 {wav_path} 처리 중 오류 발생: {e}")
        return None

# /shared_data/ 내 모든 .wav 파일 찾기
wav_files = find_all_wav_files("hdfs://localhost:9000/shared_data/")

# 모든 .wav 파일에 대해 MFCC 추출
mfcc_list = []
for wav_file in wav_files:
    mfcc_features = extract_mfcc_from_wav(wav_file)
    if mfcc_features is not None:
        filename = os.path.basename(wav_file)
        mfcc_list.append([filename, wav_file] + mfcc_features)

# Pandas DataFrame으로 변환
columns = ['filename', 'filepath'] + [f'mfcc_{i+1}' for i in range(13)]
df = pd.DataFrame(mfcc_list, columns=columns)

# PySpark DataFrame 변환
spark_df = spark.createDataFrame(df)

# 결과 출력
spark_df.show()

# 결과를 HDFS에 저장 (CSV 파일)
output_path = "hdfs://localhost:9000/shared_data/mfcc_features/"
spark_df.write.csv(output_path, header=True, mode="overwrite")

print(f"MFCC 데이터가 HDFS에 저장됨: {output_path}")



  y, sr = librosa.load(wav_path, sr=None)
	Deprecated as of librosa version 0.10.0.
	It will be removed in librosa version 1.0.
  y, sr_native = __audioread_load(path, offset, duration, dtype)


파일 hdfs://localhost:9000/shared_data/raw_data/1.Car/1.horn_of_car/1.car_horn_10000_1.wav 처리 중 오류 발생: [Errno 2] No such file or directory: 'hdfs://localhost:9000/shared_data/raw_data/1.Car/1.horn_of_car/1.car_horn_10000_1.wav'
파일 hdfs://localhost:9000/shared_data/raw_data/1.Car/1.horn_of_car/1.car_horn_10001_1.wav 처리 중 오류 발생: [Errno 2] No such file or directory: 'hdfs://localhost:9000/shared_data/raw_data/1.Car/1.horn_of_car/1.car_horn_10001_1.wav'
파일 hdfs://localhost:9000/shared_data/raw_data/1.Car/1.horn_of_car/1.car_horn_10002_1.wav 처리 중 오류 발생: [Errno 2] No such file or directory: 'hdfs://localhost:9000/shared_data/raw_data/1.Car/1.horn_of_car/1.car_horn_10002_1.wav'
파일 hdfs://localhost:9000/shared_data/raw_data/1.Car/1.horn_of_car/1.car_horn_10004_1.wav 처리 중 오류 발생: [Errno 2] No such file or directory: 'hdfs://localhost:9000/shared_data/raw_data/1.Car/1.horn_of_car/1.car_horn_10004_1.wav'
파일 hdfs://localhost:9000/shared_data/raw_data/1.Car/1.horn_of_car/1.car_horn_10005_1.wav 처리 중 오류

IOPub data rate exceeded.
The Jupyter server will temporarily stop sending output
to the client in order to avoid crashing it.
To change this limit, set the config variable
`--ServerApp.iopub_data_rate_limit`.

Current values:
ServerApp.iopub_data_rate_limit=1000000.0 (bytes/sec)
ServerApp.rate_limit_window=3.0 (secs)

