In [68]:
import os
import sys
import glob
import json
from datetime import datetime
import numpy as np
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit, col, regexp_replace, translate
from pyspark.sql.types import IntegerType, FloatType, DoubleType, DateType, StructType, StructField, StringType

from pycnnum import num2cn, cn2num

spark = SparkSession.builder.appName("realEstatePriceSession").getOrCreate()

In [2]:
download_path = os.path.abspath("./downloads")
os.makedirs(download_path, exist_ok=True)
zip_file_path = os.path.join(download_path,'download.zip')
unzip_dir = os.path.join(download_path,'datas')

# {}_lvr_land_A
publish = '108年第2季'
file_format = 'csv'
names = ['臺北市','新北市','桃園市','臺中市','高雄市']
name_codes = ['A','F','H','B','E']
code_name_map = {code:name for code, name in zip(name_codes,names)}
category = '不動產資料'
category_map = {'不動產資料':'A','預售屋買賣':'B','不動產租賃':'C'}
name_set = {f'{code}_lvr_land_{category_map[category]}'.lower() for code in name_codes}
file_paths = [file for file in glob.glob(rf'{unzip_dir}/*.csv', recursive=True) if os.path.basename(file).split('.')[0].lower() in name_set]

In [37]:
# 4. Merge Spark DF
union_df = None
ext_col_name = '城市'
for file_path in file_paths:
    file_name = os.path.basename(file_path).split('.')[0]
    zone_code = file_name[0]
    zone_name = code_name_map[zone_code]
    df = spark.read.csv(file_path,header=True).cache()
    df = df.withColumn(ext_col_name, lit(zone_name))
    if union_df is None:
        union_df = df
    else:
        union_df = union_df.unionAll(df)

In [38]:
# Cast Date Type
union_df = union_df.withColumn("交易年月日", union_df["交易年月日"].cast(IntegerType()))
# Convert Floor String to Number
union_df = union_df.withColumn("總樓層數N", regexp_replace('總樓層數', '層', ''))
for i in range(1,100):
    c = num2cn(i)
    if i // 10 == 1:
        c = c[1:]
    union_df = union_df.withColumn("總樓層數N", regexp_replace('總樓層數N', rf"^{c}$", f'{i}'))
union_df = union_df.withColumn("總樓層數N", union_df["總樓層數N"].cast(IntegerType()))

In [57]:
# 4. Search Condition
n = 13
part4_df = union_df\
    .where(union_df['主要用途']=='住家用')\
    .where(union_df['建物型態'].contains('住宅大樓'))\
    .where(union_df['總樓層數N'] >= n)
# union_df.where(union_df['建物型態'].contains('住宅大樓')).count()
# union_df.where(union_df['總樓層數N'] >= n).count()
part4_df = part4_df.orderBy(part4_df['交易年月日'].desc())
part4_datas = part4_df.collect()

In [69]:
def date_num2str(date_number):
    y = date_number // 10000
    m = (date_number - y * 10000) // 100
    d = (date_number - y * 10000) % 100
    return datetime(year=y+1911,month=m,day=d).strftime('%Y-%m-%d')

def output_dataframe(df):
    df = df.orderBy(df['交易年月日'].desc())
    result = {}
    for row in df.collect():
        row_data = row.asDict()
        city = row_data.get('城市')
        date = date_num2str(row_data.get('交易年月日'))
        town = row_data.get('鄉鎮市區')
        btye = row_data.get('建物型態')
        if not city in result:
            result[city] = {date:[{'district':town,'building_state':btye}]}
        else:
            if not date in result[city]:
                result[city][date] = [{'district':town,'building_state':btye}]
            else:
                result[city][date].append({'district':town,'building_state':btye})
    result_output = []
    for city in result:
        time_slots = []
        for date in result[city]:
            events = result[city][date]
            time_slots.append({'date':date,'events':events})
        result_output.append({'city':city,'time_slots':time_slots})
    with open(os.path.join(os.path.abspath('.'),'result-part1.json'),'w',encoding='utf-8') as fp:
        json.dump(result_output, fp)

In [70]:
output_dataframe(part4_datas)

In [71]:
twnships = ''
building_type = '住宅大樓'
main_purpose = '住家用'
n_floors = 13
transaction_date = ''
city = '臺北市'

res = union_df
if twnships:
    res = res.where(union_df['鄉鎮市區']==twnships)
if building_type:
    res = res.where(union_df['建物型態'].contains(building_type))
if main_purpose:
    res = res.where(union_df['主要用途']==main_purpose)
if n_floors:
    res = res.where(union_df['總樓層數N']>=n_floors)
if transaction_date:
    res = res.where(union_df['交易年月日']==transaction_date)
if city:
    res = res.where(union_df['城市'] == city)
res = res.orderBy(union_df['交易年月日'].desc())
res

DataFrame[鄉鎮市區: string, 交易標的: string, 土地位置建物門牌: string, 土地移轉總面積平方公尺: string, 都市土地使用分區: string, 非都市土地使用分區: string, 非都市土地使用編定: string, 交易年月日: int, 交易筆棟數: string, 移轉層次: string, 總樓層數: string, 建物型態: string, 主要用途: string, 主要建材: string, 建築完成年月: string, 建物移轉總面積平方公尺: string, 建物現況格局-房: string, 建物現況格局-廳: string, 建物現況格局-衛: string, 建物現況格局-隔間: string, 有無管理組織: string, 總價元: string, 單價元平方公尺: string, 車位類別: string, 車位移轉總面積(平方公尺): string, 車位總價元: string, 備註: string, 編號: string, 主建物面積: string, 附屬建物面積: string, 陽台面積: string, 電梯: string, 移轉編號: string, 城市: string, 總樓層數N: int]