In [23]:
# !pip install python-dotenv

In [1]:
import os
from dotenv import load_dotenv

In [2]:
load_dotenv()

BRONZE_PATH = os.environ.get("BRONZE_PATH")
SILVER_PATH = os.environ.get("SILVER_PATH")
MINIO_ACCESS_KEY = os.environ.get("MINIO_ACCESS_KEY")
MINIO_SECRET_KEY = os.environ.get("MINIO_SECRET_KEY")

bronze_path = f"s3a:/{BRONZE_PATH}/vnstock3/stock_quote_history_daily/"
silver_path = f"s3a:/{SILVER_PATH}/vnstock3/daily_stock_prices/"

endpoint_url = "http://localhost:9000"

# bronze_path = f"s3a://dev/data/bronze/vnstock3/stock_quote_history_daily/"
# silver_path = f"s3a://dev/data/silver/vnstock3/daily_stock_prices/"

print(bronze_path)
print(silver_path)

s3a://dev/data/bronze/vnstock3/stock_quote_history_daily/
s3a://dev/data/silver/vnstock3/daily_stock_prices/


In [3]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_timestamp, from_unixtime

In [4]:
# Setup Spark sesssion
spark = (SparkSession.builder
    .master("spark://spark-master:7077")
    .appName("DailyStockPriceProcessor")
    .getOrCreate()
)

spark

In [10]:
# print(spark.sparkContext.getConf().getAll())

In [5]:
# Test Spark operations

data = [("Alice", 1), ("Bob", 2), ("Cathy", 3)]
df = spark.createDataFrame(data, ["Name", "Id"])
df

df.show()

+-----+---+
| Name| Id|
+-----+---+
|Alice|  1|
|  Bob|  2|
|Cathy|  3|
+-----+---+



In [6]:
# spark.sparkContext.getConf().getAll()

In [7]:
from pyspark.sql.types import StructType, StructField, StringType, FloatType, TimestampType, IntegerType, DateType

In [8]:
schema = StructType([
    StructField("time", StringType(), True),
    StructField("open", FloatType(), True),
    StructField("high", FloatType(), True),
    StructField("low", FloatType(), True),
    StructField("close", FloatType(), True),
    StructField("volume", IntegerType(), True),
    StructField("symbol", StringType(), True),
    StructField("loaded_timestamp", StringType(), True),
    StructField("updated_at", DateType(), True)
])

# Read the raw data from the bronze path in MinIO
raw_df = spark.read.schema(schema).json(bronze_path)

In [9]:
raw_df.show(10, truncate=False)

+------------+----+----+----+-----+------+------+----------------+----------+
|time        |open|high|low |close|volume|symbol|loaded_timestamp|updated_at|
+------------+----+----+----+-----+------+------+----------------+----------+
|965347200000|3.5 |3.5 |3.5 |3.5  |1500  |HAP   |1730438412099   |2024-11-01|
|965606400000|3.56|3.56|3.56|3.56 |3000  |HAP   |1730438412099   |2024-11-01|
|965779200000|3.63|3.63|3.63|3.63 |2100  |HAP   |1730438412099   |2024-11-01|
|965952000000|3.7 |3.7 |3.7 |3.7  |2300  |HAP   |1730438412099   |2024-11-01|
|966211200000|3.76|3.76|3.76|3.76 |1500  |HAP   |1730438412099   |2024-11-01|
|966384000000|3.83|3.83|3.83|3.83 |7600  |HAP   |1730438412099   |2024-11-01|
|966556800000|3.89|3.89|3.89|3.89 |1400  |HAP   |1730438412099   |2024-11-01|
|966816000000|3.96|3.96|3.96|3.96 |1000  |HAP   |1730438412099   |2024-11-01|
|966988800000|4.02|4.02|4.02|4.02 |2000  |HAP   |1730438412099   |2024-11-01|
|967161600000|4.09|4.09|4.09|4.09 |2100  |HAP   |1730438412099  

In [15]:
# Convert 'loaded_timestamp' from string (epoch time in milliseconds) to TimestampType
raw_df = (raw_df
    .withColumn(
        "loaded_timestamp",
        from_unixtime(col("loaded_timestamp").cast("long") / 1000).cast(TimestampType())  # Convert milliseconds to seconds
    )
    .withColumn(
        "time",
        from_unixtime(col("time").cast("long") / 1000).cast(TimestampType())  # Convert milliseconds to seconds
    )
)

