In [1]:
En la actividad se pide que se importen los dos ficheros, cs_test y cs_training, y así lo haremos. Sin embargo, para entrenar y evaluar el modelo partiremos exclusivamente de cs_training, ya que cs_test no tiene informada la variable objetivo.

In [2]:
# Create one RDD of raw text lines per CSV file stored in DBFS.

test_csv_path = '/FileStore/tables/0itamgtk1506008427622/cs_test-31b97.csv'
training_csv_path = '/FileStore/tables/94yylx7v1506008660804/cs_training-d35cb.csv'

cs_test = sc.textFile(test_csv_path)
cs_training = sc.textFile(training_csv_path)

In [3]:
# Peek at the first four raw lines of each RDD.

for heading, rdd in [('--- Test:', cs_test), ('--- Training:', cs_training)]:
    print(heading)
    print(rdd.take(4))

In [4]:
# Getting the Spark SQL context and imports
from pyspark.sql import SQLContext, Row
# Reuse (or create) the SQLContext bound to the notebook's SparkContext `sc`.
sqlContext = SQLContext.getOrCreate(sc)

In [5]:
# Read both CSV files into DataFrames, letting Spark infer the column types.
def _read_csv_with_header(path):
    """Load a comma-delimited CSV that has a header row, inferring the schema."""
    reader = sqlContext.read.format("com.databricks.spark.csv")
    for key, value in (("header", "true"), ("delimiter", ","), ("inferschema", "true")):
        reader = reader.option(key, value)
    return reader.load(path)


testDatawithoutlabels = _read_csv_with_header(
    "/FileStore/tables/0itamgtk1506008427622/cs_test-31b97.csv")

trainingData = _read_csv_with_header(
    "/FileStore/tables/94yylx7v1506008660804/cs_training-d35cb.csv")


In [6]:
# Print the inferred schema of each DataFrame.
for heading, frame in [('--- Test:', testDatawithoutlabels), ('--- Training:', trainingData)]:
    print(heading)
    frame.printSchema()



In [8]:
# MonthlyIncome and NumberOfDependents hold numeric data but are inferred as strings.
# Build one cast-to-double expression per column (keeping the original column name)
# and project the test DataFrame through them.
from pyspark.sql.functions import col  # refer to a column by its name
test_columns_as_double = [
    col(name).cast("double").alias(name)
    for name in testDatawithoutlabels.columns
]
testDatawithoutlabels = testDatawithoutlabels.select(test_columns_as_double)
testDatawithoutlabels.printSchema()





In [9]:
# MonthlyIncome and NumberOfDependents hold numeric data but are inferred as strings.
# Build one cast-to-double expression per column (keeping the original column name)
# and project the training DataFrame through them.
training_columns_as_double = [
    col(name).cast("double").alias(name)
    for name in trainingData.columns
]
trainingData = trainingData.select(training_columns_as_double)
trainingData.printSchema()

In [10]:
# Check data: show the training DataFrame after the cast to double.
display(trainingData) 

In [11]:
# Row count of the training DataFrame before rows with missing values are dropped.
trainingData.count()


In [12]:
# Row count of the unlabeled test DataFrame.
testDatawithoutlabels.count()

In [13]:
 # drop rows with missing values
# Keep only rows in which no column is null, then report how many rows remain.
trainingData = trainingData.na.drop(how="any")
trainingData.count()




In [14]:
# Show the training DataFrame again, now without rows that had missing values.
display(trainingData)

In [15]:
# Register the DataFrame as the temp table "trainingData" so that data quality
# can be inspected with SQL queries, then show the current row count.
trainingData.registerTempTable("trainingData")
trainingData.count()

In [16]:
%sql SELECT MIN(age), MAX(age) FROM trainingData

In [17]:
%sql SELECT COUNT(*) FROM trainingData WHERE age < 18

In [18]:
# Remove the record(s) with an invalid age (e.g. age zero).
# The data-quality check counts rows with age < 18 as invalid, so the valid set is
# age >= 18; the previous "age > 18" filter would also have discarded 18-year-olds.
# Note: only the Python variable is replaced here; the registered temp table
# "trainingData" still refers to the unfiltered DataFrame.
trainingData = sqlContext.sql("SELECT * FROM trainingData WHERE age >= 18")

In [19]:
# Row count of the training DataFrame after the age filter.
trainingData.count()


In [21]:
# Remove the target column from the test DataFrame and show the result.
# NOTE(review): cs_test presumably carries no target values, so this column would
# be all null after the cast and must go before any dropna — confirm.
testDatawithoutlabels2 = testDatawithoutlabels.drop("SeriousDlqin2yrs")
display(testDatawithoutlabels2)


