In [None]:
!pip install selenium
!apt-get update  # to update ubuntu to correctly run apt install
!apt install chromium-chromedriver
!cp / usr/lib/chromium-browser/chromedriver / usr/bin

!pip install pyspark


Web Crawler


In [None]:
import time
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.options import Options
from selenium import webdriver
import sys
# NOTE(review): sys.path is Python's module search path; adding the chromedriver
# binary location to it does not look necessary for Selenium. Kept unchanged.
sys.path.insert(0, '/usr/lib/chromium-browser/chromedriver')


# Chrome is started headless with the sandbox and /dev/shm usage disabled.
chrome_options = webdriver.ChromeOptions()
chrome_options.add_argument('--headless')
chrome_options.add_argument('--no-sandbox')
chrome_options.add_argument('--disable-dev-shm-usage')
# Downloads are directed to the directory the unzip step later reads from.
chrome_prefs = {"download.default_directory": './content/drive'}
chrome_options.add_experimental_option("prefs", chrome_prefs)
# Do not pass the driver path positionally: recent Selenium 4 releases no
# longer accept it. Without it Selenium resolves `chromedriver` from PATH,
# which is the same binary the positional 'chromedriver' argument pointed at.
driver = webdriver.Chrome(options=chrome_options)
url = "https://plvr.land.moi.gov.tw/DownloadOpenData"


# XPath selectors, in click order: season, file format, the second download-type
# option, one checkbox per city dataset, and finally the download button.
# City codes as labelled in the original notebook:
# A = Taipei, F = New Taipei, H = Taoyuan, B = Taichung, E = Kaohsiung.
xpath_list = [
    "//select[@id='historySeason_id']/option[@value='108S2']",
    "//select[@id='fileFormatId']/option[@value='csv']",
    "//input[@id='downloadTypeId2']",
    *[
        "//input[@value='%s_lvr_land_A']" % city_code
        for city_code in ("A", "F", "H", "B", "E")
    ],
    "//input[@id='downloadBtnId']",
]


def clawer():
    """Open the download page and click through the selectors in ``xpath_list``.

    Uses the module-level ``driver``, ``url`` and ``xpath_list``. Each element
    is waited for (up to 20 seconds) until it is clickable and then clicked.
    After the final click the function prints a message and sleeps 60 seconds
    (presumably to give the browser time to finish the download -- TODO confirm).
    """
    def _click_when_ready(locator):
        # Block until the element is clickable, then click it.
        element = WebDriverWait(driver, 20).until(
            EC.element_to_be_clickable(locator))
        element.click()

    driver.get(url)
    # 'ui-id-2' is clicked first, before any of the form controls.
    _click_when_ready((By.ID, 'ui-id-2'))
    for selector in xpath_list:
        _click_when_ready((By.XPATH, selector))
    print("Operation successful !")
    time.sleep(60)


# Run the crawler. A failure is reported instead of being silently swallowed,
# and the browser is always shut down so it does not leak after a successful run.
try:
    clawer()
except Exception as exc:
    print("Crawler failed:", repr(exc))
finally:
    driver.quit()


Data cleaning and transformation


In [30]:
import io
import zipfile
from functools import reduce

from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import (
    desc, collect_list, struct, col, to_json, when, regexp_replace,
    translate, udf, sort_array,
)
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType

# Single-threaded local Spark session (reused if one already exists).
spark = (
    SparkSession.builder
    .master("local[1]")
    .appName("SparkReadCSVExample")
    .getOrCreate()
)

# Location of the downloaded archive and the directory it is extracted into.
zip_path = './content/drive/download.zip'
unzip_path = './content/files'

# The context manager closes the archive even if extraction raises.
with zipfile.ZipFile(zip_path, 'r') as zip_ref:
    zip_ref.extractall(unzip_path)

# Maps each per-city DataFrame (read from its extracted CSV with a header row
# and schema inference) to the city name used to tag its rows.
file_dict = {
    spark.read.options(header='True', inferSchema='True')
    .csv(unzip_path + csv_name): city_label
    for csv_name, city_label in (
        ('/A_lvr_land_A.csv', '臺北市'),
        ('/F_lvr_land_A.csv', '新北市'),
        ('/H_lvr_land_A.csv', '桃園市'),
        ('/B_lvr_land_A.csv', '台中市'),
        ('/E_lvr_land_A.csv', '高雄市'),
    )
}

# Add a `city` column to every per-city DataFrame.
# The value is the leading [0:3] slice of the address column when that slice
# equals the city name, and the city name itself otherwise.
# NOTE(review): both branches appear to produce the same value -- confirm intent.
df_list = [
    frame.withColumn(
        'city',
        when(frame.土地位置建物門牌[0:3] == city_name, frame.土地位置建物門牌[0:3])
        .otherwise(city_name),
    )
    for frame, city_name in file_dict.items()
]

# Merge the per-city DataFrames into one, then drop the English header rows
# (rows whose district column starts with "The villages").
spark_df = reduce(DataFrame.unionAll, df_list).where(
    '`鄉鎮市區` not like "The villages%"'
)

# floor_num: the total-floors text with the '層' character removed when the
# value ends with it; an empty string for every other row.
spark_df = spark_df.withColumn(
    'floor_num',
    when(
        spark_df['總樓層數'].endswith('層'),
        regexp_replace(spark_df['總樓層數'], '層', ''),
    ).otherwise(''),
)


