In [1]:
from typing import List

from pyspark.sql import SparkSession

In [2]:
# Obtain a Spark session via the builder's getOrCreate().
spark = (
    SparkSession
    .builder
    .getOrCreate()
)
# Bare last expression so the notebook renders the session object.
spark

In [3]:
# Load the Kickstarter dataset from its parquet file (path is relative to the notebook).
kickstarter_path = '../data/kickstarter.parquet'
df_kids = spark.read.parquet(kickstarter_path)

# Row count, printed as a quick sanity check on the load.
n_lines = df_kids.count()
print(f'Number of lines: {n_lines}')

Number of lines: 107615


In [4]:
# Print the column names, types and nullability of the loaded frame.
df_kids.printSchema()

root
 |-- project_id: string (nullable = true)
 |-- goal: double (nullable = true)
 |-- final_status: integer (nullable = true)
 |-- country_clean: string (nullable = true)
 |-- currency_clean: string (nullable = true)
 |-- deadline_clean: date (nullable = true)
 |-- created_at_clean: date (nullable = true)
 |-- launched_at_clean: date (nullable = true)
 |-- days_campaign: integer (nullable = true)
 |-- hours_prepa: double (nullable = true)
 |-- text: string (nullable = true)



In [7]:
# Summary statistics (count / mean / stddev / min / max) for the numeric inputs.
summary_columns = ['goal', 'days_campaign', 'hours_prepa']
df_kids.select(*summary_columns).describe().show()

+-------+-----------------+------------------+-----------------+
|summary|             goal|     days_campaign|      hours_prepa|
+-------+-----------------+------------------+-----------------+
|  count|           107615|            107615|           107615|
|   mean|36839.03487571435| 34.17846954420852|905.8462699437829|
| stddev|974215.3015402365|12.953461603195978|2091.017693399736|
|    min|             0.01|                 1|             0.04|
|    max|            1.0E8|                92|          45691.1|
+-------+-----------------+------------------+-----------------+



In [30]:
# Model inputs, grouped by how they are preprocessed further down:
# numerical columns go to the assembler as-is, categorical ones are indexed and one-hot encoded.
numerical_columns = ['days_campaign', 'hours_prepa', 'goal']
categorical_columns = ['country_clean', 'currency_clean']
features = [*numerical_columns, *categorical_columns]

# Target column of the classification task.
label = 'final_status'

# Keep only the columns used for modelling, then preview the first rows.
df = df_kids.select(*features, label)
df.show()

+-------------+-----------+-------+-------------+--------------+------------+
|days_campaign|hours_prepa|   goal|country_clean|currency_clean|final_status|
+-------------+-----------+-------+-------------+--------------+------------+
|           90|        1.1|25000.0|           US|           USD|           0|
|           29|      41.62| 1000.0|           US|           USD|           1|
|           28|       18.7| 2200.0|           US|           USD|           0|
|           47|      75.69|10000.0|           US|           USD|           0|
|           45|     457.74| 5000.0|           US|           USD|           0|
|           45|     380.94|10000.0|           US|           USD|           1|
|           46|     814.58|18000.0|           US|           USD|           1|
|           61|      58.93| 2500.0|           US|           USD|           0|
|           40|       2.16| 7500.0|           US|           USD|           1|
|           44|     191.41| 2500.0|           US|           USD|

In [31]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, OneHotEncoderEstimator
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline

In [32]:
def build_string_indexer(col_name: str) -> StringIndexer:
    """Create an unfitted StringIndexer for one categorical column.

    The indexer reads ``col_name`` and writes ``<col_name>_indexed``.
    ``handleInvalid`` is set to ``"keep"``; per Spark's StringIndexer
    documentation this keeps invalid/unseen labels in an extra index
    instead of raising.
    """
    indexer = StringIndexer()
    indexer.setInputCol(col_name)
    indexer.setOutputCol(f'{col_name}_indexed')
    indexer.setHandleInvalid("keep")
    return indexer

