In [0]:
cluster_info_cols = [
    'spark.databricks.clusterUsageTags.clusterNodeType', 
    'spark.databricks.clusterUsageTags.clusterMaxWorkers', 
    'spark.databricks.clusterUsageTags.clusterMinWorkers'
]
cluster_config = {el[0].replace('spark.databricks.clusterUsageTags.', ''): el[1] for el in spark.sparkContext.getConf().getAll() if el[0] in cluster_info_cols}

In [0]:
if cluster_config['clusterNodeType']=='Standard_DS5_v2':
    cluster_config['GB'] = 56
    cluster_config['core'] = 16
elif cluster_config['clusterNodeType']=='Standard_DS3_v2':
    cluster_config['GB'] = 14
    cluster_config['core'] = 4
elif cluster_config['clusterNodeType']=="dev-tier-node":
    cluster_config['GB'] = 15.3
    cluster_config['core'] = 2
    
del cluster_config['clusterNodeType']

# Boston data

In [0]:
import numpy as np
import pandas as pd

from sklearn.datasets import load_boston

In [0]:
boston = load_boston()

boston_pd = pd.DataFrame(
    data= np.c_[boston['data'], boston['target']],
    columns= np.append(boston['feature_names'], 'target')
)
boston_pd.head(5), boston_pd.shape[0]

# RF - sklearn

In [0]:
from sklearn.ensemble import RandomForestRegressor
from scipy.stats.stats import pearsonr
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_absolute_error

In [0]:
train, test = train_test_split(boston_pd, test_size=0.2, random_state=42)

Sztuczne powiększenie danych - tylko dla celów testowych

In [0]:
n = train.shape[0] * 1 #1000

In [0]:
train = train.sample(n, replace=True)

In [0]:
y_train = train['target']
X_train = train.drop(['target'], axis=1)

y_test = test['target']
X_test = test.drop(['target'], axis=1)

# rf = RandomForestRegressor()
# model = rf.fit(X_train, y_train)

# y_pred = model.predict(X_test)

# mae = mean_absolute_error(y_test, y_pred)
# print("MAE: " + str(mae))

#RF with tracking

In [0]:
seed = 42  # for repeatability purposes
default_params = {'n_estimators': 200, 'max_depth': 10}

In [0]:
import time
import mlflow
import mlflow.sklearn

In [0]:
mlflow.start_run(run_name="sklearn")

rf = RandomForestRegressor(n_estimators=default_params['n_estimators'], max_depth=default_params['max_depth'], random_state=seed)

start = time.time()
model = rf.fit(X_train, y_train)
end = time.time()

y_pred = model.predict(X_test)

mae = mean_absolute_error(y_test, y_pred)

mlflow.log_metrics({
    "time": end-start,
    "mae": mae,
})
mlflow.log_params(cluster_config)
mlflow.log_param("n", X_train.shape[0])
mlflow.sklearn.log_model(model, "model")

mlflow.end_run()

In [0]:
# features importances
pd.DataFrame({'var': X_train.columns, 'imp': model.feature_importances_}).sort_values('imp', ascending=False)

Unnamed: 0,var,imp
12,LSTAT,0.511157
5,RM,0.316889
7,DIS,0.06801
0,CRIM,0.035081
4,NOX,0.024399
9,TAX,0.010394
6,AGE,0.009572
11,B,0.009096
10,PTRATIO,0.006358
2,INDUS,0.00541


In [0]:
def save_model(save, run_name, cluster_config, time, mae, n, cv, parallelism, model, lib):
    if save:
        mlflow.start_run(run_name=run_name)

        mlflow.log_metrics({
            "time": time,
            "mae": mae,
        })
        mlflow.log_params(cluster_config)
        mlflow.log_params({"n": n, 'cv': cv, 'parallelism': parallelism})
        lib.log_model(model, "model")

        mlflow.end_run()

In [0]:
save = True

#MLlib

https://spark.apache.org/docs/latest/ml-classification-regression.html

In [0]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.regression import RandomForestRegressor as RandomForestRegressorSpark
from pyspark.ml.evaluation import RegressionEvaluator

In [0]:
import mlflow.spark

