In [1]:
# !pip install pyspark 
from pyspark.sql import SparkSession

In [2]:
from pyspark import SparkConf
# conf = SparkConf().set("spark.speculation","false")
# spark = SparkSession.builder.appName("random_reg").getOrCreate()
spark = SparkSession.builder.appName("random_reg").config("spark.sql.parquet.compression.codec", "lz4").getOrCreate()

In [35]:
today = datetime.today().strftime('%Y%m%d')
feature_engineering_path = "/usr/local/spark/staging/" + today + "/feature_engineering/"

full_path = os.path.join(feature_engineering_path, '*', '*.csv')
stock_df = spark.read.format("csv").load(full_path,header=True)
stock_df.show(1)

+----+----------------------------------+----------+-------------------+------------------+-------------------+------------------+-------------------+-----+-------+--------------------+
|ACSI|American Customer Satisfaction ETF|2016-11-07|24.8899993896484383|25.010000228881836|24.8899993896484385|24.989999771118164|23.8709430694580087|62300|62300.0|23.87094306945800810|
+----+----------------------------------+----------+-------------------+------------------+-------------------+------------------+-------------------+-----+-------+--------------------+
|ACSI|              American Customer...|2016-11-08| 27.010000228881836|27.010000228881836| 25.010000228881836|25.200000762939453|   24.0715389251709|20300|41300.0|  23.870943069458008|
+----+----------------------------------+----------+-------------------+------------------+-------------------+------------------+-------------------+-----+-------+--------------------+
only showing top 1 row



In [59]:
stock_df.printSchema()

