In [3]:
#pyspark
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer
    
# As in the previous example, we build a DataFrame from a list that holds the
# training data; the list is made of (id, text, label) tuples. In this raw
# form the data cannot be fed to the ML objects yet, but we take care of
# that further down.

import os

from pyspark.sql import SparkSession

# Build (or reuse) the SparkSession a single time. The original cell ran the
# builder twice; the second getOrCreate() returns the session created by the
# first call, so both sets of options are merged into one builder here.
spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .config('spark.sql.codegen.wholeStage', False) \
    .getOrCreate()

# Hadoop configuration used by the s3a:// filesystem connector.
hadoop_conf = spark._jsc.hadoopConfiguration()

# Credentials are read from the environment instead of being hard-coded in
# the notebook. They are only applied when both values are present, so the
# rest of the notebook (which uses in-memory data) still runs without them.
aws_access_key = os.environ.get("AWS_ACCESS_KEY_ID")
aws_secret_key = os.environ.get("AWS_SECRET_ACCESS_KEY")
if aws_access_key and aws_secret_key:
    hadoop_conf.set("fs.s3a.access.key", aws_access_key)
    hadoop_conf.set("fs.s3a.secret.key", aws_secret_key)

hadoop_conf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
hadoop_conf.set("com.amazonaws.services.s3.enableV4", "true")
# NOTE(review): BasicAWSCredentialsProvider only exists in older hadoop-aws
# releases -- confirm it matches the Hadoop version on the cluster.
hadoop_conf.set("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.BasicAWSCredentialsProvider")
# Regional S3 endpoints carry the "s3." service prefix.
hadoop_conf.set("fs.s3a.endpoint", "s3.eu-west-3.amazonaws.com")

# IMAGINAMOS QUE LEEMOS DATOS DEL S3


# FEATURE ENGINEERING (Transformacion de datos)

# MODELLING



# Labeled training documents as (id, text, label) rows.
training = spark.createDataFrame([
    (0, "a b c d e spark", 1.0),
    (1, "b d", 0.0),
    (2, "spark f g h", 1.0),
    (3, "hadoop mapreduce", 0.0)
], ["id", "text", "label"])


# Declare the transformers (Tokenizer, HashingTF) and the estimator
# (LogisticRegression) we are going to use. Nothing is fitted at this point;
# that happens further down.
# Tokenizer lower-cases the input string (inputCol) and splits it into words
# using whitespace as the separator.
tokenizer = Tokenizer(inputCol="text", outputCol="words")
# HashingTF hashes every word with MurmurHash3 and uses the resulting hash as
# the index in the term-document matrix ("TDM"). This is faster than building
# the TDM of a "regular" TF-IDF. To reduce hash collisions, increase the
# number of buckets -- powers of 2 are recommended to balance the buckets.
# Note that this transformer takes the Tokenizer's output column as its input.
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
# Logistic regression is the estimator at the end of the pipeline.
lr = LogisticRegression(maxIter=10, regParam=0.001)
# The pipeline chains the stages in order: tokenizer, then hashing, then the
# logistic regression. This defines the processing flow (the DAG).
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])


# Fitting the pipeline runs the stages in the order they were declared and
# trains the estimator with the configuration set on each object above.
model = pipeline.fit(training)

# Mock test DataFrame -- note that it carries no label, as would be the case
# in production.
test = spark.createDataFrame([
    (4, "spark i j k"),
    (5, "l m n"),
    (6, "spark hadoop spark"),
    (7, "apache hadoop")
], ["id", "text"])


# "Run" the fitted pipeline by calling transform on it to obtain the
# predictions for the test set.
prediction = model.transform(test)

# `prediction` is a DataFrame produced by the fitted pipeline model.
# Select the columns we want to look at.
selected = prediction.select("id", "text", "probability", "prediction")
for row in selected.collect():
    # Unpack into loop-local names. The original unpacked into `prediction`,
    # which replaced the DataFrame above with a float after the first row.
    rid, text, prob, predicted_label = row
    print("({}, {}) --> prob={}, prediction={}".format(
        rid, text, str(prob), predicted_label))

