In [97]:
import findspark
findspark.init()

In [98]:
from pyspark import SparkContext
from pyspark.sql import SparkSession

In [100]:
sc = SparkContext()

In [99]:
rdd1 = sc.parallelize([('a',7),('a',2),('b',2)])
rdd2 = sc.parallelize([("a",["x","y","z"]), ("b",["p", "r"])])
rdd3 = sc.parallelize(range(100))

AttributeError: 'NoneType' object has no attribute 'sc'

In [5]:
rdd1.reduce(lambda x,y : x+y)

('a', 7, 'a', 2, 'b', 2)

In [6]:
rdd2.flatMapValues(lambda x : x).collect()

[('a', 'x'), ('a', 'y'), ('a', 'z'), ('b', 'p'), ('b', 'r')]

In [7]:
rdd = sc.textFile("datasets/CaliforniaHousing/cal_housing.data")
header = sc.textFile("datasets/CaliforniaHousing/cal_housing.domain")

In [8]:
header.collect()

['longitude: continuous.',
 'latitude: continuous.',
 'housingMedianAge: continuous. ',
 'totalRooms: continuous. ',
 'totalBedrooms: continuous. ',
 'population: continuous. ',
 'households: continuous. ',
 'medianIncome: continuous. ',
 'medianHouseValue: continuous. ']

In [11]:
rdd.take(4)

['-122.230000,37.880000,41.000000,880.000000,129.000000,322.000000,126.000000,8.325200,452600.000000',
 '-122.220000,37.860000,21.000000,7099.000000,1106.000000,2401.000000,1138.000000,8.301400,358500.000000',
 '-122.240000,37.850000,52.000000,1467.000000,190.000000,496.000000,177.000000,7.257400,352100.000000',
 '-122.250000,37.850000,52.000000,1274.000000,235.000000,558.000000,219.000000,5.643100,341300.000000']

In [14]:
rdd = rdd.map(lambda line : line.split(","))

In [15]:
from pyspark.sql import Row

In [19]:
df = rdd.map(lambda line : Row(
                              longitude=line[0], 
                              latitude=line[1], 
                              housingMedianAge=line[2],
                              totalRooms=line[3],
                              totalBedRooms=line[4],
                              population=line[5], 
                              households=line[6],
                              medianIncome=line[7],
                              medianHouseValue=line[8]
))

In [25]:
hasattr(rdd, "toDF")

False

In [26]:
spark = SparkSession(sc)

In [27]:
hasattr(rdd, "toDF")

True

In [28]:
df = df.toDF()

In [29]:
df.schema

StructType(List(StructField(longitude,StringType,true),StructField(latitude,StringType,true),StructField(housingMedianAge,StringType,true),StructField(totalRooms,StringType,true),StructField(totalBedRooms,StringType,true),StructField(population,StringType,true),StructField(households,StringType,true),StructField(medianIncome,StringType,true),StructField(medianHouseValue,StringType,true)))

In [30]:
df.show()

+-----------+---------+----------------+-----------+-------------+-----------+-----------+------------+----------------+
|  longitude| latitude|housingMedianAge| totalRooms|totalBedRooms| population| households|medianIncome|medianHouseValue|
+-----------+---------+----------------+-----------+-------------+-----------+-----------+------------+----------------+
|-122.230000|37.880000|       41.000000| 880.000000|   129.000000| 322.000000| 126.000000|    8.325200|   452600.000000|
|-122.220000|37.860000|       21.000000|7099.000000|  1106.000000|2401.000000|1138.000000|    8.301400|   358500.000000|
|-122.240000|37.850000|       52.000000|1467.000000|   190.000000| 496.000000| 177.000000|    7.257400|   352100.000000|
|-122.250000|37.850000|       52.000000|1274.000000|   235.000000| 558.000000| 219.000000|    5.643100|   341300.000000|
|-122.250000|37.850000|       52.000000|1627.000000|   280.000000| 565.000000| 259.000000|    3.846200|   342200.000000|
|-122.250000|37.850000|       52

In [31]:
df.columns

