In [2]:
import pyspark
from   pyspark.sql import SparkSession, dataframe
from   pyspark.sql import functions as F

### Criando Seção Spark

In [3]:
spark:SparkSession = (
    SparkSession.builder.master("local")
    .appName('AprendendoSpark')             
    .getOrCreate()
    )
spark

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/03/20 18:35:51 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


#### Lendo Csv

In [4]:
data:dataframe = (
    spark.read
    .format("csv")
    .option('header',"True")
    .option('inferschema','True')
    .option("delimiter",';')
    .load('dataset/arquivo_geral.csv')
    )

In [5]:
data.show(3)

+------+------+----------+----------+---------------+-----------+----------------+
|regiao|estado|      data|casosNovos|casosAcumulados|obitosNovos|obitosAcumulados|
+------+------+----------+----------+---------------+-----------+----------------+
| Norte|    RO|2020-01-30|         0|              0|          0|               0|
| Norte|    RO|2020-01-31|         0|              0|          0|               0|
| Norte|    RO|2020-02-01|         0|              0|          0|               0|
+------+------+----------+----------+---------------+-----------+----------------+
only showing top 3 rows



In [6]:
#### me informa o tipo das colunas
data.printSchema()

root
 |-- regiao: string (nullable = true)
 |-- estado: string (nullable = true)
 |-- data: date (nullable = true)
 |-- casosNovos: integer (nullable = true)
 |-- casosAcumulados: integer (nullable = true)
 |-- obitosNovos: integer (nullable = true)
 |-- obitosAcumulados: integer (nullable = true)



#### select

In [7]:
data.select('casosAcumulados','regiao','data').show(3)

+---------------+------+----------+
|casosAcumulados|regiao|      data|
+---------------+------+----------+
|              0| Norte|2020-01-30|
|              0| Norte|2020-01-31|
|              0| Norte|2020-02-01|
+---------------+------+----------+
only showing top 3 rows



### filter

In [8]:
data.select('casosAcumulados','regiao','data')\
    .filter(
            (F.col("regiao")=="Norte") &
            (F.col('casosAcumulados') > 1)
    ).show(3)

+---------------+------+----------+
|casosAcumulados|regiao|      data|
+---------------+------+----------+
|              3| Norte|2020-03-22|
|              3| Norte|2020-03-23|
|              3| Norte|2020-03-24|
+---------------+------+----------+
only showing top 3 rows



In [9]:
data.select('casosAcumulados','regiao','data')\
    .filter(data["regiao"] == "Norte").show(3)

+---------------+------+----------+
|casosAcumulados|regiao|      data|
+---------------+------+----------+
|              0| Norte|2020-01-30|
|              0| Norte|2020-01-31|
|              0| Norte|2020-02-01|
+---------------+------+----------+
only showing top 3 rows



In [10]:
# melhor usar essa opção
data.select('casosAcumulados','regiao','data')\
    .filter("regiao = 'Norte'").show(3)

+---------------+------+----------+
|casosAcumulados|regiao|      data|
+---------------+------+----------+
|              0| Norte|2020-01-30|
|              0| Norte|2020-01-31|
|              0| Norte|2020-02-01|
+---------------+------+----------+
only showing top 3 rows



> Depende de qual
>    * mais pratico

In [11]:
# melhor usar essa opção
data.select('casosAcumulados','regiao','data')\
    .filter("regiao like 'S%' and casosAcumulados > 1 ").show(10)

+---------------+-------+----------+
|casosAcumulados| regiao|      data|
+---------------+-------+----------+
|              2|Sudeste|2020-03-13|
|              2|Sudeste|2020-03-14|
|              2|Sudeste|2020-03-15|
|              5|Sudeste|2020-03-16|
|              7|Sudeste|2020-03-17|
|             15|Sudeste|2020-03-18|
|             29|Sudeste|2020-03-19|
|             35|Sudeste|2020-03-20|
|             38|Sudeste|2020-03-21|
|             83|Sudeste|2020-03-22|
+---------------+-------+----------+
only showing top 10 rows



In [12]:
data.filter(
    (F.col('regiao').isin(*['Sul','Sudeste'])) & (F.col("estado")== 'MG')).show(4)

+-------+------+----------+----------+---------------+-----------+----------------+
| regiao|estado|      data|casosNovos|casosAcumulados|obitosNovos|obitosAcumulados|
+-------+------+----------+----------+---------------+-----------+----------------+
|Sudeste|    MG|2020-01-30|         0|              0|          0|               0|
|Sudeste|    MG|2020-01-31|         0|              0|          0|               0|
|Sudeste|    MG|2020-02-01|         0|              0|          0|               0|
|Sudeste|    MG|2020-02-02|         0|              0|          0|               0|
+-------+------+----------+----------+---------------+-----------+----------------+
only showing top 4 rows



