In [1]:
from pyspark.sql import SQLContext, HiveContext
sqlc = SQLContext(sc)
hivec = HiveContext(sc)
print type(sqlc)
print type (hivec)

In [2]:
# Creacion de un dataframe de manera sintetica
iris_data = [(4.3,3.0,1.1,0.1,'Iris-setosa'), (7.0,3.2,4.7,1.4,'Iris-versicolor'), (6.9,3.2,5.7,2.3,'Iris-virginica')]
print iris_data
df = sqlContext.createDataFrame(iris_data,['sepal_length','sepal_width','petal_length','petal_width','specie'])
df.show()
df.printSchema()
df.collect()

In [3]:
# Creacion de un dataframe de Spark a partir de un dataframe de Pandas
spark_df = sqlContext.createDataFrame(pandas_df)


In [4]:
# Crear un dataframe a partir de un RDD usando reflection
from pyspark.sql import Row
# Cargamos el fichero de texto con el dataset de iris y lo convertimos en un RDD
lineasRDD = sc.textFile('/FileStore/tables/vvlm20gs1474462720233/iris.csv')
# Convertimos cada línea en una fila del futuro dataframe
elementosRDD = lineasRDD.map(lambda l: l.split(","))
# Metemos cada uno de los campos separados por coma en un objeto tipo fila (Row) y le damos un nombre
irisRDD = elementosRDD.map(lambda p: Row(sepal_length=float(p[0]), sepal_width=float(p[1]), petal_length=float(p[2]), \
          petal_width=float(p[3]), specie=p[4]))
# Creamos el Dataframe a partir del RDD
irisDF = sqlContext.createDataFrame(irisRDD)
irisDF.show(3)

In [5]:
# Crea un dataframe a partir de un RDD de manera programática
# Importamos los datatypes
from pyspark.sql.types import *

# Cargamos el rdd desde un archivo de texto
lineasRDD = sc.textFile('/FileStore/tables/vvlm20gs1474462720233/iris.csv')
# Convertimos cada línea en una fila del futuro dataframe
elementosRDD = lineasRDD.map(lambda l: l.split(","))
irisRDD = elementosRDD.map(lambda p: (float(p[0]), float(p[1]), float(p[2]), float(p[3]), p[4]))
# Definimos el nombre y tipo de campos que queremos para nuestro dataframe
fields = [StructField("sepal_length", FloatType(), True), StructField("sepal_width", FloatType(), True), \
          StructField("petal_length", FloatType(), True), StructField("petal_width", FloatType(), True), \
          StructField("specie", StringType(), True)]
#Definimos el esquema con los tipos de campos creados
schema = StructType(fields)

# Creamos el Dataframe a partir del RDD aplicando el esquema con los campos
irisDF = sqlContext.createDataFrame(irisRDD, schema)
irisDF.show(3)


In [6]:
# Ver la estructura de un DataFrame
irisDF.printSchema()

In [7]:
# Creación de un RDD a partir de un DataFrame
irisRDD = irisDF.select("sepal_length", "sepal_width","specie").rdd
print type(irisRDD)
irisRDD.take(3)

In [8]:
# Creación de un dataset a partir de un fichero json
#{"name":"Michael"}
#{"name":"Andy", "age":30}
#{"name":"Justin", "age":19}

# Importamos los datatypes
from pyspark.sql.types import *
jsonDF = sqlContext.read.format('json').load('/FileStore/tables/h9cow6461473884794846/people.json')
jsonDF.show()

# Tambien podemos forzar el tipo de datos en el esquema antes de leer el fichero
# Definicion del esquema
peopleSchema = StructType([StructField("name", StringType(), True), StructField("age", IntegerType(), True)])
jsonStructDF = sqlContext.read.format('json').load('/FileStore/tables/h9cow6461473884794846/people.json', schema=peopleSchema)
jsonStructDF.show()

In [9]:

# Creación de un dataset a partir de un fichero csv usando spark-csv
from pyspark.sql.types import *

# Utilizando cabecera e infiriendo el tipo de los campos, schema
irisDfCsv = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferSchema='true') \
.load('/FileStore/tables/howzjb5q1474385450651/iris_with_header-f677d.csv')
irisDfCsv.show(3)
# Utilizando cabecera y definiendo el schema apriori
# Definicion del esquema
irisSchema = StructType([ \
    StructField("sepal_length", FloatType(), True), \
    StructField("sepal_width", FloatType(), True), \
    StructField("petal_length", FloatType(), True), \
    StructField("petal_width", FloatType(), True), \
    StructField("specie", StringType(), True)])
