# CSVファイルの取り込み

## 認証情報をSpark コンフィグレーションへセット

In [None]:
from pyspark.sql import SparkSession

# Azure storage access info
abfs_account_name = '_storage_account_' # replace with your storage account name
abfs_container_name = 'dl2' # replace with your container name
linked_service_name = '_linked_service_name_' # replace with your linked service name

abfs_sas_token = mssparkutils.credentials.getConnectionStringOrCreds(linked_service_name)

# Allow SPARK to access from DataLake Gen2  remotely

dl2Uri = f"abfss://{abfs_container_name}@{abfs_account_name}.dfs.core.windows.net"

print('Remote Data Lake Gen2 path: ' + dl2Uri)

## CSV ファイルの読み込み

In [None]:
shiresaki = spark.read.format("csv").option("header", "false").option("mode", "DROPMALFORMED").load(f"{dl2Uri}/sample/仕入先/")
zyuchu = spark.read.format("csv").option("header", "false").option("mode", "DROPMALFORMED").load(f"{dl2Uri}/sample/受注/")
zyuchumeisai = spark.read.format("csv").option("header", "false").option("mode", "DROPMALFORMED").load(f"{dl2Uri}/sample/受注明細/")
shohin = spark.read.format("csv").option("header", "false").option("mode", "DROPMALFORMED").load(f"{dl2Uri}/sample/商品/")
shohinkubun = spark.read.format("csv").option("header", "false").option("mode", "DROPMALFORMED").load(f"{dl2Uri}/sample/商品区分/")
tokuisaki = spark.read.format("csv").option("header", "false").option("mode", "DROPMALFORMED").load(f"{dl2Uri}/sample/得意先/")
shain = spark.read.format("csv").option("header", "false").option("mode", "DROPMALFORMED").load(f"{dl2Uri}/sample/社員/")
unso = spark.read.format("csv").option("header", "false").option("mode", "DROPMALFORMED").load(f"{dl2Uri}/sample/運送会社/")
todofuken = spark.read.format("csv").option("header", "false").option("mode", "DROPMALFORMED").load(f"{dl2Uri}/sample/都道府県/")

## カラム名の設定

In [None]:
shiresaki = shiresaki.withColumnRenamed("_c0", "仕入先コード") \
                     .withColumnRenamed("_c1", "フリガナ") \
                     .withColumnRenamed("_c2", "仕入先名") \
                     .withColumnRenamed("_c3", "担当者名") \
                     .withColumnRenamed("_c4", "部署") \
                     .withColumnRenamed("_c5", "郵便番号") \
                     .withColumnRenamed("_c6", "トドウフケン") \
                     .withColumnRenamed("_c7", "都道府県") \
                     .withColumnRenamed("_c8", "住所1") \
                     .withColumnRenamed("_c9", "住所2") \
                     .withColumnRenamed("_c10", "電話番号") \
                     .withColumnRenamed("_c11", "ファクシミリ") \
                     .withColumnRenamed("_c12", "ホームページ")

zyuchu = zyuchu.withColumnRenamed("_c0", "受注コード") \
               .withColumnRenamed("_c1", "得意先コード") \
               .withColumnRenamed("_c2", "社員コード") \
               .withColumnRenamed("_c3", "出荷先名") \
               .withColumnRenamed("_c4", "出荷先郵便番号") \
               .withColumnRenamed("_c5", "出荷先都道府県") \
               .withColumnRenamed("_c6", "出荷先住所1") \
               .withColumnRenamed("_c7", "出荷先住所2") \
               .withColumnRenamed("_c8", "運送区分") \
               .withColumnRenamed("_c9", "受注日") \
               .withColumnRenamed("_c10", "締切日") \
               .withColumnRenamed("_c11", "出荷日") \
               .withColumnRenamed("_c12", "運送料")


zyuchumeisai = zyuchumeisai.withColumnRenamed("_c0", "受注コード") \
                           .withColumnRenamed("_c1", "商品コード") \
                           .withColumnRenamed("_c2", "単価") \
                           .withColumnRenamed("_c3", "数量") \
                           .withColumnRenamed("_c4", "割引")


shohin = shohin.withColumnRenamed("_c0", "商品コード") \
               .withColumnRenamed("_c1", "フリガナ") \
               .withColumnRenamed("_c2", "商品名") \
               .withColumnRenamed("_c3", "仕入先コード") \
               .withColumnRenamed("_c4", "区分コード") \
               .withColumnRenamed("_c5", "梱包単位") \
               .withColumnRenamed("_c6", "単価") \
               .withColumnRenamed("_c7", "在庫") \
               .withColumnRenamed("_c8", "発注済") \
               .withColumnRenamed("_c9", "発注点") \
               .withColumnRenamed("_c10", "生産中止")


