# PySpark I

## Configurações Básicas

In [11]:
from pyspark.sql import SparkSession
import pyspark as ps

# Build the configuration BEFORE the session: a SparkConf created after
# getOrCreate() is never handed to Spark, so its settings would be ignored.
conf = (
    ps.SparkConf()
    .setMaster("local[*]")  # simulate a cluster using the local processor threads
    .setAppName("MyApp")
    .set("spark.executor.heartbeatInterval", "3600s")
    # The heartbeat interval must stay below spark.network.timeout (120s by
    # default), so the timeout is raised together with it.
    .set("spark.network.timeout", "7200s")
)

# NOTE: if a session already exists in this kernel, getOrCreate() returns it and
# settings such as the master are not re-applied; restart the kernel to change them.
spark = (
    SparkSession.builder
    .config(conf=conf)
    .config("spark.some.config.option", "some-value")
    .getOrCreate()
)

<pyspark.conf.SparkConf at 0x1cfa736bf50>

## Consultando a versão

In [12]:
# Display the Spark version reported by the active session.
spark.version

'3.5.3'

## Carregando dados

In [13]:
# Read the listings CSV, taking the column names from its header line.
# No schema is given or inferred here, so every column is loaded as a string.
df_spark = spark.read.option("header", "true").csv("listing_status.csv")
df_spark

DataFrame[symbol: string, name: string, exchange: string, assetType: string, ipoDate: string, delistingDate: string, status: string]

In [17]:
# Print the first 20 rows without truncating long cell values.
df_spark.show(20, truncate=False)

+-------+-------------------------------------------------------------------+---------+---------+----------+-------------+------+
|symbol |name                                                               |exchange |assetType|ipoDate   |delistingDate|status|
+-------+-------------------------------------------------------------------+---------+---------+----------+-------------+------+
|A      |Agilent Technologies Inc                                           |NYSE     |Stock    |1999-11-18|null         |Active|
|AA     |Alcoa Corp                                                         |NYSE     |Stock    |2016-10-18|null         |Active|
|AAA    |ALTERNATIVE ACCESS FIRST PRIORITY CLO BOND ETF                     |NYSE ARCA|ETF      |2020-09-09|null         |Active|
|AAAU   |Goldman Sachs Physical Gold ETF                                    |BATS     |ETF      |2018-08-15|null         |Active|
|AACG   |ATA Creativity Global                                              |NASDAQ   |Sto

In [18]:
# Convert the Spark DataFrame into a pandas DataFrame.
df_pd = df_spark.toPandas()

# Preview the first rows of the pandas copy.
df_pd.head()

Unnamed: 0,symbol,name,exchange,assetType,ipoDate,delistingDate,status
0,A,Agilent Technologies Inc,NYSE,Stock,1999-11-18,,Active
1,AA,Alcoa Corp,NYSE,Stock,2016-10-18,,Active
2,AAA,ALTERNATIVE ACCESS FIRST PRIORITY CLO BOND ETF,NYSE ARCA,ETF,2020-09-09,,Active
3,AAAU,Goldman Sachs Physical Gold ETF,BATS,ETF,2018-08-15,,Active
4,AACG,ATA Creativity Global,NASDAQ,Stock,2008-01-29,,Active


In [19]:
# Round trip: build a Spark DataFrame back from the pandas DataFrame,
# then print its first 10 rows without truncating long values.
df_spark2 = spark.createDataFrame(df_pd)
df_spark2.show(10, truncate=False)

+-------+-----------------------------------------------------------------+---------+---------+----------+-------------+------+
|symbol |name                                                             |exchange |assetType|ipoDate   |delistingDate|status|
+-------+-----------------------------------------------------------------+---------+---------+----------+-------------+------+
|A      |Agilent Technologies Inc                                         |NYSE     |Stock    |1999-11-18|null         |Active|
|AA     |Alcoa Corp                                                       |NYSE     |Stock    |2016-10-18|null         |Active|
|AAA    |ALTERNATIVE ACCESS FIRST PRIORITY CLO BOND ETF                   |NYSE ARCA|ETF      |2020-09-09|null         |Active|
|AAAU   |Goldman Sachs Physical Gold ETF                                  |BATS     |ETF      |2018-08-15|null         |Active|
|AACG   |ATA Creativity Global                                            |NASDAQ   |Stock    |2008-01-2

