# Learning Spark

Documentation is available [here](https://spark.apache.org/docs/latest/api/python/getting_started/install.html).

## Installation


In [2]:
!pip install pyspark


Collecting pyspark
  Using cached pyspark-3.5.1-py2.py3-none-any.whl
Collecting py4j==0.10.9.7 (from pyspark)
  Using cached py4j-0.10.9.7-py2.py3-none-any.whl.metadata (1.5 kB)
Using cached py4j-0.10.9.7-py2.py3-none-any.whl (200 kB)
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.7 pyspark-3.5.1


Installing extra dependencies

In [3]:
# Spark SQL
!pip install pyspark[sql]

# Pandas API on Spark
# !pip install plotly
!pip install pyspark[pandas_on_spark] plotly # plotly is required to plot the data

# Spark Connect
!pip install pyspark[connect]

Collecting pandas>=1.0.5 (from pyspark[sql])
  Using cached pandas-2.2.2-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (19 kB)
Collecting pyarrow>=4.0.0 (from pyspark[sql])
  Using cached pyarrow-16.0.0-cp312-cp312-manylinux_2_28_x86_64.whl.metadata (3.0 kB)
Collecting numpy>=1.15 (from pyspark[sql])
  Using cached numpy-1.26.4-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (61 kB)
Collecting pytz>=2020.1 (from pandas>=1.0.5->pyspark[sql])
  Using cached pytz-2024.1-py2.py3-none-any.whl.metadata (22 kB)
Collecting tzdata>=2022.7 (from pandas>=1.0.5->pyspark[sql])
  Using cached tzdata-2024.1-py2.py3-none-any.whl.metadata (1.4 kB)
Using cached numpy-1.26.4-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (18.0 MB)
Using cached pandas-2.2.2-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (12.7 MB)
Using cached pyarrow-16.0.0-cp312-cp312-manylinux_2_28_x86_64.whl (40.8 MB)
Using cached pytz-2024.1-py2.py3-none-any.whl (505 kB

## DataFrame

A PySpark application starts by initializing a SparkSession, the entry point of PySpark, as shown below. When PySpark runs in the shell via the pyspark executable, the shell automatically creates the session in the variable spark.

In [4]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

24/05/06 20:08:12 WARN Utils: Your hostname, codespaces-195a24 resolves to a loopback address: 127.0.0.1; using 172.16.5.4 instead (on interface eth0)
24/05/06 20:08:12 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/05/06 20:08:13 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


### Ways to create DataFrames

In [5]:
# Creating a DataFrame from a list of Row objects

from datetime import datetime, date
import pandas as pd
from pyspark.sql import Row

df = spark.createDataFrame([
    Row(a=1, b=2., c='string1', d=date(2000, 1, 1), e=datetime(2000, 1, 1, 12, 0)),
    Row(a=2, b=3., c='string2', d=date(2000, 2, 1), e=datetime(2000, 1, 2, 12, 0)),
    Row(a=4, b=5., c='string3', d=date(2000, 3, 1), e=datetime(2000, 1, 3, 12, 0))
])
df

DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]

In [6]:
# Creating a DataFrame with an explicit schema

df = spark.createDataFrame([
    (1, 2., 'string1', date(2000, 1, 1), datetime(2000, 1, 1, 12, 0)),
    (2, 3., 'string2', date(2000, 2, 1), datetime(2000, 1, 2, 12, 0)),
    (3, 4., 'string3', date(2000, 3, 1), datetime(2000, 1, 3, 12, 0))
], schema='a long, b double, c string, d date, e timestamp') # defining the column types
df

DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]

In [7]:
# Creating a DataFrame from a pandas DataFrame

pandas_df = pd.DataFrame({
    'a': [1, 2, 3],
    'b': [2., 3., 4.],
    'c': ['string1', 'string2', 'string3'],
    'd': [date(2000, 1, 1), date(2000, 2, 1), date(2000, 3, 1)],
    'e': [datetime(2000, 1, 1, 12, 0), datetime(2000, 1, 2, 12, 0), datetime(2000, 1, 3, 12, 0)]
})

df = spark.createDataFrame(pandas_df)
df

DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]

## Inspecting the Data


In [8]:
df.show()

df.printSchema()

                                                                                

+---+---+-------+----------+-------------------+
|  a|  b|      c|         d|                  e|
+---+---+-------+----------+-------------------+
|  1|2.0|string1|2000-01-01|2000-01-01 12:00:00|
|  2|3.0|string2|2000-02-01|2000-01-02 12:00:00|
|  3|4.0|string3|2000-03-01|2000-01-03 12:00:00|
+---+---+-------+----------+-------------------+

root
 |-- a: long (nullable = true)
 |-- b: double (nullable = true)
 |-- c: string (nullable = true)
 |-- d: date (nullable = true)
 |-- e: timestamp (nullable = true)



Changing the configuration so that the notebook displays a DataFrame when it is evaluated (eager evaluation)

In [9]:
spark.conf.set('spark.sql.repl.eagerEval.enabled', True)
df

a,b,c,d,e
1,2.0,string1,2000-01-01,2000-01-01 12:00:00
2,3.0,string2,2000-02-01,2000-01-02 12:00:00
3,4.0,string3,2000-03-01,2000-01-03 12:00:00


Rows can also be shown vertically.

In [10]:
df.show(1, vertical=True)

-RECORD 0------------------
 a   | 1                   
 b   | 2.0                 
 c   | string1             
 d   | 2000-01-01          
 e   | 2000-01-01 12:00:00 
only showing top 1 row



Getting the column names.

In [11]:
df.columns

['a', 'b', 'c', 'd', 'e']

Statistical summary of the DataFrame


In [12]:
df.select("a", "b", "c").describe().show()

+-------+---+---+-------+
|summary|  a|  b|      c|
+-------+---+---+-------+
|  count|  3|  3|      3|
|   mean|2.0|3.0|   NULL|
| stddev|1.0|1.0|   NULL|
|    min|  1|2.0|string1|
|    max|  3|4.0|string3|
+-------+---+---+-------+



**DataFrame.collect()** collects the distributed data to the driver as local Python objects. On large datasets this can raise an out-of-memory error, because every row is brought to the driver at once. To avoid it, prefer **DataFrame.take()** or **DataFrame.tail()**, which return only a limited number of rows.

In [13]:
df.collect()

[Row(a=1, b=2.0, c='string1', d=datetime.date(2000, 1, 1), e=datetime.datetime(2000, 1, 1, 12, 0)),
 Row(a=2, b=3.0, c='string2', d=datetime.date(2000, 2, 1), e=datetime.datetime(2000, 1, 2, 12, 0)),
 Row(a=3, b=4.0, c='string3', d=datetime.date(2000, 3, 1), e=datetime.datetime(2000, 1, 3, 12, 0))]

## Converting Spark to pandas

In [14]:
df.toPandas()

Unnamed: 0,a,b,c,d,e
0,1,2.0,string1,2000-01-01,2000-01-01 12:00:00
1,2,3.0,string2,2000-02-01,2000-01-02 12:00:00
2,3,4.0,string3,2000-03-01,2000-01-03 12:00:00


## Selecting and Accessing Data

In [15]:
df.a

Column<'a'>

### Importing Libraries

In [16]:
from pyspark.sql import Column
from pyspark.sql.functions import upper

type(df.c) == type(upper(df.c)) == type(df.c.isNull())

True

In [17]:
df.select(df.c).show()

+-------+
|      c|
+-------+
|string1|
|string2|
|string3|
+-------+



### Creating a new column

In [18]:
df.withColumn('upper_c', upper(df.c)).show()

+---+---+-------+----------+-------------------+-------+
|  a|  b|      c|         d|                  e|upper_c|
+---+---+-------+----------+-------------------+-------+
|  1|2.0|string1|2000-01-01|2000-01-01 12:00:00|STRING1|
|  2|3.0|string2|2000-02-01|2000-01-02 12:00:00|STRING2|
|  3|4.0|string3|2000-03-01|2000-01-03 12:00:00|STRING3|
+---+---+-------+----------+-------------------+-------+



### Selecting rows with *filter*

In [19]:
df.filter(df.a == 1).show()

+---+---+-------+----------+-------------------+
|  a|  b|      c|         d|                  e|
+---+---+-------+----------+-------------------+
|  1|2.0|string1|2000-01-01|2000-01-01 12:00:00|
+---+---+-------+----------+-------------------+



### Grouping Data

In [20]:
df = spark.createDataFrame([
    ['red', 'banana', 1, 10], ['blue', 'banana', 2, 20], ['red', 'carrot', 3, 30],
    ['blue', 'grape', 4, 40], ['red', 'carrot', 5, 50], ['black', 'carrot', 6, 60],
    ['red', 'banana', 7, 70], ['red', 'grape', 8, 80]], schema=['color', 'fruit', 'v1', 'v2'])
df.show()

+-----+------+---+---+
|color| fruit| v1| v2|
+-----+------+---+---+
|  red|banana|  1| 10|
| blue|banana|  2| 20|
|  red|carrot|  3| 30|
| blue| grape|  4| 40|
|  red|carrot|  5| 50|
|black|carrot|  6| 60|
|  red|banana|  7| 70|
|  red| grape|  8| 80|
+-----+------+---+---+



In [21]:
df.groupby('color').avg().show()

+-----+-------+-------+
|color|avg(v1)|avg(v2)|
+-----+-------+-------+
|  red|    4.8|   48.0|
| blue|    3.0|   30.0|
|black|    6.0|   60.0|
+-----+-------+-------+




In [22]:
df1 = spark.createDataFrame(
    [(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],
    ('time', 'id', 'v1'))

df2 = spark.createDataFrame(
    [(20000101, 1, 'x'), (20000101, 2, 'y')],
    ('time', 'id', 'v2'))

def merge_ordered(l, r):
    return pd.merge_ordered(l, r)


In [23]:
df1.show()

+--------+---+---+
|    time| id| v1|
+--------+---+---+
|20000101|  1|1.0|
|20000101|  2|2.0|
|20000102|  1|3.0|
|20000102|  2|4.0|
+--------+---+---+
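The `merge_ordered` function defined above is never called in this notebook. To see what it would compute, the sketch below applies `pd.merge_ordered` directly to pandas copies of the two tables; the names `left`, `right`, and `merged` are illustrative. In Spark, a function like this is typically handed to `cogroup(...).applyInPandas(...)` together with an output schema, which is not shown here.

```python
import pandas as pd

# pandas copies of df1 and df2 from the cell above.
left = pd.DataFrame({
    "time": [20000101, 20000101, 20000102, 20000102],
    "id": [1, 2, 1, 2],
    "v1": [1.0, 2.0, 3.0, 4.0],
})
right = pd.DataFrame({
    "time": [20000101, 20000101],
    "id": [1, 2],
    "v2": ["x", "y"],
})

# Ordered outer merge on the shared columns (time, id); unmatched rows get missing v2.
merged = pd.merge_ordered(left, right)
print(merged)
```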



## Reading and Writing Data

In [25]:
df.write.csv('foo.csv', header=True, mode='overwrite') # save as CSV; overwrite so the cell can be re-run
spark.read.csv('foo.csv', header=True).show() # read the CSV back

+-----+------+---+---+
|color| fruit| v1| v2|
+-----+------+---+---+
|  red|carrot|  5| 50|
|black|carrot|  6| 60|
|  red|banana|  7| 70|
|  red| grape|  8| 80|
|  red|banana|  1| 10|
| blue|banana|  2| 20|
|  red|carrot|  3| 30|
| blue| grape|  4| 40|
+-----+------+---+---+



In [28]:
df.show()

+-----+------+---+---+
|color| fruit| v1| v2|
+-----+------+---+---+
|  red|banana|  1| 10|
| blue|banana|  2| 20|
|  red|carrot|  3| 30|
| blue| grape|  4| 40|
|  red|carrot|  5| 50|
|black|carrot|  6| 60|
|  red|banana|  7| 70|
|  red| grape|  8| 80|
+-----+------+---+---+



### Working with SQL

In [26]:
df.createOrReplaceTempView("tableA")
spark.sql("SELECT count(*) from tableA").show()


+--------+
|count(1)|
+--------+
|       8|
+--------+



SQL expressions can be mixed with PySpark column operations.

In [30]:
from pyspark.sql.functions import expr

df.selectExpr('fruit').show()

df.select(expr('count(*)') > 1).show()

+------+
| fruit|
+------+
|banana|
|banana|
|carrot|
| grape|
|carrot|
|carrot|
|banana|
| grape|
+------+

+--------------+
|(count(1) > 1)|
+--------------+
|          true|
+--------------+



### Null and Missing Values

In [50]:
import pandas as pd
import numpy as np
import pyspark.pandas as ps
from pyspark.sql import SparkSession

In [42]:
dates = pd.date_range('20130101', periods=6)
pdf = pd.DataFrame(np.random.randn(6, 4), index=dates, columns=list('ABCD'))

In [43]:

pdf1 = pdf.reindex(index=dates[0:4], columns=list(pdf.columns) + ['E'])
pdf1.loc[dates[0]:dates[1], 'E'] = 1
psdf1 = ps.from_pandas(pdf1)

psdf1

Unnamed: 0,A,B,C,D,E
2013-01-01,0.784301,-0.153998,0.406843,-0.595844,1.0
2013-01-02,-0.542446,-1.430844,-0.688093,-2.078219,1.0
2013-01-03,0.156496,-1.502589,0.031849,-0.390477,
2013-01-04,0.106609,-0.687562,-1.559024,-0.300757,


In [44]:
# Dropping rows that contain missing values

psdf1.dropna(how='any')

Unnamed: 0,A,B,C,D,E
2013-01-01,0.784301,-0.153998,0.406843,-0.595844,1.0
2013-01-02,-0.542446,-1.430844,-0.688093,-2.078219,1.0


In [46]:
# Filling missing values

psdf1.fillna(value=5)

Unnamed: 0,A,B,C,D,E
2013-01-01,0.784301,-0.153998,0.406843,-0.595844,1.0
2013-01-02,-0.542446,-1.430844,-0.688093,-2.078219,1.0
2013-01-03,0.156496,-1.502589,0.031849,-0.390477,5.0
2013-01-04,0.106609,-0.687562,-1.559024,-0.300757,5.0


## Spark Configuration

In [51]:
prev = spark.conf.get("spark.sql.execution.arrow.pyspark.enabled")  # Save the current value so it can be restored later.
ps.set_option("compute.default_index_type", "distributed")  # Use the distributed default index to avoid overhead.
import warnings
warnings.filterwarnings("ignore")  # Ignore warnings coming from Arrow optimizations.

In [None]:
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", True)
%timeit ps.range(300000).to_pandas()

In [60]:
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", False)
%timeit ps.range(300000).to_pandas()

1.04 s ± 111 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [61]:
ps.reset_option("compute.default_index_type")
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", prev)  # Set its default value back.
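The save, change, and restore pattern used in these cells can be wrapped in a context manager so the original value is put back even if the timed code raises. This is a minimal sketch: `temporary_conf` is a hypothetical helper, not part of PySpark, and it assumes the key already has a value that `spark.conf.get` can return.

```python
from contextlib import contextmanager


@contextmanager
def temporary_conf(spark, key, value):
    """Hypothetical helper: set a Spark conf entry for the duration of a with-block."""
    previous = spark.conf.get(key)  # assumes the key already has a value or a default
    spark.conf.set(key, value)
    try:
        yield
    finally:
        # Restore the saved value whether or not the block raised.
        spark.conf.set(key, previous)
```

With a live session, a call could look like `with temporary_conf(spark, "spark.sql.execution.arrow.pyspark.enabled", "true"):` followed by the code to measure.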