In [3]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
# Spark 3.0.0 is no longer on downloads.apache.org; older releases live in the Apache archive
!wget -q https://archive.apache.org/dist/spark/spark-3.0.0/spark-3.0.0-bin-hadoop3.2.tgz
!tar xf spark-3.0.0-bin-hadoop3.2.tgz
!pip install -q findspark

In [4]:
!ls /usr/lib/jvm/

default-java		   java-11-openjdk-amd64     java-8-openjdk-amd64
java-1.11.0-openjdk-amd64  java-1.8.0-openjdk-amd64


In [5]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.0-bin-hadoop3.2"

In [6]:
!pip install pyspark


Collecting pyspark
  Downloading https://files.pythonhosted.org/packages/8e/b0/bf9020b56492281b9c9d8aae8f44ff51e1bc91b3ef5a884385cb4e389a40/pyspark-3.0.0.tar.gz (204.7MB)
Collecting py4j==0.10.9
  Downloading https://files.pythonhosted.org/packages/9e/b6/6a4fb90cd235dc8e265a6a2067f2a2c99f0d91787f06aca4bcf7c23f3f80/py4j-0.10.9-py2.py3-none-any.whl (198kB)
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.0.0-py2.py3-none-any.whl size=205044184 sha256=ab63247198a5b4b8ce61e06fa8375f168ef087c51e719bfe009830545e4db1e5
  Stored in directory: /root/.cache/pip/wheels/57/27/4d/ddacf7143f8d5b76c45c61ee2e43d9f8492fc5a8e78ebd7d37
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9 pyspark-3.0.0


Configuring a SparkSession

The entry point to Spark SQL is an object called SparkSession. It starts a Spark Application on which all the code for that session runs.

.builder: gives access to the Builder API, which is used to configure the session.

.master(): sets where the program runs. "local[*]" runs Spark locally with one worker thread per logical core, while "local[1]", for example, uses a single thread. On Colab, "locally" means the Colab virtual machine, so our programs actually run on Google’s servers.

.appName(): an optional method that names the Spark Application.

.getOrCreate(): returns the existing SparkSession, or creates a new one if none exists.

In [7]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").appName("Big_Data_Application").getOrCreate()

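To open a local file on Google Colab, run the following code. It prompts you to select a file from your computer, uploads it to the Colab runtime’s working directory, and returns a dictionary that maps each uploaded filename to its contents: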

In [25]:
from google.colab import files
files.upload()

{}

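Now load the data into a Spark DataFrame with spark.read.csv(). header=True uses the first line as the column names, and inferSchema=True asks Spark to detect each column’s type instead of reading every column as a string: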

In [9]:
data = spark.read.csv('test.csv',inferSchema=True, header=True)

Data Exploration

Now let’s move into understanding how we can get more familiar with our data!
The first thing we can do is check the shape of our DataFrame. Unlike Pandas, Spark has no dedicated .shape attribute, but we can combine the .count() method with the .columns attribute to get the same information.
The .count() method returns the number of rows in the DataFrame, and .columns returns a list of column names, so len(data.columns) gives the number of columns.

In [10]:
data.count(), len(data.columns)

(418, 11)

Viewing DataFrames
To view a DataFrame, use the .show() method:

In [12]:
data.show(5)

+-----------+------+--------------------+------+----+-----+-----+-------+-------+-----+--------+
|PassengerId|Pclass|                Name|   Sex| Age|SibSp|Parch| Ticket|   Fare|Cabin|Embarked|
+-----------+------+--------------------+------+----+-----+-----+-------+-------+-----+--------+
|        892|     3|    Kelly, Mr. James|  male|34.5|    0|    0| 330911| 7.8292| null|       Q|
|        893|     3|Wilkes, Mrs. Jame...|female|47.0|    1|    0| 363272|    7.0| null|       S|
|        894|     2|Myles, Mr. Thomas...|  male|62.0|    0|    0| 240276| 9.6875| null|       Q|
|        895|     3|    Wirz, Mr. Albert|  male|27.0|    0|    0| 315154| 8.6625| null|       S|
|        896|     3|Hirvonen, Mrs. Al...|female|22.0|    1|    1|3101298|12.2875| null|       S|
+-----------+------+--------------------+------+----+-----+-----+-------+-------+-----+--------+
only showing top 5 rows



As you can see, running data.show(5) displayed the first 5 rows of our DataFrame along with the header. Calling .show() with no arguments displays the first 20 rows.


Let’s see which columns and types our data contains using the .printSchema() method (alternatively, the .dtypes attribute gives the same information as a list of (column, type) pairs):

In [16]:
data.printSchema()

root
 |-- PassengerId: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)



We can also choose which columns to display with the .select() method.
Passing truncate=False to .show() keeps long values from being cut off; by default, strings longer than 20 characters are truncated.

In [35]:
data.select("PassengerId","Name","Sex").show(15, truncate=False)

+-----------+-------------------------------------------------------+------+
|PassengerId|Name                                                   |Sex   |
+-----------+-------------------------------------------------------+------+
|892        |Kelly, Mr. James                                       |male  |
|893        |Wilkes, Mrs. James (Ellen Needs)                       |female|
|894        |Myles, Mr. Thomas Francis                              |male  |
|895        |Wirz, Mr. Albert                                       |male  |
|896        |Hirvonen, Mrs. Alexander (Helga E Lindqvist)           |female|
|897        |Svensson, Mr. Johan Cervin                             |male  |
|898        |Connolly, Miss. Kate                                   |female|
|899        |Caldwell, Mr. Albert Francis                           |male  |
|900        |Abrahim, Mrs. Joseph (Sophie Halaut Easu)              |female|
|901        |Davies, Mr. John Samuel                                |male  |

Summary Statistics/Information

We can use the .describe() method to get summary statistics on columns of our choosing:

In [18]:
data.describe(["Fare","Age"]).show()

+-------+------------------+------------------+
|summary|              Fare|               Age|
+-------+------------------+------------------+
|  count|               417|               332|
|   mean|  35.6271884892086|30.272590361445783|
| stddev|55.907576179973844|14.181209235624424|
|    min|               0.0|              0.17|
|    max|          512.3292|              76.0|
+-------+------------------+------------------+



We might also want to know which ages appear in the data and how often. We can count the rows for each Age value with .groupBy() and .count(), then sort the result with .orderBy():



In [19]:
data.groupBy("Age").count().orderBy("count", ascending=False).show(10)

+----+-----+
| Age|count|
+----+-----+
|null|   86|
|24.0|   17|
|21.0|   17|
|22.0|   16|
|30.0|   15|
|18.0|   13|
|27.0|   12|
|26.0|   12|
|23.0|   11|
|25.0|   11|
+----+-----+
only showing top 10 rows



Filtering DataFrames

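Let’s create a new DataFrame with the .filter() method that drops rows missing both Age and Fare and keeps only records whose Sex is not “female”.

condition1 returns True for any record where Age or Fare (or both) is non-null, so a row is dropped only when both values are missing; that is why passenger 902, whose Age is null, still appears in the output. To require both values, combine the two checks with & instead of |. condition2 returns True for any record whose Sex is not “female”.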

In [23]:
condition1 = (data.Age.isNotNull()) | (data.Fare.isNotNull())
condition2 = data.Sex != 'female'
data1 = data.filter(condition1).filter(condition2)
data1.show()

+-----------+------+--------------------+----+----+-----+-----+----------+-------+-----+--------+
|PassengerId|Pclass|                Name| Sex| Age|SibSp|Parch|    Ticket|   Fare|Cabin|Embarked|
+-----------+------+--------------------+----+----+-----+-----+----------+-------+-----+--------+
|        892|     3|    Kelly, Mr. James|male|34.5|    0|    0|    330911| 7.8292| null|       Q|
|        894|     2|Myles, Mr. Thomas...|male|62.0|    0|    0|    240276| 9.6875| null|       Q|
|        895|     3|    Wirz, Mr. Albert|male|27.0|    0|    0|    315154| 8.6625| null|       S|
|        897|     3|Svensson, Mr. Joh...|male|14.0|    0|    0|      7538|  9.225| null|       S|
|        899|     2|Caldwell, Mr. Alb...|male|26.0|    1|    1|    248738|   29.0| null|       S|
|        901|     3|Davies, Mr. John ...|male|21.0|    2|    0| A/4 48871|  24.15| null|       S|
|        902|     3|    Ilieff, Mr. Ylio|male|null|    0|    0|    349220| 7.8958| null|       S|
|        903|     1|

Building Models in PySpark

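Building models in PySpark looks a little different from what you might be used to, and you’ll see terms like Transformer, Estimator, and Param. A Transformer turns one DataFrame into another with .transform() (VectorAssembler is one); an Estimator is trained on a DataFrame with .fit() and returns a fitted model, which is itself a Transformer (LinearRegression is one); and Params are the named settings, such as inputCols or labelCol, that configure both.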


For an example of linear regression, let’s see if we can predict Fare from Pclass, Age and SibSp.

In [24]:
datam= data.select('Pclass','Age','SibSp','Fare')
datam.show()

+------+----+-----+-------+
|Pclass| Age|SibSp|   Fare|
+------+----+-----+-------+
|     3|34.5|    0| 7.8292|
|     3|47.0|    1|    7.0|
|     2|62.0|    0| 9.6875|
|     3|27.0|    0| 8.6625|
|     3|22.0|    1|12.2875|
|     3|14.0|    0|  9.225|
|     3|30.0|    0| 7.6292|
|     2|26.0|    1|   29.0|
|     3|18.0|    0| 7.2292|
|     3|21.0|    2|  24.15|
|     3|null|    0| 7.8958|
|     1|46.0|    0|   26.0|
|     1|23.0|    1|82.2667|
|     2|63.0|    1|   26.0|
|     1|47.0|    1| 61.175|
|     2|24.0|    1|27.7208|
|     2|35.0|    0|  12.35|
|     3|21.0|    0|  7.225|
|     3|27.0|    1|  7.925|
|     3|45.0|    0|  7.225|
+------+----+-----+-------+
only showing top 20 rows



VectorAssembler

The next step is to get our data into a form that PySpark can create a model with. To do this we use something called a VectorAssembler.

Here we’ve delineated what features we want our model to use as predictors so that VectorAssembler can take those columns and transform them into a single column (named “predictors”) that contains all the data we want to predict with.

In [43]:
from pyspark.ml.feature import VectorAssembler
inputcols = ["Pclass",  "Age", "SibSp"]
assembler = VectorAssembler(inputCols= inputcols, outputCol = "predictors")


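VectorAssembler.transform() returns a new DataFrame with an extra column at the end; in each row, that column holds a vector of all the features we listed in the inputCols parameter when we created the assembler. Because Age contains nulls, we first call setHandleInvalid("skip"), which drops every row with a null in any of the input columns (the default, "error", would make the job fail instead).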

In [49]:
predictors = assembler.setHandleInvalid("skip").transform(datam)
predictors.columns

['Pclass', 'Age', 'SibSp', 'Fare', 'predictors']

The final step in preparing our data for the model is to select the new predictors column and Fare (our target variable) into their own DataFrame.

In [50]:
model_data = predictors.select("predictors", "Fare")
model_data.show(5,truncate=False)

+--------------+-------+
|predictors    |Fare   |
+--------------+-------+
|[3.0,34.5,0.0]|7.8292 |
|[3.0,47.0,1.0]|7.0    |
|[2.0,62.0,0.0]|9.6875 |
|[3.0,27.0,0.0]|8.6625 |
|[3.0,22.0,1.0]|12.2875|
+--------------+-------+
only showing top 5 rows



Next, split model_data into a training set and a test set, with roughly 80% and 20% of the rows:

In [51]:
train_data,test_data = model_data.randomSplit([0.8,0.2])

In [55]:
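# Drop rows that still contain nulls: handleInvalid="skip" only checked Pclass, Age and SibSp,
# so a row whose Fare (the label) is null could still be present
train_data=train_data.na.drop()
test_data=test_data.na.drop()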

Now to train the model!

In [56]:
from pyspark.ml.regression import LinearRegression
lr = LinearRegression(featuresCol = 'predictors', labelCol = 'Fare')
lrModel = lr.fit(train_data)
pred = lrModel.evaluate(test_data)

We can also view the final predictions our model made.

The object named “pred”, returned by lrModel.evaluate(), is a LinearRegressionSummary, so to see the DataFrame of test rows with their predictions we call .predictions.show():

In [57]:
pred.predictions.show(5)

+--------------+-------+-----------------+
|    predictors|   Fare|       prediction|
+--------------+-------+-----------------+
|[1.0,17.0,0.0]|   47.1|85.51984845901637|
|[1.0,21.0,0.0]|  26.55|86.53135699305949|
|[1.0,22.0,0.0]|61.9792|86.78423412657025|
|[1.0,23.0,0.0]|   93.5|87.03711126008105|
|[1.0,25.0,1.0]|55.4417|97.49165067104053|
+--------------+-------+-----------------+
only showing top 5 rows



Model Evaluation

To get more detailed information on how our model performed, we can use RegressionEvaluator which is constructed like this:


In [58]:
from pyspark.ml.evaluation import RegressionEvaluator
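# Named "evaluator" rather than "eval" to avoid shadowing Python's built-in eval()
evaluator = RegressionEvaluator(
    labelCol="Fare",
    predictionCol="prediction",
    metricName="rmse")

Let’s compute some statistics for the model:

In [61]:
rmse = evaluator.evaluate(pred.predictions)
print("RMSE : ", rmse)

# The param map overrides metricName for this call only
r2 = evaluator.evaluate(pred.predictions, {evaluator.metricName: "r2"})
print("R2 : ", r2)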

RMSE :  38.386419215082974
R2 :  0.2822345780427099


From this we can interpret that the typical size of our model’s errors, measured by RMSE, was about 38.39 in Fare’s units; because RMSE squares each error before averaging, large misses count more heavily than small ones. The r² value tells us that the predictors in our model account for a little under 30% of the total variability in Fare. This was just a basic exercise, and I recommend experimenting with the model parameters, the features, and a better dataset for more practice!
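To make the two metrics concrete, here are their standard definitions in plain Python: RMSE = sqrt(Σ(y − ŷ)² / n) and R² = 1 − SS_res / SS_tot. For an unweighted model with an intercept, these should agree with what RegressionEvaluator reports for "rmse" and "r2". The function names are our own. The sample uses only the five (Fare, prediction) pairs printed by pred.predictions.show(5) above, so the results will not match the full test set; on this tiny slice R² even comes out negative, meaning those predictions do worse than simply predicting the mean fare.

```python
import math


def rmse_score(labels, preds):
    """Root mean squared error: sqrt(sum((y - yhat)^2) / n)."""
    n = len(labels)
    return math.sqrt(sum((y - p) ** 2 for y, p in zip(labels, preds)) / n)


def r2_score(labels, preds):
    """Coefficient of determination: 1 - SS_res / SS_tot."""
    mean_y = sum(labels) / len(labels)
    ss_res = sum((y - p) ** 2 for y, p in zip(labels, preds))
    ss_tot = sum((y - mean_y) ** 2 for y in labels)
    return 1 - ss_res / ss_tot


# The five (Fare, prediction) pairs shown by pred.predictions.show(5).
fares = [47.1, 26.55, 61.9792, 93.5, 55.4417]
predictions = [85.51984845901637, 86.53135699305949, 86.78423412657025,
               87.03711126008105, 97.49165067104053]

print(rmse_score(fares, predictions), r2_score(fares, predictions))
```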