# Pós-Graduação - Ciência de Dados & Big Data

## Pontifícia Universidade Católica de Minas Gerais (PUC-MG)

### Aluno: Victor Hugo Negrisoli

### Utilizando DataFrames com Spark


In [1]:
from pyspark.sql import SparkSession
import json

#### Lendo os dados iniciais via dicionário e via JSON

In [2]:
pessoas = []
pessoas.append({
  'id': 1,
  'nome': 'Bob',
  'idade': 45,
  'gen': 'M'
})
pessoas.append({
  'id': 2,
  'nome': 'Gloria',
  'idade': 43,
  'gen': 'F'
})
pessoas.append({
  'id': 4,
  'nome': 'Albert',
  'idade': 28,
  'gen': 'M'
})
pessoas.append({
  'id': 5,
  'nome': 'Laura',
  'idade': 33,
  'gen': 'F'
})
pessoas.append({
  'id': 8,
  'nome': 'Simone',
  'idade': 18,
  'gen': 'T'
})
pessoas.append({
  'id': 12,
  'nome': 'Marta',
  'idade': 45,
  'gen': 'F'
})
pessoas.append({
  'id': 45,
  'nome': 'Jairo',
  'idade': 82,
  'gen': 'M'
})
pessoas.append({
  'id': 13,
  'nome': 'Teste',
  'idade': 38,
  'gen': 'T'
})

In [3]:
spark = SparkSession\
            .builder\
            .appName("Pratica - Funções Spark")\
            .getOrCreate()

sc = spark.sparkContext

In [7]:
pessoasRDD = sc.parallelize(pessoas)
pessoasRDD.collect()

[{'id': 1, 'nome': 'Bob', 'idade': 45, 'gen': 'M'},
 {'id': 2, 'nome': 'Gloria', 'idade': 43, 'gen': 'F'},
 {'id': 4, 'nome': 'Albert', 'idade': 28, 'gen': 'M'},
 {'id': 5, 'nome': 'Laura', 'idade': 33, 'gen': 'F'},
 {'id': 8, 'nome': 'Simone', 'idade': 18, 'gen': 'T'},
 {'id': 12, 'nome': 'Marta', 'idade': 45, 'gen': 'F'},
 {'id': 45, 'nome': 'Jairo', 'idade': 82, 'gen': 'M'},
 {'id': 13, 'nome': 'Teste', 'idade': 38, 'gen': 'T'}]

In [6]:
pessoasJson = sc.textFile('pessoasval.json')
pessoasJsonRDD = pessoasJson.flatMap(lambda x: json.loads(x))
pessoasJsonRDD.collect()

[{'id': 1, 'val': 45, 'dat': '12/01/2006'},
 {'id': 2, 'val': 53, 'dat': '04/06/2009'},
 {'id': 4, 'val': 345, 'dat': '18/01/2012'},
 {'id': 5, 'val': 435, 'dat': '12/01/2016'},
 {'id': 8, 'val': 2003, 'dat': '08/09/2015'},
 {'id': 12, 'val': 100, 'dat': '12/11/2000'},
 {'id': 45, 'val': 200, 'dat': '12/01/2006'},
 {'id': 13, 'val': 99999, 'dat': '12/01/2006'},
 {'id': 1, 'val': 405, 'dat': '12/03/2006'},
 {'id': 2, 'val': 503, 'dat': '04/09/2009'},
 {'id': 4, 'val': 35, 'dat': '01/10/2012'},
 {'id': 5, 'val': 45, 'dat': '12/12/2016'},
 {'id': 8, 'val': 23, 'dat': '01/01/2015'},
 {'id': 12, 'val': 10, 'dat': '02/01/2002'},
 {'id': 45, 'val': 20, 'dat': '12/12/2006'},
 {'id': 13, 'val': 99999, 'dat': '12/01/2007'}]

#### Criando os DataFrames com base nos RDDs definidos

In [10]:
pessoasDf = pessoasRDD.toDF()
pessoasJsonDf = pessoasJsonRDD.toDF()

In [13]:
pessoasDf.head(5)

[Row(gen='M', id=1, idade=45, nome='Bob'),
 Row(gen='F', id=2, idade=43, nome='Gloria'),
 Row(gen='M', id=4, idade=28, nome='Albert'),
 Row(gen='F', id=5, idade=33, nome='Laura'),
 Row(gen='T', id=8, idade=18, nome='Simone')]

In [14]:
pessoasJsonDf.head(5)

[Row(dat='12/01/2006', id=1, val=45),
 Row(dat='04/06/2009', id=2, val=53),
 Row(dat='18/01/2012', id=4, val=345),
 Row(dat='12/01/2016', id=5, val=435),
 Row(dat='08/09/2015', id=8, val=2003)]

Validando a estrutura dos DataFrames, ou seja, qual seu schema

In [15]:
pessoasDf.printSchema()

root
 |-- gen: string (nullable = true)
 |-- id: long (nullable = true)
 |-- idade: long (nullable = true)
 |-- nome: string (nullable = true)



In [16]:
pessoasJsonDf.printSchema()

root
 |-- dat: string (nullable = true)
 |-- id: long (nullable = true)
 |-- val: long (nullable = true)



#### Realizando operações estatísticas nos DataFrames

In [20]:
totalPessoas = pessoasDf.count()
totalPessoasJson = pessoasJsonDf.count()

