In [31]:
import pyspark
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, count, isnan
from pyspark.sql.types import StructType, StructField, StringType, DoubleType

In [2]:
# Start (or reuse) the Spark session used by the rest of the notebook
spark = (
    SparkSession.builder
    .appName("MT-DE-Exercise")
    .getOrCreate()
)

# Load the housing CSV: the first row is the header and column types are inferred
csv_reader = spark.read.options(header=True, inferSchema=True)
df = csv_reader.csv("./california_housing_train.csv")

df.show(5)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|  -114.31|   34.19|              15.0|     5612.0|        1283.0|    1015.0|     472.0|       1.4936|           66900.0|
|  -114.47|    34.4|              19.0|     7650.0|        1901.0|    1129.0|     463.0|         1.82|           80100.0|
|  -114.56|   33.69|              17.0|      720.0|         174.0|     333.0|     117.0|       1.6509|           85700.0|
|  -114.57|   33.64|              14.0|     1501.0|         337.0|     515.0|     226.0|       3.1917|           73400.0|
|  -114.57|   33.57|              20.0|     1454.0|         326.0|     624.0|     262.0|        1.925|           65500.0|
+---------+--------+----

In [3]:
# Summary statistics (as produced by DataFrame.describe) for every column
summary_stats = df.describe()
summary_stats.show()

+-------+-------------------+------------------+------------------+-----------------+-----------------+------------------+-----------------+------------------+------------------+
|summary|          longitude|          latitude|housing_median_age|      total_rooms|   total_bedrooms|        population|       households|     median_income|median_house_value|
+-------+-------------------+------------------+------------------+-----------------+-----------------+------------------+-----------------+------------------+------------------+
|  count|              17000|             17000|             17000|            17000|            17000|             17000|            17000|             17000|             17000|
|   mean|-119.56210823529375|  35.6252247058827| 28.58935294117647|2643.664411764706|539.4108235294118|1429.5739411764705|501.2219411764706| 3.883578100000021|207300.91235294117|
| stddev| 2.0051664084260357|2.1373397946570867|12.586936981660406|2179.947071452777|421.4994515798648| 1

In [10]:
# Count, per column, the rows whose value is NaN or null
missing_counts = []
for name in df.columns:
    is_missing = isnan(name) | col(name).isNull()
    # when() yields a non-null literal only for missing rows, so count() tallies them
    missing_counts.append(count(when(is_missing, name)).alias(name))

df.select(missing_counts).show()

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|        0|       0|                 0|          0|             0|         0|         0|            0|                 0|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+



In [11]:
# Minimum and maximum value of every column.
# A list of explicit aggregate expressions is used instead of df.agg({...}) so
# the result columns come out in the same order as df.columns, and the loop
# variable no longer reuses the name of the imported `col` function.
df.select([F.min(name) for name in df.columns]).show()
df.select([F.max(name) for name in df.columns]).show()

+---------------+-------------+---------------+-------------------+--------------+------------------+----------------+-----------------------+-----------------------+
|min(households)|min(latitude)|min(population)|min(total_bedrooms)|min(longitude)|min(median_income)|min(total_rooms)|min(median_house_value)|min(housing_median_age)|
+---------------+-------------+---------------+-------------------+--------------+------------------+----------------+-----------------------+-----------------------+
|            1.0|        32.54|            3.0|                1.0|       -124.35|            0.4999|             2.0|                14999.0|                    1.0|
+---------------+-------------+---------------+-------------------+--------------+------------------+----------------+-----------------------+-----------------------+

+---------------+-------------+---------------+-------------------+--------------+------------------+----------------+-----------------------+----------------------

In [12]:
# Find the column with the highest standard deviation.
# The aggregation runs inside Spark, so only one row of results reaches the
# driver instead of the whole dataset (which is what df.toPandas() collected).
# Only numeric columns are considered, so the cell still works if `df` has
# gained string columns by the time it is re-run.
numeric_type_names = ("byte", "short", "integer", "long", "float", "double", "decimal")
numeric_cols = [
    field.name
    for field in df.schema.fields
    if field.dataType.typeName() in numeric_type_names
]

# F.stddev is the sample standard deviation, the same estimator pandas' .std() uses by default
stddev_by_col = df.select([F.stddev(name).alias(name) for name in numeric_cols]).first().asDict()
max_stddev_col = max(stddev_by_col, key=stddev_by_col.get)
print(max_stddev_col)

median_house_value


In [13]:
# Show the minimum, then the maximum, of the column with the highest standard deviation
for aggregate in (F.min, F.max):
    df.select(aggregate(F.col(max_stddev_col))).show()

+-----------------------+
|min(median_house_value)|
+-----------------------+
|                14999.0|
+-----------------------+

+-----------------------+
|max(median_house_value)|
+-----------------------+
|               500001.0|
+-----------------------+



