In [26]:
import sys
from pathlib import Path

# Assumes the notebook runs one directory below the repository root, so the
# parent of the working directory is the root that makes `src.*` importable.
PROJECT_ROOT = Path.cwd().resolve().parent
print(PROJECT_ROOT)
# Guard the append so re-running this cell does not stack duplicate entries.
if str(PROJECT_ROOT) not in sys.path:
    sys.path.append(str(PROJECT_ROOT))


/Users/samanehhajigholam/Desktop/equity-analytics-pipeline


In [27]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, upper, trim, regexp_replace, when
from pyspark.sql.types import DoubleType
from src.utils.config import Paths

spark = SparkSession.builder.appName("Test Bronze to Silver").getOrCreate()
paths = Paths()

# Read bronze
companies_bronze = spark.read.parquet(paths.BRONZE_COMPANIES_DIR)
companies_bronze.show(5)

# Strip thousands separators and surrounding whitespace once, so the numeric
# guard and the cast below both see exactly the same string.
market_cap_str = trim(regexp_replace(col("market_cap"), ",", ""))

# Bronze keeps the source header "market cap"; rename it to snake_case, then
# cast only values that look like plain unsigned decimals. Anything else
# becomes null (`when` without `otherwise` yields null) instead of reaching
# the cast, which raises on malformed strings when Spark runs in ANSI mode.
companies_fixed = (
    companies_bronze
    .withColumnRenamed("market cap", "market_cap")
    .withColumn(
        "market_cap",
        when(
            market_cap_str.rlike(r"^[0-9]+(\.[0-9]+)?$"),
            market_cap_str.cast(DoubleType()),
        ),
    )
)

# Check results
companies_fixed.select("ticker", "market_cap").show(20)


+------+--------------------+--------------------+--------------------+--------------------+--------------------+--------+--------------------+--------------------+-----------+------------------+------------------+--------------------+--------------------+
|ticker|        company name|          short name|            industry|         description|             website|    logo|                 ceo|            exchange| market cap|            sector|             tag 1|               tag 2|               tag 3|
+------+--------------------+--------------------+--------------------+--------------------+--------------------+--------+--------------------+--------------------+-----------+------------------+------------------+--------------------+--------------------+
|     A|Agilent Technolog...|             Agilent|Medical Diagnosti...|Agilent Technolog...|http://www.agilen...|   A.png| Michael R. McMullen|New York Stock Ex...|24218068096|        Healthcare|        Healthcare|Diagnostics & R

In [28]:
# Read the raw (bronze) prices table. A stage-specific name, rather than a
# reused generic `df`, keeps frames built by other cells intact.
prices_bronze = spark.read.parquet(paths.BRONZE_PRICES_DIR)
prices_bronze.show(5)
prices_bronze.printSchema()


+----------+--------------------+-------------------+-------------------+-------------------+-------------------+-----------+------+
|      Date|           Adj Close|              Close|               High|                Low|               Open|     Volume|ticker|
+----------+--------------------+-------------------+-------------------+-------------------+-------------------+-----------+------+
|1986-03-13|0.059826720505952835|0.09722200036048889|    0.1015629991889|0.08854199945926666|0.08854199945926666|1.0317888E9|  MSFT|
|1986-03-14|  0.0619632825255394|0.10069400072097778|0.10243099927902222|0.09722200036048889|0.09722200036048889|   3.0816E8|  MSFT|
|1986-03-17| 0.06303215026855469|0.10243099927902222|0.10329899936914444|0.10069400072097778|0.10069400072097778| 1.331712E8|  MSFT|
|1986-03-18| 0.06142912432551384|0.09982600063085556|0.10329899936914444|0.09895800054073334|0.10243099927902222|  6.77664E7|  MSFT|
|1986-03-19|0.060360878705978394|0.09809000045061111|0.10069400072097

In [33]:
# Most recent MSFT rows from the silver prices table, newest first.
silver_df = spark.read.parquet(paths.SILVER_PRICES_DIR)

(
    silver_df
    .where(col("ticker") == "MSFT")
    .orderBy(col("date").desc())
    .show(5)
)

+----------+------------------+------------------+------------------+-----------------+------------------+--------+------+
|      date|         adj_close|             close|              high|              low|              open|  volume|ticker|
+----------+------------------+------------------+------------------+-----------------+------------------+--------+------+
|2025-02-05| 413.2900085449219| 413.2900085449219| 413.8269958496094|410.4200134277344|  411.635009765625|15034087|  MSFT|
|2025-02-04| 412.3699951171875| 412.3699951171875| 413.9200134277344| 409.739990234375|412.69000244140625|20532100|  MSFT|
|2025-02-03| 410.9200134277344| 410.9200134277344| 415.4100036621094|408.6600036621094| 411.6000061035156|25679100|  MSFT|
|2025-01-31|415.05999755859375|415.05999755859375|420.69000244140625|414.9100036621094| 418.9800109863281|34223400|  MSFT|
|2025-01-30|  414.989990234375|  414.989990234375| 422.8599853515625|413.1600036621094| 418.7699890136719|54586300|  MSFT|
+----------+----