print('Total de itens em Pessoas: {}\nTotal de itens em Pessoas JSON: {}'.format(totalPessoas, totalPessoasJson))

Total de itens em Pessoas: 8
Total de itens em Pessoas JSON: 16


Selecionando valores específicos

In [23]:
pessoasDf \
        .select('nome', 'idade') \
        .distinct() \
        .collect()

[Row(nome='Teste', idade=38),
 Row(nome='Laura', idade=33),
 Row(nome='Marta', idade=45),
 Row(nome='Jairo', idade=82),
 Row(nome='Gloria', idade=43),
 Row(nome='Simone', idade=18),
 Row(nome='Albert', idade=28),
 Row(nome='Bob', idade=45)]

Estatística descritiva sobre uma consulta

In [24]:
pessoasDf \
        .select('nome', 'idade') \
        .describe() \
        .show()

+-------+------+------------------+
|summary|  nome|             idade|
+-------+------+------------------+
|  count|     8|                 8|
|   mean|  null|              41.5|
| stddev|  null|18.845234335047607|
|    min|Albert|                18|
|    max| Teste|                82|
+-------+------+------------------+



In [32]:
pessoasJsonDf \
            .select('id') \
            .distinct() \
            .collect()

[Row(id=5),
 Row(id=1),
 Row(id=12),
 Row(id=8),
 Row(id=2),
 Row(id=4),
 Row(id=13),
 Row(id=45)]

In [33]:
pessoasDf \
        .crosstab('idade', 'gen') \
        .show()

+---------+---+---+---+
|idade_gen|  F|  M|  T|
+---------+---+---+---+
|       28|  0|  1|  0|
|       38|  0|  0|  1|
|       33|  1|  0|  0|
|       45|  1|  1|  0|
|       18|  0|  0|  1|
|       43|  1|  0|  0|
|       82|  0|  1|  0|
+---------+---+---+---+



In [34]:
pessoasJsonDf \
        .crosstab('dat', 'val') \
        .show()

+----------+---+---+---+---+----+---+---+---+---+---+---+---+---+-----+
|   dat_val| 10|100| 20|200|2003| 23|345| 35|405|435| 45|503| 53|99999|
+----------+---+---+---+---+----+---+---+---+---+---+---+---+---+-----+
|18/01/2012|  0|  0|  0|  0|   0|  0|  1|  0|  0|  0|  0|  0|  0|    0|
|01/10/2012|  0|  0|  0|  0|   0|  0|  0|  1|  0|  0|  0|  0|  0|    0|
|12/01/2007|  0|  0|  0|  0|   0|  0|  0|  0|  0|  0|  0|  0|  0|    1|
|04/06/2009|  0|  0|  0|  0|   0|  0|  0|  0|  0|  0|  0|  0|  1|    0|
|02/01/2002|  1|  0|  0|  0|   0|  0|  0|  0|  0|  0|  0|  0|  0|    0|
|04/09/2009|  0|  0|  0|  0|   0|  0|  0|  0|  0|  0|  0|  1|  0|    0|
|08/09/2015|  0|  0|  0|  0|   1|  0|  0|  0|  0|  0|  0|  0|  0|    0|
|12/03/2006|  0|  0|  0|  0|   0|  0|  0|  0|  1|  0|  0|  0|  0|    0|
|12/12/2006|  0|  0|  1|  0|   0|  0|  0|  0|  0|  0|  0|  0|  0|    0|
|12/01/2016|  0|  0|  0|  0|   0|  0|  0|  0|  0|  1|  0|  0|  0|    0|
|01/01/2015|  0|  0|  0|  0|   0|  1|  0|  0|  0|  0|  0|  0|  0

Realizando filtros no DataFrame

In [40]:
pessoasDf \
        .filter((pessoasDf.gen == 'M') & (pessoasDf.idade > 30)) \
        .show()

+---+---+-----+-----+
|gen| id|idade| nome|
+---+---+-----+-----+
|  M|  1|   45|  Bob|
|  M| 45|   82|Jairo|
+---+---+-----+-----+



In [66]:
pessoasJsonDf \
        .filter((pessoasJsonDf.val > 100) & (pessoasJsonDf.id < 100)) \
        .show()

+----------+---+-----+
|       dat| id|  val|
+----------+---+-----+
|18/01/2012|  4|  345|
|12/01/2016|  5|  435|
|08/09/2015|  8| 2003|
|12/01/2006| 45|  200|
|12/01/2006| 13|99999|
|12/03/2006|  1|  405|
|04/09/2009|  2|  503|
|12/01/2007| 13|99999|
+----------+---+-----+



Utilizando agregações com `agg` e `groupBy`

In [91]:
# Criando uma coluna de Ano a partir da coluna de data
# e agrupando esta coluna pelo somatório da coluna Val
# e ordenando por ano em ordem decrescente

pessoasValPorAno = pessoasJsonDf \
                    .withColumn('Ano', pessoasJsonDf.dat.substr(7, 11)) \
                    .groupBy('Ano') \
                    .agg({'val': 'sum'}) \
                    .orderBy('Ano', ascending = False)
pessoasValPorAno.show()

+----+--------+
| Ano|sum(val)|
+----+--------+
|2016|     480|
|2015|    2026|
|2012|     380|
|2009|     556|
|2007|   99999|
|2006|  100669|
|2002|      10|
|2000|     100|
+----+--------+