2.1 - Criar coluna hma_cat, baseada na coluna housing_median_age, conforme as regras abaixo:

    *  Se < 18 então de_0_ate_18.
    *  Se >= 18 E < 29 então ate_29.
    *  Se >= 29 E < 37 então ate_37.
    *  Se >= 37 então acima_37.

2.2 - Criar a coluna c_ns:

    * Onde longitude abaixo (<) de -119 recebe o valor norte, e igual ou acima (>=) de -119 recebe o valor sul.

2.3 - Renomear as colunas:

    * hma_cat > age
    * c_ns > california_region

In [24]:
# Bucket housing_median_age into the four labelled ranges from rule 2.1.
# The when() branches are evaluated in order, so each one only needs its upper
# bound. The last bucket is an explicit `>= 37` test rather than otherwise(),
# so a null age stays null instead of being labelled "acima_37".
median_age = F.col("housing_median_age")

df = df.withColumn(
    "hma_cat",
    when(median_age < 18, "de_0_ate_18")
    .when(median_age < 29, "ate_29")
    .when(median_age < 37, "ate_37")
    .when(median_age >= 37, "acima_37"),
)

df.show()

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+-----------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|    hma_cat|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+-----------+
|  -114.31|   34.19|              15.0|     5612.0|        1283.0|    1015.0|     472.0|       1.4936|           66900.0|de_0_ate_18|
|  -114.47|    34.4|              19.0|     7650.0|        1901.0|    1129.0|     463.0|         1.82|           80100.0|     ate_29|
|  -114.56|   33.69|              17.0|      720.0|         174.0|     333.0|     117.0|       1.6509|           85700.0|de_0_ate_18|
|  -114.57|   33.64|              14.0|     1501.0|         337.0|     515.0|     226.0|       3.1917|           73400.0|de_0_ate_18|
|  -114.57|   33.57|              20.0|     1454.0|         32

In [25]:
# Label each row per rule 2.2: longitude < -119 is "norte", longitude >= -119 is "sul".
# Both sides are explicit comparisons rather than otherwise(), so a null
# longitude stays null instead of being labelled "sul".
longitude = F.col("longitude")

df = df.withColumn(
    "c_ns",
    when(longitude < -119, "norte")
    .when(longitude >= -119, "sul"),
)

df.show()

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+-----------+----+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|    hma_cat|c_ns|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+-----------+----+
|  -114.31|   34.19|              15.0|     5612.0|        1283.0|    1015.0|     472.0|       1.4936|           66900.0|de_0_ate_18| sul|
|  -114.47|    34.4|              19.0|     7650.0|        1901.0|    1129.0|     463.0|         1.82|           80100.0|     ate_29| sul|
|  -114.56|   33.69|              17.0|      720.0|         174.0|     333.0|     117.0|       1.6509|           85700.0|de_0_ate_18| sul|
|  -114.57|   33.64|              14.0|     1501.0|         337.0|     515.0|     226.0|       3.1917|           73400.0|de_0_ate_18| sul|
|  -114.57|   33.57|       

In [30]:
# Rename the categorical columns to their final names
final_names = {
    "hma_cat": "age",
    "c_ns": "california_region",
}
for current_name, final_name in final_names.items():
    df = df.withColumnRenamed(current_name, final_name)

df.show()

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+-----------+-----------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|        age|california_region|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+-----------+-----------------+
|  -114.31|   34.19|              15.0|     5612.0|        1283.0|    1015.0|     472.0|       1.4936|           66900.0|de_0_ate_18|              sul|
|  -114.47|    34.4|              19.0|     7650.0|        1901.0|    1129.0|     463.0|         1.82|           80100.0|     ate_29|              sul|
|  -114.56|   33.69|              17.0|      720.0|         174.0|     333.0|     117.0|       1.6509|           85700.0|de_0_ate_18|              sul|
|  -114.57|   33.64|              14.0|     1501.0|         337.0|     515.0|     226.0|

Escrever o resultado localmente em parquet, armazenando os dados no seguinte formato:

| Coluna              | Datatype    |
| --------------------| ----------- |
| `age`               | `string`    |
| `california_region` | `string`    |
| `total_rooms`       | `double`    |
| `total_bedrooms`    | `double`    |
| `population`        | `double`    |
| `households`        | `double`    |
| `median_house_value`| `double`    |

In [37]:
# Target layout of the parquet output: column names, order and data types.
write_schema = StructType([
    StructField("age", StringType()),
    StructField("california_region", StringType()),
    StructField("total_rooms", DoubleType()),
    StructField("total_bedrooms", DoubleType()),
    StructField("population", DoubleType()),
    StructField("households", DoubleType()),
    StructField("median_house_value", DoubleType())
])

# Passing the schema through .option("schema", ...) does not shape the output;
# every column of `df` would be written. Instead, project exactly the columns
# listed in write_schema, in that order, cast to the declared types.
output_df = df.select([
    col(field.name).cast(field.dataType).alias(field.name)
    for field in write_schema.fields
])

output_df.write.mode("overwrite").parquet("./output/eda")