# PySpark ML - Introduction

Apache Spark is a framework for distributed processing of large datasets. This chapter covers some background on Spark and machine learning, then shows how to connect to Spark from Python and load CSV data.

## Preparing the environment

### Importing libraries

In [1]:
import time

import pyspark
from pyspark.sql.types import (StructType, StructField,
                               DoubleType, IntegerType, StringType)
from pyspark.sql import SparkSession

### Connect to Spark

In [2]:
spark = (SparkSession.builder
                     .master('local[*]')
                     .appName('spark_application')
                     .config("spark.sql.repl.eagerEval.enabled", True)  # eval DataFrame in notebooks
                     .getOrCreate())

sc = spark.sparkContext
print(f'Spark version: {spark.version}')

Spark version: 3.5.1


### Loading data

In [3]:
schema_df = StructType([
    StructField("mon", IntegerType()),
    StructField("dom", IntegerType()),
    StructField("dow", IntegerType()),
    StructField("carrier", StringType()),
    StructField("flight", IntegerType()),
    StructField("org", StringType()),
    StructField("mile", IntegerType()),
    StructField("depart", DoubleType()),
    StructField("duration", IntegerType()),
    StructField("delay", IntegerType())
])

flights_data = spark.read.csv('data-sources/flights.csv', header=True, schema=schema_df, nullValue='NA')
flights_data.createOrReplaceTempView("flights")
flights_data.printSchema()
flights_data.limit(2)

root
 |-- mon: integer (nullable = true)
 |-- dom: integer (nullable = true)
 |-- dow: integer (nullable = true)
 |-- carrier: string (nullable = true)
 |-- flight: integer (nullable = true)
 |-- org: string (nullable = true)
 |-- mile: integer (nullable = true)
 |-- depart: double (nullable = true)
 |-- duration: integer (nullable = true)
 |-- delay: integer (nullable = true)



mon,dom,dow,carrier,flight,org,mile,depart,duration,delay
11,20,6,US,19,JFK,2153,9.48,351,
0,22,2,UA,1107,ORD,316,16.33,82,30.0


In [4]:
schema = StructType([
    StructField("id", IntegerType()),
    StructField("text", StringType()),
    StructField("label", IntegerType())
])

sms_data = spark.read.csv("data-sources/sms.csv", sep=';', header=False, schema=schema)
sms_data.createOrReplaceTempView("sms")
sms_data.printSchema()
sms_data.limit(2)

root
 |-- id: integer (nullable = true)
 |-- text: string (nullable = true)
 |-- label: integer (nullable = true)



id,text,label
1,"Sorry, I'll call ...",0
2,Dont worry. I gue...,0


In [5]:
schema_df = StructType([
    StructField("maker", StringType()),
    StructField("model", StringType()),
    StructField("origin", StringType()),
    StructField("type", StringType()),
    StructField("cyl", IntegerType()),
    StructField("size", DoubleType()),
    StructField("weight", IntegerType()),
    StructField("length", DoubleType()),
    StructField("rpm", IntegerType()),
    StructField("consumption", DoubleType())
])

cars_data = spark.read.csv('data-sources/cars.csv', header=True, schema=schema_df, nullValue='NA')
cars_data.createOrReplaceTempView("cars")
cars_data.printSchema()
cars_data.limit(2)

root
 |-- maker: string (nullable = true)
 |-- model: string (nullable = true)
 |-- origin: string (nullable = true)
 |-- type: string (nullable = true)
 |-- cyl: integer (nullable = true)
 |-- size: double (nullable = true)
 |-- weight: integer (nullable = true)
 |-- length: double (nullable = true)
 |-- rpm: integer (nullable = true)
 |-- consumption: double (nullable = true)



maker,model,origin,type,cyl,size,weight,length,rpm,consumption
Mazda,RX-7,non-USA,Sporty,,1.3,2895,169.0,6500,4.0
Geo,Metro,non-USA,Small,3.0,1.0,1695,151.0,5700,2.0


### Tables catalogue

In [6]:
spark.catalog.listTables()

[Table(name='cars', catalog=None, namespace=[], description=None, tableType='TEMPORARY', isTemporary=True),
 Table(name='flights', catalog=None, namespace=[], description=None, tableType='TEMPORARY', isTemporary=True),
 Table(name='sms', catalog=None, namespace=[], description=None, tableType='TEMPORARY', isTemporary=True)]

## Ex. 1 - Creating a SparkSession

In this exercise, you'll spin up a local Spark cluster using all available cores. The cluster will be accessible via a SparkSession object.

The `SparkSession` class has a `builder` attribute, which is an instance of the `Builder` class. The `Builder` class exposes three important methods that let you:

- specify the location of the master node (`master()`);
- name the application (`appName()`, optional); and
- retrieve an existing `SparkSession` or, if there is none, create a new one (`getOrCreate()`).
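
The string passed to the master setting decides where Spark runs. In local mode it also encodes the number of worker threads: `local` uses one, `local[4]` uses four, and `local[*]` uses as many as there are logical cores. The sketch below spells out that convention; `local_master` is a hypothetical helper written for illustration and is not part of PySpark.

```python
import os


def local_master(cores=None):
    """Build a Spark master URL for local mode (hypothetical helper).

    cores=None -> 'local[*]'  (one worker thread per logical core)
    cores=1    -> 'local'     (a single worker thread)
    cores=N    -> 'local[N]'  (N worker threads)
    """
    if cores is None:
        return "local[*]"
    if cores < 1:
        raise ValueError("cores must be a positive integer")
    return "local" if cores == 1 else f"local[{cores}]"


# os.cpu_count() reports the logical cores that 'local[*]' would draw on
# (it can return None on some platforms).
print("Logical cores on this machine:", os.cpu_count())
print(local_master())   # local[*]
print(local_master(2))  # local[2]
```

The returned string can be passed directly to `SparkSession.builder.master(...)`.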

A `SparkSession` object has a `version` attribute, which gives the version of Spark. Note: the version can also be read from the `__version__` attribute of the `pyspark` module.

Once you are finished with the cluster, it's a good idea to shut it down, which will free up its resources, making them available for other processes.

**Instructions:**

1. Import the `SparkSession` class from `pyspark.sql`. Already done!
2. Create a `SparkSession` object connected to a local cluster. Use all available cores. Name the application. Already done!
3. Use the `version` attribute on the `SparkSession` object to retrieve the version of Spark running on the cluster.
4. Shut down the cluster. This is done with `spark.stop()` at the end of the notebook.

In [7]:
# What version of Spark?
print(spark.version)
print(pyspark.__version__)

3.5.1
3.5.1


## Loading Data

### Reading data from CSV

In [8]:
start = time.time()
df = spark.read.csv('data-sources/cars.csv', header=True)
df.printSchema()
print('Consumed time: ', time.time()-start)
df.limit(2)

root
 |-- mfr: string (nullable = true)
 |-- mod: string (nullable = true)
 |-- org: string (nullable = true)
 |-- type: string (nullable = true)
 |-- cyl: string (nullable = true)
 |-- size: string (nullable = true)
 |-- weight: string (nullable = true)
 |-- len: string (nullable = true)
 |-- rpm: string (nullable = true)
 |-- cons: string (nullable = true)

Consumed time:  0.22862529754638672


mfr,mod,org,type,cyl,size,weight,len,rpm,cons
Mazda,RX-7,non-USA,Sporty,,1.3,2895,169,6500,4.0
Geo,Metro,non-USA,Small,3.0,1.0,1695,151,5700,2.0


In [9]:
start = time.time()
df = spark.read.csv('data-sources/cars.csv', header=True, inferSchema=True, nullValue='NA')
df.printSchema()
print('Consumed time: ', time.time()-start)
df.limit(2)

root
 |-- mfr: string (nullable = true)
 |-- mod: string (nullable = true)
 |-- org: string (nullable = true)
 |-- type: string (nullable = true)
 |-- cyl: integer (nullable = true)
 |-- size: double (nullable = true)
 |-- weight: integer (nullable = true)
 |-- len: integer (nullable = true)
 |-- rpm: integer (nullable = true)
 |-- cons: double (nullable = true)

Consumed time:  0.18919014930725098


mfr,mod,org,type,cyl,size,weight,len,rpm,cons
Mazda,RX-7,non-USA,Sporty,,1.3,2895,169,6500,4.0
Geo,Metro,non-USA,Small,3.0,1.0,1695,151,5700,2.0


In [10]:
schema_df = StructType([
    StructField("maker", StringType()),
    StructField("model", StringType()),
    StructField("origin", StringType()),
    StructField("type", StringType()),
    StructField("cyl", IntegerType()),
    StructField("size", DoubleType()),
    StructField("weight", IntegerType()),
    StructField("length", DoubleType()),
    StructField("rpm", IntegerType()),
    StructField("consumption", DoubleType())
])

start = time.time()
df = spark.read.csv('data-sources/cars.csv', header=True, schema=schema_df, nullValue='NA')
df.printSchema()
print('Consumed time: ', time.time()-start)
df.limit(2)

root
 |-- maker: string (nullable = true)
 |-- model: string (nullable = true)
 |-- origin: string (nullable = true)
 |-- type: string (nullable = true)
 |-- cyl: integer (nullable = true)
 |-- size: double (nullable = true)
 |-- weight: integer (nullable = true)
 |-- length: double (nullable = true)
 |-- rpm: integer (nullable = true)
 |-- consumption: double (nullable = true)

Consumed time:  0.014031410217285156


maker,model,origin,type,cyl,size,weight,length,rpm,consumption
Mazda,RX-7,non-USA,Sporty,,1.3,2895,169.0,6500,4.0
Geo,Metro,non-USA,Small,3.0,1.0,1695,151.0,5700,2.0
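
The three cells above time each read with `time.time()`. The figures come from single runs, so treat them as indicative only; the first read in a session may also include warm-up cost. What can be said with confidence is that `inferSchema=True` requires an extra pass over the data, whereas an explicit schema does not. A minimal sketch of a reusable timer follows. The `timed` helper is hypothetical, and the workload inside the `with` block is a stand-in so that the example runs without Spark.

```python
import time
from contextlib import contextmanager


@contextmanager
def timed(label):
    """Print the wall-clock time spent inside the with-block."""
    # perf_counter() is a monotonic clock, better suited to intervals than time.time()
    start = time.perf_counter()
    try:
        yield
    finally:
        elapsed = time.perf_counter() - start
        print(f"{label}: {elapsed:.3f} s")


# Stand-in workload; in the notebook the block would instead contain
# the spark.read.csv(...) call followed by df.printSchema().
with timed("sum of squares"):
    total = sum(i * i for i in range(100_000))
```

Wrapping each of the three reads in `with timed("..."):` would remove the repeated `start = time.time()` and `print(...)` lines.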


## Ex. 2 - Loading flights data

In this exercise you're going to load some airline flight data from a CSV file.

**Notes on CSV format:**
- fields are separated by a comma (this is the default separator) and
- missing data are denoted by the string 'NA'.
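
To make the effect of the missing-value marker concrete, here is a small stand-in written with Python's built-in `csv` module rather than Spark's reader. It mirrors what `nullValue='NA'` asks Spark to do: treat the literal string `NA` as a missing value. The two data rows are reconstructed from the flights output shown earlier, assuming the empty `delay` was written as `NA` in the file.

```python
import csv
import io

# Two rows reconstructed from the flights output shown earlier; the missing
# delay is assumed to appear as 'NA' in the raw file.
raw = io.StringIO(
    "mon,dom,dow,carrier,flight,org,mile,depart,duration,delay\n"
    "11,20,6,US,19,JFK,2153,9.48,351,NA\n"
    "0,22,2,UA,1107,ORD,316,16.33,82,30\n"
)

NULL_MARKER = "NA"

# Comma is the default delimiter, so none needs to be specified.
rows = [
    {name: (None if value == NULL_MARKER else value) for name, value in record.items()}
    for record in csv.DictReader(raw)
]

print(rows[0]["delay"])  # None
print(rows[1]["delay"])  # 30
```

Unlike Spark, this stand-in leaves every non-missing value as a string; type conversion is a separate step.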

**Data dictionary:**
- `mon` — month (integer between 1 and 12)
- `dom` — day of month (integer between 1 and 31)
- `dow` — day of week (integer; 1 = Monday and 7 = Sunday)
- `carrier` — carrier (IATA code)
- `flight` — flight number
- `org` — origin airport (IATA code)
- `mile` — distance (miles)
- `depart` — departure time (decimal hour)
- `duration` — expected duration (minutes)
- `delay` — delay (minutes)

Note: the sample rows printed in this notebook include `mon` = 0, so the month may actually be encoded from 0 to 11. Check the data before relying on the 1 to 12 range.

**Instructions:**

1. Read data from a CSV file called `'flights.csv'`. Assign data types to columns automatically. Deal with missing data.
2. How many records are in the data?
3. Take a look at the first five records.
4. What data types have been assigned to the columns? Do these look correct?

In [11]:
# Read data from CSV file
df = spark.read.csv('data-sources/flights.csv', sep=',', header=True, inferSchema=True, nullValue='NA')

# Get number of records
print("The data contain %d records." % df.count())

# View the first five records
df.show(5)

# Check column data types
print(df.dtypes)

The data contain 50000 records.
+---+---+---+-------+------+---+----+------+--------+-----+
|mon|dom|dow|carrier|flight|org|mile|depart|duration|delay|
+---+---+---+-------+------+---+----+------+--------+-----+
| 11| 20|  6|     US|    19|JFK|2153|  9.48|     351| NULL|
|  0| 22|  2|     UA|  1107|ORD| 316| 16.33|      82|   30|
|  2| 20|  4|     UA|   226|SFO| 337|  6.17|      82|   -8|
|  9| 13|  1|     AA|   419|ORD|1236| 10.33|     195|   -5|
|  4|  2|  5|     AA|   325|ORD| 258|  8.92|      65| NULL|
+---+---+---+-------+------+---+----+------+--------+-----+
only showing top 5 rows

[('mon', 'int'), ('dom', 'int'), ('dow', 'int'), ('carrier', 'string'), ('flight', 'int'), ('org', 'string'), ('mile', 'int'), ('depart', 'double'), ('duration', 'int'), ('delay', 'int')]
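
One way to answer the last question systematically is to compare the inferred types with the types you expect. The sketch below uses the `df.dtypes` output printed above, copied in as a literal so that it runs without a Spark session. The `expected` mapping is an assumption based on the data dictionary, not something defined by the exercise.

```python
# Column types copied from the df.dtypes output above.
observed = [
    ('mon', 'int'), ('dom', 'int'), ('dow', 'int'), ('carrier', 'string'),
    ('flight', 'int'), ('org', 'string'), ('mile', 'int'),
    ('depart', 'double'), ('duration', 'int'), ('delay', 'int'),
]

# Types we would expect from the data dictionary (an assumption for illustration).
expected = {
    'mon': 'int', 'dom': 'int', 'dow': 'int', 'carrier': 'string',
    'flight': 'int', 'org': 'string', 'mile': 'int',
    'depart': 'double', 'duration': 'int', 'delay': 'int',
}

# Collect every column whose inferred type differs from the expected one.
mismatches = {
    name: (dtype, expected.get(name))
    for name, dtype in observed
    if expected.get(name) != dtype
}

print(mismatches or "All column types match the expectation.")
```

In a live session, replace the `observed` literal with `df.dtypes`.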


## Ex. 3 - Loading SMS spam data

You've seen that it's possible to infer data types directly from the data. Sometimes it's convenient to have direct control over the column types. You do this by defining an explicit schema.

The file `sms.csv` contains a selection of SMS messages which have been classified as either 'spam' or 'ham'. There are 5574 messages in total, of which 747 have been labelled as spam.
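
The two counts quoted above imply that the classes are unbalanced, which is worth keeping in mind when the labels are used later. A quick calculation from those figures:

```python
total_sms = 5574  # total number of messages, as stated above
spam_sms = 747    # messages labelled as spam, as stated above

ham_sms = total_sms - spam_sms
spam_share = spam_sms / total_sms

print(f"ham: {ham_sms}, spam: {spam_sms}")  # ham: 4827, spam: 747
print(f"spam share: {spam_share:.1%}")      # spam share: 13.4%
```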

**Notes on CSV format:**
- no header record and
- fields are separated by a semicolon (this is not the default separator).
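
The separator matters because message text can itself contain commas. The sketch below shows this with Python's built-in `csv` module as a stand-in for Spark's reader. The two messages are made up for illustration and are not taken from `sms.csv`.

```python
import csv
import io

# Hypothetical lines in the style described above: no header, ';' between fields.
raw = (
    "1;Hello, are you free tonight?;0\n"
    "2;WIN a prize, reply now;1\n"
)

# With no header record, the column names have to be supplied by hand.
COLUMNS = ("id", "text", "label")


def parse(delimiter):
    """Split the raw text into rows using the given field delimiter."""
    return list(csv.reader(io.StringIO(raw), delimiter=delimiter))


by_semicolon = parse(";")
by_comma = parse(",")

print(by_semicolon[0])  # ['1', 'Hello, are you free tonight?', '0']
print(by_comma[0])      # ['1;Hello', ' are you free tonight?;0']

# Attach the column names to the correctly parsed rows.
records = [dict(zip(COLUMNS, row)) for row in by_semicolon]
print(records[1]["label"])  # 1
```

Parsing with the wrong delimiter splits each message at its comma and yields two fields instead of three, which is why the `sep=';'` argument is needed when reading this file.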

**Data dictionary:**
- id — record identifier
- text — content of SMS message
- label — spam or ham (integer; 0 = ham and 1 = spam)

**Instructions:**
1. Specify the data schema, giving column names (`"id"`, `"text"`, and `"label"`) and column types.
2. Read data from a delimited file called `"sms.csv"`.
3. Print the schema for the resulting DataFrame.

In [12]:
# Specify column names and types
schema = StructType([
    StructField("id", IntegerType()),
    StructField("text", StringType()),
    StructField("label", IntegerType())
])

# Load data from a delimited file
df = spark.read.csv("data-sources/sms.csv", sep=';', header=False, schema=schema)

# Print schema of DataFrame
df.printSchema()
df.limit(2)

root
 |-- id: integer (nullable = true)
 |-- text: string (nullable = true)
 |-- label: integer (nullable = true)



id,text,label
1,"Sorry, I'll call ...",0
2,Dont worry. I gue...,0


## Close session

In [13]:
spark.stop()