In [None]:
# TUTORIAL from https://www.datacamp.com/community/tutorials/apache-spark-tutorial-machine-learning#basics

In [29]:
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.sql import Row
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.ml.linalg import DenseVector
from pyspark.ml.feature import StandardScaler
from pyspark.ml.regression import LinearRegression

In [2]:
# Create the spark context
sc = SparkContext("local", "Linear Regression Model")
spark = SparkSession(sc)

In [3]:
# Load the data
rdd = sc.textFile("/home/jmir/ai/learning/spark/cal_housing.data")

# Load the header
header = sc.textFile("/home/jmir/ai/learning/spark/cal_housing.domain")

In [4]:
header.collect()

['longitude: continuous.',
 'latitude: continuous.',
 'housingMedianAge: continuous. ',
 'totalRooms: continuous. ',
 'totalBedrooms: continuous. ',
 'population: continuous. ',
 'households: continuous. ',
 'medianIncome: continuous. ',
 'medianHouseValue: continuous. ']

In [5]:
rdd.take(5)

['-122.230000,37.880000,41.000000,880.000000,129.000000,322.000000,126.000000,8.325200,452600.000000',
 '-122.220000,37.860000,21.000000,7099.000000,1106.000000,2401.000000,1138.000000,8.301400,358500.000000',
 '-122.240000,37.850000,52.000000,1467.000000,190.000000,496.000000,177.000000,7.257400,352100.000000',
 '-122.250000,37.850000,52.000000,1274.000000,235.000000,558.000000,219.000000,5.643100,341300.000000',
 '-122.250000,37.850000,52.000000,1627.000000,280.000000,565.000000,259.000000,3.846200,342200.000000']

In [6]:
# Split lines on commas (because it is a .csv)
rdd = rdd.map(lambda line: line.split(","))


In [7]:
# Inspect the same first 5 lines
rdd.take(5)  # or rdd.first() or rdd.top(5)

[['-122.230000',
  '37.880000',
  '41.000000',
  '880.000000',
  '129.000000',
  '322.000000',
  '126.000000',
  '8.325200',
  '452600.000000'],
 ['-122.220000',
  '37.860000',
  '21.000000',
  '7099.000000',
  '1106.000000',
  '2401.000000',
  '1138.000000',
  '8.301400',
  '358500.000000'],
 ['-122.240000',
  '37.850000',
  '52.000000',
  '1467.000000',
  '190.000000',
  '496.000000',
  '177.000000',
  '7.257400',
  '352100.000000'],
 ['-122.250000',
  '37.850000',
  '52.000000',
  '1274.000000',
  '235.000000',
  '558.000000',
  '219.000000',
  '5.643100',
  '341300.000000'],
 ['-122.250000',
  '37.850000',
  '52.000000',
  '1627.000000',
  '280.000000',
  '565.000000',
  '259.000000',
  '3.846200',
  '342200.000000']]

In [8]:
# Let's construct a SchemaRDD to convert the RDD to a DataFrame

In [9]:
df = rdd.map(lambda line: Row(longitude=line[0],
                              latitude=line[1],
                              housingMedianAge=line[2],
                              totalRooms=line[3],
                              totalBedRooms=line[4],
                              population=line[5], 
                              households=line[6],
                              medianIncome=line[7],
                              medianHouseValue=line[8])).toDF()  # .toDF() requires a SparkSession (or SQLContext)!

In [10]:
# Let's inspect the df first.
# df.first()  or
# df.head(20)  or
# df.take(20)  or
df.show(20)  # this one preserves the crow/column format

+-----------+----------------+---------+-----------+----------------+------------+-----------+-------------+-----------+
| households|housingMedianAge| latitude|  longitude|medianHouseValue|medianIncome| population|totalBedRooms| totalRooms|
+-----------+----------------+---------+-----------+----------------+------------+-----------+-------------+-----------+
| 126.000000|       41.000000|37.880000|-122.230000|   452600.000000|    8.325200| 322.000000|   129.000000| 880.000000|
|1138.000000|       21.000000|37.860000|-122.220000|   358500.000000|    8.301400|2401.000000|  1106.000000|7099.000000|
| 177.000000|       52.000000|37.850000|-122.240000|   352100.000000|    7.257400| 496.000000|   190.000000|1467.000000|
| 219.000000|       52.000000|37.850000|-122.250000|   341300.000000|    5.643100| 558.000000|   235.000000|1274.000000|
| 259.000000|       52.000000|37.850000|-122.250000|   342200.000000|    3.846200| 565.000000|   280.000000|1627.000000|
| 193.000000|       52.000000|37

In [11]:
df.columns  # gives you the columns of the DataFrame

['households',
 'housingMedianAge',
 'latitude',
 'longitude',
 'medianHouseValue',
 'medianIncome',
 'population',
 'totalBedRooms',
 'totalRooms']

