In [2]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import *
import numpy as np
from pyspark.ml import Pipeline
import matplotlib.pyplot as plt
from pyspark.ml.feature import VectorAssembler,StringIndexer
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.mllib.evaluation import MulticlassMetrics
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

In [3]:
# Spark session plus the two raw inputs. No schema inference is requested, so
# the CSV columns arrive as strings; df_train / df_test cast them later.
spark = SparkSession.builder.appName('APP_LOGISTIC_REGRESSION').getOrCreate()

TRAIN_CSV_PATH = "gs://ds-data/lg.csv"
TEST_CSV_PATH = "gs://ds-data/notebooks/jonap/data/prueba.csv"

# Built-in CSV source (the legacy "com.databricks.spark.csv" name resolves to
# the same reader on Spark 2.x); the first line of each file is the header.
data_train = spark.read.csv(TRAIN_CSV_PATH, header=True)
data_test = spark.read.csv(TEST_CSV_PATH, header=True)

In [4]:
def print_metrics(predictions_and_labels):
    """Print overall accuracy and the confusion matrix for scored predictions.

    Args:
        predictions_and_labels: RDD of ``(prediction, label)`` float pairs,
            e.g. the one assembled at the end of ``df_train``.
    """
    multiclass = MulticlassMetrics(predictions_and_labels)
    # Read accuracy before printing anything, so a failing Spark job aborts
    # before any partial report is emitted.
    overall_accuracy = multiclass.accuracy
    for line in ("Summary Stats", "Accuracy = {}".format(overall_accuracy)):
        print(line)
    print('Confusion Matrix\n', multiclass.confusionMatrix())

In [12]:
def df_train(df, train_fraction=0.8, num_folds=5, seed=None):
    """Train a cross-validated logistic-regression classifier for ``forma_uso``.

    Steps:
      1. Cast the twelve feature columns to double and ``forma_uso`` to an
         integer ``label``.
      2. Assemble a ``features`` vector and index ``label`` into ``label_index``.
      3. Split rows into training / hold-out sets.
      4. Fit a baseline LogisticRegression, then a CrossValidator over
         regParam, elasticNetParam and maxIter.
      5. Print hold-out area under ROC for both models, plus accuracy and
         confusion matrix for the cross-validated one.

    Args:
        df: Spark DataFrame containing ``id_pasajero``, the twelve feature
            columns listed below and ``forma_uso``.
        train_fraction: fraction of rows used for training; the remainder is
            the hold-out set. Defaults to 0.8 (the original 80/20 split).
        num_folds: number of cross-validation folds. Defaults to 5.
        seed: optional integer seed for the train/hold-out split and for the
            CrossValidator fold assignment. ``None`` keeps the previous
            unseeded behaviour.

    Returns:
        The fitted ``CrossValidatorModel``.

    Raises:
        ValueError: if ``train_fraction`` is not strictly between 0 and 1.

    NOTE(review): the classifier is trained on ``label_index``, produced by
    StringIndexer (which by default orders labels by frequency). Therefore the
    ``prediction`` column emitted by the returned model is a label index, not
    necessarily the raw ``forma_uso`` value. The mapping is printed below so
    downstream predictions can be interpreted.
    """
    if not 0.0 < train_fraction < 1.0:
        raise ValueError("train_fraction must be in (0, 1), got %r" % (train_fraction,))

    # Feature order matters: VectorAssembler builds the vector positionally,
    # and df_test must use exactly the same order.
    feature_names = ["manana", "noche", "tarde", "juerga", "semana",
                     "fin_de_semana", "monto", "distancia",
                     "tiempo_entre_viajes", "desviaciones_rutas",
                     "desviaciones_horarios", "desviaciones_dias_semana"]
    df_rows = df.select(
        [df.id_pasajero]
        + [df[name].cast("Double") for name in feature_names]
        + [df.forma_uso.cast("Integer").alias("label")]
    )
    # Everything between the id column (first) and the label (last) is a feature.
    feature_columns = df_rows.columns[1:-1]

    # Feature assembly + label indexing.
    assembler = VectorAssembler(inputCols=feature_columns, outputCol='features')
    label_indexer = StringIndexer(inputCol='label', outputCol='label_index')
    pipeline_model = Pipeline(stages=[assembler, label_indexer]).fit(df_rows)
    df_features = pipeline_model.transform(df_rows)

    # Position i of `labels` is the forma_uso value encoded as label_index == i.
    indexed_labels = pipeline_model.stages[-1].labels
    print('label_index -> forma_uso: %s' % dict(enumerate(indexed_labels)))

    training_data, test_data = df_features.randomSplit(
        [train_fraction, 1.0 - train_fraction], seed=seed)

    classifier = LogisticRegression(labelCol="label_index", featuresCol="features")
    # areaUnderROC is this evaluator's default metric. It is stated explicitly
    # so the printed label matches what is actually computed (it is not F1).
    evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction",
                                              labelCol='label_index',
                                              metricName='areaUnderROC')

    # Baseline: default hyperparameters, scored on the hold-out split.
    baseline_model = classifier.fit(training_data)
    baseline_auc = evaluator.evaluate(baseline_model.transform(test_data))
    print('AUC (areaUnderROC): %f' % baseline_auc)

    param_grid = (ParamGridBuilder()
                  .addGrid(classifier.regParam, [0.1, 0.01])
                  .addGrid(classifier.elasticNetParam, [0.0, 0.5, 1.0])
                  .addGrid(classifier.maxIter, [1, 10, 100])
                  .build())

    # Only forward a seed when one was given, so the CrossValidator keeps its
    # own default otherwise.
    cv_kwargs = {} if seed is None else {'seed': seed}
    crosval = CrossValidator(estimator=classifier, estimatorParamMaps=param_grid,
                             evaluator=evaluator, numFolds=num_folds, **cv_kwargs)
    crosval_model = crosval.fit(training_data)
    crosval_predictions = crosval_model.transform(test_data)
    crosval_auc = crosval_model.getEvaluator().evaluate(crosval_predictions)
    print('AUC (areaUnderROC) crossval: %f' % crosval_auc)

    # MulticlassMetrics consumes an RDD of (prediction, label) float pairs.
    predictions_and_labels = (crosval_predictions
                              .select("prediction", "label_index")
                              .rdd.map(lambda row: (float(row[0]), float(row[1]))))
    print_metrics(predictions_and_labels)
    return crosval_model
    