root
 |-- Symbol: string (nullable = true)
 |-- Security Name: string (nullable = true)
 |-- Date: string (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Adj Close: double (nullable = true)
 |-- Volume: long (nullable = true)
 |-- vol_moving_avg: double (nullable = true)
 |-- adj_close_rolling_med: double (nullable = true)
 |-- __index_level_0__: long (nullable = true)



In [60]:
stock_df = stock_df.dropna()

In [61]:
from pyspark.ml.feature import VectorAssembler
vectorAssembler = VectorAssembler(inputCols = ['vol_moving_avg', 'adj_close_rolling_med'], outputCol = 'features')
vstock_df = vectorAssembler.transform(stock_df)
vstock_df = vstock_df.select('features', 'Volume')
vstock_df.show(3)

+--------------------+------+
|            features|Volume|
+--------------------+------+
|[206633.333333333...|166500|
|[206570.0,11.9849...| 25400|
|[193106.666666666...| 24500|
+--------------------+------+
only showing top 3 rows



In [62]:
splits = vstock_df.randomSplit([0.7, 0.3])
train_df = splits[0]
test_df = splits[1]

In [63]:
from pyspark.ml.regression import RandomForestRegressor

In [64]:
lr = RandomForestRegressor(featuresCol = 'features', labelCol='Volume')
lr_model = lr.fit(train_df)

In [65]:
lr_predictions = lr_model.transform(test_df)
lr_predictions.select("prediction","Volume","features").show(5)

+------------------+------+--------------------+
|        prediction|Volume|            features|
+------------------+------+--------------------+
|15725.513496688707| 30300|[14480.0,12.90999...|
|15128.879592409294| 10900|[14883.3333333333...|
|15725.513496688707| 13600|[15090.0,12.92999...|
|14795.567904097601| 14300|[15156.6666666666...|
|15725.513496688707| 13100|[15193.3333333333...|
+------------------+------+--------------------+
only showing top 5 rows



In [66]:
from pyspark.ml.evaluation import RegressionEvaluator
lr_evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="Volume")
print("MAE on test data = %g" % lr_evaluator.evaluate(lr_predictions, {lr_evaluator.metricName: "mae"}))

MAE on test data = 46352


In [2]:
fullPath = '/usr/local/spark/staging/20230513/feature_engineering/etfs/*.parquet'
df = spark.read.format("parquet") \
   .load(fullPath)
df.show(1)

+------+--------------------+----------+-------+--------+--------+-------+-----------------+-------+--------------+---------------------+-----------------+
|Symbol|       Security Name|      Date|   Open|    High|     Low|  Close|        Adj Close| Volume|vol_moving_avg|adj_close_rolling_med|__index_level_0__|
+------+--------------------+----------+-------+--------+--------+-------+-----------------+-------+--------------+---------------------+-----------------+
|   DIA|SPDR Dow Jones In...|1998-01-20|77.8125|78.84375|77.40625|78.8125|48.57304763793945|1744600|          null|                 null|                0|
+------+--------------------+----------+-------+--------+--------+-------+-----------------+-------+--------------+---------------------+-----------------+
only showing top 1 row



In [3]:
df.count()

1809784

In [195]:
from datetime import datetime
import os
from pyspark.sql.functions import lit
from pyspark.sql.functions import input_file_name
from pyspark.sql.functions import element_at, split, col

type = "etfs"
input_path = "/usr/local/spark/data"
stage_path = "/usr/local/spark/staging"
today = datetime.today().strftime('%Y%m%d')

# Set the input paths
output_path = stage_path + "/" + today + "/raw_data_processing/" + type

# Read the CSV files into Pandas dataframes
symbols_valid_meta = spark.read.csv(os.path.join(input_path, "symbols_valid_meta.csv"), header='true')
symbols_valid_meta = symbols_valid_meta.select(['Symbol', 'Security Name'])
symbols_valid_meta.show(2)

+------+--------------------+
|Symbol|       Security Name|
+------+--------------------+
|     A|Agilent Technolog...|
|    AA|Alcoa Corporation...|
+------+--------------------+
only showing top 2 rows



In [172]:
file = 'AAAU.csv'
stock_df = spark.read.csv(os.path.join(input_path, type, file), header='true')
symbol = file.replace('.csv', '')
stock_df = stock_df.withColumn("Symbol_", lit(symbol))
stock_df.show(1)

+----------+-----------------+-----------------+------------------+------------------+------------------+------+-------+
|      Date|             Open|             High|               Low|             Close|         Adj Close|Volume|Symbol_|
+----------+-----------------+-----------------+------------------+------------------+------------------+------+-------+
|2018-08-15|11.84000015258789|11.84000015258789|11.739999771118164|11.739999771118164|11.739999771118164| 27300|   AAAU|
+----------+-----------------+-----------------+------------------+------------------+------------------+------+-------+
only showing top 1 row



In [196]:
full_path = os.path.join(input_path, type, '*.csv')
stock_df = spark.read.format("csv").load(full_path, header='true')
# symbol = file.replace('.csv', '')
# stock_df = stock_df.withColumn("Symbol_", lit(symbol))
stock_df.show(1)

+----------+----+----+-----+-----+-----------------+------+
|      Date|Open|High|  Low|Close|        Adj Close|Volume|
+----------+----+----+-----+-----+-----------------+------+
|1986-04-03| 0.0|4.75|4.625|4.625|4.449552059173584| 15300|
+----------+----+----+-----+-----+-----------------+------+
only showing top 1 row



In [197]:
stock_df = stock_df.withColumn("filename", input_file_name())
stock_df.show(2, truncate=False)

+----------+----+----+------+-----+-----------------+------+---------------------------------------+
|Date      |Open|High|Low   |Close|Adj Close        |Volume|filename                               |
+----------+----+----+------+-----+-----------------+------+---------------------------------------+
|1986-04-03|0.0 |4.75|4.625 |4.625|4.449552059173584|15300 |file:/usr/local/spark/data/etfs/CEF.csv|
|1986-04-04|0.0 |4.75|4.6875|4.75 |4.56981086730957 |12000 |file:/usr/local/spark/data/etfs/CEF.csv|
+----------+----+----+------+-----+-----------------+------+---------------------------------------+
only showing top 2 rows



In [198]:
stock_df = stock_df.withColumn('filename', split(stock_df.filename, '/'))
stock_df = stock_df.withColumn('filename', element_at(col('filename'), -1) )
stock_df = stock_df.withColumn('Symbol_', split(stock_df.filename, '\.')[0])
stock_df = stock_df.drop('filename')
stock_df.show(5, truncate=False)

+----------+----+------+------+------+-----------------+------+-------+
|Date      |Open|High  |Low   |Close |Adj Close        |Volume|Symbol_|
+----------+----+------+------+------+-----------------+------+-------+
|1986-04-03|0.0 |4.75  |4.625 |4.625 |4.449552059173584|15300 |CEF    |
|1986-04-04|0.0 |4.75  |4.6875|4.75  |4.56981086730957 |12000 |CEF    |
|1986-04-07|0.0 |4.875 |4.75  |4.75  |4.56981086730957 |11500 |CEF    |
|1986-04-08|0.0 |4.8125|4.6875|4.75  |4.56981086730957 |21000 |CEF    |
|1986-04-09|0.0 |4.8125|4.625 |4.6875|4.50968074798584 |22800 |CEF    |
+----------+----+------+------+------+-----------------+------+-------+
only showing top 5 rows



In [199]:
columns = ['Symbol', 'Security Name', 'Date', 'Open', 'High', 'Low', 'Close', 'Adj Close', 'Volume']
stock_df = stock_df.join(symbols_valid_meta, stock_df.Symbol_ ==  symbols_valid_meta.Symbol,"left").drop('Symbol_')
stock_df = stock_df.select(columns)
stock_df.show(1, truncate=False)

+------+-------------------------------------------+----------+----+----+-----+-----+-----------------+------+
|Symbol|Security Name                              |Date      |Open|High|Low  |Close|Adj Close        |Volume|
+------+-------------------------------------------+----------+----+----+-----+-----+-----------------+------+
|CEF   |Sprott Physical Gold and Silver Trust Units|1986-04-03|0.0 |4.75|4.625|4.625|4.449552059173584|15300 |
+------+-------------------------------------------+----------+----+----+-----+-----+-----------------+------+
only showing top 1 row



In [200]:
stock_df.write.parquet(output_path + ".parquet")

<H1>FEATURE ENGINEERING<H1>

In [3]:
import os
import sys
from datetime import datetime
from pyspark.sql.functions import lit
from pyspark.sql.functions import input_file_name
from pyspark.sql.functions import element_at, split, col
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("random_reg").getOrCreate()

type = "etfs"
today = datetime.today().strftime('%Y%m%d')
raw_data_processing_path = "/usr/local/spark/staging/" + today + "/raw_data_processing/"
output_path = "/usr/local/spark/staging/" + today + "/feature_engineering/"

In [17]:
full_path = os.path.join(raw_data_processing_path, type, '*.csv')
stock_df = spark.read.format("csv").load(full_path,header=True)
stock_df.show(truncate=False)

+------+-------------------------------------------+----------+----+------+------+------+-----------------+------+
|Symbol|Security Name                              |Date      |Open|High  |Low   |Close |Adj Close        |Volume|
+------+-------------------------------------------+----------+----+------+------+------+-----------------+------+
|CEF   |Sprott Physical Gold and Silver Trust Units|1986-04-03|0.0 |4.75  |4.625 |4.625 |4.449552059173584|15300 |
|CEF   |Sprott Physical Gold and Silver Trust Units|1986-04-04|0.0 |4.75  |4.6875|4.75  |4.56981086730957 |12000 |
|CEF   |Sprott Physical Gold and Silver Trust Units|1986-04-07|0.0 |4.875 |4.75  |4.75  |4.56981086730957 |11500 |
|CEF   |Sprott Physical Gold and Silver Trust Units|1986-04-08|0.0 |4.8125|4.6875|4.75  |4.56981086730957 |21000 |
|CEF   |Sprott Physical Gold and Silver Trust Units|1986-04-09|0.0 |4.8125|4.625 |4.6875|4.50968074798584 |22800 |
|CEF   |Sprott Physical Gold and Silver Trust Units|1986-04-10|0.0 |4.6875|4.625

In [26]:
from pyspark.sql.functions import col, avg, expr
from pyspark.sql.window import Window

# Define the window specification partitioned by the stock symbol and ordered by the date
window_spec = Window.partitionBy("Symbol").orderBy("Date")

# Calculate the rolling average of the trading volume (Volume)
# df_with_rolling_avg = stock_df.withColumn("vol_moving_avg", col("Volume").rolling(30).mean().over(window_spec))
df_with_rolling_avg = stock_df.withColumn("vol_moving_avg", avg(col("Volume")).over(window_spec))

# Calculate the rolling median of the Adjusted Close (Adj Close)
# df_with_rolling_avg_and_median = df_with_rolling_avg.withColumn("adj_close_rolling_med", col("Adj Close").rolling(30).median().over(window_spec))
df_with_rolling_avg_and_median = df_with_rolling_avg.withColumn("adj_close_rolling_med", expr("percentile_approx(`Adj Close`, 0.5)").over(window_spec))
df_with_rolling_avg_and_median.show()

+------+--------------------+----------+------------------+------------------+------------------+------------------+------------------+------+------------------+---------------------+
|Symbol|       Security Name|      Date|              Open|              High|               Low|             Close|         Adj Close|Volume|    vol_moving_avg|adj_close_rolling_med|
+------+--------------------+----------+------------------+------------------+------------------+------------------+------------------+------+------------------+---------------------+
|  ACSI|American Customer...|2016-11-07|24.889999389648438|25.010000228881836|24.889999389648438|24.989999771118164|23.870943069458008| 62300|           62300.0|   23.870943069458008|
|  ACSI|American Customer...|2016-11-08|27.010000228881836|27.010000228881836|25.010000228881836|25.200000762939453|  24.0715389251709| 20300|           41300.0|   23.870943069458008|
|  ACSI|American Customer...|2016-11-09|  25.1200008392334| 25.43000030517578|  

In [34]:
print(output_path+type+".parquet")
df_with_rolling_avg_and_median.write.csv(output_path+type+".csv")

/usr/local/spark/staging/20230524/feature_engineering/etfs.parquet


<h1> ML train </h1>

In [5]:
import os
import sys
from datetime import datetime
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import RandomForestRegressor, RandomForestRegressionModel
from pyspark.ml.evaluation import RegressionEvaluator


spark = SparkSession.builder.appName("random_reg").getOrCreate()

In [7]:
# Get the second argument passed to spark-submit (the first is the python app)
# type = "etfs"
#type = sys.argv[1]
today = datetime.today().strftime('%Y%m%d')
feature_engineering_path = "/usr/local/spark/staging/20230527/feature_engineering/"

full_path = os.path.join(feature_engineering_path, '*', '*.csv')
stock_df = spark.read.format("csv").load(full_path,header=True)

stock_df = stock_df.dropna()

In [6]:
stock_df.show(2)

+------+--------------------+----------+------------------+------------------+------------------+------------------+------------------+------+--------------+---------------------+
|Symbol|       Security Name|      Date|              Open|              High|               Low|             Close|         Adj Close|Volume|vol_moving_avg|adj_close_rolling_med|
+------+--------------------+----------+------------------+------------------+------------------+------------------+------------------+------+--------------+---------------------+
|  ACSI|American Customer...|2016-11-07|24.889999389648438|25.010000228881836|24.889999389648438|24.989999771118164|23.870943069458008| 62300|       62300.0|   23.870943069458008|
|  ACSI|American Customer...|2016-11-08|27.010000228881836|27.010000228881836|25.010000228881836|25.200000762939453|  24.0715389251709| 20300|       41300.0|   23.870943069458008|
+------+--------------------+----------+------------------+------------------+------------------+---

In [8]:
feature_df = stock_df.selectExpr("cast(vol_moving_avg as float) vol_moving_avg", 
                    "cast(adj_close_rolling_med as float) adj_close_rolling_med", 
                    "cast(Volume as float) Volume")
vectorAssembler = VectorAssembler(inputCols = ['vol_moving_avg', 'adj_close_rolling_med'], outputCol = 'features')
vstock_df = vectorAssembler.transform(feature_df)
vstock_df = vstock_df.select('features', 'Volume')

In [8]:
# stock_df.withColumn("vol_moving_avg1", stock_df("vol_moving_avg").cast(DoubleType))
vstock_df.show()

+--------------------+-------+
|            features| Volume|
+--------------------+-------+
|[62300.0,23.87094...|62300.0|
|[41300.0,23.87094...|20300.0|
|[33733.33203125,2...|18600.0|
|[30050.0,24.07153...|19000.0|
|[27720.0,24.29123...|18400.0|
|[26450.0,24.29123...|20100.0|
|[25585.71484375,2...|20400.0|
|[24687.5,24.40586...|18400.0|
|[23322.22265625,2...|12400.0|
|[22240.0,24.54915...|12500.0|
|[21527.2734375,24...|14400.0|
|[20825.0,24.77840...|13100.0|
|[20400.0,24.84526...|15300.0|
|[19657.142578125,...|10000.0|
|[19246.666015625,...|13500.0|
|[18843.75,24.9503...|12800.0|
|[18311.765625,25....| 9800.0|
|[17766.666015625,...| 8500.0|
|[17284.2109375,25...| 8600.0|
|[16855.0,25.06496...| 8700.0|
+--------------------+-------+
only showing top 20 rows



In [9]:
splits = vstock_df.randomSplit([0.7, 0.3])
train_df = splits[0]
test_df = splits[1]

In [11]:
lr = RandomForestRegressor(featuresCol = 'features', labelCol='Volume')
lr_model = lr.fit(train_df)

lr_predictions = lr_model.transform(test_df)
#lr_predictions.select("prediction","Volume","features").show(5)

lr_evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="Volume")
print("MAE on test data = %g" % lr_evaluator.evaluate(lr_predictions, {lr_evaluator.metricName: "mae"}))

MAE on test data = 703425


In [12]:
lr_model.write().overwrite().save("/usr/local/spark/staging/20230527/feature_engineering/regressor.model")

In [10]:
model = RandomForestRegressionModel.load("/usr/local/spark/staging/20230527/regressor.model")
lr_predictions = model.transform(test_df)
lr_predictions.select("prediction","Volume","features").show(5)

+------------------+------+--------------------+
|        prediction|Volume|            features|
+------------------+------+--------------------+
| 340561.9005693862|   0.0|[0.0,-2038.119262...|
| 340561.9005693862|   0.0|[0.0,-2038.119262...|
|193986.56398719936|   0.0|[0.0,11.059322357...|
|193986.56398719936|   0.0|[0.0,11.545310974...|
|193986.56398719936|   0.0|[0.0,14.760999679...|
+------------------+------+--------------------+
only showing top 5 rows



In [41]:
vol_moving_avg = 0.0
adj_close_rolling_med = -2038.1192626953125
new_feature = spark.createDataFrame([(vol_moving_avg, adj_close_rolling_med, 0.0)],["vol_moving_avg", "adj_close_rolling_med", "Volume"])
vectorAssembler = VectorAssembler(inputCols = ['vol_moving_avg', 'adj_close_rolling_med'], outputCol = 'features')
vstock_df = vectorAssembler.transform(new_feature)
vstock_df = vstock_df.select('features', 'Volume')
vstock_df.show(truncate=False)

+-------------------------+------+
|features                 |Volume|
+-------------------------+------+
|[0.0,-2038.1192626953125]|0.0   |
+-------------------------+------+



In [36]:
model.transform(vstock_df).show(truncate=False)

+-------------------------+------+-----------------+
|features                 |Volume|prediction       |
+-------------------------+------+-----------------+
|[0.0,-2038.1192626953125]|0.0   |340561.9005693862|
+-------------------------+------+-----------------+



In [40]:
predict = model.transform(vstock_df)
predict.select('prediction').collect()[0]['prediction']

340561.9005693862

In [53]:
from numpy import array
model.predict(array([vol_moving_avg, adj_close_rolling_med]))

Py4JJavaError: An error occurred while calling z:org.apache.spark.ml.python.MLSerDe.loads.
: net.razorvine.pickle.PickleException: expected zero arguments for construction of ClassDict (for numpy.dtype). This happens when an unsupported/unregistered class is being unpickled that requires construction arguments. Fix it by registering a custom IObjectConstructor for this class.
	at net.razorvine.pickle.objects.ClassDictConstructor.construct(ClassDictConstructor.java:23)
	at net.razorvine.pickle.Unpickler.load_reduce(Unpickler.java:759)
	at net.razorvine.pickle.Unpickler.dispatch(Unpickler.java:199)
	at net.razorvine.pickle.Unpickler.load(Unpickler.java:109)
	at net.razorvine.pickle.Unpickler.loads(Unpickler.java:122)
	at org.apache.spark.mllib.api.python.SerDeBase.loads(PythonMLLibAPI.scala:1326)
	at org.apache.spark.ml.python.MLSerDe.loads(MLSerDe.scala)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:833)


In [51]:
array([vol_moving_avg, adj_close_rolling_med])

array([    0.       , -2038.1192627])