In [23]:
from pyspark.sql import SparkSession

In [24]:
def convert_to_cp932(value):
    if value is None:
        return None
    try:
        return value.encode('utf-8').decode('cp932')
    except UnicodeDecodeError:
        return value.encode('utf-8', 'replace').decode('cp932', 'replace')

In [49]:
# Sparkセッションの作成
spark = SparkSession.builder \
    .appName("CSV to Parquet") \
    .getOrCreate()

# CSVファイルの読み込み
input_df = spark.read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .csv("input_utf8.csv")

# DataFrameをParquet形式で保存
input_df.write \
    .mode("overwrite") \
    .parquet("output_parquet")

In [50]:
input_df.show()

+--------+---+------+
|    name|age| score|
+--------+---+------+
|   Alice| 25| 100+5|
|山本太郎| 30|  85-5|
|   Carol| 28| 90+10|
|    Dave| 35| 80-10|
|     Eve| 22| 95+15|
|   Frank| 27| 70-15|
|阿南是清| 29|105+20|
|   Heidi| 32| 55-20|
|    Ivan| 26|110+25|
|    Judy| 31| 40-25|
+--------+---+------+



In [27]:
# DataFrameをParquet形式で保存
input_df.write \
    .mode("overwrite") \
    .parquet("output_parquet")

In [40]:
df_utf8 = spark.read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .parquet("output_parquet")

In [41]:
df_utf8.show()

+--------+---+------+
|    name|age| score|
+--------+---+------+
|   Alice| 25| 100+5|
|山本太郎| 30|  85-5|
|   Carol| 28| 90+10|
|    Dave| 35| 80-10|
|     Eve| 22| 95+15|
|   Frank| 27| 70-15|
|   Grace| 29|105+20|
|   Heidi| 32| 55-20|
|    Ivan| 26|110+25|
|    Judy| 31| 40-25|
+--------+---+------+



In [43]:
from pyspark.sql.functions import udf, col
from pyspark.sql.types import StringType# 文字列をCP932に変換するUDFを定義
@udf(StringType())
def convert_to_cp932(value):
    if value is None:
        return None
    try:
        return value.encode('utf-8').decode('cp932')
    except UnicodeDecodeError:
        return value.encode('utf-8', 'replace').decode('cp932', 'replace')

In [36]:
import shutil
import os

# 一時的なディレクトリにDataFrameをCP932エンコーディングのCSV形式で保存
temp_output_dir = "temp_output_cp932"
df_utf8.write \
    .option("header", "true") \
    .option("encoding", "CP932") \
    .mode("overwrite") \
    .csv(temp_output_dir)

In [44]:
df_cp932 = df_utf8
for column, dtype in df_utf8.dtypes:
    if dtype == 'string':
        df_cp932 = df_cp932.withColumn(column, convert_to_cp932(col(column)))

In [45]:
df_cp932.write \
    .option("header", "true") \
    .option("encoding", "CP932") \
    .mode("overwrite") \
    .csv("temp_output_dir")

                                                                                

In [46]:
# 一時的なディレクトリから最初のパートファイルを取得
parquet_files = [file for file in os.listdir(temp_output_dir) if file.startswith("part-")]

In [47]:
parquet_files

['part-00000-61af7d05-b16a-497b-a0bf-7dc9ecb744af-c000.csv']

In [48]:
# 最初のパートファイルをリネームしてoutput_cp932.csvに保存
shutil.move(os.path.join(temp_output_dir, parquet_files[0]), "output_cp932.csv")

'output_cp932.csv/part-00000-61af7d05-b16a-497b-a0bf-7dc9ecb744af-c000.csv'