In [1]:
!pip install webdriver_manager bs4 selenium


[notice] A new release of pip available: 22.1.2 -> 23.1.2
[notice] To update, run: python.exe -m pip install --upgrade pip


In [3]:
from selenium import webdriver
from webdriver_manager.chrome import ChromeDriverManager
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.support import expected_conditions as EC
from bs4 import BeautifulSoup
import time
import json

options = Options()
options.headless = True

URL = "https://www.bsp.gov.ph/SitePages/Statistics/ExchangeRate.aspx"

driver = webdriver.Chrome(ChromeDriverManager().install(), options=options)
driver.get(URL)
time.sleep(5)

with open("html.txt", "w", encoding='utf-8') as f:
    f.write(str(driver.page_source))

html = driver.page_source
soup = BeautifulSoup(html, "html.parser")

table = soup.find(id="tb1")
table2 = soup.find(id="tb2")

cols = [
    "country",
    "unit",
    "symbol",
    "euro_equivalent",
    "us_dollar_equivalent",
    "phil_peso_equivalent"
]

data = {}
data1 = list()

for index, _ in enumerate(table.contents):
    if index > 0:
        for index, tag in enumerate(_.contents):
            data[cols[index]] = tag.text
        data1.append(dict(data))

from datetime import datetime
DATE_FORMAT = "%Y%m%d_%T"
timestamp = datetime.now().strftime(DATE_FORMAT).replace(":", "")

with open(f"data/rates.json", "w", encoding='utf-8') as f:
    f.write(json.dumps(data1, indent=4))


  driver = webdriver.Chrome(ChromeDriverManager().install(), options=options)


In [4]:
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Exchange Rates Analysis") \
    .getOrCreate()

## Define the structure of the schema

In here, we use the **StructType** and **StructField** python classes to define the structure of our schema. 

In [5]:
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import (
    IntegerType, 
    StringType, 
    FloatType,
    BinaryType
)

path = r"data\rates.json"

struct = StructType([
    StructField("country", StringType(), False),
    StructField("unit", StringType(), False),
    StructField("symbol", StringType(), False),
    StructField("euro_equivalent", FloatType(), False),
    StructField("us_dollar_equivalent", FloatType(), False),
    StructField("phil_peso_equivalent", FloatType(), False)
])

df = spark \
    .read \
    .option("multiLine", True) \
    .json(path)

In [6]:
df.printSchema()

root
 |-- country: string (nullable = true)
 |-- euro_equivalent: string (nullable = true)
 |-- phil_peso_equivalent: string (nullable = true)
 |-- symbol: string (nullable = true)
 |-- unit: string (nullable = true)
 |-- us_dollar_equivalent: string (nullable = true)



In [35]:
from pyspark.sql.functions import split

df1 = df.select(
    split("country", " ", -1).alias("country"),
    "unit",
    "symbol",
    "euro_equivalent",
    "us_dollar_equivalent",
    "phil_peso_equivalent"
)

df1.select("country").show()

+--------------------+
|             country|
+--------------------+
| [1, UNITED, STATES]|
|          [2, JAPAN]|
|[3, UNITED, KINGDOM]|
|       [4, HONGKONG]|
|    [5, SWITZERLAND]|
|         [6, CANADA]|
|      [7, SINGAPORE]|
|      [8, AUSTRALIA]|
|        [9, BAHRAIN]|
|        [10, KUWAIT]|
| [11, SAUDI, ARABIA]|
|        [12, BRUNEI]|
|     [13, INDONESIA]|
|      [14, THAILAND]|
|[15, UNITED, ARAB...|
|[16, EUROPEAN, MO...|
|         [17, KOREA]|
|         [18, CHINA]|
+--------------------+



In [36]:
from pyspark.sql.functions import slice, concat_ws, size

df1 = df1.select(
    slice("country", 2, size("country")).alias("country"),
    "unit",
    "symbol",
    "euro_equivalent",
    "us_dollar_equivalent",
    "phil_peso_equivalent"
)

df1.select("country").show()

df1_view = df1.createOrReplaceTempView("exchange_rates")

+--------------------+
|             country|
+--------------------+
|    [UNITED, STATES]|
|             [JAPAN]|
|   [UNITED, KINGDOM]|
|          [HONGKONG]|
|       [SWITZERLAND]|
|            [CANADA]|
|         [SINGAPORE]|
|         [AUSTRALIA]|
|           [BAHRAIN]|
|            [KUWAIT]|
|     [SAUDI, ARABIA]|
|            [BRUNEI]|
|         [INDONESIA]|
|          [THAILAND]|
|[UNITED, ARAB, EM...|
|[EUROPEAN, MONETA...|
|             [KOREA]|
|             [CHINA]|
+--------------------+



In [37]:
from pyspark.sql.functions import concat_ws

df1 = df1.select(
    concat_ws(" ", "country").alias("country"),
    "unit",
    "symbol",
    "euro_equivalent",
    "us_dollar_equivalent",
    "phil_peso_equivalent"
)

df1.select("country").show()

df1_view = df1.createOrReplaceTempView("exchange_rates")

+--------------------+
|             country|
+--------------------+
|       UNITED STATES|
|               JAPAN|
|      UNITED KINGDOM|
|            HONGKONG|
|         SWITZERLAND|
|              CANADA|
|           SINGAPORE|
|           AUSTRALIA|
|             BAHRAIN|
|              KUWAIT|
|        SAUDI ARABIA|
|              BRUNEI|
|           INDONESIA|
|            THAILAND|
|UNITED ARAB EMIRATES|
|EUROPEAN MONETARY...|
|               KOREA|
|               CHINA|
+--------------------+



In [38]:
script = """SELECT 
                INITCAP(country) AS country, 
                unit, 
                symbol, 
                euro_equivalent, 
                us_dollar_equivalent,
                phil_peso_equivalent 
            from 
                exchange_rates
    """

res = spark.sql(script)
res.select("country").show()

+--------------------+
|             country|
+--------------------+
|       United States|
|               Japan|
|      United Kingdom|
|            Hongkong|
|         Switzerland|
|              Canada|
|           Singapore|
|           Australia|
|             Bahrain|
|              Kuwait|
|        Saudi Arabia|
|              Brunei|
|           Indonesia|
|            Thailand|
|United Arab Emirates|
|European Monetary...|
|               Korea|
|               China|
+--------------------+



In [39]:
from pyspark.sql.functions import (
    current_date, 
    current_timestamp
)

res.select("*").withColumn("timestamp", current_date()).show()

+--------------------+--------+------+---------------+--------------------+--------------------+----------+
|             country|    unit|symbol|euro_equivalent|us_dollar_equivalent|phil_peso_equivalent| timestamp|
+--------------------+--------+------+---------------+--------------------+--------------------+----------+
|       United States|  DOLLAR|   USD|       0.928333|            1.000000|             55.9670|2023-05-19|
|               Japan|     YEN|   JPY|       0.006695|            0.007212|              0.4036|2023-05-19|
|      United Kingdom|   POUND|   GBP|       1.151875|            1.240800|             69.4439|2023-05-19|
|            Hongkong|  DOLLAR|   HKD|       0.118612|            0.127769|              7.1508|2023-05-19|
|         Switzerland|   FRANC|   CHF|       1.026009|            1.105217|             61.8557|2023-05-19|
|              Canada|  DOLLAR|   CAD|       0.687552|            0.740631|             41.4509|2023-05-19|
|           Singapore|  DOLL

In [None]:
res.dispaly