26/01/04 02:52:02 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 992959 ms exceeds timeout 120000 ms
26/01/04 02:52:02 WARN SparkContext: Killing executors is not supported by current scheduler.
26/01/04 03:09:22 WARN Executor: Issue communicating with driver in heartbeater
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:53)
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:359)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:101)
	at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:85)
	at org.apache.spark.storage.BlockManagerMaster.registerBlockManager(BlockManagerMaster.scala:81)
	at org.apache.spark.storage.BlockManager.reregister(BlockManager.scala:674)
	at org.apache.spark.executor.Executor.reportHeartBeat(Executor.scala:1324)
	at o

In [5]:
# Raw (bronze) companies table; preview the first five rows.
bronze_df = spark.read.parquet(paths.BRONZE_COMPANIES_DIR)
bronze_df.show(n=5)


+------+--------------------+--------------------+--------------------+--------------------+--------------------+--------+--------------------+--------------------+-----------+------------------+------------------+--------------------+--------------------+
|ticker|        company name|          short name|            industry|         description|             website|    logo|                 ceo|            exchange| market cap|            sector|             tag 1|               tag 2|               tag 3|
+------+--------------------+--------------------+--------------------+--------------------+--------------------+--------+--------------------+--------------------+-----------+------------------+------------------+--------------------+--------------------+
|     A|Agilent Technolog...|             Agilent|Medical Diagnosti...|Agilent Technolog...|http://www.agilen...|   A.png| Michael R. McMullen|New York Stock Ex...|24218068096|        Healthcare|        Healthcare|Diagnostics & R

+------+--------------------+-----------------+------------------+--------------------+--------------------+---------------+--------------------+
|ticker|        company_name|       short_name|            sector|            industry|            exchange|     market_cap|             website|
+------+--------------------+-----------------+------------------+--------------------+--------------------+---------------+--------------------+
|     A|Agilent Technolog...|          Agilent|        Healthcare|Medical Diagnosti...|New York Stock Ex...|2.4218068096E10|http://www.agilen...|
|    AA|   Alcoa Corporation|            Alcoa|   Basic Materials|     Metals & Mining|New York Stock Ex...|  5.374966543E9|http://www.alcoa.com|
|  AABA|         Altaba Inc.|           Altaba|Financial Services|    Asset Management|Nasdaq Global Select|4.1223683414E10|http://www.altaba...|
|   AAC|   AAC Holdings Inc.|              AAC|        Healthcare|Health Care Provi...|New York Stock Ex...|    6.3720099E7|

In [16]:
# Load the silver prices table and preview its first five rows.
prices = spark.read.parquet(paths.SILVER_PRICES_DIR)
prices.show(n=5)
from pyspark.sql.functions import isnan

class DataQualityError(RuntimeError):
    """Raised when a dataset fails one of the notebook's data-quality checks."""


def _require_columns(df, required, dataset_name: str) -> None:
    cols = set([c.lower() for c in df.columns])
    missing = [c for c in required if c.lower() not in cols]
    if missing:
        raise DataQualityError(f"[{dataset_name}] Missing required columns: {missing}")


def _fail_if(df, condition, message: str) -> None:
    bad = df.filter(condition).limit(1).count()
    if bad > 0:
        raise DataQualityError(message)
    
# Required columns
_require_columns(
    prices,
    ["date", "open", "high", "low", "close", "adj_close", "volume", "ticker"],
    "silver_prices",
)

# Null checks on non-numeric key fields. `isnan` needs a numeric column, so
# date/ticker are checked for null only.
for col_name in ["date", "ticker"]:
    _fail_if(
        prices,
        col(col_name).isNull(),
        f"[silver_prices] Null found in {col_name}",
    )

# Null / NaN checks on numeric fields.
for col_name in ["adj_close", "volume"]:
    _fail_if(
        prices,
        col(col_name).isNull() | isnan(col(col_name)),
        f"[silver_prices] Null/NaN found in {col_name}",
    )


+----------+-------------------+-------------------+-------------------+-------------------+-------------------+--------+------+
|      date|          adj_close|              close|               high|                low|               open|  volume|ticker|
+----------+-------------------+-------------------+-------------------+-------------------+-------------------+--------+------+
|1986-03-20|0.05875847116112709|0.09548600018024445|0.09809000045061111|0.09461800009012222|0.09809000045061111|58435200|  MSFT|
|1986-03-24|0.05555365979671478|0.09027799963951111|0.09288199990987778|0.08940999954938889|0.09288199990987778|65289600|  MSFT|
|1986-04-14| 0.0619632825255394|0.10069400072097778|    0.1015629991889|0.09982600063085556|0.09982600063085556|12153600|  MSFT|
|1986-04-21|0.06249803304672241|    0.1015629991889|0.10243099927902222|0.09895800054073334|    0.1015629991889|22924800|  MSFT|
|1986-04-30|0.06890761852264404|0.11197900027036667|0.11545100063085556|           0.109375|0.114