irisDfCsvStruct = sqlContext.read.format('com.databricks.spark.csv').options(header='true') \
.load('/FileStore/tables/howzjb5q1474385450651/iris_with_header-f677d.csv', schema = irisSchema)

# display(irisDfCsvStruct)


In [10]:
# Correlacion entre columnas de un DataFrame
from pyspark.sql.functions import count
from pyspark.sql.functions import corr

print "Correlacion sepal_petal_length: {0}".format(irisDfCsv.corr("sepal_length", "petal_length"))
# irisDfCsv.agg(corr("sepal_length", "petal_length")).show()
irisDfCsv.select(corr("sepal_length", "petal_length").alias("corr_sepal_petal_length"), \
                corr("sepal_width", "petal_width").alias("corr_sepal_petal_width")).show()




In [11]:
# Reemplazo de ceros por otro valor
iris_dat = [(4.3,3.0, 0.0 ,0.1,'Iris-setosa'), (7.0,3.2,4.7,1.4,'Iris-versicolor'), (6.9,3.2,5.7, 0.0 ,'Iris-virginica')]
irisZero = sqlContext.createDataFrame(iris_dat,['sepal_length','sepal_width','petal_length','petal_width','specie'])
irisZero.show()
irisZero.na.replace(0.0, 1.0,["petal_length"]).show()

In [12]:
# Operaciones con join
iris_dat = [(4.3,3.0, 0.0 ,0.1,'Iris-setosa'), (7.0,3.2,4.7,1.4,'Iris-versicolor'), (6.9,3.2,5.7, 0.0 ,'Iris-virginica')]
iris_col = [('Iris-setosa', 'Naranja'), ('Iris-virginica', 'Violeta')]
# Creamos un DataFrame basado en iris
irisDf = sqlContext.createDataFrame(iris_dat,['sepal_length','sepal_width','petal_length','petal_width','specie'])
# Creamos otro Dataframe con el color de cada especie, excepto para versicolor
irisColDf = sqlContext.createDataFrame(iris_col, ['especie','color'])
#irisDf.show()
irisColDf.show()
# Hacemos un inner join
irisDf.join(irisColDf, irisDf.specie == irisColDf.especie, 'inner'). \
select('sepal_length', 'sepal_width','petal_length','petal_width','specie','color'). \
show()
# Hacemos un left outer join
irisDf.join(irisColDf, irisDf.specie == irisColDf.especie, 'left_outer'). \
select('sepal_length', 'sepal_width','petal_length','petal_width','specie','color'). \
show()

In [13]:
# Consultar tablas en HIVE
sqlContext.tables(dbName='test').show()
sqlContext.sql("USE default")
sqlContext.sql("SHOW TABLES").show()
irisFromHive = sqlContext.table("test.iris2")
irisFromHive.show(3)

In [14]:
# Interaccion con HIVE
# Creación de una tabla con comandos HIVE con Spark SQL
# Primero creamos la BBDD donde vamos a colocar la tabla
sqlContext.sql("CREATE DATABASE IF NOT EXISTS test")
sqlContext.sql("SHOW DATABASES").show()
# Creamos la tabla HIVE
sqlContext.sql("CREATE TABLE IF NOT EXISTS test.iris (sepal_length FLOAT, \
sepal_width FLOAT, petal_length FLOAT, petal_with FLOAT, specie STRING) \
ROW FORMAT DELIMITED \
FIELDS TERMINATED BY ',' \
STORED AS TEXTFILE")
# Vemos si la tabla se ha creado
sqlContext.sql("USE test")
sqlContext.sql("SHOW TABLES").show()
# Cargamos los datos en la tabla desde un fichero
sqlContext.sql("LOAD DATA INPATH '/FileStore/tables/ofv0n3w01474198366371/iris.csv' INTO TABLE test.iris")
sqlContext.sql("SELECT * FROM test.iris limit 5").show()

# Creación de un DataFrame a partir de una tabla HIVE
irisFromHive = sqlContext.table("test.iris2")

# Creación de una tabla HIVE a partir de un dataframe existente
irisDfCsv.write.saveAsTable('test.iris2')
sqlContext.sql("SELECT * FROM test.iris2 limit 5").show()

In [15]:
# Transformaciones: Actuando sobre las columnas de un dataframe
# Crear un dataframes solo con la columna specie
irisSpecie = irisDF.select('specie')
irisSpecie.show(2)
# Crear un dataframe con las columnas petal_length y especie
irisSpecie2 = irisDF.select(irisDF.petal_length,irisDF.specie)
irisSpecie2.show(2)
# Crear un dataframe en el que se sume uno al petal_length y se le cambie el nombre
irisSpecie3 = irisDF.select((irisDF.petal_length + 1.0).alias('petal_length+1'),irisDF.specie)
irisSpecie3.show(2)


