Imprimo la ubicacion de la variable de entorno JAVA_HOME

Debe ser JAVA 64bits y la ruta no debe tener espacios ( Windows )

In [1]:
echo %JAVA_HOME%

C:\Progra~1\Java\jre1.8.0_271


Se configura pyspark para utilizar 20GB de RAM

In [2]:
import os
import pyspark

memory = '20g'
pyspark_submit_args = ' --driver-memory ' + memory + ' pyspark-shell'
os.environ["PYSPARK_SUBMIT_ARGS"] = pyspark_submit_args

sc = pyspark.SparkContext('local[*]')

In [3]:
from pyspark.sql.types import StructType, DateType, IntegerType, StringType, DoubleType, BooleanType
from pyspark.sql.functions import datediff

In [4]:
sqlContext = pyspark.SQLContext(sc)

Date – The date of the file in yyyy-mm-dd format.

Serial Number – The manufacturer-assigned serial number of the drive.

Model – The manufacturer-assigned model number of the drive.

Capacity – The drive capacity in bytes.

Failure – Contains a “0” if the drive is OK. Contains a “1” if this is the last day the drive was operational before failing.


In [5]:
schema = StructType() \
      .add("Date", DateType(),True) \
      .add("Serial_Number",StringType(),True) \
      .add("Model",StringType(),True) \
      .add("capacity_bytes",DoubleType(),True) \
      .add("Failure",IntegerType(),True)

In [6]:
dia = sqlContext.read.options(header='True', delimiter=',') \
        .schema(schema) \
        .csv("../bigdata/subset/**")

In [7]:
dia.printSchema()

root
 |-- Date: date (nullable = true)
 |-- Serial_Number: string (nullable = true)
 |-- Model: string (nullable = true)
 |-- capacity_bytes: double (nullable = true)
 |-- Failure: integer (nullable = true)



In [8]:
dia.registerTempTable("diskData")

In [9]:
sqlContext.sql('show tables').show()

+--------+---------+-----------+
|database|tableName|isTemporary|
+--------+---------+-----------+
|        | diskdata|       true|
+--------+---------+-----------+



Total de discos Discos del Datacenter con su fecha de Puesta en funcionamiento y dia de retiro

In [10]:
df_AllDisksDayInDayOut = sqlContext.sql('Select Serial_Number, Model, Min(Date) as DayIn, Max(Date) as DayOut, Count(1) as Count from diskData group by Serial_Number, Model order by Model')

In [11]:
df_AllDisksDayInDayOut.createOrReplaceTempView('tbl_AllDisksDayInDayOut')

Se agrega nueva tabla tbl_AllDisksDayInDayOut

In [12]:
sqlContext.sql('show tables').show()

+--------+--------------------+-----------+
|database|           tableName|isTemporary|
+--------+--------------------+-----------+
|        |            diskdata|       true|
|        |tbl_alldisksdayin...|       true|
+--------+--------------------+-----------+



Esquema de tabla tbl_AllDisksDayInDayOut

In [13]:
df_AllDisksDayInDayOut.printSchema()

root
 |-- Serial_Number: string (nullable = true)
 |-- Model: string (nullable = true)
 |-- DayIn: date (nullable = true)
 |-- DayOut: date (nullable = true)
 |-- Count: long (nullable = false)



Cantidad de discos por modelo que existen.

In [14]:
sqlContext.sql('Select Model, Count(1) as Count from tbl_AllDisksDayInDayOut group by Model order by Count desc').show()

+--------------------+-----+
|               Model|Count|
+--------------------+-----+
|       ST12000NM0007|37004|
|         ST4000DM000|19211|
|        ST8000NM0055|14452|
|HGST HMS5C4040BLE640|12746|
|HGST HUH721212ALN604|10859|
|         ST8000DM002| 9809|
|       ST12000NM0008| 7217|
| TOSHIBA MG07ACA14TA| 3620|
|HGST HMS5C4040ALE640| 2829|
|HGST HUH721212ALE600| 1560|
|       ST10000NM0086| 1200|
|HGST HUH728080ALE600| 1000|
|         ST6000DX000|  886|
|       ST500LM012 HN|  488|
|  TOSHIBA MQ01ABF050|  451|
| TOSHIBA MQ01ABF050M|  415|
|          ST500LM030|  253|
|      WDC WD5000LPVX|  208|
|Seagate BarraCuda...|  159|
|         Seagate SSD|  107|
+--------------------+-----+
only showing top 20 rows