(4, spark i j k) --> prob=[0.1596407738787475,0.8403592261212525], prediction=1.0
(5, l m n) --> prob=[0.8378325685476744,0.16216743145232562], prediction=0.0
(6, spark hadoop spark) --> prob=[0.06926633132976037,0.9307336686702395], prediction=1.0
(7, apache hadoop) --> prob=[0.9821575333444218,0.01784246665557808], prediction=0.0


In [4]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.feature import HashingTF, Tokenizer
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

# Prepare training documents, which are labeled.
training = spark.createDataFrame([
    (0, "a b c d e spark", 1.0),
    (1, "b d", 0.0),
    (2, "spark f g h", 1.0),
    (3, "hadoop mapreduce", 0.0),
    (4, "b spark who", 1.0),
    (5, "g d a y", 0.0),
    (6, "spark fly", 1.0),
    (7, "was mapreduce", 0.0),
    (8, "e spark program", 1.0),
    (9, "a e c l", 0.0),
    (10, "spark compile", 1.0),
    (11, "hadoop software", 0.0)
], ["id", "text", "label"])

# Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr.
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
lr = LogisticRegression(maxIter=10)
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])

# We now treat the Pipeline as an Estimator, wrapping it in a CrossValidator instance.
# This will allow us to jointly choose parameters for all Pipeline stages.
# A CrossValidator requires an Estimator, a set of Estimator ParamMaps, and an Evaluator.
# We use a ParamGridBuilder to construct a grid of parameters to search over.
# With 3 values for hashingTF.numFeatures and 2 values for lr.regParam,
# this grid will have 3 x 2 = 6 parameter settings for CrossValidator to choose from.
paramGrid = ParamGridBuilder() \
    .addGrid(hashingTF.numFeatures, [10, 100, 1000]) \
    .addGrid(lr.regParam, [0.1, 0.01]) \
    .build()

# The evaluator only offers areaUnderROC or areaUnderPR as metrics.
# A fixed seed makes the fold assignment (and therefore the selected model)
# reproducible across kernel restarts.
crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=BinaryClassificationEvaluator(),
                          numFolds=5,  # use 3+ folds in practice
                          seed=42)

# Run cross-validation, and choose the best set of parameters.
cvModel = crossval.fit(training)

# Prepare test documents, which are unlabeled.
test = spark.createDataFrame([
    (4, "spark i j k"),
    (5, "l m n"),
    (6, "mapreduce spark"),
    (7, "apache hadoop")
], ["id", "text"])

# Make predictions on test documents. cvModel uses the best model found.
prediction = cvModel.transform(test)
selected = prediction.select("id", "text", "probability", "prediction")
for row in selected.collect():
    print(row)

Row(id=4, text='spark i j k', probability=DenseVector([0.1257, 0.8743]), prediction=1.0)
Row(id=5, text='l m n', probability=DenseVector([0.9952, 0.0048]), prediction=0.0)
Row(id=6, text='mapreduce spark', probability=DenseVector([0.307, 0.693]), prediction=1.0)
Row(id=7, text='apache hadoop', probability=DenseVector([0.804, 0.196]), prediction=0.0)


# Utils

## One hot encoding

In [None]:

from pyspark.ml.feature import OneHotEncoder, StringIndexer

# Categorical input data set.
df = spark.createDataFrame([
    (0, "a"),
    (1, "b"),
    (2, "c"),
    (3, "a"),
    (4, "a"),
    (5, "c")
], ["id", "category"])

# StringIndexer assigns a numeric id to every distinct value of a categorical
# column -- similar to the levels of a factor in R, except that the levels are
# numeric ids. Indices are assigned by descending frequency, so index 0
# corresponds to the most frequent value.
string_indexer = StringIndexer(inputCol="category", outputCol="categoryIndex")
model = string_indexer.fit(df)
indexed = model.transform(df)
indexed.show()