In [16]:
# Crear un nuevo dataframe borrando una columna
irisSinSpecie = irisDF.drop(irisDF.specie)
irisSinSpecie.show(2)


In [17]:
# Selección de filas con filter
# Vemos el número de observaciones con la función count()
print("Numero de filas antes del filtrado "+str(irisDF.count()))
# Nos quedamos con las observaciones con sepal_length mayor que 5
irisFiltrado = irisDF.filter(irisDF.sepal_length > 5.0)
print("Numero de filas despues del filtrado "+str(irisFiltrado.count()))

In [18]:
# Selección de los registros distintos
# Vemos el número de observaciones con la función count()
print("Numero de filas totales "+str(irisDF.count()))
# Obtenemos las filas distintas
print("Numero de filas distintas "+str(irisDF.distinct().count()))


In [19]:
# Ordenacion de los dataframe
irisDF.sort(irisDF.specie, ascending=False).show(2)

In [20]:
# Expansion de un campo tipo array en varias filas
from pyspark.sql import Row
from pyspark.sql.functions import explode
data=[Row(specie="Virginica",sepal_length_list=[0.1,2.3,3.1,4.0])]
IrisDF=sqlContext.createDataFrame(data)
IrisDF.show()
IrisExplodeDF = IrisDF.select(IrisDF.specie, explode(IrisDF.sepal_length_list).alias("sepal_length"))
IrisExplodeDF.show()


In [21]:
# Operaciones de Group by. Funcion agg
from pyspark.sql.functions import avg
irisDFGroup = irisDF.groupBy(irisDF.specie)
irisDFGroupCount = irisDFGroup.agg({"*":"count"})
irisDFGroupCount.show()
# irisDFGroupAVG = irisDFGroup.agg({"sepal_length":"avg","sepal_width":"avg"})
# Obtenemos el mismo resultado con la nomenclatura tipo función:
irisDFGroupAVG = irisDFGroup.agg(avg("sepal_length"), avg("sepal_width"))
irisDFGroupAVG.show()

In [22]:
# Operaciones de Group by. Funciones count y avg
irisDFGroup = irisDF.groupBy(irisDF.specie)
irisDFGroup.count().show()
irisDFGroup.avg().show()
irisDFGroup.avg("sepal_length").show()
irisDF.groupBy().avg().show()


In [23]:
# Funciones definidas por el usuario UDF
from pyspark.sql.types import FloatType
# Definimos la udf indicando la función a aplicar, en este caso sumar 1 al argumento que se le pasa,
# También se le pasa el tipo del valor retornado
masUno = udf(lambda s: s + 1,FloatType())
# Aplicamos la funcion creada masUno al campo sepal_length para sumarle uno
irisDFMasUno= irisDF.select(irisDF.specie,masUno(irisDF.sepal_length).alias('sepal_length_mas_uno'))
irisDFMasUno.show(3)

In [24]:
# Acciones. Ver todos los datos de un dataframe con collect()
# La salida es una lista python, se puede limitar el número de registros utilizando la notacion de array 
irisDF.collect()[1:5]

In [25]:
# Acciones. Ver los datos parciales de un dataframe con take()
irisDF.take(5)

In [26]:
# Acciones. Ver los datos parciales de un dataframe con show()
irisDF.show(5)

In [27]:
# Acciones. Contar el número de registros de un dataframe
irisDF.count()

In [28]:
# Acciones. Obtener un resumen de los campos
irisDF.describe().show()

In [29]:
# Operaciones SQL sobre Dataframes
# Es necesario registrar una tabla temporal ligada al dataframe
irisDF.registerTempTable("iris_table")
sqlContext.sql("SHOW TABLES").show()
sqlContext.sql("SELECT * FROM iris_table where sepal_length > 5 limit 3").show()
sqlContext.sql("DROP TABLE iris_table")
sqlContext.sql("SHOW TABLES").show()


In [30]:
# Operaciones SQL group by
# Registramos la tabla temporal ligada al dataframe
irisDF.registerTempTable("irisTabla")
sqlContext.sql("SELECT SPECIE, COUNT(*) AS NUMSPECIES \
FROM IRISTABLA GROUP BY SPECIE ORDER BY SPECIE ASC").show()


In [31]:
sqlContext.tables().show()
# sqlContext.sql("Drop table iris_table")

In [32]:
# Vectores densos y spare
from pyspark.mllib.linalg import Vectors

vdense = Vectors.dense([0, 1.0, 0, 5.5])
print vdense

vsparse1 = Vectors.sparse(4, {1: 1.0, 3: 5.5})
print vsparse1

