In [None]:
!unzip /content/drive/MyDrive/StockMarket/archive.zip

In [1]:
# Logging
import logging
from datetime import date
import os
#print(os.getcwd())
today = date.today()
filename = str(today)+"-pipeline.log"
#filename = os.path.join(os.getcwd(), str(today)+"-pipeline.log")
logging.basicConfig(filename=filename, 
                    filemode='w', 
                    level=logging.DEBUG, 
                    format='%(asctime)s, %(name)s %(levelname)s: %(message)s')

### Data Ingestion

In [2]:
import glob
# Get a list of all CSV files in the directory
etf_list = glob.glob("etfs/*.csv")
#print(etf_list)
logging.info(f'A total of {len(etf_list)} ETFs found.')

stock_list = glob.glob("stocks/*.csv")
#print(stock_list)
logging.info(f'A total of {len(stock_list)} stocks found.')


In [2]:
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.4.0.tar.gz (310.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m4.5 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.4.0-py2.py3-none-any.whl size=311317145 sha256=7ea961c5bbdc9dd222ada3c77dac075d1842ba44e0b8bffe5b7ebc1faf77e34a
  Stored in directory: /root/.cache/pip/wheels/7b/1b/4b/3363a1d04368e7ff0d408e57ff57966fcdf00583774e761327
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.4.0


In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import input_file_name

# Create a SparkSession
spark = SparkSession.builder.appName("pipeline").getOrCreate()

# Read all CSV files as a Spark DataFrame and add a new column with the file name
df = spark.read.csv(etf_list, header=True).withColumn("Symbol", input_file_name())

In [7]:
df = df.union(spark.read.csv(stock_list, header=True).withColumn("Symbol", input_file_name()))

print(f'Rows: {df.count()}, Columns: {len(df.columns)} combined.')
logging.info('Combined all CSV files into a Spark DataFrame')
logging.info(f'With Rows: {df.count()}, Columns: {len(df.columns)}')

Rows: 28151758, Columns: 8 combined.


In [None]:
df.show()

+----------+----+------+------+------+-----------------+------+--------------------+
|      Date|Open|  High|   Low| Close|        Adj Close|Volume|              Symbol|
+----------+----+------+------+------+-----------------+------+--------------------+
|1986-04-03| 0.0|  4.75| 4.625| 4.625|4.449552059173584| 15300|file:/content/etf...|
|1986-04-04| 0.0|  4.75|4.6875|  4.75| 4.56981086730957| 12000|file:/content/etf...|
|1986-04-07| 0.0| 4.875|  4.75|  4.75| 4.56981086730957| 11500|file:/content/etf...|
|1986-04-08| 0.0|4.8125|4.6875|  4.75| 4.56981086730957| 21000|file:/content/etf...|
|1986-04-09| 0.0|4.8125| 4.625|4.6875| 4.50968074798584| 22800|file:/content/etf...|
|1986-04-10| 0.0|4.6875| 4.625| 4.625|4.449552059173584|  6200|file:/content/etf...|
|1986-04-11| 0.0|4.6875|4.5625| 4.625|4.449552059173584| 37100|file:/content/etf...|
|1986-04-14| 0.0| 4.625|   4.5|4.5625|4.389422416687012| 28200|file:/content/etf...|
|1986-04-15| 0.0|4.6875|4.5625| 4.625|4.449552059173584| 14200|fi

In [8]:
from pyspark.sql.functions import regexp_extract

# Extract the filename from the "Symbol" column in the original DataFrame (df)
df = df.withColumn("Symbol", regexp_extract(df["Symbol"], r"([^/]+)\.csv$", 1))
#df.show()
logging.info('Extracted Symbol from path')

In [None]:
df.printSchema()

root
 |-- Date: string (nullable = true)
 |-- Open: string (nullable = true)
 |-- High: string (nullable = true)
 |-- Low: string (nullable = true)
 |-- Close: string (nullable = true)
 |-- Adj Close: string (nullable = true)
 |-- Volume: string (nullable = true)
 |-- Symbol: string (nullable = false)



In [9]:
from pyspark.sql.functions import col
df = df.withColumn("Open", col('Open').cast('float')) \
    .withColumn("High", col('High').cast('float')) \
    .withColumn("Low", col('Low').cast('float')) \
    .withColumn("Close", col('Close').cast('float')) \
    .withColumn("Adj Close", col('Adj Close').cast('float')) \
    .withColumn("Volume", col('Volume').cast('int'))
df.printSchema()
logging.info('Changed the data type of specific columns')

root
 |-- Date: string (nullable = true)
 |-- Open: float (nullable = true)
 |-- High: float (nullable = true)
 |-- Low: float (nullable = true)
 |-- Close: float (nullable = true)
 |-- Adj Close: float (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Symbol: string (nullable = false)



In [10]:
# Read the metadata CSV file into a Spark DataFrame and select only the relevant columns
metadata_df = spark.read.csv("symbols_valid_meta.csv", header=True)

metadata_df = metadata_df.select("Symbol", "Security Name")
metadata_df.show()
logging.info('Read metadata CSV file')

+------+--------------------+
|Symbol|       Security Name|
+------+--------------------+
|     A|Agilent Technolog...|
|    AA|Alcoa Corporation...|
|  AAAU|Perth Mint Physic...|
|  AACG|ATA Creativity Gl...|
|  AADR|AdvisorShares Dor...|
|   AAL|American Airlines...|
|  AAMC|Altisource Asset ...|
|  AAME|Atlantic American...|
|   AAN|Aaron's, Inc. Com...|
|  AAOI|Applied Optoelect...|
|  AAON|AAON, Inc. - Comm...|
|   AAP|Advance Auto Part...|
|  AAPL|Apple Inc. - Comm...|
|   AAT|American Assets T...|
|   AAU|Almaden Minerals,...|
|  AAWW|Atlas Air Worldwi...|
|  AAXJ|iShares MSCI All ...|
|  AAXN|Axon Enterprise, ...|
|    AB|AllianceBernstein...|
|   ABB|ABB Ltd Common Stock|
+------+--------------------+
only showing top 20 rows



In [11]:
# Join the original DataFrame (df) with the metadata DataFrame on the "Symbol" column
df = df.join(metadata_df, on=["Symbol"], how="left")
df.show(10)
df.printSchema()
logging.info('Joined Security Name from metadata to combined DataFrame')

+------+----------+----+------+------+------+---------+------+--------------------+
|Symbol|      Date|Open|  High|   Low| Close|Adj Close|Volume|       Security Name|
+------+----------+----+------+------+------+---------+------+--------------------+
|   CEF|1986-04-03| 0.0|  4.75| 4.625| 4.625| 4.449552| 15300|Sprott Physical G...|
|   CEF|1986-04-04| 0.0|  4.75|4.6875|  4.75| 4.569811| 12000|Sprott Physical G...|
|   CEF|1986-04-07| 0.0| 4.875|  4.75|  4.75| 4.569811| 11500|Sprott Physical G...|
|   CEF|1986-04-08| 0.0|4.8125|4.6875|  4.75| 4.569811| 21000|Sprott Physical G...|
|   CEF|1986-04-09| 0.0|4.8125| 4.625|4.6875|4.5096807| 22800|Sprott Physical G...|
|   CEF|1986-04-10| 0.0|4.6875| 4.625| 4.625| 4.449552|  6200|Sprott Physical G...|
|   CEF|1986-04-11| 0.0|4.6875|4.5625| 4.625| 4.449552| 37100|Sprott Physical G...|
|   CEF|1986-04-14| 0.0| 4.625|   4.5|4.5625|4.3894224| 28200|Sprott Physical G...|
|   CEF|1986-04-15| 0.0|4.6875|4.5625| 4.625| 4.449552| 14200|Sprott Physica

In [None]:
df.columns

['Symbol',
 'Date',
 'Open',
 'High',
 'Low',
 'Close',
 'Adj Close',
 'Volume',
 'Security Name']

#### Rearrange the dataframe

In [12]:
df = df.select('Symbol','Security Name',
 'Date',
 'Open',
 'High',
 'Low',
 'Close',
 'Adj Close',
 'Volume')
df.show()
logging.info('Rearranged columns')

+------+--------------------+----------+----+------+------+------+---------+------+
|Symbol|       Security Name|      Date|Open|  High|   Low| Close|Adj Close|Volume|
+------+--------------------+----------+----+------+------+------+---------+------+
|   CEF|Sprott Physical G...|1986-04-03| 0.0|  4.75| 4.625| 4.625| 4.449552| 15300|
|   CEF|Sprott Physical G...|1986-04-04| 0.0|  4.75|4.6875|  4.75| 4.569811| 12000|
|   CEF|Sprott Physical G...|1986-04-07| 0.0| 4.875|  4.75|  4.75| 4.569811| 11500|
|   CEF|Sprott Physical G...|1986-04-08| 0.0|4.8125|4.6875|  4.75| 4.569811| 21000|
|   CEF|Sprott Physical G...|1986-04-09| 0.0|4.8125| 4.625|4.6875|4.5096807| 22800|
|   CEF|Sprott Physical G...|1986-04-10| 0.0|4.6875| 4.625| 4.625| 4.449552|  6200|
|   CEF|Sprott Physical G...|1986-04-11| 0.0|4.6875|4.5625| 4.625| 4.449552| 37100|
|   CEF|Sprott Physical G...|1986-04-14| 0.0| 4.625|   4.5|4.5625|4.3894224| 28200|
|   CEF|Sprott Physical G...|1986-04-15| 0.0|4.6875|4.5625| 4.625| 4.449552|

In [13]:

df = df.withColumnRenamed("Security Name", "Security_Name")
df = df.withColumnRenamed("Adj Close", "Adj_Close")
#df.show()
logging.info('Renamed columns to remove spaces')

#### Formatting

In [None]:
# Save as CSV
# Try to use coalesce() instead of repartition() to reduce the number of partitions
df.repartition(1).write.csv("etfs.csv", header=True, mode="overwrite")
logging.info('Saved current DataFrame as a CSV file')

In [None]:
# Save as Parquet
# Try to use coalesce() instead of repartition() to reduce the number of partitions
df.write.parquet("etfs.parquet", mode="overwrite")
logging.info('Saved current DataFrame as a Parquet file')

In [None]:
#print((df.count(), len(df.columns)))

### Feature Engineering

In [14]:
import pyspark.sql
from pyspark.sql.functions import percentile_approx, mean
from pyspark.sql.window import Window

In [15]:

'''
# Method 1
df.createOrReplaceTempView("df_view")
df2 = spark.sql(
    """SELECT *, mean(Volume) OVER (
        PARTITION BY Symbol 
        ORDER BY CAST(Date AS timestamp) 
        RANGE BETWEEN INTERVAL 29 DAYS PRECEDING AND CURRENT ROW
     ) AS vol_moving_avg FROM df_view""")
df2.show(25)
df2.printSchema()
'''
# Method 2
# Define the window specification
windowSpec = (
    Window()
    .partitionBy("Symbol")
    .orderBy(col("Date").cast("timestamp").cast("long"))
    .rangeBetween(-29*86400, 0)
)

# Calculate the rolling 30-day median of the Adj_Close column
df2 = df.withColumn("vol_moving_avg", mean("Volume").over(windowSpec))

# Show the resulting DataFrame
df2.show()
df2.printSchema()
logging.info('Calculated and added moving average of volume')

+------+--------------------+----------+-----+-----+-----+-----+---------+--------+------------------+
|Symbol|       Security_Name|      Date| Open| High|  Low|Close|Adj_Close|  Volume|    vol_moving_avg|
+------+--------------------+----------+-----+-----+-----+-----+---------+--------+------------------+
|   AAT|American Assets T...|2011-01-13|21.53| 22.0|21.18|21.25|16.332218|15536900|         1.55369E7|
|   AAT|American Assets T...|2011-01-14|21.16|21.45|21.16|21.31|16.378332| 1304800|         8420850.0|
|   AAT|American Assets T...|2011-01-18| 21.3|21.45| 21.2|21.37| 16.42445|  124800|         5655500.0|
|   AAT|American Assets T...|2011-01-19|21.42|21.42|20.88|21.25|16.332218| 1010200|         4494175.0|
|   AAT|American Assets T...|2011-01-20|21.05| 21.4|21.03|21.21|16.301474|  736600|         3742660.0|
|   AAT|American Assets T...|2011-01-21| 21.3| 21.3|21.03|21.25|16.332218|  636800|3225016.6666666665|
|   AAT|American Assets T...|2011-01-24| 21.2|21.39| 21.1| 21.3|16.370651

In [16]:
from pyspark.sql.functions import udf, collect_list
import numpy as np
from pyspark.sql.types import FloatType

median_udf = udf(lambda x: float(np.median(x)), FloatType())

df2 = df2.withColumn("list", collect_list("Adj_Close").over(windowSpec)) \
  .withColumn("adj_close_rolling_med", median_udf("list"))

df2 = df2.drop("list")
# Show the resulting DataFrame
df2.show()
df2.printSchema()
logging.info('Calculated and added rolling median of Adj Close')

+------+--------------------+----------+-----+-----+-----+-----+---------+--------+------------------+---------------------+
|Symbol|       Security_Name|      Date| Open| High|  Low|Close|Adj_Close|  Volume|    vol_moving_avg|adj_close_rolling_med|
+------+--------------------+----------+-----+-----+-----+-----+---------+--------+------------------+---------------------+
|   AAT|American Assets T...|2011-01-13|21.53| 22.0|21.18|21.25|16.332218|15536900|         1.55369E7|            16.332218|
|   AAT|American Assets T...|2011-01-14|21.16|21.45|21.16|21.31|16.378332| 1304800|         8420850.0|            16.355274|
|   AAT|American Assets T...|2011-01-18| 21.3|21.45| 21.2|21.37| 16.42445|  124800|         5655500.0|            16.378332|
|   AAT|American Assets T...|2011-01-19|21.42|21.42|20.88|21.25|16.332218| 1010200|         4494175.0|            16.355274|
|   AAT|American Assets T...|2011-01-20|21.05| 21.4|21.03|21.21|16.301474|  736600|         3742660.0|            16.332218|


#### df2 now contains the resulting dataset with new features

In [None]:
# Save as CSV
# Try to use coalesce() instead of repartition() to reduce the number of partitions
df2.repartition(1).write.csv("etfs2.csv", header=True, mode="overwrite")
logging.info('Saved the new DataFrame as a CSV file')

In [17]:
# Save as Parquet
# Try to use coalesce() instead of repartition() to reduce the number of partitions
df2.repartition(1).write.parquet("etfs2.parquet", mode="overwrite")
logging.info('Saved the new DataFrame as a Parquet file')

### Integrate ML Training

#### Method 1: Use Scikit-learn ML model
(Out of Memory Error)

In [None]:
import pandas as pd
data_from_parquet = pd.read_parquet('/content/etfs2.parquet/part-00000-7836736d-c28e-4cf6-94c3-27cf3c804dd3-c000.snappy.parquet', columns=['vol_moving_avg', 'adj_close_rolling_med', 'Volume'])
#data_from_csv = pd.read_csv('/content/etfs2.csv/etfs2.csv')
logging.info('Read data from Parquet')
data = data_from_parquet

In [None]:
data_from_parquet.head()

Unnamed: 0,vol_moving_avg,adj_close_rolling_med,Volume
0,0.0,0.044911,0.0
1,298200.0,24.6131,298200.0
2,150700.0,24.6131,3200.0
3,102733.333333,24.6131,6800.0
4,77625.0,24.658989,2300.0


In [None]:
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_absolute_error, mean_squared_error

data = data_from_parquet
#data['Date'] = pd.to_datetime(data['Date'])
#data.set_index('Date', inplace=True)

# Remove rows with NaN values
data.dropna(inplace=True)

# Select features and target
features = ['vol_moving_avg', 'adj_close_rolling_med']
target = 'Volume'

X = data[features]
y = data[target]

# Split data into train and test sets
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Create a RandomForestRegressor model
model = RandomForestRegressor(n_estimators=100, random_state=42)

# Train the model
model.fit(X_train, y_train)
"""
Out of Memory
"""

# Make predictions on test data
y_pred = model.predict(X_test)

# Calculate the Mean Absolute Error and Mean Squared Error
mae = mean_absolute_error(y_test, y_pred)
mse = mean_squared_error(y_test, y_pred)

logging.info('Trained and saved....')

In [None]:
print(mae)
print(mse)

#### Method 2: Use Spark ML models

In [28]:
# Create a SparkSession
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("pipeline").getOrCreate()

# Read from Parquet
#parDF=spark.read.parquet('/content/drive/MyDrive/RiskThinkingAI/etfs2.parquet')
#parDF=spark.read.parquet('/content/etfs2.parquet/etfs_stocks_2.parquet')
parDF = df2

parDF = parDF.na.drop()
# Reference: https://hackernoon.com/building-a-machine-learning-model-with-pyspark-a-step-by-step-guide-1z2d3ycd

required_features = ['vol_moving_avg', 'adj_close_rolling_med']

from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(inputCols=required_features, outputCol='features')

transformed_data = assembler.transform(parDF)
transformed_data.show(10)
transformed_data = transformed_data.select(['features', 'Volume'])


+------+--------------------+----------+-----+-----+-----+-----+---------+--------+------------------+---------------------+--------------------+
|Symbol|       Security_Name|      Date| Open| High|  Low|Close|Adj_Close|  Volume|    vol_moving_avg|adj_close_rolling_med|            features|
+------+--------------------+----------+-----+-----+-----+-----+---------+--------+------------------+---------------------+--------------------+
|   AAT|American Assets T...|2011-01-13|21.53| 22.0|21.18|21.25|16.332218|15536900|         1.55369E7|            16.332218|[1.55369E7,16.332...|
|   AAT|American Assets T...|2011-01-14|21.16|21.45|21.16|21.31|16.378332| 1304800|         8420850.0|            16.355274|[8420850.0,16.355...|
|   AAT|American Assets T...|2011-01-18| 21.3|21.45| 21.2|21.37| 16.42445|  124800|         5655500.0|            16.378332|[5655500.0,16.378...|
|   AAT|American Assets T...|2011-01-19|21.42|21.42|20.88|21.25|16.332218| 1010200|         4494175.0|            16.355274|

In [29]:
# Split the data

#(training_data, test_data) = transformed_data.randomSplit([0.8,0.2], seed =2020)
#print(f"Training Dataset Count: {training_data.count()}")
#print(f"Test Dataset Count: {test_data.count()}")

splits = transformed_data.randomSplit([0.8, 0.2])
training_data = splits[0]
test_data = splits[1]

In [30]:
from pyspark.ml.regression import LinearRegression

lr = LinearRegression(labelCol='Volume', 
                      featuresCol='features',
                      maxIter=10, regParam=0.3, elasticNetParam=0.8)
model = lr.fit(training_data)
rf_predictions = model.transform(test_data)

print("Coefficients: " + str(model.coefficients))
print("Intercept: " + str(model.intercept))


ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/dist-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/usr/local/lib/python3.10/dist-packages/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "/usr/lib/python3.10/socket.py", line 705, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt


KeyboardInterrupt: ignored

In [None]:
# Summarize the model over the training set and print out some metrics
trainingSummary = model.summary
print("RMSE: %f" % trainingSummary.rootMeanSquaredError)
print("r2: %f" % trainingSummary.r2)

logging.info('Trained model using PySpark ML')
logging.info(f'With RMSE: {trainingSummary.rootMeanSquaredError}')
logging.info(f'With r2: {trainingSummary.r2}')
logging.info(f'With loss: {model.getLoss()}')

In [22]:
#from pyspark.context import SparkContext
# save the model to disk
filename = str(today) + '-lr-model'
#joblib.dump(model, filename)
model.save(filename)
logging.info(f'Saved models as {filename}')

In [23]:
from pyspark.ml.regression import LinearRegressionModel

sameModel = LinearRegressionModel.load(filename)

In [24]:
rf_predictions = sameModel.transform(test_data)
rf_predictions.show(10)

+--------------------+------+------------------+
|            features|Volume|        prediction|
+--------------------+------+------------------+
|[0.0,-1999.244873...|     0|21559.669654760044|
|[0.0,-1999.244873...|     0|21559.669654760044|
|[0.0,-1293.955566...|     0|21559.669654760044|
|[0.0,-1288.402099...|     0|21559.669654760044|
|[0.0,-1288.402099...|     0|21559.669654760044|
|[0.0,-1282.848999...|     0|21559.669654760044|
|[0.0,-1227.314208...|     0|21559.669654760044|
|[0.0,-1216.207519...|     0|21559.669654760044|
|[0.0,-1216.207519...|     0|21559.669654760044|
|[0.0,-1207.877197...|     0|21559.669654760044|
+--------------------+------+------------------+
only showing top 10 rows



In [None]:
sameModelFromDrive = LinearRegressionModel.load("/content/drive/MyDrive/StockMarket/2023-05-07-lr-model")

rf_predictions = sameModelFromDrive.transform(test_data)
rf_predictions.show(10)

+--------------------+------+------------------+
|            features|Volume|        prediction|
+--------------------+------+------------------+
|[0.0,-1771.553222...|     0| 17621.29583576283|
|[0.0,-1732.678955...|     0| 17621.28854090555|
|[0.0,-1574.405395...|     0| 17621.25884046099|
|[0.0,-1524.424316...|     0|17621.249461381674|
|[0.0,-1293.955566...|     0| 17621.20621332211|
|[0.0,-1288.402099...|     0|17621.205171199643|
|[0.0,-1288.402099...|     0|17621.205171199643|
|[0.0,-1288.402099...|     0|17621.205171199643|
|[0.0,-1282.848999...|     0| 17621.20412914589|
|[0.0,-1238.421142...|     0|17621.195792143233|
+--------------------+------+------------------+
only showing top 10 rows



In [None]:
!pip freeze

### Problem 4. Model Serving

In [26]:
#from pyspark.sql.functions import concat
apiData = spark.createDataFrame([
    (41431, -5124.312)], 
    ["vol_moving_avg", "adj_close_rolling_med"])

apiData.show()

'''
# combine the three columns into a single column named "features"
apiData = apiData.withColumn("features", concat(col("vol_moving_avg"), col("vol_moving_avg")))
apiData = transformed_data.select(['features'])
apiData.show()
'''

apiDataTransformed = assembler.transform(apiData)
apiDataTransformed = apiDataTransformed.select(['features'])
apiDataTransformed.show()

apiDataPredictions = sameModel.transform(apiDataTransformed)
apiDataPredictions.select('prediction').show()

+--------------+---------------------+
|vol_moving_avg|adj_close_rolling_med|
+--------------+---------------------+
|         41431|            -5124.312|
+--------------+---------------------+

+-------------------+
|           features|
+-------------------+
|[41431.0,-5124.312]|
+-------------------+

+------------------+
|        prediction|
+------------------+
|62171.124431635784|
+------------------+



In [None]:
apiDataPredictions.collect()[0][1]

58796.478232862195

In [None]:
#!pip install flask-ngrok # Use only when the notebook is running on Colab
# https://www.geeksforgeeks.org/how-to-run-flask-app-on-google-colab/

In [3]:
from flask import Flask, request, jsonify
from pyspark.ml.regression import LinearRegressionModel
#import pandas as pd

from pyspark.sql import SparkSession
# Create a SparkSession
spark = SparkSession.builder.appName("api").getOrCreate()

from pyspark.ml.feature import VectorAssembler
required_features = ['vol_moving_avg', 'adj_close_rolling_med']
assembler = VectorAssembler(inputCols=required_features, outputCol='features')



from google.colab.output import eval_js
print(eval_js("google.colab.kernel.proxyPort(5000)"))

# https://brightersidetech.com/running-flask-apps-in-google-colab/
    
app = Flask(__name__)
model = LinearRegressionModel.load("/content/drive/MyDrive/StockMarket/2023-05-07-lr-model")

def volume_prediction(model, data):
    #df = pd.DataFrame(data=data)

    apiData = spark.createDataFrame([data], 
    required_features)

    apiData = assembler.transform(apiData)
    apiData = apiData.select(['features'])

    prediction = model.transform(apiData)
    # https://www.geeksforgeeks.org/get-value-of-a-particular-cell-in-pyspark-dataframe/
    #print(f'Prediction: {int(prediction.collect()[0][1])}')
    return {'volume': int(prediction.collect()[0][1])}


@app.route('/')
def home():
    return "Hello World"

"""
@app.route('/test')
def test():
  return jsonify({'test': 'You can access test API'}), 200
"""
@app.route('/predict')
def get_volume():
    vol_moving_avg = request.args.get('vol_moving_avg')
    adj_close_rolling_med = request.args.get('adj_close_rolling_med')
    
    if not vol_moving_avg or not adj_close_rolling_med:
        return jsonify({'error': 'You need to supply both vol_moving_avg and adj_close_rolling_med'}), 400

    data = [float(vol_moving_avg), float(adj_close_rolling_med)]
    return jsonify({
        **volume_prediction(model, data),
    })

if __name__ == '__main__':
    app.run()

https://q5h1pm05px-496ff2e9c6d22116-5000-colab.googleusercontent.com/
 * Serving Flask app '__main__'
 * Debug mode: off


 * Running on http://127.0.0.1:5000
INFO:werkzeug:[33mPress CTRL+C to quit[0m
INFO:werkzeug:127.0.0.1 - - [09/May/2023 05:30:44] "GET / HTTP/1.1" 200 -
INFO:werkzeug:127.0.0.1 - - [09/May/2023 05:30:44] "[33mGET /favicon.ico HTTP/1.1[0m" 404 -
INFO:werkzeug:127.0.0.1 - - [09/May/2023 05:32:00] "GET / HTTP/1.1" 200 -
INFO:werkzeug:127.0.0.1 - - [09/May/2023 05:32:00] "[33mGET /favicon.ico HTTP/1.1[0m" 404 -


In [13]:
import requests
volume = requests.get('https://q5h1pm05px-496ff2e9c6d22116-5000-colab.googleusercontent.com/predict', 
                       params={'vol_moving_avg': 41431, 'adj_close_rolling_med':-5124.312})
volume

<Response [404]>

#### Download a folder by zipping

In [27]:
!zip -r /content/model.zip /content/2023-05-08-lr-model
from google.colab import files
files.download("/content/model.zip")

  adding: content/2023-05-08-lr-model/ (stored 0%)
  adding: content/2023-05-08-lr-model/metadata/ (stored 0%)
  adding: content/2023-05-08-lr-model/metadata/_SUCCESS (stored 0%)
  adding: content/2023-05-08-lr-model/metadata/.part-00000.crc (stored 0%)
  adding: content/2023-05-08-lr-model/metadata/._SUCCESS.crc (stored 0%)
  adding: content/2023-05-08-lr-model/metadata/part-00000 (deflated 44%)
  adding: content/2023-05-08-lr-model/data/ (stored 0%)
  adding: content/2023-05-08-lr-model/data/_SUCCESS (stored 0%)
  adding: content/2023-05-08-lr-model/data/._SUCCESS.crc (stored 0%)
  adding: content/2023-05-08-lr-model/data/part-00000-89105cda-1d70-4650-8fc8-bf91d3e8b8a4-c000.snappy.parquet (deflated 55%)
  adding: content/2023-05-08-lr-model/data/.part-00000-89105cda-1d70-4650-8fc8-bf91d3e8b8a4-c000.snappy.parquet.crc (stored 0%)


<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>