In [8]:
import os
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

# criar spark session
spark = SparkSession \
    .builder \
    .config('spark.jars.packages','org.postgresql:postgresql:42.2.20') \
    .appName('spark etl - cht') \
    .getOrCreate()

POSTGRES_DB_IP = '172.19.0.1'

df = pd.read_csv("https://raw.githubusercontent.com/dadosmaistodos/challenge-data-engineering/main/california_housing_train.csv");

spark_df = spark.createDataFrame(df)

spark_df.printSchema()


spark_df.write \
  .format("jdbc") \
  .option("url", f"jdbc:postgresql://{POSTGRES_DB_IP}:5432/cht") \
  .option("dbtable", "cht_raw") \
  .option("user", "postgres") \
  .option("driver", "org.postgresql.Driver") \
  .option("password", "postgres")\
  .mode("overwrite")\
  .save()

root
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- housing_median_age: double (nullable = true)
 |-- total_rooms: double (nullable = true)
 |-- total_bedrooms: double (nullable = true)
 |-- population: double (nullable = true)
 |-- households: double (nullable = true)
 |-- median_income: double (nullable = true)
 |-- median_house_value: double (nullable = true)



In [43]:
cht_raw = spark.read \
  .format("jdbc") \
  .option("url", f"jdbc:postgresql://{POSTGRES_DB_IP}:5432/cht") \
  .option("dbtable", "stg.stg_cht_raw") \
  .option("user", "postgres") \
  .option("driver", "org.postgresql.Driver") \
  .option("password", "postgres")\
  .load()

cht_raw.printSchema()

cht_raw.show()

root
 |-- cht_id: string (nullable = true)
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- housing_median_age: double (nullable = true)
 |-- total_rooms: double (nullable = true)
 |-- total_bedrooms: double (nullable = true)
 |-- population: double (nullable = true)
 |-- households: double (nullable = true)
 |-- median_income: double (nullable = true)
 |-- median_house_value: double (nullable = true)

+--------------------+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|              cht_id|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|
+--------------------+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|2908c11b-007c-483...|  -118.33|    34.2|              43.0|     2322.0|         418.0|    1106.0|     433.0|       4.3631|     

In [115]:
from pyspark.sql import Window as W
from pyspark.sql import functions as F
cht_raw.createOrReplaceTempView("cht_view_in_memory")

df_hct_unique = spark.sql("""
                  select 
                  *
                  from cht_view_in_memory
                  """)


# df_cht_dp = spark.sql("""
#                   select
#                       'longitude' as name,
#                       longitude as value
#                   from cht_view_in_memory
#                   union all
#                   select
#                       'latitude' as name,
#                       latitude as value
#                   from cht_view_in_memory
#                   union all
#                   select
#                       'housing_median_age' as name,
#                       housing_median_age as value
#                   from cht_view_in_memory
#                   union all
#                   select
#                       'total_rooms' as name,
#                       total_rooms as value
#                   from cht_view_in_memory
#                   union all
#                   select
#                       'total_bedrooms' as name,
#                       total_bedrooms as value
#                   from cht_view_in_memory
#                   union all
#                   select
#                       'population' as name,
#                       population as value
#                   from cht_view_in_memory
#                   union all
#                   select
#                       'households' as name,
#                       households as value
#                   from cht_view_in_memory
#                   union all
#                   select
#                       'median_income' as name,
#                       median_income as value
#                   from cht_view_in_memory
#                   union all
#                   select
#                       'median_house_value' as name,
#                       median_house_value as value
#                   from cht_view_in_memory
#                   """)
# df_cht_dp.createOrReplaceTempView("cht_view_all_metrics")

# df_cht_metrics = spark.sql("""
#                   select distinct
#                   *
#                   from cht_view_all_metrics
#                   """).show()
# df_dp = df_cht_metrics.select(stddev(col('value')).over(W.partitionBy('name')).alias('desvio_padrao')).show()
# df_cht_metrics.withColumn('desvio_padrao', stddev(col('value')).over(W.partitionBy('name'))).show()

df = df_hct_unique.select(
    stddev(col('longitude')).alias('longitude_dp'),
    stddev(col('latitude')).alias('latitude_dp'),
    stddev(col('housing_median_age')).alias('housing_median_age_dp'),
    stddev(col('total_rooms')).alias('total_rooms_dp'),
    stddev(col('total_bedrooms')).alias('total_bedrooms_dp'),
    stddev(col('population')).alias('population_dp'),
    stddev(col('households')).alias('households_dp'),
    stddev(col('median_income')).alias('median_income_dp'),
    stddev(col('median_house_value')).alias('median_house_value_dp')
)

