In [11]:
import pandas as pd
from pandas.testing import assert_series_equal

# Question texts indexed by column id (ids such as 'A', 'B', 'C' are used
# further down). squeeze('columns') always yields a Series, even if the CSV
# ever contains a single row (a bare squeeze() would collapse that to a scalar).
questions: pd.Series = pd.read_csv('./config/questions.csv', index_col=0).squeeze('columns')

# Setup choices_df
# choices_df contains multiple candidates of choice indexed by column name
choices_df = pd.read_csv('./config/choices.csv')
col_names = questions.index.to_series()

# Sanity check: choices.csv must list the same questions, in the same order,
# as questions.csv once the timestamp question is excluded.
assert_series_equal(choices_df['設問文章'], questions[questions != "タイムスタンプ"], check_names=False, check_index=False)
# Re-index choices_df by column id (everything except the timestamp column).
choices_df.index = col_names.drop(index=questions[questions == "タイムスタンプ"].index).values
# Comma-separated choice strings become lists; missing cells stay NaN here.
choices_df['選択肢'] = choices_df['選択肢'].str.split(',')
# Normalise missing choice lists to None. Assign the result back instead of
# calling where(..., inplace=True) on the column selection: a chained in-place
# update operates on a temporary object under pandas Copy-on-Write and would
# leave choices_df unchanged.
choices_df['選択肢'] = choices_df['選択肢'].where(choices_df['選択肢'].notna(), None)

# NOTE: list() will be loaded as np.array()
# Load the coded answers, index them by answer number, then turn the
# array-valued cells of every multiple-answer column back into plain lists.
df = pd.read_feather('out/after_coded.feather').set_index('回答番号')
multi_answer_mask = choices_df['複数回答'] == 1
for col in choices_df[multi_answer_mask].index:
    df[col] = df[col].map(lambda cell: cell.tolist())

# Questions whose choice lists get reordered below.
label_q_ids = ['Y', 'AN', 'AS']

# One row per (question id, choice).
exploded_choices_df: pd.Series = choices_df.loc[label_q_ids, '選択肢'].explode()
# Move 'アロマンティック' and 'アセクシュアル' to the front: they map to 1, every
# other choice maps to NaN and NaN sorts last. All keys tie, so the sort must
# be stable to keep the remaining choices (and the two promoted ones) in their
# original relative order; the default quicksort gives no such guarantee.
sorted_choices_df = exploded_choices_df.sort_values(
    key=lambda series: series.map({ 'アロマンティック': 1, 'アセクシュアル':1 }),
    kind='stable',
)
# Collapse back to one list per question id (row order within a group is kept).
updated_exploded_choices_df = sorted_choices_df.groupby(level=0).agg(list)
choices_df.update(updated_exploded_choices_df)

# Region label -> prefectures that belong to it.
location_source = {
    '北海道': ['北海道'],
    '東北': ['青森県', '岩手県', '宮城県', '秋田県', '山形県', '福島県'],
    '南関東 (東京都を除く)': ['埼玉県', '千葉県', '神奈川県'],
    '東京都': ['東京都'],
    '北関東・甲信': ['茨城県', '栃木県', '群馬県', '山梨県', '長野県'],
    '北陸': ['新潟県', '富山県', '石川県', '福井県'],
    '東海': ['岐阜県', '静岡県', '愛知県', '三重県'],
    '近畿': ['滋賀県', '京都府', '大阪府', '兵庫県', '奈良県', '和歌山県'],
    '中国': ['鳥取県', '島根県', '岡山県', '広島県', '山口県'],
    '四国': ['徳島県', '香川県', '愛媛県', '高知県'],
    '九州・沖縄': ['福岡県', '佐賀県', '長崎県', '熊本県', '大分県', '宮崎県', '鹿児島県', '沖縄県'],
}

# Inverted lookup: prefecture -> region label.
location_map = {
    prefecture: region
    for region, prefectures in location_source.items()
    for prefecture in prefectures
}


# Fail fast if any value in column D is 12 or lower: the first D bin is
# labelled '13~18歳', so such values would be silently mislabelled when binned.
if df['D'].le(12).any():
    raise ValueError('Column D has one or more values less than 13')

