20.5 9 Scale Your ML (DL) Prototype

In this step, your goal is to ensure that your ML/DL approach, which you’ve proved to be
viable, can work with large volumes of data. You need to scale your prototype. Please
work with your mentor to determine what that means for your problem.
Using scikit-learn, SparkML, Keras, TensorFlow, PyTorch, or other technologies you have
learned, implement your prototype at scale.
If your earlier prototype worked with a subset of the data, ensure that the scaled-up
prototype can handle your complete dataset.
Think about what your capstone problem would look like in the real world:
● How much data would you need to handle?
● Can you scale your prototype to handle that volume of data using the approach
and tools you have selected?
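A rough way to answer the first question is to multiply the expected number of records by their average size and compare the result with the memory available per batch. The sketch below does only that arithmetic. The 5 million listings, 2 KB per listing, 3× in-memory expansion factor, and 2 GiB batch budget are hypothetical placeholders, not measurements from this project.

```python
import math


def estimate_footprint_gib(n_rows: int, avg_row_bytes: int, expansion: float = 3.0) -> float:
    """Approximate in-memory size in GiB.

    `expansion` is an assumed factor for how much larger the data becomes once it
    is decompressed and loaded as Python/JVM objects; measure it on a real sample.
    """
    return n_rows * avg_row_bytes * expansion / 2**30


def batches_needed(total_gib: float, batch_budget_gib: float) -> int:
    """Smallest number of batches such that each fits within batch_budget_gib."""
    return math.ceil(total_gib / batch_budget_gib)


# Hypothetical real-world volume: 5 million listings of ~2 KB each
total = estimate_footprint_gib(5_000_000, 2_000)
print(f"{total:.1f} GiB -> {batches_needed(total, 2.0)} batches of <= 2 GiB")
# prints: 27.9 GiB -> 14 batches of <= 2 GiB
```

If the estimate fits comfortably on one machine, vertical scaling may be enough; if not, the data has to be split across batches or hosts.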

The submission shows that the student understands how to scale a machine learning or a deep learning model.
The scaled prototype can handle the complete dataset that the student has collected (even if the student has so far used only a sample) and can handle all of the data that a real-world version of the application would need to process.
The submission demonstrates that the student made well-thought-out decisions about scaling their prototype:
● Choice of tools/libraries (e.g., scikit-learn, SparkML, TensorFlow, Keras, or PyTorch)
● Choice of machine learning/deep learning technique


Well-documented GitHub repository and code. The Jupyter notebooks for the code provide step-by-step documentation that’s easy to follow.

In [241]:
import warnings
warnings.filterwarnings('ignore')

## Scaling 
* Vertical (scale up): a bigger CPU, more memory, and more disk on a single host
* Horizontal (scale out): processing is distributed across multiple hosts
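Horizontal scaling works because many processing steps can be split by partition and the partial results merged afterwards, the same map-then-reduce pattern Spark applies across executors. A minimal sketch, assuming word counting as the workload, with threads from `concurrent.futures` standing in for separate hosts:

```python
from collections import Counter
from concurrent.futures import ThreadPoolExecutor


def count_words(partition: list[str]) -> Counter:
    """Map step: a worker counts words in its own partition only."""
    counts = Counter()
    for doc in partition:
        counts.update(doc.lower().split())
    return counts


def horizontal_word_count(docs: list[str], n_workers: int = 4) -> Counter:
    """Split docs into n_workers partitions, count them in parallel, then merge."""
    partitions = [docs[i::n_workers] for i in range(n_workers)]
    total = Counter()
    # Threads stand in for separate hosts; a cluster would ship partitions over the network
    with ThreadPoolExecutor(max_workers=n_workers) as pool:
        for partial in pool.map(count_words, partitions):
            total.update(partial)  # reduce step: merge the partial counts
    return total


docs = ["Pizza shop for sale", "Profitable pizza franchise", "Car wash for sale"]
print(horizontal_word_count(docs)["pizza"])  # prints 2
```

Each worker only ever holds its own partition; only the merge step sees every partial result, which is what lets the approach grow by adding hosts.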

### Initialize Spark

In [242]:
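# Import SparkSession and the schema types used below
from pyspark.sql import SparkSession
from pyspark.sql.types import (ArrayType, DoubleType, LongType, StringType,
                               StructField, StructType)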

In [243]:
from sklearn.feature_extraction.text import CountVectorizer

In [244]:
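# Create SparkSession; local[*] runs one worker thread per logical core
# (local[1] would use a single core). To scale out, point .master() at a cluster URL.
spark = SparkSession.builder \
      .master("local[*]") \
      .appName("bizwiz") \
      .getOrCreate()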

### Open "Big" dataset with Spark

In [245]:
#df_in=spark.read.parquet('build/s_in/bizwiz_value_score_2.parquet')

In [246]:
#df_in.schema

In [247]:
# Explicit schema: Spark's streaming file sources need the schema up front
df_in_schema = StructType([
    StructField('id', DoubleType(), True),
    StructField('pptitle', StringType(), True),
    StructField('ppdesc', StringType(), True),
    StructField('ppdetails', StringType(), True),
    StructField('ppfinancials', StringType(), True),
    StructField('pcategories', ArrayType(StringType(), True), True),
    StructField('COUNTY_NAME', StringType(), True),
    StructField('STATE_NAME', StringType(), True),
    StructField('price', DoubleType(), True),
    StructField('cash_flow', DoubleType(), True),
    StructField('gross_revenue', DoubleType(), True),
    StructField('established', DoubleType(), True),
    StructField('POVERTY_PERCENT', DoubleType(), True),
    StructField('MEDIAN_HOUSEHOLD_INCOME', DoubleType(), True),
    StructField('bizwiz_value_score', DoubleType(), True),
    StructField('bizwiz_class', LongType(), True),
    StructField('bizwiz_label', StringType(), True),
    StructField('label_num', LongType(), True),
    StructField('label', StringType(), True),
    StructField('Abs(established - sqrt(gross_revenue))', DoubleType(), True),
    StructField('Abs(sqrt(MEDIAN_HOUSEHOLD_INCOME) - sqrt(cash_flow))', DoubleType(), True),
    StructField('Abs(sqrt(cash_flow) - established)', DoubleType(), True),
    StructField('log(MEDIAN_HOUSEHOLD_INCOME + cash_flow)', DoubleType(), True),
    StructField('sqrt(established + sqrt(gross_revenue))', DoubleType(), True),
    StructField('Abs(POVERTY_PERCENT**2 - sqrt(cash_flow))', DoubleType(), True),
    StructField('1/(sqrt(MEDIAN_HOUSEHOLD_INCOME) + cash_flow)', DoubleType(), True),
    StructField('POVERTY_PERCENT**2*cash_flow', DoubleType(), True),
    StructField('1/(-sqrt(gross_revenue) + 1/POVERTY_PERCENT)', DoubleType(), True),
    StructField('sqrt(gross_revenue/POVERTY_PERCENT)', DoubleType(), True),
    StructField('POVERTY_PERCENT**3*sqrt(gross_revenue)', DoubleType(), True),
    StructField('sqrt(MEDIAN_HOUSEHOLD_INCOME)*established**3', DoubleType(), True),
    StructField('(sqrt(MEDIAN_HOUSEHOLD_INCOME) + established)**3', DoubleType(), True),
    StructField('__index_level_0__', LongType(), True),
])

In [248]:
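def partial_fit(self, data):
    """Grow the vocabulary batch by batch; terms seen earlier keep their column index."""
    data = list(data)
    if not data:  # nothing to learn from an empty batch
        return self
    vocab = dict(getattr(self, 'vocabulary_', {}))  # vocabulary from earlier batches
    self.fit(data)  # fit() replaces vocabulary_ with this batch's terms only
    for term in sorted(self.vocabulary_):
        if term not in vocab:  # append unseen terms at the next free index
            vocab[term] = len(vocab)
    self.vocabulary_ = vocab
    return self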
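The key property of an incremental vocabulary is that a term's column index never changes once it is assigned, so count vectors from earlier batches stay aligned with later ones. Rebuilding the index map from `list(set(...))` on every batch does not guarantee this, because string hashing (and therefore set iteration order) is randomized between Python processes. A minimal pure-Python sketch of the merge rule, using hypothetical token batches:

```python
def merge_vocabulary(vocab: dict[str, int], new_terms) -> dict[str, int]:
    """Return a copy of vocab with unseen terms appended at the next free index."""
    merged = dict(vocab)
    for term in sorted(set(new_terms)):  # sorted: deterministic order for new terms
        if term not in merged:
            merged[term] = len(merged)  # existing terms keep their index
    return merged


# Two hypothetical batches of tokens
v1 = merge_vocabulary({}, ["pizza", "shop", "pizza"])
v2 = merge_vocabulary(v1, ["car", "wash", "shop"])
print(v2)  # {'pizza': 0, 'shop': 1, 'car': 2, 'wash': 3}
```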

In [249]:
# Monkey-patch: give every CountVectorizer instance the incremental partial_fit defined above
CountVectorizer.partial_fit = partial_fit

In [250]:
vectorizer = CountVectorizer()
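Another way to avoid keeping a growing vocabulary in sync across batches is the hashing trick: each token is mapped to a column by a fixed hash function, so no vocabulary has to be learned or stored, and every batch or partition can be transformed independently. scikit-learn's `HashingVectorizer` and Spark ML's `HashingTF` apply this idea. The sketch below is a simplified pure-Python version; it assumes whitespace tokenization and uses `zlib.crc32` as an arbitrary stable hash chosen for illustration.

```python
import zlib


def hash_vectorize(doc: str, n_features: int = 2**20) -> dict[int, int]:
    """Sparse term counts keyed by hashed column index (the hashing trick)."""
    counts: dict[int, int] = {}
    for token in doc.lower().split():  # assumed: simple whitespace tokenization
        # crc32 gives the same value in every process, unlike built-in hash() on str
        col = zlib.crc32(token.encode("utf-8")) % n_features
        counts[col] = counts.get(col, 0) + 1
    return counts


# Each batch is transformed on its own; no shared vocabulary is needed
batch_a = [hash_vectorize(d) for d in ["Pizza shop for sale"]]
batch_b = [hash_vectorize(d) for d in ["Pizza franchise"]]
```

The trade-off is that two different words can collide in the same column and columns cannot be mapped back to words; a large `n_features` keeps collisions rare.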

### Process "Big" dataset in batches

In [251]:
#def func(itr):
#    for row in itr:
#        print(row)

In [252]:
#df_in.foreachPartition(func)

In [253]:
#dataset=df_in.select("*")

In [254]:
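def batch_proc(df, epoch_id):
    # foreachBatch calls this with each micro-batch as a static DataFrame and its batch id.
    # collect() returns Row objects, but CountVectorizer expects strings:
    # unpack the 'ppdesc' column and skip nulls.
    X = [row.ppdesc for row in df.select('ppdesc').collect() if row.ppdesc is not None]
    print(f"batch {epoch_id}: {len(X)} descriptions")
    #y = df.select('price').toPandas()['price'].to_numpy()
    if X:  # skip empty micro-batches
        vectorizer.partial_fit(X)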
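`collect()` returns a list of `Row` objects, not strings. CountVectorizer lowercases each document by calling its `.lower()` method, and a `Row` looks attribute names up among its fields, which is consistent with the `'lower' is not in list` error in the traceback below. The sketch uses a `namedtuple` as a hypothetical stand-in for `pyspark.sql.Row` so it runs without Spark, and shows the field extraction that turns rows into plain strings:

```python
from collections import namedtuple

# Hypothetical stand-in for pyspark.sql.Row: a tuple whose fields are attributes
Row = namedtuple("Row", ["ppdesc"])


def rows_to_texts(rows) -> list[str]:
    """Extract the description string from each row, skipping nulls (None)."""
    return [row.ppdesc for row in rows if row.ppdesc is not None]


rows = [Row("Busy cafe downtown"), Row(None), Row("Car wash")]
print(hasattr(rows[0], "lower"))  # False: a row is not a string
print(rows_to_texts(rows))        # ['Busy cafe downtown', 'Car wash']
```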

In [255]:
#https://saturncloud.io/blog/how-to-use-foreach-and-foreachbatch-in-pyspark-to-write-to-database/

In [256]:
#query = dataset.writeStream.outputMode("update").foreachBatch(batch_proc).start()

In [257]:
#query.awaitTermination()

In [258]:
sdf_in=spark.readStream.format("parquet").schema(df_in_schema).load('build/s_in')
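With a file source, each micro-batch handed to `foreachBatch` contains the new files Spark picks up in that trigger, so batch size (and the driver memory that `collect()` needs) depends on how many files arrive at once. Spark's file sources accept a `maxFilesPerTrigger` option (e.g. `.option("maxFilesPerTrigger", 1)` on `readStream`) to cap that number. The pure-Python simulation below only illustrates the effect of such a cap; the file names are hypothetical, and in Spark it is the engine, not this code, that decides which files go into each batch.

```python
from typing import Callable, Iterable, Iterator


def micro_batches(files: Iterable[str], max_files_per_trigger: int) -> Iterator[list[str]]:
    """Yield successive groups of at most max_files_per_trigger files."""
    batch: list[str] = []
    for f in files:
        batch.append(f)
        if len(batch) == max_files_per_trigger:
            yield batch
            batch = []
    if batch:  # final, possibly smaller, batch
        yield batch


def run_stream(files: Iterable[str], max_files_per_trigger: int,
               handler: Callable[[list[str], int], None]) -> int:
    """Call handler(batch, batch_id) once per micro-batch, like foreachBatch; return batch count."""
    n = 0
    for batch_id, batch in enumerate(micro_batches(files, max_files_per_trigger)):
        handler(batch, batch_id)
        n += 1
    return n


files = [f"part-{i:05d}.parquet" for i in range(5)]  # hypothetical file names
run_stream(files, 2, lambda batch, batch_id: print(batch_id, batch))
```

Capping files per trigger trades more batches (more overhead) for a bounded amount of data held in memory at any one time.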

In [259]:
sdf_in.isStreaming

True

In [260]:
dataset=sdf_in.select("*")

In [261]:
query = dataset.writeStream.outputMode("update").foreachBatch(batch_proc).start()

ERROR:py4j.clientserver:There was an exception while executing the Python Proxy on the Python Side.
Traceback (most recent call last):
  File "/usr/local/spark/python/pyspark/sql/types.py", line 2377, in __getattr__
    idx = self.__fields__.index(item)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^
ValueError: 'lower' is not in list

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/clientserver.py", line 617, in _call_proxy
    return_value = getattr(self.pool[obj_id], method)(*params)
                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/spark/python/pyspark/sql/utils.py", line 120, in call
    raise e
  File "/usr/local/spark/python/pyspark/sql/utils.py", line 117, in call
    self.func(DataFrame(jdf, wrapped_session_jdf), batch_id)
  File "/tmp/ipykernel_113/2433842204.py", line 5, in batch_proc
    vectorizer.partial_fit(X)
  File "/tmp/ipykernel

In [262]:
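# Block until every file already in build/s_in has been processed,
# then stop the query before stopping Spark
query.processAllAvailable()
query.stop()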

In [263]:
spark.stop()