In [15]:
df_AllDisksDayInDayOut.write.mode('overwrite').parquet("./output/result5.parquet")

Listar la cantidad promedio de días que un dura un disco duro en operacion según su modelo.

In [18]:
dfread = sqlContext.read.load("./output/result5.parquet")

In [22]:
dfread.printSchema()

root
 |-- Serial_Number: string (nullable = true)
 |-- Model: string (nullable = true)
 |-- DayIn: date (nullable = true)
 |-- DayOut: date (nullable = true)
 |-- Count: long (nullable = true)



In [24]:
dfread.registerTempTable("newDiskData")

In [27]:
sqlContext.sql('Select CAST(AVG(DATEDIFF(DayOut, DayIn)) AS DECIMAL(10,0)) as Days, Model from newDiskData Group by Model order by Days desc').show()

+----+--------------------+
|Days|               Model|
+----+--------------------+
|   2|        ST8000NM0055|
|   2|         ST4000DM000|
|   2| TOSHIBA MQ01ABF050M|
|   2|       ST12000NM0007|
|   2|         ST8000DM005|
|   2| TOSHIBA MG07ACA14TA|
|   2|        WDC WD60EFRX|
|   2|         ST8000DM002|
|   2|Seagate BarraCuda...|
|   2|         ST4000DM005|
|   2|         DELLBOSS VD|
|   2|HGST HUS726040ALE610|
|   2|     TOSHIBA HDWF180|
|   2|HGST HMS5C4040ALE640|
|   2|HGST HUH721010ALE600|
|   2|         Seagate SSD|
|   2| TOSHIBA MD04ABA400V|
|   2|       ST10000NM0086|
|   2|          ST500LM021|
|   2|Seagate BarraCuda...|
+----+--------------------+
only showing top 20 rows



In [25]:
sqlContext.sql('Select CAST(AVG(DATEDIFF(DayOut, DayIn)) AS DECIMAL(10,2)) as Days, Model from tbl_AllDisksDayInDayOut Group by Model order by Days desc').show()

+----+--------------------+
|Days|               Model|
+----+--------------------+
|2.00|        ST8000NM0055|
|2.00| TOSHIBA MQ01ABF050M|
|2.00|         ST4000DM000|
|2.00|       ST12000NM0007|
|2.00|         ST8000DM005|
|2.00|         ST8000DM002|
|2.00| TOSHIBA MG07ACA14TA|
|2.00|        WDC WD60EFRX|
|2.00|         DELLBOSS VD|
|2.00|Seagate BarraCuda...|
|2.00|         ST4000DM005|
|2.00|     TOSHIBA HDWF180|
|2.00|HGST HUS726040ALE610|
|2.00| TOSHIBA MD04ABA400V|
|2.00|       ST10000NM0086|
|2.00|HGST HMS5C4040ALE640|
|2.00|HGST HUH721010ALE600|
|2.00|         Seagate SSD|
|2.00|  TOSHIBA MQ01ABF050|
|2.00|          ST500LM021|
+----+--------------------+
only showing top 20 rows



In [None]:
sqlContext.sql('Select Count(1) as Cuenta, Model from diskData group by Model, Serial_Number order by Cuenta desc').show()

In [None]:
sqlContext.sql('Select Count(1) as Cuenta, Model from diskData group by Model, Serial_Number order by Cuenta desc')\
          .repartition(1)\
          .write.format("parquet")\
          .save("output/parquet/transacciones", mode="OVERWRITE")

In [None]:
df_averagedia.createOrReplaceTempView('tblAverageDia')

Cantidad de TB en el Datacenter por modelo de disco duro

In [None]:
sqlContext.sql('select Model, CAST(SUM(capacity_bytes/1099511627776) AS DECIMAL(10,2)) as TB from diskData group by model order by TB desc').show()

Top 5 modeos de discos más confiables por año

In [None]:
sqlContext.sql('').show()

Top 5 modeos de discos más confiables por año

In [None]:
sqlContext.sql('').show(150)

In [None]:
Cantidad de discos que fallaron por año vs discos en el data center

In [None]:
sqlContext.sql('Select Count, Failures, Failures.Year from (select count(distinct Serial_Number) as Count, Year(Date) as Year from diskData group by Year(date)) as Disks inner Join (select Count(Failure) as Failures, Year(Date) as Year from diskData where Failure = 1 group by year(Date) order by Year) as Failures on Disks.Year = Failures.Year order by Failures.Year').show()

In [None]:
selectedData = dia.select("Date", "Serial_Number")

In [None]:
selectedData.write.csv('test2.csv')