<a href="https://colab.research.google.com/github/moraesleonardo/PySpark_2/blob/main/TrabalhoSpark_2.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [398]:
# Mount Google Drive
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [399]:
# Install pyspark
!pip install pyspark



In [400]:
# Access to Spark's DataFrame and Dataset API
from pyspark.sql import SparkSession

In [401]:
# Start the SparkSession
spark = SparkSession.builder.appName("Edmonton_Housing").master('local[*]').getOrCreate()

In [402]:
# Read the edmonton_housing file from Google Drive
df_housing = spark.read.option("delimiter", ",").csv(
    path='/content/drive/My Drive/edmonton_housing_3.csv',
    schema='Price float, Bedrooms float, Bathrooms float, Square_Footage float, Year_Built float',
    header=True,
)

In [403]:
df_housing.show(20)

+-------+--------+---------+--------------+----------+
|  Price|Bedrooms|Bathrooms|Square_Footage|Year_Built|
+-------+--------+---------+--------------+----------+
| 399.99|     1.0|      1.0|         787.0|    1948.0|
|  357.0|     4.0|      2.0|         929.0|    1954.0|
|  499.6|     5.0|      2.0|         1.161|    1959.0|
|  398.8|     4.0|      3.0|         1.217|    1976.0|
|  279.9|     3.0|      2.0|          1.16|    1979.0|
|  292.0|     3.0|      2.0|         914.0|    1955.0|
| 314.85|     3.0|      2.0|         1.119|    1967.0|
|  189.9|     3.0|      1.0|         694.0|    1940.0|
|238.888|     4.0|      0.0|         926.0|    1959.0|
|  219.9|     3.0|      1.0|         1.109|    1942.0|
|  289.0|     4.0|      2.0|         1.401|    1945.0|
|  419.0|     3.0|      2.0|         815.0|    1959.0|
|  397.5|     4.0|      2.0|         992.0|    1953.0|
|  334.9|     3.0|      2.0|         1.038|    1980.0|
|  530.0|     4.0|      3.0|         1.766|    1999.0|
|  235.0| 

Preparing the Table Data

In [404]:
# Select the desired columns
selected_columns = ['Price', 'Bedrooms', 'Bathrooms', 'Square_Footage', 'Year_Built']
df_selected_housing = df_housing.select(selected_columns)
df_selected_housing.show(20)

# Number of rows
num_linhas = df_selected_housing.count()
print(f"The dataset has {num_linhas} rows.")

+-------+--------+---------+--------------+----------+
|  Price|Bedrooms|Bathrooms|Square_Footage|Year_Built|
+-------+--------+---------+--------------+----------+
| 399.99|     1.0|      1.0|         787.0|    1948.0|
|  357.0|     4.0|      2.0|         929.0|    1954.0|
|  499.6|     5.0|      2.0|         1.161|    1959.0|
|  398.8|     4.0|      3.0|         1.217|    1976.0|
|  279.9|     3.0|      2.0|          1.16|    1979.0|
|  292.0|     3.0|      2.0|         914.0|    1955.0|
| 314.85|     3.0|      2.0|         1.119|    1967.0|
|  189.9|     3.0|      1.0|         694.0|    1940.0|
|238.888|     4.0|      0.0|         926.0|    1959.0|
|  219.9|     3.0|      1.0|         1.109|    1942.0|
|  289.0|     4.0|      2.0|         1.401|    1945.0|
|  419.0|     3.0|      2.0|         815.0|    1959.0|
|  397.5|     4.0|      2.0|         992.0|    1953.0|
|  334.9|     3.0|      2.0|         1.038|    1980.0|
|  530.0|     4.0|      3.0|         1.766|    1999.0|
|  235.0| 

In [405]:
# Show the data type of each column
df_selected_housing

DataFrame[Price: float, Bedrooms: float, Bathrooms: float, Square_Footage: float, Year_Built: float]
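
The preview above mixes `Square_Footage` values such as 787.0 and 929.0 with values such as 1.161 and 1.217. One plausible reading, which the source does not confirm, is that the CSV writes thousands with a dot, so that 1.161 stands for 1,161 square feet. The sketch below shows one way such values could be normalized in plain Python. The function name `fix_square_footage` and the cutoff of 10 are assumptions made for illustration and are not part of the original notebook.

```python
def fix_square_footage(value, cutoff=10.0):
    """Rescale values that look like thousands written with a dot.

    Assumption: no real listing is smaller than `cutoff` square feet, so any
    value below it is treated as thousands (for example 1.161 -> 1161.0).
    """
    if value is None:
        return None
    if value < cutoff:
        # round() absorbs float noise such as 1.1610000133514404 * 1000
        return float(round(value * 1000))
    return float(value)


# Values copied from the Square_Footage column shown above
sample = [787.0, 929.0, 1.161, 1.217, 1.16]
fixed = [fix_square_footage(v) for v in sample]
print(fixed)
```

If this reading of the data is right, the same rule could be applied in Spark with a `when(...).otherwise(...)` expression from `pyspark.sql.functions` before the features are assembled.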

In [406]:
# Drop rows with null values
df_selected_no_nulls = df_selected_housing.na.drop()

# Number of rows
num_linhas_no_null = df_selected_no_nulls.count()
print(f"The dataset has {num_linhas_no_null} rows.")

# Report how many rows with null values were removed
diferenca = num_linhas - num_linhas_no_null
print(f"Removed a total of {diferenca} row(s) with null values.")

The dataset has 1615 rows.
Removed a total of 82 row(s) with null values.
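
Called with no arguments, `na.drop()` removes every row that has a null in at least one column. With 1615 rows kept and 82 removed, the original DataFrame had 1697 rows. The plain-Python sketch below mimics that rule on three rows. The first row comes from the table above, while the other two have nulls injected purely for illustration, and `drop_rows_with_nulls` is a hypothetical helper.

```python
def drop_rows_with_nulls(rows):
    """Keep only rows in which every field is present (not None)."""
    return [row for row in rows if all(value is not None for value in row)]


rows = [
    (399.99, 1.0, 1.0, 787.0, 1948.0),  # complete row from the dataset
    (357.0, 4.0, None, 929.0, 1954.0),  # hypothetical: Bathrooms missing
    (None, 3.0, 2.0, 914.0, 1955.0),    # hypothetical: Price missing
]

clean = drop_rows_with_nulls(rows)
removed = len(rows) - len(clean)
print(f"{len(clean)} row(s) kept, {removed} removed")
```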


Importing the packages for working with vectors

In [407]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

In [408]:
# Create a new column that holds the feature vector

assembler = VectorAssembler(inputCols=["Bedrooms", "Bathrooms", "Square_Footage", "Year_Built"],outputCol="features")
print(assembler)

VectorAssembler_1f6d120b43b4
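
Conceptually, the assembler concatenates the listed input columns, in order, into a single numeric vector per row. The sketch below reproduces that idea in plain Python for one row of the dataset. The `assemble` helper is hypothetical and only illustrates the transformation; it is not how Spark implements it.

```python
def assemble(row, input_cols):
    """Concatenate the chosen columns of a row into one feature list."""
    return [float(row[col]) for col in input_cols]


input_cols = ["Bedrooms", "Bathrooms", "Square_Footage", "Year_Built"]

# First row of the dataset, written as a dictionary
row = {
    "Price": 399.99,
    "Bedrooms": 1.0,
    "Bathrooms": 1.0,
    "Square_Footage": 787.0,
    "Year_Built": 1948.0,
}

features = assemble(row, input_cols)
print(features)
```

`Price` is left out of the vector on purpose, since it is the value the model will try to predict.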


In [409]:
df_novo = assembler.transform(df_selected_no_nulls)
df_novo.show(truncate=False)

+-------+--------+---------+--------------+----------+-----------------------------------+
|Price  |Bedrooms|Bathrooms|Square_Footage|Year_Built|features                           |
+-------+--------+---------+--------------+----------+-----------------------------------+
|399.99 |1.0     |1.0      |787.0         |1948.0    |[1.0,1.0,787.0,1948.0]             |
|357.0  |4.0     |2.0      |929.0         |1954.0    |[4.0,2.0,929.0,1954.0]             |
|499.6  |5.0     |2.0      |1.161         |1959.0    |[5.0,2.0,1.1610000133514404,1959.0]|
|398.8  |4.0     |3.0      |1.217         |1976.0    |[4.0,3.0,1.2170000076293945,1976.0]|
|279.9  |3.0     |2.0      |1.16          |1979.0    |[3.0,2.0,1.159999966621399,1979.0] |
|292.0  |3.0     |2.0      |914.0         |1955.0    |[3.0,2.0,914.0,1955.0]             |
|314.85 |3.0     |2.0      |1.119         |1967.0    |[3.0,2.0,1.11899995803833,1967.0]  |
|189.9  |3.0     |1.0      |694.0         |1940.0    |[3.0,1.0,694.0,1940.0]             |

In [410]:
df_novo = df_novo.withColumnRenamed("Price", "label")
df_novo.show()

+-------+--------+---------+--------------+----------+--------------------+
|  label|Bedrooms|Bathrooms|Square_Footage|Year_Built|            features|
+-------+--------+---------+--------------+----------+--------------------+
| 399.99|     1.0|      1.0|         787.0|    1948.0|[1.0,1.0,787.0,19...|
|  357.0|     4.0|      2.0|         929.0|    1954.0|[4.0,2.0,929.0,19...|
|  499.6|     5.0|      2.0|         1.161|    1959.0|[5.0,2.0,1.161000...|
|  398.8|     4.0|      3.0|         1.217|    1976.0|[4.0,3.0,1.217000...|
|  279.9|     3.0|      2.0|          1.16|    1979.0|[3.0,2.0,1.159999...|
|  292.0|     3.0|      2.0|         914.0|    1955.0|[3.0,2.0,914.0,19...|
| 314.85|     3.0|      2.0|         1.119|    1967.0|[3.0,2.0,1.118999...|
|  189.9|     3.0|      1.0|         694.0|    1940.0|[3.0,1.0,694.0,19...|
|238.888|     4.0|      0.0|         926.0|    1959.0|[4.0,0.0,926.0,19...|
|  219.9|     3.0|      1.0|         1.109|    1942.0|[3.0,1.0,1.108999...|
|  289.0|   

In [411]:
df_novo = df_novo.select("label", "features")
df_novo.show(truncate=False)

+-------+-----------------------------------+
|label  |features                           |
+-------+-----------------------------------+
|399.99 |[1.0,1.0,787.0,1948.0]             |
|357.0  |[4.0,2.0,929.0,1954.0]             |
|499.6  |[5.0,2.0,1.1610000133514404,1959.0]|
|398.8  |[4.0,3.0,1.2170000076293945,1976.0]|
|279.9  |[3.0,2.0,1.159999966621399,1979.0] |
|292.0  |[3.0,2.0,914.0,1955.0]             |
|314.85 |[3.0,2.0,1.11899995803833,1967.0]  |
|189.9  |[3.0,1.0,694.0,1940.0]             |
|238.888|[4.0,0.0,926.0,1959.0]             |
|219.9  |[3.0,1.0,1.1089999675750732,1942.0]|
|289.0  |[4.0,2.0,1.4010000228881836,1945.0]|
|419.0  |[3.0,2.0,815.0,1959.0]             |
|397.5  |[4.0,2.0,992.0,1953.0]             |
|334.9  |[3.0,2.0,1.0379999876022339,1980.0]|
|530.0  |[4.0,3.0,1.7660000324249268,1999.0]|
|235.0  |[3.0,1.0,958.0,1955.0]             |
|950.0  |[4.0,4.0,2.0889999866485596,1958.0]|
|325.0  |[3.0,2.0,1.0230000019073486,1951.0]|
|330.0  |[4.0,2.0,1.11699998378753

Let's check the number of rows in the DataFrame

In [412]:
df_novo.count()

1615

Before training the model, we set aside part of the data to validate it.

In [413]:
(trainingData, testData) = df_novo.randomSplit([0.7, 0.3])
print(trainingData.count(), testData.count())

1101 514
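
`randomSplit` assigns each row at random, so the resulting sizes only approximate the weights (1101 and 514 rows are about 68% and 32% of 1615), and without a seed the split changes on every run. `randomSplit` accepts an optional `seed` argument for reproducibility. The plain-Python sketch below illustrates the idea of a seeded, per-row random split. The `random_split` helper and the seed value 42 are assumptions for illustration and do not reproduce Spark's internal sampling.

```python
import random


def random_split(rows, train_weight=0.7, seed=42):
    """Send each row to train with probability train_weight, else to test."""
    rng = random.Random(seed)  # fixed seed -> same split on every run
    train, test = [], []
    for row in rows:
        if rng.random() < train_weight:
            train.append(row)
        else:
            test.append(row)
    return train, test


rows = list(range(1615))  # stand-in for the 1615 rows of df_novo

train_a, test_a = random_split(rows, seed=42)
train_b, test_b = random_split(rows, seed=42)

print(len(train_a), len(test_a))
print("same split with same seed:", train_a == train_b)
```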


In [414]:
trainingData.show(5)
testData.show(5)

+-----+--------------------+
|label|            features|
+-----+--------------------+
| 54.0|[1.0,1.0,578.0,19...|
| 55.0|[1.0,1.0,578.0,19...|
| 55.0|[1.0,1.0,646.0,19...|
| 56.0|[1.0,1.0,541.0,19...|
| 59.0|[1.0,1.0,532.0,19...|
+-----+--------------------+
only showing top 5 rows

+-----+--------------------+
|label|            features|
+-----+--------------------+
| 60.1|[3.0,2.0,1.743999...|
| 64.8|[1.0,1.0,732.0,19...|
| 65.0|[1.0,1.0,390.0,19...|
| 65.7|[3.0,2.0,1.332000...|
| 67.5|[1.0,1.0,691.0,19...|
+-----+--------------------+
only showing top 5 rows



We will use linear regression to predict Price from the features.

In [415]:
from pyspark.ml.regression import LinearRegression, LinearRegressionTrainingSummary
from pyspark.ml.evaluation import RegressionEvaluator

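Instantiating the estimator. With its default parameters, LinearRegression reads the columns named features and label, which is why Price was renamed to label earlier.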

In [416]:
lr = LinearRegression()

Training a model: fitting produces a model (a transformer) adjusted to the training data.

In [417]:
lrModelo = lr.fit(trainingData)

In [418]:
resultado = lrModelo.summary
print("Coefficients: %s" % str(lrModelo.coefficients))
print("Intercept: %s" % str(lrModelo.intercept))
print(resultado.meanSquaredError)
print(resultado.rootMeanSquaredError)
print(resultado.r2)

Coefficients: [31.121591429480663,101.64881320695609,-0.07998985324776334,-0.0036177045780663134]
Intercept: 68.05405646553267
19497.772221979034
139.63442348496676
0.563817736754264
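
The three numbers printed after the intercept are the training mean squared error, its square root (RMSE), and R². An R² of about 0.56 means the model accounts for roughly 56% of the variance of the training labels. The sketch below checks the MSE/RMSE relationship using the value printed above and shows how R² is defined. The `r2_score` helper and the three label/prediction pairs are hypothetical and only illustrate the formula.

```python
import math

# Training MSE as printed by the notebook
mse = 19497.772221979034
rmse = math.sqrt(mse)
print(rmse)


def r2_score(labels, predictions):
    """R^2 = 1 - (residual sum of squares / total sum of squares)."""
    mean_label = sum(labels) / len(labels)
    ss_res = sum((y - p) ** 2 for y, p in zip(labels, predictions))
    ss_tot = sum((y - mean_label) ** 2 for y in labels)
    return 1.0 - ss_res / ss_tot


# Hypothetical values, chosen only to make the arithmetic easy to follow
labels = [100.0, 200.0, 300.0]
predictions = [110.0, 190.0, 310.0]
r2 = r2_score(labels, predictions)
print(r2)
```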


Testing the model (transformer) on the test (validation) set

In [419]:
df_resultado = lrModelo.transform(testData)
df_resultado.show(30, truncate=False)

+------+-----------------------------------+------------------+
|label |features                           |prediction        |
+------+-----------------------------------+------------------+
|60.1  |[3.0,2.0,1.74399995803833,1979.0]  |357.41751750718595|
|64.8  |[1.0,1.0,732.0,1980.0]             |135.10883346003538|
|65.0  |[1.0,1.0,390.0,1972.0]             |162.49430490739496|
|65.7  |[3.0,2.0,1.3320000171661377,1979.0]|357.4504733219944 |
|67.5  |[1.0,1.0,691.0,1971.0]             |138.42097678439626|
|69.0  |[1.0,1.0,662.0,1979.0]             |140.71174089195688|
|69.0  |[2.0,1.0,715.0,1963.0]             |167.65175337255513|
|69.9  |[1.0,1.0,481.0,1970.0]             |155.22246367100462|
|69.9  |[1.0,1.0,704.0,1971.0]             |137.3811086921753 |
|72.5  |[1.0,1.0,690.0,1972.0]             |138.49734893306595|
|74.9  |[2.0,1.0,823.0,1983.0]             |158.94049513023538|
|75.0  |[1.0,1.0,807.0,1967.0]             |129.15662462596796|
|75.0  |[2.0,1.0,938.0,1973.0]          
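
Each value in the prediction column is the intercept plus the dot product of the coefficients with the feature vector. The sketch below recomputes one prediction by hand, using the coefficients and intercept printed earlier and the test row with features [1.0, 1.0, 732.0, 1980.0]. The `predict` helper is a hypothetical name used only for this illustration.

```python
# Coefficients and intercept as printed by the notebook
coefficients = [
    31.121591429480663,      # Bedrooms
    101.64881320695609,      # Bathrooms
    -0.07998985324776334,    # Square_Footage
    -0.0036177045780663134,  # Year_Built
]
intercept = 68.05405646553267


def predict(features):
    """Linear model: intercept + sum(coefficient_i * feature_i)."""
    return intercept + sum(c * x for c, x in zip(coefficients, features))


# Row from the test output: label 64.8, features [1.0, 1.0, 732.0, 1980.0]
prediction = predict([1.0, 1.0, 732.0, 1980.0])
print(prediction)
```

The result agrees with the 135.10883346003538 shown for that row, up to floating-point rounding.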

Evaluating model quality on the test set

In [420]:
evaluator = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(df_resultado)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

Root Mean Squared Error (RMSE) on test data = 138.507
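
With `metricName="rmse"`, the evaluator takes the square root of the mean squared difference between label and prediction. `RegressionEvaluator` also accepts other metric names such as `"mae"` and `"r2"`. The sketch below computes RMSE and MAE in plain Python for three rows copied from the test output above. The results describe only those three rows, so they are not expected to match the 138.507 reported for the full test set.

```python
import math

# (label, prediction) pairs copied from the test output
pairs = [
    (64.8, 135.10883346003538),
    (65.0, 162.49430490739496),
    (67.5, 138.42097678439626),
]

errors = [prediction - label for label, prediction in pairs]

# Root mean squared error: penalizes large errors more heavily
rmse = math.sqrt(sum(e ** 2 for e in errors) / len(errors))

# Mean absolute error: average size of the error, in the units of the label
mae = sum(abs(e) for e in errors) / len(errors)

print(f"RMSE on 3 rows = {rmse:g}")
print(f"MAE on 3 rows = {mae:g}")
```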
