# Logistic Regression with Spark
Learn more about [Apache Spark](http://spark.apache.org/) through [**Big Data University**](http://bigdatauniversity.com):

- [**Spark Fundamentals I**](http://bigdatauniversity.com/bdu-wp/bdu-course/spark-fundamentals/)
    - Describe what Spark is all about and know why you would want to use Spark
    - Use Resilient Distributed Datasets operations 
    - Use Scala, Java, or Python to create and run a Spark application 
    - Create applications using Spark SQL, MLlib, Spark Streaming, and GraphX 
    - Configure, monitor and tune Spark  
    
- [**Spark Fundamentals II**](http://bigdatauniversity.com/bdu-wp/bdu-course/spark-fundamentals-ii/) 
    - Apache Spark architecture overview 
    - Understanding input, partitioning, and parallelization 
    - Optimizations for efficiently operating on and joining multiple datasets 
    - Understanding how Spark instructions are translated into jobs and what causes multiple stages within a job 
    - Efficiently using Spark’s memory caching for iterative processing 
    - Developing, testing, and debugging Spark applications using SBT, Eclipse   
    


### Importing Needed packages

In [None]:
from pyspark.sql import SQLContext
from pyspark.mllib.classification import LogisticRegressionWithLBFGS, LogisticRegressionModel
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.stat import Statistics
import pandas as pd
import pylab as pl
import numpy as np
import matplotlib 
%matplotlib inline

### Downloading Data
To download the data, we use `!wget`, which runs the `wget` shell command from inside the notebook.

In [None]:
!wget -O /resources/FuelConsumption.csv https://ibm.box.com/shared/static/ez95yurarnp0q31l9jl1ma51mh6qtxj2.csv


## Understanding the Data

### `FuelConsumption.csv`:
We have downloaded a fuel consumption dataset, **`FuelConsumption.csv`**, which contains model-specific fuel consumption ratings and estimated carbon dioxide emissions for new light-duty vehicles for retail sale in Canada. [Dataset source](http://open.canada.ca/data/en/dataset/98f1a129-f628-4ce4-b24d-6f16bf24dd64)


- **MODEL YEAR**
- **MAKE**
- **MODEL**
- **VEHICLE CLASS**
- **ENGINE SIZE**
- **CYLINDERS**
- **TRANSMISSION**
- **FUEL**
- **FUEL CONSUMPTION in CITY (L/100 km)**
- **FUEL CONSUMPTION in HWY (L/100 km)** 
- **FUEL CONSUMPTION COMB (L/100 km)** 
- **FUEL CONSUMPTION COMB (mpg)** 
- **CO2 EMISSIONS (g/km)**


## Reading the data in

In [None]:
rawRDD = sc.textFile('/resources/FuelConsumption.csv') 
header = rawRDD.first() #extract header
header

In [None]:
# Drop the header line, then split each remaining line into a list of fields
carRDD = rawRDD.filter(lambda x: x != header).map(lambda line: line.split(","))
carRDD.take(2)
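
Splitting on a bare comma assumes that no field contains a quoted comma. As a minimal sketch, Python's `csv` module can parse a line while honoring quoted fields. The helper name `parse_line` and the sample rows are hypothetical and are not taken from `FuelConsumption.csv`:

```python
import csv

def parse_line(line):
    """Parse one CSV line into a list of fields, honoring quoted commas."""
    # csv.reader accepts any iterable of lines, so wrap the single line in a list.
    return next(csv.reader([line]))

# Made-up sample rows for illustration only.
plain = parse_line('2014,MAKE_A,MODEL_A,COMPACT,2.0,4')
quoted = parse_line('2014,"MAKE B, INC.",MODEL_B,SUV,3.5,6')

print(plain)
print(quoted)
```

With Spark, such a helper could be passed to `map` in place of the lambda, for example `rawRDD.filter(lambda x: x != header).map(parse_line)`.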

### Data exploration
We compute **column summary statistics** (mean and variance) for engine size, cylinders, and CO2 emissions with the function **colStats** available in **Statistics**, which operates on an RDD of vectors.

In [None]:
# Columns 4, 5, and 12: engine size, cylinders, CO2 emissions
sd = carRDD.map(lambda x: [float(x[4]), float(x[5]), float(x[12])])
summary = Statistics.colStats(sd)
print(summary.mean())
print(summary.variance())
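
To make the two printed quantities concrete, here is a small NumPy sketch of the same per-column computation on made-up rows. It assumes that `colStats` reports the sample variance (dividing by n - 1), which is what `ddof=1` reproduces here. The data values are invented for illustration:

```python
import numpy as np

# Made-up rows of [engine size, cylinders, CO2 emissions]; illustration only.
rows = np.array([
    [2.0, 4.0, 200.0],
    [3.0, 6.0, 250.0],
    [4.0, 8.0, 300.0],
])

# Mean of each column.
col_mean = rows.mean(axis=0)
# ddof=1 gives the unbiased sample variance (divides by n - 1).
col_var = rows.var(axis=0, ddof=1)

print(col_mean)
print(col_var)
```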

### Preprocessing: Labeling dataset
We convert each record into a **labeled point**, which pairs a label (a floating-point value) with a feature vector. Here the label is binary: 1 if the vehicle's CO2 emissions exceed 256.22 g/km, and 0 otherwise.

In [None]:
# Convert each parsed record into a LabeledPoint (label + feature vector)
def parseFeature(record):
    features = [float(record[4]), float(record[5])]  # ENGINESIZE, CYLINDERS
    label = 1.0 if float(record[12]) > 256.22 else 0.0  # CO2 emissions: 0 = low, 1 = high
    return LabeledPoint(label, features)
lblRDD = carRDD.map(parseFeature)

In [None]:
lblRDD.take(5)
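
The labeling rule can be checked without a Spark context. The following is a minimal sketch in plain Python that mirrors `parseFeature`. The names `label_for` and `parse_features` and the sample record are hypothetical:

```python
THRESHOLD = 256.22  # CO2 emissions cutoff used in parseFeature

def label_for(co2_emissions):
    """Return 1.0 (high) if emissions exceed the threshold, else 0.0 (low)."""
    return 1.0 if co2_emissions > THRESHOLD else 0.0

def parse_features(record):
    """Return (label, [engine size, cylinders]) from a list of string fields."""
    features = [float(record[4]), float(record[5])]
    return label_for(float(record[12])), features

# Made-up record with 13 string fields; only indexes 4, 5, and 12 matter here.
record = ["0"] * 13
record[4], record[5], record[12] = "3.5", "6", "260"

print(parse_features(record))
```

Note that a value exactly equal to the threshold is labeled 0, because the comparison is strict.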

### Preprocessing: Splitting dataset into train and test datasets

In [None]:
lblRDD.count()

In [None]:
trainRDD,testRDD=lblRDD.randomSplit([0.7,0.3])
trainRDD.count()

In [None]:
testRDD.count()
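
The two counts will usually not be an exact 70/30 split, because each element is assigned to a split at random. The sketch below illustrates that idea in NumPy. It is a simplified stand-in, not Spark's actual implementation, and the function name `random_split` is hypothetical:

```python
import numpy as np

def random_split(items, weights, seed):
    """Assign each item to one split by drawing a uniform number per item."""
    rng = np.random.default_rng(seed)
    # Normalize the weights and turn them into cumulative boundaries, e.g. [0.7, 1.0].
    bounds = np.cumsum(np.asarray(weights, dtype=float) / sum(weights))
    bounds[-1] = 1.0  # guard against floating-point rounding in the last boundary
    draws = rng.random(len(items))
    # searchsorted maps each draw to the index of the split whose range contains it.
    idx = np.searchsorted(bounds, draws, side="right")
    return [[x for x, i in zip(items, idx) if i == k] for k in range(len(weights))]

items = list(range(1000))
train, test = random_split(items, [0.7, 0.3], seed=42)
print(len(train), len(test))
```

In Spark, passing a seed makes the split reproducible across runs, for example `lblRDD.randomSplit([0.7, 0.3], seed=42)`.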


### Modeling

In [None]:
# Build the model
model = LogisticRegressionWithLBFGS.train(trainRDD)
model
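
For intuition, a binary logistic regression model turns a weighted sum of the features into a probability with the sigmoid function and then compares it with a threshold. The sketch below shows that decision rule in NumPy. The coefficients are hypothetical and are not the ones fitted above; on a trained model, the fitted values are available as `model.weights` and `model.intercept`:

```python
import numpy as np

def sigmoid(z):
    """Map a real-valued score to a probability in (0, 1)."""
    return 1.0 / (1.0 + np.exp(-z))

def predict(features, weights, intercept=0.0, threshold=0.5):
    """Return (class, probability); class is 1 if the probability exceeds the threshold."""
    prob = sigmoid(np.dot(weights, features) + intercept)
    return (1 if prob > threshold else 0), prob

# Hypothetical coefficients for [engine size, cylinders]; not the fitted model.
weights = np.array([1.0, 0.5])
intercept = -6.0

label_big, prob_big = predict([5.7, 8.0], weights, intercept)
label_small, prob_small = predict([1.5, 4.0], weights, intercept)
print(label_big, label_small)
```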

### Prediction

In [None]:
# Predict the class for a vehicle with engine size 5.7 and 8 cylinders
test_case = [5.7, 8.0]
prediction = model.predict(test_case)
print(prediction)

In [None]:
# Predict the class for a vehicle with engine size 4 and 6 cylinders
test_case = [4.0, 6.0]
prediction = model.predict(test_case)
print(prediction)

### Evaluation

In [None]:
# Evaluate the model on the test data
labelsAndPreds = testRDD.map(lambda p: (p.label, model.predict(p.features)))
labelsAndPreds.take(5)

In [None]:
# Fraction of test points whose predicted label differs from the true label
testErr = labelsAndPreds.filter(lambda vp: vp[0] != vp[1]).count() / float(testRDD.count())
print("Test Error = " + str(testErr))
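
The error rate alone does not show which kind of mistake the model makes. As a minimal sketch in plain Python, the following computes the error rate together with confusion counts from (label, prediction) pairs. The function name `evaluate` and the sample pairs are hypothetical:

```python
from collections import Counter

def evaluate(pairs):
    """Return (error rate, confusion counts) for (label, prediction) pairs."""
    # Count each (true label, predicted label) combination.
    counts = Counter((int(label), int(pred)) for label, pred in pairs)
    total = sum(counts.values())
    wrong = sum(n for (label, pred), n in counts.items() if label != pred)
    return wrong / float(total), counts

# Made-up (label, prediction) pairs for illustration only.
pairs = [(1.0, 1), (0.0, 0), (1.0, 0), (0.0, 0), (0.0, 1)]
error, counts = evaluate(pairs)
print(error)
print(counts)
```

For a test set small enough to fit in driver memory, the pairs could come from `labelsAndPreds.collect()`.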



### Saving and Loading the Model

In [None]:
# Save and load model
model.save(sc, "myModelPath2")
sameModel = LogisticRegressionModel.load(sc, "myModelPath2")

### Contact the Notebook Authors

1. **[Saeed Aghabozorgi](https://ca.linkedin.com/in/saeedaghabozorgi), Data Scientist, IBM.** saeed[at]ca.ibm.com  
1. **[Polong Lin](https://ca.linkedin.com/in/polonglin), Data Scientist, IBM.** polong[at]ca.ibm.com