# Apache Spark Challenges
Author: Prof. Barbosa<br>
Contact: infobarbosa@gmail.com<br>
Github: [infobarbosa](https://github.com/infobarbosa)

In [2]:
! pip install pyspark

Defaulting to user installation because normal site-packages is not writeable


In [3]:
from pyspark.sql.types import StructType, StructField, LongType, StringType
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("SparkSQL") \
    .config("spark.sql.files.maxPartitionBytes", "1048576") \
    .config("dfs.client.read.shortcircuit.skip.checksum", "true") \
    .config("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false") \
    .getOrCreate()

24/06/23 20:26:12 WARN Utils: Your hostname, brubeck resolves to a loopback address: 127.0.1.1; using 10.255.255.254 instead (on interface lo)
24/06/23 20:26:12 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/06/23 20:26:12 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/06/23 20:26:13 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


### The schema

In [9]:
schema = StructType([
    StructField("ID", LongType(), True),
    StructField("CPF", StringType(), True),
    StructField("NOME", StringType(), True),
    StructField("MUNICIPIO", StringType(), True),
    StructField("UF", StringType(), True),
])
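A brief aside on why `ID` is declared as `LongType` rather than `IntegerType`: the IDs that appear in the `df.show()` output of this notebook have 11 digits, which is beyond the 32-bit signed range. The check below is a plain-Python sketch that needs no Spark session; the constant and variable names are illustrative, and the sample ID is copied from that output.

```python
# Range limits of Spark's IntegerType (32-bit) and LongType (64-bit).
INT32_MAX = 2**31 - 1
INT64_MAX = 2**63 - 1

# One ID value copied from the df.show() output in this notebook.
sample_id = 20128077306

fits_in_int = sample_id <= INT32_MAX
fits_in_long = sample_id <= INT64_MAX

print(fits_in_int, fits_in_long)  # prints: False True
```

With an explicit schema, a value that does not fit the declared type is typically read as null in the default permissive mode, so choosing the wider type up front avoids silently losing IDs.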

### The DataFrame

In [16]:
df = spark.read.format("csv") \
    .option("header", "true") \
    .option("delimiter", ";") \
    .schema(schema) \
    .load("../../../assets/data/pessoas/pessoas.csv")
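To make the `header` and `delimiter` options concrete, here is a small sketch using only Python's standard `csv` module rather than Spark. The two-line `sample` text is an assumption about the file layout (a header row followed by `;`-separated fields); its data row reuses values that appear in the `df.show()` output of this notebook.

```python
import csv
import io

# Hypothetical excerpt in the layout the reader options imply:
# a header row, then ';'-separated fields. This is an assumed layout,
# not a verbatim copy of pessoas.csv.
sample = (
    "ID;CPF;NOME;MUNICIPIO;UF\n"
    "13622350775;***.523.498-**;ADRIANA SANTOS;SAO PAULO;SP\n"
)

# header=true  -> the first line supplies the column names (DictReader).
# delimiter=;  -> fields are split on ';' instead of ','.
reader = csv.DictReader(io.StringIO(sample), delimiter=";")

# The schema declares ID as a long, so cast it; other columns stay strings.
rows = [{**row, "ID": int(row["ID"])} for row in reader]

print(rows[0])
```

Spark's CSV reader performs the equivalent work in a distributed fashion, casting each column according to the schema passed to `.schema(...)`.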


In [17]:
df.printSchema()

root
 |-- ID: long (nullable = true)
 |-- CPF: string (nullable = true)
 |-- NOME: string (nullable = true)
 |-- MUNICIPIO: string (nullable = true)
 |-- UF: string (nullable = true)



In [18]:
df.show()

+-----------+--------------+--------------------+-------------------+---+
|         ID|           CPF|                NOME|          MUNICIPIO| UF|
+-----------+--------------+--------------------+-------------------+---+
|20128077306|***.081.588-**|ADRIANE DE JESUS ...|ITAPIRAPUA PAULISTA| SP|
|16358953061|***.086.753-**|MARIA DO ROSARIO ...|        MORRO AGUDO| SP|
|13622350775|***.523.498-**|      ADRIANA SANTOS|          SAO PAULO| SP|
|20732189858|***.330.168-**|DANIELA OLIVEIRA ...|          SAO PAULO| SP|
|20789861660|***.575.228-**|MARCILENE DE SOUZ...|          SAO PAULO| SP|
|16076155060|***.835.265-**|TATIANE ALVES SANTOS|          SAO PAULO| SP|
|12102260099|***.615.358-**|MARCIA CRISTINA M...|      CARAGUATATUBA| SP|
|20128077306|***.081.588-**|ADRIANE DE JESUS ...|ITAPIRAPUA PAULISTA| SP|
|16358953061|***.086.753-**|MARIA DO ROSARIO ...|        MORRO AGUDO| SP|
|20030724885|***.799.568-**|EDNA FERREIRA DOS...|             OSASCO| SP|
|13622350775|***.523.498-**|      ADRI

In [20]:
df.explain(extended=True)

== Parsed Logical Plan ==
Relation [ID#136L,CPF#137,NOME#138,MUNICIPIO#139,UF#140] csv

== Analyzed Logical Plan ==
ID: bigint, CPF: string, NOME: string, MUNICIPIO: string, UF: string
Relation [ID#136L,CPF#137,NOME#138,MUNICIPIO#139,UF#140] csv

== Optimized Logical Plan ==
Relation [ID#136L,CPF#137,NOME#138,MUNICIPIO#139,UF#140] csv

== Physical Plan ==
FileScan csv [ID#136L,CPF#137,NOME#138,MUNICIPIO#139,UF#140] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/home/barbosa/labs/apache-spark-challenges/assets/data/pessoas/pe..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<ID:bigint,CPF:string,NOME:string,MUNICIPIO:string,UF:string>



In [21]:
df.rdd.getNumPartitions()

1
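The session above sets `spark.sql.files.maxPartitionBytes`, which caps how many bytes Spark packs into one read partition. As a rough, simplified sketch of how that cap interacts with the file-open cost and the available parallelism, the plain-Python functions below approximate the split-size calculation for a single splittable file. The file size and the value used for `min_partition_num` are hypothetical, not measurements of `pessoas.csv`; the count actually reported by `getNumPartitions()` also depends on whether the input is splittable and on the settings the running session really picked up.

```python
import math

MIB = 1024 * 1024


def max_split_bytes(total_file_bytes, num_files, max_partition_bytes,
                    open_cost_bytes=4 * MIB, min_partition_num=8):
    """Approximate the target split size for file-based sources.

    min_partition_num stands in for the default parallelism; 8 is a
    hypothetical value chosen only for this example.
    """
    total = total_file_bytes + num_files * open_cost_bytes
    bytes_per_core = total // min_partition_num
    return min(max_partition_bytes, max(open_cost_bytes, bytes_per_core))


def estimate_partitions(file_bytes, split_bytes):
    """Estimate read partitions for one splittable file (simplified)."""
    return math.ceil(file_bytes / split_bytes)


# Hypothetical 100 MiB file with the 1 MiB cap used in this notebook.
split_small = max_split_bytes(100 * MIB, 1, max_partition_bytes=1 * MIB)
parts_small = estimate_partitions(100 * MIB, split_small)

# Same hypothetical file with a 128 MiB cap for comparison.
split_large = max_split_bytes(100 * MIB, 1, max_partition_bytes=128 * MIB)
parts_large = estimate_partitions(100 * MIB, split_large)

print(parts_small, parts_large)
```

Under these assumptions a smaller cap yields many more, smaller partitions, which is the trade-off the setting controls.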

### Record count

In [19]:
print(f"Number of records: {df.count()}, distinct: {df.distinct().count()}")

24/06/23 20:40:59 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.


Number of records: 2458075, distinct: 2449428
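The gap between the two counts is the number of fully duplicated rows. The arithmetic below is a plain-Python sketch that uses the two figures printed above; the variable names are illustrative.

```python
# Figures copied from the cell output above.
total_rows = 2458075
distinct_rows = 2449428

# Rows that are exact repeats of another row.
duplicate_rows = total_rows - distinct_rows

# Share of the dataset made up of those repeats, as a percentage.
duplicate_pct = duplicate_rows / total_rows * 100

print(duplicate_rows, round(duplicate_pct, 2))  # prints: 8647 0.35
```

If the duplicates are unwanted, `df.dropDuplicates()` (or `df.distinct()`) returns a DataFrame without them; whether to deduplicate before writing is a choice this notebook does not make.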



### Writing the Parquet file

In [22]:
# Destination folder: Spark writes a directory of part files here, not a single file.
pasta_destino = "./output/clientes/cliente.parquet"

df.write.parquet(pasta_destino, mode="overwrite")