In [13]:
data.filter(
    (F.col('regiao').isin(*['Sul','Sudeste'])) & (F.col("estado").like('M%'))).show(4)

+-------+------+----------+----------+---------------+-----------+----------------+
| regiao|estado|      data|casosNovos|casosAcumulados|obitosNovos|obitosAcumulados|
+-------+------+----------+----------+---------------+-----------+----------------+
|Sudeste|    MG|2020-01-30|         0|              0|          0|               0|
|Sudeste|    MG|2020-01-31|         0|              0|          0|               0|
|Sudeste|    MG|2020-02-01|         0|              0|          0|               0|
|Sudeste|    MG|2020-02-02|         0|              0|          0|               0|
+-------+------+----------+----------+---------------+-----------+----------------+
only showing top 4 rows



In [14]:
data.count()

2349

&nbsp;

#### Adicionar novas colunas

In [15]:
# adcionado nova coluna
data.select('*').withColumn('Total',
        F.col('casosNovos')+F.col('casosAcumulados')
    ).show(3)

+------+------+----------+----------+---------------+-----------+----------------+-----+
|regiao|estado|      data|casosNovos|casosAcumulados|obitosNovos|obitosAcumulados|Total|
+------+------+----------+----------+---------------+-----------+----------------+-----+
| Norte|    RO|2020-01-30|         0|              0|          0|               0|    0|
| Norte|    RO|2020-01-31|         0|              0|          0|               0|    0|
| Norte|    RO|2020-02-01|         0|              0|          0|               0|    0|
+------+------+----------+----------+---------------+-----------+----------------+-----+
only showing top 3 rows



In [16]:
data.withColumn('Status',
                F.when(F.col('casosNovos') > 0, 'Tem Novos Casos')\
                .otherwise('Não tem')).show(3)

+------+------+----------+----------+---------------+-----------+----------------+-------+
|regiao|estado|      data|casosNovos|casosAcumulados|obitosNovos|obitosAcumulados| Status|
+------+------+----------+----------+---------------+-----------+----------------+-------+
| Norte|    RO|2020-01-30|         0|              0|          0|               0|Não tem|
| Norte|    RO|2020-01-31|         0|              0|          0|               0|Não tem|
| Norte|    RO|2020-02-01|         0|              0|          0|               0|Não tem|
+------+------+----------+----------+---------------+-----------+----------------+-------+
only showing top 3 rows



In [17]:
data.select('data').\
     withColumn('dia',F.substring('data',9,1).cast('integer'))\
    .withColumn('mes',F.substring('data',6,2).cast('string'))\
    .withColumn('ano',F.substring('data',1,4).cast('integer')).show(2)

+----------+---+---+----+
|      data|dia|mes| ano|
+----------+---+---+----+
|2020-01-30|  3| 01|2020|
|2020-01-31|  3| 01|2020|
+----------+---+---+----+
only showing top 2 rows



In [18]:
data:dataframe = (
    spark.read
    .format("csv")
    .option('header',"False")
    .option('inferschema','True')
    .option("delimiter",';')
    .load('dataset/arquivo_geral.csv')
    )

In [19]:
data.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)
 |-- _c3: string (nullable = true)
 |-- _c4: string (nullable = true)
 |-- _c5: string (nullable = true)
 |-- _c6: string (nullable = true)



In [20]:
data.head()

Row(_c0='regiao', _c1='estado', _c2='data', _c3='casosNovos', _c4='casosAcumulados', _c5='obitosNovos', _c6='obitosAcumulados')

In [21]:
display(data)

DataFrame[_c0: string, _c1: string, _c2: string, _c3: string, _c4: string, _c5: string, _c6: string]

In [22]:
data.show(2)

+------+------+----------+----------+---------------+-----------+----------------+
|   _c0|   _c1|       _c2|       _c3|            _c4|        _c5|             _c6|
+------+------+----------+----------+---------------+-----------+----------------+
|regiao|estado|      data|casosNovos|casosAcumulados|obitosNovos|obitosAcumulados|
| Norte|    RO|2020-01-30|         0|              0|          0|               0|
+------+------+----------+----------+---------------+-----------+----------------+
only showing top 2 rows



&nbsp;

# Particionamento de dados

In [24]:
data = [1,2,3,4,5,6,7,8]
spark.sparkContext.parallelize(data)

ParallelCollectionRDD[78] at readRDDFromFile at PythonRDD.scala:289