## Bibliotecas

In [154]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql import functions as F

## Criação da Sessão do PySpark

In [155]:
# Get (or reuse) the Spark session for this notebook, named "Treinamento".
spark = (
    SparkSession.builder
    .appName("Treinamento")
    .getOrCreate()
)

## Leitura da Base

In [156]:
# Explicit column layout for the input CSV files: timestamp, instance type, OS, region and price.
# Every field is declared nullable.
schema = StructType(
    [
        StructField("datetime", TimestampType(), nullable=True),
        StructField("instance_type", StringType(), nullable=True),
        StructField("os", StringType(), nullable=True),
        StructField("region", StringType(), nullable=True),
        StructField("price", DoubleType(), nullable=True),
    ]
)

In [157]:
%%time
df = spark.read.csv('../datasets/*.csv', header = False, schema = schema)

CPU times: user 15.6 ms, sys: 15.6 ms, total: 31.2 ms
Wall time: 21.6 ms


In [158]:
# Work on a small sample to keep the exploration fast.
# NOTE(review): there is no orderBy before limit(), so which rows are kept is presumably not
# guaranteed to be the same across runs — confirm if reproducibility matters.
N_AMOSTRA = 1000
df = df.limit(N_AMOSTRA)

In [159]:
# Number of rows in the sample (displayed as the cell output).
total_registros = df.count()
total_registros

1000

In [160]:
# Register the sampled DataFrame as the temporary SQL view "Spots" so the spark.sql cells can query it.
df.createOrReplaceTempView(name="Spots")

## Conhecendo o negócio

In [161]:
# How many distinct instance types are there in the sample?
df.select('instance_type').dropDuplicates().count()

51

In [162]:
# Show (up to 20 of) the distinct instance types.
df.select(F.col('instance_type')).distinct().show(n=20)

+-------------+
|instance_type|
+-------------+
|   c3.8xlarge|
|     i3.large|
|   c4.8xlarge|
|   m2.4xlarge|
|   r3.2xlarge|
|    r3.xlarge|
|    i2.xlarge|
|     c4.large|
|    c4.xlarge|
|    m1.xlarge|
|    m3.xlarge|
|   r3.8xlarge|
|   c4.2xlarge|
|   r4.4xlarge|
|     m3.large|
|    c3.xlarge|
|   m2.2xlarge|
|    m2.xlarge|
|   g2.2xlarge|
|   m4.4xlarge|
+-------------+
only showing top 20 rows



In [163]:
# Which operating systems appear in the AWS spot data?
df.select('os').dropDuplicates().show()

+----------+
|        os|
+----------+
|   Windows|
|SUSE Linux|
|Linux/UNIX|
+----------+



In [164]:
# Same question answered in SQL against the "Spots" view.
distinct_os_query = """
    SELECT DISTINCT os

    FROM Spots

    """
spark.sql(distinct_os_query).show()

+----------+
|        os|
+----------+
|   Windows|
|SUSE Linux|
|Linux/UNIX|
+----------+



In [165]:
# Average (rounded to 2 decimals), max and min price of instance types whose name contains "c3".
precos_c3 = df.filter(F.col('instance_type').contains("c3"))
precos_c3.agg(
    F.round(F.avg('price'), 2).alias('price_avg'),
    F.max('price').alias('price_max'),
    F.min('price').alias('price_min'),
).show()

+---------+---------+---------+
|price_avg|price_max|price_min|
+---------+---------+---------+
|     0.61|   1.7468|    0.045|
+---------+---------+---------+



In [166]:
# Same c3 statistics computed in SQL against the "Spots" view.
c3_stats_query = """
    SELECT round(avg(price),2) price_avg,
           max(price) price_max,
           min(price) price_min

    FROM Spots

    WHERE instance_type like '%c3%'

    """
spark.sql(c3_stats_query).show()

+---------+---------+---------+
|price_avg|price_max|price_min|
+---------+---------+---------+
|     0.61|   1.7468|    0.045|
+---------+---------+---------+



In [167]:
# Are Windows machines more expensive on average? Average price per OS, highest first.
media_por_os = df.groupBy('os').agg(F.avg('price').alias('avg'))
media_por_os.orderBy(F.desc('avg')).show()

+----------+-------------------+
|        os|                avg|
+----------+-------------------+
|   Windows| 0.6005775933609958|
|SUSE Linux| 0.5088418421052634|
|Linux/UNIX|0.40886332453825847|
+----------+-------------------+



## Aplicando regras

In [168]:
# Instance family ("tipo") = the part of instance_type before the first '.', e.g. "c3.8xlarge" -> "c3".
# The pattern is a regex, so '.' is escaped; a raw string is used because "\." is an invalid
# escape sequence in a normal Python string literal (warns on recent Python versions).
df_tratamento = df.withColumn("tipo", F.split(F.col('instance_type'), r"\.")[0])

In [169]:
# Preview the first rows, including the derived `tipo` column.
df_tratamento.show(n=20, truncate=True)

