In [24]:
# Initialize Spark Session with PostgreSQL driver
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col, to_date, sum as _sum, avg as _avg, count, last, row_number, expr
)
from pyspark.sql.window import Window


In [25]:

spark = SparkSession.builder \
    .appName("PostgreSQL Integration") \
    .config("spark.jars", "/home/jovyan/postgresql-42.7.5.jar") \
    .config("spark.driver.extraClassPath", "/home/jovyan/postgresql-42.7.5.jar") \
    .getOrCreate()

df = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:postgresql://postgres:5432/nepse") \
    .option("dbtable", "nepse1") \
    .option("user", "admin") \
    .option("password", "admin") \
    .option("driver", "org.postgresql.Driver") \
    .load()

df.show()

+---+----------------+------+-----+------+--------+------+----------+--------------------+----------+-----------+------------+--------------------+
| id|  transaction_no|symbol|buyer|seller|quantity|  rate|    amount|          scraped_at|trade_date|page_number|kafka_offset|        processed_at|
+---+----------------+------+-----+------+--------+------+----------+--------------------+----------+-----------+------------+--------------------+
|585|2025032401016000| NABIL|   51|    57|      10|484.00|  4,840.00|2025-04-06 21:49:...|2025-03-24|          1|        NULL|2025-04-06 16:05:...|
|586|2025032401015807| NABIL|   13|    57|      19|484.00|  9,196.00|2025-04-06 21:49:...|2025-03-24|          1|        NULL|2025-04-06 16:05:...|
|587|2025032401015652| NABIL|   38|    57|     498|485.00|241,530.00|2025-04-06 21:49:...|2025-03-24|          1|        NULL|2025-04-06 16:05:...|
|588|2025032401015651| NABIL|   38|    54|      50|484.90| 24,245.00|2025-04-06 21:49:...|2025-03-24|          1

In [59]:
nabil_df = df.filter(
    (col("symbol") == "NABIL") 
).withColumn("rate", col("rate").cast("double")) \
 .withColumn("quantity", col("quantity").cast("long")) \
 .withColumn("amount", regexp_replace("amount", ",", "").cast("float"))

In [60]:
nabil_df1 = nabil_df.withColumn("quantity", col("quantity").cast("int")) \
       .withColumn("rate", col("rate").cast("float")) \
 .withColumn("trade_date", to_date(col("trade_date")))
       # .withColumn("amount", col("amount").cast("float")) \
      

In [61]:
nabil_df1.show()

+---+----------------+------+-----+------+--------+-----+--------+--------------------+----------+-----------+------------+--------------------+
| id|  transaction_no|symbol|buyer|seller|quantity| rate|  amount|          scraped_at|trade_date|page_number|kafka_offset|        processed_at|
+---+----------------+------+-----+------+--------+-----+--------+--------------------+----------+-----------+------------+--------------------+
|585|2025032401016000| NABIL|   51|    57|      10|484.0|  4840.0|2025-04-06 21:49:...|2025-03-24|          1|        NULL|2025-04-06 16:05:...|
|586|2025032401015807| NABIL|   13|    57|      19|484.0|  9196.0|2025-04-06 21:49:...|2025-03-24|          1|        NULL|2025-04-06 16:05:...|
|587|2025032401015652| NABIL|   38|    57|     498|485.0|241530.0|2025-04-06 21:49:...|2025-03-24|          1|        NULL|2025-04-06 16:05:...|
|588|2025032401015651| NABIL|   38|    54|      50|484.9| 24245.0|2025-04-06 21:49:...|2025-03-24|          1|        NULL|2025-04

In [62]:
# Window to get last transaction of the day (for closing price)
window_day = Window.partitionBy("trade_date").orderBy(col("processed_at").desc())

# Add row number to identify last row per day
nabil_df2 = nabil_df1.withColumn("row_num", row_number().over(window_day))
# Get closing price (last rate of each day)
nabil_df2 = nabil_df2.filter(col("row_num") == 1) \
    .select("trade_date", col("rate").alias("closing_rate"))

In [63]:
nabil_df2.show()

+----------+------------+
|trade_date|closing_rate|
+----------+------------+
|2025-03-24|       478.0|
|2025-03-25|       480.0|
|2025-03-26|       481.0|
|2025-03-27|       485.0|
|2025-03-30|       491.0|
|2025-04-01|       484.6|
|2025-04-02|       485.0|
|2025-04-03|       489.5|
|2025-04-07|       488.0|
+----------+------------+



In [67]:
nabil_df3 = nabil_df1.groupBy("trade_date").agg(
    _sum("quantity").alias("total_quantity"),
    _sum("amount").alias("total_amount"),
    count("transaction_no").alias("num_transactions")
).join(nabil_df2, on="trade_date", how="inner")

In [68]:
nabil_df3.show()