df.createOrReplaceTempView("cht_dp_view_in_memory")

df_cht_dp = spark.sql("""
                  select 
                      'longitude_dp' as name,
                      longitude_dp as desvio_padrao
                  from cht_dp_view_in_memory
                  union 
                  select 
                      'latitude_dp' as name,
                      latitude_dp as desvio_padrao
                  from cht_dp_view_in_memory
                  union
                  select 
                      'housing_median_age_dp' as name,
                      housing_median_age_dp as desvio_padrao
                  from cht_dp_view_in_memory
                  union
                  select 
                      'total_rooms_dp' as name,
                      total_rooms_dp as desvio_padrao
                  from cht_dp_view_in_memory
                  union
                  select 
                      'total_bedrooms_dp' as name,
                      total_bedrooms_dp as desvio_padrao
                  from cht_dp_view_in_memory
                  union
                  select 
                      'population_dp' as name,
                      population_dp as desvio_padrao
                  from cht_dp_view_in_memory
                  union
                  select 
                      'households_dp' as name,
                      households_dp as desvio_padrao
                  from cht_dp_view_in_memory
                  union
                  select 
                      'median_income_dp' as name,
                      median_income_dp as desvio_padrao
                  from cht_dp_view_in_memory
                  union
                  select 
                      'median_house_value_dp' as name,
                      median_house_value_dp as desvio_padrao
                  from cht_dp_view_in_memory
                  """)
df_cht_dp.createOrReplaceTempView("cht_view_all_dp_2")

df_all_dp = spark.sql("""
                        select name, desvio_padrao
                        from cht_view_all_dp_2
                        where desvio_padrao = (select max(desvio_padrao) from cht_view_all_dp)
                    """).show()


+--------------------+------------------+
|                name|     desvio_padrao|
+--------------------+------------------+
|median_house_valu...|115983.76438720878|
+--------------------+------------------+



In [127]:
df_cht_dp = spark.sql("""
                  select 
                      'longitude' as name,
                      longitude as value
                  from cht_view_in_memory
                  union 
                  select 
                      'latitude' as name,
                      latitude as value
                  from cht_view_in_memory
                  union
                  select 
                      'housing_median_age' as name,
                      housing_median_age as value
                  from cht_view_in_memory
                  union
                  select 
                      'total_rooms' as name,
                      total_rooms as value
                  from cht_view_in_memory
                  union
                  select 
                      'total_bedrooms' as name,
                      total_bedrooms as value
                  from cht_view_in_memory
                  union
                  select 
                      'population' as name,
                      population as value
                  from cht_view_in_memory
                  union
                  select 
                      'households' as name,
                      households as value
                  from cht_view_in_memory
                  union
                  select 
                      'median_income' as name,
                      median_income as value
                  from cht_view_in_memory
                  union
                  select 
                      'median_house_value' as name,
                      median_house_value as value
                  from cht_view_in_memory
                  """)
df_cht_dp.createOrReplaceTempView("cht_view_all_metrics")

df_all_metrics = spark.sql("""
                            select name, value
                            from cht_view_all_metrics
                            where value = (select max(value) from cht_view_all_metrics)
                            union
                            select name, value
                            from cht_view_all_metrics
                            where value = (select min(value) from cht_view_all_metrics)
                           """).show()

+------------------+--------+
|              name|   value|
+------------------+--------+
|median_house_value|500001.0|
|         longitude| -124.35|
+------------------+--------+



In [149]:
cht_math = spark.read \
  .format("jdbc") \
  .option("url", f"jdbc:postgresql://{POSTGRES_DB_IP}:5432/cht") \
  .option("dbtable", "edw.cht_math") \
  .option("user", "postgres") \
  .option("driver", "org.postgresql.Driver") \
  .option("password", "postgres")\
  .load()

cht_math.createOrReplaceTempView("cht_math_in_memory")

cht_math_result = spark.sql("""
                  select 
                  distinct housing_median_age, hma_cat, c_ns
                  from cht_math_in_memory
                  """)
cht_math_result.show()

+------------------+-----------+-----+
|housing_median_age|    hma_cat| c_ns|
+------------------+-----------+-----+
|              50.0|   acima_37|  sul|
|              18.0|     ate_29|norte|
|              50.0|   acima_37|norte|
|              43.0|   acima_37|norte|
|              17.0|de_0_ate_18|  sul|
|              21.0|     ate_29|norte|
|              24.0|     ate_29|norte|
|              38.0|   acima_37|  sul|
|              27.0|     ate_29|  sul|
|               9.0|de_0_ate_18|norte|
|              22.0|     ate_29|  sul|
|              11.0|de_0_ate_18|  sul|
|              46.0|   acima_37|  sul|
|              34.0|     ate_37|norte|
|              23.0|     ate_29|norte|
|              20.0|     ate_29|norte|
|              29.0|     ate_37|norte|
|              11.0|de_0_ate_18|norte|
|              16.0|de_0_ate_18|norte|
|              26.0|     ate_29|  sul|
+------------------+-----------+-----+
only showing top 20 rows



