Import packages

In [2]:
#https://spark.apache.org/docs/latest/api/python/pyspark.sql.html
from pyspark.sql.types import *
from pyspark.sql.functions import *

from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import StringIndexer, VectorAssembler

Read epa data in from Azure Storage, cluster must be configured to access WASB

In [4]:
epaMpg = spark.read.csv('wasb://spark@dbshepstor.blob.core.windows.net/data/epaMpg.csv', header=True, inferSchema=True)
epaMpg.show()

In [5]:
type(epaMpg)

In [6]:
epaMpg.printSchema()

In [7]:
epaMpg.columns

In [8]:
epaMpg.corr('FuelEcon','Weight')

In [9]:
epaMpg.select("`Represented.Test.Veh.Make`").distinct().count()

In [10]:
epaMpg.filter(epaMpg.FuelEcon > 50).count()

In [11]:
epaMpg.first()

In [12]:

epaMpg.groupBy("`Represented.Test.Veh.Make`").agg({'FuelEcon': 'mean'}).collect()

In [13]:
%sql 
select `Represented.Test.Veh.Make`, avg(FuelEcon) as AvgEcon
from epaMpg
group by `Represented.Test.Veh.Make`
order by AvgEcon
limit 10 

Represented.Test.Veh.Make,AvgEcon
BUGATTI,9.8
LAMBORGHINI,12.87
Rolls Royce,14.48
BENTLEY,14.533333333333337
Aston Martin,16.9
Ferrari,17.139999999999997
MASERATI,18.6
FORD,19.2
Karma,19.2
Dodge,19.358823529411765


In [14]:
epaMpg.count()

In [15]:
epaMpg.count() - epaMpg.dropDuplicates().count()

In [16]:

carModels = epaMpg.select("`Model`", "`Represented.Test.Veh.Make`")
carModels.show()



Register a dataframe as a globalb temp table

In [18]:


carModels.createGlobalTempView("globalCarModels")




In [19]:

spark.sql("SELECT * FROM global_temp.globalCarModels").show()


In [20]:
#epaMpg.describe().show()

epaMpg.select("FuelEcon", "HorsePower", "Weight", "Gears","`Tested.Transmission.Type`").describe().show()


In [21]:
spark.sql("select * from epaMpg limit 10").show()

In [22]:
%sql 
select `Represented.Test.Veh.Make`,Model,HorsePower,FuelEcon
from epaMpg

order by HorsePower 

In [23]:
%sql 
select HorsePower,FuelEcon from epaMpg

In [24]:
epaMpg.corr("HorsePower","FuelEcon")

In [25]:
%sql 
select Fuelecon,Weight from epaMpg

In [26]:
%sql 
select HorsePower,Weight from epaMpg

In [27]:
%sql 
select `Represented.Test.Veh.Make`,Model,HorsePower,FuelEcon
from epaMpg
WHERE HorsePower > 400 and FuelEcon > 30
order by FuelEcon 

In [28]:
epaData = epaMpg.select("FuelEcon", "HorsePower", "Weight", "Gears")

In [29]:


splits = epaData.randomSplit([0.7, 0.3])
train = splits[0]
test = splits[1]
train_rows = train.count()
test_rows = test.count()
print ("Training Rows:", train_rows, " Testing Rows:", test_rows)

In [30]:
type(splits)

In [31]:
train.show()

In [32]:
#"FuelEcon", "HorsePower", "Weight", "Gears","`Tested.Transmission.Type`"
# Now we'll assemble a vector of all the numeric feature columns (other than ArrDelay, which we wouldn't have for enroute flights)
assembler = VectorAssembler(inputCols = ["FuelEcon", "HorsePower", "Weight", "Gears"], outputCol="features")

type(assembler)

In [33]:
training = assembler.transform(train).select(col("features"), (col("FuelEcon").cast("Int").alias("label")))
training.show()

In [34]:
lr = LinearRegression(labelCol="label",featuresCol="features", maxIter=10, regParam=0.3)
model = lr.fit(training)
print ("Model trained!")

In [35]:
# Generate the features vector and label
testing = assembler.transform(test).select(col("features"), (col("FuelEcon")).cast("Int").alias("trueLabel"))
testing.show()

In [36]:
prediction = model.transform(testing)
predicted = prediction.select("features", "prediction", "trueLabel")
predicted.show()

In [37]:
predicted.createOrReplaceTempView("regressionPredictions")

In [38]:
%sql
SELECT trueLabel, prediction FROM regressionPredictions

trueLabel,prediction
9,7.670476333029644
11,10.450748084713
11,11.023120356636491
11,11.624527895622574
13,13.526379287150366
13,13.266406176460992
13,12.976977936255064
13,13.39407097600868
14,14.368052456107453
14,14.010236840947822


In [39]:
from pyspark.ml.evaluation import RegressionEvaluator

evaluator = RegressionEvaluator(labelCol="trueLabel", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predicted)
print ("Root Mean Square Error (RMSE):", rmse)