# Age bins per question column id.
# Each entry is (inclusive upper bound, label). A bin starts just above the
# previous entry's upper bound; the first bin starts just above -1.
ages_bin_map = {
    'D': [
        (18, '13~18歳'),
        (22, '19~22歳'),
        (25, '23~25歳'),
        (29, '26~29歳'),
        (34, '30~34歳'),
        (39, '35~39歳'),
        (44, '40~44歳'),
        (10000, '45歳以上'),
    ],
    'AT': [
        (0, '0 (違いを感じたことがない)'),
        (12, '12歳以下'),
        (15, '13~15歳'),
        (18, '16~18歳'),
        (22, '19~22歳'),
        (25, '23~25歳'),
        (29, '26~29歳'),
        (34, '30~34歳'),
        (10000, '35歳以上'),
    ],
    'AX': [
        (0, '0 (本調査で知った)'),
        (15, '15歳以下'),
        (18, '16~18歳'),
        (22, '19~22歳'),
        (25, '23~25歳'),
        (29, '26~29歳'),
        (34, '30~34歳'),
        (10000, '35歳以上'),
    ],
    'AY': [
        (0, '0 (本調査で知った)'),
        (15, '15歳以下'),
        (18, '16~18歳'),
        (22, '19~22歳'),
        (25, '23~25歳'),
        (29, '26~29歳'),
        (34, '30~34歳'),
        # NOTE(review): this label uses '〜' and a second '歳', unlike the
        # '35~39歳' label of column D - confirm whether that is intentional.
        (39, '35歳〜39歳'),
        (10000, '40歳以上'),
    ],
    'CY': [
        (15, '15歳以下'),
        (18, '16~18歳'),
        (22, '19~22歳'),
        (25, '23~25歳'),
        (29, '26~29歳'),
        (10000, '30歳以上'),
    ],
}
def cut_ages_bin(ages, column=None):
    """Bin numeric ages into the labelled ranges defined in ``ages_bin_map``.

    Args:
        ages: Numeric values to bin (typically a DataFrame column).
        column: Key of ``ages_bin_map`` selecting the bin definition. When
            omitted, ``ages.name`` is used, so ``cut_ages_bin(df['D'])``
            picks the bins for 'D' without relying on a notebook-level
            loop variable.

    Returns:
        The result of ``pd.cut``: a categorical whose categories are the bin
        labels in definition order. Values outside every bin become NaN.

    Raises:
        KeyError: If no bin definition exists for the resolved column.
    """
    if column is None:
        column = getattr(ages, 'name', None)
    if column not in ages_bin_map:
        raise KeyError(f'No age bins defined for column {column!r}')

    bin_defs = ages_bin_map[column]
    upper_bounds = [upper for upper, _ in bin_defs]
    labels = [label for _, label in bin_defs]
    # right=True makes every upper bound inclusive; -1 opens the first bin so
    # that a value of 0 falls into it.
    return pd.cut(ages, [-1] + upper_bounds, labels=labels, right=True)


# Apply transformations
column: str

# Age questions (question text mentions '年齢' or '何歳'): replace the numeric
# answers with labelled bins and publish the bin labels as the choice list.
for column in questions[questions.str.contains(r'年齢|何歳', regex=True)].index.values:
    df[column] = cut_ages_bin(df[column])
    choices_df.at[column, '選択肢'] = [x[1] for x in ages_bin_map[column]]

# Column E: collapse prefectures into regions. Values that are not keys of
# location_map are left untouched by replace().
df['E'] = df['E'].replace(location_map)
# Store a real list, like every other row of '選択肢'. A dict_keys object is a
# live view of location_source and cannot be pickled with the DataFrame.
choices_df.at['E', '選択肢'] = list(location_source)

# 1-5 strength questions: relabel the scale ends. map() turns any value
# outside 1..5 into NaN, so the NaN count must not grow.
for column in questions[questions.str.contains('強さを１～５で表現すると', regex=False)].index.values:
    num_orig_na = df[column].isna().sum()
    df[column] = df[column].map({ 1: '1 (弱い)', 2: '2', 3: '3', 4: '4', 5: '5 (強い)' })
    assert df[column].isna().sum() == num_orig_na, f'Column {column} contains values outside 1-5'


In [12]:
# Calculate n for excluded answers
# Only the two mandatory questions are read from the raw export; the
# tabulation step uses them for columns B and C.