In [0]:
train_spark = spark.createDataFrame(train).withColumnRenamed('target', 'label')
test_spark = spark.createDataFrame(test)

In [0]:
assembler = VectorAssembler(inputCols= list(X_train.columns), outputCol="features")
# https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.regression.RandomForestRegressor.html#pyspark.ml.regression.RandomForestRegressor
rf = RandomForestRegressorSpark(numTrees=default_params['n_estimators'], maxDepth=default_params['max_depth'], seed=seed)


pipeline = Pipeline(stages = [assembler, rf])

start = time.time()
model = pipeline.fit(train_spark)
end = time.time()

test_pred_spark = model.transform(test_spark)

evaluator = RegressionEvaluator(labelCol='target', predictionCol="prediction", metricName="mae")
mae = evaluator.evaluate(test_pred_spark)

In [0]:
save_model(save, "mllib", cluster_config, end-start, mae, train_spark.count(), None, None, model, mlflow.spark)

#Grid search

In [0]:
cv = 10
parameters = {'n_estimators': [10, 100, 200], 'max_depth': [5, 10, 15], 'random_state':[42]}

## Grid search with cross validation - sklearn

In [0]:
from sklearn.model_selection import GridSearchCV

In [0]:
rf = RandomForestRegressor()
gs = GridSearchCV(rf, parameters, cv=cv, scoring='neg_mean_absolute_error')
start = time.time()
gs.fit(X_train, y_train)
end = time.time()
end - start

save_model(save, "sklearn-grid", cluster_config, end-start, abs(gs.best_score_), X_train.shape[0], cv, None, gs.best_estimator_, mlflow.sklearn)

In [0]:
#pd.DataFrame(gs.cv_results_)

## Grid search with parallelism

In [0]:
parallelism = 4

In [0]:
from sklearn.utils import parallel_backend
!pip install joblibspark
from joblibspark import register_spark

In [0]:
register_spark() # register spark backend

start = time.time()
with parallel_backend("spark", n_jobs=parallelism):
    gs.fit(X_train, y_train)
end = time.time()

In [0]:
gs.best_params_

In [0]:
save_model(save, "sklearn-grid-parallel", cluster_config, end-start, abs(gs.best_score_), X_train.shape[0], cv, parallelism, gs.best_estimator_, mlflow.sklearn)

##Grid search with cross validation - MLlib

In [0]:
from pyspark.ml.tuning import CrossValidator
from pyspark.ml.tuning import ParamGridBuilder

In [0]:
rf = RandomForestRegressorSpark()
pipeline = Pipeline(stages = [assembler, rf])
evaluator = RegressionEvaluator(labelCol='label', predictionCol="prediction", metricName="mae")

paramGrid = ParamGridBuilder()\
    .addGrid(rf.maxDepth, parameters['max_depth'])\
    .addGrid(rf.numTrees, parameters['n_estimators'])\
    .addGrid(rf.seed, parameters['random_state'])\
    .build()

cvSpark = CrossValidator(
    estimator=pipeline, 
    evaluator=evaluator,
    estimatorParamMaps=paramGrid,
    numFolds=cv,
    parallelism=parallelism,
    seed=seed
)

#pipeline = Pipeline(stages = [assembler, rf, cvSpark])

start = time.time()
cvModel = cvSpark.fit(train_spark)
end = time.time()

In [0]:
cvModel.getEstimatorParamMaps()[ np.argmin(cvModel.avgMetrics) ].values()

In [0]:
save_model(save, "mllib-grid-parallel", cluster_config, end-start, min(cvModel.avgMetrics), train_spark.count(), cv, parallelism, cvModel.bestModel, mlflow.spark)

# H20

https://pypi.org/project/h2o-pysparkling-3.1/

http://docs.h2o.ai/sparkling-water/3.1/latest-stable/doc/pysparkling.html

https://docs.h2o.ai/h2o/latest-stable/h2o-docs/data-science/drf.html

In [0]:
from pysparkling import *
import h2o
from h2o.estimators.random_forest import H2ORandomForestEstimator

In [0]:
import mlflow.h2o  # https://www.mlflow.org/docs/latest/python_api/mlflow.h2o.html

In [0]:
hc = H2OContext.getOrCreate();

