# Input/Output no PySpark

## Imports

In [1]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
26/02/03 22:32:54 WARN Utils: Your hostname, MacBook-Air-de-Vitor.local, resolves to a loopback address: 127.0.0.1; using 192.168.3.49 instead (on interface en0)
26/02/03 22:32:54 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
26/02/03 22:32:54 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
26/02/03 22:32:55 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


## Lendo CSV

In [19]:
# Vamos usar um dataset menor para testar todas as configurações
# No fim iremos aplicar todos os conhecimentos adquiridos em um dataset da Olist
playground_path = "data/playground/notas.csv"

df = spark.read.csv(path=playground_path)

In [20]:
df.show()

+--------------------+
|                 _c0|
+--------------------+
|nome;nascimento;nota|
|  João;20/05/1998;10|
|  Ana;19/05/2000;9.5|
| Maria;01/04/1986;-1|
|Fabiana;20/05/1973;6|
|   Jose;16/08/2005;9|
|  Vitor;20/05/1996;8|
+--------------------+



### Parametro de leitura 

In [21]:
# Definindo que a primeira linha do CSV é o header e o separador é ;
spark.read.csv(playground_path, header=True, sep=';').show()

+-------+----------+----+
|   nome|nascimento|nota|
+-------+----------+----+
|   João|20/05/1998|  10|
|    Ana|19/05/2000| 9.5|
|  Maria|01/04/1986|  -1|
|Fabiana|20/05/1973|   6|
|   Jose|16/08/2005|   9|
|  Vitor|20/05/1996|   8|
+-------+----------+----+



In [22]:
# Também podemos definir se um caracter é a representação de null

df = spark.read.csv(
    path=playground_path,
    header=True,
    sep=';',
    nullValue="-1",
)

df.show()

+-------+----------+----+
|   nome|nascimento|nota|
+-------+----------+----+
|   João|20/05/1998|  10|
|    Ana|19/05/2000| 9.5|
|  Maria|01/04/1986|NULL|
|Fabiana|20/05/1973|   6|
|   Jose|16/08/2005|   9|
|  Vitor|20/05/1996|   8|
+-------+----------+----+



In [23]:
# Mas o tipo dos dados estão corretos?

df.printSchema()

root
 |-- nome: string (nullable = true)
 |-- nascimento: string (nullable = true)
 |-- nota: string (nullable = true)



In [24]:
# Formas de corrigir esse problema com CSV

# 1. inferSchema + Date Formate
spark.read.csv(
    path=playground_path,
    header=True,
    sep=';',
    nullValue="-1",
    inferSchema=True,
).printSchema()


spark.read.csv(
    path=playground_path,
    header=True,
    sep=';',
    nullValue="-1",
    inferSchema=True,
    dateFormat="dd/MM/yyyy" #Add this line
).printSchema()

root
 |-- nome: string (nullable = true)
 |-- nascimento: string (nullable = true)
 |-- nota: double (nullable = true)

root
 |-- nome: string (nullable = true)
 |-- nascimento: date (nullable = true)
 |-- nota: double (nullable = true)



