# Introdução ao PySpark
Mais informações no link https://spark.apache.org/docs/latest/api/python/

#Instalação das dependências e configuração do ambiente

In [1]:
pip install pyspark pyarrow



In [2]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-2.4.4/spark-2.4.4-bin-hadoop2.7.tgz
!tar xf spark-2.4.4-bin-hadoop2.7.tgz
!pip install -q findspark

In [3]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.4-bin-hadoop2.7"

import findspark
findspark.init('spark-2.4.4-bin-hadoop2.7')

# Importando as dependências necessárias aos nossos exemplos

In [4]:
from pyspark.sql import SparkSession
sc = SparkSession.builder.master('local[*]').getOrCreate()

In [5]:
from pyspark import SparkFiles
from pyspark.sql.functions import udf
from pyspark.sql.types import *
from functools import reduce

import pandas as pd

# Verificando o Spark Context

In [6]:
print(sc)

<pyspark.sql.session.SparkSession object at 0x7fe1e714b190>


Configurando nível de log do sparkContext desejado para nossos exemplos

In [None]:
sc.setLogLevel("WARN")

In [8]:
#Criado um DataFrame Pandas a partir de uma lista utilizando Python

In [9]:
valor = [("Joao",23,"M"),]
print(type(valor))


print("joao da chuva"[:5])
colunas = ['nome', 'idade','camiseta']
print(type(colunas))

df_test = pd.DataFrame(valor, columns=colunas)
df_outro  = df_test["nome"]
df_outro.describe()

<class 'list'>
joao 
<class 'list'>


count        1
unique       1
top       Joao
freq         1
Name: nome, dtype: object

In [10]:
#linhas do Dataframe
lines = [('DF','Centro-Oeste'),('GO','Centro-Oeste'),('MT','Centro-Oeste'),('MS','Centro-Oeste'),
('AL','Nordeste'),('BA','Nordeste'),('CE','Nordeste'),('MA','Nordeste'),('PB','Nordeste'),('PE','Nordeste'),('PI','Nordeste'),('RN','Nordeste'),
('SE','Nordeste'),('AC','Norte'),('AP','Norte'),('AM','Norte'),('PA','Norte'),('RO','Norte'),('RR','Norte'),('TO','Norte'),
('ES','Sudeste'),('MG','Sudeste'),('RJ','Sudeste'),('SP','Sudeste'),('PR','Sul'),('RS','Sul'),('SC','Sul')]
columns = ["Sigla", "Regiao"]
df_uf_reg = pd.DataFrame(lines, columns=columns)

In [11]:
df_uf_reg.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 27 entries, 0 to 26
Data columns (total 2 columns):
 #   Column  Non-Null Count  Dtype 
---  ------  --------------  ----- 
 0   Sigla   27 non-null     object
 1   Regiao  27 non-null     object
dtypes: object(2)
memory usage: 560.0+ bytes


In [12]:
df_uf_reg.describe()

Unnamed: 0,Sigla,Regiao
count,27,27
unique,27,5
top,DF,Nordeste
freq,1,9


In [13]:
df_uf_reg['Sigla'].head(5)

0    DF
1    GO
2    MT
3    MS
4    AL
Name: Sigla, dtype: object

In [14]:
spark = SparkSession.builder.getOrCreate()

In [15]:
url = 'https://handson-zup-data.s3.amazonaws.com/caso_full.csv'

#Baixando um Dataset direto para um DataFrame Pandas

In [16]:
df = pd.read_csv(url)

# Convertendo o DataFrame pandas em um DataFrame Spark e realizando operações com os dados

In [None]:
df_covid = spark.createDataFrame(df)

In [None]:
print(type(df_covid))

In [None]:
df_covid.printSchema()

In [None]:
df_covid.select(df_covid.city).show(5)

In [None]:
df_covid.select(df_covid['city'], df_covid['state'],df_covid['date'], df_covid['new_deaths']).orderBy(df_covid['new_deaths']).show(5)

In [None]:
df_covid = df_covid.withColumn('ano', df_covid['date'].substr(0,4))

In [None]:
df_covid.select('ano','date').show(4)

In [None]:
df_resumo = df_covid.groupBy('ano').sum('new_deaths','new_confirmed')

In [None]:
df_resumo.show()

In [None]:
df_uf_reg_spk = spark.createDataFrame(df_uf_reg)

In [None]:
df_covid = df_covid.join(df_uf_reg_spk, df_covid['state']==df_uf_reg_spk['sigla'], how="left")

In [None]:
df_covid.columns

In [None]:
df_covid.groupBy('Regiao').sum('new_deaths').show()

In [None]:
print(spark.catalog.listTables())