+-------------------+-------------+----------+---------------+------+----+
|           datetime|instance_type|        os|         region| price|tipo|
+-------------------+-------------+----------+---------------+------+----+
|2017-05-08 18:46:36|   c3.8xlarge|   Windows|ap-northeast-1a|1.6503|  c3|
|2017-05-08 18:46:36|   c3.8xlarge|   Windows|ap-northeast-1c|1.7461|  c3|
|2017-05-08 18:46:34|     i3.large|SUSE Linux|ap-northeast-1c|0.1223|  i3|
|2017-05-08 18:46:34|     i3.large|Linux/UNIX|ap-northeast-1c|0.0223|  i3|
|2017-05-08 18:46:17|   c4.8xlarge|SUSE Linux|ap-northeast-1a| 0.789|  c4|
|2017-05-08 18:46:17|   c4.8xlarge|Linux/UNIX|ap-northeast-1a| 0.689|  c4|
|2017-05-08 18:46:17|   m2.4xlarge|SUSE Linux|ap-northeast-1c|0.2782|  m2|
|2017-05-08 18:46:17|   m2.4xlarge|Linux/UNIX|ap-northeast-1c|0.1782|  m2|
|2017-05-08 18:46:10|   r3.2xlarge|SUSE Linux|ap-northeast-1c|0.2282|  r3|
|2017-05-08 18:46:10|   r3.2xlarge|Linux/UNIX|ap-northeast-1c|0.1282|  r3|
|2017-05-08 18:46:09|    

In [170]:
# Average price per instance family (`tipo`), highest first.
# The aggregate references df_tratamento's own `price` column (not the original `df`), and the
# result is aliased in lowercase so it matches the `B.avg` / `df_tratamento.avg` references
# used when joining and classifying (Python attribute access is case-sensitive).
df_type = (
    df_tratamento
    .groupBy('tipo')
    .agg(F.avg(F.col('price')).alias('avg'))
    .sort(F.col('avg').desc())
)

In [171]:
# Adds a constant placeholder column named `os` with value "TESTE".
# NOTE(review): this looks like a leftover test; it shares its name with df_tratamento's real `os`
# column, which would produce duplicate `os` columns in a plain join on ['tipo'].
placeholder_os = F.lit("TESTE")
df_type = df_type.withColumn(colName='os', col=placeholder_os)

In [172]:
# Attach each row's family average price: left join on `tipo`, keeping every column of the
# left side (alias A) plus only the `avg` column of the per-family aggregate (alias B).
df_tratamento = (
    df_tratamento.alias('A')
    .join(df_type.alias('B'), on=F.col('A.tipo') == F.col('B.tipo'), how='left')
    .select(F.col('A.*'), F.col('B.avg'))
)

In [173]:
# Classify each row as "ALTO" when its price is above its family's average price, otherwise "BAIXO".
# Rows where the comparison is null (null price or null family average) also get "BAIXO".
# Both columns are resolved by name on df_tratamento itself, instead of mixing in a column of the
# original `df` DataFrame and relying on case-sensitive attribute access (`df_tratamento.avg`).
df_tratamento = df_tratamento.withColumn(
    'classificacao',
    F.when(F.col('price') > F.col('avg'), "ALTO").otherwise("BAIXO"),
)


In [174]:
# Number of rows in each price class.
contagem_classes = df_tratamento.groupBy('classificacao').count()
contagem_classes.show()

+-------------+-----+
|classificacao|count|
+-------------+-----+
|         ALTO|  363|
|        BAIXO|  637|
+-------------+-----+



In [175]:
# Write (up to 1000 rows of) the treated data to Parquet, partitioned by `classificacao`.
# NOTE(review): the final cell writes to the same path in 'overwrite' mode, replacing this output.
(
    df_tratamento
    .limit(1000)
    .write
    .partitionBy('classificacao')
    .mode('overwrite')
    .parquet('../datasets/tratamento.parquet')
)

In [177]:
# Derive a calendar-date column from the `datetime` timestamp, plus its string form
# (later split on '-' into year / month / day partition columns).
df_tratamento = df_tratamento.withColumn('date', F.col('datetime').cast(DateType()))
df_tratamento = df_tratamento.withColumn('date_str', F.col('date').cast(StringType()))

In [183]:
# Split the date string on '-' into year (ano), month (mes) and day (dia) string columns.
# '-' needs no regex escaping, and "\-" is an invalid escape sequence in a normal Python string;
# the split expression is built once and indexed three times.
date_parts = F.split(F.col('date_str'), '-')
df_tratamento = (
    df_tratamento
    .withColumn("ano", date_parts[0])
    .withColumn("mes", date_parts[1])
    .withColumn("dia", date_parts[2])
)
             

In [185]:
# Persist the treated data to Parquet partitioned by year / month / day.
# NOTE(review): same target path as the classificacao-partitioned write; 'overwrite' replaces it.
df_tratamento.write.partitionBy('ano', 'mes', 'dia') \
    .mode('overwrite') \
    .parquet('../datasets/tratamento.parquet')