In [1]:
#Refernce tutorial https://www.datacamp.com/community/tutorials/apache-spark-tutorial-machine-learning
import findspark
findspark.init("/home/kyesh/Programs/spark-2.3.0-bin-hadoop2.7/")

In [2]:
# Import SparkSession
from pyspark.sql import SparkSession

# Build the SparkSession
spark = SparkSession.builder \
   .master("local") \
   .appName("Linear Regression Model") \
   .config("spark.executor.memory", "1gb") \
   .getOrCreate()
   
sc = spark.sparkContext

In [3]:
# Load in the data
rdd = sc.textFile('/home/kyesh/Downloads/titanic/train.csv')

# Load in the header
header = sc.textFile('/home/kyesh/Downloads/titanic/trainHeader.csv')

In [4]:
header.collect()

[u'PassengerId,Survived,Pclass,Name,Sex,Age,SibSp,Parch,Ticket,Fare,Cabin,Embarked']

In [5]:
rdd.take(2)

[u'1,0,3,"Braund, Mr. Owen Harris",male,22,1,0,A/5 21171,7.25,,S',
 u'2,1,1,"Cumings, Mrs. John Bradley (Florence Briggs Thayer)",female,38,1,0,PC 17599,71.2833,C85,C']

In [6]:
# Split lines on commas
rdd = rdd.map(lambda line: line.split(","))

# Inspect the first 2 lines 
rdd.take(2)

[[u'1',
  u'0',
  u'3',
  u'"Braund',
  u' Mr. Owen Harris"',
  u'male',
  u'22',
  u'1',
  u'0',
  u'A/5 21171',
  u'7.25',
  u'',
  u'S'],
 [u'2',
  u'1',
  u'1',
  u'"Cumings',
  u' Mrs. John Bradley (Florence Briggs Thayer)"',
  u'female',
  u'38',
  u'1',
  u'0',
  u'PC 17599',
  u'71.2833',
  u'C85',
  u'C']]

In [7]:
# Inspect the first line 
rdd.first()

# Take top elements
#rdd.top(2)

[u'1',
 u'0',
 u'3',
 u'"Braund',
 u' Mr. Owen Harris"',
 u'male',
 u'22',
 u'1',
 u'0',
 u'A/5 21171',
 u'7.25',
 u'',
 u'S']

In [10]:
# Import the necessary modules 
from pyspark.sql import Row

# Map the RDD to a DF
df = rdd.map(lambda line: Row(PassengerId=line[0], 
                              Survived=line[1], 
                              Pclass=line[2],
                              Name1=line[3],
                              Name2=line[4],
                              Sex=line[5],
                              Age=line[6], 
                              SibSp=line[7],
                              Parch=line[8],
                              Ticket=line[9],
                              Fare=line[10],
                              Cabin=line[11],
                              Embarked=line[12])).toDF()

In [14]:
# Show the top 20 rows 
df.show(500)

+----+---------------+--------+--------+--------------------+--------------------+-----+-----------+------+------+-----+--------+------------------+
| Age|          Cabin|Embarked|    Fare|               Name1|               Name2|Parch|PassengerId|Pclass|   Sex|SibSp|Survived|            Ticket|
+----+---------------+--------+--------+--------------------+--------------------+-----+-----------+------+------+-----+--------+------------------+
|  22|               |       S|    7.25|             "Braund|    Mr. Owen Harris"|    0|          1|     3|  male|    1|       0|         A/5 21171|
|  38|            C85|       C| 71.2833|            "Cumings| Mrs. John Bradle...|    0|          2|     1|female|    1|       1|          PC 17599|
|  26|               |       S|   7.925|          "Heikkinen|        Miss. Laina"|    0|          3|     3|female|    0|       1|  STON/O2. 3101282|
|  35|           C123|       S|    53.1|           "Futrelle| Mrs. Jacques Hea...|    0|          4|     1

In [15]:
# Print the data types of all `df` columns
# df.dtypes

# Print the schema of `df`
df.printSchema()

root
 |-- Age: string (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)
 |-- Fare: string (nullable = true)
 |-- Name1: string (nullable = true)
 |-- Name2: string (nullable = true)
 |-- Parch: string (nullable = true)
 |-- PassengerId: string (nullable = true)
 |-- Pclass: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- SibSp: string (nullable = true)
 |-- Survived: string (nullable = true)
 |-- Ticket: string (nullable = true)