In [0]:
train_h2o = h2o.H2OFrame(train)
test_h2o = h2o.H2OFrame(test)

In [0]:
rf_h2o = H2ORandomForestEstimator(
    ntrees=default_params['n_estimators'],
    max_depth=default_params['max_depth'],
    seed=seed
)

start = time.time()
rf_h2o.train(list(X_train.columns), 'target', training_frame=train_h2o, validation_frame=test_h2o)
end = time.time()

In [0]:
#rf_h2o.varimp()

In [0]:
save_model(save, "h2o", cluster_config, end-start, rf_h2o.mae(valid=True), train_h2o.nrow, None, None, rf_h2o, spark.h2o)

# Grid search with H2O

In [0]:
from h2o.grid.grid_search import H2OGridSearch

In [0]:
parameters_h2o = parameters.copy()
parameters_h2o['ntrees'] = parameters_h2o['n_estimators']
parameters_h2o['seed'] = parameters_h2o['random_state']
del parameters_h2o['n_estimators']
del parameters_h2o['random_state']

parameters_h2o, parameters

In [0]:
gs_h2o = H2OGridSearch(
    model=H2ORandomForestEstimator,
    hyper_params=parameters_h2o,
    parallelism=parallelism,
   # search_criteria={'strategy': 'Cartesian', 'stopping_metric': 'mae'}
)

start = time.time()
gs_h2o.train(list(X_train.columns), 'target', training_frame=train_h2o, seed=seed)
end = time.time()

best_gs_h2o = gs_h2o.get_grid(sort_by='mae', decreasing=False).models[0]

In [0]:
best_gs_h2o.params['ntrees']

In [0]:
save_model(save, "h2o-grid", cluster_config, end-start, best_gs_h2o.mae(test_h2o), train_h2o.nrow, cv, parallelism, best_gs_h2o , mlflow.h2o)

# Koalas vs Pandas

https://koalas.readthedocs.io/en/latest/

In [0]:
import databricks.koalas as ks

In [0]:
train_cat = train.copy()
train_cat['AGE_CAT'] = pd.cut(train_cat['AGE'], bins=3, labels=['one', 'two', 'three']).astype(str)

In [0]:
train_ks = ks.from_pandas(train_cat)
test_ks = ks.from_pandas(test)

In [0]:
train_ks.head()

Unnamed: 0,CRIM,ZN,INDUS,CHAS,NOX,RM,AGE,DIS,RAD,TAX,PTRATIO,B,LSTAT,target,AGE_CAT
477,15.0234,0.0,18.1,0.0,0.614,5.304,97.3,2.1007,24.0,666.0,20.2,349.48,24.91,12.0,three
15,0.62739,0.0,8.14,0.0,0.538,5.834,56.5,4.4986,4.0,307.0,21.0,395.62,8.47,19.9,two
332,0.03466,35.0,6.06,0.0,0.4379,6.031,23.3,6.6407,1.0,304.0,16.9,362.25,7.83,19.4,one
423,7.05042,0.0,18.1,0.0,0.614,6.103,85.1,2.0218,24.0,666.0,20.2,2.52,23.29,13.4,three
19,0.7258,0.0,8.14,0.0,0.538,5.727,69.5,3.7965,4.0,307.0,21.0,390.95,11.28,18.2,three


In [0]:
ks.sql("SELECT * FROM {train_ks} WHERE AGE > 80").shape

In [0]:
train_ks.groupby(['AGE_CAT']).count()

Unnamed: 0_level_0,CRIM,ZN,INDUS,CHAS,NOX,RM,AGE,DIS,RAD,TAX,PTRATIO,B,LSTAT,target
AGE_CAT,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1
two,95,95,95,95,95,95,95,95,95,95,95,95,95,95
one,72,72,72,72,72,72,72,72,72,72,72,72,72,72
three,237,237,237,237,237,237,237,237,237,237,237,237,237,237


In [0]:
train_cat.groupby(['AGE_CAT']).count()

Unnamed: 0_level_0,CRIM,ZN,INDUS,CHAS,NOX,RM,AGE,DIS,RAD,TAX,PTRATIO,B,LSTAT,target
AGE_CAT,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1
one,72,72,72,72,72,72,72,72,72,72,72,72,72,72
three,237,237,237,237,237,237,237,237,237,237,237,237,237,237
two,95,95,95,95,95,95,95,95,95,95,95,95,95,95


