# Data Engineering Pipeline

Dataset: https://www.kaggle.com/datasets/jacksoncrow/stock-market-dataset. 

# Problem 1: Raw Data Processing

In [2]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.functions import input_file_name

spark = SparkSession.builder.appName('data_engineer_work_sample')\
    .config("spark.network.timeout", "1200s").getOrCreate()

# set the path for ETF and stock data
etf_path = 'stock-market-dataset/etfs/'
stock_path = 'stock-market-dataset/stocks/'

# Read Data using Spark's Lazy Evaluation
etf_df = spark.read.csv(etf_path, header=True, inferSchema=True) \
        .withColumn("Symbol", F.regexp_extract(F.input_file_name(), "/([^/]+)\.csv$",1))
stock_df = spark.read.csv(stock_path, header=True, inferSchema=True) \
        .withColumn("Symbol", F.regexp_extract(F.input_file_name(), "/([^/]+)\.csv$",1))

combined_df = etf_df.union(stock_df)

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/05/03 18:47:50 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
                                                                                

In [3]:
symbols_df = spark.read.csv("stock-market-dataset/symbols_valid_meta.csv", header = True, inferSchema = True)

In [4]:
data = combined_df.join(symbols_df, on = "Symbol")

In [5]:
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType, FloatType
from pyspark.sql.functions import col

data_schema = StructType([
    StructField("Symbol", StringType(), True),
    StructField("Security_Name", StringType(), True),
    StructField("Date", StringType(), True),
    StructField("Open", FloatType(), True),
    StructField("High", FloatType(), True),
    StructField("Low", FloatType(), True),
    StructField("Close", FloatType(), True),
    StructField("Adj_Close", FloatType(), True),
    StructField("Volume", FloatType(), True)
])

In [6]:
#Creating Compiled Dataset
data = data.withColumn("Date", col("Date").cast("string"))
data = spark.createDataFrame(data.select("Symbol","Security Name","Date","Open","High","Low","Close","Adj Close","Volume") \
                             .rdd, schema= data_schema)
data.show()

[Stage 9:>                                                          (0 + 1) / 1]

+------+--------------------+----------+----+------+------+------+---------+-------+
|Symbol|       Security_Name|      Date|Open|  High|   Low| Close|Adj_Close| Volume|
+------+--------------------+----------+----+------+------+------+---------+-------+
|   CEF|Sprott Physical G...|1986-04-03| 0.0|  4.75| 4.625| 4.625| 4.449552|15300.0|
|   CEF|Sprott Physical G...|1986-04-04| 0.0|  4.75|4.6875|  4.75| 4.569811|12000.0|
|   CEF|Sprott Physical G...|1986-04-07| 0.0| 4.875|  4.75|  4.75| 4.569811|11500.0|
|   CEF|Sprott Physical G...|1986-04-08| 0.0|4.8125|4.6875|  4.75| 4.569811|21000.0|
|   CEF|Sprott Physical G...|1986-04-09| 0.0|4.8125| 4.625|4.6875|4.5096807|22800.0|
|   CEF|Sprott Physical G...|1986-04-10| 0.0|4.6875| 4.625| 4.625| 4.449552| 6200.0|
|   CEF|Sprott Physical G...|1986-04-11| 0.0|4.6875|4.5625| 4.625| 4.449552|37100.0|
|   CEF|Sprott Physical G...|1986-04-14| 0.0| 4.625|   4.5|4.5625|4.3894224|28200.0|
|   CEF|Sprott Physical G...|1986-04-15| 0.0|4.6875|4.5625| 4.625



In [7]:
#Saving data to Disk
data.write.mode("overwrite").parquet("pyspark_data_raw.parquet")

