In [1]:
# importando bibliotecas
from pyspark.sql import SparkSession
import pyspark as ps
spark = (SparkSession
        .builder
        .appName('atv-sala')
        .config('spark.some.config.option', 'some-value')
        .getOrCreate())

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/11/05 11:25:06 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/11/05 11:25:08 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [2]:
# configurando sessao
conf = ps.SparkConf().setMaster("yarn-client").setAppName("spark-mer")
conf.set('spark.executor.heartbeatInterval', '3600s')

<pyspark.conf.SparkConf at 0x7f973840cc50>

In [3]:
spark_realtor = (spark.read.format('csv')
                .option("header", "true")
                .load("realtor-data.csv"))
spark_realtor

                                                                                

DataFrame[status: string, bed: string, bath: string, acre_lot: string, city: string, state: string, zip_code: string, house_size: string, prev_sold_date: string, price: string]

In [4]:
# 1) conte os registros
print(f"Total de registros: {spark_realtor.count()}")

Total de registros: 100000


In [5]:
# verificando tabela
spark_realtor.show(2)

+--------+---+----+--------+--------+-----------+--------+----------+--------------+--------+
|  status|bed|bath|acre_lot|    city|      state|zip_code|house_size|prev_sold_date|   price|
+--------+---+----+--------+--------+-----------+--------+----------+--------------+--------+
|for_sale|3.0| 2.0|    0.12|Adjuntas|Puerto Rico|   601.0|     920.0|          NULL|105000.0|
|for_sale|4.0| 2.0|    0.08|Adjuntas|Puerto Rico|   601.0|    1527.0|          NULL| 80000.0|
+--------+---+----+--------+--------+-----------+--------+----------+--------------+--------+
only showing top 2 rows



In [6]:
# importando funções
import pyspark.sql.functions as f   
from pyspark.sql.functions import col, lit, when

In [7]:
# 2) total de registros no estado de puerto rico com preço maior que 100.000
realtor_pr_100k = spark_realtor.filter((col('state')== 'Puerto Rico') & (col('price') > 100000))
realtor_pr_100k.show(5)

+--------+---+----+--------+-------------+-----------+--------+----------+--------------+--------+
|  status|bed|bath|acre_lot|         city|      state|zip_code|house_size|prev_sold_date|   price|
+--------+---+----+--------+-------------+-----------+--------+----------+--------------+--------+
|for_sale|3.0| 2.0|    0.12|     Adjuntas|Puerto Rico|   601.0|     920.0|          NULL|105000.0|
|for_sale|4.0| 2.0|     0.1|        Ponce|Puerto Rico|   731.0|    1800.0|          NULL|145000.0|
|for_sale|4.0| 3.0|    0.46|San Sebastian|Puerto Rico|   612.0|    2520.0|          NULL|179000.0|
|for_sale|5.0| 3.0|    7.46|   Las Marias|Puerto Rico|   670.0|    5403.0|          NULL|300000.0|
|for_sale|3.0| 2.0|    0.08|   Juana Diaz|Puerto Rico|   795.0|    1045.0|          NULL|150000.0|
+--------+---+----+--------+-------------+-----------+--------+----------+--------------+--------+
only showing top 5 rows



In [8]:
# fazendo o count 
print(f"Número de registros no Estado do Porto Rico com valor maior que 100 mil dolares: {realtor_pr_100k.count()}")



Número de registros no Estado do Porto Rico com valor maior que 100 mil dolares: 17127


                                                                                

In [9]:
realtor_tamanho = spark_realtor.withColumn('tamanho',
                                        when(col('house_size') < 1000, 'pequena')
                                        .when((col('house_size') > 1000) & (col('house_size') < 5000), 'media')
                                        .when(col('house_size') > 5000, 'grande')
                                        .otherwise('sem info'))
realtor_tamanho.show(5)

+--------+---+----+--------+----------+-----------+--------+----------+--------------+--------+--------+
|  status|bed|bath|acre_lot|      city|      state|zip_code|house_size|prev_sold_date|   price| tamanho|
+--------+---+----+--------+----------+-----------+--------+----------+--------------+--------+--------+
|for_sale|3.0| 2.0|    0.12|  Adjuntas|Puerto Rico|   601.0|     920.0|          NULL|105000.0| pequena|
|for_sale|4.0| 2.0|    0.08|  Adjuntas|Puerto Rico|   601.0|    1527.0|          NULL| 80000.0|   media|
|for_sale|2.0| 1.0|    0.15|Juana Diaz|Puerto Rico|   795.0|     748.0|          NULL| 67000.0| pequena|
|for_sale|4.0| 2.0|     0.1|     Ponce|Puerto Rico|   731.0|    1800.0|          NULL|145000.0|   media|
|for_sale|6.0| 2.0|    0.05|  Mayaguez|Puerto Rico|   680.0|      NULL|          NULL| 65000.0|sem info|
+--------+---+----+--------+----------+-----------+--------+----------+--------------+--------+--------+
only showing top 5 rows



