In [None]:
import os
from os.path import join
from dotenv import load_dotenv

import pyspark as ps
from pyspark import StorageLevel
from pyspark.sql import SparkSession, DataFrame, types
from pyspark.sql.functions import col

In [None]:
ps_conf = ps.SparkConf()\
            .set("spark.sql.sources.commitProtocolClass", "org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol")\
            .set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")\
            .set("spark.sql.shuffle.partitions", 100)\
            .set("spark.sql.dynamicPartitionPruning.enabled", True)
            # '_started'と'_committed_'で始まるファイルを書き込まないように設定
            # '_SUCCESS'で始まるファイルを書き込まないように設定
            # パーティション数を調整する
            # 動的パーティションプルーニングの有効化
spark = SparkSession.builder.config(conf=ps_conf).getOrCreate()

In [None]:
load_dotenv(join(os.getcwd(), '.env'))
BASE_PATH     = os.environ.get("BASE_PATH")
WORK_PATH     = BASE_PATH + os.environ.get("WORK_PATH")
PROJECT_NAME  = os.environ.get("PROJECT_NAME")
INSTRUCT_PATH = WORK_PATH + PROJECT_NAME

In [None]:
def calc_migration(df_devided:DataFrame) -> DataFrame:
    # df_deviedに期待する構成
    #  | 'ORIGIN' | 'DESTINATION' | '移動影響量'
    # 0, 30943,     30943,          0.0
    # 1, 30943,     30984,          0.1013
    # 2, 30943,     30985,          0.1043
    # 3, 30943,     31177,          0.0
    # ・
    # ・
    # ・
    
    # 出力用データフレームの構成
    spark      = SparkSession.getActiveSession()
    df_schema  = types.StructType([
        types.StructField('ORIGIN',         types.StringType(),    False),
        types.StructField('VIA_1',          types.StringType(),    False),
        types.StructField('DESTINATION',    types.StringType(),    False),
        types.StructField('移動影響量_VIA_1', types.FloatType(),     False),
    ])
    
    # 経由地1のunitリスト
    v_unit_list  = sorted(df_devided.select('DESTINATION').drop_duplicates().rdd.flatMap(lambda x: x).collect())
    
    # 空のデータフレームの作成
    df_migrate = spark.createDataFrame([], df_schema)
    for v_unit in v_unit_list:
        # 1) 2階層回遊におけるDESTINATIONのunitを抽出
        df_v1_to_d = df_devided.filter(col('DESTINATION') == v_unit)\
                                .withColumnRenamed('移動影響量', '移動影響量_VIA_0')\
                                .select(['ORIGIN', '移動影響量_VIA_0'])
        
        # 2) 1)のunitを出発地とするirfを抽出
        df_v1_to_o = df_devided.filter(col('ORIGIN')      == v_unit)\
                                .withColumnsRenamed({'ORIGIN':'VIA_1', '移動影響量':'移動影響量_VIA_1'})\
                                .select(['VIA_1', 'DESTINATION', '移動影響量_VIA_1'])
        
        # 3) 交差結合する
        df_v1 = df_v1_to_d.crossJoin(df_v1_to_o)
        
        # 4) 出発地・経由地1のirf と 経由地1・目的地のirf を掛ける
        df_v1 = df_v1.withColumn('移動影響量_VIA_1', col('移動影響量_VIA_0') * col('移動影響量_VIA_1'))
        
        # 5) 列を整える
        df_v1 = df_v1.select(['ORIGIN', 'VIA_1', 'DESTINATION', '移動影響量_VIA_1'])
        
        # 6) 縦結合する
        df_migrate = df_migrate.union(df_v1)

    # データの整形を行う
    df_migrate = df_migrate\
                    .orderBy(col('ORIGIN').asc(), col('VIA_1').asc(), col('DESTINATION').asc())
    
    # df_migrateに期待する構成
    #  | 'ORIGIN' | 'VIA_1' | 'DESTINATION' | '移動影響量_VIA_1'
    # 0, 30943,     30984,    30943,          0.0
    # 1, 30943,     30984,    30984,          0.01087849
    # 2, 30943,     30984,    30985,          0.01087849
    # 3, 30943,     30984,    31177,          0.0
    # ・
    # ・
    # ・
    
    return df_migrate

In [None]:
unit_terminal = 50
df_schema = types.StructType([
        types.StructField('ORIGIN',      types.StringType(), False),
        types.StructField('DESTINATION', types.StringType(), False),
        types.StructField('移動影響量',    types.FloatType(),  False),
    ])
df_test_data = spark.read\
                .option('inferSchema', 'True')\
                .option('header', 'True')\
                .csv(INSTRUCT_PATH + f'csv_data/test_terminal{unit_terminal}.csv', schema=df_schema)

In [None]:
df_migrate = calc_migration(df_test_data)
df_migrate.persist(StorageLevel.MEMORY_ONLY)
df_migrate.count()

df_migrate\
    .toPandas()\
    .to_csv(INSTRUCT_PATH + 'csv_data/test_migrate.csv', index=False, header=True)