#Utilizando o Spark no Google Colab

In [None]:
# instalar as dependências
!apt-get update -qq
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-3.1.2/spark-3.1.2-bin-hadoop2.7.tgz
!tar xf spark-3.1.2-bin-hadoop2.7.tgz
!pip install -q findspark

In [None]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.2-bin-hadoop2.7"

In [None]:
import findspark
findspark.init()

###Carregamento de Dados

In [None]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master('local[*]') \
    .appName("Iniciando com Spark") \
    .getOrCreate()

###Acessando o Spark UI (Google Colab)

In [None]:
!wget -q https://bin.equinox.io/c/4VmDzA7iaHb/ngrok-stable-linux-amd64.zip
!unzip ngrok-stable-linux-amd64.zip

In [None]:
get_ipython().system_raw('./ngrok http 4050 &')

!curl -s http://localhost:4040/api/tunnels

#DataFrames com Spark

###Montando nosso drive

In [None]:
from google.colab import drive
drive.mount('/content/drive')

###Carregando os dados das empresas

In [None]:
import zipfile

In [None]:
path = '/content/drive/MyDrive/curso-spark-alura/empresas'
empresas = spark.read.csv(path, sep=';', inferSchema=True)
empresas.count()

#Manipulando os Dados

###Renomeando as colunas do DataFrame

In [None]:
col_empresas = empresasColNames = ['cnpj_basico', 'razao_social_nome_empresarial', 'natureza_juridica', 'qualificacao_do_responsavel', 'capital_social_da_empresa', 'porte_da_empresa', 'ente_federativo_responsavel']
col_estabelecimentos = ['cnpj_basico', 'cnpj_ordem', 'cnpj_dv', 'identificador_matriz_filial', 'nome_fantasia', 'situacao_cadastral', 'data_situacao_cadastral', 'motivo_situacao_cadastral', 'nome_da_cidade_no_exterior', 'pais', 'data_de_inicio_atividade', 'cnae_fiscal_principal', 'cnae_fiscal_secundaria', 'tipo_de_logradouro', 'logradouro', 'numero', 'complemento', 'bairro', 'cep', 'uf', 'municipio', 'ddd_1', 'telefone_1', 'ddd_2', 'telefone_2', 'ddd_do_fax', 'fax', 'correio_eletronico', 'situacao_especial', 'data_da_situacao_especial']
col_socios = ['cnpj_basico', 'identificador_de_socio', 'nome_do_socio_ou_razao_social', 'cnpj_ou_cpf_do_socio', 'qualificacao_do_socio', 'data_de_entrada_sociedade', 'pais', 'representante_legal', 'nome_do_representante', 'qualificacao_do_representante_legal', 'faixa_etaria']


In [None]:
for index, name_col in enumerate(col_empresas):
  empresas = empresas.withColumnRenamed(f'_c{index}', name_col)
empresas.limit(5).toPandas()

###Analisando os dados

In [None]:
empresas.printSchema()

In [None]:
capital_social = string ---> converter string para double
separador decimal = ,   ---> converter , para .
colunas data = string   ---> converter para tipo date

###Modificando os tipos de dados

StringType ➔ DoubleType

In [None]:
from pyspark.sql.types import DoubleType, StringType
from pyspark.sql import functions as f

In [None]:
empresas = empresas.withColumn('capital_social_da_empresa', empresas['capital_social_da_empresa'].cast(DoubleType()) )

"," ➔ "."

In [None]:
empresas = empresas.withColumn('capital_social_da_empresa', f.regexp_replace("capital_social_da_empresa", ",", "."))

StringType ➔ DateType

In [None]:
stabelecimentos = estabelecimentos\
  .withColumn('data_situacao_cadastral' , f.to_date(estabelecimentos.data_situacao_cadastral.cast(StringType()), 'yyyyMMdd'))\
  .withColumn('data_de_inicio_atividade' , f.to_date(estabelecimentos.data_de_inicio_atividade.cast(StringType()), 'yyyyMMdd'))\
  .withColumn('data_da_situacao_especial' , f.to_date(estabelecimentos.data_da_situacao_especial.cast(StringType()), 'yyyyMMdd'))

#Seleções e consultas

###Selecionando informações

In [None]:
empresas.select('natureza_juridica', 'porte_da_empresa').show(5)

In [None]:
socios.select('faixa_etaria', f.year('data_de_entrada_sociedade').alias('ano_entrada_sociedade')).show(5, False)

###Identificando valores nulos

In [None]:
socios1 = socios.select([f.count(f.when(f.isnull(c), 1)).alias(c) for c in socios.columns]).show()

In [None]:
socios.na.fill(0).limit(5).toPandas()

In [None]:
socios.na.fill('-').limit(5).toPandas()

###Ordenando os dados

In [None]:
socios.select('nome_do_socio_ou_razao_social', 'faixa_etaria', f.year('data_de_entrada_sociedade').alias('ano_de_entrada'))\
  .orderBy(['ano_de_entrada', 'faixa_etaria'], ascending= [False, False])\
  .show(10, False)