## Operações básicas

In [20]:
# drop() returns a new DataFrame without the named column; df_spark itself is unchanged.
df_spark_mod = df_spark.drop("delistingDate")
df_spark_mod.show(n=10)

+-------+--------------------+---------+---------+----------+------+
| symbol|                name| exchange|assetType|   ipoDate|status|
+-------+--------------------+---------+---------+----------+------+
|      A|Agilent Technolog...|     NYSE|    Stock|1999-11-18|Active|
|     AA|          Alcoa Corp|     NYSE|    Stock|2016-10-18|Active|
|    AAA|ALTERNATIVE ACCES...|NYSE ARCA|      ETF|2020-09-09|Active|
|   AAAU|Goldman Sachs Phy...|     BATS|      ETF|2018-08-15|Active|
|   AACG|ATA Creativity Gl...|   NASDAQ|    Stock|2008-01-29|Active|
|   AACT|Ares Acquisition ...|     NYSE|    Stock|2023-06-12|Active|
| AACT-U|Ares Acquisition ...|     NYSE|    Stock|2023-04-21|Active|
|AACT-WS|Ares Acquisition ...|     NYSE|    Stock|2023-06-12|Active|
|   AADI| Aadi Bioscience Inc|   NASDAQ|    Stock|2017-08-08|Active|
|   AADR|ADVISORSHARES DOR...|   NASDAQ|      ETF|2010-07-21|Active|
+-------+--------------------+---------+---------+----------+------+
only showing top 10 rows



In [21]:
# Keep only the four columns used by the following cells.
df_spark_mod = df_spark_mod.select(
    ["symbol", "exchange", "assetType", "ipoDate"]
)
df_spark_mod.show(n=10)

+-------+---------+---------+----------+
| symbol| exchange|assetType|   ipoDate|
+-------+---------+---------+----------+
|      A|     NYSE|    Stock|1999-11-18|
|     AA|     NYSE|    Stock|2016-10-18|
|    AAA|NYSE ARCA|      ETF|2020-09-09|
|   AAAU|     BATS|      ETF|2018-08-15|
|   AACG|   NASDAQ|    Stock|2008-01-29|
|   AACT|     NYSE|    Stock|2023-06-12|
| AACT-U|     NYSE|    Stock|2023-04-21|
|AACT-WS|     NYSE|    Stock|2023-06-12|
|   AADI|   NASDAQ|    Stock|2017-08-08|
|   AADR|   NASDAQ|      ETF|2010-07-21|
+-------+---------+---------+----------+
only showing top 10 rows



In [27]:
# Keep only the rows whose asset type is exactly 'Stock'.
df_stocks = df_spark_mod.where(df_spark_mod["assetType"] == "Stock")
df_stocks.show(n=10)

+-------+--------+---------+----------+
| symbol|exchange|assetType|   ipoDate|
+-------+--------+---------+----------+
|      A|    NYSE|    Stock|1999-11-18|
|     AA|    NYSE|    Stock|2016-10-18|
|   AACG|  NASDAQ|    Stock|2008-01-29|
|   AACT|    NYSE|    Stock|2023-06-12|
| AACT-U|    NYSE|    Stock|2023-04-21|
|AACT-WS|    NYSE|    Stock|2023-06-12|
|   AADI|  NASDAQ|    Stock|2017-08-08|
|   AAGR|  NASDAQ|    Stock|2023-12-07|
|  AAGRW|  NASDAQ|    Stock|2023-12-07|
|    AAL|  NASDAQ|    Stock|2005-09-27|
+-------+--------+---------+----------+
only showing top 10 rows



In [23]:
# Number of rows classified as stocks; the bare last expression is displayed
# as the cell result, so no print() wrapper is needed.
df_stocks.count()

7509


In [25]:
# Number of rows whose asset type is exactly 'ETF'.
df_spark.where(df_spark["assetType"] == "ETF").count()

4119

## Exercícios

