In [0]:
# Source CSV (header row present). No schema inference is requested, so every
# column is loaded as a string; numeric columns are cast in the next cell.
# NOTE(review): this absolute DBFS path embeds a personal upload folder —
# consider moving it to a shared location / config cell.
PAISES_CSV_PATH = "dbfs:/FileStore/shared_uploads/frodriguez@insulet.com/pai_ses-2.csv"
paises = (
    spark.read
    .format("csv")
    .option("header", "true")
    .load(PAISES_CSV_PATH)
)

In [0]:
from pyspark.sql.functions import col

# Columns read as strings from the CSV that we need as doubles for modelling.
# Casting to strings to doubles; unparseable values become null.
numeric_columns = [
    "total_cases_per_million",
    "total_deaths_per_million",
    "diabetes_prevalence",
    "hdi",
]
for column_name in numeric_columns:
    paises = paises.withColumn(column_name, col(column_name).cast("double"))

paises

In [0]:
# Drop every row that has a null in ANY column (not only the columns used below).
paises = paises.dropna()

## Train/Test Split

![](https://files.training.databricks.com/images/301/TrainTestSplit.png)

**Question**: Why is it necessary to set a seed? What happens if I change my cluster configuration?

In [0]:
# 80/20 split with a fixed seed so re-runs on the same data layout give the
# same split (the next cells show what happens when the layout changes).
trainDF, testDF = paises.randomSplit(weights=[0.8, 0.2], seed=42)

# cache() is lazy; the count() both materializes the cached training set
# (it is reused several times below) and reports its size.
trainDF.cache()
print(trainDF.count())

Let's change the number of partitions (to simulate a different cluster configuration) and see whether we get the same number of data points in our training set.

In [0]:
# Simulate a different cluster configuration by redistributing the same rows
# across 24 partitions before splitting.
repartitionedPaises = paises.repartition(24)
trainRepartitionDF, testRepartitionDF = repartitionedPaises.randomSplit([0.8, 0.2], seed=42)

print(trainRepartitionDF.count())

# Same seed (42), but the training-set size can differ from the previous cell:
# randomSplit's result depends on how the rows are partitioned.
# NOTE(review): an earlier comment quoted 158 vs 165 rows, but the summary
# below reports 147 training rows — re-run to refresh the numbers.

In [0]:
# Label (total_cases_per_million) next to the single feature (hdi) we regress on.
display(trainDF.select(["total_cases_per_million", "hdi"]))

total_cases_per_million,hdi
3955.901,0.511
1925.701,0.581
249140.316,0.868
74322.974,0.89
117328.592,0.845
42044.81,0.778
134626.375,0.922
58789.766,0.756
1681.704,0.433
164586.66,0.931


In [0]:
# count / mean / stddev / min / quartiles / max for the label and the feature.
labelFeatureStats = trainDF.select("total_cases_per_million", "hdi").summary()
display(labelFeatureStats)

summary,total_cases_per_million,hdi
count,147.0,147.0
mean,62302.86707482991,0.7251632653061223
stddev,63405.76922256815,0.1522297309006577
min,68.961,0.394
25%,7099.542,0.594
50%,48589.891,0.75
75%,99247.528,0.851
max,253504.891,0.957


In [0]:
# Inspect the full training set (all columns) after casting and null removal.
display(trainDF)

code,continent,country,date,total_cases_per_million,total_deaths_per_million,population_density,cardiovasc_death_rate,diabetes_prevalence,life_expectancy,hdi
AFG,Asia,Afghanistan,09/12/2021,3955.901,183.781,54.422,597.029,9.59,64.83,0.511
AGO,Africa,Angola,09/12/2021,1925.701,51.159,23.89,276.045,3.94,61.15,0.581
AND,Europe,Andorra,09/12/2021,249140.316,1719.368,163.755,109.135,7.97,83.73,0.868
ARE,Asia,United Arab Emirates,09/12/2021,74322.974,215.092,112.442,317.84,17.26,77.97,0.89
ARG,South America,Argentina,09/12/2021,117328.592,2559.3,16.177,191.032,5.5,76.67,0.845
ATG,North America,Antigua and Barbuda,09/12/2021,42044.81,1185.074,231.845,191.511,13.17,77.02,0.778
AUT,Europe,Austria,09/12/2021,134626.375,1438.781,106.749,145.183,6.35,81.54,0.922
AZE,Asia,Azerbaijan,09/12/2021,58789.766,786.24,119.309,559.812,7.11,73.0,0.756
BDI,Africa,Burundi,09/12/2021,1681.704,3.101,423.062,293.068,6.05,61.58,0.433
BEL,Europe,Belgium,09/12/2021,164586.66,2360.919,375.564,114.898,4.29,81.63,0.931


In [0]:
from pyspark.ml.regression import LinearRegression

In [0]:
# Show the label and the feature again before assembling the feature vector.
# NOTE(review): identical to an earlier cell — can likely be removed.
display(trainDF.select("total_cases_per_million", "hdi"))

total_cases_per_million,hdi
3955.901,0.511
1925.701,0.581
249140.316,0.868
74322.974,0.89
117328.592,0.845
42044.81,0.778
134626.375,0.922
58789.766,0.756
1681.704,0.433
164586.66,0.931


In [0]:
from pyspark.ml.feature import VectorAssembler

# pyspark.ml estimators read their inputs from a single vector column, so pack
# the chosen input columns (here only "hdi") into one column named "features".
vecAssembler = VectorAssembler(outputCol="features", inputCols=["hdi"])

# transform() returns a new DataFrame: trainDF's columns plus "features".
vecTrainDF = vecAssembler.transform(trainDF)

In [0]:
# display() renders each vector as its internal struct, which looks odd; the
# last list in each row holds the vector's values — here just the hdi value.
display(vecTrainDF.select(["total_cases_per_million", "features"]))

total_cases_per_million,features
3955.901,"List(1, 1, List(), List(0.511))"
1925.701,"List(1, 1, List(), List(0.581))"
249140.316,"List(1, 1, List(), List(0.868))"
74322.974,"List(1, 1, List(), List(0.89))"
117328.592,"List(1, 1, List(), List(0.845))"
42044.81,"List(1, 1, List(), List(0.778))"
134626.375,"List(1, 1, List(), List(0.922))"
58789.766,"List(1, 1, List(), List(0.756))"
1681.704,"List(1, 1, List(), List(0.433))"
164586.66,"List(1, 1, List(), List(0.931))"


In [0]:
# Linear model predicting total_cases_per_million from the assembled "features" column.
lr = LinearRegression(labelCol="total_cases_per_million", featuresCol="features")
lrModel = lr.fit(vecTrainDF)

## Inspect the model

In [0]:
# Single feature, so the model is a line: slope m and intercept b.
m, b = lrModel.coefficients[0], lrModel.intercept

print(f"The formula for the linear regression line is y = {m:.2f}x + {b:.2f}")

# Here x is the country's HDI. With the coefficients from the saved run, a
# country with HDI 0.5 would be predicted at about -1271.55 cases per million,
# and one with HDI 0.9 at about 111,667.68.
# Note the negative prediction at low HDI: the line extrapolates to values that
# are impossible for a case count.

## Apply model to test set

In [0]:
# Build the test set's "features" column with the same assembler used for training,
# then append a "prediction" column from the fitted model.
vecTestDF = vecAssembler.transform(testDF)
predDF = lrModel.transform(vecTestDF)

predDF.select(["hdi", "features", "total_cases_per_million", "prediction"]).show()

## Evaluate Model

Let's see how our linear regression model with just one variable does. Does it beat our baseline model?

In [0]:
from pyspark.ml.evaluation import RegressionEvaluator

# Compare the test-set predictions with the true label.
regressionEvaluator = RegressionEvaluator(predictionCol="prediction", labelCol="total_cases_per_million", metricName="rmse")

# RMSE is expressed in the label's units (cases per million); lower is better.
rmse = regressionEvaluator.evaluate(predDF)
print(f"RMSE is {rmse}")

# Override the metric for this one call via a param map. Calling
# setMetricName("r2") would mutate the evaluator and leave it computing R²
# for any later cell that reuses it expecting RMSE.
r2 = regressionEvaluator.evaluate(predDF, {regressionEvaluator.metricName: "r2"})
print(f"r2 is {r2}")

El error es muy alto, pero la R² mejoró.

-sandbox
&copy; 2020 Databricks, Inc. All rights reserved.<br/>
Apache, Apache Spark, Spark and the Spark logo are trademarks of the <a href="http://www.apache.org/">Apache Software Foundation</a>.<br/>
<br/>
<a href="https://databricks.com/privacy-policy">Privacy Policy</a> | <a href="https://databricks.com/terms-of-use">Terms of Use</a> | <a href="http://help.databricks.com/">Support</a>