raw_df.filter(raw_df['symbol']=='TV2').show(10, truncate=False)
raw_df.printSchema()

+-------------------+----+----+----+-----+------+------+-------------------+----------+
|time               |open|high|low |close|volume|symbol|loaded_timestamp   |updated_at|
+-------------------+----+----+----+-----+------+------+-------------------+----------+
|2009-10-13 00:00:00|0.68|0.99|0.68|0.94 |84300 |TV2   |2024-11-01 05:06:33|2024-11-01|
|2009-10-14 00:00:00|1.01|1.01|0.88|0.91 |47300 |TV2   |2024-11-01 05:06:33|2024-11-01|
|2009-10-15 00:00:00|0.89|0.91|0.86|0.88 |41800 |TV2   |2024-11-01 05:06:33|2024-11-01|
|2009-10-16 00:00:00|0.86|0.86|0.82|0.82 |47500 |TV2   |2024-11-01 05:06:33|2024-11-01|
|2009-10-19 00:00:00|0.8 |0.82|0.79|0.79 |34400 |TV2   |2024-11-01 05:06:33|2024-11-01|
|2009-10-20 00:00:00|0.77|0.85|0.75|0.85 |78200 |TV2   |2024-11-01 05:06:33|2024-11-01|
|2009-10-21 00:00:00|0.88|0.88|0.88|0.88 |29800 |TV2   |2024-11-01 05:06:33|2024-11-01|
|2009-10-22 00:00:00|0.88|0.93|0.88|0.88 |27100 |TV2   |2024-11-01 05:06:33|2024-11-01|
|2009-10-23 00:00:00|0.88|0.91|0

In [16]:
# drop na

df = raw_df.dropna(subset=["time", "open", "high", "low", "close", "volume", "symbol"])

df.count()

1844693

In [17]:
# drop duplicate
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

window_spec = Window.partitionBy("symbol", "time").orderBy(col("updated_at"), col("loaded_timestamp").desc())

df = df.withColumn("row_number",
                  row_number().over(window_spec))

df_drop_dup = df.filter(col("row_number") == 1) \
                .drop("row_number")

df.filter((col("symbol") == "TV2") & (col("time") == "2024-11-01")).show(20, truncate=False)

+-------------------+-----+-----+-----+-----+------+------+-------------------+----------+----------+
|time               |open |high |low  |close|volume|symbol|loaded_timestamp   |updated_at|row_number|
+-------------------+-----+-----+-----+-----+------+------+-------------------+----------+----------+
|2024-11-01 00:00:00|28.2 |28.4 |28.0 |28.3 |51000 |TV2   |2024-11-01 05:15:20|2024-11-01|1         |
|2024-11-01 00:00:00|28.2 |28.4 |28.0 |28.3 |51000 |TV2   |2024-11-01 05:14:02|2024-11-01|2         |
|2024-11-01 00:00:00|28.2 |28.4 |28.0 |28.3 |51000 |TV2   |2024-11-01 05:13:24|2024-11-01|3         |
|2024-11-01 00:00:00|28.2 |28.4 |28.0 |28.3 |51000 |TV2   |2024-11-01 05:13:01|2024-11-01|4         |
|2024-11-01 00:00:00|28.2 |28.4 |28.0 |28.3 |51000 |TV2   |2024-11-01 05:10:52|2024-11-01|5         |
|2024-11-01 00:00:00|28.2 |28.4 |28.0 |28.3 |51000 |TV2   |2024-11-01 05:06:33|2024-11-01|6         |
|2024-11-01 00:00:00|27.32|27.51|27.03|27.03|106318|TV2   |2024-12-06 05:06:08|202

In [18]:
df_drop_dup.filter((col("symbol") == "TV2") & (col("time") == "2024-11-01")).show(20, truncate=False)

+-------------------+----+----+----+-----+------+------+-------------------+----------+
|time               |open|high|low |close|volume|symbol|loaded_timestamp   |updated_at|
+-------------------+----+----+----+-----+------+------+-------------------+----------+
|2024-11-01 00:00:00|28.2|28.4|28.0|28.3 |51000 |TV2   |2024-11-01 05:15:20|2024-11-01|
+-------------------+----+----+----+-----+------+------+-------------------+----------+