In [29]:
# NOTE(review): this rebinds `df_spark`, which held the listings data in the cells
# above, to the realtor dataset; re-running an earlier cell after this one will
# operate on this frame instead.
# The file is read with a header row and without a schema (string columns).
df_spark = spark.read.option("header", "true").csv("realtor-data.csv")
df_spark.show(n=3)

+-----------+--------+--------+---+----+--------+---------+----------+-----------+--------+----------+--------------+
|brokered_by|  status|   price|bed|bath|acre_lot|   street|      city|      state|zip_code|house_size|prev_sold_date|
+-----------+--------+--------+---+----+--------+---------+----------+-----------+--------+----------+--------------+
|   103378.0|for_sale|105000.0|  3|   2|    0.12|1962661.0|  Adjuntas|Puerto Rico|   00601|     920.0|          NULL|
|    52707.0|for_sale| 80000.0|  4|   2|    0.08|1902874.0|  Adjuntas|Puerto Rico|   00601|    1527.0|          NULL|
|   103379.0|for_sale| 67000.0|  2|   1|    0.15|1404990.0|Juana Diaz|Puerto Rico|   00795|     748.0|          NULL|
+-----------+--------+--------+---+----+--------+---------+----------+-----------+--------+----------+--------------+
only showing top 3 rows



In [30]:
# Total number of rows in the DataFrame currently bound to df_spark.
df_spark.count()

2226382

In [31]:
import pyspark.sql.functions as F
from pyspark.sql.functions import col, lit, when

In [32]:
# Count Puerto Rico listings priced above 100000, using col() references.
# The CSV was read without a schema, so `price` is a string column. Cast it
# explicitly to double rather than relying on Spark's implicit coercion of the
# text to the integer literal's type, which would discard any fractional part.
df_spark.filter(
    (col('state') == 'Puerto Rico')
    & (col('price').cast('double') > 100000)
).count()

2265

In [33]:
# Same count as with col(), written with attribute-style column access.
# `price` is a string column (CSV read without a schema), so it is cast to
# double explicitly instead of depending on implicit string-to-integer coercion.
df_spark.filter(
    (df_spark.state == 'Puerto Rico')
    & (df_spark.price.cast('double') > 100000)
).count()

2265

In [34]:
# Add a "tamanho" (size) label derived from house_size:
#   "pequena"  -> house_size <= 1000
#   "media"    -> 1000 < house_size <= 5000
#   "grande"   -> house_size > 5000
#   "sem info" -> house_size is null (or cannot be read as a number)

# house_size is a string column (CSV read without a schema). Compare it as a
# double so that a value such as "1000.5" is not truncated to 1000 by the
# implicit string-to-integer coercion and put in the wrong bucket.
house_size_num = df_spark.house_size.cast("double")

df_spark_mod = df_spark.withColumn(
    "tamanho",
    when(house_size_num <= 1000, "pequena")
    # Branches are tried in order, so this one only sees values above 1000.
    .when(house_size_num <= 5000, "media")
    .when(house_size_num > 5000, "grande")
    # A null size makes every comparison above null, so it lands here.
    .otherwise("sem info"),
)
df_spark_mod.show(10)

+-----------+--------+--------+---+----+--------+---------+-------------+-----------+--------+----------+--------------+--------+
|brokered_by|  status|   price|bed|bath|acre_lot|   street|         city|      state|zip_code|house_size|prev_sold_date| tamanho|
+-----------+--------+--------+---+----+--------+---------+-------------+-----------+--------+----------+--------------+--------+
|   103378.0|for_sale|105000.0|  3|   2|    0.12|1962661.0|     Adjuntas|Puerto Rico|   00601|     920.0|          NULL| pequena|
|    52707.0|for_sale| 80000.0|  4|   2|    0.08|1902874.0|     Adjuntas|Puerto Rico|   00601|    1527.0|          NULL|   media|
|   103379.0|for_sale| 67000.0|  2|   1|    0.15|1404990.0|   Juana Diaz|Puerto Rico|   00795|     748.0|          NULL| pequena|
|    31239.0|for_sale|145000.0|  4|   2|     0.1|1947675.0|        Ponce|Puerto Rico|   00731|    1800.0|          NULL|   media|
|    34632.0|for_sale| 65000.0|  6|   2|    0.05| 331151.0|     Mayaguez|Puerto Rico|   00