# Bank of Taiwan currency exchange rate

## Import Libraries

In [0]:
# create new Spark session
from pyspark.sql.functions import lit, col
from pyspark.sql.types import *
from pyspark.sql.functions import asc

# Requests, DateTime, JSON, and Pandas
import requests, datetime, json, os, io
import pandas as pd

## Get Spark Version

In [0]:
spark.version

Out[45]: '3.3.2'

## Download the recent currency exchange

In [0]:
currency_rate = {}
current_data_request = requests.get("https://rate.bot.com.tw/xrt/flcsv/0/day")
current_df = pd.read_csv(io.StringIO(current_data_request \
    .content.decode('utf-8')), index_col = False)

In [0]:
print(current_df.columns)

Index(['幣別', '匯率', '現金', '即期', '遠期10天', '遠期30天', '遠期60天', '遠期90天', '遠期120天',
       '遠期150天', '遠期180天', '匯率.1', '現金.1', '即期.1', '遠期10天.1', '遠期30天.1',
       '遠期60天.1', '遠期90天.1', '遠期120天.1', '遠期150天.1', '遠期180天.1'],
      dtype='object')


In [0]:
current_df

Unnamed: 0,幣別,匯率,現金,即期,遠期10天,遠期30天,遠期60天,遠期90天,遠期120天,遠期150天,...,匯率.1,現金.1,即期.1,遠期10天.1,遠期30天.1,遠期60天.1,遠期90天.1,遠期120天.1,遠期150天.1,遠期180天.1
0,USD,本行買入,31.105,31.43,31.428,31.342,31.215,31.105,30.992,30.875,...,本行賣出,31.775,31.58,31.534,31.458,31.345,31.245,31.14,31.038,30.94
1,HKD,本行買入,3.872,3.993,3.994,3.985,3.974,3.964,3.952,3.941,...,本行賣出,4.076,4.063,4.055,4.048,4.037,4.027,4.016,4.006,3.996
2,GBP,本行買入,38.6,39.495,39.696,39.588,39.458,39.326,39.189,39.052,...,本行賣出,40.72,40.125,40.112,40.037,39.907,39.772,39.649,39.525,39.402
3,AUD,本行買入,20.26,20.475,20.637,20.592,20.54,20.486,20.432,20.374,...,本行賣出,21.04,20.82,20.846,20.818,20.765,20.711,20.664,20.613,20.56
4,CAD,本行買入,22.85,23.18,23.301,23.244,23.173,23.101,23.026,22.95,...,本行賣出,23.76,23.51,23.507,23.462,23.39,23.317,23.25,23.183,23.116
5,SGD,本行買入,22.89,23.36,23.387,23.342,23.291,23.241,23.191,23.141,...,本行賣出,23.8,23.58,23.577,23.552,23.5,23.45,23.408,23.366,23.325
6,CHF,本行買入,35.0,35.61,35.817,35.791,35.778,35.765,35.748,35.731,...,本行賣出,36.2,36.0,36.082,36.087,36.075,36.06,36.059,36.057,36.056
7,JPY,本行買入,0.2001,0.2069,0.208,0.2081,0.2083,0.2085,0.2086,0.2088,...,本行賣出,0.2129,0.2119,0.212,0.2123,0.2125,0.2128,0.213,0.2133,0.2136
8,ZAR,本行買入,0.0,1.604,1.628,1.62,1.61,1.6,1.59,1.58,...,本行賣出,0.0,1.694,1.709,1.703,1.693,1.683,1.673,1.664,1.654
9,SEK,本行買入,2.67,3.0,3.007,3.001,2.994,2.987,2.98,2.972,...,本行賣出,3.19,3.12,3.108,3.104,3.097,3.09,3.085,3.079,3.073


In [0]:
fields = []
for field_name in current_df.columns:
    if field_name in ['幣別', '匯率', '匯率.1']: #, '匯率', "匯率.1"]:
        fields.append(StructField(field_name, StringType(), True))
    else:
        fields.append(StructField(field_name, FloatType(), True))
        
schema = StructType(fields)
spark_df = spark.createDataFrame(current_df, schema)
spark_df.printSchema()