b_name ='【必須項目】上記説明を全て読み、理解した上で本調査への協力をご了承いただける場合は、「了承して、回答する」を選択してください。'
c_name = '【必須項目】自分のことをアロマンティック／アセクシュアル・スペクトラム(Aro/Ace)に当てはまると思いますか。'

source_df = pd.read_csv('./data/source.csv', na_values=[''], usecols=[b_name, c_name])

# Answer numbers are 1-based row positions.
source_df.index = (source_df.index + 1).rename('回答番号')

# Remove the answers listed in drops.csv, except those whose reason is
# '回答拒否のため' - these stay in source_df.
ac_drop_df = pd.read_csv('./data/after_codings/drops.csv')
ac_drop_df = ac_drop_df.loc[lambda drops: drops['理由'] != '回答拒否のため']

source_df = source_df.drop(index=pd.Index(ac_drop_df['回答番号']))

# Blank out the C answer wherever B was answered 'いいえ'.
declined = source_df[b_name].eq('いいえ')
source_df.loc[declined, c_name] = pd.NA

In [13]:
# Generate tabulation.json
from typing import TypedDict
import json

# Per-column choices that the tabulation loop below moves to the end of the
# table, in the order listed here.
other_groups_map = {
    'CO': ['その他', 'とくになかった'],
    'CQ': ['その他', 'とくにない'],
}

# Type hints for the loop variables used below. `q_text` is only annotated and
# `num` is only initialised here; neither is referenced again in this cell.
column: str
q_text: str
num = 0

class Tabulation(TypedDict):
    """One entry of tabulation.json: the summary table for a single question."""
    # Question text.
    title: str
    # Number of non-missing answers to the question.
    n: int
    # Table rows: [choice, share] pairs, or the single row ['(自由記述)'] for
    # free-text questions.
    table: list[list[str]]

# Filled by the loop below, one entry per question.
tabulations: list[Tabulation] = []

# Build one Tabulation per question (column A is skipped).
for column, title in questions.drop(['A']).items():
    # B and C are taken from the raw export (source_df); every other column
    # comes from the coded answers (df).
    if column == 'B':
        values = source_df[b_name].dropna()
    elif column == 'C':
        values = source_df[c_name].dropna()
    else:
        values = df[column].dropna()
    # n counts every non-missing answer; for multiple-answer questions this
    # includes answers with an empty selection, which are dropped further down.
    n = len(values)

    # Free-text questions get a placeholder table only.
    if choices_df.loc[column, 'テキスト回答'] == 1:
        tabulation: Tabulation = {'title': title, 'n': n, 'table': [['(自由記述)']]}
        tabulations.append(tabulation)
        continue

    if choices_df.loc[column, '複数回答'] == 1:
        # drop empty list - the mask is built from `values` itself so that it
        # shares the index of the series it filters.
        values = values[values.astype(bool)]

    # Frequency table: one count per distinct choice.
    counts = values.explode().value_counts(sort=False)

    # reorder table
    choices_list = choices_df.at[column, '選択肢']
    if choices_list is None:
        counts = counts.sort_index()
    else:
        # Configured choices first, then any answer not in the configuration.
        choices = pd.Index(choices_list).union(counts.index, sort=False)

        # Choices listed in other_groups_map always go last.
        other_groups = other_groups_map.get(column, [])
        index_order = choices.drop(other_groups, errors='ignore').append(pd.Index(other_groups))

        # Fill missing choices with a zero *count* before formatting, so they
        # end up as '0.0%' like every other row instead of a bare integer 0.
        counts = counts.reindex(index_order, fill_value=0)

    # Share of respondents per choice, as a percentage string. With n == 0 the
    # division would produce NaN, so report plain zeros in that case.
    if n:
        percentages = (counts * 100 / n).round(1)
    else:
        percentages = counts.astype(float)
    data = (percentages.astype(str) + '%').rename('割合').rename_axis('選択肢').reset_index()

    tabulation = {'title': title, 'n': n, 'table': data.values.tolist()}
    tabulations.append(tabulation)

# Persist all tabulations as a single JSON array.
with open('out/tabulation.json', 'w') as f:
    f.write(json.dumps(tabulations))
