## Download the dataset

In [1]:
import warnings
import shutil
import os
warnings.filterwarnings("ignore")
from pyspark.sql import SparkSession
import opendatasets as od

## Preprocess the Dataset

In [2]:
if not os.path.exists("./archive"):
    od.download(
        "https://www.kaggle.com/datasets/jacksoncrow/stock-market-dataset")
    os.rename('stock-market-dataset', 'archive')
    path = "./archive"
    data_path = path + "/etfs/"
    output_path = path + "/output"

    source_dir = path +'/stocks'
    target_dir = path +'/etfs'
    file_names = os.listdir(source_dir)
    for file_name in file_names:
        shutil.move(os.path.join(source_dir, file_name), target_dir)

## Problem 1: Raw Data Processing

In [3]:
import os
import concurrent.futures
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit

spark = SparkSession.builder.appName("csv-to-parquet").config("spark.driver.memory", "6g").getOrCreate()

data_path = "./archive/etfs/"

# Function to read a CSV file
def read_csv_file(symbol_row):
    symbol = symbol_row["Symbol"]
    security_name = symbol_row["Security Name"]
    csv_file_path = data_path + f"{symbol}.csv"
    if not os.path.exists(csv_file_path):
        return None
    spark = SparkSession.builder.appName("csv-to-parquet").getOrCreate()
    schema = "Date DATE, Open FLOAT, High FLOAT, Low FLOAT, Close FLOAT, Adj_Close FLOAT, Volume INT"
    df = spark.read.format("csv").option("header", "true").schema(schema).load(csv_file_path)
    df = df.withColumn("Symbol", lit(symbol))
    df = df.withColumn("Security Name", lit(security_name))
    return df

# Load into a Spark dataframe
spark = SparkSession.builder.appName("csv-to-parquet").getOrCreate()
main_df = spark.read.csv(path + "/symbols_valid_meta.csv", header=True, inferSchema=True)
main_df = main_df.select("Symbol", "Security Name").distinct()

# Create an empty dataframe
combined_schema = "Date DATE, Open FLOAT, High FLOAT, Low FLOAT, Close FLOAT, Adj_Close FLOAT, Volume INT, Symbol STRING, Security_Name STRING"
combined_df = spark.createDataFrame([], schema=combined_schema)

# Implementing multithreading to read CSV files
with concurrent.futures.ThreadPoolExecutor() as executor:
    futures = []
    for symbol_row in main_df.rdd.collect():
        futures.append(executor.submit(read_csv_file, symbol_row))
    for future in concurrent.futures.as_completed(futures):
        df = future.result()
        if df is not None:
            combined_df = combined_df.union(df)

combined_df.write.mode("overwrite").parquet("./output.parquet")

spark.stop()

23/05/02 00:42:22 WARN Utils: Your hostname, Mac.local resolves to a loopback address: 127.0.0.1; using 192.168.0.27 instead (on interface en0)
23/05/02 00:42:22 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/05/02 00:42:23 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/05/02 02:13:33 WARN DAGScheduler: Broadcasting large task binary with size 44.1 MiB
23/05/02 02:13:51 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: Date, Open, High, Low, Close, Adj Close, Volume
 Schema: Date, Open, High, Low, Close, Adj_Close, Volume
Expected: Adj_Close but found: Adj Close
CSV file: file:///Users/sunil/Downloads/Pipeline%20Project/Problem%201/archive/etfs/ABEQ.csv
23/05/02 02:13:51 WARN CSVHeaderChecker: CSV header does not conform to the 

In [4]:
spark = SparkSession.builder.appName("ReadParquetFile").getOrCreate()
df = spark.read.parquet("./output.parquet")
df.show(5)

                                                                                

+----------+---------+---------+---------+---------+----------+------+------+--------------------+
|      Date|     Open|     High|      Low|    Close| Adj_Close|Volume|Symbol|       Security_Name|
+----------+---------+---------+---------+---------+----------+------+------+--------------------+
|1962-01-02| 7.713333| 7.713333|7.6266665|7.6266665| 0.6181529|387200|   IBM|International Bus...|
|1962-01-03|7.6266665| 7.693333|7.6266665| 7.693333|0.62355584|288000|   IBM|International Bus...|
|1962-01-04| 7.693333| 7.693333| 7.613333| 7.616667|0.61734253|256000|   IBM|International Bus...|
|1962-01-05|7.6066666|7.6066666|7.4533334|7.4666667|  0.605185|363200|   IBM|International Bus...|
|1962-01-08|     7.46|     7.46| 7.266667| 7.326667| 0.5938371|544000|   IBM|International Bus...|
+----------+---------+---------+---------+---------+----------+------+------+--------------------+
only showing top 5 rows



## Problem 2: Feature Engineering

In [5]:
from pyspark.sql.functions import col, avg, percentile_approx
from pyspark.sql.window import Window
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("ReadParquetFile").getOrCreate()

df = spark.read.parquet("./output.parquet")

