# Lesson 8 - PySpark

## Review

### Accessing a Hive table via pyspark

Create a `HiveContext` (deprecated since Spark 2.0 in favor of `SparkSession.builder.enableHiveSupport().getOrCreate()`):

    from pyspark.sql import HiveContext
    contexto = HiveContext(sc)


Load the `jobs` table from the `hr` database as a DataFrame:

    banco = contexto.table("hr.jobs")
    banco.show()

Register the DataFrame as a temporary table so that it can be queried with SQL (since Spark 2.0, `createOrReplaceTempView` replaces the deprecated `registerTempTable`):

    banco.registerTempTable("jobs")
    contexto.sql('Select * from jobs').show()
    contexto.sql('Select *  from jobs order by salario_max DESC limit 1').show()
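
To see what the second query returns without a Hive cluster, here is a rough stand-in that runs the same SQL on Python's built-in `sqlite3` instead of Spark SQL (a swapped-in engine, not what the lesson uses). It assumes the table has the columns `indice`, `job_title`, `salario_min` and `salario_max`, as in the CSV schema defined later in this lesson, and loads only a few of the rows shown there.

```python
import sqlite3

# In-memory database standing in for the Hive table hr.jobs
# (assumption: same columns as the CSV schema used later in the lesson)
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE jobs (indice INTEGER, job_title TEXT, "
    "salario_min REAL, salario_max REAL)"
)
conn.executemany(
    "INSERT INTO jobs VALUES (?, ?, ?, ?)",
    [
        (1, "Public Accountant", 4200.0, 9000.0),
        (4, "President", 20000.0, 40000.0),
        (5, "Administration Vice President", 15000.0, 30000.0),
        (9, "Programmer", 4000.0, 10000.0),
    ],
)

# Same SQL as contexto.sql('Select * from jobs order by salario_max DESC limit 1')
top = conn.execute(
    "SELECT * FROM jobs ORDER BY salario_max DESC LIMIT 1"
).fetchone()
print(top)  # (4, 'President', 20000.0, 40000.0)
```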

In [26]:

    from pyspark.sql import HiveContext
    contexto = HiveContext(sc)

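In [27]:

    contexto.tableNames

    <bound method SQLContext.tableNames of <pyspark.sql.context.HiveContext object at 0x7f5035beea50>>

Without parentheses, the cell returns the method itself instead of calling it; `contexto.tableNames()` returns the list of table names.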

### Creating a DataFrame

The variable `jobs` holds our DataFrame:

    jobs = contexto.sql("select * from jobs") 
    jobs.show()
    

### Some useful commands

  
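    jobs.show()                                      # display the first rows
    jobs.printSchema()                               # column names and types
    jobs.select('job_title').show()                  # a single column
    jobs.select('job_title', 'salario_max').show()   # several columns
    jobs.select('salario_max').distinct().show()     # unique values
    print(jobs.select('salario_max').distinct().count())  # count() returns an int, not a DataFrame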
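
`count()` is an action that returns a plain Python `int`, so there is no DataFrame to call `.show()` on. The plain-Python sketch below mirrors `select('salario_max').distinct().count()` to show what number to expect, assuming the Hive table holds the same 19 rows as `data/jobs.csv` (the values are copied from the outputs later in this lesson).

```python
# salario_max for the 19 jobs in data/jobs.csv
salario_max = [
    9000.0, 16000.0, 6000.0, 40000.0, 30000.0, 9000.0, 16000.0,
    9000.0, 10000.0, 15000.0, 9000.0, 10500.0, 5500.0, 15000.0,
    20000.0, 12000.0, 5500.0, 5000.0, 8500.0,
]

# distinct() ~ set(), count() ~ len(): the result is a plain int
n_distinct = len(set(salario_max))
print(n_distinct)  # 13
```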
   

# Getting started with pyspark

To install `pyspark` locally, run the following in a notebook cell:

    pip install pyspark
    
    
**WARNING:** If the cell below fails, you may need to install Java on your machine and restart the computer.

[Download Java for Windows](https://www.java.com/pt-BR/download/ie_manual.jsp?locale=pt_BR)

Before using pyspark, you need to create and configure its environment: a `SparkContext` and, on top of it, a `SparkSession`.

In [1]:

    # import the required classes
    from pyspark import SparkContext
    from pyspark.sql import SparkSession

    # create the SparkContext
    sc = SparkContext()

    # create the Spark session on top of that context
    spark = SparkSession(sc)

In [2]:

    sc

In [3]:

    spark

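In [4]:

    sc.getConf()

    <pyspark.conf.SparkConf at 0x1e35a323df0>

This only shows the `SparkConf` object; `sc.getConf().getAll()` returns the actual settings as a list of `(key, value)` pairs.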

# Reading CSV files with pyspark

## Creating an RDD from the CSV file

In [8]:

    # each RDD element is one raw line of text from the file
    jobs = sc.textFile('data/jobs.csv')
    jobs.collect()

['1,Public Accountant,4200.00,9000.00',
 '2,Accounting Manager,8200.00,16000.00',
 '3,Administration Assistant,3000.00,6000.00',
 '4,President,20000.00,40000.00',
 '5,Administration Vice President,15000.00,30000.00',
 '6,Accountant,4200.00,9000.00',
 '7,Finance Manager,8200.00,16000.00',
 '8,Human Resources Representative,4000.00,9000.00',
 '9,Programmer,4000.00,10000.00',
 '10,Marketing Manager,9000.00,15000.00',
 '11,Marketing Representative,4000.00,9000.00',
 '12,Public Relations Representative,4500.00,10500.00',
 '13,Purchasing Clerk,2500.00,5500.00',
 '14,Purchasing Manager,8000.00,15000.00',
 '15,Sales Manager,10000.00,20000.00',
 '16,Sales Representative,6000.00,12000.00',
 '17,Shipping Clerk,2500.00,5500.00',
 '18,Stock Clerk,2000.00,5000.00',
 '19,Stock Manager,5500.00,8500.00']
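
`textFile` does not parse anything: every element is still a string. In PySpark the usual next step is a `map` over the lines, e.g. `jobs.map(lambda line: line.split(','))`. The sketch below does the same parsing in plain Python on three of the lines above; the `int`/`float` conversions are an assumption about what each column means, and a bare `split(',')` only works because no field in this file contains a quoted comma.

```python
lines = [
    '1,Public Accountant,4200.00,9000.00',
    '4,President,20000.00,40000.00',
    '9,Programmer,4000.00,10000.00',
]

def parse(line):
    """Split one CSV line and convert each field to a Python type."""
    indice, title, sal_min, sal_max = line.split(',')
    return int(indice), title, float(sal_min), float(sal_max)

# What jobs.map(parse).collect() would return for these three lines
parsed = [parse(line) for line in lines]
print(parsed[1])  # (4, 'President', 20000.0, 40000.0)
```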

## Reading a CSV file into a DataFrame

In [12]:

    # read the countries table (the file has a header row)
    countries = spark.read.csv('data/countries.csv', header=True)
    countries.show()

+----------+------------+---------+
|country_id|country_name|region_id|
+----------+------------+---------+
|        AR|   Argentina|        2|
|        AU|   Australia|        3|
|        BE|     Belgium|        1|
|        BR|      Brazil|        2|
|        CA|      Canada|        2|
|        CH| Switzerland|        1|
|        CN|       China|        3|
|        DE|     Germany|        1|
|        DK|     Denmark|        1|
|        EG|       Egypt|        4|
|        FR|      France|        1|
|        HK|    HongKong|        3|
|        IL|      Israel|        4|
|        IN|       India|        3|
|        IT|       Italy|        1|
|        JP|       Japan|        3|
|        KW|      Kuwait|        4|
|        MX|      Mexico|        2|
|        NG|     Nigeria|        4|
|        NL| Netherlands|        1|
+----------+------------+---------+
only showing top 20 rows
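
With `header=True`, Spark takes the first line of the file as column names; without it, every line is data and the columns get positional names `_c0`, `_c1`, and so on. The plain-Python sketch below imitates both behaviours with the standard `csv` module on a small inline sample whose rows come from the output above (the exact header line of `data/countries.csv` is assumed to match the column names shown).

```python
import csv
import io

# Small inline sample mirroring data/countries.csv (assumed layout)
sample = "country_id,country_name,region_id\nAR,Argentina,2\nBR,Brazil,2\n"

# header=True: the first line becomes the column names
with_header = list(csv.DictReader(io.StringIO(sample)))
print(with_header[0])  # {'country_id': 'AR', 'country_name': 'Argentina', 'region_id': '2'}

# no header: every line is data, columns get positional names _c0, _c1, ...
rows = list(csv.reader(io.StringIO(sample)))
without_header = [{f"_c{i}": v for i, v in enumerate(row)} for row in rows]
print(without_header[0])  # {'_c0': 'country_id', '_c1': 'country_name', '_c2': 'region_id'}
```

In both cases the values stay strings (`'2'`, not `2`); Spark behaves the same way unless you pass `inferSchema=True` or an explicit schema.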



## Adding headers when the file does not have them

In [13]:

    # read the jobs file, which has no header row
    jobs = spark.read.csv('data/jobs.csv')
    jobs.show()

+---+--------------------+--------+--------+
|_c0|                 _c1|     _c2|     _c3|
+---+--------------------+--------+--------+
|  1|   Public Accountant| 4200.00| 9000.00|
|  2|  Accounting Manager| 8200.00|16000.00|
|  3|Administration As...| 3000.00| 6000.00|
|  4|           President|20000.00|40000.00|
|  5|Administration Vi...|15000.00|30000.00|
|  6|          Accountant| 4200.00| 9000.00|
|  7|     Finance Manager| 8200.00|16000.00|
|  8|Human Resources R...| 4000.00| 9000.00|
|  9|          Programmer| 4000.00|10000.00|
| 10|   Marketing Manager| 9000.00|15000.00|
| 11|Marketing Represe...| 4000.00| 9000.00|
| 12|Public Relations ...| 4500.00|10500.00|
| 13|    Purchasing Clerk| 2500.00| 5500.00|
| 14|  Purchasing Manager| 8000.00|15000.00|
| 15|       Sales Manager|10000.00|20000.00|
| 16|Sales Representative| 6000.00|12000.00|
| 17|      Shipping Clerk| 2500.00| 5500.00|
| 18|         Stock Clerk| 2000.00| 5000.00|
| 19|       Stock Manager| 5500.00| 8500.00|
+---+-----

In [14]:

    # import the schema types
    from pyspark.sql.types import StructType, StringType, IntegerType, FloatType

    # define the schema: column name, type, nullable
    schema_ = StructType() \
            .add('indice', IntegerType(), True) \
            .add('job_title', StringType(), True) \
            .add('salario_min', FloatType(), True) \
            .add('salario_max', FloatType(), True)

    # read the file using this schema
    df = spark.read.csv('data/jobs.csv', schema=schema_)
    df.show()

+------+--------------------+-----------+-----------+
|indice|           job_title|salario_min|salario_max|
+------+--------------------+-----------+-----------+
|     1|   Public Accountant|     4200.0|     9000.0|
|     2|  Accounting Manager|     8200.0|    16000.0|
|     3|Administration As...|     3000.0|     6000.0|
|     4|           President|    20000.0|    40000.0|
|     5|Administration Vi...|    15000.0|    30000.0|
|     6|          Accountant|     4200.0|     9000.0|
|     7|     Finance Manager|     8200.0|    16000.0|
|     8|Human Resources R...|     4000.0|     9000.0|
|     9|          Programmer|     4000.0|    10000.0|
|    10|   Marketing Manager|     9000.0|    15000.0|
|    11|Marketing Represe...|     4000.0|     9000.0|
|    12|Public Relations ...|     4500.0|    10500.0|
|    13|    Purchasing Clerk|     2500.0|     5500.0|
|    14|  Purchasing Manager|     8000.0|    15000.0|
|    15|       Sales Manager|    10000.0|    20000.0|
|    16|Sales Representative
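
Passing `schema=` tells Spark the name and type of every column, so `4200.00` arrives as a float instead of a string. As a rough plain-Python analogue (an illustration, not how Spark implements it), a schema can be modelled as `(name, converter)` pairs, with `int`, `str` and `float` standing in for `IntegerType`, `StringType` and `FloatType`:

```python
# (column name, converter) pairs playing the role of StructType().add(...)
schema_ = [
    ("indice", int),
    ("job_title", str),
    ("salario_min", float),
    ("salario_max", float),
]

def apply_schema(line, schema):
    """Split a raw CSV line and convert each field with its column's type."""
    values = line.split(",")
    return {name: convert(v) for (name, convert), v in zip(schema, values)}

row = apply_schema("16,Sales Representative,6000.00,12000.00", schema_)
print(row)
# {'indice': 16, 'job_title': 'Sales Representative', 'salario_min': 6000.0, 'salario_max': 12000.0}
```

One difference: this sketch raises an exception on a value it cannot convert, whereas Spark's default `PERMISSIVE` read mode sets such fields to null instead of failing.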

# Analysis with pyspark

## select

In [16]:

    df.select('job_title', 'salario_max').show()

+--------------------+-----------+
|           job_title|salario_max|
+--------------------+-----------+
|   Public Accountant|     9000.0|
|  Accounting Manager|    16000.0|
|Administration As...|     6000.0|
|           President|    40000.0|
|Administration Vi...|    30000.0|
|          Accountant|     9000.0|
|     Finance Manager|    16000.0|
|Human Resources R...|     9000.0|
|          Programmer|    10000.0|
|   Marketing Manager|    15000.0|
|Marketing Represe...|     9000.0|
|Public Relations ...|    10500.0|
|    Purchasing Clerk|     5500.0|
|  Purchasing Manager|    15000.0|
|       Sales Manager|    20000.0|
|Sales Representative|    12000.0|
|      Shipping Clerk|     5500.0|
|         Stock Clerk|     5000.0|
|       Stock Manager|     8500.0|
+--------------------+-----------+



## filter and/or where

`where` is an alias of `filter`: both keep only the rows for which the condition is true.

In [18]:

    df.filter(df.salario_min >= 15000).show()

+------+--------------------+-----------+-----------+
|indice|           job_title|salario_min|salario_max|
+------+--------------------+-----------+-----------+
|     4|           President|    20000.0|    40000.0|
|     5|Administration Vi...|    15000.0|    30000.0|
+------+--------------------+-----------+-----------+



In [19]:

    df.where(df.salario_min < 15000).show()

+------+--------------------+-----------+-----------+
|indice|           job_title|salario_min|salario_max|
+------+--------------------+-----------+-----------+
|     1|   Public Accountant|     4200.0|     9000.0|
|     2|  Accounting Manager|     8200.0|    16000.0|
|     3|Administration As...|     3000.0|     6000.0|
|     6|          Accountant|     4200.0|     9000.0|
|     7|     Finance Manager|     8200.0|    16000.0|
|     8|Human Resources R...|     4000.0|     9000.0|
|     9|          Programmer|     4000.0|    10000.0|
|    10|   Marketing Manager|     9000.0|    15000.0|
|    11|Marketing Represe...|     4000.0|     9000.0|
|    12|Public Relations ...|     4500.0|    10500.0|
|    13|    Purchasing Clerk|     2500.0|     5500.0|
|    14|  Purchasing Manager|     8000.0|    15000.0|
|    15|       Sales Manager|    10000.0|    20000.0|
|    16|Sales Representative|     6000.0|    12000.0|
|    17|      Shipping Clerk|     2500.0|     5500.0|
|    18|         Stock Clerk

In [20]:

    df_min = df.filter(df.salario_min >= 15000)
    df_min.show()

+------+--------------------+-----------+-----------+
|indice|           job_title|salario_min|salario_max|
+------+--------------------+-----------+-----------+
|     4|           President|    20000.0|    40000.0|
|     5|Administration Vi...|    15000.0|    30000.0|
+------+--------------------+-----------+-----------+



In [22]:

    df_min.select('job_title').where(df.salario_min == 15000).show()

+--------------------+
|           job_title|
+--------------------+
|Administration Vi...|
+--------------------+
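
The filters above, including the chained one, can be mirrored in plain Python with list comprehensions over a few `(job_title, salario_min)` pairs from `data/jobs.csv`. This only illustrates the row-selection logic, not how Spark evaluates it.

```python
jobs = [
    ("Public Accountant", 4200.0),
    ("President", 20000.0),
    ("Administration Vice President", 15000.0),
    ("Sales Manager", 10000.0),
]

# df.filter(df.salario_min >= 15000)
df_min = [job for job in jobs if job[1] >= 15000]

# df_min.select('job_title').where(df.salario_min == 15000)
titles = [title for title, sal_min in df_min if sal_min == 15000]

print([title for title, _ in df_min])  # ['President', 'Administration Vice President']
print(titles)                          # ['Administration Vice President']
```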



## sum, min, max, mean

In [25]:

    df.select('salario_min').groupby().sum().show()

+----------------+
|sum(salario_min)|
+----------------+
|        124800.0|
+----------------+



In [26]:

    df_min.select('salario_min').groupby().sum().show()

+----------------+
|sum(salario_min)|
+----------------+
|         35000.0|
+----------------+



In [27]:

    df.select('salario_max').groupby().mean().show()

+------------------+
|  avg(salario_max)|
+------------------+
|13210.526315789473|
+------------------+
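
`groupby()` with no columns puts all rows into a single group, so `sum()` and `mean()` aggregate the entire column. As a sanity check, this plain-Python sketch recomputes the three results above from the values in `data/jobs.csv` (copied from the earlier outputs).

```python
salario_min = [
    4200.0, 8200.0, 3000.0, 20000.0, 15000.0, 4200.0, 8200.0,
    4000.0, 4000.0, 9000.0, 4000.0, 4500.0, 2500.0, 8000.0,
    10000.0, 6000.0, 2500.0, 2000.0, 5500.0,
]
salario_max = [
    9000.0, 16000.0, 6000.0, 40000.0, 30000.0, 9000.0, 16000.0,
    9000.0, 10000.0, 15000.0, 9000.0, 10500.0, 5500.0, 15000.0,
    20000.0, 12000.0, 5500.0, 5000.0, 8500.0,
]

total_min = sum(salario_min)                                # df ... groupby().sum()
total_min_high = sum(s for s in salario_min if s >= 15000)  # same sum on df_min
mean_max = sum(salario_max) / len(salario_max)              # df ... groupby().mean()

print(total_min)           # 124800.0
print(total_min_high)      # 35000.0
print(round(mean_max, 6))  # 13210.526316
```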



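In [28]:

    df.select('salario_max').groupby().stddev().show()

    AttributeError: 'GroupedData' object has no attribute 'stddev'

`GroupedData` only offers shortcut methods such as `sum`, `mean`/`avg`, `min`, `max` and `count`. Other aggregations, including `stddev`, go through `agg`, or through `pyspark.sql.functions` (e.g. `df.select(F.stddev('salario_max'))` after `from pyspark.sql import functions as F`).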

## agg

In [29]:

    df.agg({'salario_max': 'stddev'}).show()

+-------------------+
|stddev(salario_max)|
+-------------------+
|  8876.178778483585|
+-------------------+
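
Spark's `stddev` is the *sample* standard deviation (an alias of `stddev_samp`, which divides by n − 1), while `stddev_pop` divides by n. A plain-Python check with the standard `statistics` module on the 19 `salario_max` values from `data/jobs.csv` reproduces the value above and shows the population version for comparison.

```python
import statistics

salario_max = [
    9000.0, 16000.0, 6000.0, 40000.0, 30000.0, 9000.0, 16000.0,
    9000.0, 10000.0, 15000.0, 9000.0, 10500.0, 5500.0, 15000.0,
    20000.0, 12000.0, 5500.0, 5000.0, 8500.0,
]

sample_sd = statistics.stdev(salario_max)       # divides by n - 1, like Spark's stddev
population_sd = statistics.pstdev(salario_max)  # divides by n, like stddev_pop

print(round(sample_sd, 2))  # 8876.18
print(round(population_sd, 2))
```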



In [32]:

    df.agg({'salario_max': 'mean'}).show()

+------------------+
|  avg(salario_max)|
+------------------+
|13210.526315789473|
+------------------+



## [join](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.DataFrame.join.html)



In [33]:

    # read the employees table
    df_emp = spark.read.csv('data/employees.csv', header=True)
    df_emp.show()

+-----------+-----------+----------+--------------------+------------+----------+------+--------+----------+-------------+
|employee_id| first_name| last_name|               email|phone_number| hire_date|job_id|  salary|manager_id|department_id|
+-----------+-----------+----------+--------------------+------------+----------+------+--------+----------+-------------+
|        100|     Steven|      King|steven.king@sqltu...|515.123.4567|1987-06-17|     4|24000.00|      null|            9|
|        101|      Neena|   Kochhar|neena.kochhar@sql...|515.123.4568|1989-09-21|     5|17000.00|       100|            9|
|        102|        Lex|   De Haan|lex.de haan@sqltu...|515.123.4569|1993-01-13|     5|17000.00|       100|            9|
|        103|  Alexander|    Hunold|alexander.hunold@...|590.423.4567|1990-01-03|     9| 9000.00|       102|            6|
|        104|      Bruce|     Ernst|bruce.ernst@sqltu...|590.423.4568|1991-05-21|     9| 6000.00|       103|            6|
|        105|   

In [35]:

    # left join: keep every employee and match jobs on job_id == indice
    df_join = df_emp.join(df, on=df_emp.job_id == df.indice, how='left')
    df_join.select(df_emp.first_name, df_emp.salary, df.job_title, df.salario_max).show()

+-----------+--------+--------------------+-----------+
| first_name|  salary|           job_title|salario_max|
+-----------+--------+--------------------+-----------+
|     Steven|24000.00|           President|    40000.0|
|      Neena|17000.00|Administration Vi...|    30000.0|
|        Lex|17000.00|Administration Vi...|    30000.0|
|  Alexander| 9000.00|          Programmer|    10000.0|
|      Bruce| 6000.00|          Programmer|    10000.0|
|      David| 4800.00|          Programmer|    10000.0|
|      Valli| 4800.00|          Programmer|    10000.0|
|      Diana| 4200.00|          Programmer|    10000.0|
|      Nancy|12000.00|     Finance Manager|    16000.0|
|     Daniel| 9000.00|          Accountant|     9000.0|
|       John| 8200.00|          Accountant|     9000.0|
|     Ismael| 7700.00|          Accountant|     9000.0|
|Jose Manuel| 7800.00|          Accountant|     9000.0|
|       Luis| 6900.00|          Accountant|     9000.0|
|        Den|11000.00|  Purchasing Manager|    1
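
To make the left-join semantics concrete, including what happens when an employee has no matching job, here is a plain-Python sketch using a dictionary lookup on some of the employees and jobs shown above. The employee `Nobody` with `job_id` 99 is hypothetical, added only to show the unmatched case.

```python
# indice -> (job_title, salario_max), a subset of data/jobs.csv
jobs = {
    4: ("President", 40000.0),
    5: ("Administration Vice President", 30000.0),
    9: ("Programmer", 10000.0),
}

# (first_name, salary, job_id); the last employee is hypothetical
employees = [
    ("Steven", 24000.0, 4),
    ("Neena", 17000.0, 5),
    ("Alexander", 9000.0, 9),
    ("Nobody", 1000.0, 99),
]

# how='left': keep every employee; a missing job becomes (None, None), i.e. null
joined = [
    (name, salary, *jobs.get(job_id, (None, None)))
    for name, salary, job_id in employees
]

for row in joined:
    print(row)
```

An inner join (`how='inner'`) would instead drop the `Nobody` row entirely.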

## toPandas

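In [36]:

    # collect the Spark DataFrame into a pandas DataFrame on the driver
    df_pandas = df.toPandas()
    df_pandas

`toPandas()` loads every row into the driver's memory, so it is only appropriate for results small enough to fit there.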

| | indice | job_title | salario_min | salario_max |
|---|---|---|---|---|
| 0 | 1 | Public Accountant | 4200.0 | 9000.0 |
| 1 | 2 | Accounting Manager | 8200.0 | 16000.0 |
| 2 | 3 | Administration Assistant | 3000.0 | 6000.0 |
| 3 | 4 | President | 20000.0 | 40000.0 |
| 4 | 5 | Administration Vice President | 15000.0 | 30000.0 |
| 5 | 6 | Accountant | 4200.0 | 9000.0 |
| 6 | 7 | Finance Manager | 8200.0 | 16000.0 |
| 7 | 8 | Human Resources Representative | 4000.0 | 9000.0 |
| 8 | 9 | Programmer | 4000.0 | 10000.0 |
| 9 | 10 | Marketing Manager | 9000.0 | 15000.0 |


In [38]:

    df_pandas[['salario_max']]

| | salario_max |
|---|---|
| 0 | 9000.0 |
| 1 | 16000.0 |
| 2 | 6000.0 |
| 3 | 40000.0 |
| 4 | 30000.0 |
| 5 | 9000.0 |
| 6 | 16000.0 |
| 7 | 9000.0 |
| 8 | 10000.0 |
| 9 | 15000.0 |


# Comparing pandas and pyspark (local)

## Pandas

In [39]:

    %%timeit
    df_pandas['salario_max'].sum()

45.8 µs ± 1.95 µs per loop (mean ± std. dev. of 7 runs, 10000 loops each)


## Pyspark

In [41]:

    %%timeit
    df.agg({'salario_max': 'mean'})

4.96 ms ± 368 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
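
The PySpark cell above never calls an action: `df.agg(...)` is a transformation that returns a new DataFrame describing the computation, and Spark only executes it when an action such as `.show()` or `.collect()` runs. Python generators offer a loose analogy (not how Spark works internally): creating the generator does no work, consuming it does. The `expensive` function and its call counter are purely illustrative.

```python
calls = 0

def expensive(x):
    """Stand-in for per-row work; counts how many times it runs."""
    global calls
    calls += 1
    return x * 2

values = range(1_000_000)

# Like a transformation (df.agg(...)): describes the work, runs nothing yet
plan = (expensive(v) for v in values)
calls_before = calls
print(calls_before)  # 0

# Like an action (.show(), .collect()): forces the whole computation
total = sum(plan)
print(calls)  # 1000000
```

Timing only the first step would report almost nothing, which is why a `%%timeit` on a transformation mostly reflects setup overhead rather than the aggregation itself.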


## Conclusion

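In [42]:

    # PySpark: 4.96 ms = 4960 microseconds per loop
    # pandas:  45.8 microseconds per loop
    # factor: about 100 (4960 / 45.8 ≈ 108)

On this tiny local dataset, the PySpark cell took roughly 100 times longer than the pandas one. Treat this as an indication of Spark's fixed overhead rather than a fair benchmark: the two cells compute different aggregations (`sum` versus `mean`), and since the PySpark cell calls no action, it times building the query rather than running it. Spark is designed for data too large for a single machine, where this overhead matters much less.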

# GHP

In [None]:

    import getpass

    from bifrost.hadoop import Hadoop

    # connect to GHP; replace 'ghp_que_voce_quer' with the database you want
    ghp = Hadoop(database='ghp_que_voce_quer', password=getpass.getpass(prompt='Enter the password: '))
    df = ghp.query('SELECT ....')
