In [None]:
import os, sys
from google.colab import drive

drive.mount('/content/drive')
nb_path = '/content/drive/MyDrive/pyspark'

!pip install pyspark

Mounted at /content/drive
Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m4.5 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=de0b65b445db5aa96701020cd603951b821c52aaadfa35a0b3cb0a2cf787f1d5
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


In [None]:
from pyspark.sql import SparkSession

empresasColNames = [
    'cnpj_basico',
    'razao_social_nome_empresarial',
    'natureza_juridica',
    'qualificacao_responsavel',
    'capital_social_empresa',
    'porte_empresa',
    'ente_federativo_responsavel'
]

spark = SparkSession.builder.appName("teste-1").getOrCreate()

empresas = spark.read.csv("/content/drive/MyDrive/empresas", sep=";", inferSchema=True)

In [None]:
from pyspark.sql import functions as F
from pyspark.sql.types import *

for index, item in enumerate(empresasColNames):
    empresas = empresas.withColumnRenamed(f"_c{index}", item)

import random

def gerar_data_aleatoria():
    ano = random.randint(1900, 2100)
    mes = random.randint(1, 12)
    dia = random.randint(1, 30)

    data = f"{ano:04d}{mes:02d}{dia:02d}"

    return data

# Trocando as virgula por pontos
empresas = empresas.withColumn(
    'capital_social_empresa',
    F.regexp_replace('capital_social_empresa', ',', '.')
)

# Cast de string para double
empresas = empresas.withColumn(
    'capital_social_empresa',
    empresas['capital_social_empresa'].cast(DoubleType())
)

udf_data = F.udf(gerar_data_aleatoria, StringType())

#Coluna de datas aleatorias (Em Strings)
empresas = empresas.withColumn("data", udf_data())

# Convertendo de String para data
empresas = empresas.withColumn(
    'data',
     F.to_date(empresas['data'], 'yyyMMdd')
)

In [None]:
#Selects

# Select simples
empresas.select(
    'razao_social_nome_empresarial',
    'natureza_juridica',
    'data'
)
#.show(5)

# Select, pegando apenas o ano
empresas.select(
    'razao_social_nome_empresarial',
    'natureza_juridica',
    F.year('data').alias('ano')
)
#.show(5)

# Select com formatação de nomes
empresas.select(
    F.concat_ws(
        ', ',
        F.substring_index('razao_social_nome_empresarial', ' ', -1),
        F.substring_index('razao_social_nome_empresarial', ' ', 1)
    ).alias("nome"),
    'natureza_juridica',
    'data'
)
#.show(5, truncate=False)

In [None]:
# Ordenação

# Dados ordenados , parametro ascending=True|False para
# ordenar forma crescente/descrescente respectivamente
empresas.select(
    'razao_social_nome_empresarial',
    'natureza_juridica',
    'data'
).orderBy(
    'natureza_juridica'
)
#.show(5, truncate=False)

# Ordenação multipla
empresas.select(
    'razao_social_nome_empresarial',
    'natureza_juridica',
    'data'
).orderBy(
    ['natureza_juridica', 'data'],
    asceding=[False, True]
)
#.show(5, truncate=False)

+---------------------------------------------------------+-----------------+----------+
|razao_social_nome_empresarial                            |natureza_juridica|data      |
+---------------------------------------------------------+-----------------+----------+
|22 BATALHAO DE INFANTARIA                                |1015             |1901-11-16|
|SUPERINTENDENCIA FED. DE AGRICULTURA PEC. ABAST. NO TO   |1015             |1904-07-12|
|COMISSAO REGIONAL DE OBRAS DA 8 REGIAO MILITAR           |1015             |1905-12-29|
|4� BATALHAO DE ENGENHARIA DE COMBATE                     |1015             |1908-08-16|
|CENTRO DE INTENDENCIA TECNOLOGICO DA MARINHA EM SAO PAULO|1015             |1917-05-20|
+---------------------------------------------------------+-----------------+----------+
only showing top 5 rows



In [None]:
# Filters

# Filtrar empresas > maior que ano 1985
empresas.select(
    'razao_social_nome_empresarial',
    'natureza_juridica',
    'data'
).filter(F.year(empresas.data) > 1985)
#.show(5, truncate=False)

#Filtrando os lucas
empresas.select(
    'razao_social_nome_empresarial',
    'natureza_juridica',
    'data'
).filter(F.upper('razao_social_nome_empresarial').like("%LUCAS%"))
#.show(5, truncate=False)

