
# ETL Foreign Exchange Rates from Bangko Sentral ng Pilipinas

## Todos:

1. Create a python script that will scrape exchange rate conversions from [Bangko Sentral ng Pilipinas](https://www.bsp.gov.ph/SitePages/Statistics/ExchangeRate.aspx) official website.
2. Save the raw unprocessed scraped data to `data/raw/rates` and name it as **rates_[timestamp].json**.
2. Use **pyspark** to do cleansing and transformation against the scraped data.
3. Save the processed data as **rates.json** to `data/processed/rates` folder.
4. Schedule the ETL task to run on a daily basis.

## Prerequisites:

- **Selenium** is going to be used for automating browser operations.
- **WebDriver Manager (_optional_)** will provide us the driver we need to work with selenium.
- **BeautifulSoup** will parse the html markup to be generated by selenium.
- **PySpark** will be used for data wrangling, cleansing and transformation.

In [1]:
!pip install webdriver_manager bs4 selenium


[notice] A new release of pip available: 22.1.2 -> 23.1.2
[notice] To update, run: python.exe -m pip install --upgrade pip


In [3]:
from selenium import webdriver
from webdriver_manager.chrome import ChromeDriverManager
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.support import expected_conditions as EC
from bs4 import BeautifulSoup
import time
import json

options = Options()
options.headless = True

URL = "https://www.bsp.gov.ph/SitePages/Statistics/ExchangeRate.aspx"

driver = webdriver.Chrome(ChromeDriverManager().install(), options=options)
driver.get(URL)
time.sleep(5)

# with open("html.txt", "w", encoding='utf-8') as f:
#    f.write(str(driver.page_source))

html = driver.page_source
soup = BeautifulSoup(html, "html.parser")

table = soup.find(id="tb1")
table2 = soup.find(id="tb2")

cols = [
    "country",
    "unit",
    "symbol",
    "euro_equivalent",
    "us_dollar_equivalent",
    "phil_peso_equivalent"
]

data = {}
data1 = list()

for index, _ in enumerate(table.contents):
    if index > 0:
        for index, tag in enumerate(_.contents):
            data[cols[index]] = tag.text
        data1.append(dict(data))

from datetime import datetime
DATE_FORMAT = "%Y%m%d_%T"
timestamp = datetime.now().strftime(DATE_FORMAT).replace(":", "")

with open(f"data/rates_{timestamp}.json", "w", encoding='utf-8') as f:
    f.write(json.dumps(data1, indent=4))


  driver = webdriver.Chrome(ChromeDriverManager().install(), options=options)


In [131]:
%%markdown

## Create a spark session 


## Create a spark session 


In [196]:
from pyspark.conf import SparkConf
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("ETL Foreign Exchange Rates from Bangko Sentral ng Pilipinas") \
    .getOrCreate()

In [198]:
SparkConf().set("spark.app.name", "ETL Foreign Exchange Rates from Bangko Sentral ng Pilipinas")

<pyspark.conf.SparkConf at 0x20e144dceb0>

In [201]:
spark.conf.get("spark.app.name")

'ETL Foreign Exchange Rates from Bangko Sentral ng Pilipinas'

In [203]:
spark.conf.get("spark.master")

'local[*]'

In [204]:
spark

## Define the structure of the schema

In here, we use the **StructType** and **StructField** python classes to define the structure of our schema. 

In [150]:
from pyspark.sql.types import (
    StructType, 
    StructField, 
    FloatType,
    StringType
)

struct = StructType([StructField("country", StringType(), False),
    StructField("euro_equivalent", FloatType(), False),
    StructField("phil_peso_equivalent", FloatType(), False),
    StructField("symbol", StringType(), False),
    StructField("unit", StringType(), False),
    StructField("us_dollar_equivalent", FloatType(), False)
])

df = spark \
    .read \
    .schema(struct) \
    .option("multiLine", True) \
    .json(path)

df.show()

Py4JJavaError: An error occurred while calling o724.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 54.0 failed 1 times, most recent failure: Lost task 0.0 in stage 54.0 (TID 49) (MIS-JPDelmundo.CEGROUP.com executor driver): java.lang.ClassCastException: class org.apache.spark.sql.catalyst.util.GenericArrayData cannot be cast to class org.apache.spark.unsafe.types.UTF8String (org.apache.spark.sql.catalyst.util.GenericArrayData and org.apache.spark.unsafe.types.UTF8String are in unnamed module of loader 'app')
	at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow.getUTF8String(rows.scala:46)
	at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow.getUTF8String$(rows.scala:46)
	at org.apache.spark.sql.catalyst.expressions.GenericInternalRow.getUTF8String(rows.scala:195)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.writeFields_0_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
	at org.apache.spark.sql.execution.datasources.FileFormat$$anon$1.$anonfun$apply$1(FileFormat.scala:156)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.next(FileScanRDD.scala:208)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:388)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:888)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:888)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:139)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
	at java.base/java.lang.Thread.run(Thread.java:1623)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2785)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2721)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2720)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2720)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1206)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1206)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1206)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2984)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2923)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2912)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:971)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2263)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2284)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2303)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:530)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:483)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:61)
	at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:4177)
	at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:3161)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:4167)
	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:526)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:4165)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:118)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:195)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:103)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:4165)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:3161)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:3382)
	at org.apache.spark.sql.Dataset.getRows(Dataset.scala:284)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:323)
	at jdk.internal.reflect.GeneratedMethodAccessor55.invoke(Unknown Source)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:52)
	at java.base/java.lang.reflect.Method.invoke(Method.java:578)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:1623)