encoder = OneHotEncoder(inputCol="categoryIndex", outputCol="categoryVec")
# In Spark 2.x OneHotEncoder is a plain transformer with no fit(); from
# Spark 3.0 on it is an estimator that has to be fitted before it can
# transform. Handle both so the cell runs on either version.
if hasattr(encoder, "fit"):
    encoded = encoder.fit(indexed).transform(indexed)
else:
    encoded = encoder.transform(indexed)
encoded.show()

## Standard scaler

In [None]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import StandardScaler

# Sample input for the scaler. It is defined in this cell so the cell runs on
# a fresh kernel; the original relied on `data_frame` being created by a
# later cell.
data_frame = spark.createDataFrame([
    (0, Vectors.dense([1.0, 0.1, -1.0]),),
    (1, Vectors.dense([2.0, 1.1, 1.0]),),
    (2, Vectors.dense([3.0, 10.1, 3.0]),)
], ["id", "features"])

# Configure the StandardScaler estimator as needed. By default withMean is
# False because centering makes the output a dense vector -- be careful with
# that when working with sparse vectors.
scaler = StandardScaler(inputCol="features", outputCol="scaled_features",
                        withStd=True, withMean=True)
# Fit the StandardScaler model on the input data.
scaler_model = scaler.fit(data_frame)

# Transform the data.
scaled_data = scaler_model.transform(data_frame)
scaled_data.show(truncate=False)

## MinMax

In [None]:
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.linalg import Vectors

# Three (id, dense feature vector) rows used as input for the scaler.
feature_rows = [
    (0, Vectors.dense([1.0, 0.1, -1.0])),
    (1, Vectors.dense([2.0, 1.1, 1.0])),
    (2, Vectors.dense([3.0, 10.1, 3.0])),
]
data_frame = spark.createDataFrame(feature_rows, ["id", "features"])
data_frame.show()

# MinMaxScaler is an estimator: fitting it yields the model (a transformer)
# that rescales the data.
scaler = MinMaxScaler(outputCol="scaled_features", inputCol="features")
scaler_model = scaler.fit(data_frame)
scaled_data = scaler_model.transform(data_frame)

# The target range is read from the estimator, not from the fitted model.
range_min = scaler.getMin()
range_max = scaler.getMax()
print("Features scaled to range: [{}, {}]".format(range_min, range_max))

scaled_columns = scaled_data.select("features", "scaled_features")
scaled_columns.show(truncate=False)

## Pipelines

In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer

# Labeled training documents as (id, text, label) rows.
training = spark.createDataFrame([
    (0, "a b c d e spark", 1.0),
    (1, "b d", 0.0),
    (2, "spark f g h", 1.0),
    (3, "hadoop mapreduce", 0.0)
], ["id", "text", "label"])

# Stage 1: split the text column into words.
tokenizer = Tokenizer(inputCol="text", outputCol="words")

# Stage 2: hash the tokenizer's output column into a feature vector.
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")

# Stage 3: the estimator trained on the hashed features.
lr = LogisticRegression(maxIter=10, regParam=0.001)

pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])

# Fitting the pipeline runs the stages in order and returns the fitted model.
model = pipeline.fit(training)

# Unlabeled test documents.
test = spark.createDataFrame([
    (4, "spark i j k"),
    (5, "l m n"),
    (6, "spark hadoop spark"),
    (7, "apache hadoop")
], ["id", "text"])

prediction = model.transform(test)


selected = prediction.select("id", "text", "probability", "prediction")
for row in selected.collect():
    # Unpack into loop-local names. The original unpacked into `prediction`,
    # which replaced the DataFrame above with a float after the first row.
    rid, text, prob, predicted_label = row
    print("({}, {}) --> prob={}, prediction={}".format(
        rid, text, str(prob), predicted_label))


## Gridsearch