In [16]:
def df_test(model, df):
    """Score an unlabeled Spark DataFrame with a model returned by ``df_train``.

    The function:
      - casts the twelve feature columns to double;
      - assembles them into a ``features`` vector, in the same order df_train
        uses;
      - appends an all-null integer ``label_index`` column;
      - returns ``model.transform`` of the result.

    Args:
        model: fitted model returned by ``df_train`` (a CrossValidatorModel).
        df: Spark DataFrame with ``id_pasajero`` and the twelve feature columns
            listed below (read from CSV, so presumably string-typed).

    Returns:
        The DataFrame produced by ``model.transform``. The notebook reads
        ``id_pasajero``, ``rawPrediction``, ``probability`` and ``prediction``
        from it.

    NOTE(review): ``prediction`` is a ``label_index`` value from the
    StringIndexer fitted in df_train, not necessarily the raw ``forma_uso``
    value. Map it back through that indexer's labels before interpreting it.
    """
    # Must match df_train's feature order exactly: VectorAssembler is positional.
    feature_names = ["manana", "noche", "tarde", "juerga", "semana",
                     "fin_de_semana", "monto", "distancia",
                     "tiempo_entre_viajes", "desviaciones_rutas",
                     "desviaciones_horarios", "desviaciones_dias_semana"]
    df_rows = df.select(
        [df.id_pasajero] + [df[name].cast("Double") for name in feature_names]
    )
    # Everything after the id column is a feature.
    feature_columns = df_rows.columns[1:]

    # VectorAssembler is a plain Transformer with nothing to fit, so it is
    # applied directly instead of through a one-stage Pipeline.fit().
    assembler = VectorAssembler(inputCols=feature_columns, outputCol='features')
    df_features = assembler.transform(df_rows)

    # Null placeholder for the training label column, kept so the returned
    # schema is unchanged. NOTE(review): scoring may not actually require it.
    df_features = df_features.withColumn('label_index', F.lit(None).cast(IntegerType()))

    return model.transform(df_features)

In [14]:
# Fit + cross-validate on the training CSV; prints hold-out metrics and
# returns the fitted CrossValidatorModel.
model = df_train(df=data_train)

F1 Accuracy: 0.766211
F1 Accuracy crossval: 0.760960
Summary Stats
Accuracy = 0.6777041942604857
Confusion Matrix
 DenseMatrix([[ 210.,   25.],
             [ 121.,   97.]])


In [17]:
# Score the unlabeled test CSV with the cross-validated model.
result = df_test(model=model, df=data_test)

In [19]:
# Collect the scored rows to the driver for display (the whole result set).
prediction_columns = ["id_pasajero", "rawPrediction", "probability", "prediction"]
result.select(prediction_columns).toPandas()

Unnamed: 0,id_pasajero,rawPrediction,probability,prediction
0,57eebdf58349d848478bfddc,"[-2.50135794925, 2.50135794925]","[0.0757630375449, 0.924236962455]",1.0
1,582bd95622178d495e8b456c,"[-1.334025444, 1.334025444]","[0.208494289121, 0.791505710879]",1.0
2,5579b13aa27819787b8b4568,"[0.418254219026, -0.418254219026]","[0.603065424433, 0.396934575567]",0.0
3,578f76360fa0c02a7f8b4569,"[0.185269709921, -0.185269709921]","[0.546185394043, 0.453814605957]",0.0
4,52ebc5578078191d77000000,"[0.195023056856, -0.195023056856]","[0.54860181834, 0.45139818166]",0.0
