In [5]:
!pip install flask

Collecting flask
  Downloading Flask-2.3.2-py3-none-any.whl (96 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m96.9/96.9 kB[0m [31m2.8 MB/s[0m eta [36m0:00:00[0ma [36m0:00:01[0m
[?25hCollecting Werkzeug>=2.3.3 (from flask)
  Downloading Werkzeug-2.3.3-py3-none-any.whl (242 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m242.3/242.3 kB[0m [31m3.0 MB/s[0m eta [36m0:00:00[0ma [36m0:00:01[0m
Collecting itsdangerous>=2.1.2 (from flask)
  Downloading itsdangerous-2.1.2-py3-none-any.whl (15 kB)
Installing collected packages: Werkzeug, itsdangerous, flask
Successfully installed Werkzeug-2.3.3 flask-2.3.2 itsdangerous-2.1.2


In [12]:
from flask import Flask, request, jsonify
from pyspark.sql import SparkSession
import os

from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import RandomForestRegressionModel

# Create (or reuse, via getOrCreate) the SparkSession used for inference.
spark = SparkSession.builder.appName("FlaskApp").getOrCreate()

# Load the persisted random-forest regressor once, at startup, so every request
# reuses the same model object. The location can be overridden with the
# MODEL_PATH environment variable; it defaults to the original relative path.
MODEL_PATH = os.environ.get("MODEL_PATH", "randomForestpredictionmodel")
model = RandomForestRegressionModel.load(MODEL_PATH)
print("Loaded Model!")

# Prediction Function
def predict_volume(model, input_data):
    """Score a Spark DataFrame of raw feature columns with ``model``.

    Args:
        model: fitted Spark ML model exposing ``transform`` (in this notebook,
            the loaded RandomForestRegressionModel).
        input_data: Spark DataFrame that must contain the ``vol_moving_avg``
            and ``adj_close_rolling_med`` columns.

    Returns:
        Spark DataFrame containing only the ``prediction`` column.
    """
    # Pack the two raw columns into one vector column named "features"
    # (assumed to match the featuresCol the model was trained with — confirm).
    feature_assembler = VectorAssembler(
        inputCols=['vol_moving_avg', 'adj_close_rolling_med'],
        outputCol="features",
    )
    featurized = feature_assembler.transform(input_data)
    return model.transform(featurized).select('prediction')

# Flask application object; the route decorators below register their handlers on it.
app = Flask(__name__)

@app.route('/')
@app.route('/health')
def health():
    """Liveness check served at both ``/`` and ``/health``.

    Returns:
        JSON body ``{"message": "App is healthy!"}`` with the default 200 status.
    """
    status_payload = {"message": 'App is healthy!'}
    return jsonify(status_payload)

@app.route('/predict', methods=['GET'])
def predict():
    """Predict from one pair of feature values passed as query parameters.

    Query parameters (both required, must parse as finite floats):
        vol_moving_avg: value for the ``vol_moving_avg`` feature.
        adj_close_rolling_med: value for the ``adj_close_rolling_med`` feature.

    Returns:
        On success, a JSON body echoing the parsed inputs under ``input`` and a
        ``predictions`` list holding one ``[prediction]`` entry per scored row
        (a single row here).
        On a missing, non-numeric, or non-finite parameter, a JSON
        ``{"error": ...}`` body with HTTP status 400 (previously the
        ``float()`` call raised and Flask answered with a 500).
    """
    import math  # local import keeps this cell self-contained

    parsed = {}
    for name in ('vol_moving_avg', 'adj_close_rolling_med'):
        raw = request.args.get(name)
        if raw is None:
            return jsonify({"error": f"Missing required query parameter '{name}'"}), 400
        try:
            value = float(raw)
        except ValueError:
            return jsonify({"error": f"Query parameter '{name}' must be a number, got {raw!r}"}), 400
        # float() also accepts 'nan'/'inf'; reject them so the model never sees
        # non-finite features and the echoed input stays standard JSON.
        if not math.isfinite(value):
            return jsonify({"error": f"Query parameter '{name}' must be a finite number, got {raw!r}"}), 400
        parsed[name] = value

    vol_moving_avg = parsed['vol_moving_avg']
    adj_close_rolling_med = parsed['adj_close_rolling_med']

    # `model` and `spark` are only read here, so no `global` declaration is needed.
    input_data = spark.createDataFrame(
        [(vol_moving_avg, adj_close_rolling_med)],
        ['vol_moving_avg', 'adj_close_rolling_med'],
    )

    predictions = predict_volume(model, input_data)
    rows = predictions.select('prediction').collect()
    # Convert each Row to a plain list explicitly; this yields the same
    # [[value], ...] JSON shape as serializing the Row objects directly.
    output = {
        "input": {
            "vol_moving_avg": vol_moving_avg,
            "adj_close_rolling_med": adj_close_rolling_med
        },
        'predictions': [list(row) for row in rows]
    }
    return jsonify(output)


Loaded Model!


In [None]:
# Serve with Flask's built-in development server; this call blocks the cell
# until interrupted. Host and port default to the original values (all
# interfaces, port 8000) and can be overridden with the FLASK_RUN_HOST /
# FLASK_RUN_PORT environment variables.
app.run(
    host=os.environ.get("FLASK_RUN_HOST", "0.0.0.0"),
    port=int(os.environ.get("FLASK_RUN_PORT", "8000")),
)

 * Serving Flask app '__main__'
 * Debug mode: off


 * Running on all addresses (0.0.0.0)
 * Running on http://127.0.0.1:8000
 * Running on http://172.17.0.2:8000
Press CTRL+C to quit