# Użyteczne transformacje

In [0]:
train_cat_spark = spark.createDataFrame(train_cat)

## Window

In [0]:
from pyspark.sql.window import Window
from pyspark.sql import functions as f

In [0]:
window = Window.partitionBy('AGE_CAT')

train_cat_spark\
    .withColumn('avg_age', f.rank().over(window.orderBy(f.asc('AGE'))))\
    .display()

CRIM,ZN,INDUS,CHAS,NOX,RM,AGE,DIS,RAD,TAX,PTRATIO,B,LSTAT,target,AGE_CAT,avg_age
0.12744,0.0,6.91,0.0,0.448,6.77,2.9,5.7209,3.0,233.0,17.9,385.41,4.84,26.6,one,1
0.07896,0.0,12.83,0.0,0.437,6.273,6.0,4.2515,5.0,398.0,18.7,394.92,6.78,24.1,one,2
0.15936,0.0,6.91,0.0,0.448,6.211,6.5,5.7209,3.0,233.0,17.9,394.46,7.44,24.7,one,3
0.1415,0.0,6.91,0.0,0.448,6.169,6.6,5.7209,3.0,233.0,17.9,383.37,5.81,25.3,one,4
0.08221,22.0,5.86,0.0,0.431,6.957,6.8,8.9067,7.0,330.0,19.1,386.09,3.53,29.6,one,5
0.12757,30.0,4.93,0.0,0.428,6.393,7.8,7.0355,6.0,300.0,16.6,374.71,5.19,23.7,one,6
0.36894,22.0,5.86,0.0,0.431,8.259,8.4,8.9067,7.0,330.0,19.1,396.9,3.54,42.8,one,7
0.21409,22.0,5.86,0.0,0.431,6.438,8.9,7.3967,7.0,330.0,19.1,377.07,3.59,24.8,one,8
0.28955,0.0,10.59,0.0,0.489,5.412,9.8,3.5875,4.0,277.0,18.6,348.93,29.55,23.7,one,9
0.02187,60.0,2.93,0.0,0.401,6.8,9.9,6.2196,1.0,265.0,15.6,393.37,5.03,31.1,one,10


## ApplyInPandas

In [0]:
from sklearn.model_selection import cross_val_score

In [0]:
from pyspark.sql.types import *

In [0]:
def train_rf(data):
    rf = RandomForestRegressor()
    
    y_train = data['target']
    X_train = data.drop(['target', 'AGE_CAT'], axis=1)
    
    mae = cross_val_score(rf, X_train, y_train, cv=cv, scoring='neg_mean_absolute_error')
    print(mae)
    return pd.DataFrame({'fold': range(cv), 'mae': mae, 'age': data['AGE_CAT'].max()})

In [0]:
train_rf(train_cat)

Unnamed: 0,fold,mae,age
0,0,-1.913341,two
1,1,-2.455976,two
2,2,-1.977463,two
3,3,-2.933878,two
4,4,-3.32985,two
5,5,-2.072425,two
6,6,-2.9879,two
7,7,-2.037375,two
8,8,-1.728975,two
9,9,-2.578025,two


In [0]:
# https://spark.apache.org/docs/3.1.1/api/python/reference/api/pyspark.sql.types.StructField.html
schema = StructType([
    StructField('fold', IntegerType(), True),
    StructField('mae', DoubleType(), True),
    StructField('age', StringType(), True),
])  

In [0]:
res = train_cat_spark\
    .groupBy('AGE_CAT')\
    .applyInPandas(train_rf, schema)

res.display()

fold,mae,age
0,-1.5386249999999992,one
1,-1.377750000000002,one
2,-0.9377142857142812,one
3,-2.7218571428571416,one
4,-2.5819999999999985,one
5,-3.3784285714285622,one
6,-1.604000000000004,one
7,-1.8562857142857168,one
8,-1.0334285714285711,one
9,-1.7900000000000031,one


In [0]:
res.display()