In [4]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.feature import HashingTF, Tokenizer
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

from pyspark.sql import SparkSession

# Get the active SparkSession, or create one if the kernel has none yet.
spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

# Prepare training documents, which are labeled.
training = spark.createDataFrame([
    (0, "a b c d e spark", 1.0),
    (1, "b d", 0.0),
    (2, "spark f g h", 1.0),
    (3, "hadoop mapreduce", 0.0),
    (4, "b spark who", 1.0),
    (5, "g d a y", 0.0),
    (6, "spark fly", 1.0),
    (7, "was mapreduce", 0.0),
    (8, "e spark program", 1.0),
    (9, "a e c l", 0.0),
    (10, "spark compile", 1.0),
    (11, "hadoop software", 0.0)
], ["id", "text", "label"])

# Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr.
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
lr = LogisticRegression(maxIter=10)
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])

# We now treat the Pipeline as an Estimator, wrapping it in a CrossValidator instance.
# This will allow us to jointly choose parameters for all Pipeline stages.
# A CrossValidator requires an Estimator, a set of Estimator ParamMaps, and an Evaluator.
# We use a ParamGridBuilder to construct a grid of parameters to search over.
# With 3 values for hashingTF.numFeatures and 2 values for lr.regParam,
# this grid will have 3 x 2 = 6 parameter settings for CrossValidator to choose from.
paramGrid = ParamGridBuilder() \
    .addGrid(hashingTF.numFeatures, [10, 100, 1000]) \
    .addGrid(lr.regParam, [0.1, 0.01]) \
    .build()

# A fixed seed makes the fold assignment (and therefore the selected model)
# reproducible across kernel restarts.
crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=BinaryClassificationEvaluator(),
                          numFolds=2,  # use 3+ folds in practice
                          seed=42)

# Run cross-validation, and choose the best set of parameters.
cvModel = crossval.fit(training)

# Prepare test documents, which are unlabeled.
test = spark.createDataFrame([
    (4, "spark i j k"),
    (5, "l m n"),
    (6, "mapreduce spark"),
    (7, "apache hadoop")
], ["id", "text"])

# Make predictions on test documents. cvModel uses the best model found.
prediction = cvModel.transform(test)
selected = prediction.select("id", "text", "probability", "prediction")
for row in selected.collect():
    print(row)

AttributeError: 'Pipeline' object has no attribute 'HashingTF'

## Logistic Regression

In [None]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.classification import LogisticRegression

# Labeled training data as (label, features) rows.
training = spark.createDataFrame([
    (1.0, Vectors.dense([0.0, 1.1, 0.1])),
    (0.0, Vectors.dense([2.0, 1.0, -1.0])),
    (0.0, Vectors.dense([2.0, 1.3, 1.0])),
    (1.0, Vectors.dense([0.0, 1.2, -0.5]))], ["label", "features"])

lr = LogisticRegression(maxIter=10, regParam=0.01)
print("LogisticRegression parameters:\n" + lr.explainParams() + "\n")

# Model 1 is trained with the parameters stored on the estimator itself.
model_1 = lr.fit(training)
print("Model 1 was fit using parameters: ")
print(lr.extractParamMap())


# Parameters can also be given as a plain dict keyed by Param objects.
param_map = {lr.maxIter: 20}
# A value that is already in the dict can be overwritten.
param_map[lr.maxIter] = 30
# Several parameters can be updated at once.
param_map.update({lr.regParam: 0.1, lr.threshold: 0.55})


# Param maps can be combined; this one renames the probability output column.
param_map_2 = {lr.probabilityCol: "my_probability"}
param_map_combined = param_map.copy()
param_map_combined.update(param_map_2)

# Passing a param map to fit() overrides the estimator's own parameters for
# this fit only; `lr` itself is left unchanged.
model_2 = lr.fit(training, param_map_combined)
print("Model 2 was fit using parameters: ")
# Show the parameters that were effectively used to train model 2: the
# estimator's values merged with the overrides. Printing the estimator's map
# alone (as the original did, twice) would show model 1's settings again.
print(lr.extractParamMap(param_map_combined))