vsparse2 = Vectors.sparse(4, [(1, 1.0), (3, 5.5)])
print vsparse2

vsparse3 = Vectors.sparse(4, [1, 3], [1.0, 5.5])
print vsparse3

In [33]:
# Labeled point
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.regression import LabeledPoint
# Creacion a partir de un dense vector
l1 = LabeledPoint(1.0, [1.0, 0.0, 3.0])
print l1
# Creacion a partir de un sparse vector
l2 = LabeledPoint(1.0, Vectors.sparse(3, [0, 2], [1.0, 3.0]))
print l2

In [34]:
# Regresión lineal con ml - creacion sintética de datos
from pyspark.mllib.linalg import Vectors
from pyspark.ml.regression import LinearRegression
## 1     3.600      79
## 2     1.800      54
## 3     3.333      74
## 4     2.283      62
## 5     4.533      85
## 6     2.883      55

#ignore = ['id', 'label', 'binomial_label']
#assembler = VectorAssembler(
#    inputCols=[x for x in df.columns if x not in ignore],
#    outputCol='features')

#assembler.transform(df)

#df = sqlContext.createDataFrame([(1.0, 1.0, Vectors.dense(1.0)),(0.0, 2.0, Vectors.sparse(1, [], []))], ["label", "weight", "features"])
df = sqlContext.createDataFrame([(3.6, Vectors.dense(79)),(1.8, Vectors.dense(54)),(3.333,Vectors.dense(74))], ["duration", "features"])

df.show()

lr = LinearRegression(maxIter=5, regParam=0.0, solver="normal", labelCol="duration")
print type(lr)

model = lr.fit(df)
print type(model)

print model.coefficients
print model.intercept
#print model.params


In [35]:
# Regresión lineal con ml a partir de un DataFrame
from pyspark.mllib.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
# Dataset faithful
#        eruptions  waiting
## 1     3.600      79
## 2     1.800      54
## 3     3.333      74
## 4     2.283      62
## 5     4.533      85

# Creamos el dataframe a partir de un fichero csv
faithfulDfCsv = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferSchema='true') \
.load('/FileStore/tables/k250v29z1473806605274/faithful.csv')

# Creamos un VectorAssembler para tener todas las variables predictoras en un vector
vecAssembler = VectorAssembler(inputCols=["waiting"], outputCol="features")
vaDf = vecAssembler.transform(faithfulDfCsv)

vaDf.cache()
vaDf.show(3)

# Creamos el conjunto de entrenamiento y test
splits = vaDf.randomSplit([0.7, 0.3], 1234)
vaDfTraining = splits[0]
vaDfTest = splits[1]

# Definimos el algoritmo de regresión lineal, indicando la columna target=labelCol
lr = LinearRegression(maxIter=5, regParam=0.0, solver="normal", labelCol="eruptions", featuresCol="features")
lr.setRegParam(0.0)
lr.setElasticNetParam(0.0)

# Obtenemos el modelo una vez aplicado el algoritmo al conjunto de entrenamiento
LRmodel = lr.fit(vaDfTraining)
print type(LRmodel)

# Mostramos los coeficientes y el intercept de la regresión lineal
print "Intercept: {0}".format(LRmodel.intercept)
print "Coeficientes: {0}".format(LRmodel.coefficients)

# Obtenemos el dataframe con las predicciones sobre el conjunto de test 
prediccion_faithful = LRmodel.transform(vaDfTest)
prediccion_faithful.cache()
prediccion_faithful.show(5)

# Aplicamos un evaluador para conocer el error en entrenamiento y test
evaluator = RegressionEvaluator(labelCol="eruptions")
print "RMSE en test: {0}".format(evaluator.evaluate(prediccion_faithful,{evaluator.metricName: "rmse"}))
print "RMSE en training: {0}".format( evaluator.evaluate(LRmodel.transform(vaDfTraining), {evaluator.metricName: "rmse"}))

In [36]:
# Regresión lineal con Validacion Cruzada
from pyspark.mllib.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.tuning import ParamGridBuilder
from pyspark.ml.tuning import CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator
# Dataset faithful
#        eruptions  waiting
## 1     3.600      79
## 2     1.800      54
## 3     3.333      74
## 4     2.283      62
## 5     4.533      85

# Creamos el dataframe a partir de un fichero csv
faithfulDfCsv = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferSchema='true') \
.load('/FileStore/tables/k250v29z1473806605274/faithful.csv')

# Creamos un VectorAssembler para tener todas las variables predictoras en un vector
vecAssembler = VectorAssembler(inputCols=["waiting"], outputCol="features")
vaDf = vecAssembler.transform(faithfulDfCsv)