window_spec = Window.partitionBy('Symbol').orderBy('Date').rowsBetween(-29, 0)
df = df.withColumn('vol_moving_avg', avg(col('Volume')).over(window_spec))
df = df.withColumn('adj_close_rolling_med', percentile_approx(col('Adj_Close'), 0.5).over(window_spec))

# write the DataFrame to a new location
df.write.mode("overwrite").parquet("./updated_output.parquet")

# read the updated DataFrame from the new location
updated_df = spark.read.parquet("./updated_output.parquet")

# show the first 5 rows of the updated DataFrame
updated_df.show(5)

[Stage 4:>                                                          (0 + 1) / 1]

+----------+--------+--------+--------+--------+---------+------+------+--------------------+--------------+---------------------+
|      Date|    Open|    High|     Low|   Close|Adj_Close|Volume|Symbol|       Security_Name|vol_moving_avg|adj_close_rolling_med|
+----------+--------+--------+--------+--------+---------+------+------+--------------------+--------------+---------------------+
|1962-01-02|6.532155|6.556185|6.532155|6.532155|1.5366576| 55900|    AA|Alcoa Corporation...|       55900.0|            1.5366576|
|1962-01-03|6.532155| 6.63228|6.524145| 6.63228|1.5602118| 74500|    AA|Alcoa Corporation...|       65200.0|            1.5366576|
|1962-01-04| 6.63228| 6.66432| 6.63228| 6.63228|1.5602118| 80500|    AA|Alcoa Corporation...|       70300.0|            1.5602118|
|1962-01-05| 6.63228| 6.65631| 6.61626| 6.62427|1.5583265| 70500|    AA|Alcoa Corporation...|       70350.0|            1.5583265|
|1962-01-08| 6.60825| 6.60825|6.339915|   6.408|1.5074503| 93800|    AA|Alcoa Corpo


                                                                                

In [6]:
# save the output to a temporary file location
df.write.mode("overwrite").parquet("./temp_output.parquet")

# delete the original file
shutil.rmtree("./output.parquet") 

# move the temporary file to the original file location
os.rename("./temp_output.parquet", "./output.parquet")

# read the updated DataFrame from the original location
updated_df = spark.read.parquet("./output.parquet")

# show the first 5 rows of the updated DataFrame
updated_df.show(5)
spark.stop()

                                                                                

In [7]:
spark = SparkSession.builder.appName("ReadParquetFile").getOrCreate()
df = spark.read.parquet("./output.parquet")
df.show(5)
spark.stop()


[Stage 9:>                                                          (0 + 1) / 1]

+----------+----+----+----+-----+---------+------+------+--------------------+--------------+---------------------+
|      Date|Open|High| Low|Close|Adj_Close|Volume|Symbol|       Security_Name|vol_moving_avg|adj_close_rolling_med|
+----------+----+----+----+-----+---------+------+------+--------------------+--------------+---------------------+
|2012-12-13|15.0|15.0|15.0| 15.0|     15.0|   100|  AAMC|Altisource Asset ...|         100.0|                 15.0|
|2012-12-14|19.0|30.0|19.0| 30.0|     30.0|144600|  AAMC|Altisource Asset ...|       72350.0|                 15.0|
|2012-12-17|31.5|65.0|31.5| 65.0|     65.0| 68600|  AAMC|Altisource Asset ...|       71100.0|                 30.0|
|2012-12-18|65.0|89.0|65.0| 80.0|     80.0| 43600|  AAMC|Altisource Asset ...|       64225.0|                 30.0|
|2012-12-19|80.0|84.0|78.0| 84.0|     84.0| 24000|  AAMC|Altisource Asset ...|       56180.0|                 65.0|
+----------+----+----+----+-----+---------+------+------+---------------


                                                                                

## Problem 3: Integrate ML Training

In [26]:
#!/usr/bin/env python
# coding: utf-8

# In[1]:


from pyspark.sql import SparkSession
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.regression import RandomForestRegressionModel

spark = SparkSession.builder.appName("RandomForestRegressor").config("spark.driver.memory", "6g").getOrCreate()

df = spark.read.parquet("output.parquet")
df = df.na.drop()

# Define the input and output columns
features = ['vol_moving_avg', 'adj_close_rolling_med']
label = 'Volume'

# Convert the label column to numerical values
labelIndexer = StringIndexer(inputCol=label, outputCol="label").fit(df)
df = labelIndexer.transform(df)

# Create a vector assembler to combine the input features into a single vector column
assembler = VectorAssembler(inputCols=features, outputCol="features")
df = assembler.transform(df)

# Split the data into training and test sets
(trainingData, testData) = df.randomSplit([0.8, 0.2])
rf = RandomForestRegressor(numTrees=100)

model = rf.fit(trainingData)
model.write().overwrite().save("trained_model")
# model.save("trained_model")
model = RandomForestRegressionModel.load("trained_model")

# Test data prediction
predictions = model.transform(testData)

# Calculating MAE and MSE
evaluator = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="mae")
mae = evaluator.evaluate(predictions)

evaluator = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="mse")
mse = evaluator.evaluate(predictions)

