In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import when, log, col

# Create (or reuse) the Spark session for this project.
spark = SparkSession.builder.appName("Proyecto1").getOrCreate()

# Load the raw housing CSV from S3 (replace with your actual path).
# The first row is treated as the header and column types are inferred.
df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv("s3://javillarrmb/zones/raw/housing/Housing.csv")
)

# Echo the DataFrame so the cell output shows the inferred schema.
df

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
5,application_1748125271859_0006,pyspark,idle,Link,Link,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

DataFrame[price: int, area: int, bedrooms: int, bathrooms: int, stories: int, mainroad: string, guestroom: string, basement: string, hotwaterheating: string, airconditioning: string, parking: int, prefarea: string, furnishingstatus: string]

In [2]:
# Binary yes/no columns: 'yes' becomes 1, every other value (including null) becomes 0.
binary_columns = ['mainroad', 'guestroom', 'basement', 'hotwaterheating', 'airconditioning', 'prefarea']

# Rebuild the frame in one projection: binary columns are replaced in place by
# their 0/1 encoding, all other columns pass through unchanged and in the same order.
df = df.select(*[
    when(col(name) == 'yes', 1).otherwise(0).alias(name) if name in binary_columns else col(name)
    for name in df.columns
])

# Note: 'furnishingstatus' is not binary; it is one-hot encoded separately
# with StringIndexer + OneHotEncoder.



FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [3]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder
 
# Index the categorical 'furnishingstatus' column, then one-hot encode that index.
indexer = StringIndexer(inputCol="furnishingstatus", outputCol="furnishingstatus_index")
encoder = OneHotEncoder(inputCols=["furnishingstatus_index"], outputCols=["furnishingstatus_vec"])

# Each stage is fitted on the frame it is about to transform, in order.
for stage in (indexer, encoder):
    df = stage.fit(df).transform(df)

# Log transforms: add the natural log of price and of area as new columns.
df = (
    df
    .withColumn("log_price", log("price"))
    .withColumn("log_area", log("area"))
)
 


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [4]:
# Persist the transformed dataset to the 'trusted' zone as Parquet,
# replacing whatever already exists at this path.
df.write.parquet("s3a://javillarrmb/zones/trusted/housing/Housing.parquet", mode="overwrite")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [6]:
from pyspark.ml.feature import VectorAssembler

from pyspark.ml.regression import LinearRegression

from pyspark.ml.evaluation import RegressionEvaluator
 
# Feature assembly: numeric/binary predictors plus the one-hot furnishing vector.
feature_cols = [
    'log_area', 'bedrooms', 'bathrooms', 'stories', 'mainroad', 'guestroom',
    'basement', 'hotwaterheating', 'airconditioning', 'parking', 'prefarea',
]
assembler = VectorAssembler(outputCol="features", inputCols=[*feature_cols, 'furnishingstatus_vec'])
df_features = assembler.transform(df)

# 80/20 train/test split with a fixed seed.
train_data, test_data = df_features.randomSplit(weights=[0.8, 0.2], seed=42)

# Linear regression with log_price as the label.
lr = LinearRegression(labelCol='log_price', featuresCol='features')
model = lr.fit(train_data)

# Score the held-out set and report RMSE. Because the label is log_price,
# the metric is expressed in log-price units, not in the original price units.
predictions = model.transform(test_data)
evaluator = RegressionEvaluator(metricName='rmse', labelCol='log_price', predictionCol='prediction')
rmse = evaluator.evaluate(predictions)
print("RMSE:", rmse)

 

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

RMSE: 0.23044645091312385

In [9]:
# Persist the test-set predictions to the 'refined' zone as Parquet,
# replacing whatever already exists at this path.
predictions.write.parquet("s3a://javillarrmb/zones/refined/predictions/Housing.parquet", mode="overwrite")


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [17]:
# Register the 'trusted' zone Parquet output as an external table for SQL queries.
# DDL statements return an empty DataFrame, so their results are not assigned
# (the original variable names carried a "pythonCopiarEditar" copy/paste artifact).
spark.sql("CREATE DATABASE IF NOT EXISTS proyecto1db")

# NOTE(review): the table below is created unqualified, i.e. in the session's
# current database, not in proyecto1db. This is kept as is because the table is
# referenced as plain `housing`; confirm whether it should live in proyecto1db.

# Drop any previous definition so the cell can be re-run without an
# "already exists" error. The table is EXTERNAL, so dropping it removes only the
# catalog entry and leaves the Parquet files in place.
spark.sql("DROP TABLE IF EXISTS housing")

# Column types must match the Parquet files: `area` was inferred as int when the
# CSV was loaded, so it is declared INT here (declaring it DOUBLE makes reads of
# that column fail with a Parquet type-conversion error).
# LOCATION points at the directory that directly contains the Parquet part files
# written by df.write.parquet(".../zones/trusted/housing/Housing.parquet").
spark.sql("""
CREATE EXTERNAL TABLE housing (
    area INT,
    bedrooms INT,
    log_area DOUBLE,
    log_price DOUBLE
)
STORED AS PARQUET
LOCATION 's3://javillarrmb/zones/trusted/housing/Housing.parquet/'
""")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [20]:
# Average log_price per bedroom count, highest average first, read from the
# `housing` table. spark.sql returns a lazy DataFrame for a SELECT, so nothing
# is displayed until an action (e.g. show/collect) is called on df_sql.
bedrooms_avg_price_query = """
SELECT bedrooms, AVG(log_price) as avg_price
FROM housing
GROUP BY bedrooms
ORDER BY avg_price DESC
"""
df_sql = spark.sql(bedrooms_avg_price_query)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…