vaDf.cache()
vaDf.show(3)

# Creamos el conjunto de entrenamiento y test
splits = vaDf.randomSplit([0.7, 0.3], 1234)
vaDfTraining = splits[0]
vaDfTest = splits[1]

# Creacion del Modelo de LR: Ajuste con k-fold y grid de parámetros 
lr = LinearRegression(labelCol="eruptions")
lrgrid = ParamGridBuilder().addGrid(lr.regParam, [0.0, 0.01, 0.05, 0.5]).addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0]).build()
lrevaluator = RegressionEvaluator(labelCol="eruptions")
cv = CrossValidator(estimator=lr, estimatorParamMaps=lrgrid, evaluator=lrevaluator, numFolds=5)
cvModel = cv.fit(vaDf)

print type(cvModel)
print type(cvModel.bestModel)

# Mostramos los coeficientes y el intercept de la regresión lineal
print "Intercept: {0}".format(cvModel.bestModel.intercept)
print "Coeficientes: {0}".format(cvModel.bestModel.coefficients)

# Realizamos la prediccion sobre el conjunto de test
prediccionTestCsv = cvModel.transform(vaDfTest)

# Aplicamos el evaluador para conocer el error en el dataset de test
print "RMSE en test: {0}".format(lrevaluator.evaluate(prediccionTestCsv,{evaluator.metricName: "rmse"}))

In [37]:
# Regresión lineal con ml a partir de un DataFrame convencional
from pyspark.mllib.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.tuning import ParamGridBuilder
from pyspark.ml.tuning import CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator
# Dataset faithful
## 1     3.600      79
## 2     1.800      54
## 3     3.333      74
## 4     2.283      62
## 5     4.533      85

# Creamos el dataframe a partir de un fichero csv
faithfulDfCsv = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferSchema='true') \
.load('/FileStore/tables/k250v29z1473806605274/faithful.csv')

faithfulDfCsv.cache()

faithfulDfCsv.show(3)

vecAssembler = VectorAssembler(inputCols=["waiting"], outputCol="features")
vaDf = vecAssembler.transform(faithfulDfCsv)

splits = vaDf.randomSplit([0.7, 0.3], 1234)
vaDfTraining = splits[0]
vaDfTest = splits[1]

# Ajuste tradicional training-test
lr = LinearRegression(maxIter=5, regParam=0.0, solver="normal", labelCol="eruptions")
#print type(lr)

model = lr.fit(vaDfTraining)
print type(model)

print model.coefficients
print model.intercept

prediccion_faithful = model.transform(vaDfTest)
prediccion_faithful.cache()

prediccion_faithful.show(5)
evaluator = RegressionEvaluator(labelCol="eruptions")
print evaluator.evaluate(prediccion_faithful,{evaluator.metricName: "rmse"})
print evaluator.evaluate(model.transform(vaDfTraining), {evaluator.metricName: "rmse"})

# Ajuste con k-fold
lr = LinearRegression(labelCol="eruptions")
grid = ParamGridBuilder().addGrid(lr.maxIter, [1, 5]).addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0]).build()
cv = CrossValidator(estimator=lr, estimatorParamMaps=grid, evaluator=evaluator, numFolds=5)
print type(cv)
cvModel = cv.fit(vaDf)

prediccion_test = cvModel.transform(vaDfTest)
# prediccion_test.show()
print evaluator.evaluate(prediccion_test, {evaluator.metricName: "rmse"})

print type(cvModel.bestModel)
print type(cvModel)
print cvModel.bestModel.coefficients
print cvModel.bestModel.intercept
#print cvModel.extractParamMap()





In [38]:
# Regresión lineal con ml
from pyspark.mllib.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
## 1     3.600      79
## 2     1.800      54
## 3     3.333      74
## 4     2.283      62
## 5     4.533      85
## 6     2.883      55

# Creamos un DataFrame con algunos de los valores del dataset faithful
df = sqlContext.createDataFrame([(3.6,79),(1.8,54),(3.333,74),(2.283,62),(4.533,85)], ["duration", "eruptions"] )
df.show()

