**Autor:** Yuri Dimitri Ramos Costa

**E-mail:** yuridrcosta@gmail.com

**Início** em 26/08/2022

**Última atualização** em 26/08/2022

# Instalação

Baseado nas informações contidas em https://colab.research.google.com/github/carlosfab/sigmoidal_ai/blob/master/Big_Data_Como_instalar_o_PySpark_no_Google_Colab.ipynb#scrollTo=RkpG11RQPbRf

In [1]:
# instalar as dependências
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-2.4.4/spark-2.4.4-bin-hadoop2.7.tgz
!tar xf spark-2.4.4-bin-hadoop2.7.tgz
!pip install -q findspark

In [2]:
# configurar as variáveis de ambiente
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.4-bin-hadoop2.7"

# tornar o pyspark "importável"
import findspark
findspark.init('spark-2.4.4-bin-hadoop2.7')

# Criação de DataFrames

## Criação da [sessão do Spark](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.SparkSession.html)

In [3]:
from pyspark.sql import SparkSession
sc = SparkSession.builder \
        .master("local") \
        .appName("intro-pypspark-colab")\
        .getOrCreate()

## Lendo arquivo .csv

É possível ler um arquivo `.csv` utilizando o `.read.format("csv").load(CAMINHO_PARA_ARQUIVO)` ou utilizando o `.read.csv(CAMINHO_PARA_ARQUIVO)`