23/05/03 18:49:07 WARN MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 96.54% for 7 writers
23/05/03 18:49:07 WARN MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 84.47% for 8 writers
23/05/03 18:49:12 WARN MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 96.54% for 7 writers
23/05/03 18:49:14 WARN MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 96.54% for 7 writers
23/05/03 18:49:14 WARN MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 84.47% for 8 writers
23/05/03 18:49:16 WARN MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 96.54% for 7 writers
23/05/03 18:49:17 WARN MemoryManager: Total allocation exceeds 95.00% (906,992,014

23/05/03 18:50:09 WARN MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 84.47% for 8 writers
23/05/03 18:50:10 WARN MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 96.54% for 7 writers
23/05/03 18:50:10 WARN MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 84.47% for 8 writers
23/05/03 18:50:10 WARN MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 96.54% for 7 writers
23/05/03 18:50:11 WARN MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 84.47% for 8 writers
23/05/03 18:50:14 WARN MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 96.54% for 7 writers
23/05/03 18:50:14 WARN MemoryManager: Total allocation exceeds 95.00% (906,992,014

# Problem 2: Feature Engineering

In [8]:
from pyspark.sql.functions import col, avg
from pyspark.sql.window import Window

window_spec = Window.orderBy(F.col('Date').cast('timestamp')).rowsBetween(-29, 0)
feat_data = data.withColumn("vol_moving_avg", avg(col("Volume")).over(window_spec))
feat_data.show()

23/05/03 18:51:31 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/05/03 18:51:31 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/05/03 18:52:53 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
[Stage 13:>                                                         (0 + 1) / 1]

+------+--------------------+----------+----------+-----------+----------+----------+------------+---------+------------------+
|Symbol|       Security_Name|      Date|      Open|       High|       Low|     Close|   Adj_Close|   Volume|    vol_moving_avg|
+------+--------------------+----------+----------+-----------+----------+----------+------------+---------+------------------+
|   HPQ|HP Inc. Common Stock|1962-01-02| 0.1312727|  0.1312727|0.12417688|0.12417688| 0.006887285|2480300.0|         2480300.0|
|    AA|Alcoa Corporation...|1962-01-02|  6.532155|   6.556185|  6.532155|  6.532155|   1.5366576|  55900.0|         1268100.0|
|    GE|General Electric ...|1962-01-02| 0.7512019| 0.76372194| 0.7436899|0.74869794|0.0017815924|2156500.0|1564233.3333333333|
|    IP|International Pap...|1962-01-02|       0.0|   9.064277|  8.852777|  8.852777|  0.77576894|  48600.0|         1185325.0|
|   DIS|Walt Disney Compa...|1962-01-02|0.09290839|0.096026115|0.09290839|0.09290839| 0.035517246| 81740

                                                                                

In [9]:
from pyspark.sql.functions import col, expr

window_spec = Window.partitionBy("Symbol").orderBy("Date").rowsBetween(-29, 0)
feat_data = feat_data.withColumn("adj_close_rolling_med", expr("percentile_approx(`Adj_Close`, 0.5)").over(window_spec))
feat_data.show()

23/05/03 18:54:09 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/05/03 18:55:30 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
[Stage 16:>                                                         (0 + 1) / 1]

+------+--------------------+----------+---------+---------+---------+---------+---------+---------+------------------+---------------------+
|Symbol|       Security_Name|      Date|     Open|     High|      Low|    Close|Adj_Close|   Volume|    vol_moving_avg|adj_close_rolling_med|
+------+--------------------+----------+---------+---------+---------+---------+---------+---------+------------------+---------------------+
|     A|Agilent Technolog...|1999-11-18|32.546494| 35.76538|28.612303|31.473534|27.068665|6.25463E7|2471216.6666666665|            27.068665|
|     A|Agilent Technolog...|1999-11-19| 30.71352|30.758226|28.478184|28.880543|24.838577|1.52341E7| 900926.6666666666|            24.838577|
|     A|Agilent Technolog...|1999-11-22|29.551144|31.473534| 28.65701|31.473534|27.068665|6577800.0| 932106.6666666666|            27.068665|
|     A|Agilent Technolog...|1999-11-23|30.400572|31.205294|28.612303|28.612303| 24.60788|5975600.0|         2170640.0|            24.838577|
|     

                                                                                

In [10]:
feat_data.write.mode("overwrite").parquet("feature_engineered_data.parquet")

23/05/03 19:00:37 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/05/03 19:02:19 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
                                                                                

# Problem 3: Integrate ML Training

# Scenario 1: Machine Learning with Sci-kit Learn and Pandas Dataframes

In [2]:
import pandas as pd

ml_data = pd.read_parquet("feature_engineered_data.parquet")
ml_data.head()

Unnamed: 0,Symbol,Security_Name,Date,Open,High,Low,Close,Adj_Close,Volume,vol_moving_avg,adj_close_rolling_med
0,A,"Agilent Technologies, Inc. Common Stock",1999-11-18,32.546494,35.765381,28.612303,31.473534,27.068665,62546300.0,2471217.0,27.068665
1,A,"Agilent Technologies, Inc. Common Stock",1999-11-19,30.71352,30.758226,28.478184,28.880543,24.838577,15234100.0,900926.7,24.838577
2,A,"Agilent Technologies, Inc. Common Stock",1999-11-22,29.551144,31.473534,28.657009,31.473534,27.068665,6577800.0,932106.7,27.068665
3,A,"Agilent Technologies, Inc. Common Stock",1999-11-23,30.400572,31.205294,28.612303,28.612303,24.60788,5975600.0,2170640.0,24.838577
4,A,"Agilent Technologies, Inc. Common Stock",1999-11-24,28.701717,29.998211,28.612303,29.372318,25.261524,4843200.0,1190587.0,25.261524


In [3]:
def sample_by_symbol(group):
    return group.sample(min(len(group), 100))

# group by symbol and apply the sampling function
ml_data_sampled = ml_data.groupby('Symbol').apply(sample_by_symbol)

# reset the index of the sampled dataframe
ml_data_sampled = ml_data_sampled.reset_index(drop=True)

In [5]:
import logging
import joblib
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_absolute_error, mean_squared_error

# Assume `data` is loaded as a Pandas DataFrame
ml_data_sampled['Date'] = pd.to_datetime(ml_data_sampled['Date'])
ml_data_sampled.set_index('Date', inplace=True)

# Remove rows with NaN values
ml_data_sampled.dropna(inplace=True)

# Select features and target
features = ['vol_moving_avg', 'adj_close_rolling_med']
target = 'Volume'

X = ml_data_sampled[features]
y = ml_data_sampled[target]

# Split data into train and test sets
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Create a RandomForestRegressor model
model = RandomForestRegressor(n_estimators=100, random_state=42, n_jobs=-1)

logger = logging.getLogger(__name__)
# Train the model
with joblib.parallel_backend('multiprocessing', n_jobs=-1):
    model.fit(X_train, y_train)

# Make predictions on test data
y_pred = model.predict(X_test)

# Calculate the Mean Absolute Error and Mean Squared Error
mae = mean_absolute_error(y_test, y_pred)
mse = mean_squared_error(y_test, y_pred)
logger.info(f'Mean Absolute Error: {mae:.4f}, Mean Squared Error: {mse:.4f}')
print(mae, mse)

1138124.584698493 53754096320277.47


In [6]:
import joblib

joblib.dump(model, 'data_engineer_work_sample_model.joblib')

['data_engineer_work_sample_model.joblib']

# Problem 4: Model Serving

In [12]:
import requests

url = 'http://127.0.0.1:8080/predict'
params = {'vol_moving_avg': 12345, 'adj_close_rolling_med': 25}
response = requests.get(url, params=params)

print(response.text)

1394


In [16]:
!curl "http://localhost:8080/predict?vol_moving_avg=12345&adj_close_rolling_med=25"

1394

# Alternate Scenario for Machine Learning Training: Machine Learning with SparkML and Pyspark Dataframes

In [1]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('data_engineer_work_sample').master("local[*]")\
    .config("spark.network.timeout", "1200s").getOrCreate()

pyspark_ml_data = spark.read.parquet("feature_engineered_data.parquet")


Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/05/03 19:49:53 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
                                                                                

In [None]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline


(train_data, test_data)= pyspark_ml_data.randomSplit([0.8, 0.2], 1111)

# Define the feature columns
feature_cols = ["vol_moving_avg", "adj_close_rolling_med"]

# Create a vector assembler to combine the features
assembler = VectorAssembler(outputCol="features").setInputCols(["vol_moving_avg", "adj_close_rolling_med"]).setOutputCol("assembled-features")

# Create a random forest regressor
rf = RandomForestRegressor(featuresCol="assembled-features").setLabelCol("Volume")

pipeline = Pipeline(stages = [assembler,rf])

evaluator = RegressionEvaluator(labelCol="Volume", predictionCol="prediction")

# Create a parameter grid for the random forest regressor
param_grid = (ParamGridBuilder()
              .addGrid(rf.numTrees, [10, 20])
              .addGrid(rf.maxDepth, [5, 10])
              .build())

# Create a cross-validator
cv = CrossValidator(estimator=pipeline,
                    estimatorParamMaps=param_grid,
                    evaluator=evaluator,
                    numFolds=3)

# Fit the model on the training data
cv_model = cv.fit(train_data)

# Make predictions on the test data
predictions = cv_model.transform(test_data)

# Evaluate the performance of the model

rmse = evaluator.evaluate(predictions, {evaluator.metricName: "rmse"})
r2 = evaluator.evaluate(predictions, {evaluator.metricName: "r2"})
mae = evaluator.evaluate(predictions, {evaluator.metricName: "mae"})
mse = evaluator.evaluate(predictions, {evaluator.metricName: "mse"})

# Print the results
print(f"RMSE: {rmse:.4f}")
print(f"R-squared: {r2:.4f}")
print(f"MAE: {mae:.4f}")
print(f"MSE: {mse:.4f}")