splits = df.randomSplit([0.7, 0.3], 24)
print(splits[0].count())
print(splits[1].count())
#dataset = sqlContext.range(0, 100).select((col("id") % 3).alias("key"))
#sampled = dataset.sampleBy("key", fractions={0: 0.1, 1: 0.2}, seed=0)
#sampled.groupBy("key").count().orderBy("key").show()
# sampled = dataset.sampleBy("key", fractions={0: 0.1, 1: 0.2}, seed=0)
# Realizamos un assebler para unir todas las columnas predictoras del dataset en un vector de features
vecAssembler = VectorAssembler(inputCols=["eruptions"], outputCol="features")
va = vecAssembler.transform(df)
va.show()
# Definimos la función de linear regression a aplicar
lr = LinearRegression(maxIter=5, regParam=0.0, solver="normal", labelCol="duration")
# Construimos el modelo aplicándolo sobre el dataset que va a ser nuestro conjunto de test
model = lr.fit(va)
# Mostramos los valores de los coeficientes y el punto de paso de la recta por el eje y
print model.coefficients
print model.intercept

# Predicción con regresión lineal sobre un conjunto de test y evaluación del modelo
from pyspark.ml.evaluation import RegressionEvaluator
test0 = sqlContext.createDataFrame([(2.283,Vectors.dense(62)), (1.8,Vectors.dense(54))], ["duration", "features"])
test0.show()
prediccion_test = model.transform(test0)
prediccion_test.show()
evaluator = RegressionEvaluator(labelCol="duration")
print(evaluator.evaluate(prediccion_test, {evaluator.metricName: "rmse"}))
prediccion_training = model.transform(va)
prediccion_training.show()
print(evaluator.evaluate(prediccion_training, {evaluator.metricName: "rmse"}))

In [39]:
# Regresión lineal con Cross Validation

from pyspark.mllib.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.tuning import ParamGridBuilder
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import CrossValidator
## 1     3.600      79
## 2     1.800      54
## 3     3.333      74
## 4     2.283      62
## 5     4.533      85
## 6     2.883      55

# Creamos un DataFrame con algunos de los valores del dataset faithful
dataset = sqlContext.createDataFrame([(3.6,79),(1.8,54),(3.333,74),(2.283,62),(4.533,85)], ["label", "eruptions"] )
dataset.show()

# Realizamos un assebler para unir todas las columnas predictoras del dataset en un vector de features
vecAssembler = VectorAssembler(inputCols=["eruptions"], outputCol="features")
vacross = vecAssembler.transform(dataset)
vacross.show()

lr = LinearRegression()
grid = ParamGridBuilder().addGrid(lr.maxIter, [0, 1]).build()
evaluator = RegressionEvaluator()
cv = CrossValidator(estimator=lr, estimatorParamMaps=grid, evaluator=evaluator)
print type(cv)
cvModel = cv.fit(vacross)

testcross = sqlContext.createDataFrame([(2.283,Vectors.dense(62)), (1.8,Vectors.dense(54))], ["label", "features"])
testcross.show()
prediccion_test = cvModel.bestModel.transform(testcross)
prediccion_test.show()
evaluator.evaluate(cvModel.transform(testcross), {evaluator.metricName: "rmse"})

print type(cvModel.bestModel)
print type(cvModel)
print cvModel.bestModel.coefficients
print cvModel.bestModel.intercept



In [40]:
# Regresion logistica
from pyspark.mllib.linalg import Vectors
from pyspark.sql.types import *
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import OneHotEncoder
from pyspark.ml.evaluation import BinaryClassificationEvaluator
# Nos hemos de asegurar que el target sea DoubleType
# Definicion del esquema
admisionSchema = StructType([ \
    StructField("admit", BooleanType(), True), \
    StructField("gre", DoubleType(), True), \
    StructField("gpa", DoubleType(), True), \
    StructField("rank", StringType(), True)])
admisionDfCsv = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferSchema='true') \
.load('/FileStore/tables/z1wr0h6j1473892626476/election.csv', schema=eleccionSchema)

admisionDfCsv.printSchema()
# Transformamos la variable rank de tipo String en una lista de valores numéricos
indexer = StringIndexer(inputCol="rank", outputCol="ranknum")
modelInd = indexer.fit(admisionDfCsv)
admisionDfInd = modelInd.transform(admisionDfCsv)

# Transformamos la columna con la variable categórica ranknum en variables dummies
encoder = OneHotEncoder(inputCol="ranknum", outputCol="dummiesRank", dropLast=False)
admisionDfdumm = encoder.transform(eleccionDfInd)

# Realizamos un assebler para unir todas las columnas predictoras en un vector de features
vecAssembler = VectorAssembler(inputCols=["gre","gpa","dummiesRank"], outputCol="features")
admisionAssemblerDf = vecAssembler.transform(admisionDfdumm)
# admisionAssemblerDf.show(3)

# Obtenemos los DataFrames de entrenamiento y test
splitado = admisionAssemblerDf.randomSplit([0.7, 0.3], 124)
admisionTrainingDf = splitado[0]
admisionTestDf = splitado[1]

