# PySpark I

## Configurações Básicas

In [31]:
from pyspark.sql import SparkSession
import pyspark as ps

In [32]:
spark = SparkSession.builder.appName("MyApp").config("spark.some.config.option", "some-value").getOrCreate()

conf = ps.SparkConf().setMaster("local[*]").setAppName("MyApp") # simulação de clusters com threads do processador
conf.set("spark.executor.heartbeatInterval", "3600s")

<pyspark.conf.SparkConf at 0x2932456a450>

## Consultando a versão

In [33]:
spark.version

'3.5.5'

## Carregando dados

In [34]:
df_spark = spark.read.format("csv").option("header", "true").load("listing_status.csv")
df_spark

DataFrame[symbol: string, name: string, exchange: string, assetType: string, ipoDate: string, delistingDate: string, status: string]

In [37]:
df_spark.show(20, truncate=False)

+-------+-------------------------------------------------------------------+---------+---------+----------+-------------+------+
|symbol |name                                                               |exchange |assetType|ipoDate   |delistingDate|status|
+-------+-------------------------------------------------------------------+---------+---------+----------+-------------+------+
|A      |Agilent Technologies Inc                                           |NYSE     |Stock    |1999-11-18|null         |Active|
|AA     |Alcoa Corp                                                         |NYSE     |Stock    |2016-10-18|null         |Active|
|AAA    |ALTERNATIVE ACCESS FIRST PRIORITY CLO BOND ETF                     |NYSE ARCA|ETF      |2020-09-09|null         |Active|
|AAAU   |Goldman Sachs Physical Gold ETF                                    |BATS     |ETF      |2018-08-15|null         |Active|
|AACG   |ATA Creativity Global                                              |NASDAQ   |Sto

In [None]:
import pandas as pd

df_pd = df_spark.toPandas()
df_pd.head()

In [None]:
df_spark2 = spark.createDataFrame(df_pd)
df_spark2.show(10, truncate=False)

## Operações básicas

In [45]:
df_spark_mod = df_spark.drop('delistingDate')
df_spark_mod.show(10)

+-------+--------------------+---------+---------+----------+------+
| symbol|                name| exchange|assetType|   ipoDate|status|
+-------+--------------------+---------+---------+----------+------+
|      A|Agilent Technolog...|     NYSE|    Stock|1999-11-18|Active|
|     AA|          Alcoa Corp|     NYSE|    Stock|2016-10-18|Active|
|    AAA|ALTERNATIVE ACCES...|NYSE ARCA|      ETF|2020-09-09|Active|
|   AAAU|Goldman Sachs Phy...|     BATS|      ETF|2018-08-15|Active|
|   AACG|ATA Creativity Gl...|   NASDAQ|    Stock|2008-01-29|Active|
|   AACT|Ares Acquisition ...|     NYSE|    Stock|2023-06-12|Active|
| AACT-U|Ares Acquisition ...|     NYSE|    Stock|2023-04-21|Active|
|AACT-WS|Ares Acquisition ...|     NYSE|    Stock|2023-06-12|Active|
|   AADI| Aadi Bioscience Inc|   NASDAQ|    Stock|2017-08-08|Active|
|   AADR|ADVISORSHARES DOR...|   NASDAQ|      ETF|2010-07-21|Active|
+-------+--------------------+---------+---------+----------+------+
only showing top 10 rows



In [46]:
df_spark_mod = df_spark_mod.select('symbol', 'exchange', 'assetType', 'ipoDate')
df_spark_mod.show(10)

+-------+---------+---------+----------+
| symbol| exchange|assetType|   ipoDate|
+-------+---------+---------+----------+
|      A|     NYSE|    Stock|1999-11-18|
|     AA|     NYSE|    Stock|2016-10-18|
|    AAA|NYSE ARCA|      ETF|2020-09-09|
|   AAAU|     BATS|      ETF|2018-08-15|
|   AACG|   NASDAQ|    Stock|2008-01-29|
|   AACT|     NYSE|    Stock|2023-06-12|
| AACT-U|     NYSE|    Stock|2023-04-21|
|AACT-WS|     NYSE|    Stock|2023-06-12|
|   AADI|   NASDAQ|    Stock|2017-08-08|
|   AADR|   NASDAQ|      ETF|2010-07-21|
+-------+---------+---------+----------+
only showing top 10 rows



In [47]:
from pyspark.sql.functions import col

df_spark_mod = df_spark_mod.withColumn('ipoDate', col('ipoDate').cast('date'))
df_spark_mod.show(10)

+-------+---------+---------+----------+
| symbol| exchange|assetType|   ipoDate|
+-------+---------+---------+----------+
|      A|     NYSE|    Stock|1999-11-18|
|     AA|     NYSE|    Stock|2016-10-18|
|    AAA|NYSE ARCA|      ETF|2020-09-09|
|   AAAU|     BATS|      ETF|2018-08-15|
|   AACG|   NASDAQ|    Stock|2008-01-29|
|   AACT|     NYSE|    Stock|2023-06-12|
| AACT-U|     NYSE|    Stock|2023-04-21|
|AACT-WS|     NYSE|    Stock|2023-06-12|
|   AADI|   NASDAQ|    Stock|2017-08-08|
|   AADR|   NASDAQ|      ETF|2010-07-21|
+-------+---------+---------+----------+
only showing top 10 rows