In [12]:
df.dtypes

[('households', 'string'),
 ('housingMedianAge', 'string'),
 ('latitude', 'string'),
 ('longitude', 'string'),
 ('medianHouseValue', 'string'),
 ('medianIncome', 'string'),
 ('population', 'string'),
 ('totalBedRooms', 'string'),
 ('totalRooms', 'string')]

In [13]:
df.printSchema()

root
 |-- households: string (nullable = true)
 |-- housingMedianAge: string (nullable = true)
 |-- latitude: string (nullable = true)
 |-- longitude: string (nullable = true)
 |-- medianHouseValue: string (nullable = true)
 |-- medianIncome: string (nullable = true)
 |-- population: string (nullable = true)
 |-- totalBedRooms: string (nullable = true)
 |-- totalRooms: string (nullable = true)



In [14]:
# We should assign the proper data types to the columns (we don't really want them to be strings!)
df = df.withColumn("longitude", df["longitude"].cast(FloatType())) \
   .withColumn("latitude", df["latitude"].cast(FloatType())) \
   .withColumn("housingMedianAge",df["housingMedianAge"].cast(FloatType())) \
   .withColumn("totalRooms", df["totalRooms"].cast(FloatType())) \
   .withColumn("totalBedRooms", df["totalBedRooms"].cast(FloatType())) \
   .withColumn("population", df["population"].cast(FloatType())) \
   .withColumn("households", df["households"].cast(FloatType())) \
   .withColumn("medianIncome", df["medianIncome"].cast(FloatType())) \
   .withColumn("medianHouseValue", df["medianHouseValue"].cast(FloatType()))

In [15]:
# But this method, using a User-Defined Function (UDF) looks prettier
def convertColumns(df, names, newType):
    for name in names:
        df = df.withColumn(name, df[name].cast(newType))
    return df

In [16]:
# Assign column names to columns:
columns = df.columns

# Use the function!
df = convertColumns(df, columns, FloatType())

In [17]:
df.printSchema()

root
 |-- households: float (nullable = true)
 |-- housingMedianAge: float (nullable = true)
 |-- latitude: float (nullable = true)
 |-- longitude: float (nullable = true)
 |-- medianHouseValue: float (nullable = true)
 |-- medianIncome: float (nullable = true)
 |-- population: float (nullable = true)
 |-- totalBedRooms: float (nullable = true)
 |-- totalRooms: float (nullable = true)



In [18]:
# All good now!
# Let's launch sql-like queries now
df.select("population" ,"totalBedrooms").show(10)

+----------+-------------+
|population|totalBedrooms|
+----------+-------------+
|     322.0|        129.0|
|    2401.0|       1106.0|
|     496.0|        190.0|
|     558.0|        235.0|
|     565.0|        280.0|
|     413.0|        213.0|
|    1094.0|        489.0|
|    1157.0|        687.0|
|    1206.0|        665.0|
|    1551.0|        707.0|
+----------+-------------+
only showing top 10 rows



In [19]:
df.groupBy("housingMedianAge").count().sort("housingMedianAge", ascending=False).show()

+----------------+-----+
|housingMedianAge|count|
+----------------+-----+
|            52.0| 1273|
|            51.0|   48|
|            50.0|  136|
|            49.0|  134|
|            48.0|  177|
|            47.0|  198|
|            46.0|  245|
|            45.0|  294|
|            44.0|  356|
|            43.0|  353|
|            42.0|  368|
|            41.0|  296|
|            40.0|  304|
|            39.0|  369|
|            38.0|  394|
|            37.0|  537|
|            36.0|  862|
|            35.0|  824|
|            34.0|  689|
|            33.0|  615|
+----------------+-----+
only showing top 20 rows



In [20]:
df.describe().show()

+-------+-----------------+------------------+-----------------+-------------------+------------------+------------------+------------------+-----------------+------------------+
|summary|       households|  housingMedianAge|         latitude|          longitude|  medianHouseValue|      medianIncome|        population|    totalBedRooms|        totalRooms|
+-------+-----------------+------------------+-----------------+-------------------+------------------+------------------+------------------+-----------------+------------------+
|  count|            20640|             20640|            20640|              20640|             20640|             20640|             20640|            20640|             20640|
|   mean|499.5396802325581|28.639486434108527|35.63186143109965|-119.56970444871473|206855.81690891474|3.8706710030346416|1425.4767441860465|537.8980135658915|2635.7630813953488|
| stddev|382.3297528316098| 12.58555761211163|2.135952380602968|  2.003531742932898|115395.61587441359|1.

In [21]:
# Adjust the values of "medianHouseValue"
df = df.withColumn("medianHouseValue", col("medianHouseValue")/100000)
df.show(5)