print("Model Trained Successfully !!!")
print(f"MAE: {mae}")
print(f"MSE: {mse}")

spark.stop()

print("Script 2 is completed - Machine Learning Model has been made.")

23/05/02 16:25:03 WARN DAGScheduler: Broadcasting large task binary with size 15.1 MiB
23/05/02 16:25:24 WARN DAGScheduler: Broadcasting large task binary with size 15.1 MiB
23/05/02 16:29:16 WARN DAGScheduler: Broadcasting large task binary with size 15.1 MiB
23/05/02 16:33:10 WARN DAGScheduler: Broadcasting large task binary with size 15.1 MiB
23/05/02 16:38:25 WARN MemoryStore: Not enough space to cache rdd_25_0 in memory! (computed 562.4 MiB so far)
23/05/02 16:38:26 WARN BlockManager: Persisting block rdd_25_0 to disk instead.
23/05/02 16:38:26 WARN MemoryStore: Not enough space to cache rdd_25_1 in memory! (computed 562.4 MiB so far)
23/05/02 16:38:26 WARN BlockManager: Persisting block rdd_25_1 to disk instead.
23/05/02 16:38:26 WARN MemoryStore: Not enough space to cache rdd_25_2 in memory! (computed 562.4 MiB so far)
23/05/02 16:38:26 WARN BlockManager: Persisting block rdd_25_2 to disk instead.
23/05/02 16:38:33 WARN MemoryStore: Not enough space to cache rdd_25_3 in memory! 

Model Trained Successfully !!!
MAE: 7296.475693443471
MSE: 708927415.9507552


## Problem 4: Model Serving

In [None]:
from pyspark.sql import SparkSession
from fastapi import FastAPI, status
from pydantic import BaseModel, root_validator
from pyspark.ml.regression import RandomForestRegressionModel
from pyspark.ml.linalg import Vectors

app = FastAPI()

spark = SparkSession.builder.appName("myApp").getOrCreate()
model = RandomForestRegressionModel.load("trained_model")

class PredictionRequest(BaseModel):
    vol_moving_avg: float
    adj_close_rolling_med: float

class PredictionResponse(BaseModel):
    prediction: float
    @root_validator
    def validate_prediction(cls, values):
        values['prediction'] = round(values['prediction'], 2)
        return values


@app.post("/", response_model=PredictionResponse, status_code=status.HTTP_200_OK)
def predict(request: PredictionRequest):
    
    # Converting request payload to a PySpark vector
    features = Vectors.dense([
        request.vol_moving_avg,
        request.adj_close_rolling_med
    ])
    
    prediction = model.predict(features)
    return PredictionResponse(prediction = prediction)

print("Successful !!!")

In [2]:
import subprocess
import time

# Start the server in a new process
server_process = subprocess.Popen(['python', 'FastAPI_Server.py'])

# Wait for the server to start up
time.sleep(30)

# Send a sample request to the server
client_process = subprocess.Popen(['python', 'Sample_Input.py'])

# Wait for the client to finish
client_process.wait()

# Stop the server
server_process.terminate()

23/05/02 19:57:30 WARN Utils: Your hostname, Mac.local resolves to a loopback address: 127.0.0.1; using 192.168.0.27 instead (on interface en0)
23/05/02 19:57:30 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/05/02 19:57:31 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
INFO:     Started server process [32526]                                        
INFO:     Waiting for application startup.
INFO:     Application startup complete.
INFO:     Uvicorn running on http://localhost:8000 (Press CTRL+C to quit)


Successful !!!
Starting the Server !!!
INFO:     ::1:49632 - "POST / HTTP/1.1" 200 OK
200
{'prediction': 3446.69}


INFO:     Shutting down
INFO:     Waiting for application shutdown.
INFO:     Application shutdown complete.
INFO:     Finished server process [32526]


## Other Files

#### API Server

In [None]:
import uvicorn
from script_3 import app

if __name__ == "__main__":
    print("Starting the Server !!!")
    uvicorn.run(app, host="localhost", port=8000)

### Sample Input

In [None]:
import requests

url = "http://localhost:8000/"

payload = {
    "vol_moving_avg": 123.45,
    "adj_close_rolling_med": 678.90
}

response = requests.post(url, json=payload)

print(response.status_code)
print(response.json())

### API Unit Testing

In [None]:
from fastapi.testclient import TestClient
import pyspark
from pyspark.ml.linalg import Vectors
from pyspark.ml.regression import RandomForestRegressionModel
from pyspark.sql import SparkSession
from script_3 import app


spark = SparkSession.builder.appName("myApp").getOrCreate()
model = RandomForestRegressionModel.load("trained_model")

client = TestClient(app)

def test_predict():
    payload = {"vol_moving_avg": 100.0, "adj_close_rolling_med": 50.0}
    features = Vectors.dense([payload["vol_moving_avg"], payload["adj_close_rolling_med"]])
    prediction = model.predict(features)
    expected_response = {"prediction": round(prediction, 2)}
    
    response = client.post("/", json=payload)
    
    assert response.status_code == 200
    assert response.json() == expected_response