In [60]:
df_spark_mod = df_spark_mod.drop('currentDate')
df_spark_mod.show(10)

+-------+---------+---------+----------+---------+--------+
| symbol| exchange|assetType|   ipoDate|time_diff|maturity|
+-------+---------+---------+----------+---------+--------+
|      A|     NYSE|    Stock|1999-11-18|     9268|    long|
|     AA|     NYSE|    Stock|2016-10-18|     3089|   short|
|    AAA|NYSE ARCA|      ETF|2020-09-09|     1667|   short|
|   AAAU|     BATS|      ETF|2018-08-15|     2423|   short|
|   AACG|   NASDAQ|    Stock|2008-01-29|     6274|   short|
|   AACT|     NYSE|    Stock|2023-06-12|      661|   short|
| AACT-U|     NYSE|    Stock|2023-04-21|      713|   short|
|AACT-WS|     NYSE|    Stock|2023-06-12|      661|   short|
|   AADI|   NASDAQ|    Stock|2017-08-08|     2795|   short|
|   AADR|   NASDAQ|      ETF|2010-07-21|     5370|   short|
+-------+---------+---------+----------+---------+--------+
only showing top 10 rows



In [61]:
from pyspark.sql.functions import date_diff, current_date

df_spark_mod = df_spark_mod.withColumn('time_diff', date_diff(current_date(), col('ipoDate')))
df_spark_mod.show(10)

+-------+---------+---------+----------+---------+--------+
| symbol| exchange|assetType|   ipoDate|time_diff|maturity|
+-------+---------+---------+----------+---------+--------+
|      A|     NYSE|    Stock|1999-11-18|     9268|    long|
|     AA|     NYSE|    Stock|2016-10-18|     3089|   short|
|    AAA|NYSE ARCA|      ETF|2020-09-09|     1667|   short|
|   AAAU|     BATS|      ETF|2018-08-15|     2423|   short|
|   AACG|   NASDAQ|    Stock|2008-01-29|     6274|   short|
|   AACT|     NYSE|    Stock|2023-06-12|      661|   short|
| AACT-U|     NYSE|    Stock|2023-04-21|      713|   short|
|AACT-WS|     NYSE|    Stock|2023-06-12|      661|   short|
|   AADI|   NASDAQ|    Stock|2017-08-08|     2795|   short|
|   AADR|   NASDAQ|      ETF|2010-07-21|     5370|   short|
+-------+---------+---------+----------+---------+--------+
only showing top 10 rows



In [62]:
from pyspark.sql.functions import expr

df_spark_mod = df_spark_mod.withColumn('time_diff', expr("datediff(current_date(), ipoDate) / 365.25"))
df_spark_mod.show(10)

+-------+---------+---------+----------+---------+--------+
| symbol| exchange|assetType|   ipoDate|time_diff|maturity|
+-------+---------+---------+----------+---------+--------+
|      A|     NYSE|    Stock|1999-11-18|25.374401|    long|
|     AA|     NYSE|    Stock|2016-10-18| 8.457221|   short|
|    AAA|NYSE ARCA|      ETF|2020-09-09| 4.563997|   short|
|   AAAU|     BATS|      ETF|2018-08-15| 6.633812|   short|
|   AACG|   NASDAQ|    Stock|2008-01-29|17.177276|   short|
|   AACT|     NYSE|    Stock|2023-06-12| 1.809719|   short|
| AACT-U|     NYSE|    Stock|2023-04-21| 1.952088|   short|
|AACT-WS|     NYSE|    Stock|2023-06-12| 1.809719|   short|
|   AADI|   NASDAQ|    Stock|2017-08-08| 7.652293|   short|
|   AADR|   NASDAQ|      ETF|2010-07-21|14.702259|   short|
+-------+---------+---------+----------+---------+--------+
only showing top 10 rows



In [None]:
from pyspark.sql.functions import when, lit

df_spark_mod = df_spark_mod.withColumn('maturity', when(df_spark_mod.time_diff > 25, lit('long')).otherwise(lit('short'))).drop('time_diff')
df_spark_mod.show(25)

