# Sending data to a Spark cluster from a local instance

Suppose you would like to send a pretrained scikit-learn model to your Spark cluster (e.g. for further use with other packages such as `spark-sklearn`).

**Warning:** this example assumes that the (Py)Spark cluster and your local machine have the same versions of the Python packages involved.

The following code requires numpy, scipy, scikit-learn and pandas to be installed.
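One way to check the version assumption is to print the installed versions on each side and compare them. The sketch below uses only the standard library; the helper names `package_versions` and `version_mismatches` are hypothetical and not part of sparkmagic. The idea is to run it once in a `%%local` cell and once in a Spark cell, then compare the two printed dictionaries.

```python
from importlib import metadata

# Distribution names for the packages this walkthrough imports.
PACKAGES = ["numpy", "scipy", "scikit-learn", "pandas"]


def package_versions(names):
    """Map each distribution name to its installed version, or None if missing."""
    versions = {}
    for name in names:
        try:
            versions[name] = metadata.version(name)
        except metadata.PackageNotFoundError:
            versions[name] = None
    return versions


def version_mismatches(local, remote):
    """Return {name: (local_version, remote_version)} for packages that differ."""
    return {
        name: (local.get(name), remote.get(name))
        for name in sorted(set(local) | set(remote))
        if local.get(name) != remote.get(name)
    }


print(package_versions(PACKAGES))
```

Matching scikit-learn versions matter most here, because a pickled estimator is not guaranteed to load under a different scikit-learn release.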

We can send strings:

In [1]:
%%local

s = u"abc ሴ def"
print(s)

abc ሴ def


In [2]:
%%send_to_spark -i s -t str -n s

Starting Spark application


| ID | YARN Application ID | Kind | State | Spark UI | Driver log | Current session? |
|---|---|---|---|---|---|---|
| 16 | | pyspark | idle | | | ✔ |


SparkSession available as 'spark'.


Successfully passed 's' as 's' to Spark kernel

In [3]:
# This runs in Spark; not sure why Unicode doesn't work
print(s)

abc ? def
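The cause of the `?` is not established here. One possible explanation, which is an assumption and has not been verified against this cluster, is that some step between the Spark driver and the notebook encodes the printed output as ASCII with replacement. The sketch below reproduces that effect locally and shows a reversible ASCII-only escape that could be used to inspect the string on the Spark side.

```python
s = "abc ሴ def"

# Encoding to ASCII with errors="replace" swaps each unencodable character for "?".
lossy = s.encode("ascii", errors="replace").decode("ascii")
print(lossy)

# Escaping keeps the text ASCII-only without losing information.
escaped = s.encode("unicode_escape").decode("ascii")
print(escaped)

# The escape can be reversed to recover the original string.
restored = escaped.encode("ascii").decode("unicode_escape")
print(restored == s)
```

If printing the escaped form in Spark shows the original code point, the variable itself arrived intact and only the display path is lossy.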

Next we train a model locally and compute its precision:

In [4]:
%%local
from sklearn.model_selection import train_test_split
from sklearn.datasets import load_iris
import pandas as pd
import pickle
from sklearn import tree
from sklearn.metrics import precision_score

iris = load_iris()
X_train, X_test, y_train, y_test = train_test_split(iris.data, iris.target, random_state=1)
X_test_pd = pd.DataFrame(X_test, columns=['a','b','c','d'])
Y_test_pd = pd.DataFrame(y_test, columns=['pred'])

decision_tree = tree.DecisionTreeClassifier()
decision_tree_model = decision_tree.fit(X_train, y_train)

y_pred = decision_tree_model.predict(X_test)
precision_score(y_test, y_pred, average='weighted')

0.9763157894736842
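For reference, `average='weighted'` computes the precision of each class and averages those values weighted by the number of true samples per class. The following is a minimal pure-Python sketch of that calculation on made-up labels; `weighted_precision` is a hypothetical helper written for illustration and is not the scikit-learn implementation.

```python
from collections import Counter


def weighted_precision(y_true, y_pred):
    """Per-class precision averaged with weights equal to each class's true count."""
    support = Counter(y_true)
    total = len(y_true)
    score = 0.0
    for label, count in support.items():
        predicted = sum(1 for p in y_pred if p == label)
        correct = sum(1 for t, p in zip(y_true, y_pred) if t == p == label)
        # A class that is never predicted contributes a precision of 0.
        precision = correct / predicted if predicted else 0.0
        score += precision * count / total
    return score


# Toy labels (not the iris split): one sample of class 1 is predicted as class 2.
y_true_demo = [0, 0, 1, 1, 2, 2]
y_pred_demo = [0, 0, 1, 2, 2, 2]
print(weighted_precision(y_true_demo, y_pred_demo))
```

In the toy data, classes 0 and 1 have precision 1 and class 2 has precision 2/3, so the weighted result is 8/9.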

Send the test sets to `%spark`:

In [5]:
%%send_to_spark -i X_test_pd -t df -n X_test

Successfully passed 'X_test_pd' as 'X_test' to Spark kernel

In [6]:
%%send_to_spark -i Y_test_pd -t df -n y_test

Successfully passed 'Y_test_pd' as 'y_test' to Spark kernel

`pickle.dumps` returns `bytes`, while the cells here pass values with `-t str` or `-t df`, so we encode the pickled model to base64 and send it as a string:

In [7]:
%%local
import codecs
decision_tree_pickled = codecs.encode(pickle.dumps(decision_tree_model), "base64").decode()

In [8]:
%%send_to_spark -i decision_tree_pickled -t str -n decision_tree_pickled

Successfully passed 'decision_tree_pickled' as 'decision_tree_pickled' to Spark kernel
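The encoding on the local side and the decoding on the Spark side form a round trip that can be tested without a cluster. Below is a minimal sketch using the same `codecs` and `pickle` calls as this notebook; the helper names `to_text` and `from_text` and the dictionary payload are hypothetical stand-ins for the model.

```python
import codecs
import pickle


def to_text(obj):
    """Pickle obj and return the bytes as an ASCII-only base64 string."""
    return codecs.encode(pickle.dumps(obj), "base64").decode()


def from_text(text):
    """Invert to_text: decode the base64 string, then unpickle the bytes."""
    return pickle.loads(codecs.decode(text.encode(), "base64"))


# Stand-in payload; assumption: any picklable object round-trips like the model.
payload = {"feature_names": ["a", "b", "c", "d"], "depth": 3}

text = to_text(payload)
print(type(text).__name__)

restored = from_text(text)
print(restored == payload)
```

As with any use of `pickle.loads`, this should only be applied to data you produced yourself, since unpickling can execute arbitrary code.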

Decode it from base64 in `%spark`:

In [9]:
import pickle, codecs

decision_tree_model = pickle.loads(codecs.decode(decision_tree_pickled.encode(), "base64"))

Convert the PySpark DataFrames into numpy arrays:

In [10]:
import numpy as np
y_test = np.array(y_test.select('pred').collect())
X_test = np.array(X_test.collect())
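
The shapes produced by this conversion can be checked without Spark. The sketch below uses plain tuples as stand-ins for the collected rows, on the assumption that pyspark `Row` objects behave like tuples when passed to `np.array`; the values are made up.

```python
import numpy as np

# Stand-ins for what DataFrame.collect() returns: a list of row-like tuples.
label_rows = [(0,), (2,), (1,)]
feature_rows = [
    (5.1, 3.5, 1.4, 0.2),
    (6.3, 3.3, 6.0, 2.5),
    (5.9, 3.0, 4.2, 1.5),
]

y = np.array(label_rows)    # one column per row, so a 2-D column vector
X = np.array(feature_rows)  # one row per sample, one column per feature
print(y.shape, X.shape)

# Flatten the label column to 1-D, the shape y_test has on the %%local side.
y_flat = y.ravel()
print(y_flat.shape)
```

Selecting a single column and collecting it yields an array of shape `(n, 1)`, so `ravel()` is one way to get a flat label vector if later code expects 1-D labels.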

Run the pretrained classifier and see whether its precision matches that of the `%local` model:

In [11]:
from sklearn.metrics import precision_score

y_pred = decision_tree_model.predict(X_test)
precision_score(y_test, y_pred, average='weighted')

0.9763157894736842

It does! We have successfully passed both a string and a pandas dataframe from `%local` to `%spark`.