# Creación de Nuevas Variables - Feature Extraction

In [1]:
from pyspark import SparkContext
from pyspark.sql import SQLContext

# Reuse the SparkContext already active in this kernel (if any) instead of
# always constructing a new one: calling SparkContext() a second time -- for
# example when this cell is re-run -- raises
# "ValueError: Cannot run multiple SparkContexts at once".
sc = SparkContext.getOrCreate()

# SQL entry point used by the rest of the notebook to read data and
# register tables.
sqlContext = SQLContext(sc)

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/02/22 14:47:15 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
22/02/22 14:47:17 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [2]:
# Load ../bd5.csv through the CSV data source, treating the first line as
# the header and asking Spark to infer the column types.
bd5 = (
    sqlContext.read
    .format("com.databricks.spark.csv")
    .option("header", "true")
    .load("../bd5.csv", inferSchema=True)
)

# Register the DataFrame under the table name "bd5".
sqlContext.registerDataFrameAsTable(bd5, "bd5")

                                                                                

## Variables Dummy

In [3]:
# Add one boolean indicator column per Horario value (1, 2 and 3).
# Each new column is True exactly when Horario equals that value.
bd5 = (
    bd5
    .withColumn('Horario1', bd5['Horario'] == 1)
    .withColumn('Horario2', bd5['Horario'] == 2)
    .withColumn('Horario3', bd5['Horario'] == 3)
)

## Variables Discretizadas Binarias

In [4]:
from pyspark.ml.feature import Binarizer

# Binary discretization of DepDelay: values greater than the threshold
# (15.0) become 1.0 in SalidaBin, all other values become 0.0.
binarizer = Binarizer(
    inputCol='DepDelay',
    outputCol='SalidaBin',
    threshold=15.0,
)

# Preview the first transformed row (all columns plus SalidaBin).
binarizer.transform(bd5).head()


22/02/22 14:48:37 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


Row(Year=2016, Month=12, DayofMonth=1, DayOfWeek=4, CRSDepTime=845, UniqueCarrier='AA', TailNum='N8ARAA', ArrDelay=-7.0, DepDelay=-5.0, Origin='LAX', Dest='DFW', Distance=1235.0, Cancelled=0.0, Diverted=0.0, CarrierDelay=0.0, WeatherDelay=0.0, NASDelay=0.0, SecurityDelay=0.0, LateAircraftDelay=0.0, LogD=3.0916669575956846, Retraso=0, RetrasoNeto=-2.0, Horario=2, Horario1=False, Horario2=True, Horario3=False, SalidaBin=0.0)

In [5]:
# Show the original delay next to its binarized version
# (show() prints the first 20 rows by default).
(
    binarizer.transform(bd5)
    .select('DepDelay', 'SalidaBin')
    .show()
)

+--------+---------+
|DepDelay|SalidaBin|
+--------+---------+
|    -5.0|      0.0|
|     5.0|      0.0|
|    -3.0|      0.0|
|    -7.0|      0.0|
|    -6.0|      0.0|
|    -1.0|      0.0|
|     0.0|      0.0|
|     0.0|      0.0|
|    -1.0|      0.0|
|    -1.0|      0.0|
|     1.0|      0.0|
|    -2.0|      0.0|
|    -4.0|      0.0|
|    -1.0|      0.0|
|     0.0|      0.0|
|     0.0|      0.0|
|    13.0|      0.0|
|    17.0|      1.0|
|    12.0|      0.0|
|    19.0|      1.0|
+--------+---------+
only showing top 20 rows



## Variables Discretizadas en Buckets

In [6]:
from pyspark.ml.feature import Bucketizer

# Discretize DepDelay into three buckets delimited by the split points:
#   0.0 -> (-inf, 0),  1.0 -> [0, 15),  2.0 -> [15, +inf)
bucketizer = Bucketizer(
    inputCol='DepDelay',
    outputCol='SalidaCat',
    splits=[-float("inf"), 0.0, 15.0, float("inf")],
)

# Show the original delay next to its bucket index.
(
    bucketizer.transform(bd5)
    .select('DepDelay', 'SalidaCat')
    .show()
)


+--------+---------+
|DepDelay|SalidaCat|
+--------+---------+
|    -5.0|      0.0|
|     5.0|      1.0|
|    -3.0|      0.0|
|    -7.0|      0.0|
|    -6.0|      0.0|
|    -1.0|      0.0|
|     0.0|      1.0|
|     0.0|      1.0|
|    -1.0|      0.0|
|    -1.0|      0.0|
|     1.0|      1.0|
|    -2.0|      0.0|
|    -4.0|      0.0|
|    -1.0|      0.0|
|     0.0|      1.0|
|     0.0|      1.0|
|    13.0|      1.0|
|    17.0|      2.0|
|    12.0|      1.0|
|    19.0|      2.0|
+--------+---------+
only showing top 20 rows