fold,mae,age
0,-1.669250000000003,one
1,-1.4276249999999977,one
2,-0.993142857142854,one
3,-2.530999999999994,one
4,-2.6645714285714304,one
5,-3.2897142857142745,one
6,-1.966428571428572,one
7,-1.862999999999997,one
8,-0.9841428571428537,one
9,-1.9045714285714264,one


In [0]:
res.cache()

In [0]:
res.display()

fold,mae,age
0,-3.0482000000000005,two
1,-1.2151999999999998,two
2,-1.8446999999999965,two
3,-1.558000000000003,two
4,-2.8642000000000007,two
5,-3.792333333333332,two
6,-3.670555555555557,two
7,-1.595888888888892,two
8,-2.259,two
9,-1.886444444444441,two


In [0]:
res.display()

fold,mae,age
0,-3.0482000000000005,two
1,-1.2151999999999998,two
2,-1.8446999999999965,two
3,-1.558000000000003,two
4,-2.8642000000000007,two
5,-3.792333333333332,two
6,-3.670555555555557,two
7,-1.595888888888892,two
8,-2.259,two
9,-1.886444444444441,two


In [0]:
res\
    .groupBy('age')\
    .pivot('fold')\
    .agg(f.avg('mae').alias('mae'))\
    .display()

age,0,1,2,3,4,5,6,7,8,9
two,-3.0482000000000005,-1.2151999999999998,-1.8446999999999965,-1.558000000000003,-2.8642000000000007,-3.792333333333332,-3.670555555555557,-1.595888888888892,-2.259,-1.886444444444441
one,-1.514000000000001,-1.445250000000009,-1.0128571428571402,-2.5741428571428533,-2.873428571428573,-3.351857142857132,-1.9650000000000036,-1.626000000000006,-1.0804285714285666,-1.91542857142857
three,-2.334500000000001,-3.242791666666669,-2.641666666666667,-3.227416666666669,-3.3081250000000004,-2.1706666666666723,-3.0510000000000006,-2.0947391304347867,-2.0118260869565225,-3.956565217391303


## Reduce

In [0]:
from pyspark.sql import DataFrame # https://spark.apache.org/docs/3.1.1/api/python/reference/api/pyspark.sql.DataFrame.html?highlight=datafram

In [0]:
from functools import reduce

In [0]:
train, test = train_test_split(boston_pd, test_size=0.2, random_state=42)

In [0]:
train_spark = spark.createDataFrame(train)
test_spark = spark.createDataFrame(test)

In [0]:
train_array = [train_spark, test_spark]
all_data = reduce(DataFrame.unionAll, train_array)
all_data.count()

In [0]:
all_data.toPandas().head()

Unnamed: 0,CRIM,ZN,INDUS,CHAS,NOX,RM,AGE,DIS,RAD,TAX,PTRATIO,B,LSTAT,target
0,15.0234,0.0,18.1,0.0,0.614,5.304,97.3,2.1007,24.0,666.0,20.2,349.48,24.91,12.0
1,0.62739,0.0,8.14,0.0,0.538,5.834,56.5,4.4986,4.0,307.0,21.0,395.62,8.47,19.9
2,0.03466,35.0,6.06,0.0,0.4379,6.031,23.3,6.6407,1.0,304.0,16.9,362.25,7.83,19.4
3,7.05042,0.0,18.1,0.0,0.614,6.103,85.1,2.0218,24.0,666.0,20.2,2.52,23.29,13.4
4,0.7258,0.0,8.14,0.0,0.538,5.727,69.5,3.7965,4.0,307.0,21.0,390.95,11.28,18.2


## UDF

In [0]:
import math

In [0]:
@udf(returnType=DoubleType()) 
def udf_sqrt(x):
    return math.sqrt(x)

In [0]:
all_data.withColumn('x2', udf_sqrt('TAX')).limit(2).display()

CRIM,ZN,INDUS,CHAS,NOX,RM,AGE,DIS,RAD,TAX,PTRATIO,B,LSTAT,target,x2
15.0234,0.0,18.1,0.0,0.614,5.304,97.3,2.1007,24.0,666.0,20.2,349.48,24.91,12.0,25.80697580112788
0.62739,0.0,8.14,0.0,0.538,5.834,56.5,4.4986,4.0,307.0,21.0,395.62,8.47,19.9,17.52141546793523


