<a href="https://colab.research.google.com/github/patricktapajos/pos-ciencia-dados/blob/master/mod04-appv/aula01/atividade02/grupo9_etl.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# **ETL with PySpark**

## Installing PostgreSQL

In [None]:
# Install postgresql server
!sudo -S apt-get -y -qq update
!sudo -S apt-get -y -qq install postgresql
!sudo -S service postgresql start

## Creating the user and database

In [None]:
# Setup a password `postgres` for username `postgres`
!sudo -u postgres psql -U postgres -c "ALTER USER postgres PASSWORD 'postgres';"

# Setup a database with name `northwind` to be used
!sudo -u postgres psql -U postgres -c 'DROP DATABASE IF EXISTS northwind;'
!sudo -u postgres psql -U postgres -c 'CREATE DATABASE northwind;'
!pip install pgspecial

## Importing the file into the schema/database

To make it easier to import the database structure (DDL) and data from the SQL dump, we define environment variables holding the connection parameters:

In [3]:
%env DATABASE_NAME=northwind
%env DATABASE_HOST=localhost
%env DATABASE_PORT=5432
%env DATABASE_USER=postgres
%env DATABASE_PASS=postgres

env: DATABASE_NAME=northwind
env: DATABASE_HOST=localhost
env: DATABASE_PORT=5432
env: DATABASE_USER=postgres
env: DATABASE_PASS=postgres


In [4]:
from google.colab import drive
drive.mount('/content/drive', force_remount=True)

Mounted at /content/drive


In [5]:
!sudo PGPASSWORD=$DATABASE_PASS psql -q -h $DATABASE_HOST -p $DATABASE_PORT -U $DATABASE_USER -d $DATABASE_NAME -f '/content/drive/Shareddrives/pgds_dataset/northwind.sql'
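The `%env` variables exist so the `psql` call can be assembled without retyping credentials. As a hedged sketch, the same command can be built in plain Python, passing the password through the `PGPASSWORD` environment variable of the child process rather than through `sudo`. The helper name `build_psql_import_cmd` is hypothetical; the `psql` flags are the same ones used in the cell above.

```python
import os


def build_psql_import_cmd(sql_path, env=None):
    """Return (argv, child_env) for importing `sql_path` with psql.

    Hypothetical helper: reads the DATABASE_* variables defined with %env.
    """
    env = dict(os.environ if env is None else env)
    argv = [
        "psql", "-q",
        "-h", env["DATABASE_HOST"],
        "-p", env["DATABASE_PORT"],
        "-U", env["DATABASE_USER"],
        "-d", env["DATABASE_NAME"],
        "-f", sql_path,
    ]
    # psql picks the password up from PGPASSWORD, so it never appears in argv
    child_env = dict(env, PGPASSWORD=env["DATABASE_PASS"])
    return argv, child_env
```

Running it would then be `subprocess.run(argv, env=child_env, check=True)`, which raises `CalledProcessError` if the import fails instead of failing silently.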

## Connecting to PostgreSQL



In [6]:
# set connection
%load_ext sql
%config SqlMagic.feedback=False 
%config SqlMagic.autopandas=True
%sql postgresql+psycopg2://postgres:postgres@localhost/northwind




'Connected: postgres@northwind'
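The connection string above hardcodes credentials that are already stored in the `DATABASE_*` environment variables. A minimal sketch, assuming those same variables, builds the SQLAlchemy-style URL from them; `quote_plus` escapes characters such as `@` or `/` that would otherwise break the URL. The function name `build_db_url` is hypothetical.

```python
import os
from urllib.parse import quote_plus


def build_db_url(env=None):
    """Build a postgresql+psycopg2 URL from the DATABASE_* variables (hypothetical helper)."""
    env = os.environ if env is None else env
    # Escape user and password so special characters cannot break the URL syntax
    user = quote_plus(env["DATABASE_USER"])
    password = quote_plus(env["DATABASE_PASS"])
    host = env["DATABASE_HOST"]
    port = env["DATABASE_PORT"]
    name = env["DATABASE_NAME"]
    return f"postgresql+psycopg2://{user}:{password}@{host}:{port}/{name}"
```

In the notebook, `url = build_db_url()` followed by `%sql $url` should connect the same way, since IPython expands `$name` in magic lines.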

## Exporting the tables of the 'northwind' database to CSV

In [7]:
# %sql SELECT tablename FROM pg_catalog.pg_tables WHERE schemaname = 'northwind'
tables_list = %sql SELECT tablename FROM pg_catalog.pg_tables WHERE schemaname = 'public'
tables_list

 * postgresql+psycopg2://postgres:***@localhost/northwind


|   | tablename |
|---|---|
| 0 | customer_customer_demo |
| 1 | suppliers |
| 2 | shippers |
| 3 | customer_demographics |
| 4 | territories |
| 5 | region |
| 6 | us_states |
| 7 | categories |
| 8 | employees |
| 9 | employee_territories |


In [14]:
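import os

# Output folder for the CSV files (Colab's working directory is /content)
os.makedirs("/content/northwind_db", exist_ok=True)

for i in tables_list['tablename']:
    # Server-side COPY writes the file as the `postgres` OS user, so each file
    # is created beforehand and made writable by everyone
    if not os.path.exists(f"/content/northwind_db/{i}.csv"):
        !touch "/content/northwind_db/{i}.csv"
    !chmod 777 "/content/northwind_db/{i}.csv"
    %sql COPY {i} TO '/content/northwind_db/{i}.csv' DELIMITER ',' CSV HEADER;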

 * postgresql+psycopg2://postgres:***@localhost/northwind
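The loop above relies on server-side `COPY ... TO 'file'`, which runs as the `postgres` OS user; that is why each file is created with `touch` and opened up with `chmod 777` first. An alternative, sketched here under the assumption that `psycopg2` is available (it is the driver named in the connection string), is `COPY ... TO STDOUT` through `cursor.copy_expert`, which streams the rows to the notebook process so that it writes the file itself. The helpers `quote_ident` and `export_table_csv` are hypothetical; the table name is quoted as a SQL identifier because it is interpolated into the statement.

```python
import os


def quote_ident(name):
    """Quote a PostgreSQL identifier, doubling any embedded double quotes."""
    return '"' + name.replace('"', '""') + '"'


def export_table_csv(cursor, table, out_dir):
    """Stream `table` to <out_dir>/<table>.csv via COPY ... TO STDOUT.

    `cursor` is assumed to be a psycopg2 cursor (anything with copy_expert works).
    """
    os.makedirs(out_dir, exist_ok=True)
    path = os.path.join(out_dir, f"{table}.csv")
    sql = f"COPY {quote_ident(table)} TO STDOUT WITH (FORMAT csv, HEADER)"
    # The file is opened by this process, so no chmod on the target is needed
    with open(path, "w", newline="", encoding="utf-8") as f:
        cursor.copy_expert(sql, f)
    return path
```

With a real connection, for example `conn = psycopg2.connect(host="localhost", dbname="northwind", user="postgres", password="postgres")`, calling `export_table_csv(conn.cursor(), name, "/content/northwind_db")` for each name in `tables_list['tablename']` would replace the `touch`/`chmod`/`%sql COPY` steps. Quoted identifiers are case-sensitive, which is fine here because the Northwind table names are lowercase.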


## Installing PySpark

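The cell below installs the packages PySpark needs on Google Colab: a headless OpenJDK 8, the Spark 2.4.7 binaries prebuilt for Hadoop 2.7, and `findspark`, which makes the Spark installation importable from Python.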

In [16]:
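# Java 8 (Spark 2.4 runs on Java 8)
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
# Old Spark releases are served from archive.apache.org; the www-us mirror is no longer available
!wget -q https://archive.apache.org/dist/spark/spark-2.4.7/spark-2.4.7-bin-hadoop2.7.tgz
!tar xf spark-2.4.7-bin-hadoop2.7.tgz
!pip install -q findspark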

With PySpark installed, we set the two environment variables it relies on at runtime: `JAVA_HOME` (the Java installation) and `SPARK_HOME` (the extracted Spark distribution).

In [17]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.7-bin-hadoop2.7"
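`findspark.init()` and Spark itself depend on these two paths; if the JDK package or the Spark tarball ends up somewhere else, the error only surfaces later and is harder to read. A small hedged check, using the hypothetical helper `missing_spark_paths`, verifies that both directories exist and that the Spark launcher `bin/spark-submit` is present before continuing.

```python
import os


def missing_spark_paths(env=None):
    """Return a list of problems with JAVA_HOME / SPARK_HOME (empty if all look fine)."""
    env = os.environ if env is None else env
    problems = []
    for var in ("JAVA_HOME", "SPARK_HOME"):
        path = env.get(var)
        if not path:
            problems.append(f"{var} is not set")
        elif not os.path.isdir(path):
            problems.append(f"{var}={path} is not a directory")
    # A valid Spark distribution ships its launcher script under bin/
    spark_home = env.get("SPARK_HOME")
    if spark_home and os.path.isdir(spark_home):
        if not os.path.isfile(os.path.join(spark_home, "bin", "spark-submit")):
            problems.append("spark-submit not found under SPARK_HOME/bin")
    return problems
```

Calling `assert not missing_spark_paths(), missing_spark_paths()` right after setting the variables stops the notebook with a readable message instead of a JVM stack trace.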

Next, as a quick check that PySpark works, we start a local `SparkSession` (`local[*]` runs as many worker threads as there are logical cores).

In [18]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()

## ETL - Extract, Transform and Load

With PySpark installed and working, the first step of our ETL is *Extraction*.

Here we read our dataset (the `orders` table exported to CSV) and load it into PySpark. Once it is loaded, we will group the data by one of its columns.
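In PySpark, grouping by a column is a `groupBy` followed by an aggregation, e.g. `sdfData.groupBy("ship_country").count()`. To make the idea concrete without a Spark session, here is a plain-Python sketch of the same count-per-key operation over CSV text. The helper `count_by_column` is hypothetical, and the sample holds two orders taken from the first rows of `orders.csv`, trimmed to three columns.

```python
import csv
import io
from collections import Counter


def count_by_column(csv_text, column):
    """Count rows per distinct value of `column` (like groupBy(column).count())."""
    reader = csv.DictReader(io.StringIO(csv_text))  # first line is the header
    return Counter(row[column] for row in reader)


sample = (
    "order_id,customer_id,ship_country\n"
    "10248,VINET,France\n"
    "10249,TOMSP,Germany\n"
)
print(count_by_column(sample, "ship_country"))
```

Spark computes the same result, but partially per partition and then merges the partial counts, which is what lets it scale beyond a single machine's memory.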

In [19]:
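# Reuses the SparkSession created above if one is already running
scSpark = SparkSession \
        .builder \
        .appName("reading csv") \
        .getOrCreate()
# CSV exported from the orders table; header=True takes column names from the first line.
# Without inferSchema, every column is read as a string.
data_file = '/content/northwind_db/orders.csv'
# cache() keeps the DataFrame in memory, since both count() and show() use it
sdfData = scSpark.read.csv(data_file, header=True, sep=",").cache()
print('Total Records = {}'.format(sdfData.count()))
sdfData.show()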

Total Records = 830
+--------+-----------+-----------+----------+-------------+------------+--------+-------+--------------------+--------------------+--------------+-----------+----------------+------------+
|order_id|customer_id|employee_id|order_date|required_date|shipped_date|ship_via|freight|           ship_name|        ship_address|     ship_city|ship_region|ship_postal_code|ship_country|
+--------+-----------+-----------+----------+-------------+------------+--------+-------+--------------------+--------------------+--------------+-----------+----------------+------------+
|   10248|      VINET|          5|1996-07-04|   1996-08-01|  1996-07-16|       3|  32.38|Vins et alcools C...|  59 rue de l'Abbaye|         Reims|       null|           51100|      France|
|   10249|      TOMSP|          6|1996-07-05|   1996-08-16|  1996-07-10|       1|  11.61|  Toms Spezialitäten|       Luisenstr. 48|       Münster|       null|           44087|     Germany|
|   10250|      HANAR|          4|1