In [31]:
def covert_num(floor_name):
    """Convert a floor count written in Chinese numerals to an integer.

    Args:
        floor_name: The value to convert. May be an ``int`` (returned as is),
            a ``float`` (truncated with ``int()``), ``None``, or a string such
            as ``"五"``, ``"十二"``, ``"二十三"`` or ``"一百零一"``. Strings made
            only of decimal digits (e.g. ``"12"``) are also accepted.

    Returns:
        The integer value; ``None`` for ``None`` or an empty/blank string;
        ``0`` when the string is not a recognisable numeral.
    """
    if isinstance(floor_name, int):
        return floor_name
    if isinstance(floor_name, float):
        return int(floor_name)
    if floor_name is None:
        return None

    text = floor_name.strip()
    if not text:
        # Empty input has no numeric value; callers receive None (a SQL NULL
        # when this function is used as a Spark UDF).
        return None
    if text.isdecimal():
        return int(text)

    digits = {"一": 1, "二": 2, "三": 3, "四": 4, "五": 5,
              "六": 6, "七": 7, "八": 8, "九": 9}
    units = {"十": 10, "百": 100}

    total = 0
    pending = 0  # digit read but not yet multiplied by a unit
    for ch in text:
        if ch in digits:
            if pending:
                # Two digits in a row (e.g. "二三") is not a valid numeral.
                return 0
            pending = digits[ch]
        elif ch in units:
            # A bare unit means one of it: "十二" is 1 * 10 + 2.
            total += (pending or 1) * units[ch]
            pending = 0
        elif ch == "零":
            # Placeholder zero, as in "一百零一"; it cannot follow a bare digit.
            if pending:
                return 0
        else:
            # Unknown character: report 0 instead of raising inside the UDF.
            return 0
    return total + pending

def convert_western_date(date):
    """Convert a compact ROC-calendar date string to ``YYYY-MM-DD``.

    The input is expected as ``YYYMMDD`` (or ``YYMMDD`` for two-digit years),
    where the year counts from 1911; e.g. ``"1080523"`` -> ``"2019-05-23"``.

    Args:
        date: The date string, or ``None``.

    Returns:
        The converted date string. ``None``, empty strings and values that are
        not 6 or 7 decimal digits are returned unchanged.
    """
    if date is None:
        return None
    if len(date) not in (6, 7) or not date.isdecimal():
        return date
    # Slice from the right so only the year prefix is shifted. Using
    # str.replace on the year digits would also rewrite a month/day that
    # happens to contain the same digits (e.g. "1081108").
    year = int(date[:-4]) + 1911
    return str(year) + "-" + date[-4:-2] + "-" + date[-2:]

# Wrap the plain-Python converters as Spark UDFs with explicit return types.
num_udf = udf(covert_num, IntegerType())
western_date_udf = udf(convert_western_date, StringType())

# Overwrite both columns with their converted values.
spark_df = (
    spark_df
    .withColumn("floor_num", num_udf(col("floor_num")))
    .withColumn("交易年月日", western_date_udf(col("交易年月日")))
)

In [32]:
# Keep rows whose main-use column equals "住家用", whose building-type column
# starts with "住宅大樓", and whose floor_num is at least 13.
filiter_df = (
    spark_df
    .where('`主要用途` == "住家用"')
    .where('`建物型態` like "住宅大樓%"')
    .where('floor_num >= 13')
)

# One event = the district and building type of a single row.
result_struct = struct(col('鄉鎮市區').alias('district'),
                       col('建物型態').alias('building_state'))

# Build one JSON document per city: {city, time_slots: [{date, events}, ...]}.
# The time slots are ordered with sort_array on the collected structs, which
# compare by their first field (`date`). Sorting the DataFrame before
# groupBy/collect_list does not guarantee the order inside the collected list.
result = (
    filiter_df
    .groupBy(['city', '交易年月日'])
    .agg(collect_list(result_struct).alias('events'))
    .groupBy(['city'])
    .agg(
        sort_array(
            collect_list(struct(col('交易年月日').alias('date'), col('events'))),
            asc=False,  # newest date first, matching the original desc() sort
        ).alias('time_slots')
    )
    .toJSON()
    .collect()
)

# Write the first two JSON documents to part 1 and the remainder to part 2,
# one document per line.
for out_name, rows in (('result-part1.json', result[:2]),
                       ('result-part2.json', result[2:])):
    with io.open(out_name, 'w', encoding='utf-8') as f:
        f.writelines(row + "\n" for row in rows)


Save the DataFrame to SQLite3 for use by the RESTful API


In [None]:
import sqlite3
import pandas as pd

# Create (or open) the SQLite database file and store the cleaned data in it.
conn = sqlite3.connect('land.db')
try:
    df = spark_df.toPandas()
    # replace: Drop the table before inserting new values.
    df.to_sql('land_txn_log', conn, if_exists='replace', index=False)
    # Sanity check: report how many rows were written.
    us_df = pd.read_sql("SELECT count(*) FROM land_txn_log;", conn)
    print(us_df)
finally:
    # Always release the database handle, even if the export fails.
    conn.close()
