In [1]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import StringIndexer
from pyspark.sql.types import (StructType, StructField, StringType,
                               DoubleType, IntegerType, LongType)
from pyspark.sql.functions import *
from pyspark import SparkConf
from pyspark import SparkContext
import multiprocessing
from pyspark.ml import Pipeline
import sys

print('Inicio del Script')

# Parallelism configuration: size both the SQL shuffle partitions and the
# default RDD parallelism as a multiple of the CPU cores visible to the driver.
cores = multiprocessing.cpu_count()
p = 3  # partitions per core
particiones = cores * p
conf = SparkConf()
conf.set("spark.sql.shuffle.partitions", particiones)
conf.set("spark.default.parallelism", particiones)
# getOrCreate() keeps this cell re-runnable: calling SparkContext(conf=conf) a
# second time in the same kernel raises "Cannot run multiple SparkContexts at
# once". Note that when a context already exists it is returned as-is and
# `conf` is not re-applied; restart the kernel to change these settings.
sc = SparkContext.getOrCreate(conf=conf)


Inicio del Script


In [2]:
spark = SparkSession.builder.appName("Microsoft_Kaggle").getOrCreate()

# Read the raw CSV files and keep only the machine identifier plus the
# columns explored in this notebook.
print('Lectura del DF crudo')
data = (
    spark.read
    .csv('../data/df_cat/*.csv', header=True, inferSchema=True)
    .select(
        'MachineIdentifier',
        'ProductName',
        'Census_PrimaryDiskTypeName',
        'Census_PowerPlatformRoleName',
        'Census_OSArchitecture',
        'Census_ProcessorClass',
        'Census_OSInstallTypeName',
        'Census_OSWUAutoUpdateOptionsName',
        'Census_GenuineStateName',
        'Platform',
        'Processor',
        'OsPlatformSubRelease',
        'SkuEdition',
        'PuaMode',
        'Census_DeviceFamily',
        'Census_OSVersion',
        'Census_OSBranch',
        'EngineVersion',
        'AppVersion',
        'AvSigVersion',
        'OsBuildLab',
        'OsVer',
    )
)
# Persist the DataFrame to improve performance; the count() below is the
# first action executed on it.
data.persist()
print('Numero de casos totales = {}'.format(data.count()))

Lectura del DF crudo
Numero de casos totales = 16774736


In [3]:
# Frequency table: number of rows per 'Census_ChassisTypeName' value.
# NOTE(review): 'Census_ChassisTypeName' is not among the columns selected
# into `data` when it is read, so this groupBy looks like it will fail on a
# fresh Restart & Run All - confirm, then either add the column to that
# select or remove this cell.
frequency_census = (
    data.groupBy('Census_ChassisTypeName')
    .count()
    .withColumnRenamed('count', 'Census_ChassisTypeName_freq')
)


In [4]:
# Columns to explore: every column of `data` except the machine identifier.
cols = [name for name in data.columns if name != 'MachineIdentifier']

In [5]:
# Display the distinct values of every explored column.
# DataFrame.show() prints the table itself and returns None, so it must not be
# wrapped in print(): doing so emitted a stray "None" line after each table.
# The loop variable is deliberately not named `c`, which a later cell assigns
# to a DataFrame.
# Note: show() only displays the first 20 rows by default, so columns with
# more distinct values are listed partially.
for column_name in cols:
    data.select(column_name).distinct().show()

+-------------+
|  ProductName|
+-------------+
|windowsintune|
|          fep|
|         scep|
|          mse|
|mseprerelease|
| win8defender|
+-------------+

None
+--------------------------+
|Census_PrimaryDiskTypeName|
+--------------------------+
|                       SSD|
|                       HDD|
|               Unspecified|
|                   UNKNOWN|
|                      null|
+--------------------------+

None
+----------------------------+
|Census_PowerPlatformRoleName|
+----------------------------+
|                  SOHOServer|
|            EnterpriseServer|
|                 AppliancePC|
|                 Unspecified|
|                     UNKNOWN|
|                     Desktop|
|           PerformanceServer|
|                 Workstation|
|                        null|
|                      Mobile|
|                       Slate|
+----------------------------+

None
+---------------------+
|Census_OSArchitecture|
+---------------------+
|                  x86|