In [19]:
# Calculate the daily price range (high - low)
df_cal = df_drop_dup.withColumn(
    "price_range",
    df.high - df.low
)

df_cal.filter(
    (col("symbol") == "TV2") 
    & (col("time") == "2024-11-01")
) \
.show(20)

+-------------------+----+----+----+-----+------+------+-------------------+----------+-----------+
|               time|open|high| low|close|volume|symbol|   loaded_timestamp|updated_at|price_range|
+-------------------+----+----+----+-----+------+------+-------------------+----------+-----------+
|2024-11-01 00:00:00|28.2|28.4|28.0| 28.3| 51000|   TV2|2024-11-01 05:15:20|2024-11-01| 0.39999962|
+-------------------+----+----+----+-----+------+------+-------------------+----------+-----------+



In [20]:
from pyspark.sql import functions as F

In [21]:
# Calculate the percentage change from open to close
df_cal = df_cal.withColumn(
    "price_percent_change",
    (F.col("close") - F.col("open")) / F.col("open")
)

df_cal.filter(
    (col("symbol") == "TV2") 
    & (col("time") == "2024-11-01")
) \
.show(20)

+-------------------+----+----+----+-----+------+------+-------------------+----------+-----------+--------------------+
|               time|open|high| low|close|volume|symbol|   loaded_timestamp|updated_at|price_range|price_percent_change|
+-------------------+----+----+----+-----+------+------+-------------------+----------+-----------+--------------------+
|2024-11-01 00:00:00|28.2|28.4|28.0| 28.3| 51000|   TV2|2024-11-01 05:15:20|2024-11-01| 0.39999962|0.003546045085662...|
+-------------------+----+----+----+-----+------+------+-------------------+----------+-----------+--------------------+



In [22]:
# Add metadata - processing time

df_cal = df_cal.withColumn(
    "processing_time",
    F.current_timestamp()
)

df_cal.filter(
    (col("symbol") == "TV2") 
    & (col("time") == "2024-11-01")
) \
.show(20)

+-------------------+----+----+----+-----+------+------+-------------------+----------+-----------+--------------------+--------------------+
|               time|open|high| low|close|volume|symbol|   loaded_timestamp|updated_at|price_range|price_percent_change|     processing_time|
+-------------------+----+----+----+-----+------+------+-------------------+----------+-----------+--------------------+--------------------+
|2024-11-01 00:00:00|28.2|28.4|28.0| 28.3| 51000|   TV2|2024-11-01 05:15:20|2024-11-01| 0.39999962|0.003546045085662...|2024-12-13 04:55:...|
+-------------------+----+----+----+-----+------+------+-------------------+----------+-----------+--------------------+--------------------+



In [23]:
# Write to silver layer

print(bronze_path) 
print(silver_path)

df_cal.write.mode("overwrite").parquet(silver_path)

s3a://dev/data/bronze/vnstock3/stock_quote_history_daily/
s3a://dev/data/silver/vnstock3/daily_stock_prices/


In [24]:
df_silver = spark.read.parquet(silver_path)

df_silver.show()

+-------------------+-----+-----+-----+-----+------+------+-------------------+----------+-----------+--------------------+--------------------+
|               time| open| high|  low|close|volume|symbol|   loaded_timestamp|updated_at|price_range|price_percent_change|     processing_time|
+-------------------+-----+-----+-----+-----+------+------+-------------------+----------+-----------+--------------------+--------------------+
|2018-10-25 00:00:00|13.19|13.19|13.19|13.19|     0|   A32|2024-11-01 05:16:42|2024-11-01|        0.0|                 0.0|2024-12-13 04:55:...|
|2018-10-31 00:00:00|13.19|13.19|13.19|13.19|     0|   A32|2024-11-01 05:16:42|2024-11-01|        0.0|                 0.0|2024-12-13 04:55:...|
|2018-11-07 00:00:00|13.19|13.19|13.19|13.19|     0|   A32|2024-11-01 05:16:42|2024-11-01|        0.0|                 0.0|2024-12-13 04:55:...|
|2018-11-14 00:00:00|13.19|13.19|13.19|13.19|     0|   A32|2024-11-01 05:16:42|2024-11-01|        0.0|                 0.0|2024-12

In [10]:
spark.stop()