
### Installing Spark for Python

In [None]:
%pip install pyspark



### Import `SparkSession` from the PySpark library

In [None]:
from pyspark.sql import SparkSession

### Create (or reuse) a SparkSession, the entry point for working with DataFrames and Spark SQL

In [None]:
spark = SparkSession.builder.appName("teste").getOrCreate()

### Import the PySpark SQL functions module under the alias `F`

In [None]:
import pyspark.sql.functions as F

#### Spark reads the CSV file

In [None]:
df_sal = spark.read.csv("sal_data.csv", header=True)

## Function **explain()**:
Prints the (logical and physical) plans to the console for debugging purposes.

### Parameters
**extended**: *bool, optional*

Default `False`. **If `False`**, prints only the physical plan. If a string is passed here and `mode` is not given, the string is used as the mode.

**mode**: *str, optional*

Specifies the expected output format of plans.
*   **simple:** Print only a physical plan.
*   **extended:** Print both logical and physical plans.
*   **codegen:** Print a physical plan and generated codes if they are available.
*   **cost:** Print a logical plan and statistics if they are available.
*   **formatted:** Split explain output into two sections: a physical plan outline and node details.

Changed in version 3.0.0: Added optional argument mode to specify the expected output format of plans.

In [None]:
df_sal.explain()

== Physical Plan ==
FileScan csv [Employee_ID#17,CPF#18,First_Name#19,Last_Name#20,Basic_Salary#21] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/content/sal_data.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<Employee_ID:string,CPF:string,First_Name:string,Last_Name:string,Basic_Salary:string>




## Function **show()**:
Prints the first n rows to the console.

### Parameters
**n**: *int, optional*

Number of rows to show (default 20).

**truncate**: *bool or int, optional*

If set to `True` (the default), truncates strings longer than 20 characters. If set to a number greater than one, truncates long strings to length `truncate` and aligns cells right.

**vertical**: *bool, optional*

If set to True, print output rows vertically (one line per column value).

In [None]:
df_sal.show(5)

+-----------+--------------+----------+---------+------------+
|Employee_ID|           CPF|First_Name|Last_Name|Basic_Salary|
+-----------+--------------+----------+---------+------------+
|     E-1001|557.216.676-64|    Mahesh|    Joshi|       16860|
|     E-1002|647.770.644-09|    Rajesh|    Kolte|       14960|
|     E-1004|785.276.601-30|     Priya|     Jain|       12670|
|     E-1005|127.891.042-55|     Sneha|    Joshi|       15660|
|     E-1007|635.807.316-88|       Ram|   Kanade|       15850|
+-----------+--------------+----------+---------+------------+
only showing top 5 rows



## Function **withColumn()**:
Returns a new DataFrame by adding a column or replacing the existing column that has the same name.

The column expression must be an expression over this DataFrame; attempting to add a column from some other DataFrame will raise an error.

New in version 1.3.0.

### Parameters
**colName**: *str*

Name of the new column.

**col**: *Column*

A Column expression for the new column.

## Function **lit()**:
Creates a Column of literal value.

In [None]:
dobrado = df_sal.withColumn("salario_dobrado", F.col("Basic_Salary") * F.lit(2))

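### Assign the DataFrame created above (`dobrado`) to a new variable, `nomes`. DataFrames are immutable, so reassigning `nomes` in the next cells leaves `dobrado` unchanged.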

In [None]:
nomes = dobrado

### Loop over the column names and convert all of them to lowercase

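## Function **lower()**:
Converts a string expression to lower case. This describes `pyspark.sql.functions.lower`, which lowercases column *values*; the loop below renames the columns themselves with Python's built-in `str.lower()`.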




In [None]:
for coluna in nomes.columns:
  nomes = nomes.withColumnRenamed(coluna, coluna.lower())

## Function **concat()**:
Concatenates multiple input columns together into a single column. The function works with strings, binary and compatible array columns.

## Function **col()**:
Returns a Column based on the given column name.

In [None]:
nomes = nomes.withColumn("nome_completo",
                         F.concat(F.col("first_name"), 
                                  F.lit(" "),
                                  F.col("last_name")))

### Show `nomes` with the new column

In [None]:
nomes.show()

+-----------+--------------+----------+---------+------------+---------------+-------------+
|employee_id|           cpf|first_name|last_name|basic_salary|salario_dobrado|nome_completo|
+-----------+--------------+----------+---------+------------+---------------+-------------+
|     E-1001|557.216.676-64|    Mahesh|    Joshi|       16860|        33720.0| Mahesh Joshi|
|     E-1002|647.770.644-09|    Rajesh|    Kolte|       14960|        29920.0| Rajesh Kolte|
|     E-1004|785.276.601-30|     Priya|     Jain|       12670|        25340.0|   Priya Jain|
|     E-1005|127.891.042-55|     Sneha|    Joshi|       15660|        31320.0|  Sneha Joshi|
|     E-1007|635.807.316-88|       Ram|   Kanade|       15850|        31700.0|   Ram Kanade|
|     E-1008|644.788.226-62|     Nishi|   Honrao|       15950|        31900.0| Nishi Honrao|
|     E-1009|521.515.132-62|    Hameed|    Singh|       15120|        30240.0| Hameed Singh|
+-----------+--------------+----------+---------+------------+---------------+-------------+

### Spark reads a second CSV file and stores it in `bonus_data`

In [None]:
bonus_data = spark.read.csv("bonus_data.csv", header=True)

### Show the contents of `bonus_data`

In [None]:
bonus_data.show()

+-----------+-----+
|Employee_ID|Bonus|
+-----------+-----+
|     E-1001|16070|
|     E-1003|15200|
|     E-1004|13490|
|     E-1006|14200|
|     E-1008|15880|
|     E-1010|15120|
+-----------+-----+



## Function **alias()**:
Returns a new DataFrame with an alias set.

New in version 1.3.0.

### Parameters
**alias**: *str*

An alias name to be set for the DataFrame.

## Function **join()**:
Joins with another DataFrame, using the given join expression.

### Parameters
**other**: *DataFrame*

Right side of the join.

**on**: *str, list or Column, optional*

a string for the join column name, a list of column names, a join expression (Column), or a list of Columns. If on is a string or a list of strings indicating the name of the join column(s), the column(s) must exist on both sides, and this performs an equi-join.

**how**: *str, optional*

default inner. Must be one of: inner, cross, outer, full, fullouter, full_outer, left, leftouter, left_outer, right, rightouter, right_outer, semi, leftsemi, left_semi, anti, leftanti and left_anti.

In [None]:
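# Left join on the employee ID. The result is not assigned to a variable,
# so the cell only displays the schema of the joined DataFrame.
nomes.alias("a").join(bonus_data.alias("b"),
                      F.col("a.employee_id") == F.col("b.Employee_ID"), how="left")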

DataFrame[employee_id: string, cpf: string, first_name: string, last_name: string, basic_salary: string, salario_dobrado: double, nome_completo: string, Employee_ID: string, Bonus: string]

## Function **avg()**:
Aggregate function: returns the average of the values in a group.

## Function **take()**:
Returns the first num rows as a list of Row.

In [None]:
avg = nomes.select(F.avg("basic_salary")).take(1)[0][0]

## Function **when()**:
Evaluates a list of conditions and returns one of multiple possible result expressions. If pyspark.sql.Column.otherwise() is not invoked, None is returned for unmatched conditions.

New in version 1.4.0.

### Parameters
**condition**: *Column*

A boolean Column expression.

**value**:

A literal value, or a Column expression.

## Function **otherwise()**:

Sets the value returned for rows that match none of the preceding `when()` conditions. If `Column.otherwise()` is not invoked, `None` is returned for unmatched conditions.

New in version 1.4.0.

### Parameters
**value**:

A literal value, or a Column expression.

In [None]:
classificacao = nomes.withColumn("classificacao",
                 F.when(F.col("basic_salary") > F.lit(avg),
                        F.lit("acima_da_media"))\
                 .otherwise(F.lit("abaixo_da_media")))

In [None]:
classificacao.show()

+-----------+--------------+----------+---------+------------+---------------+-------------+---------------+
|employee_id|           cpf|first_name|last_name|basic_salary|salario_dobrado|nome_completo|  classificacao|
+-----------+--------------+----------+---------+------------+---------------+-------------+---------------+
|     E-1001|557.216.676-64|    Mahesh|    Joshi|       16860|        33720.0| Mahesh Joshi| acima_da_media|
|     E-1002|647.770.644-09|    Rajesh|    Kolte|       14960|        29920.0| Rajesh Kolte|abaixo_da_media|
|     E-1004|785.276.601-30|     Priya|     Jain|       12670|        25340.0|   Priya Jain|abaixo_da_media|
|     E-1005|127.891.042-55|     Sneha|    Joshi|       15660|        31320.0|  Sneha Joshi| acima_da_media|
|     E-1007|635.807.316-88|       Ram|   Kanade|       15850|        31700.0|   Ram Kanade| acima_da_media|
|     E-1008|644.788.226-62|     Nishi|   Honrao|       15950|        31900.0| Nishi Honrao| acima_da_media|
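|     E-1009|521.515.132-62|    Hameed|    Singh|       15120|        30240.0| Hameed Singh|abaixo_da_media|
+-----------+--------------+----------+---------+------------+---------------+-------------+---------------+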

In [None]:
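# sum() with no arguments sums every numeric column; basic_salary is still a
# string, so only salario_dobrado is aggregated
classificacao.groupBy("classificacao").sum().show()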

+---------------+--------------------+
|  classificacao|sum(salario_dobrado)|
+---------------+--------------------+
|abaixo_da_media|             85500.0|
| acima_da_media|            128640.0|
+---------------+--------------------+



In [None]:
classificacao.explain()

== Physical Plan ==
*(1) Project [Employee_ID#17 AS employee_id#61, CPF#18 AS cpf#68, First_Name#19 AS first_name#75, Last_Name#20 AS last_name#82, Basic_Salary#21 AS basic_salary#89, (cast(Basic_Salary#21 as double) * 2.0) AS salario_dobrado#96, concat(First_Name#19,  , Last_Name#20) AS nome_completo#103, CASE WHEN (cast(Basic_Salary#21 as double) > 15295.714285714286) THEN acima_da_media ELSE abaixo_da_media END AS classificacao#208]
+- FileScan csv [Employee_ID#17,CPF#18,First_Name#19,Last_Name#20,Basic_Salary#21] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/content/sal_data.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<Employee_ID:string,CPF:string,First_Name:string,Last_Name:string,Basic_Salary:string>




In [None]:
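from typing import Optional


def mask_cpf(cpf: Optional[str]) -> Optional[str]:
    '''Masks the digits of a CPF number in order to obfuscate it.
    Intended to be used as a PySpark UDF.'''
    if cpf is None:  # null values reach the UDF as None; keep them null
        return None
    # Formatted CPF ("XXX.XXX.XXX-XX", 14 chars): keep only the middle digits
    return "***" + cpf[3:11] + "-**" if len(cpf) == 14 else len(cpf) * "*"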

In [None]:
mask_cpf("557.216.676-64")

In [None]:
from pyspark.sql import types as T

In [None]:
mascara_cpf_udf = F.udf(mask_cpf, T.StringType())

In [None]:
classificacao.withColumn("cpf", mascara_cpf_udf(F.col("cpf"))) \
    .filter(F.col("classificacao") == F.lit("acima_da_media")) \
    .drop("classificacao") \
    .show()