Caused by: java.lang.ClassCastException: class org.apache.spark.sql.catalyst.util.GenericArrayData cannot be cast to class org.apache.spark.unsafe.types.UTF8String (org.apache.spark.sql.catalyst.util.GenericArrayData and org.apache.spark.unsafe.types.UTF8String are in unnamed module of loader 'app')
	at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow.getUTF8String(rows.scala:46)
	at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow.getUTF8String$(rows.scala:46)
	at org.apache.spark.sql.catalyst.expressions.GenericInternalRow.getUTF8String(rows.scala:195)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.writeFields_0_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
	at org.apache.spark.sql.execution.datasources.FileFormat$$anon$1.$anonfun$apply$1(FileFormat.scala:156)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.next(FileScanRDD.scala:208)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:388)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:888)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:888)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:139)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
	... 1 more


## Verify the schema

In [130]:
df.printSchema()

root
 |-- country: string (nullable = true)
 |-- euro_equivalent: string (nullable = true)
 |-- phil_peso_equivalent: string (nullable = true)
 |-- symbol: string (nullable = true)
 |-- unit: string (nullable = true)
 |-- us_dollar_equivalent: string (nullable = true)



In [128]:
from pyspark.sql.types import (
    StringType, 
    FloatType
)

type_casted_df = df.select(
    df.country.cast(StringType()),
    df.unit.cast(StringType()),
    df.symbol.cast(StringType()),
    df.euro_equivalent.cast(FloatType()),
    df.us_dollar_equivalent.cast(FloatType()),
    df.phil_peso_equivalent.cast(FloatType()),
    )
type_casted_df.printSchema()

root
 |-- country: string (nullable = true)
 |-- unit: string (nullable = true)
 |-- symbol: string (nullable = true)
 |-- euro_equivalent: float (nullable = true)
 |-- us_dollar_equivalent: float (nullable = true)
 |-- phil_peso_equivalent: float (nullable = true)



## Viewing data summary

In [129]:
df.summary().show()

+-------+---------------+------------------+--------------------+------+--------+--------------------+
|summary|        country|   euro_equivalent|phil_peso_equivalent|symbol|    unit|us_dollar_equivalent|
+-------+---------------+------------------+--------------------+------+--------+--------------------+
|  count|             18|                18|                  18|    18|      18|                  18|
|   mean|           null|0.5901108235294118|   35.57639999999999|  null|    null|  0.6356674117647058|
| stddev|           null|0.6294966581344211|  37.950888866171496|  null|    null|  0.6780937404024663|
|    min|1 UNITED STATES|          0.000062|              0.0037|   AED|BAHT****|            0.000067|
|    25%|           null|          0.118612|              7.1508|  null|    null|            0.127769|
|    50%|           null|          0.614649|             37.0558|  null|    null|              0.6621|
|    75%|           null|          0.928333|              55.967|  null| 

## Transforming the data

In [35]:
from pyspark.sql.functions import split

df1 = df.select(
    split("country", " ", -1).alias("country"),
    "unit",
    "symbol",
    "euro_equivalent",
    "us_dollar_equivalent",
    "phil_peso_equivalent"
)

df1.select("country").show()