In [10]:
# salve e leia o arquivo em parquet
spark_realtor.write.parquet('parquet_realtor.parquet', mode='overwrite')
realtor_parquet = spark.read.parquet('parquet_realtor.parquet')
realtor_parquet

                                                                                

DataFrame[status: string, bed: string, bath: string, acre_lot: string, city: string, state: string, zip_code: string, house_size: string, prev_sold_date: string, price: string]

In [13]:
# gere um cache da tabela com 32 partições
#antes
realtor_part = spark_realtor.repartition(32)
realtor_part.rdd.getNumPartitions()

[Stage 14:>                                                         (0 + 2) / 2]

32

In [15]:
# depois
realtor_part = spark_realtor.repartition(32)
realtor_part.rdd.getNumPartitions()

32

In [18]:
# pivot table da contagem de state e status
realtor_pivot = (spark_realtor
                .groupBy('state')
                .pivot('status')
                .count())
realtor_pivot.show()

+--------------+--------+--------------+
|         state|for_sale|ready_to_build|
+--------------+--------+--------------+
|   Connecticut|   12178|          NULL|
|       Vermont|    1324|          NULL|
|   Puerto Rico|   24679|          NULL|
|Virgin Islands|    2573|          NULL|
|  Rhode Island|    2401|          NULL|
|      Virginia|       3|          NULL|
|    New Jersey|       2|          NULL|
| Massachusetts|   52513|           181|
| New Hampshire|    2232|          NULL|
|     Tennessee|      16|          NULL|
|South Carolina|      24|          NULL|
|      New York|    1874|          NULL|
+--------------+--------+--------------+



In [28]:
# mostre 5 cidades que possuem as maiores casas 
import pandas as pd
(spark_realtor.fillna(0, subset='house_size')
            .groupBy('city')
            .agg(f.max('house_size').alias('hs_max'))
            .orderBy(f.desc('hs_max'))
            .show(5))


+----------+-------+
|      city| hs_max|
+----------+-------+
|     Acton|99999.0|
|Middletown|99999.0|
|  Chicopee| 9999.0|
|   Holyoke| 9999.0|
|    Dorado|  999.0|
+----------+-------+
only showing top 5 rows



In [31]:
pd_realtor = spark_realtor.toPandas()
pd_realtor = pd_realtor.reset_index()
realtor_spark_novo = spark.createDataFrame(pd_realtor)

  if should_localize and is_datetime64tz_dtype(s.dtype) and s.dt.tz is not None:


In [47]:
df = pd.DataFrame({'state': ['Connecticuti',
                            'Puerto Rico',
                            'Massachusetts',
                            'New York'],
                'population': [3606, 3264, 6985, 8468]})
df

Unnamed: 0,state,population
0,Connecticuti,3606
1,Puerto Rico,3264
2,Massachusetts,6985
3,New York,8468


In [49]:

df_spark = spark.createDataFrame(df)
df_join = spark_realtor.join(df_spark, 'state', 'inner')
df_join.count()

  if should_localize and is_datetime64tz_dtype(s.dtype) and s.dt.tz is not None:
                                                                                

79247

In [50]:
df_join.show()

+-----------+--------+----+----+--------+------------+--------+----------+--------------+---------+----------+
|      state|  status| bed|bath|acre_lot|        city|zip_code|house_size|prev_sold_date|    price|population|
+-----------+--------+----+----+--------+------------+--------+----------+--------------+---------+----------+
|Puerto Rico|for_sale|NULL| 6.0|    NULL|    San Juan|   907.0|    5530.0|          NULL|2500000.0|      3264|
|Puerto Rico|for_sale| 1.0| 1.0|    0.07|    San Juan|   995.0|    4180.0|          NULL| 410000.0|      3264|
|Puerto Rico|for_sale| 4.0| 3.0|    NULL|    San Juan|   907.0|    3000.0|          NULL|2975000.0|      3264|
|Puerto Rico|for_sale|NULL|NULL|    0.32|    Toa Baja|   949.0|      NULL|          NULL|  24900.0|      3264|
|Puerto Rico|for_sale| 1.0| 1.0|    NULL|    San Juan|   907.0|     812.0|          NULL| 450000.0|      3264|
|Puerto Rico|for_sale| 2.0| 2.0|    NULL|      Dorado|   646.0|    1172.0|          NULL|1125000.0|      3264|
|