def build_one_hot_encoder(input_cols: List[str]) -> OneHotEncoderEstimator:
    """Create an unfitted one-hot encoder covering several indexed columns.

    Args:
        input_cols: names of the columns to encode (a list of column names,
            not a single string: the function iterates over it to derive
            one output column per input column).

    Returns:
        A OneHotEncoderEstimator whose output columns are named
        ``<input>_encoded``, in the same order as ``input_cols``.
    """
    output_cols = [f'{c}_encoded' for c in input_cols]
    return OneHotEncoderEstimator().setInputCols(input_cols).setOutputCols(output_cols)

# One StringIndexer per categorical column.
indexing_stages = list(map(build_string_indexer, categorical_columns))

# The indexers' output columns are the inputs of the single one-hot encoder.
indexed_columns = [stage.getOutputCol() for stage in indexing_stages]
ohe = build_one_hot_encoder(indexed_columns)
encoding_stages = [ohe]

In [33]:
# Assemble the raw numerical columns and the one-hot encoded columns into one 'features' vector.
assembler_inputs = numerical_columns + ohe.getOutputCols()
vector_assembler = VectorAssembler(inputCols=assembler_inputs, outputCol='features')

In [38]:
from pyspark.ml.classification import GBTClassifier

# Both candidate classifiers read the assembled feature vector and the shared `label` column.
features_col = vector_assembler.getOutputCol()

# Logistic-regression candidate. NOTE: it is built here but NOT part of `model_specs` below.
lr = LogisticRegression()\
    .setMaxIter(10)\
    .setFeaturesCol(features_col)\
    .setLabelCol(label)

# Gradient-boosted trees candidate: this is the classifier the pipeline actually trains.
gbt = GBTClassifier().setFeaturesCol(features_col).setLabelCol(label)

# Full pipeline: string indexing -> one-hot encoding -> feature assembly -> classifier.
model_specs = Pipeline().setStages(indexing_stages + encoding_stages + [vector_assembler] + [gbt])

In [39]:
import os

from pyspark.ml.tuning import TrainValidationSplit
from pyspark.ml.evaluation import BinaryClassificationEvaluator
import mlflow
from mlflow.spark import log_model

# Set to True to record params, metrics and the fitted pipeline in the MLflow run.
# Off by default: the run is still opened, but nothing is logged to it.
log_to_mlflow = False

# Seeded 80/20 train/test split.
df_train, df_test = df.randomSplit([0.8, 0.2], seed=12345)
# Cached because the training frame is counted, fitted on and scored below.
df_train = df_train.cache()

# Area under the ROC curve, computed from the pipeline's 'rawPrediction' column.
evaluator = BinaryClassificationEvaluator()\
    .setMetricName('areaUnderROC')\
    .setRawPredictionCol('rawPrediction')\
    .setLabelCol(label)

# Tracking server address: the MLFLOW_TRACKING_URI environment variable, when set and
# non-empty, overrides the default address below.
mlflow_tracking_ui = os.environ.get('MLFLOW_TRACKING_URI') or 'http://35.246.84.226'
mlflow_experiment_name = 'kickstarter'

mlflow.set_tracking_uri(mlflow_tracking_ui)
mlflow.set_experiment(experiment_name=mlflow_experiment_name)

with mlflow.start_run() as active_run:
    print(f'Fitting model on {df_train.count()} lines')

    model = model_specs.fit(df_train)

    print('Evaluating model')
    train_metrics = evaluator.evaluate(model.transform(df_train))
    metrics = {'train_auc': train_metrics}

    test_metrics = evaluator.evaluate(model.transform(df_test))
    metrics.update({'test_auc': test_metrics})
    print(metrics)

    if log_to_mlflow:
        print('Logging to mlflow')
        # Describe the classifier that was actually trained: the pipeline's final stage.
        classifier = model_specs.getStages()[-1]
        mlflow.log_params({'model_class': type(classifier).__name__, 'max_iter': classifier.getMaxIter()})
        mlflow.log_metrics(metrics)
        log_model(model, 'model')

Fitting model on 86092 lines
Evaluating model
{'train_auc': 0.677942142468496, 'test_auc': 0.6767387585283429}
