# Spark 101 - Data Manipulation using Spark

Spark é um sistema de processamento de arquivos que trabalha com computação distribuída. Tem ambiente de execução em memória, que o torna mais rápido. Fácil utilização. Usa RDDs - resilient distributed datasets, que permite operações em paralelo. Tem suporte a várias linguagens e diversas bibliotecas integradas pra ML, streaming, gráficos e SQL. Tem tb otimizações em execução de consultas, planejamento automático de tarefas, particionamento de dados, cache em memória.

## 1. Running Spark in Colab

In [1]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.4.1.tar.gz (310.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m4.5 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.4.1-py2.py3-none-any.whl size=311285387 sha256=0f9e61967d5773d8f77cf0c4d19291aac67fe7f2589628e61525e9131056e96f
  Stored in directory: /root/.cache/pip/wheels/0d/77/a3/ff2f74cc9ab41f8f594dabf0579c2a7c6de920d584206e0834
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.4.1


In [2]:
!pip install findspark

Collecting findspark
  Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Installing collected packages: findspark
Successfully installed findspark-2.0.1


In [3]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()  #import e início da spark session

In [4]:
# Check if spark is initialized
df = spark.sql('''select 'Sucesso total, estamos online!' as hello''')
df.show()

+--------------------+
|               hello|
+--------------------+
|Sucesso total, es...|
+--------------------+



In [5]:
# Import spark libraries
from pyspark.sql import Row, DataFrame
from pyspark.sql.types import StringType, StructType, StructField, IntegerType
from pyspark.sql.functions import col, expr, lit, substring, concat, concat_ws, when, coalesce
from pyspark.sql import functions as F # for more sql functions
from functools import reduce
#DÚVIDA: PQ USAR SPARK SQL? PODERIA USAR OUTRA LGG? TERIA QUE BAIXAR OUTRAS BIBLIOTECAS CORRESPONDENTES À LGG ESCOLHIDA?

## 2. Data manipulation using spark

In [7]:
df = spark.read.csv('aluguel.csv', sep = ';', inferSchema = True, header = True)

print('Linhas  :', df.count())
print('Total colunas :', len(df.columns))
print('Colunas:', df.columns)

Linhas  : 32960
Total colunas : 9
Colunas: ['Tipo', 'Bairro', 'Quartos', 'Vagas', 'Suites', 'Area', 'Valor', 'Condominio', 'IPTU']


In [9]:
df.createOrReplaceTempView("aluguel")

df_check = spark.sql('''select `Tipo`, Bairro, `Valor` from aluguel''')
df_check.show(10, truncate=False)
#DÚVIDA: O QUE É A TEMPVIEW? OU MELHOR, PRA QUE ELA SERVE?

+-----------------------+---------------+-----+
|Tipo                   |Bairro         |Valor|
+-----------------------+---------------+-----+
|Quitinete              |Copacabana     |1700 |
|Casa                   |Jardim Botânico|7000 |
|Conjunto Comercial/Sala|Barra da Tijuca|5200 |
|Apartamento            |Centro         |800  |
|Apartamento            |Higienópolis   |800  |
|Apartamento            |Vista Alegre   |1200 |
|Apartamento            |Cachambi       |1300 |
|Casa de Condomínio     |Barra da Tijuca|22000|
|Casa de Condomínio     |Ramos          |1000 |
|Conjunto Comercial/Sala|Centro         |35000|
+-----------------------+---------------+-----+
only showing top 10 rows



## 3. Dataframe Basic Operations

In [10]:
#describe do df inteiro
df.describe().show()

+-------+--------------+----------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+
|summary|          Tipo|    Bairro|           Quartos|             Vagas|            Suites|              Area|             Valor|        Condominio|              IPTU|
+-------+--------------+----------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+
|  count|         32960|     32960|             32960|             32960|             32960|             32960|             32943|             28867|             22723|
|   mean|          null|      null|1.7713895631067962|1.7490594660194174|0.6657766990291262| 231.9015473300971|12952.659715265761|2388.0624242214294|2364.4095850019803|
| stddev|          null|      null|1.7178687849639518|20.380401505419783|1.1765247843440245|1135.2541521229266| 667521.9694189351| 39184.95961454535| 17956

In [12]:
#describe de colunas individuais
df.describe('Vagas', 'Suites').show()

+-------+------------------+------------------+
|summary|             Vagas|            Suites|
+-------+------------------+------------------+
|  count|             32960|             32960|
|   mean|1.7490594660194174|0.6657766990291262|
| stddev|20.380401505419783|1.1765247843440245|
|    min|                 0|                 0|
|    max|              1966|                70|
+-------+------------------+------------------+



In [13]:
#printar algumas infos
print('df.count		:', df.count())
print('df.columns	:', df.columns)
print('df dtypes	:', df.dtypes)
print('df schema 1:', df.schema)

df.count		: 32960
df.columns	: ['Tipo', 'Bairro', 'Quartos', 'Vagas', 'Suites', 'Area', 'Valor', 'Condominio', 'IPTU']
df dtypes	: [('Tipo', 'string'), ('Bairro', 'string'), ('Quartos', 'int'), ('Vagas', 'int'), ('Suites', 'int'), ('Area', 'int'), ('Valor', 'int'), ('Condominio', 'int'), ('IPTU', 'int')]
df schema 1: StructType([StructField('Tipo', StringType(), True), StructField('Bairro', StringType(), True), StructField('Quartos', IntegerType(), True), StructField('Vagas', IntegerType(), True), StructField('Suites', IntegerType(), True), StructField('Area', IntegerType(), True), StructField('Valor', IntegerType(), True), StructField('Condominio', IntegerType(), True), StructField('IPTU', IntegerType(), True)])


In [14]:
#printar Schema
print('df schema 1:')
df.printSchema()

df schema 1:
root
 |-- Tipo: string (nullable = true)
 |-- Bairro: string (nullable = true)
 |-- Quartos: integer (nullable = true)
 |-- Vagas: integer (nullable = true)
 |-- Suites: integer (nullable = true)
 |-- Area: integer (nullable = true)
 |-- Valor: integer (nullable = true)
 |-- Condominio: integer (nullable = true)
 |-- IPTU: integer (nullable = true)



In [15]:
#dropar duplicados
df = df.dropDuplicates()
print('df.count		:', df.count())
print('df.columns	:', df.columns)

df.count		: 31800
df.columns	: ['Tipo', 'Bairro', 'Quartos', 'Vagas', 'Suites', 'Area', 'Valor', 'Condominio', 'IPTU']


In [17]:
#selecionar algumas colunas
df2 = df.select(*['Tipo', 'Bairro'])
df2.show()
#DÚVIDA: esse select * é SQL?

+--------------------+--------------------+
|                Tipo|              Bairro|
+--------------------+--------------------+
|Conjunto Comercia...|             Ipanema|
|      Casa Comercial|              Olaria|
|         Apartamento|             Ipanema|
|Conjunto Comercia...|Recreio dos Bande...|
|Loja Shopping/ Ct...|Recreio dos Bande...|
|         Apartamento|              Centro|
|         Apartamento|         São Conrado|
|         Apartamento|          Copacabana|
|         Apartamento|              Leblon|
|         Apartamento|          Copacabana|
|         Apartamento|              Leblon|
|Conjunto Comercia...|     Barra da Tijuca|
|         Apartamento|     Barra da Tijuca|
|         Apartamento|            Botafogo|
|         Apartamento|          Praça Seca|
|         Apartamento|             Ipanema|
|         Apartamento|            Botafogo|
|Conjunto Comercia...|              Centro|
|Galpão/Depósito/A...|            Cachambi|
|Conjunto Comercia...|          

In [18]:
#outra forma de selecionar colunas
col_l = list(set(df.columns)  - {'Tipo','Bairro'})
df2 = df.select(*col_l)
df2.show(2)

+------+----------+-------+-----+----+----+-----+
|Suites|Condominio|Quartos|Vagas|Area|IPTU|Valor|
+------+----------+-------+-----+----+----+-----+
|     0|      4680|      0|    5| 200|null|25200|
|     0|      null|      0|    1| 100|null| 2000|
+------+----------+-------+-----+----+----+-----+
only showing top 2 rows



In [19]:
#renomear colunas
df2 = df \
  .withColumnRenamed('Tipo'            , 'tipo') \
  .withColumnRenamed('Bairro', 'bairro') \
  .withColumnRenamed('Suites'         , 'suites') \
  .withColumnRenamed('Condominio'                   , 'condominio') \
  .withColumnRenamed('Vagas'                 , 'vagas') #\

df2.show(2)

+--------------------+-------+-------+-----+------+----+-----+----------+----+
|                tipo| bairro|Quartos|vagas|suites|Area|Valor|condominio|IPTU|
+--------------------+-------+-------+-----+------+----+-----+----------+----+
|Conjunto Comercia...|Ipanema|      0|    5|     0| 200|25200|      4680|null|
|      Casa Comercial| Olaria|      0|    1|     0| 100| 2000|      null|null|
+--------------------+-------+-------+-----+------+----+-----+----------+----+
only showing top 2 rows



In [20]:
#renomear colunas com loop
rename_expr = [col(column).alias(column.replace(' ', '_')) for column in df.columns]

df2 = df.select(*rename_expr)
df2.show(2)

+--------------------+-------+-------+-----+------+----+-----+----------+----+
|                Tipo| Bairro|Quartos|Vagas|Suites|Area|Valor|Condominio|IPTU|
+--------------------+-------+-------+-----+------+----+-----+----------+----+
|Conjunto Comercia...|Ipanema|      0|    5|     0| 200|25200|      4680|null|
|      Casa Comercial| Olaria|      0|    1|     0| 100| 2000|      null|null|
+--------------------+-------+-------+-----+------+----+-----+----------+----+
only showing top 2 rows



In [21]:
#add colunas baseada em outra coluna
df2 = df.withColumn('localizacao', col('bairro'))
df2.show(2)
#DÚVIDA: A COLUNA QUE EU ADD NÃO MOSTRA NA VISUALIZAÇÃO SEGUINTE (CÓDIGO SEGUINTE). NÃO FICA SALVO?

+--------------------+-------+-------+-----+------+----+-----+----------+----+-----------+
|                Tipo| Bairro|Quartos|Vagas|Suites|Area|Valor|Condominio|IPTU|localizacao|
+--------------------+-------+-------+-----+------+----+-----+----------+----+-----------+
|Conjunto Comercia...|Ipanema|      0|    5|     0| 200|25200|      4680|null|    Ipanema|
|      Casa Comercial| Olaria|      0|    1|     0| 100| 2000|      null|null|     Olaria|
+--------------------+-------+-------+-----+------+----+-----+----------+----+-----------+
only showing top 2 rows



In [22]:
#add coluna constante
df2 = df.withColumn('país', lit('BR'))
df2.show(2)
#DÚVIDA: A COLUNA QUE EU ADD NÃO MOSTRA NA VISUALIZAÇÃO SEGUINTE (CÓDIGO SEGUINTE). NÃO FICA SALVO?

+--------------------+-------+-------+-----+------+----+-----+----------+----+----+
|                Tipo| Bairro|Quartos|Vagas|Suites|Area|Valor|Condominio|IPTU|país|
+--------------------+-------+-------+-----+------+----+-----+----------+----+----+
|Conjunto Comercia...|Ipanema|      0|    5|     0| 200|25200|      4680|null|  BR|
|      Casa Comercial| Olaria|      0|    1|     0| 100| 2000|      null|null|  BR|
+--------------------+-------+-------+-----+------+----+-----+----------+----+----+
only showing top 2 rows



In [23]:
#dropar coluna
df2 = df.drop('localizacao')
df2.show(2)
#DÚVIDA: O DROP FOI SALVO, PQ O ADD NÃO FOI?

+--------------------+-------+-------+-----+------+----+-----+----------+----+
|                Tipo| Bairro|Quartos|Vagas|Suites|Area|Valor|Condominio|IPTU|
+--------------------+-------+-------+-----+------+----+-----+----------+----+
|Conjunto Comercia...|Ipanema|      0|    5|     0| 200|25200|      4680|null|
|      Casa Comercial| Olaria|      0|    1|     0| 100| 2000|      null|null|
+--------------------+-------+-------+-----+------+----+-----+----------+----+
only showing top 2 rows



In [25]:
#dropar várias colunas
df2 = df.drop(*['IPTU','país'])
df2.show(2)

+--------------------+-------+-------+-----+------+----+-----+----------+
|                Tipo| Bairro|Quartos|Vagas|Suites|Area|Valor|Condominio|
+--------------------+-------+-------+-----+------+----+-----+----------+
|Conjunto Comercia...|Ipanema|      0|    5|     0| 200|25200|      4680|
|      Casa Comercial| Olaria|      0|    1|     0| 100| 2000|      null|
+--------------------+-------+-------+-----+------+----+-----+----------+
only showing top 2 rows



In [26]:
#outra forma de dropar várias colunas
df2 = reduce(DataFrame.drop, ['Area','Quartos'], df)
df2.show(2)

+--------------------+-------+-----+------+-----+----------+----+
|                Tipo| Bairro|Vagas|Suites|Valor|Condominio|IPTU|
+--------------------+-------+-----+------+-----+----------+----+
|Conjunto Comercia...|Ipanema|    5|     0|25200|      4680|null|
|      Casa Comercial| Olaria|    1|     0| 2000|      null|null|
+--------------------+-------+-----+------+-----+----------+----+
only showing top 2 rows



In [28]:
#filtrar dados
# Equal to values
df2 = df.where(df['Bairro'] == 'Ipanema')

# Between values
df3 = df.where(df['Condominio'].between('500','2000'))

# Is inside multiple values
df4 = df.where(df['Tipo'].isin('Apartamento','Casa'))

print('df.count  :', df.count())
print('df2.count :', df2.count())
print('df3.count :', df3.count())
print('df4.count :', df4.count())
#DÚVIDA: ISSO É SQL TB, CERTO?

df.count  : 31800
df2.count : 2092
df3.count : 16919
df4.count : 19791


In [30]:
#filtrar dados
#na célula de cima só fizemos os counts por condição, agora mostramos os resultados do primeiro count
df2 = df.where(df['Bairro'] == 'Ipanema')
df2.show(2)

+--------------------+-------+-------+-----+------+----+-----+----------+----+
|                Tipo| Bairro|Quartos|Vagas|Suites|Area|Valor|Condominio|IPTU|
+--------------------+-------+-------+-----+------+----+-----+----------+----+
|Conjunto Comercia...|Ipanema|      0|    5|     0| 200|25200|      4680|null|
|         Apartamento|Ipanema|      3|    1|     1| 120| 6500|      1500| 450|
+--------------------+-------+-------+-----+------+----+-----+----------+----+
only showing top 2 rows



In [33]:
#filtrar dados
df2 = df.where((df['Bairro'] == 'Ipanema') & (df['Suites'] == 2))
print('df2.count :', df2.count())
df2.show(3)

df2.count : 304
+-----------+-------+-------+-----+------+----+-----+----------+----+
|       Tipo| Bairro|Quartos|Vagas|Suites|Area|Valor|Condominio|IPTU|
+-----------+-------+-------+-----+------+----+-----+----------+----+
|Apartamento|Ipanema|      2|    1|     2|  90|10000|      3687| 475|
|Apartamento|Ipanema|      2|    2|     2|  90|12000|      3900|null|
|Apartamento|Ipanema|      3|    1|     2|  85| 4500|      1200|null|
+-----------+-------+-------+-----+------+----+-----+----------+----+
only showing top 3 rows



In [None]:
#cast datatypes
#print('='*50)
#print('Pre cast')
#print(df.printSchema())

#df2 = df \
#.withColumn('CERT_str1', df['CERT'].cast('string')) \
#.withColumn('CERT_str2', df['CERT'].cast(StringType())) #\

#print('='*50)
#print('Post cast')
#print(df2.printSchema())

In [34]:
#Replace values
# Pre replace
df.show(2)

# Post replace
print('Replace 7 in the above dataframe with 17 at all instances')
df.na.replace(7,17).show(2)

+--------------------+-------+-------+-----+------+----+-----+----------+----+
|                Tipo| Bairro|Quartos|Vagas|Suites|Area|Valor|Condominio|IPTU|
+--------------------+-------+-------+-----+------+----+-----+----------+----+
|Conjunto Comercia...|Ipanema|      0|    5|     0| 200|25200|      4680|null|
|      Casa Comercial| Olaria|      0|    1|     0| 100| 2000|      null|null|
+--------------------+-------+-------+-----+------+----+-----+----------+----+
only showing top 2 rows

Replace 7 in the above dataframe with 17 at all instances
+--------------------+-------+-------+-----+------+----+-----+----------+----+
|                Tipo| Bairro|Quartos|Vagas|Suites|Area|Valor|Condominio|IPTU|
+--------------------+-------+-------+-----+------+----+-----+----------+----+
|Conjunto Comercia...|Ipanema|      0|    5|     0| 200|25200|      4680|null|
|      Casa Comercial| Olaria|      0|    1|     0| 100| 2000|      null|null|
+--------------------+-------+-------+-----+----