test = spark.createDataFrame([
    (1.0, Vectors.dense([-1.0, 1.5, 1.3])),
    (0.0, Vectors.dense([3.0, 2.0, -0.1])),
    (1.0, Vectors.dense([0.0, 2.2, -1.5]))], ["label", "features"])

prediction = model_2.transform(test)

# Bare expressions in the middle of a cell are not displayed, so print them.
print(type(prediction))
print(prediction.columns)

result = prediction.select("features", "label", "my_probability", "prediction") \
    .collect()

for row in result:
    print("features={}, label={} -> prob={}, prediction={}".format(
        row.features, row.label, row.my_probability, row.prediction))

## IDF

In [None]:

from pyspark.ml.feature import HashingTF, IDF, Tokenizer, CountVectorizer

# Input sentences, as (label, sentence) rows, used to build the
# term-document matrix.
labeled_sentences = [
    (0.0, "Hi I heard about Spark"),
    (0.0, "I wish Java could use case classes"),
    (1.0, "Logistic regression models are neat"),
]
sentence_data = spark.createDataFrame(labeled_sentences, ["label", "sentence"])

# Tokenizer splits each sentence into words. It is a pure transformer:
# there is no fit step because nothing is being trained here.
tokenizer = Tokenizer(outputCol="words", inputCol="sentence")
words_data = tokenizer.transform(sentence_data)

# CountVectorizer builds the matrix of terms and their frequencies. It is
# fitted on the tokenized data and the fitted model is then applied to it.
count_vectorizer = CountVectorizer(outputCol="raw_features", inputCol="words")
featurized_model = count_vectorizer.fit(words_data)
featurized_data = featurized_model.transform(words_data)
featurized_data.show(truncate=False)

# IDF does have a fit method: it receives the term counts of the mock
# document collection and returns the model (transformer) that rescales them.
idf = IDF(minDocFreq=1, inputCol="raw_features", outputCol="features")
idf_model = idf.fit(featurized_data)
rescaled_data = idf_model.transform(featurized_data)

label_and_features = rescaled_data.select("label", "features")
label_and_features.show(truncate=False)

## SQL

In [None]:
# NOTE(review): this path uses the s3:// scheme while the session setup in
# this notebook configures fs.s3a.* -- confirm which scheme the cluster uses.
airlines_parquet = spark.read.parquet("s3://dpa/flights/airlines/parquet/")
airlines_parquet.show()
airlines_parquet.printSchema()
# A bare expression in the middle of a cell is not displayed, so print it.
print(airlines_parquet.count())

# NOTE(review): `airlines_csv` is not defined anywhere in the visible
# notebook -- confirm where it is loaded.
airlines_csv.describe().show()

# withColumnRenamed

# NOTE(review): `flights` is not defined anywhere in the visible notebook --
# confirm where it is loaded.
original = flights.columns
new_col_names = [element.lower() for element in original]

# Rename every column to its lower-case form, one column at a time. The
# original used reduce() without importing it (a NameError on Python 3) and
# its lambda parameter shadowed `flights`; a plain loop does the same work.
df = flights
for old_name, new_name in zip(original, new_col_names):
    df = df.withColumnRenamed(old_name, new_name)
df.show()

# Register the renamed DataFrame as a temporary view so it can be queried
# with SQL.
df.createOrReplaceTempView("vuelos")
spark.sql("select count(*) from vuelos").show()
sql_query = """
select year, month, day, airline, origin_airport, destination_airport, departure_delay
from vuelos
where departure_delay > 0 
and departure_delay < 100
""" 

departure_delays = spark.sql(sql_query)

# NOTE(review): `flights_csv` is not defined anywhere in the visible
# notebook -- confirm where it is loaded.
flights_csv.describe(["YEAR","MONTH","DAY"]).show()