In [0]:
all_data.withColumn('x2', f.sqrt('TAX')).limit(2).display()

CRIM,ZN,INDUS,CHAS,NOX,RM,AGE,DIS,RAD,TAX,PTRATIO,B,LSTAT,target,x2
15.0234,0.0,18.1,0.0,0.614,5.304,97.3,2.1007,24.0,666.0,20.2,349.48,24.91,12.0,25.80697580112788
0.62739,0.0,8.14,0.0,0.538,5.834,56.5,4.4986,4.0,307.0,21.0,395.62,8.47,19.9,17.52141546793523


In [0]:
all_data.write.mode('overwrite').parquet('/FileStore/test.parquet')

# https://community.cloud.databricks.com/?o=2097898880805746#tables/new/dbfs
# https://mungingdata.com/apache-spark/partitionby/#:~:text=partitionBy()%20is%20a%20DataFrameWriter,important%20independent%20of%20disk%20partitioning.

## Expanding

In [0]:
train_cat_spark\
    .withColumn('age_q', f.expr('percentile(AGE, array(0.0, 0.5, 1.0))').over(Window.partitionBy('AGE_CAT')))\
    .withColumn("q", f.array(f.lit(0), f.lit(0.5), f.lit(1.0)))\
    .withColumn("zipped", f.arrays_zip(*(["q"] + ['age_q'])))\
    .drop(*(["q"]))\
    .withColumn("zipped", f.explode("zipped"))\
    .select("AGE_CAT", "zipped.*").drop("zipped")\
    .dropDuplicates()\
    .orderBy('AGE_CAT', 'q')\
    .display()

AGE_CAT,q,age_q
one,0.0,2.9
one,0.5,22.6
one,1.0,34.9
three,0.0,67.8
three,0.5,91.6
three,1.0,100.0
two,0.0,35.7
two,0.5,52.9
two,1.0,67.2


#Other languages

## Scala

In [0]:
%scala

var data_spark = spark.read.parquet("/FileStore/test.parquet")

display(data_spark)

CRIM,ZN,INDUS,CHAS,NOX,RM,AGE,DIS,RAD,TAX,PTRATIO,B,LSTAT,target
0.33147,0.0,6.2,0.0,0.507,8.247,70.4,3.6519,8.0,307.0,17.4,378.95,3.95,48.3
0.3692,0.0,9.9,0.0,0.544,6.567,87.3,3.6023,4.0,304.0,18.4,395.69,9.28,23.8
2.24236,0.0,19.58,0.0,0.605,5.854,91.8,2.422,5.0,403.0,14.7,395.11,11.64,22.7
0.32264,0.0,21.89,0.0,0.624,5.942,93.5,1.9669,4.0,437.0,21.2,378.25,16.9,17.4
0.04666,80.0,1.52,0.0,0.404,7.107,36.6,7.309,2.0,329.0,12.6,354.31,8.61,30.3
0.66351,20.0,3.97,0.0,0.647,7.333,100.0,1.8946,5.0,264.0,13.0,383.29,7.79,36.0
0.57529,0.0,6.2,0.0,0.507,8.337,73.3,3.8384,8.0,307.0,17.4,385.91,2.47,41.7
0.17134,0.0,10.01,0.0,0.547,5.928,88.2,2.4631,6.0,432.0,17.8,344.91,15.76,18.3
0.06899,0.0,25.65,0.0,0.581,5.87,69.7,2.2577,2.0,188.0,19.1,389.15,14.37,22.0
0.07244,60.0,1.69,0.0,0.411,5.884,18.5,10.7103,4.0,411.0,18.3,392.33,7.79,18.6


## R

https://spark.apache.org/docs/latest/sparkr.html#:~:text=SparkR%20is%20an%20R%20package,dplyr)%20but%20on%20large%20datasets.

In [0]:
%r
library(SparkR)

In [0]:
%r
data_r <- read.parquet("/FileStore/test.parquet")
head(data_r)

In [0]:
%r
head(select(data_r, "CRIM"))

## SQL

In [0]:
%sql
CREATE DATABASE test

In [0]:
all_data.write.saveAsTable("test.boston")

In [0]:
sql('select * from test.boston').display()

