<img src="bridge.jpg" alt="concrete">

Concrete is the most important material in civil engineering. Its compressive strength is a highly nonlinear function of age and ingredients: cement, blast furnace slag, fly ash, water, superplasticizer, coarse aggregate, and fine aggregate. You will use these data to predict the compressive strength of a concrete block. The actual compressive strength (MPa) of each mixture in our training data was determined in a laboratory. The data come from the [UCI Machine Learning Repository](https://archive.ics.uci.edu/ml/datasets/Concrete+Compressive+Strength).
  
We now want to predict concrete compressive strength without having to measure it in a lab. You will need to read the data into Spark, clean it by removing missing values, and prepare it for model fitting. You will then need to fit an appropriate machine learning model and output your predictions and the saved model.  
  
You can find the data in the file **concrete.csv**. Once you have built your best model with these data, please make predictions on the new data in **concrete_unmeasured.csv**, for which we do not know the concrete compressive strength.  

### Start spark app

In [1]:
import findspark
findspark.init('/opt/spark')

In [2]:
from pyspark.sql import SparkSession
spark = SparkSession \
    .builder \
    .appName("hlt_group_k") \
    .getOrCreate()

### Load and clean data

In [3]:
from pyspark.sql.types import DoubleType

In [4]:
data = spark.read.csv("concrete.csv",header=True)

In [5]:
data.show()

+-----------+-----------------------+------------+----------+---------------------+---------------------+-------------------+--------+---------------------------------+
|Cement_kgm3|Blast_Furnace_Slag_kgm3|Fly_Ash_kgm3|Water_kgm3|Superplasticizer_kgm3|Coarse_Aggregate_kgm3|Fine_Aggregate_kgm3|Age_days|Concrete_compressive_strength_MPa|
+-----------+-----------------------+------------+----------+---------------------+---------------------+-------------------+--------+---------------------------------+
|        540|                      0|           0|       162|                  2.5|                 1040|                676|      28|                      79.98611076|
|        540|                      0|           0|       162|                  2.5|                 1055|                676|      28|                      61.88736576|
|      332.5|                  142.5|           0|       228|                    0|                  932|                594|     270|                     

In [6]:
data.printSchema()

root
 |-- Cement_kgm3: string (nullable = true)
 |-- Blast_Furnace_Slag_kgm3: string (nullable = true)
 |-- Fly_Ash_kgm3: string (nullable = true)
 |-- Water_kgm3: string (nullable = true)
 |-- Superplasticizer_kgm3: string (nullable = true)
 |-- Coarse_Aggregate_kgm3: string (nullable = true)
 |-- Fine_Aggregate_kgm3: string (nullable = true)
 |-- Age_days: string (nullable = true)
 |-- Concrete_compressive_strength_MPa: string (nullable = true)



In [7]:
data = data.dropna()

In [8]:
#convert all columns
for col_name in data.columns:
    data = data.withColumn(col_name, data[col_name].cast(DoubleType()))
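A later `dropna()` is still useful after this cast: when ANSI mode is off, Spark turns any string that cannot be parsed as a number into null rather than raising an error. The plain-Python sketch below mimics that behaviour outside Spark. The `to_double` helper and the sample rows are hypothetical and exist only for illustration.

```python
def to_double(value):
    """Hypothetical stand-in for a string-to-double cast: None when parsing fails."""
    try:
        return float(value)
    except (TypeError, ValueError):
        return None

# Made-up rows, for illustration only.
rows = [
    {"Cement_kgm3": "540", "Age_days": "28"},
    {"Cement_kgm3": "n/a", "Age_days": "28"},    # not a number, becomes None
    {"Cement_kgm3": "332.5", "Age_days": None},  # already missing
]

# Cast every value, then keep only the rows without any missing value.
cast_rows = [{k: to_double(v) for k, v in row.items()} for row in rows]
clean_rows = [row for row in cast_rows if all(v is not None for v in row.values())]
print(clean_rows)
```

Only the first row survives, which is the effect that dropping nulls after the cast has on the Spark DataFrame.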

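In [9]:
# Drop rows whose values became null in the cast, so the split below only sees complete rows
data = data.dropna()

# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = data.randomSplit([0.7, 0.3])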
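Without a seed, `randomSplit` produces a different partition on every run, so the RMSE reported further down will vary between runs. PySpark's `randomSplit` accepts an optional `seed` argument for reproducibility. The plain-Python sketch below only illustrates the idea of a seeded, row-by-row 70/30 assignment. The `seeded_split` helper is hypothetical and is not how Spark implements the split internally.

```python
import random

def seeded_split(rows, train_fraction=0.7, seed=42):
    """Assign each row to train or test with a seeded random draw."""
    rng = random.Random(seed)
    train, test = [], []
    for row in rows:
        # Each row is assigned independently, so the split sizes are only approximate.
        (train if rng.random() < train_fraction else test).append(row)
    return train, test

rows = list(range(1000))
train_a, test_a = seeded_split(rows)
train_b, test_b = seeded_split(rows)

print(len(train_a), len(test_a))  # roughly 700 and 300
print(train_a == train_b)         # same seed, same split
```

In the notebook the equivalent change would be `data.randomSplit([0.7, 0.3], seed=42)`, where 42 is an arbitrary choice.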

In [10]:
data.printSchema()

root
 |-- Cement_kgm3: double (nullable = true)
 |-- Blast_Furnace_Slag_kgm3: double (nullable = true)
 |-- Fly_Ash_kgm3: double (nullable = true)
 |-- Water_kgm3: double (nullable = true)
 |-- Superplasticizer_kgm3: double (nullable = true)
 |-- Coarse_Aggregate_kgm3: double (nullable = true)
 |-- Fine_Aggregate_kgm3: double (nullable = true)
 |-- Age_days: double (nullable = true)
 |-- Concrete_compressive_strength_MPa: double (nullable = true)



In [11]:
data = data.dropna()

In [12]:
data.printSchema()

root
 |-- Cement_kgm3: double (nullable = true)
 |-- Blast_Furnace_Slag_kgm3: double (nullable = true)
 |-- Fly_Ash_kgm3: double (nullable = true)
 |-- Water_kgm3: double (nullable = true)
 |-- Superplasticizer_kgm3: double (nullable = true)
 |-- Coarse_Aggregate_kgm3: double (nullable = true)
 |-- Fine_Aggregate_kgm3: double (nullable = true)
 |-- Age_days: double (nullable = true)
 |-- Concrete_compressive_strength_MPa: double (nullable = true)



### Prepare data for model

In [13]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.evaluation import RegressionEvaluator

In [14]:
# assemble variables to one feature column
assembler = VectorAssembler(
    inputCols = ["Cement_kgm3","Blast_Furnace_Slag_kgm3","Fly_Ash_kgm3","Water_kgm3","Superplasticizer_kgm3","Coarse_Aggregate_kgm3","Fine_Aggregate_kgm3","Age_days"],
    outputCol = "features")

#define the estimator - decision tree
dt = DecisionTreeRegressor(labelCol="Concrete_compressive_strength_MPa", featuresCol="features")

# Chain the assembler and the tree in a Pipeline
pipeline = Pipeline(stages=[assembler, dt])

### Fit pipeline and transform data

In [15]:
#fit the pipeline
PipelineModel = pipeline.fit(trainingData)

# transform using the pipeline
predictions = PipelineModel.transform(testData)

# evaluate model fit on the held-out test set using root mean square error (RMSE)
evaluator = RegressionEvaluator(
    labelCol="Concrete_compressive_strength_MPa", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)

In [16]:
predictions.show()

+-----------+-----------------------+------------+----------+---------------------+---------------------+-------------------+--------+---------------------------------+--------------------+------------------+
|Cement_kgm3|Blast_Furnace_Slag_kgm3|Fly_Ash_kgm3|Water_kgm3|Superplasticizer_kgm3|Coarse_Aggregate_kgm3|Fine_Aggregate_kgm3|Age_days|Concrete_compressive_strength_MPa|            features|        prediction|
+-----------+-----------------------+------------+----------+---------------------+---------------------+-------------------+--------+---------------------------------+--------------------+------------------+
|      102.0|                  153.0|         0.0|     192.0|                  0.0|                887.0|              942.0|     3.0|                      4.565020596|[102.0,153.0,0.0,...|12.013511651966338|
|      108.3|                  162.4|         0.0|     203.5|                  0.0|                938.2|              849.0|     3.0|                      2.331807

In [17]:
predictions.select("prediction", "Concrete_compressive_strength_MPa").show()

+------------------+---------------------------------+
|        prediction|Concrete_compressive_strength_MPa|
+------------------+---------------------------------+
|12.013511651966338|                      4.565020596|
|12.013511651966338|                      2.331807832|
|28.872460687789474|                     29.231713972|
|12.013511651966338|                     10.089791784|
|28.872460687789474|                     22.347985588|
|28.872460687789474|                     31.023662096|
|28.872460687789474|                     24.290928956|
|28.872460687789474|                       33.3016908|
|12.013511651966338|                     13.664035368|
|28.872460687789474|                     36.588422892|
|28.872460687789474|                     28.937997196|
|28.872460687789474|                      26.96540636|
|36.152674059999995|                     29.073134492|
|28.872460687789474|                     44.207822168|
|28.872460687789474|                     44.698039604|
|28.872460
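
The same few prediction values repeat in the table above because a regression tree predicts one constant per leaf, typically the mean label of the training rows that fall into that leaf. A toy one-split tree in plain Python shows the effect. The threshold and the data are made up for illustration and are not taken from the fitted model.

```python
from statistics import mean

# Made-up (age_days, strength_MPa) training pairs, for illustration only.
train = [(3, 10.0), (3, 14.0), (7, 12.0), (28, 30.0), (28, 26.0), (90, 31.0)]
threshold = 14  # hypothetical split point on age

# Each leaf stores the mean label of the training rows it contains.
left_mean = mean(y for age, y in train if age <= threshold)
right_mean = mean(y for age, y in train if age > threshold)

def predict(age):
    """Return the mean training label of the leaf that `age` falls into."""
    return left_mean if age <= threshold else right_mean

print([predict(age) for age in (1, 3, 7, 28, 56, 365)])
# -> [12.0, 12.0, 12.0, 29.0, 29.0, 29.0]
```

A deeper tree has more leaves and therefore more distinct prediction values. In `DecisionTreeRegressor` the depth is controlled by the `maxDepth` parameter.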

In [18]:
# Root mean square error on the test set, in MPa
print(rmse)

8.121616052463242
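
The RMSE is the square root of the mean squared difference between the predicted and the measured strength, so it is in the same units as the label (MPa). Below is a plain-Python sketch of the calculation, applied to the first five rows of the prediction table above. `rmse_of` is a hypothetical helper, and five rows will not reproduce the full test-set value.

```python
import math

def rmse_of(predicted, actual):
    """Root mean square error between two equal-length sequences."""
    if len(predicted) != len(actual):
        raise ValueError("sequences must have the same length")
    squared_errors = [(p - a) ** 2 for p, a in zip(predicted, actual)]
    return math.sqrt(sum(squared_errors) / len(squared_errors))

# First five (prediction, measured strength) pairs from the output above.
predicted = [12.013511651966338, 12.013511651966338, 28.872460687789474,
             12.013511651966338, 28.872460687789474]
actual = [4.565020596, 2.331807832, 29.231713972, 10.089791784, 22.347985588]

print(rmse_of(predicted, actual))
```

Because the errors are squared before averaging, a few large misses raise the RMSE more than many small ones.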


In [21]:
# save the fitted pipeline for later use (overwrite() lets the cell be re-run)
PipelineModel.write().overwrite().save("my_pipeline")

### Predict new data

#### Prepare Data

In [26]:
new_data = spark.read.csv("concrete_unmeasured.csv",header=True)

In [27]:
new_data.printSchema()

root
 |-- Cement_kgm3: string (nullable = true)
 |-- Blast_Furnace_Slag_kgm3: string (nullable = true)
 |-- Fly_Ash_kgm3: string (nullable = true)
 |-- Water_kgm3: string (nullable = true)
 |-- Superplasticizer_kgm3: string (nullable = true)
 |-- Coarse_Aggregate_kgm3: string (nullable = true)
 |-- Fine_Aggregate_kgm3: string (nullable = true)
 |-- Age_days: string (nullable = true)



In [32]:
new_data = new_data.dropna()

In [33]:
#convert all columns
for col_name in new_data.columns:
    new_data = new_data.withColumn(col_name, new_data[col_name].cast(DoubleType()))

In [34]:
new_data = new_data.dropna()

In [35]:
new_data.printSchema()

root
 |-- Cement_kgm3: double (nullable = true)
 |-- Blast_Furnace_Slag_kgm3: double (nullable = true)
 |-- Fly_Ash_kgm3: double (nullable = true)
 |-- Water_kgm3: double (nullable = true)
 |-- Superplasticizer_kgm3: double (nullable = true)
 |-- Coarse_Aggregate_kgm3: double (nullable = true)
 |-- Fine_Aggregate_kgm3: double (nullable = true)
 |-- Age_days: double (nullable = true)



#### Model Prediction

In [36]:
new_predictions = PipelineModel.transform(new_data)

In [37]:
# show the predictions for the unmeasured mixtures (not the test-set predictions)
new_predictions.show()

### END

In [38]:
spark.stop()