+----------+----------------+--------+---------+----------------+------------+----------+-------------+----------+
|households|housingMedianAge|latitude|longitude|medianHouseValue|medianIncome|population|totalBedRooms|totalRooms|
+----------+----------------+--------+---------+----------------+------------+----------+-------------+----------+
|     126.0|            41.0|   37.88|  -122.23|           4.526|      8.3252|     322.0|        129.0|     880.0|
|    1138.0|            21.0|   37.86|  -122.22|           3.585|      8.3014|    2401.0|       1106.0|    7099.0|
|     177.0|            52.0|   37.85|  -122.24|           3.521|      7.2574|     496.0|        190.0|    1467.0|
|     219.0|            52.0|   37.85|  -122.25|           3.413|      5.6431|     558.0|        235.0|    1274.0|
|     259.0|            52.0|   37.85|  -122.25|           3.422|      3.8462|     565.0|        280.0|    1627.0|
+----------+----------------+--------+---------+----------------+------------+--

In [22]:
df = df.withColumn("roomsPerHousehold", col("totalRooms")/col("households")) \
       .withColumn("populationPerHousehold", col("population")/col("households")) \
       .withColumn("bedroomsPerRoom", col("totalBedRooms")/col("totalRooms"))

In [23]:
df.first()

Row(households=126.0, housingMedianAge=41.0, latitude=37.880001068115234, longitude=-122.2300033569336, medianHouseValue=4.526, medianIncome=8.325200080871582, population=322.0, totalBedRooms=129.0, totalRooms=880.0, roomsPerHousehold=6.984126984126984, populationPerHousehold=2.5555555555555554, bedroomsPerRoom=0.14659090909090908)

In [24]:
df = df.select("medianHouseValue",
            "totalBedRooms",
            "population",
            "medianIncome",
            "roomsPerHousehold",
            "populationPerHousehold",
            "bedroomsPerRoom")

In [25]:
# Define the `input_data` 
input_data = df.rdd.map(lambda x: (x[0], DenseVector(x[1:])))

# Replace `df` with the new DataFrame
df = spark.createDataFrame(input_data, ["label", "features"])

In [26]:
df.show(5)

+-----+--------------------+
|label|            features|
+-----+--------------------+
|4.526|[129.0,322.0,8.32...|
|3.585|[1106.0,2401.0,8....|
|3.521|[190.0,496.0,7.25...|
|3.413|[235.0,558.0,5.64...|
|3.422|[280.0,565.0,3.84...|
+-----+--------------------+
only showing top 5 rows



In [27]:
# Initialize the `standardScaler`
standardScaler = StandardScaler(inputCol="features", outputCol="features_scaled")

# Fit the DataFrame to the scaler
scaler = standardScaler.fit(df)

# Transform the data in "df" with the scaler
scaled_df = scaler.transform(df)

# Inspect the result
scaled_df.show(5)

+-----+--------------------+--------------------+
|label|            features|     features_scaled|
+-----+--------------------+--------------------+
|4.526|[129.0,322.0,8.32...|[0.30623297630686...|
|3.585|[1106.0,2401.0,8....|[2.62553233949916...|
|3.521|[190.0,496.0,7.25...|[0.45104081781631...|
|3.413|[235.0,558.0,5.64...|[0.55786627466754...|
|3.422|[280.0,565.0,3.84...|[0.66469173151877...|
+-----+--------------------+--------------------+
only showing top 5 rows



In [55]:
# Split the data into train and test sets
train, test = scaled_df.randomSplit([.8, .2], seed=1234)

In [66]:
# Note that the argument elasticNetParam corresponds to α or the vertical 
# intercept and that the regParam or the regularization paramater corresponds to λ.

# Initialize `lr`
lr = LinearRegression(labelCol="label", maxIter=100, regParam=0.3, elasticNetParam=0.8)
# Fit the data to the model
linearModel = lr.fit(train)

In [67]:
# Generate predictions
pred = linearModel.transform(test)

# Extract predictions and labels
predictions = pred.select("prediction").rdd.map(lambda x: x[0])
labels = pred.select("label").rdd.map(lambda x: x[0])

In [68]:
# Zip `predictions` and `labels` into a list
predictionsAndLabel = predictions.zip(labels).collect()

predictionsAndLabel[:5]

[(1.133339359890201, 0.14999),
 (1.448056941581123, 0.14999),
 (1.5709834297182181, 0.14999),
 (1.7494269579295185, 0.283),
 (1.2432540600023263, 0.366)]

In [69]:
# Coefficients for the model
linearModel.coefficients

DenseVector([0.0, 0.0, 0.2798, 0.0, 0.0, 0.0])

In [70]:
# Intercept for the model
linearModel.intercept

0.9833539100326277

In [71]:
# RMSE
linearModel.summary.rootMeanSquaredError

0.8764168687315352

In [72]:
# R²
linearModel.summary.r2

0.42297595562448176

In [73]:
spark.stop()