+--------------------+
|             country|
+--------------------+
| [1, UNITED, STATES]|
|          [2, JAPAN]|
|[3, UNITED, KINGDOM]|
|       [4, HONGKONG]|
|    [5, SWITZERLAND]|
|         [6, CANADA]|
|      [7, SINGAPORE]|
|      [8, AUSTRALIA]|
|        [9, BAHRAIN]|
|        [10, KUWAIT]|
| [11, SAUDI, ARABIA]|
|        [12, BRUNEI]|
|     [13, INDONESIA]|
|      [14, THAILAND]|
|[15, UNITED, ARAB...|
|[16, EUROPEAN, MO...|
|         [17, KOREA]|
|         [18, CHINA]|
+--------------------+



In [36]:
from pyspark.sql.functions import slice, concat_ws, size

df1 = df1.select(
    slice("country", 2, size("country")).alias("country"),
    "unit",
    "symbol",
    "euro_equivalent",
    "us_dollar_equivalent",
    "phil_peso_equivalent"
)

df1.select("country").show()

df1_view = df1.createOrReplaceTempView("exchange_rates")

+--------------------+
|             country|
+--------------------+
|    [UNITED, STATES]|
|             [JAPAN]|
|   [UNITED, KINGDOM]|
|          [HONGKONG]|
|       [SWITZERLAND]|
|            [CANADA]|
|         [SINGAPORE]|
|         [AUSTRALIA]|
|           [BAHRAIN]|
|            [KUWAIT]|
|     [SAUDI, ARABIA]|
|            [BRUNEI]|
|         [INDONESIA]|
|          [THAILAND]|
|[UNITED, ARAB, EM...|
|[EUROPEAN, MONETA...|
|             [KOREA]|
|             [CHINA]|
+--------------------+



In [37]:
from pyspark.sql.functions import concat_ws

df1 = df1.select(
    concat_ws(" ", "country").alias("country"),
    "unit",
    "symbol",
    "euro_equivalent",
    "us_dollar_equivalent",
    "phil_peso_equivalent"
)

df1.select("country").show()

df1_view = df1.createOrReplaceTempView("exchange_rates")

+--------------------+
|             country|
+--------------------+
|       UNITED STATES|
|               JAPAN|
|      UNITED KINGDOM|
|            HONGKONG|
|         SWITZERLAND|
|              CANADA|
|           SINGAPORE|
|           AUSTRALIA|
|             BAHRAIN|
|              KUWAIT|
|        SAUDI ARABIA|
|              BRUNEI|
|           INDONESIA|
|            THAILAND|
|UNITED ARAB EMIRATES|
|EUROPEAN MONETARY...|
|               KOREA|
|               CHINA|
+--------------------+



In [38]:
script = """SELECT 
                INITCAP(country) AS country, 
                unit, 
                symbol, 
                euro_equivalent, 
                us_dollar_equivalent,
                phil_peso_equivalent 
            from 
                exchange_rates
    """

res = spark.sql(script)
res.select("country").show()

+--------------------+
|             country|
+--------------------+
|       United States|
|               Japan|
|      United Kingdom|
|            Hongkong|
|         Switzerland|
|              Canada|
|           Singapore|
|           Australia|
|             Bahrain|
|              Kuwait|
|        Saudi Arabia|
|              Brunei|
|           Indonesia|
|            Thailand|
|United Arab Emirates|
|European Monetary...|
|               Korea|
|               China|
+--------------------+



In [48]:
from pyspark.sql.functions import (
    current_date, 
    current_timestamp
)

res.select("*").withColumn("timestamp", current_date()).show()
res = res.select("*").withColumn("timestamp", current_date())

+--------------------+--------+------+---------------+--------------------+--------------------+----------+
|             country|    unit|symbol|euro_equivalent|us_dollar_equivalent|phil_peso_equivalent| timestamp|
+--------------------+--------+------+---------------+--------------------+--------------------+----------+
|       United States|  DOLLAR|   USD|       0.928333|            1.000000|             55.9670|2023-05-19|
|               Japan|     YEN|   JPY|       0.006695|            0.007212|              0.4036|2023-05-19|
|      United Kingdom|   POUND|   GBP|       1.151875|            1.240800|             69.4439|2023-05-19|
|            Hongkong|  DOLLAR|   HKD|       0.118612|            0.127769|              7.1508|2023-05-19|
|         Switzerland|   FRANC|   CHF|       1.026009|            1.105217|             61.8557|2023-05-19|
|              Canada|  DOLLAR|   CAD|       0.687552|            0.740631|             41.4509|2023-05-19|
|           Singapore|  DOLL

In [50]:
res.write. \
    option("header", "true") \
    .mode("overwrite") \
    .format("csv") \
    .save("data\output")

In [154]:
from pyspark.sql.types import ArrayType

data = [("Alice", ["Java", "Scala"]), ("Bob", ["Python", "Scala"])]
schema = StructType([
    StructField("name", StringType()),
    StructField("languagesSkills", ArrayType(StringType())),
])
df = spark.createDataFrame(data=data, schema=schema)
df.printSchema()

root
 |-- name: string (nullable = true)
 |-- languagesSkills: array (nullable = true)
 |    |-- element: string (containsNull = true)



In [160]:
spark