# Definimos el modelo de Regresion Logistica y lo aplicamos al DataFrame de training
lr = LogisticRegression(maxIter=5, regParam=0.0, labelCol="admit")
LrModel = lr.fit(admisionTrainingDf)
# Mostramos los coeficientes y el intercept de la regresión lineal
print "Intercept: {0}".format(LrModel.intercept)
print "Coeficientes: {0}".format(LrModel.coefficients)

# Aplicamos el modelo al conjunto de Test
prediccionTestDf = LrModel.transform(admisionTestDf)
prediccionTestDf.show(3)

# Evaluamos el resultado mostrando el area bajo la  ROC
evaluator = BinaryClassificationEvaluator(labelCol="admit")
print "Area bajo la curva ROC: {0}".format(evaluator.evaluate(prediccionTestDf, \
                                                    {evaluator.metricName: "areaUnderROC"}))


In [41]:
# Regresion logistica con variables dummy
encoder = OneHotEncoder(inputCol="indexed", outputCol="features")

In [42]:
# Clasificador multiclase con RandomForest
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import StringIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
irisDfCsv = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferSchema='true') \
.load('/FileStore/tables/fjobo1ml1473803578241/iris_with_header-f677d.csv')

# Transformamos la etiqueta (specie) de String a Double en varios niveles
indexer = StringIndexer(inputCol="specie", outputCol="specieNum")
modelInd = indexer.fit(irisDfCsv)
irisDfInd = modelInd.transform(irisDfCsv)

# Realizamos un assebler para unir todas las columnas predictoras del dataset en un vector de features
vecAssembler = VectorAssembler(inputCols=["sepal_length","sepal_width","petal_length", "petal_width"], \
                               outputCol="features")
irisDfAssembled = vecAssembler.transform(irisDfInd)
# irisDfAssembled.show(3)

# Creamos los DataFrame de entrenamiento y test
splitado = irisDfAssembled.randomSplit([0.7, 0.3], 124)
irisDfTraining = splitado[0]
irisDfTest = splitado[1]

# Definimos el algoritmo de RandomForest y creamos el modelo 
rf = RandomForestClassifier(numTrees=20, maxDepth=2, labelCol="specieNum", seed=42)
rfModel = rf.fit(irisDfTraining)

# Aplicamos el modelo sobre el conjunto de test para obtener las prediciones 
prediccionDfTest = rfModel.transform(irisDfTest)
prediccionDfTest.show(5)

# Aplicamos el evaluador multiclase par ver la eficiencia del modelo
evaluator = MulticlassClassificationEvaluator(labelCol="specieNum")
print 'Precision: {0}, Recall: {1}, F1: {2}'.format(evaluator.evaluate(prediccionDfTest, {evaluator.metricName: "precision"}), \
                                           evaluator.evaluate(prediccionDfTest, {evaluator.metricName: "recall"}), \
                                           evaluator.evaluate(prediccionDfTest, {evaluator.metricName: "f1"}))

In [43]:
# Pipe: Clasificador multiclase con RandomForest
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import StringIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
irisDfCsv = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferSchema='true') \
.load('/FileStore/tables/howzjb5q1474385450651/iris_with_header-f677d.csv')

# Creamos los DataFrame de entrenamiento y test
splitado = irisDfCsv.randomSplit([0.7, 0.3], 124)
irisDfTraining = splitado[0]
irisDfTest = splitado[1]

# Transformamos la etiqueta (specie) de String a Double en varios niveles
indexer = StringIndexer(inputCol="specie", outputCol="specieNum")
# Realizamos un assebler para unir todas las columnas predictoras del dataset en un vector de features
vecAssembler = VectorAssembler(inputCols=["sepal_length","sepal_width","petal_length", "petal_width"], \
                               outputCol="features")
# Definimos el algoritmo de RandomForest y creamos el modelo 
rf = RandomForestClassifier(numTrees=20, maxDepth=2, labelCol="specieNum", seed=42)
# Definimos el pipeline con las operaciones en orden
pipeline = Pipeline(stages=[indexer, vecAssembler, rf])

# Creamos el modelo aplicando el pipeline al conjunto de entrenamiento
pipeModel = pipeline.fit(irisDfTraining)

# Aplicamos el modelo sobre el conjunto de test para obtener las prediciones 
prediccionDfTest = pipeModel.transform(irisDfTest)
prediccionDfTest.show(5)

In [44]:
# Clustering con K-means
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.clustering import KMeans
irisDfCsv = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferSchema='true') \
.load('/FileStore/tables/howzjb5q1474385450651/iris_with_header-f677d.csv')