In [21]:
# Import all from `sql.types`
from pyspark.sql.types import *

# Write a custom function to convert the data type of DataFrame columns
def convertColumn(df, names, newType):
  for name in names: 
     df = df.withColumn(name, df[name].cast(newType))
  return df 

# Assign all column names to `columns`
columns = ['households', 'housingMedianAge', 'latitude', 'longitude', 'medianHouseValue', 'medianIncome', 'population', 'totalBedRooms', 'totalRooms']

# Conver the `df` columns to `FloatType()`
df = convertColumn(df, columns, FloatType())

In [22]:
# Print the data types of all `df` columns
# df.dtypes

# Print the schema of `df`
df.printSchema()

root
 |-- households: float (nullable = true)
 |-- housingMedianAge: float (nullable = true)
 |-- latitude: float (nullable = true)
 |-- longitude: float (nullable = true)
 |-- medianHouseValue: float (nullable = true)
 |-- medianIncome: float (nullable = true)
 |-- population: float (nullable = true)
 |-- totalBedRooms: float (nullable = true)
 |-- totalRooms: float (nullable = true)



In [23]:
df.select('population','totalBedRooms').show(10)

+----------+-------------+
|population|totalBedRooms|
+----------+-------------+
|     322.0|        129.0|
|    2401.0|       1106.0|
|     496.0|        190.0|
|     558.0|        235.0|
|     565.0|        280.0|
|     413.0|        213.0|
|    1094.0|        489.0|
|    1157.0|        687.0|
|    1206.0|        665.0|
|    1551.0|        707.0|
+----------+-------------+
only showing top 10 rows



In [24]:
df.groupBy("housingMedianAge").count().sort("housingMedianAge",ascending=False).show()

+----------------+-----+
|housingMedianAge|count|
+----------------+-----+
|            52.0| 1273|
|            51.0|   48|
|            50.0|  136|
|            49.0|  134|
|            48.0|  177|
|            47.0|  198|
|            46.0|  245|
|            45.0|  294|
|            44.0|  356|
|            43.0|  353|
|            42.0|  368|
|            41.0|  296|
|            40.0|  304|
|            39.0|  369|
|            38.0|  394|
|            37.0|  537|
|            36.0|  862|
|            35.0|  824|
|            34.0|  689|
|            33.0|  615|
+----------------+-----+
only showing top 20 rows



In [25]:
df.describe().show()

+-------+-----------------+------------------+-----------------+-------------------+------------------+------------------+------------------+-----------------+------------------+
|summary|       households|  housingMedianAge|         latitude|          longitude|  medianHouseValue|      medianIncome|        population|    totalBedRooms|        totalRooms|
+-------+-----------------+------------------+-----------------+-------------------+------------------+------------------+------------------+-----------------+------------------+
|  count|            20640|             20640|            20640|              20640|             20640|             20640|             20640|            20640|             20640|
|   mean|499.5396802325581|28.639486434108527|35.63186143109965|-119.56970444871473|206855.81690891474|3.8706710030346416|1425.4767441860465|537.8980135658915|2635.7630813953488|
| stddev|382.3297528316098| 12.58555761211163|2.135952380602968|  2.003531742932898|115395.61587441359|1.

In [26]:
# Import all from `sql.functions` 
from pyspark.sql.functions import *

# Adjust the values of `medianHouseValue`
df = df.withColumn("medianHouseValue", col("medianHouseValue")/100000)

# Show the first 2 lines of `df`
df.take(2)

[Row(households=126.0, housingMedianAge=41.0, latitude=37.880001068115234, longitude=-122.2300033569336, medianHouseValue=4.526, medianIncome=8.325200080871582, population=322.0, totalBedRooms=129.0, totalRooms=880.0),
 Row(households=1138.0, housingMedianAge=21.0, latitude=37.86000061035156, longitude=-122.22000122070312, medianHouseValue=3.585, medianIncome=8.301400184631348, population=2401.0, totalBedRooms=1106.0, totalRooms=7099.0)]

In [27]:
# Import all from `sql.functions` if you haven't yet
from pyspark.sql.functions import *