###Filtrando os dados

In [None]:
empresas.where('capital_social_da_empresa == 50').show(5, False)

In [None]:
socios.select('nome_do_socio_ou_razao_social')\
  .filter(socios.nome_do_socio_ou_razao_social.startswith('JOSE'))\
  .filter(socios.nome_do_socio_ou_razao_social.endswith('SILVA'))\
  .limit(10).toPandas()

###O comando LIKE

In [None]:
empresas.select('razao_social_nome_empresarial', 'porte_da_empresa', 'capital_social_da_empresa').filter(f.upper(empresas.razao_social_nome_empresarial).like('%RESTAURANTE%')).show(15)

#Agregações e Junções

###Sumarizando os dados

In [None]:
empresas.select('cnpj_basico', 'porte_da_empresa', 'capital_social_da_empresa')\
  .groupBy('porte_da_empresa')\
  .agg(
      f.avg('capital_social_da_empresa').alias('capital_social_medio'),
      f.count('cnpj_basico').alias('frequencia')
  )\
    .orderBy('porte_da_empresa', ascending=True)\
    .show()

In [None]:
empresas.select('capital_social_da_empresa').summary().show()

In [None]:
df = df.withColumn('status', f.when(df.nota >= 7, "APROVADO").otherwise("REPROVADO"))
df.show()

###Juntando DataFrames - Joins

In [None]:
empresas_join = estabelecimentos.join(empresas, 'cnpj_basico', how = 'inner')

In [None]:
freq = empresas_join.select('cnpj_basico', f.year('data_de_inicio_atividade').alias('data_inicio'))\
  .where('data_inicio >= 2010')\
  .groupBy('data_inicio')\
  .agg(f.count('cnpj_basico').alias('frequencia'))\
  .orderBy('data_inicio', ascending=True)

  freq.toPandas()

In [None]:
freq.union(
    freq.select(
        f.lit('Total').alias('data_inicio'),
        f.sum(freq['frequencia']).alias('frequencia')
    )
).toPandas()

total = freq.select(
        f.lit('Total').alias('data_inicio'),
        f.sum(freq['frequencia']).alias('frequencia')
    )
total.show()

freq.union(total).show()

freq.select(
        f.lit('Total').alias('data_inicio'),
        f.sum(freq['frequencia']).alias('frequencia')
    ).toPandas()

#SparkSQL

In [None]:
empresas.createOrReplaceTempView('empresasView')

spark.sql('SELECT * FROM empresasView').show(5, False)

spark.sql('SELECT * FROM empresasView WHERE capital_social >=50').show(5, False)

spark.sql("""
  SELECT porte_empresa, MEAN (capital_social) AS Media
  FROM empresasView GROUP BY porte_empresa""").show(5, False)

In [None]:
empresas_join.createOrReplaceTempView('empresasJoinView')

frequencia = spark.sql("""
  SELECT YEAR (data_inicio_atividade) AS ano_inicio,
  count(cnpj_basico) AS count
  FROM empresasJoinView
  WHERE YEAR (data_inicio_atividade) >= 2010
  GROUP BY ano_inicio
  ORDER BY ano_inicio """)

In [None]:
frequencia.createOrReplaceTempView('frequenciaView')

frequencia = spark.sql("""
  SELECT YEAR (data_inicio_atividade) AS ano_inicio,
  count(cnpj_basico) AS count
  FROM empresasJoinView
  WHERE YEAR (data_inicio_atividade) >= 2010
  GROUP BY ano_inicio
  ORDER BY ano_inicio """)

In [None]:
frequencia.createOrReplaceTempView('frequenciaView')

frequencia = spark.sql("""
  SELECT * FROM frequenciaView
  UNION ALL
  SELECT 'Total' AS ano_inicio,
  SUM (count) AS count
  FROM frequenciaView""").show(5, False)

#Formas de Armazenamento

###Arquivos CSV

In [None]:
empresas.write.csv(
    path= '/content/drive/MyDrive/curso-spark-alura/empresas/csv',
    mode= 'overwrite',
    sep= ';',
    header= True
)

In [None]:
empresas2 = spark.read.csv('/content/drive/MyDrive/curso-spark-alura/empresas/csv',
                           sep= ';',
                           inferSchema= True,
                           header= True
                           )

###Arquivos PARQUET

In [None]:
empresas.write.parquet(
    path= '/content/drive/MyDrive/curso-spark-alura/empresas/parquet',
    mode= 'overwrite',
)

In [None]:
empresas3 = spark.read.parquet('/content/drive/MyDrive/curso-spark-alura/empresas/parquet')

#Particionamento dos dados

In [None]:
empresas.colesce(1).write.csv(
    path = 'caminho pasta/csv-unico',
    mode = 'overwrite',
    sep = ';',
    header = True
)

In [None]:
empresas.colesce(1).write.parquet(
    path = 'caminho pasta/parquet-partitionBy',
    mode = 'overwrite',
    partitionBy = 'porte_empresa'
    header = True
)

# Fechar a Session


In [None]:
spark.stop()