In [1]:
# findspark is initialised before any pyspark import (the next cell is the
# first to import pyspark). Presumably it locates the local Spark install and
# makes pyspark importable -- TODO confirm against the findspark docs.
import findspark
findspark.init()

In [2]:
import pyspark
from pyspark.sql import SparkSession

# Smoke test: obtain a Spark session (getOrCreate reuses an active one if
# present) and run a trivial one-row query to confirm Spark executes SQL.
spark = (
    SparkSession
    .builder
    .getOrCreate()
)

df = spark.sql("select 'spark' as hello ")
df.show()

+-----+
|hello|
+-----+
|spark|
+-----+



In [3]:
# Sanity check that skdist is importable; the bare expression on the last line
# makes the cell display the module repr, which includes the path it was
# loaded from.
import skdist
skdist

<module 'skdist' from 'C:\\Users\\masahiro\\Anaconda3\\lib\\site-packages\\skdist\\__init__.py'>

In [4]:
import pandas as pd
 
from sklearn.compose import ColumnTransformer
from sklearn.datasets import fetch_20newsgroups, load_digits
from sklearn.linear_model import LogisticRegression
from sklearn.feature_extraction.text import HashingVectorizer
from sklearn.pipeline import Pipeline
from skdist.distribute.predict import get_prediction_udf
from pyspark.sql import SparkSession, functions as F

# Get the Spark session (getOrCreate returns the one started earlier if it is
# still active) and keep a handle on its SparkContext.
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

# --- Example 1: plain 2-D numpy features (sklearn digits) -------------------
data = load_digits()
X, y = data["data"], data["target"]

model = LogisticRegression(solver="liblinear", multi_class="auto")
model.fit(X, y)

# No feature_type is passed, so get_prediction_udf uses its default
# ('numpy' according to the original notebook comment) for both methods.
predict = get_prediction_udf(model, method="predict")
predict_proba = get_prediction_udf(model, method="predict_proba")

# Move the feature matrix into Spark and build one column reference per
# Spark column, in DataFrame column order.
pdf = pd.DataFrame(X)
sdf = spark.createDataFrame(pdf)
cols = list(map(F.col, map(str, sdf.columns)))

# Score every row: predict_proba output goes to "scores", predict output to
# "preds"; only those two columns are kept.
prediction_df = sdf.withColumn(
    "scores", predict_proba(*cols)
).withColumn(
    "preds", predict(*cols)
).select("preds", "scores")
prediction_df.show()

# --- Example 2: a single raw-text feature (20 newsgroups) -------------------
# Only the first 100 documents are used to keep the demo fast.
data = fetch_20newsgroups(
    shuffle=True,
    random_state=1,
    remove=('headers', 'footers', 'quotes'),
)
X, y = data["data"][:100], data["target"][:100]

# Hash the raw documents into features, then classify them.
model = Pipeline(
    [
        ("vec", HashingVectorizer()),
        ("clf", LogisticRegression(solver="liblinear", multi_class="auto")),
    ]
)
model.fit(X, y)

# 'text' feature type, matching the single string column built below.
predict = get_prediction_udf(model, method="predict", feature_type="text")
predict_proba = get_prediction_udf(model, method="predict_proba", feature_type="text")

# The list of documents becomes a one-column pandas frame, then a Spark frame.
pdf = pd.DataFrame(X)
sdf = spark.createDataFrame(pdf)
cols = list(map(F.col, map(str, sdf.columns)))

# Score every row and keep only the prediction outputs.
prediction_df = sdf.withColumn(
    "scores", predict_proba(*cols)
).withColumn(
    "preds", predict(*cols)
).select("preds", "scores")
prediction_df.show()

# --- Example 3: features supplied as a pandas DataFrame ----------------------
# The model consumes a DataFrame with a "text" column; a ColumnTransformer
# routes that column through the HashingVectorizer.
X = pd.DataFrame({"text": data["data"][:100]})
y = data["target"][:100]
model = Pipeline([
    ("vec", ColumnTransformer([("text", HashingVectorizer(), "text")])),
    ("clf", LogisticRegression(solver="liblinear", multi_class="auto")),
])
model.fit(X, y)

# get UDFs with 'pandas' feature types
# NOTE: This time we must supply an ordered list of column names to the
# `get_prediction_udf` function. Compute it once so both UDFs and the Spark
# column references below share exactly the same order (the UDF arguments are
# passed positionally, so order is what ties each Spark column to its name).
feature_names = list(X.columns)
predict = get_prediction_udf(
    model, method="predict", feature_type="pandas", names=feature_names
)
predict_proba = get_prediction_udf(
    model, method="predict_proba", feature_type="pandas", names=feature_names
)

# create PySpark DataFrame from features; column refs are built from the same
# ordered name list given to the UDFs instead of relying on sdf.columns order.
sdf = spark.createDataFrame(X)
cols = [F.col(str(name)) for name in feature_names]

# apply predict UDFs and select prediction output
prediction_df = (
    sdf
    .withColumn("scores", predict_proba(*cols))
    .withColumn("preds", predict(*cols))
    .select("preds", "scores")
)
prediction_df.show()

Downloading 20news dataset. This may take a few minutes.
Downloading dataset from https://ndownloader.figshare.com/files/5975967 (14 MB)


+-----+--------------------+
|preds|              scores|
+-----+--------------------+
|    0|[0.99988133364581...|
|    1|[4.75036320495255...|
|    2|[2.94891479564409...|
|    3|[1.63439184184317...|
|    4|[1.11340782802743...|
|    5|[1.47167847402628...|
|    6|[1.08555580162344...|
|    7|[3.02428696147597...|
|    8|[7.65455387782709...|
|    9|[3.97697358804967...|
|    0|[0.99919579687825...|
|    1|[2.65210459891626...|
|    2|[1.85886626332130...|
|    3|[2.89824549681289...|
|    4|[2.84814929909155...|
|    5|[2.70091084586477...|
|    6|[1.10907939716380...|
|    7|[3.06454485209046...|
|    8|[2.38830589881912...|
|    9|[8.24574106207122...|
+-----+--------------------+
only showing top 20 rows

+-----+--------------------+
|preds|              scores|
+-----+--------------------+
|    4|[0.03736128393565...|
|    0|[0.09792807410478...|
|   17|[0.05044543817914...|
|   11|[0.03443972986074...|
|   10|[0.04757471929521...|
|   15|[0.04555477151025...|
|    4|[0.0402530