root
 |-- 幣別: string (nullable = true)
 |-- 匯率: string (nullable = true)
 |-- 現金: float (nullable = true)
 |-- 即期: float (nullable = true)
 |-- 遠期10天: float (nullable = true)
 |-- 遠期30天: float (nullable = true)
 |-- 遠期60天: float (nullable = true)
 |-- 遠期90天: float (nullable = true)
 |-- 遠期120天: float (nullable = true)
 |-- 遠期150天: float (nullable = true)
 |-- 遠期180天: float (nullable = true)
 |-- 匯率.1: string (nullable = true)
 |-- 現金.1: float (nullable = true)
 |-- 即期.1: float (nullable = true)
 |-- 遠期10天.1: float (nullable = true)
 |-- 遠期30天.1: float (nullable = true)
 |-- 遠期60天.1: float (nullable = true)
 |-- 遠期90天.1: float (nullable = true)
 |-- 遠期120天.1: float (nullable = true)
 |-- 遠期150天.1: float (nullable = true)
 |-- 遠期180天.1: float (nullable = true)



In [0]:
display(spark_df)

幣別,匯率,現金,即期,遠期10天,遠期30天,遠期60天,遠期90天,遠期120天,遠期150天,遠期180天,匯率.1,現金.1,即期.1,遠期10天.1,遠期30天.1,遠期60天.1,遠期90天.1,遠期120天.1,遠期150天.1,遠期180天.1
USD,本行買入,31.105,31.43,31.428,31.342,31.215,31.105,30.992,30.875,30.768,本行賣出,31.775,31.58,31.534,31.458,31.345,31.245,31.14,31.038,30.94
HKD,本行買入,3.872,3.993,3.994,3.985,3.974,3.964,3.952,3.941,3.929,本行賣出,4.076,4.063,4.055,4.048,4.037,4.027,4.016,4.006,3.996
GBP,本行買入,38.6,39.495,39.696,39.588,39.458,39.326,39.189,39.052,38.916,本行賣出,40.72,40.125,40.112,40.037,39.907,39.772,39.649,39.525,39.402
AUD,本行買入,20.26,20.475,20.637,20.592,20.54,20.486,20.432,20.374,20.315,本行賣出,21.04,20.82,20.846,20.818,20.765,20.711,20.664,20.613,20.56
CAD,本行買入,22.85,23.18,23.301,23.244,23.173,23.101,23.026,22.95,22.875,本行賣出,23.76,23.51,23.507,23.462,23.39,23.317,23.25,23.183,23.116
SGD,本行買入,22.89,23.36,23.387,23.342,23.291,23.241,23.191,23.141,23.091,本行賣出,23.8,23.58,23.577,23.552,23.5,23.45,23.408,23.366,23.325
CHF,本行買入,35.0,35.61,35.817,35.791,35.778,35.765,35.748,35.731,35.715,本行賣出,36.2,36.0,36.082,36.087,36.075,36.06,36.059,36.057,36.056
JPY,本行買入,0.2001,0.2069,0.208,0.2081,0.2083,0.2085,0.2086,0.2088,0.209,本行賣出,0.2129,0.2119,0.212,0.2123,0.2125,0.2128,0.213,0.2133,0.2136
ZAR,本行買入,0.0,1.604,1.628,1.62,1.61,1.6,1.59,1.58,1.57,本行賣出,0.0,1.694,1.709,1.703,1.693,1.683,1.673,1.664,1.654
SEK,本行買入,2.67,3.0,3.007,3.001,2.994,2.987,2.98,2.972,2.965,本行賣出,3.19,3.12,3.108,3.104,3.097,3.09,3.085,3.079,3.073


## Remove Unused Columns

In [0]:
spark_df = spark_df.drop(
    "匯率", "現金", "匯率.1", "現金.1", "遠期10天", "遠期30天", "遠期60天", "遠期90天", "遠期120天", "遠期150天", "遠期180天",
    "遠期10天.1", "遠期30天.1", "遠期60天.1", "遠期90天.1", "遠期120天.1", "遠期150天.1", "遠期180天.1"
)
spark_df.printSchema()

root
 |-- 幣別: string (nullable = true)
 |-- 即期: float (nullable = true)
 |-- 即期.1: float (nullable = true)



## Clean Missing Data

In [0]:
clear_missing_data = spark_df.filter(spark_df['即期'] != 0)
display(clear_missing_data)

幣別,即期,即期.1
USD,31.43,31.58
HKD,3.993,4.063
GBP,39.495,40.125
AUD,20.475,20.82
CAD,23.18,23.51
SGD,23.36,23.58
CHF,35.61,36.0
JPY,0.2069,0.2119
ZAR,1.604,1.694
SEK,3.0,3.12