In [22]:
# Keep only test rows in which no column is null, then report how many remain.
testDatawithoutlabels2 = testDatawithoutlabels2.na.drop(how="any")
testDatawithoutlabels2.count()

In [23]:
# Show the test DataFrame without the label column and without incomplete rows.
display(testDatawithoutlabels2)

In [24]:
# The original cs_test data has no label column to verify the algorithm's accuracy,
# so both the training and the evaluation sets are carved out of trainingData.

df = trainingData
# Split the dataset randomly into 70% for training and 30% for testing.
# A fixed seed makes the split repeatable from run to run.
train, test = df.randomSplit([0.7, 0.3], seed=42)
# print(...) with a single pre-formatted string behaves the same on Python 2 and 3.
print("We have %d training examples and %d test examples." % (train.count(), test.count()))

In [25]:
from pyspark.ml.feature import VectorAssembler, VectorIndexer
# Every column except the label is used as a model feature.
# NOTE(review): this includes any row-id/index column the CSV may carry — check
# the printed schema and exclude such a column if present.
featuresCols = list(df.columns)
featuresCols.remove('SeriousDlqin2yrs')
# Stage 1: pack all feature columns into a single vector column "rawFeatures".
vectorAssembler = VectorAssembler(
    inputCols=featuresCols,
    outputCol="rawFeatures",
)
# Stage 2: treat features with at most 4 distinct values as categorical and index
# them, writing the result to the "features" column.
vectorIndexer = VectorIndexer(
    inputCol="rawFeatures",
    outputCol="features",
    maxCategories=4,
)

In [26]:
# We use the GBTRegressor algorithm. Gradient boosting is a machine learning
# technique for regression and classification problems that produces a prediction
# model in the form of an ensemble of weak prediction models, typically decision
# trees. It builds the model in a stage-wise fashion, like other boosting methods,
# and generalizes them by allowing optimization of an arbitrary differentiable
# loss function.
# NOTE(review): SeriousDlqin2yrs looks like a binary flag; a classifier might be
# a better fit than a regressor — confirm.
from pyspark.ml.regression import GBTRegressor
# Reads the "features" vector column and learns to predict "SeriousDlqin2yrs".
gbt = GBTRegressor(featuresCol="features", labelCol="SeriousDlqin2yrs")

In [27]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import RegressionEvaluator
# Hyperparameter grid to search:
#  - maxDepth: max depth of each decision tree in the GBT ensemble
#  - maxIter: iterations, i.e., number of trees in each GBT ensemble
# The values are kept small here. In practice, to get the highest accuracy, you
# would likely try deeper trees (10 or higher) and more trees in the ensemble (>100).
paramGrid = (
    ParamGridBuilder()
    .addGrid(gbt.maxDepth, [2, 5])
    .addGrid(gbt.maxIter, [10, 100])
    .build()
)
# Evaluation metric: RMSE between the true labels and the predictions. The column
# names are taken from the estimator so that both always agree.
evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol=gbt.getLabelCol(),
    predictionCol=gbt.getPredictionCol(),
)
# CrossValidator fits the estimator for every grid point and keeps the best model.
# A fixed seed makes the fold assignment (and thus the selected model) repeatable.
cv = CrossValidator(
    estimator=gbt,
    evaluator=evaluator,
    estimatorParamMaps=paramGrid,
    seed=42,
)

In [28]:
from pyspark.ml import Pipeline
# Chain feature assembly, categorical indexing and the cross-validated GBT model
# so that they are fitted and applied as one unit.
pipeline_stages = [vectorAssembler, vectorIndexer, cv]
pipeline = Pipeline(stages=pipeline_stages)

In [29]:
# Fit the whole pipeline (feature stages + cross-validated GBT) on the training split.
pipelineModel = pipeline.fit(train)

In [30]:

# Apply the fitted pipeline to the held-out test split to obtain predictions.
predictions = pipelineModel.transform(test)


In [31]:
# Show the true label next to the prediction, followed by all feature columns.
columns_to_show = ["SeriousDlqin2yrs", "prediction"] + featuresCols
display(predictions.select(columns_to_show))


In [32]:
# Score the test-set predictions with the RMSE evaluator.
rmse = evaluator.evaluate(predictions)
# print(...) with a single pre-formatted string behaves the same on Python 2 and 3.
print("RMSE on our test set: %g" % rmse)

# Lower values of RMSE show a better fit.