#groupsID
empresas.select(
    'razao_social_nome_empresarial',
    'natureza_juridica',
    'data'
).where('natureza_juridica < 2000')\
#.groupBy('natureza_juridica')\
#.count()\
#.show(truncate=False)

# Agregações
empresas.select(
    'razao_social_nome_empresarial',
    'natureza_juridica',
    'data'
)\
.groupBy('data')\
#.agg(
#    F.avg('natureza_juridica'),
#    F.count('data')
#)\
#.show(truncate=False)

#Sumario
empresas\
.select('natureza_juridica')\
#.summary()\
#.show()

# When e Otherwise para refatorar colunas
empresas.withColumn(
    'status_data',
    F.when(F.year('data') > 2000, "Ano Maior que 2000").otherwise("Ano Menor que 2000")
).show()



+-----------+-----------------------------+-----------------+------------------------+----------------------+-------------+---------------------------+----------+------------------+
|cnpj_basico|razao_social_nome_empresarial|natureza_juridica|qualificacao_responsavel|capital_social_empresa|porte_empresa|ente_federativo_responsavel|      data|       status_data|
+-----------+-----------------------------+-----------------+------------------------+----------------------+-------------+---------------------------+----------+------------------+
|        306|         FRANCAMAR REFRIGE...|             2240|                      49|                   0.0|            1|                       NULL|2047-09-25|Ano Maior que 2000|
|       1355|         BRASILEIRO & OLIV...|             2062|                      49|                   0.0|            5|                       NULL|2055-09-20|Ano Maior que 2000|
|       4820|         REGISTRO DE IMOVE...|             3034|                      32|    

In [None]:
# Join

'''
 Este código combina informações de produtos
 e impostos com base na categoria dos produtos,
 mostrando os produtos e suas respectivas taxas de imposto.
'''
'''
 O argumento how está relacionado a como faremos essa junção,
 e existem quatro possibilidades principais: inner, left, right e outer.
 O argumento inner fará a interseção entre os dois conjuntos,
 e somente nos retornará os valores de cat que existem em ambos os dataframes.
'''

produtos = spark.createDataFrame(
    [
        ('1', 'Bebidas', 'Água mineral'),
        ('2', 'Limpeza', 'Sabão em pó'),
        ('3', 'Frios', 'Queijo'),
        ('4', 'Bebidas', 'Refrigerante'),
        ('5', 'Pet', 'Ração para cães')
    ],
    ['id', 'cat', 'prod']
)

impostos = spark.createDataFrame(
    [
        ('Bebidas', 0.15),
        ('Limpeza', 0.05),
        ('Frios', 0.065),
        ('Carnes', 0.08)
    ],
    ['cat', 'tax']
)

produtos = produtos.join(impostos, 'cat', how='inner')\

produtos\
.sort('id')\
.groupBy('tax')\
.count().alias("Frequencia")\
.union(
    produtos.select(
        F.lit("Total"),
        F.sum('tax')
    )
)\
.show()

+-----+-----+
|  tax|count|
+-----+-----+
| 0.05|  1.0|
|0.065|  1.0|
| 0.15|  2.0|
|Total|0.415|
+-----+-----+



In [None]:
empresas.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- razao_social_nome_empresarial: string (nullable = true)
 |-- natureza_juridica: integer (nullable = true)
 |-- qualificacao_responsavel: integer (nullable = true)
 |-- capital_social_empresa: double (nullable = true)
 |-- porte_empresa: integer (nullable = true)
 |-- ente_federativo_responsavel: string (nullable = true)
 |-- data: date (nullable = true)



In [None]:
empresas.limit(5).toPandas()

Unnamed: 0,cnpj_basico,razao_social_nome_empresarial,natureza_juridica,qualificacao_responsavel,capital_social_empresa,porte_empresa,ente_federativo_responsavel,data
0,306,FRANCAMAR REFRIGERACAO TECNICA S/C LTDA,2240,49,0.0,1,,2074-06-15
1,1355,BRASILEIRO & OLIVEIRA LTDA,2062,49,0.0,5,,1969-10-18
2,4820,"REGISTRO DE IMOVEIS, TABELIONATO 1 DE NOTAS E ...",3034,32,0.0,5,,1955-04-05
3,5347,ROSELY APARECIDA MONTEIRO CALTABIANO FREITAS,2135,50,0.0,5,,2004-01-30
4,6846,BADU E FILHOS TECIDOS LTDA,2062,49,4000.0,1,,2097-03-25