CRIM,ZN,INDUS,CHAS,NOX,RM,AGE,DIS,RAD,TAX,PTRATIO,B,LSTAT,target
0.33147,0.0,6.2,0.0,0.507,8.247,70.4,3.6519,8.0,307.0,17.4,378.95,3.95,48.3
0.3692,0.0,9.9,0.0,0.544,6.567,87.3,3.6023,4.0,304.0,18.4,395.69,9.28,23.8
2.24236,0.0,19.58,0.0,0.605,5.854,91.8,2.422,5.0,403.0,14.7,395.11,11.64,22.7
0.32264,0.0,21.89,0.0,0.624,5.942,93.5,1.9669,4.0,437.0,21.2,378.25,16.9,17.4
0.04666,80.0,1.52,0.0,0.404,7.107,36.6,7.309,2.0,329.0,12.6,354.31,8.61,30.3
0.66351,20.0,3.97,0.0,0.647,7.333,100.0,1.8946,5.0,264.0,13.0,383.29,7.79,36.0
0.57529,0.0,6.2,0.0,0.507,8.337,73.3,3.8384,8.0,307.0,17.4,385.91,2.47,41.7
0.17134,0.0,10.01,0.0,0.547,5.928,88.2,2.4631,6.0,432.0,17.8,344.91,15.76,18.3
0.06899,0.0,25.65,0.0,0.581,5.87,69.7,2.2577,2.0,188.0,19.1,389.15,14.37,22.0
0.07244,60.0,1.69,0.0,0.411,5.884,18.5,10.7103,4.0,411.0,18.3,392.33,7.79,18.6


In [0]:
# %sql
# drop DATABASE asdf

# Joins

## Cross join

In [0]:
parameters = {'n_estimators': [10, 100, 200], 'max_depth': [5, 10, 15], 'random_state':[42]}

In [0]:
from pyspark.sql.types import FloatType
import pandas as pd

params_spark = spark.createDataFrame(pd.DataFrame({'n_estimators': [10, 100, 200]}))
params_spark.display()

n_estimators
10
100
200


In [0]:
sql('select * from test.boston').crossJoin(params_spark).count()

## Join

In [0]:
data = sql('select * from test.boston').withColumn('id', f.monotonically_increasing_id())

data.select('id', 'CRIM').join(data.select('id', 'PTRATIO').limit(2), on='id', how='left').display()

id,CRIM,PTRATIO
0,0.33147,17.4
1,0.3692,18.4
2,2.24236,
3,0.32264,
4,0.04666,
5,0.66351,
6,0.57529,
7,0.17134,
8,0.06899,
9,0.07244,


In [0]:
%scala
import org.apache.spark.sql.functions._ 

var data = sql("select * from test.boston").withColumn("id", monotonicallyIncreasingId)

var subset1 = data.select("id", "CRIM")
var subset2 = data.select("id", "PTRATIO").limit(2)

display(subset1.join(subset2, subset1("id") === subset2("id"), "inner"))

id,CRIM,id.1,PTRATIO
0,0.33147,0,17.4
1,0.3692,1,18.4


#Hyperopt

https://docs.databricks.com/_static/notebooks/hyperopt-spark-mlflow.html

In [0]:
from hyperopt import fmin, tpe, rand, hp, SparkTrials, Trials, STATUS_OK

In [0]:
# function to minimize
def objective(C):
    rf = RandomForestRegressor(C)
    
    mae = cross_val_score(rf, X_train, y_train, cv=cv, scoring='neg_mean_absolute_error').mean()
    
    return {'loss': -mae, 'status': STATUS_OK}

# search space
values = [5, 10, 50, 100, 500]
search_space = hp.choice('C', values)
# rand
algo=rand.suggest

In [0]:
# argmin = fmin(
#   fn=objective,
#   space=search_space,
#   algo=algo,
#   max_evals=3
# )
# values[argmin['C']]

In [0]:
spark_trials = SparkTrials(parallelism=4)
 
with mlflow.start_run():
    argmin = fmin(
        fn=objective,
        space=search_space,
        algo=algo,
        max_evals=3,
        trials=spark_trials
    )
# Print the best value found for C
print("Best value found: ", values[argmin['C']])

In [0]:
help(SparkTrials)