['longitude',
 'latitude',
 'housingMedianAge',
 'totalRooms',
 'totalBedRooms',
 'population',
 'households',
 'medianIncome',
 'medianHouseValue']

In [32]:
df.dtypes

[('longitude', 'string'),
 ('latitude', 'string'),
 ('housingMedianAge', 'string'),
 ('totalRooms', 'string'),
 ('totalBedRooms', 'string'),
 ('population', 'string'),
 ('households', 'string'),
 ('medianIncome', 'string'),
 ('medianHouseValue', 'string')]

In [34]:
df.printSchema()

root
 |-- longitude: string (nullable = true)
 |-- latitude: string (nullable = true)
 |-- housingMedianAge: string (nullable = true)
 |-- totalRooms: string (nullable = true)
 |-- totalBedRooms: string (nullable = true)
 |-- population: string (nullable = true)
 |-- households: string (nullable = true)
 |-- medianIncome: string (nullable = true)
 |-- medianHouseValue: string (nullable = true)



In [43]:
from pyspark.sql.types import *

df = df.withColumn("longitude", df["longitude"].cast(FloatType()))
df = df.withColumn("latitude", df["latitude"].cast(FloatType())) 
df =df.withColumn("housingMedianAge",df["housingMedianAge"].cast(FloatType()))  
df =df.withColumn("totalRooms", df["totalRooms"].cast(FloatType()))   
df=df.withColumn("totalBedRooms", df["totalBedRooms"].cast(FloatType())) 
df=df.withColumn("population", df["population"].cast(FloatType()))  
df=df.withColumn("households", df["households"].cast(FloatType()))  
df=df.withColumn("medianIncome", df["medianIncome"].cast(FloatType())) 
df=df.withColumn("medianHouseValue", df["medianHouseValue"].cast(FloatType())) 

In [45]:
df.show(5)

+---------+--------+----------------+----------+-------------+----------+----------+------------+----------------+
|longitude|latitude|housingMedianAge|totalRooms|totalBedRooms|population|households|medianIncome|medianHouseValue|
+---------+--------+----------------+----------+-------------+----------+----------+------------+----------------+
|  -122.23|   37.88|            41.0|     880.0|        129.0|     322.0|     126.0|      8.3252|        452600.0|
|  -122.22|   37.86|            21.0|    7099.0|       1106.0|    2401.0|    1138.0|      8.3014|        358500.0|
|  -122.24|   37.85|            52.0|    1467.0|        190.0|     496.0|     177.0|      7.2574|        352100.0|
|  -122.25|   37.85|            52.0|    1274.0|        235.0|     558.0|     219.0|      5.6431|        341300.0|
|  -122.25|   37.85|            52.0|    1627.0|        280.0|     565.0|     259.0|      3.8462|        342200.0|
+---------+--------+----------------+----------+-------------+----------+-------

In [46]:
df.printSchema()

root
 |-- longitude: float (nullable = true)
 |-- latitude: float (nullable = true)
 |-- housingMedianAge: float (nullable = true)
 |-- totalRooms: float (nullable = true)
 |-- totalBedRooms: float (nullable = true)
 |-- population: float (nullable = true)
 |-- households: float (nullable = true)
 |-- medianIncome: float (nullable = true)
 |-- medianHouseValue: float (nullable = true)



In [47]:
df.select('population','totalBedRooms').show(10)

+----------+-------------+
|population|totalBedRooms|
+----------+-------------+
|     322.0|        129.0|
|    2401.0|       1106.0|
|     496.0|        190.0|
|     558.0|        235.0|
|     565.0|        280.0|
|     413.0|        213.0|
|    1094.0|        489.0|
|    1157.0|        687.0|
|    1206.0|        665.0|
|    1551.0|        707.0|
+----------+-------------+
only showing top 10 rows



In [55]:
df.groupBy("housingMedianAge").count().sort("housingMedianAge", ascending = False).show()