In [155]:
cht_math_result_renamed = spark.sql("""
                  select 
                  *
                  from cht_math_in_memory
                  """)
cht_math_result_renamed = cht_math_result_renamed.withColumnRenamed('hma_cat','age')\
               .withColumnRenamed('c_ns','california_region')

cht_math_result_renamed.show()


cht_math_result_renamed.createOrReplaceTempView("cht_math_renamed_in_memory")



+--------------------+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+--------+-----------------+
|              cht_id|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|     age|california_region|
+--------------------+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+--------+-----------------+
|7a9128a1-d6ff-44f...|  -118.33|    34.2|              43.0|     2322.0|         418.0|    1106.0|     433.0|       4.3631|          284600.0|acima_37|              sul|
|1b721a65-4372-4c3...|  -118.33|    34.2|              43.0|     1325.0|         254.0|     613.0|     248.0|       3.6071|          289000.0|acima_37|              sul|
|6d78a473-1857-452...|  -118.33|    34.2|              23.0|     7179.0|        1985.0|    4757.0|    1924.0|       3.1051|          206500.0|  ate_29

In [175]:
cht_math_result_renamed = spark.sql("""
                  select 
                      age,
                      california_region,
                      total_rooms,
                      total_bedrooms,
                      population,
                      households,
                      median_house_value
                  from cht_math_renamed_in_memory
                  """)

cht_math_result_renamed.printSchema()

cht_math_result_renamed.write.mode("overwrite").parquet('./export1.parquet')

spark.read.parquet('./export1.parquet').show()

# age	string
# california_region	string
# total_rooms	double
# total_bedrooms	double
# population	double
# households	double
# median_house_value	double

root
 |-- age: string (nullable = true)
 |-- california_region: string (nullable = true)
 |-- total_rooms: double (nullable = true)
 |-- total_bedrooms: double (nullable = true)
 |-- population: double (nullable = true)
 |-- households: double (nullable = true)
 |-- median_house_value: double (nullable = true)

+--------+-----------------+-----------+--------------+----------+----------+------------------+
|     age|california_region|total_rooms|total_bedrooms|population|households|median_house_value|
+--------+-----------------+-----------+--------------+----------+----------+------------------+
|acima_37|              sul|     2322.0|         418.0|    1106.0|     433.0|          284600.0|
|acima_37|              sul|     1325.0|         254.0|     613.0|     248.0|          289000.0|
|  ate_29|              sul|     7179.0|        1985.0|    4757.0|    1924.0|          206500.0|
|acima_37|              sul|     2115.0|         463.0|    1133.0|     439.0|          222000.0|
|acima_3

In [179]:
cht_math_result_renamed = spark.sql("""
                  select 
                      age,
                      california_region,
                      sum(population) as s_population,
                      avg(median_house_value) as m_median_house_value
                  from cht_math_renamed_in_memory
                  group by 1,2
                  order by m_median_house_value desc
                  """)

cht_math_result_renamed.printSchema()

cht_math_result_renamed.write.mode("overwrite").parquet('./export2.parquet')

spark.read.parquet('./export2.parquet').show()


# * Age
# * California_region
# * S_population: Soma de population
# * M_median_house_value: Média de median_house_value

root
 |-- age: string (nullable = true)
 |-- california_region: string (nullable = true)
 |-- s_population: double (nullable = true)
 |-- m_median_house_value: double (nullable = true)

+-----------+-----------------+------------+--------------------+
|        age|california_region|s_population|m_median_house_value|
+-----------+-----------------+------------+--------------------+
|   acima_37|              sul|   2519076.0|   227694.8277657267|
|     ate_29|              sul|   3905630.0|  220571.65846153846|
|   acima_37|            norte|   2114160.0|  217732.95624136343|
|     ate_37|              sul|   3435282.0|  212266.99328593997|
|de_0_ate_18|              sul|   4157987.0|  209407.56504269212|
|     ate_37|            norte|   1792950.0|   195766.6032154341|
|     ate_29|            norte|   3107411.0|  188724.02380952382|
|de_0_ate_18|            norte|   3270261.0|  177826.69768637532|
+-----------+-----------------+------------+--------------------+