In [38]:
# Non-negative prices and volume
any_negative_price = (
    (col("open") < 0)
    | (col("high") < 0)
    | (col("low") < 0)
    | (col("close") < 0)
    | (col("adj_close") < 0)
)
_fail_if(prices, any_negative_price, "[silver_prices] Negative price found")
_fail_if(prices, col("volume") < 0, "[silver_prices] Negative volume found")

# A bar's high must never be below its low.
_fail_if(prices, col("high") < col("low"), "[silver_prices] Found high < low")

In [39]:
# Each (ticker, date) pair must appear at most once; stop after one offender.
dup_count = (
    prices
    .groupBy("ticker", "date")
    .count()
    .where(col("count") > 1)
    .limit(1)
    .count()
)
if dup_count:
    raise DataQualityError("[silver_prices] Duplicate (ticker, date) rows found")

In [61]:
enriched = spark.read.parquet(paths.GOLD_ENRICHED_DIR)

# `daily_return` is range-checked below, so require it here too. A missing
# column is then reported as DataQualityError, not as a Spark analysis error.
_require_columns(
    enriched,
    ["ticker", "date", "adj_close", "company_name", "sector", "industry", "daily_return"],
    "gold_prices_enriched",
)

# Join success: sector and industry should not be null
_fail_if(enriched, col("sector").isNull(), "[gold_prices_enriched] sector is null (join likely failed)")
_fail_if(enriched, col("industry").isNull(), "[gold_prices_enriched] industry is null (join likely failed)")

# Daily return sanity range: a one-day move beyond +/-90% is flagged as bad data.
MAX_ABS_DAILY_RETURN = 0.9
_fail_if(
    enriched,
    (col("daily_return") < -MAX_ABS_DAILY_RETURN) | (col("daily_return") > MAX_ABS_DAILY_RETURN),
    "[gold_prices_enriched] Extreme daily_return values detected",
)


In [49]:
prices = spark.read.parquet(paths.SILVER_PRICES_DIR)
companies = spark.read.parquet(paths.SILVER_COMPANIES_DIR)

# Mid-cell bare expressions are evaluated and then discarded by Jupyter, so
# print the counts; otherwise these two Spark jobs produce nothing visible.
print(f"Distinct tickers in prices: {prices.select('ticker').distinct().count()}")
print(f"Distinct tickers in companies: {companies.select('ticker').distinct().count()}")

# Tickers whose left join finds no sector: either no companies row exists,
# or the matching row has a null sector.
(
    prices.join(companies, "ticker", "left")
    .filter(col("sector").isNull())
    .select("ticker")
    .distinct()
    .show(20, False)
)


+------+
|ticker|
+------+
|SPY   |
+------+



In [57]:
# Compare the distinct ticker sets of prices and companies.
prices_tickers = prices.select("ticker").distinct()
companies_tickers = companies.select("ticker").distinct()

for label, ticker_frame in (("Prices", prices_tickers), ("Companies", companies_tickers)):
    print(f"{label} tickers: {ticker_frame.count()}")

# left_anti keeps the prices tickers that have no matching companies row.
missing = prices_tickers.join(companies_tickers, on="ticker", how="left_anti")
print(f"Tickers in prices but not in companies: {missing.count()}")
missing.show(20, truncate=False)

Prices tickers: 3
Companies tickers: 5469
Tickers in prices but not in companies: 1
+------+
|ticker|
+------+
|SPY   |
+------+



In [60]:
# Is SPY present in the gold enriched table? List every distinct ticker.
gold_tickers = (
    spark.read.parquet(paths.GOLD_ENRICHED_DIR)
    .select("ticker")
    .distinct()
    .orderBy("ticker")
)
gold_tickers.show(50, False)


+------+
|ticker|
+------+
|MSFT  |
|NVDA  |
|SPY   |
+------+



In [2]:
!pytest ../tests/unit -v --maxfail=1 --disable-warnings

platform darwin -- Python 3.12.4, pytest-9.0.2, pluggy-1.5.0 -- /Users/samanehhajigholam/Desktop/equity-analytics-pipeline/venv/bin/python3.12
cachedir: .pytest_cache
rootdir: /Users/samanehhajigholam/Desktop/equity-analytics-pipeline
plugins: time-machine-2.16.0, anyio-4.7.0
collected 0 items / 1 error                                                    [0m

[31m[1m_____________ ERROR collecting tests/unit/test_bronze_to_silver.py _____________[0m
[31mImportError while importing test module '/Users/samanehhajigholam/Desktop/equity-analytics-pipeline/tests/unit/test_bronze_to_silver.py'.
Hint: make sure your test modules/packages have valid Python names.
Traceback:
[1m[31m/opt/homebrew/Cellar/python@3.12/3.12.4/Frameworks/Python.framework/Versions/3.12/lib/python3.12/importlib/__init__.py[0m:90: in import_module
    [0m[94mreturn[39;49;00m _bootstrap._gcd_import(name[level:], package, level)[90m[39;49;00m
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^[90m

In [6]:
import os
from src.utils.config import Paths
paths = Paths()
gold_analytics_present = os.path.exists(paths.GOLD_ANALYTICS_DIR)
print(gold_analytics_present)


True