+----------------+-----+
|housingMedianAge|count|
+----------------+-----+
|            52.0| 1273|
|            51.0|   48|
|            50.0|  136|
|            49.0|  134|
|            48.0|  177|
|            47.0|  198|
|            46.0|  245|
|            45.0|  294|
|            44.0|  356|
|            43.0|  353|
|            42.0|  368|
|            41.0|  296|
|            40.0|  304|
|            39.0|  369|
|            38.0|  394|
|            37.0|  537|
|            36.0|  862|
|            35.0|  824|
|            34.0|  689|
|            33.0|  615|
+----------------+-----+
only showing top 20 rows



In [57]:
df.describe().show()

+-------+-------------------+------------------+------------------+------------------+-----------------+------------------+------------------+------------------+------------------+
|summary|          longitude|          latitude|  housingMedianAge|        totalRooms|    totalBedRooms|        population|        households|      medianIncome|  medianHouseValue|
+-------+-------------------+------------------+------------------+------------------+-----------------+------------------+------------------+------------------+------------------+
|  count|              20640|             20640|             20640|             20640|            20640|             20640|             20640|             20640|             20640|
|   mean|-119.56970444871473| 35.63186143109965|28.639486434108527|2635.7630813953488|537.8980135658915|1425.4767441860465| 499.5396802325581|3.8706710030346416|206855.81690891474|
| stddev| 2.0035317429328914|2.1359523806029554|12.585557612111613|2181.6152515827994|421.24790

In [59]:
from pyspark.sql.functions import *
df = df.withColumn("medianHouseValue", col("medianHouseValue") / 100000)

In [60]:
df.take(2)

[Row(longitude=-122.2300033569336, latitude=37.880001068115234, housingMedianAge=41.0, totalRooms=880.0, totalBedRooms=129.0, population=322.0, households=126.0, medianIncome=8.325200080871582, medianHouseValue=4.526),
 Row(longitude=-122.22000122070312, latitude=37.86000061035156, housingMedianAge=21.0, totalRooms=7099.0, totalBedRooms=1106.0, population=2401.0, households=1138.0, medianIncome=8.301400184631348, medianHouseValue=3.585)]

In [61]:
# Divide `totalRooms` by `households`
roomsPerHousehold = df.select(col("totalRooms")/col("households"))

# Divide `population` by `households`
populationPerHousehold = df.select(col("population")/col("households"))

# Divide `totalBedRooms` by `totalRooms`
bedroomsPerRoom = df.select(col("totalBedRooms")/col("totalRooms"))


In [62]:
df = df.withColumn("roomsPerHousehold", col("totalRooms")/col("households")) \
   .withColumn("populationPerHousehold", col("population")/col("households")) \
   .withColumn("bedroomsPerRoom", col("totalBedRooms")/col("totalRooms"))
   

In [63]:
df.first()

Row(longitude=-122.2300033569336, latitude=37.880001068115234, housingMedianAge=41.0, totalRooms=880.0, totalBedRooms=129.0, population=322.0, households=126.0, medianIncome=8.325200080871582, medianHouseValue=4.526, roomsPerHousehold=6.984126984126984, populationPerHousehold=2.5555555555555554, bedroomsPerRoom=0.14659090909090908)

In [64]:
# Re-order and select columns
df = df.select("medianHouseValue", 
              "totalBedRooms", 
              "population", 
              "households", 
              "medianIncome", 
              "roomsPerHousehold", 
              "populationPerHousehold", 
              "bedroomsPerRoom")

In [65]:
from pyspark.ml.linalg import DenseVector


In [69]:
input_data = df.rdd.map(lambda x : (x[0], DenseVector(x[1:])))

In [70]:
spark = SparkSession(sc)

In [71]:
df = spark.createDataFrame(input_data, ["label","features"])

In [72]:
# Import `StandardScaler` 
from pyspark.ml.feature import StandardScaler

In [73]:
stdscaler = StandardScaler(inputCol="features", outputCol="features_scaled")

scaler = stdscaler.fit(df)

scaled_df = scaler.transform(df)

In [76]:
scaled_df.collect()