+-------+---------+---------+----------+--------+
| symbol| exchange|assetType|   ipoDate|maturity|
+-------+---------+---------+----------+--------+
|      A|     NYSE|    Stock|1999-11-18|    long|
|     AA|     NYSE|    Stock|2016-10-18|   short|
|    AAA|NYSE ARCA|      ETF|2020-09-09|   short|
|   AAAU|     BATS|      ETF|2018-08-15|   short|
|   AACG|   NASDAQ|    Stock|2008-01-29|   short|
|   AACT|     NYSE|    Stock|2023-06-12|   short|
| AACT-U|     NYSE|    Stock|2023-04-21|   short|
|AACT-WS|     NYSE|    Stock|2023-06-12|   short|
|   AADI|   NASDAQ|    Stock|2017-08-08|   short|
|   AADR|   NASDAQ|      ETF|2010-07-21|   short|
|   AAGR|   NASDAQ|    Stock|2023-12-07|   short|
|  AAGRW|   NASDAQ|    Stock|2023-12-07|   short|
|    AAL|   NASDAQ|    Stock|2005-09-27|   short|
|  AAM-U|     NYSE|    Stock|2024-08-01|   short|
|   AAMC| NYSE MKT|    Stock|2012-12-13|   short|
|   AAME|   NASDAQ|    Stock|1984-09-07|    long|
|    AAN|     NYSE|    Stock|2020-11-25|   short|


In [64]:
df_stocks = df_spark_mod.filter(df_spark_mod.assetType == 'Stock')
df_stocks.show(10)

+-------+--------+---------+----------+--------+
| symbol|exchange|assetType|   ipoDate|maturity|
+-------+--------+---------+----------+--------+
|      A|    NYSE|    Stock|1999-11-18|    long|
|     AA|    NYSE|    Stock|2016-10-18|   short|
|   AACG|  NASDAQ|    Stock|2008-01-29|   short|
|   AACT|    NYSE|    Stock|2023-06-12|   short|
| AACT-U|    NYSE|    Stock|2023-04-21|   short|
|AACT-WS|    NYSE|    Stock|2023-06-12|   short|
|   AADI|  NASDAQ|    Stock|2017-08-08|   short|
|   AAGR|  NASDAQ|    Stock|2023-12-07|   short|
|  AAGRW|  NASDAQ|    Stock|2023-12-07|   short|
|    AAL|  NASDAQ|    Stock|2005-09-27|   short|
+-------+--------+---------+----------+--------+
only showing top 10 rows



In [65]:
print(df_stocks.count())

7509


In [66]:
df_spark.filter(df_spark.assetType == 'ETF').count()

4119

## Exercícios

In [67]:
df_spark = (spark.read.format("csv")
      .option("header","true")
      .load("realtor-data.csv"))
df_spark.show(3)

+-----------+--------+--------+---+----+--------+---------+----------+-----------+--------+----------+--------------+
|brokered_by|  status|   price|bed|bath|acre_lot|   street|      city|      state|zip_code|house_size|prev_sold_date|
+-----------+--------+--------+---+----+--------+---------+----------+-----------+--------+----------+--------------+
|   103378.0|for_sale|105000.0|  3|   2|    0.12|1962661.0|  Adjuntas|Puerto Rico|   00601|     920.0|          NULL|
|    52707.0|for_sale| 80000.0|  4|   2|    0.08|1902874.0|  Adjuntas|Puerto Rico|   00601|    1527.0|          NULL|
|   103379.0|for_sale| 67000.0|  2|   1|    0.15|1404990.0|Juana Diaz|Puerto Rico|   00795|     748.0|          NULL|
+-----------+--------+--------+---+----+--------+---------+----------+-----------+--------+----------+--------------+
only showing top 3 rows



In [68]:
df_spark.count()

2226382

In [69]:
df_spark.filter((col('state')=='Puerto Rico')&(col('price')>100000)).count()

2265

In [70]:
df_spark.filter((df_spark.state == 'Puerto Rico')&(df_spark.price > 100000)).count()

2265

In [None]:
# "pequena” quando house_size é menor que 1000.
# “media” quando house_size é menor que 5000 e maior que 1000. 
# “grande” quando house_size é maior que 5000.
# “sem info” quando house_size é null

df_spark_mod = df_spark.withColumn("tamanho", when(df_spark.house_size <= 1000, "pequena").when((df_spark.house_size > 1000) & (df_spark.house_size <= 5000), "media").when(df_spark.house_size > 5000, "grande").otherwise("sem info"))
df_spark_mod.show(10)

+-----------+--------+--------+---+----+--------+---------+-------------+-----------+--------+----------+--------------+--------+
|brokered_by|  status|   price|bed|bath|acre_lot|   street|         city|      state|zip_code|house_size|prev_sold_date| tamanho|
+-----------+--------+--------+---+----+--------+---------+-------------+-----------+--------+----------+--------------+--------+
|   103378.0|for_sale|105000.0|  3|   2|    0.12|1962661.0|     Adjuntas|Puerto Rico|   00601|     920.0|          NULL| pequena|
|    52707.0|for_sale| 80000.0|  4|   2|    0.08|1902874.0|     Adjuntas|Puerto Rico|   00601|    1527.0|          NULL|   media|
|   103379.0|for_sale| 67000.0|  2|   1|    0.15|1404990.0|   Juana Diaz|Puerto Rico|   00795|     748.0|          NULL| pequena|
|    31239.0|for_sale|145000.0|  4|   2|     0.1|1947675.0|        Ponce|Puerto Rico|   00731|    1800.0|          NULL|   media|
|    34632.0|for_sale| 65000.0|  6|   2|    0.05| 331151.0|     Mayaguez|Puerto Rico|   00