# Realizamos un assebler para unir todas las columnas predictoras del dataset en un vector de features
vecAssembler = VectorAssembler(inputCols=["sepal_length","sepal_width","petal_length", "petal_width"], outputCol="features")
irisAssembled = vecAssembler.transform(irisDfCsv)

# Aplicamos el algoritmo K-means con un número de centroides de 3
kmeans = KMeans(k=3, featuresCol="features", predictionCol="prediccion", seed=1234)
model = kmeans.fit(irisAssembled)

# Mostramos los valores para los centroides
centers = model.clusterCenters()
for i in range(3):
  print "Centroide del cluster {0}: {1}".format(i, centers[i])

# Vemos el resultado del clustering
# Los identificadores de los grupos (prediccion) es un entero comenzando por el 0
transformed = model.transform(irisAssembled)
transformed.show(5)

# Vemos el número de observaciones para cada cluster
irisDFGroup = transformed.groupBy(transformed.prediccion)
irisDFGroup.agg({"*":"count"}).show()


In [45]:
# Transformadores. VectorAssembler
from pyspark.ml.feature import VectorAssembler
irisDfCsv = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferSchema='true') \
.load('/FileStore/tables/howzjb5q1474385450651/iris_with_header-f677d.csv')

# Mostramos el DataFrame original
irisDfCsv.show(2)
# Realizamos un assebler para unir todas las columnas predictoras del dataset en un vector de features
vecAssembler = VectorAssembler(inputCols=["sepal_length","sepal_width","petal_length", "petal_width"], \
                               outputCol="features")
irisAssembled = vecAssembler.transform(irisDfCsv)
irisAssembled.show(2)


In [46]:
# Assemblers
from pyspark.ml.feature import VectorAssembler
ignore = ['specie']
assembler = VectorAssembler(
    inputCols=['petal_length', 'petal_width', 'sepal_length', 'sepal_width'],
    outputCol='features')

dv = assembler.transform(irisDF)
dv.select(dv.features).show(3)
print type(dv.features)

df = sqlContext.createDataFrame([(1, 0, 3)], ["a", "b", "c"])
vecAssembler = VectorAssembler(inputCols=["a", "b", "c"], outputCol="features")
#vecAssembler.transform(df).head().features
va = vecAssembler.transform(df)
print type(va.features)


In [47]:
# Ejemplo randomForest con libreria mllib

from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.tree import RandomForest

# Creamos una función para pasar las etiquetas a numérico que aplicaremos en el map()
def getLabelNum(label):
  if label == 'Iris-virginica':
    return 0.0
  elif label == 'Iris-setosa':
    return 1.0
  else:
    return 2.0

lineasRDD = sc.textFile('/FileStore/tables/vvlm20gs1474462720233/iris.csv')
# Convertimos cada línea en una fila del futuro dataframe
elementosRDD = lineasRDD.map(lambda l: l.split(","))
# Metemos cada uno de los campos separados por coma en un punto etiquetado
irisRDD = elementosRDD.map(lambda p: LabeledPoint(getLabelNum(p[4]), [p[0],p[1],p[2],p[3]]))
print irisRDD.take(3)
# Creamos el modelo randomForest
model = RandomForest.trainClassifier(irisRDD, numClasses=3, categoricalFeaturesInfo={}, \
                                     numTrees=1, maxDepth = 2, seed=42)
print "Numero de nodos de los arboles {0}".format(model.totalNumNodes())
print(model.toDebugString())
# Creamos un RDD sobre el que realizar predicciones
testRDD = sc.parallelize([[0.2,3.1,1.2,2.0], [1.4,2.8,3.0,4.0]])
print "Prediccion: {0}".format(model.predict(testRDD).collect())

In [48]:
from pyspark.sql import Row
row = Row(name="Alice", age=11)
print type(row)
print row
print row['name']
print row.age
print type(row.name)
print type(row.age)
print "Spark version " + sc.version

In [49]:
data = [('Alice',11), ('Robert', 14)]
print data
df = sqlContext.createDataFrame(data,['name','age'])
df.take(2)
df.show()

In [50]:
#dbutils.fs.ls("file:/home")
#dbutils.fs.ls("dbfs:/databricks-datasets/samples")
dbutils.fs.ls("dbfs:/FileStore/tables")
#dbutils.fs.rm("dbfs:/FileStore/tables/hev5dygo1473887816783")

In [51]:
dbutils.fs.ls("dbfs:/FileStore/tables/ofv0n3w01474198366371")

In [52]:
dbutils.fs.ls("dbfs:/FileStore/tables/test")

In [53]:
dbutils.fs.rm("dbfs:/FileStore/tables/fjobo1ml1473803578241/iris_with_header-f677d.csv")