## Rename Columns

In [0]:
old_name = ["幣別", "即期", "即期.1"]
new_name = ["currency", "buy_rate", "sell_rate"]

for old, new in zip(old_name, new_name):
    clear_missing_data = clear_missing_data.withColumnRenamed(old, new)

clear_missing_data.printSchema()

root
 |-- currency: string (nullable = true)
 |-- buy_rate: float (nullable = true)
 |-- sell_rate: float (nullable = true)



In [0]:
display(clear_missing_data)

currency,buy_rate,sell_rate
USD,31.43,31.58
HKD,3.993,4.063
GBP,39.495,40.125
AUD,20.475,20.82
CAD,23.18,23.51
SGD,23.36,23.58
CHF,35.61,36.0
JPY,0.2069,0.2119
ZAR,1.604,1.694
SEK,3.0,3.12


## Get Current Date & Add Date Column

In [0]:
date = datetime.datetime.now().strftime("%Y%m%d")
include_date = clear_missing_data.withColumn("date", lit(date))
include_date.printSchema()

root
 |-- currency: string (nullable = true)
 |-- buy_rate: float (nullable = true)
 |-- sell_rate: float (nullable = true)
 |-- date: string (nullable = false)



In [0]:
display(include_date)

currency,buy_rate,sell_rate,date
USD,31.43,31.58,20240222
HKD,3.993,4.063,20240222
GBP,39.495,40.125,20240222
AUD,20.475,20.82,20240222
CAD,23.18,23.51,20240222
SGD,23.36,23.58,20240222
CHF,35.61,36.0,20240222
JPY,0.2069,0.2119,20240222
ZAR,1.604,1.694,20240222
SEK,3.0,3.12,20240222


## Get Currency List

In [0]:
currency_name = include_date.select('currency') \
    .toPandas()['currency'].tolist()

## Download Historical Dataset

In [0]:
old_name = ["資料日期", "幣別", "即期", "即期.1"] #4", "即期14"]
new_name = ["date", "currency", "buy_rate", "sell_rate"]

overall_data = None
for idx, currency_each in enumerate(currency_name):
    # Download Historical Data
    try:
        print(f"[*] Getting historical currency exchange data of { currency_each }.")
        hist_data_request = requests.get(f"https://rate.bot.com.tw/xrt/flcsv/0/l6m/{ currency_each }")
        temporary_df = pd.read_csv(io.StringIO(hist_data_request.content.decode('utf-8')), index_col = False)
    except Exception as e:
        print(f"[*] Error => { e }")
        continue

    fields = []
    for field_name in temporary_df.columns:
        if field_name in ['幣別', '匯率', '匯率.1']: #, '匯率', "匯率.1"]:
            fields.append(StructField(field_name, StringType(), True))
        elif field_name == '資料日期':
            fields.append(StructField(field_name, IntegerType(), True))
        else:
            fields.append(StructField(field_name, FloatType(), True))
            
    schema = StructType(fields)
    temporary_df = spark.createDataFrame(temporary_df, schema)

    # 資料日期| 幣別| 匯率2| 現金3| 即期4|遠期10天5|遠期30天6|遠期60天7|遠期90天8|遠期120天9|遠期150天10|遠期180天11
    # 匯率12|  現金13|  即期14|遠期10天15|遠期30天16|遠期60天17|遠期90天18|遠期120天19|遠期150天20|遠期180天21
    temporary_df = temporary_df.drop(
        "匯率", "現金", "匯率.1", "現金.1", "遠期10天", "遠期30天", "遠期60天", "遠期90天", "遠期120天", "遠期150天", "遠期180天",
        "遠期10天.1", "遠期30天.1", "遠期60天.1", "遠期90天.1", "遠期120天.1", "遠期150天.1", "遠期180天.1"
    )

    for old, new in zip(old_name, new_name):
        temporary_df = temporary_df.withColumnRenamed(old, new)

    temporary_df = temporary_df.withColumn('tempdate', temporary_df['date'])
    temporary_df = temporary_df.drop('date') 
    temporary_df = temporary_df.withColumnRenamed('tempdate', 'date')
    temporary_df = temporary_df.filter(col('date') != date)

    # Concatenate
    if overall_data is None:
        overall_data = include_date
        print(overall_data.printSchema())

    overall_data = overall_data.union(temporary_df)
    overall_data = overall_data.distinct()

