In [1]:
import os, sys, logging
from datetime import datetime
from typing import Dict

from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import(
    col, when, lag, avg, stddev, max as spark_max, min as spark_min,
    count, lit, round as spark_round,
    log as spark_log, abs as spark_abs, current_timestamp
)
from pyspark.sql.window import Window
from pyspark.sql.types import (
    StructType, StructField, StringType, DateType,
    DecimalType, IntegerType
)

In [2]:
# Explicit column layout for the raw stock CSV files. Every column is
# declared nullable; prices are fixed-point decimals with two fractional digits.
# NOTE(review): IntegerType is a 32-bit signed integer, so a volume above
# 2,147,483,647 would not fit -- confirm the data never exceeds that.
schema = (
    StructType()
    .add("symbol", StringType(), True)
    .add("date", DateType(), True)
    .add("open", DecimalType(10, 2), True)
    .add("high", DecimalType(10, 2), True)
    .add("low", DecimalType(10, 2), True)
    .add("close", DecimalType(10, 2), True)
    .add("volume", IntegerType(), True)
)

In [21]:
# Spark master URL, taken from the SPARK_MASTER environment variable;
# falls back to 'local[*]' when the variable is not set.
MASTER: str = os.environ.get('SPARK_MASTER', 'local[*]')

# Spark SQL warehouse location: <current working directory>/spark-warehouse
WAREHOUSE_DIR: str = os.path.join(os.getcwd(), 'spark-warehouse')

# Memory sizing for the driver and the executors
DRIVER_MEMORY: str = '2g'
EXECUTOR_MEMORY: str = '2g'

# Settings handed to the SparkSession builder, listed as (key, value) pairs
SPARK_CONF: Dict[str, str] = dict([
    ('spark.sql.adaptive.enabled', 'true'),
    ('spark.sql.adaptive.coalescePartitions.enabled', 'true'),
    ('spark.sql.warehouse.dir', WAREHOUSE_DIR),
    ('spark.driver.memory', DRIVER_MEMORY),
    ('spark.executor.memory', EXECUTOR_MEMORY),
])

# Build (or reuse) the SparkSession used by this notebook.
# The master URL is set explicitly here: MASTER is read from the SPARK_MASTER
# environment variable but was previously never passed to the builder, so the
# variable had no effect.
builder = SparkSession.builder.appName('StockETL').master(MASTER)

# Apply configuration from settings
for key, value in SPARK_CONF.items():
    builder = builder.config(key, value)

# getOrCreate() hands back an already-running session when one exists; in that
# case Spark warns that only runtime SQL configurations take effect, so the
# master and memory settings above require a kernel restart to apply.
spark = builder.getOrCreate()

# Directory holding the raw CSV files, relative to the kernel's working
# directory. Passing a directory makes Spark read the files inside it.
path = 'data/raw/'

# Load the raw CSV data with the explicit schema.
# - comment='#': the files start with '#'-prefixed metadata lines (symbols,
#   record count, date range, columns); skipping them lets the real header
#   row line up with the schema.
# - dateFormat: the `date` column is a DateType, which is governed by
#   `dateFormat`; `timestampFormat` only applies to TimestampType columns and
#   had no effect on this schema.
df = spark.read.csv(
    path,
    header=True,
    schema=schema,
    dateFormat='yyyy-MM-dd',
    comment='#',
)

# Preview the first rows to confirm the columns parsed as expected.
df.show(5)

+------+----------+------+------+------+------+--------+
|symbol|      date|  open|  high|   low| close|  volume|
+------+----------+------+------+------+------+--------+
|  AAPL|2025-04-24|204.89|208.83|202.94|208.37|47310989|
|  AAPL|2025-04-25|206.37|209.75|206.20|209.28|38222258|
|  AAPL|2025-04-28|210.00|211.50|207.46|210.14|38743074|
|  AAPL|2025-04-29|208.69|212.24|208.37|211.21|36827633|
|  AAPL|2025-04-30|209.30|213.58|206.67|212.50|52286454|
+------+----------+------+------+------+------+--------+
only showing top 5 rows


25/10/03 12:06:17 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/10/03 12:02:22 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


+--------------------+----+----+----+----+-----+------+
|              symbol|date|open|high| low|close|volume|
+--------------------+----+----+----+----+-----+------+
|  # Symbols: ['AAPL'|NULL|NULL|NULL|NULL| NULL|  NULL|
|      # Records: 800|NULL|NULL|NULL|NULL| NULL|  NULL|
|# Date range: 202...|NULL|NULL|NULL|NULL| NULL|  NULL|
|   # Columns: symbol|NULL|NULL|NULL|NULL| NULL|  NULL|
|              symbol|NULL|NULL|NULL|NULL| NULL|  NULL|
+--------------------+----+----+----+----+-----+------+
only showing top 5 rows


25/10/03 12:04:25 WARN CSVHeaderChecker: Number of column in CSV header is not equal to number of fields in the schema:
 Header length: 1, schema size: 7
CSV file: file:///Users/justinclancy/Documents/Projects/Stock_ETL_Pipeline2/data/raw/stock_data_20250917_115419.csv