In [9]:
# Fold missing and 'Unspecified' disk types into the existing 'UNKNOWN' label.
# A value is kept only when it is non-null and different from 'Unspecified';
# null rows fail the isNotNull() test and therefore fall through to otherwise().
# Built from `data`, so only this one column is recoded in `a`.
a = data.withColumn(
    'Census_PrimaryDiskTypeName',
    when(
        col('Census_PrimaryDiskTypeName').isNotNull()
        & (col('Census_PrimaryDiskTypeName') != 'Unspecified'),
        col('Census_PrimaryDiskTypeName'),
    ).otherwise('UNKNOWN'),
)

In [10]:
# List the distinct disk types left after the recode.
(
    a.select('Census_PrimaryDiskTypeName')
    .distinct()
    .show()
)

+--------------------------+
|Census_PrimaryDiskTypeName|
+--------------------------+
|                       SSD|
|                       HDD|
|                   UNKNOWN|
+--------------------------+



In [11]:
# Fold missing and 'Unspecified' platform roles into the existing 'UNKNOWN' label.
# A value is kept only when it is non-null and different from 'Unspecified';
# null rows fail the isNotNull() test and therefore fall through to otherwise().
# Built from `data`, so only this one column is recoded in `b`.
b = data.withColumn(
    'Census_PowerPlatformRoleName',
    when(
        col('Census_PowerPlatformRoleName').isNotNull()
        & (col('Census_PowerPlatformRoleName') != 'Unspecified'),
        col('Census_PowerPlatformRoleName'),
    ).otherwise('UNKNOWN'),
)

In [12]:
# List the distinct platform roles left after the recode.
(
    b.select('Census_PowerPlatformRoleName')
    .distinct()
    .show()
)

+----------------------------+
|Census_PowerPlatformRoleName|
+----------------------------+
|                  SOHOServer|
|            EnterpriseServer|
|                 AppliancePC|
|                     UNKNOWN|
|                     Desktop|
|           PerformanceServer|
|                 Workstation|
|                      Mobile|
|                       Slate|
+----------------------------+



In [13]:
# Replace missing processor classes with the explicit 'UNKNOWN' label.
# coalesce() yields its first non-null argument, so existing values pass
# through unchanged and only nulls become 'UNKNOWN'.
# Built from `data`, so only this one column is recoded in `c`.
c = data.withColumn(
    'Census_ProcessorClass',
    coalesce(col('Census_ProcessorClass'), lit('UNKNOWN')),
)

In [15]:
# List the distinct processor classes left after the recode.
(
    c.select('Census_ProcessorClass')
    .distinct()
    .show()
)

+---------------------+
|Census_ProcessorClass|
+---------------------+
|                  low|
|              UNKNOWN|
|                 high|
|                  mid|
+---------------------+



In [16]:
# Replace missing genuine-state values with the explicit 'UNKNOWN' label.
# coalesce() yields its first non-null argument, so existing values pass
# through unchanged and only nulls become 'UNKNOWN'.
# Built from `data`, so only this one column is recoded in `d`.
d = data.withColumn(
    'Census_GenuineStateName',
    coalesce(col('Census_GenuineStateName'), lit('UNKNOWN')),
)

In [18]:
# List the distinct genuine-state values left after the recode.
(
    d.select('Census_GenuineStateName')
    .distinct()
    .show()
)

+-----------------------+
|Census_GenuineStateName|
+-----------------------+
|             IS_GENUINE|
|                UNKNOWN|
|               TAMPERED|
|        INVALID_LICENSE|
|                OFFLINE|
+-----------------------+



In [19]:
# Replace missing PuaMode values with the explicit 'UNKNOWN' label.
# coalesce() yields its first non-null argument, so existing values pass
# through unchanged and only nulls become 'UNKNOWN'.
# Built from `data`, so only this one column is recoded in `e`.
e = data.withColumn(
    'PuaMode',
    coalesce(col('PuaMode'), lit('UNKNOWN')),
)

In [20]:
# List the distinct PuaMode values left after the recode.
(
    e.select('PuaMode')
    .distinct()
    .show()
)

+-------+
|PuaMode|
+-------+
|     on|
|UNKNOWN|
|  audit|
+-------+