Versiones más recientes de PySpark incluyen otras transformaciones, por ejemplo `QuantileDiscretizer`.

## Expansión polinómica de Variables 
(términos cuadráticos, productos, etc.) 

In [7]:
from pyspark.ml.feature import PolynomialExpansion, VectorAssembler

# Pack the two numeric predictors into a single vector column, which is the
# input format the ML feature transformers below operate on.
assembler = VectorAssembler(
    outputCol='features',
    inputCols=['DepDelay', 'Distance'],
)

# Degree-2 polynomial expansion of that vector. For (x, y) the resulting
# vector is ordered as [x, x^2, y, x*y, y^2].
px = PolynomialExpansion(
    outputCol="Polyn",
    inputCol="features",
    degree=2,
)

# Assemble first, then expand; bd6 keeps every bd5 column plus
# 'features' and 'Polyn'.
bd6 = px.transform(assembler.transform(bd5))

bd6.select('DepDelay', 'Distance', 'Polyn').head(5)

[Row(DepDelay=-5.0, Distance=1235.0, Polyn=DenseVector([-5.0, 25.0, 1235.0, -6175.0, 1525225.0])),
 Row(DepDelay=5.0, Distance=1235.0, Polyn=DenseVector([5.0, 25.0, 1235.0, 6175.0, 1525225.0])),
 Row(DepDelay=-3.0, Distance=1235.0, Polyn=DenseVector([-3.0, 9.0, 1235.0, -3705.0, 1525225.0])),
 Row(DepDelay=-7.0, Distance=1235.0, Polyn=DenseVector([-7.0, 49.0, 1235.0, -8645.0, 1525225.0])),
 Row(DepDelay=-6.0, Distance=1235.0, Polyn=DenseVector([-6.0, 36.0, 1235.0, -7410.0, 1525225.0]))]

In [17]:
# Number of rows in bd6.
bd6.count()

30466

## Estandarización de las variables

In [8]:
from pyspark.ml.feature import StandardScaler

# Standardize the assembled 'features' vector: subtract the mean (withMean)
# and scale to unit standard deviation (withStd).
scaler = StandardScaler(
    withMean=True,
    withStd=True,
    inputCol="features",
    outputCol="stdfeatures",
)

# fit() computes the statistics on bd6; transform() applies them to it.
scalerModel = scaler.fit(bd6)
bd6std = scalerModel.transform(bd6)

# Compare raw and standardized vectors side by side.
bd6std.select('features', 'stdfeatures').show()

+-------------+--------------------+
|     features|         stdfeatures|
+-------------+--------------------+
|[-5.0,1235.0]|[-0.4459454808573...|
| [5.0,1235.0]|[-0.2452533483159...|
|[-3.0,1235.0]|[-0.4058070543490...|
|[-7.0,1235.0]|[-0.4860839073656...|
|[-6.0,1235.0]|[-0.4660146941114...|
|[-1.0,1235.0]|[-0.3656686278407...|
| [0.0,1235.0]|[-0.3455994145866...|
| [0.0,1235.0]|[-0.3455994145866...|
|[-1.0,1235.0]|[-0.3656686278407...|
|[-1.0,1235.0]|[-0.3656686278407...|
| [1.0,1235.0]|[-0.3255302013325...|
|[-2.0,1235.0]|[-0.3857378410949...|
|[-4.0,1235.0]|[-0.4258762676032...|
|[-1.0,1235.0]|[-0.3656686278407...|
| [0.0,1235.0]|[-0.3455994145866...|
| [0.0,1235.0]|[-0.3455994145866...|
|[13.0,1235.0]|[-0.0846996422828...|
|[17.0,1235.0]|[-0.0044227892663...|
|[12.0,1235.0]|[-0.1047688555370...|
|[19.0,1235.0]|[0.03571563724193...|
+-------------+--------------------+
only showing top 20 rows



## Transformación manual

In [None]:
# Build two polynomial terms by hand with plain column arithmetic:
# the square of DepDelay and the DepDelay x Distance product.
bd7 = (
    bd6
    .withColumn('DepDelay2', bd6.DepDelay ** 2)
    .withColumn('DepD_Distance', bd6.DepDelay * bd6.Distance)
)