# Divide `totalRooms` by `households`
roomsPerHousehold = df.select(col("totalRooms")/col("households"))

# Divide `population` by `households`
populationPerHousehold = df.select(col("population")/col("households"))

# Divide `totalBedRooms` by `totalRooms`
bedroomsPerRoom = df.select(col("totalBedRooms")/col("totalRooms"))

# Add the new columns to `df`
df = df.withColumn("roomsPerHousehold", col("totalRooms")/col("households")) \
   .withColumn("populationPerHousehold", col("population")/col("households")) \
   .withColumn("bedroomsPerRoom", col("totalBedRooms")/col("totalRooms"))
   
# Inspect the result
df.first()

Row(households=126.0, housingMedianAge=41.0, latitude=37.880001068115234, longitude=-122.2300033569336, medianHouseValue=4.526, medianIncome=8.325200080871582, population=322.0, totalBedRooms=129.0, totalRooms=880.0, roomsPerHousehold=6.984126984126984, populationPerHousehold=2.5555555555555554, bedroomsPerRoom=0.14659090909090908)

In [28]:
# Re-order and select columns
df = df.select("medianHouseValue", 
              "totalBedRooms", 
              "population", 
              "households", 
              "medianIncome", 
              "roomsPerHousehold", 
              "populationPerHousehold", 
              "bedroomsPerRoom")

In [29]:
# Import `DenseVector`
from pyspark.ml.linalg import DenseVector

# Define the `input_data` 
input_data = df.rdd.map(lambda x: (x[0], DenseVector(x[1:])))

# Replace `df` with the new DataFrame
df = spark.createDataFrame(input_data, ["label", "features"])

In [30]:
# Import `StandardScaler` 
from pyspark.ml.feature import StandardScaler

# Initialize the `standardScaler`
standardScaler = StandardScaler(inputCol="features", outputCol="features_scaled")

# Fit the DataFrame to the scaler
scaler = standardScaler.fit(df)

# Transform the data in `df` with the scaler
scaled_df = scaler.transform(df)

# Inspect the result
scaled_df.take(2)

[Row(label=4.526, features=DenseVector([129.0, 322.0, 126.0, 8.3252, 6.9841, 2.5556, 0.1466]), features_scaled=DenseVector([0.3062, 0.2843, 0.3296, 4.3821, 2.8228, 0.2461, 2.5264])),
 Row(label=3.585, features=DenseVector([1106.0, 2401.0, 1138.0, 8.3014, 6.2381, 2.1098, 0.1558]), features_scaled=DenseVector([2.6255, 2.1202, 2.9765, 4.3696, 2.5213, 0.2031, 2.6851]))]

In [31]:
# Split the data into train and test sets
train_data, test_data = scaled_df.randomSplit([.8,.2],seed=1234)

In [42]:
# Import `LinearRegression`
from pyspark.ml.regression import LinearRegression

# Initialize `lr`
lr = LinearRegression(labelCol="label", maxIter=10, regParam=0.3, elasticNetParam=0.8)

# Fit the data to the model
linearModel = lr.fit(train_data)

In [43]:
# Generate predictions
predicted = linearModel.transform(test_data)

# Extract the predictions and the "known" correct labels
predictions = predicted.select("prediction").rdd.map(lambda x: x[0])
labels = predicted.select("label").rdd.map(lambda x: x[0])

# Zip `predictions` and `labels` into a list
predictionAndLabel = predictions.zip(labels).collect()

# Print out first 5 instances of `predictionAndLabel` 
predictionAndLabel[:5]

[(0.8134519322113867, 0.14999),
 (1.3128391350327204, 0.14999),
 (1.8009833788708067, 0.14999),
 (0.8554237492441334, 0.283),
 (0.8192322857580903, 0.366)]

In [44]:
# Coefficients for the model
linearModel.coefficients

# Intercept for the model
#linearModel.intercept

DenseVector([0.0001, -0.0002, 0.0004, 0.4359, -0.0004, 0.0, 2.9623])

In [38]:
# Get the RMSE
linearModel.summary.rootMeanSquaredError

# Get the R2
linearModel.summary.r2

0.42282227755911483

In [16]:
spark.stop()