A lista de opções relacionadas a leitura dos conjuntos de dados está [disponível nesse site](https://dbmstutorials.com/pyspark/spark-read-write-dataframe-options.html).

In [6]:
TRAINING_DATASET_FILEPATH = 'sample_data/california_housing_train.csv'
TEST_DATASET_FILEPATH = 'sample_data/california_housing_test.csv'

In [15]:
train_df = sc.read.option('header',True)\
                .csv(TRAINING_DATASET_FILEPATH)
test_df = sc.read.option('header',True)\
                .csv(TEST_DATASET_FILEPATH)

# Manipulação de DataFrames

## Visualização do DataFrame

In [16]:
train_df.printSchema()

root
 |-- longitude: string (nullable = true)
 |-- latitude: string (nullable = true)
 |-- housing_median_age: string (nullable = true)
 |-- total_rooms: string (nullable = true)
 |-- total_bedrooms: string (nullable = true)
 |-- population: string (nullable = true)
 |-- households: string (nullable = true)
 |-- median_income: string (nullable = true)
 |-- median_house_value: string (nullable = true)



In [41]:
train_df.columns

['longitude',
 'latitude',
 'housing_median_age',
 'total_rooms',
 'total_bedrooms',
 'population',
 'households',
 'median_income',
 'median_house_value']

In [17]:
test_df.show(10)

+-----------+---------+------------------+-----------+--------------+-----------+----------+-------------+------------------+
|  longitude| latitude|housing_median_age|total_rooms|total_bedrooms| population|households|median_income|median_house_value|
+-----------+---------+------------------+-----------+--------------+-----------+----------+-------------+------------------+
|-122.050000|37.370000|         27.000000|3885.000000|    661.000000|1537.000000|606.000000|     6.608500|     344700.000000|
|-118.300000|34.260000|         43.000000|1510.000000|    310.000000| 809.000000|277.000000|     3.599000|     176500.000000|
|-117.810000|33.780000|         27.000000|3589.000000|    507.000000|1484.000000|495.000000|     5.793400|     270500.000000|
|-118.360000|33.820000|         28.000000|  67.000000|     15.000000|  49.000000| 11.000000|     6.135900|     330000.000000|
|-119.670000|36.330000|         19.000000|1241.000000|    244.000000| 850.000000|237.000000|     2.937500|      81700.

In [18]:
test_df.toPandas()

Unnamed: 0,longitude,latitude,housing_median_age,total_rooms,total_bedrooms,population,households,median_income,median_house_value
0,-122.050000,37.370000,27.000000,3885.000000,661.000000,1537.000000,606.000000,6.608500,344700.000000
1,-118.300000,34.260000,43.000000,1510.000000,310.000000,809.000000,277.000000,3.599000,176500.000000
2,-117.810000,33.780000,27.000000,3589.000000,507.000000,1484.000000,495.000000,5.793400,270500.000000
3,-118.360000,33.820000,28.000000,67.000000,15.000000,49.000000,11.000000,6.135900,330000.000000
4,-119.670000,36.330000,19.000000,1241.000000,244.000000,850.000000,237.000000,2.937500,81700.000000
...,...,...,...,...,...,...,...,...,...
2995,-119.860000,34.420000,23.000000,1450.000000,642.000000,1258.000000,607.000000,1.179000,225000.000000
2996,-118.140000,34.060000,27.000000,5257.000000,1082.000000,3496.000000,1036.000000,3.390600,237200.000000
2997,-119.700000,36.300000,10.000000,956.000000,201.000000,693.000000,220.000000,2.289500,62000.000000
2998,-117.120000,34.100000,40.000000,96.000000,14.000000,46.000000,14.000000,3.270800,162500.000000


Note que a função `.describe()` retorna um DataFrame assim como no Pandas, porém o Spark não mostra automaticamente.

In [43]:
test_df.describe().show()

+-------+-------------------+------------------+------------------+-----------------+------------------+------------------+------------------+------------------+------------------+
|summary|          longitude|          latitude|housing_median_age|      total_rooms|    total_bedrooms|        population|        households|     median_income|median_house_value|
+-------+-------------------+------------------+------------------+-----------------+------------------+------------------+------------------+------------------+------------------+
|  count|               3000|              3000|              3000|             3000|              3000|              3000|              3000|              3000|              3000|
|   mean|-119.58920000000029| 35.63538999999999|28.845333333333333|2599.578666666667| 529.9506666666666|1402.7986666666666|           489.912| 3.807271799999998|        205846.275|
| stddev| 1.9949362939550166|2.1296695233438334|12.555395554955757|2155.593331625582|415.654368

## Visualização de coluna

Note que ao acessar simplesmente a coluna, temos um resultado diferente do Pandas

In [19]:
train_df.longitude

Column<b'longitude'>

Podemos visualizar uma coluna como segue

In [21]:
train_df.select(train_df.longitude).show(7)

+-----------+
|  longitude|
+-----------+
|-114.310000|
|-114.470000|
|-114.560000|
|-114.570000|
|-114.570000|
|-114.580000|
|-114.580000|
+-----------+
only showing top 7 rows



## Adicionando coluna

A função `.withColumn()` recebe como parâmetro o nome da coluna a ser criada e uma coluna. 

Lembrando que como `train_df.longitude` retorna a coluna longitude, então a operação `train_df.longitude + 2` retorna todos os elementos da coluna acrescidos de 2, portando uma coluna também.

Para criar uma nova coluna é necessário usar a função [`col()`](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.col.html#pyspark.sql.functions.col). Para criar uma coluna com um valor fixo é necessário utilizar a função [`lit()`](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.lit.html#pyspark.sql.functions.lit)

Note que a alteração no DataFrame não é feita *inplace*, logo é necessário receber esse novo DataFrame.

In [27]:
from pyspark.sql.functions import lit

In [35]:
new_train_df = train_df.withColumn("Nova coluna",  lit('Valor fixo'))

In [36]:
train_df.show(4)

+-----------+---------+------------------+-----------+--------------+-----------+----------+-------------+------------------+
|  longitude| latitude|housing_median_age|total_rooms|total_bedrooms| population|households|median_income|median_house_value|
+-----------+---------+------------------+-----------+--------------+-----------+----------+-------------+------------------+
|-114.310000|34.190000|         15.000000|5612.000000|   1283.000000|1015.000000|472.000000|     1.493600|      66900.000000|
|-114.470000|34.400000|         19.000000|7650.000000|   1901.000000|1129.000000|463.000000|     1.820000|      80100.000000|
|-114.560000|33.690000|         17.000000| 720.000000|    174.000000| 333.000000|117.000000|     1.650900|      85700.000000|
|-114.570000|33.640000|         14.000000|1501.000000|    337.000000| 515.000000|226.000000|     3.191700|      73400.000000|
+-----------+---------+------------------+-----------+--------------+-----------+----------+-------------+------------

In [38]:
new_train_df.show(4)

+-----------+---------+------------------+-----------+--------------+-----------+----------+-------------+------------------+-----------+
|  longitude| latitude|housing_median_age|total_rooms|total_bedrooms| population|households|median_income|median_house_value|Nova coluna|
+-----------+---------+------------------+-----------+--------------+-----------+----------+-------------+------------------+-----------+
|-114.310000|34.190000|         15.000000|5612.000000|   1283.000000|1015.000000|472.000000|     1.493600|      66900.000000| Valor fixo|
|-114.470000|34.400000|         19.000000|7650.000000|   1901.000000|1129.000000|463.000000|     1.820000|      80100.000000| Valor fixo|
|-114.560000|33.690000|         17.000000| 720.000000|    174.000000| 333.000000|117.000000|     1.650900|      85700.000000| Valor fixo|
|-114.570000|33.640000|         14.000000|1501.000000|    337.000000| 515.000000|226.000000|     3.191700|      73400.000000| Valor fixo|
+-----------+---------+-----------

## Renomeando coluna

In [39]:
new_train_df = new_train_df.withColumnRenamed("Nova coluna",  'Coluna renomeada')

In [40]:
new_train_df.show(4)

+-----------+---------+------------------+-----------+--------------+-----------+----------+-------------+------------------+----------------+
|  longitude| latitude|housing_median_age|total_rooms|total_bedrooms| population|households|median_income|median_house_value|Coluna renomeada|
+-----------+---------+------------------+-----------+--------------+-----------+----------+-------------+------------------+----------------+
|-114.310000|34.190000|         15.000000|5612.000000|   1283.000000|1015.000000|472.000000|     1.493600|      66900.000000|      Valor fixo|
|-114.470000|34.400000|         19.000000|7650.000000|   1901.000000|1129.000000|463.000000|     1.820000|      80100.000000|      Valor fixo|
|-114.560000|33.690000|         17.000000| 720.000000|    174.000000| 333.000000|117.000000|     1.650900|      85700.000000|      Valor fixo|
|-114.570000|33.640000|         14.000000|1501.000000|    337.000000| 515.000000|226.000000|     3.191700|      73400.000000|      Valor fixo|

## Filtragem

In [45]:
train_df.filter(train_df.housing_median_age == 40).show(4)

+-----------+---------+------------------+-----------+--------------+-----------+----------+-------------+------------------+
|  longitude| latitude|housing_median_age|total_rooms|total_bedrooms| population|households|median_income|median_house_value|
+-----------+---------+------------------+-----------+--------------+-----------+----------+-------------+------------------+
|-116.980000|33.930000|         40.000000|2277.000000|    498.000000|1391.000000|453.000000|     1.947200|      73200.000000|
|-117.020000|32.760000|         40.000000|2523.000000|    488.000000| 976.000000|470.000000|     3.110000|     185700.000000|
|-117.070000|32.630000|         40.000000|1706.000000|    322.000000| 796.000000|303.000000|     3.558300|     154900.000000|
|-117.140000|32.700000|         40.000000|1227.000000|    330.000000|1199.000000|316.000000|     1.218800|      92500.000000|
+-----------+---------+------------------+-----------+--------------+-----------+----------+-------------+------------

## Agrupamentos (Group By)

In [48]:
train_df.groupby('housing_median_age').avg().show()

+------------------+
|housing_median_age|
+------------------+
|          1.000000|
|         37.000000|
|         32.000000|
|         47.000000|
|         45.000000|
|         11.000000|
|         18.000000|
|          3.000000|
|         17.000000|
|         33.000000|
|          4.000000|
|         19.000000|
|         44.000000|
|         39.000000|
|         23.000000|
|         34.000000|
|         25.000000|
|         15.000000|
|         51.000000|
|         22.000000|
+------------------+
only showing top 20 rows



# Salvando DataFrames

Visualizar documentação sobre [entrada e saída de conjuntos de dados](https://spark.apache.org/docs/latest/api/python/getting_started/quickstart_df.html#Getting-Data-in/out).

## Salvando como Parquet

O formato de arquivos `.parquet` é conhecido por ser mais eficiente para disponibilização e consulta de informações em um conjunto de dados. Mais informações podem ser obtidas no [site oficial](https://parquet.apache.org/).

In [49]:
train_df.write.parquet('training_data.parquet')

In [55]:
parquet_df = sc.read.parquet("training_data.parquet")

In [56]:
parquet_df.show(4)

+-----------+---------+------------------+-----------+--------------+-----------+----------+-------------+------------------+
|  longitude| latitude|housing_median_age|total_rooms|total_bedrooms| population|households|median_income|median_house_value|
+-----------+---------+------------------+-----------+--------------+-----------+----------+-------------+------------------+
|-114.310000|34.190000|         15.000000|5612.000000|   1283.000000|1015.000000|472.000000|     1.493600|      66900.000000|
|-114.470000|34.400000|         19.000000|7650.000000|   1901.000000|1129.000000|463.000000|     1.820000|      80100.000000|
|-114.560000|33.690000|         17.000000| 720.000000|    174.000000| 333.000000|117.000000|     1.650900|      85700.000000|
|-114.570000|33.640000|         14.000000|1501.000000|    337.000000| 515.000000|226.000000|     3.191700|      73400.000000|
+-----------+---------+------------------+-----------+--------------+-----------+----------+-------------+------------

## Salvando como ORC

O formato de arquivos `.orc` é um formato de armazenamento de dados utilizado pelo ecosistema [**Hadoop**](https://hadoop.apache.org/) que tem suporte a transações ACID. Mais informações podem ser obtidas no [site oficial](https://orc.apache.org/).

In [59]:
parquet_df.write.orc('training_data.orc')

In [60]:
orc_df = sc.read.orc('training_data.orc')

In [61]:
orc_df.show(4)

+-----------+---------+------------------+-----------+--------------+-----------+----------+-------------+------------------+
|  longitude| latitude|housing_median_age|total_rooms|total_bedrooms| population|households|median_income|median_house_value|
+-----------+---------+------------------+-----------+--------------+-----------+----------+-------------+------------------+
|-114.310000|34.190000|         15.000000|5612.000000|   1283.000000|1015.000000|472.000000|     1.493600|      66900.000000|
|-114.470000|34.400000|         19.000000|7650.000000|   1901.000000|1129.000000|463.000000|     1.820000|      80100.000000|
|-114.560000|33.690000|         17.000000| 720.000000|    174.000000| 333.000000|117.000000|     1.650900|      85700.000000|
|-114.570000|33.640000|         14.000000|1501.000000|    337.000000| 515.000000|226.000000|     3.191700|      73400.000000|
+-----------+---------+------------------+-----------+--------------+-----------+----------+-------------+------------