shohinkubun = shohinkubun.withColumnRenamed("_c0", "区分コード") \
                         .withColumnRenamed("_c1", "区分名") \
                         .withColumnRenamed("_c2", "商品名") \
                         .withColumnRenamed("_c3", "説明") \
                         .withColumnRenamed("_c4", "図")


tokuisaki = tokuisaki.withColumnRenamed("_c0", "得意先コード") \
               .withColumnRenamed("_c1", "フリガナ") \
               .withColumnRenamed("_c2", "得意先名") \
               .withColumnRenamed("_c3", "担当者名") \
               .withColumnRenamed("_c4", "部署") \
               .withColumnRenamed("_c5", "郵便番号") \
               .withColumnRenamed("_c6", "トドウフケン") \
               .withColumnRenamed("_c7", "都道府県") \
               .withColumnRenamed("_c8", "住所1") \
               .withColumnRenamed("_c9", "住所2") \
               .withColumnRenamed("_c10", "電話番号") \
               .withColumnRenamed("_c11", "ファクシミリ")


shain = shain.withColumnRenamed("_c0", "社員コード") \
             .withColumnRenamed("_c1", "フリガナ") \
             .withColumnRenamed("_c2", "氏名") \
             .withColumnRenamed("_c3", "在籍支社") \
             .withColumnRenamed("_c4", "部署名") \
             .withColumnRenamed("_c5", "誕生日") \
             .withColumnRenamed("_c6", "入社日") \
             .withColumnRenamed("_c7", "自宅郵便番号") \
             .withColumnRenamed("_c8", "自宅都道府県") \
             .withColumnRenamed("_c9", "自宅住所1") \
             .withColumnRenamed("_c10", "自宅住所2") \
             .withColumnRenamed("_c11", "自宅電話番号") \
             .withColumnRenamed("_c11", "内線") \
             .withColumnRenamed("_c11", "写真") \
             .withColumnRenamed("_c12", "プロフィール1") \
             .withColumnRenamed("_c13", "プロフィール2") \
             .withColumnRenamed("_c14", "プロフィール3")


unso = unso.withColumnRenamed("_c0", "運送コード") \
           .withColumnRenamed("_c1", "運送会社") \
           .withColumnRenamed("_c2", "電話番号")


todofuken = todofuken.withColumnRenamed("_c0", "トドウフケン") \
           .withColumnRenamed("_c1", "都道府県") \
           .withColumnRenamed("_c2", "ローマ字") \
           .withColumnRenamed("_c3", "地域名ローマ字") \
           .withColumnRenamed("_c4", "地域")

## Parquet ファイルのエクスポート

In [None]:
shiresaki.write.mode('overwrite').parquet(f"{dl2Uri}/parquet/shiiresaki/")
zyuchu.write.mode('overwrite').parquet(f"{dl2Uri}/parquet/zyuchu/")
zyuchumeisai.write.mode('overwrite').parquet(f"{dl2Uri}/parquet/zyuchumeisai/")
shohin.write.mode('overwrite').parquet(f"{dl2Uri}/parquet/shohin/")
shohinkubun.write.mode('overwrite').parquet(f"{dl2Uri}/parquet/shohinkubun/")
tokuisaki.write.mode('overwrite').parquet(f"{dl2Uri}/parquet/tokuisaki/")
shain.write.mode('overwrite').parquet(f"{dl2Uri}/parquet/shain/")
unso.write.mode('overwrite').parquet(f"{dl2Uri}/parquet/unso/")
todofuken.write.mode('overwrite').parquet(f"{dl2Uri}/parquet/todofuken/")

### ORC ファイルのエクスポート

In [None]:
shiresaki.write.mode('overwrite').orc(f"{dl2Uri}/orc/shiiresaki/")
zyuchu.write.mode('overwrite').orc(f"{dl2Uri}/orc/zyuchu/")
zyuchumeisai.write.mode('overwrite').orc(f"{dl2Uri}/orc/zyuchumeisai/")
shohin.write.mode('overwrite').orc(f"{dl2Uri}/orc/shohin/")
shohinkubun.write.mode('overwrite').orc(f"{dl2Uri}/orc/shohinkubun/")
tokuisaki.write.mode('overwrite').orc(f"{dl2Uri}/orc/tokuisaki/")
shain.write.mode('overwrite').orc(f"{dl2Uri}/orc/shain/")
unso.write.mode('overwrite').orc(f"{dl2Uri}/orc/unso/")
todofuken.write.mode('overwrite').orc(f"{dl2Uri}/orc/todofuken/")