[Row(label=4.526, features=DenseVector([129.0, 322.0, 126.0, 8.3252, 6.9841, 2.5556, 0.1466]), features_scaled=DenseVector([0.3062, 0.2843, 0.3296, 4.3821, 2.8228, 0.2461, 2.5264])),
 Row(label=3.585, features=DenseVector([1106.0, 2401.0, 1138.0, 8.3014, 6.2381, 2.1098, 0.1558]), features_scaled=DenseVector([2.6255, 2.1202, 2.9765, 4.3696, 2.5213, 0.2031, 2.6851])),
 Row(label=3.521, features=DenseVector([190.0, 496.0, 177.0, 7.2574, 8.2881, 2.8023, 0.1295]), features_scaled=DenseVector([0.451, 0.438, 0.463, 3.82, 3.3499, 0.2698, 2.2321])),
 Row(label=3.413, features=DenseVector([235.0, 558.0, 219.0, 5.6431, 5.8174, 2.5479, 0.1845]), features_scaled=DenseVector([0.5579, 0.4927, 0.5728, 2.9703, 2.3512, 0.2453, 3.179])),
 Row(label=3.422, features=DenseVector([280.0, 565.0, 259.0, 3.8462, 6.2819, 2.1815, 0.1721]), features_scaled=DenseVector([0.6647, 0.4989, 0.6774, 2.0245, 2.539, 0.21, 2.966])),
 Row(label=2.697, features=DenseVector([213.0, 413.0, 193.0, 4.0368, 4.7617, 2.1399, 0.2318]

In [77]:
train_data, test_data = scaled_df.randomSplit([.8,.2], seed = 1234)

In [78]:
# Import `LinearRegression`
from pyspark.ml.regression import LinearRegression


In [79]:
lr = LinearRegression(labelCol="label", maxIter=10, regParam=0.3, elasticNetParam= 0.8)

In [80]:
linear_model = lr.fit(train_data)

In [81]:
pred = linear_model.transform(test_data)

In [82]:
pred.collect()

[Row(label=0.14999, features=DenseVector([239.0, 490.0, 164.0, 2.1, 3.7744, 2.9878, 0.3861]), features_scaled=DenseVector([0.5674, 0.4327, 0.4289, 1.1054, 1.5255, 0.2877, 6.6543]), prediction=1.5755352395152893),
 Row(label=0.225, features=DenseVector([73.0, 216.0, 63.0, 2.675, 4.6984, 3.4286, 0.2466]), features_scaled=DenseVector([0.1733, 0.1907, 0.1648, 1.408, 1.899, 0.3301, 4.2504]), prediction=1.7341233026776406),
 Row(label=0.388, features=DenseVector([263.0, 1274.0, 241.0, 1.9417, 4.9502, 5.2863, 0.2205]), features_scaled=DenseVector([0.6243, 1.125, 0.6303, 1.022, 2.0008, 0.509, 3.7994]), prediction=1.531875281086717),
 Row(label=0.394, features=DenseVector([310.0, 779.0, 275.0, 1.3289, 5.2291, 2.8327, 0.2156]), features_scaled=DenseVector([0.7359, 0.6879, 0.7193, 0.6995, 2.1135, 0.2727, 3.7153]), prediction=1.3628617899920583),
 Row(label=0.396, features=DenseVector([296.0, 1228.0, 289.0, 1.0513, 2.9516, 4.2491, 0.347]), features_scaled=DenseVector([0.7027, 1.0844, 0.7559, 0.553

In [83]:
preds = pred.select("prediction").rdd.map(lambda x : x[0])

labels = pred.select("label").rdd.map(lambda x : x[0])


In [84]:
pred_label = preds.zip(labels).collect()

In [85]:
pred_label[:5]

[(1.5755352395152893, 0.14999),
 (1.7341233026776406, 0.225),
 (1.531875281086717, 0.388),
 (1.3628617899920583, 0.394),
 (1.2862982565956895, 0.396)]

In [86]:
linear_model.coefficients

DenseVector([0.0, 0.0, 0.0, 0.2758, 0.0, 0.0, 0.0])

In [87]:
linear_model.intercept

0.9963441266477807

In [94]:
linear_model.summary.r2

0.42060513220120754

In [95]:
linear_model.summary.rootMeanSquaredError

0.875727593215161

In [96]:
spark.stop()