[*] Getting historical currency exchange data of USD.
root
 |-- currency: string (nullable = true)
 |-- buy_rate: float (nullable = true)
 |-- sell_rate: float (nullable = true)
 |-- date: string (nullable = false)

None
[*] Getting historical currency exchange data of HKD.
[*] Getting historical currency exchange data of GBP.
[*] Getting historical currency exchange data of AUD.
[*] Getting historical currency exchange data of CAD.
[*] Getting historical currency exchange data of SGD.
[*] Getting historical currency exchange data of CHF.
[*] Getting historical currency exchange data of JPY.
[*] Getting historical currency exchange data of ZAR.
[*] Getting historical currency exchange data of SEK.
[*] Getting historical currency exchange data of NZD.
[*] Getting historical currency exchange data of THB.
[*] Getting historical currency exchange data of EUR.
[*] Getting historical currency exchange data of CNY.


In [0]:
overall_data = overall_data.dropDuplicates()
display(overall_data)

currency,buy_rate,sell_rate,date
USD,32.195,32.295,20230927
HKD,4.001,4.061,20240221
HKD,4.022,4.082,20231122
HKD,3.899,3.959,20231228
USD,32.375,32.475,20231031
USD,31.08,31.18,20231225
HKD,3.989,4.049,20231204
USD,32.1,32.2,20231106
USD,31.845,31.945,20230905
CAD,23.18,23.51,20240222


## Sort the date

In [0]:
sorted_df = overall_data.sort(asc('date'))

In [0]:
display(sorted_df)

currency,buy_rate,sell_rate,date
USD,31.88,31.98,20230823
GBP,40.44,40.84,20230823
HKD,4.042,4.102,20230823
AUD,20.43,20.63,20230823
CAD,23.46,23.66,20230823
SGD,23.45,23.63,20230823
JPY,0.2177,0.2217,20230823
CHF,36.14,36.39,20230823
ZAR,1.67,1.75,20230823
SEK,2.87,2.97,20230823


## Save to Table

Databricks is a delta lake solution.

In [0]:
sorted_df.printSchema()

root
 |-- currency: string (nullable = true)
 |-- buy_rate: float (nullable = true)
 |-- sell_rate: float (nullable = true)
 |-- date: string (nullable = true)



In [0]:
sorted_df.coalesce(1).write.mode("overwrite").saveAsTable("default.bot_exchange")

## Read from Table

In [0]:
pd_df = spark.sql('select * from default.bot_exchange')

In [0]:
display(pd_df)

currency,buy_rate,sell_rate,date
USD,31.88,31.98,20230823
GBP,40.44,40.84,20230823
HKD,4.042,4.102,20230823
AUD,20.43,20.63,20230823
CAD,23.46,23.66,20230823
SGD,23.45,23.63,20230823
JPY,0.2177,0.2217,20230823
CHF,36.14,36.39,20230823
ZAR,1.67,1.75,20230823
SEK,2.87,2.97,20230823


## SQL

In [0]:
%sql
SELECT * FROM default.bot_exchange

currency,buy_rate,sell_rate,date
USD,31.88,31.98,20230823
GBP,40.44,40.84,20230823
HKD,4.042,4.102,20230823
AUD,20.43,20.63,20230823
CAD,23.46,23.66,20230823
SGD,23.45,23.63,20230823
JPY,0.2177,0.2217,20230823
CHF,36.14,36.39,20230823
ZAR,1.67,1.75,20230823
SEK,2.87,2.97,20230823


### View only the buy rate of THB

In [0]:
%sql
SELECT date, buy_rate FROM default.bot_exchange WHERE currency = "THB" ORDER BY date DESC;

date,buy_rate
20240222,0.8585
20240221,0.8644
20240220,0.8563
20240219,0.8581
20240217,0.8568
20240216,0.8549
20240215,0.8537
20240207,0.8667
20240206,0.8632
20240205,0.8616


### View only the sale rate of THB

In [0]:
%sql
SELECT date, sell_rate FROM default.bot_exchange WHERE currency = "THB" ORDER BY date DESC;

date,sell_rate
20240222,0.9045
20240221,0.9044
20240220,0.8963
20240219,0.8981
20240217,0.8968
20240216,0.8949
20240215,0.8937
20240207,0.9067
20240206,0.9032
20240205,0.9016