+----------+--------------+--------------------+----------------+------------+
|trade_date|total_quantity|        total_amount|num_transactions|closing_rate|
+----------+--------------+--------------------+----------------+------------+
|2025-04-07|         42030|2.2422635998046875E7|             432|       488.0|
|2025-03-30|         51866| 2.586842710107422E7|             541|       491.0|
|2025-04-02|         36189|1.7575535215820312E7|             393|       485.0|
|2025-03-27|         34710|1.8513985205078125E7|             422|       485.0|
|2025-04-03|         37941| 2.060228469189453E7|             393|       489.5|
|2025-03-25|         67226|3.7494028221191406E7|             659|       480.0|
|2025-04-01|         34404| 1.815553009716797E7|             418|       484.6|
|2025-03-26|         42411|2.3916849490234375E7|             456|       481.0|
|2025-03-24|         50581| 2.652469679989624E7|             583|       478.0|
+----------+--------------+--------------------+----

In [70]:
# Calculate 3-day and 5-day SMA
sma_window_3 = Window.orderBy("trade_date").rowsBetween(-2, 0)
sma_window_5 = Window.orderBy("trade_date").rowsBetween(-4, 0)

nabil_df4 = nabil_df3.withColumn("SMA_3", _avg("closing_rate").over(sma_window_3)) \
                   .withColumn("SMA_5", _avg("closing_rate").over(sma_window_5))

In [57]:
nabil_df4.show()

+----------+--------------+--------------------+----------------+------------+-----------------+------------------+
|trade_date|total_quantity|        total_amount|num_transactions|closing_rate|            SMA_3|             SMA_5|
+----------+--------------+--------------------+----------------+------------+-----------------+------------------+
|2025-03-24|         50581| 2.652469679989624E7|             583|       478.0|            478.0|             478.0|
|2025-03-25|         67226|3.7494028221191406E7|             659|       480.0|            479.0|             479.0|
|2025-03-26|         42411|2.3916849490234375E7|             456|       481.0|479.6666666666667| 479.6666666666667|
|2025-03-27|         34710|1.8513985205078125E7|             422|       485.0|            482.0|             481.0|
|2025-03-30|         51866| 2.586842710107422E7|             541|       491.0|485.6666666666667|             483.0|
|2025-04-01|         34404| 1.815553009716797E7|             418|       

In [71]:

vwap_df = nabil_df1.groupBy("trade_date").agg(
    (_sum(col("rate") * col("quantity")) / _sum("quantity")).alias("VWAP")
)

# Join VWAP
final_df = daily_df.join(vwap_df, on="trade_date", how="left")



+----------+--------------+------------+----------------+------------+-----------------+------------------+------------------+
|trade_date|total_quantity|total_amount|num_transactions|closing_rate|            SMA_3|             SMA_5|              VWAP|
+----------+--------------+------------+----------------+------------+-----------------+------------------+------------------+
|2025-03-24|       87135.0|      1590.2|             959|       478.0|            478.0|             478.0|482.37365393413205|
|2025-03-25|      121381.0|       763.0|            1112|       480.0|            479.0|             479.0|479.84805238928203|
|2025-03-26|       78925.0|      2108.0|             791|       481.0|479.6666666666667| 479.6666666666667| 480.7343265990103|
|2025-03-27|       65488.0|       769.0|             767|       485.0|            482.0|             481.0| 484.5674220606904|
|2025-03-30|       97919.0|      1086.0|             972|       491.0|485.6666666666667|             483.0| 489

In [72]:

window_all = Window.orderBy("trade_date").rowsBetween(Window.unboundedPreceding, 0)

final_df = final_df.withColumn("cumulative_volume", _sum("total_quantity").over(window_all)) \
                   .withColumn("cumulative_amount", _sum("total_amount").over(window_all))

In [73]:

final_df.show()

+----------+--------------+------------+----------------+------------+-----------------+------------------+------------------+-----------------+-----------------+
|trade_date|total_quantity|total_amount|num_transactions|closing_rate|            SMA_3|             SMA_5|              VWAP|cumulative_volume|cumulative_amount|
+----------+--------------+------------+----------------+------------+-----------------+------------------+------------------+-----------------+-----------------+
|2025-03-24|       87135.0|      1590.2|             959|       478.0|            478.0|             478.0|482.37365393413205|          87135.0|           1590.2|
|2025-03-25|      121381.0|       763.0|            1112|       480.0|            479.0|             479.0|479.84805238928203|         208516.0|           2353.2|
|2025-03-26|       78925.0|      2108.0|             791|       481.0|479.6666666666667| 479.6666666666667| 480.7343265990103|         287441.0|           4461.2|
|2025-03-27|       654

In [103]:
# Save result to CSV or Parquet
final_df.orderBy("trade_date").coalesce(1).write.mode("overwrite").csv("./output/nabil_stock_analysis", header=True)