## Lendo Json
O padrão nativo do Spark/Big Data é o JSON Lines, onde não usa vírgulas separando as linhas e sem colchetes de array envolvendo o arquivo todo. [Documentação](https://jsonlines.org/examples/)

Exemplo
```
  {"nome": "João", "nascimento": "20/05/1998", "nota": 10}
  {"nome": "Ana", "nascimento": "19/05/2000", "nota": 9.5}
```

Como estamos utilizando o formato padrão de json precisamos da opção `multiLine=True`

In [31]:
json_path = "data/playground/notas.json"

df_lines = spark.read.json(
  json_path,
  multiLine=True,
  dateFormat="dd/MM/yyyy"
)
df_lines.show()
df_lines.printSchema()

+----------+-------+----+
|nascimento|   nome|nota|
+----------+-------+----+
|20/05/1998|   João|10.0|
|19/05/2000|    Ana| 9.5|
|01/04/1986|  Maria|-1.0|
|20/05/1973|Fabiana| 6.0|
|16/08/2005|   Jose| 9.0|
|20/05/1996|  Vitor| 8.0|
+----------+-------+----+

root
 |-- nascimento: string (nullable = true)
 |-- nome: string (nullable = true)
 |-- nota: double (nullable = true)



### Especificando Schema

A forma garantir que os tipos estão corretos é definir o schema manualmente.

In [32]:
from pyspark.sql.types import (
    StructType,
    StructField,
    StringType,
    DoubleType,
    DateType
)

schema = StructType([
    StructField("nome", StringType(), True),
    StructField("nascimento", DateType(), True),
    StructField("nota", DoubleType(), True)
])

df_schema = spark.read.json(
    json_path,
    schema=schema,
    multiLine=True,
    dateFormat="dd/MM/yyyy"
)

df_schema.printSchema()
df_schema.show()

root
 |-- nome: string (nullable = true)
 |-- nascimento: date (nullable = true)
 |-- nota: double (nullable = true)

+-------+----------+----+
|   nome|nascimento|nota|
+-------+----------+----+
|   João|1998-05-20|10.0|
|    Ana|2000-05-19| 9.5|
|  Maria|1986-04-01|-1.0|
|Fabiana|1973-05-20| 6.0|
|   Jose|2005-08-16| 9.0|
|  Vitor|1996-05-20| 8.0|
+-------+----------+----+



## Read Parquet

In [34]:
# Parquet é um formato que compacta melhor os nossos dados

import os

file_parquet = 'data/playground/olist_order_items_dataset.parquet'
file_csv = 'data/raw/olist_order_items_dataset.csv'

size1 = os.path.getsize(file_parquet) / (1024 * 1024)
size2 = os.path.getsize(file_csv) / (1024 * 1024)

print(f"Tamanho Parquet: {size1:.2f} MB")
print(f"Tamanho CSV: {size2:.2f} MB")

Tamanho Parquet: 6.44 MB
Tamanho CSV: 14.72 MB


In [35]:
parquet_path = 'data/playground/olist_order_items_dataset.parquet'

df = spark.read.parquet(parquet_path)

df.show(5)

df.printSchema()

+--------------------+-------------+--------------------+--------------------+-------------------+-----+-------------+
|            order_id|order_item_id|          product_id|           seller_id|shipping_limit_date|price|freight_value|
+--------------------+-------------+--------------------+--------------------+-------------------+-----+-------------+
|00010242fe8c5a6d1...|            1|4244733e06e7ecb49...|48436dade18ac8b2b...|2017-09-19 09:45:35| 58.9|        13.29|
|00018f77f2f0320c5...|            1|e5f2d52b802189ee6...|dd7ddc04e1b6c2c61...|2017-05-03 11:05:13|239.9|        19.93|
|000229ec398224ef6...|            1|c777355d18b72b67a...|5b51032eddd242adc...|2018-01-18 14:48:30|199.0|        17.87|
|00024acbcdf0a6daa...|            1|7634da152a4610f15...|9d7a1d34a50524090...|2018-08-15 10:10:18|12.99|        12.79|
|00042b26cf59d7ce6...|            1|ac6c3623068f30de0...|df560393f3a51e745...|2017-02-13 13:57:51|199.9|        18.14|
+--------------------+-------------+------------

                                                                                

### Multi Files

Trabalhando com spark vai ser comum vcs receberem tabelas divididas em varios arquivos.

Isso acontece porque o Spark é um sistema distribuído. Vários executores podem estar escrevendo partes do arquivo simultaneamente. Ao final, teremos vários arquivos `part-00000`, `part-00001`, etc., dentro da pasta de destino.

Exemplo
```
dataset.parquet/ --pasta
 ├── part-00000.parquet --arquivos
 ├── part-00001.parquet
 └── part-00002.parquet
```

In [36]:
# Ler um arquivo ou multiplos arquivos é feito da mesma forma

multi_file_path = "data/playground/olist_order__multi_files"
df = spark.read.parquet(multi_file_path)

df.show()

+--------------------+-------------+--------------------+--------------------+-------------------+------+-------------+
|            order_id|order_item_id|          product_id|           seller_id|shipping_limit_date| price|freight_value|
+--------------------+-------------+--------------------+--------------------+-------------------+------+-------------+
|304dfaa2530a17749...|            1|0aed9d548c366ad80...|643214e62b870443c...|2018-05-18 15:59:46|  43.8|         7.39|
|3948c2f40e71b2178...|            1|887dba291adc295b5...|98dac6635aee4995d...|2017-05-11 16:35:11|  26.4|         14.1|
|274954faa1775d863...|            1|96277485af1fa20c3...|b499c00f28f4b7069...|2017-12-04 12:53:20| 59.99|        15.17|
|43ee3ed7fa79e982f...|            1|e67307ff0f15ade43...|f4aba7c0bca51484c...|2017-06-02 17:30:19| 18.99|         15.1|
|24508eba7537515f4...|            1|29b85c2f1ec5f4d02...|44073f8b7e41514de...|2017-04-26 21:35:19| 186.9|        21.36|
|3addeedb2235eeb1e...|            1|58be

## Salvando Arquivos

Uma característica importante do Spark é que, ao salvar dados (seja CSV, Parquet, JSON, etc.), ele **sempre cria uma pasta** e não um arquivo único.

In [34]:
# Criando um DataFrame de exemplo
data = [
    ("João", 25, "RJ"),
    ("Maria", 30, "SP"),
    ("Pedro", 22, "MG"),
    ("Ana", 28, "RJ"),
    ("Carlos", 35, "SP")
]
columns = ["Nome", "Idade", "Estado"]

df = spark.createDataFrame(data, columns)
df.show()

+------+-----+------+
|  Nome|Idade|Estado|
+------+-----+------+
|  João|   25|    RJ|
| Maria|   30|    SP|
| Pedro|   22|    MG|
|   Ana|   28|    RJ|
|Carlos|   35|    SP|
+------+-----+------+



### Salvando em CSV
As configurações usadas no read também podem ser utilizadas no write

In [None]:
df.write.csv(
    "data/playground/write/output_csv",
    header=True,
    sep=";"
)

#### Modo de escrita
Se a gente rerodar o comando superior, ele vai retornar um erro, pq o arquivo já existe. Para resolver isso, temos que definir que podemos sobreescrever usando o modo de escrita (mode)

In [None]:
df.write.mode("overwrite").csv(
    "data/playground/write/output_csv",
    header=True,
    sep=";"
)

### Salvando em Parquet

In [None]:
df.write.mode("overwrite").parquet("data/playground/write/output_parquet")

### Reparticionamento (Repartition)

Como mencionado, o Spark salva os dados em vários arquivos (partições). Podemos controlar quantos arquivos serão gerados usando `repartition()` ou `coalesce()`.

- `repartition(N)`: Aumenta ou diminui o número de partições (faz um shuffle completo dos dados).
- `coalesce(N)`: Diminui o número de partições (mais eficiente que repartition para reduzir, pois evita shuffle total).

In [None]:
(
    df.coalesce(1)
    .write
    .mode("overwrite")
    .parquet("data/playground/write/output_parquet_one_file")
)

## Aplicando no dataset de review da Olist

In [41]:
from pyspark.sql.types import *

schema = StructType([
    StructField("review_id", StringType()),
    StructField("order_id", StringType()),
    StructField("review_score", IntegerType()),
    StructField("review_comment_title", StringType()),
    StructField("review_comment_message", StringType()),
    StructField("review_creation_date", TimestampType()),
    StructField("review_answer_timestamp", TimestampType()) 
])

olist_path = "data/raw/olist_order_reviews_dataset.csv"

df = spark.read.csv(
    path=olist_path,
    header=True,
    sep=',',
    schema=schema
)

In [42]:
df.printSchema()

root
 |-- review_id: string (nullable = true)
 |-- order_id: string (nullable = true)
 |-- review_score: integer (nullable = true)
 |-- review_comment_title: string (nullable = true)
 |-- review_comment_message: string (nullable = true)
 |-- review_creation_date: timestamp (nullable = true)
 |-- review_answer_timestamp: timestamp (nullable = true)



In [43]:
df.write.mode("overwrite").parquet("data/processed/olist_order_reviews_dataset")