# **PySpark**: The Apache Spark Python API

## 1. Introduction

This notebook shows how to connect a Jupyter notebook to a Spark cluster and process data with the Spark Python API (PySpark).

## 2. The Spark Cluster

### 2.1. Connection

To connect to the Spark cluster, create a SparkSession object with the following parameters:

+ **appName:** application name displayed in the [Spark Master Web UI](http://localhost:8080/);
+ **master:** Spark Master URL, the same one used by the Spark workers;
+ **spark.executor.memory:** must be less than or equal to the SPARK_WORKER_MEMORY setting in the Docker Compose configuration.
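The memory constraint compares two size strings, so it can help to see it as arithmetic. Spark reads `spark.executor.memory` as a JVM-style size (for example `512m` or `2g`); if it exceeds SPARK_WORKER_MEMORY, no worker has room to launch an executor. The sketch below is a hypothetical pre-flight check, not part of Spark: `to_mib` and `executor_fits_worker` are illustrative names, only single-letter suffixes are handled, and the worker value `"1g"` is an assumed example rather than this cluster's actual setting.

```python
# Multipliers that convert JVM-style size suffixes to MiB
# (binary units: 1g = 1024m, 1m = 1024k).
UNIT_TO_MIB = {"k": 1 / 1024, "m": 1, "g": 1024, "t": 1024 * 1024}


def to_mib(size):
    """Convert a memory string such as '512m' or '2g' to MiB.

    Only single-letter suffixes are handled in this sketch; strings like
    '512mb' or a bare number raise ValueError.
    """
    text = size.strip().lower()
    number, unit = text[:-1], text[-1:]
    if unit not in UNIT_TO_MIB or not number:
        raise ValueError(f"unsupported memory string: {size!r}")
    return float(number) * UNIT_TO_MIB[unit]


def executor_fits_worker(executor_memory, worker_memory):
    """Return True when spark.executor.memory <= SPARK_WORKER_MEMORY."""
    return to_mib(executor_memory) <= to_mib(worker_memory)


# The notebook uses 512m; the worker value "1g" is a hypothetical example.
print(executor_fits_worker("512m", "1g"))  # → True
```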

In [1]:
from pyspark.sql import SparkSession

spark = SparkSession.\
        builder.\
        appName("pyspark-notebook").\
        master("spark://spark-master:7077").\
        config("spark.executor.memory", "512m").\
        getOrCreate()

Additional SparkSession settings for standalone mode can be added with the **config** method. Check out the API docs [here](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.SparkSession).

In [2]:
sc = spark.sparkContext

In [11]:
tickets = sc.textFile("/data/aircrafts_data.csv")

In [12]:
tickets.collect()

['aircraft_code,model,range',
 '773,"Boeing 777-300",11100',
 '763,"Boeing 767-300",7900',
 'SU9,"Sukhoi Superjet-100",3000',
 '320,"Airbus A320-200",5700',
 '321,"Airbus A321-200",5600',
 '319,"Airbus A319-100",6700',
 '733,"Boeing 737-300",4200',
 'CN1,"Cessna 208 Caravan",1200',
 'CR2,"Bombardier CRJ-200",2700']
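The first element returned by `collect()` is the CSV header row, which the following `map` step would otherwise treat as a data row. An RDD cannot be sliced like a list, so one way to drop the header is to compare every line against it. The sketch below tries the predicate on a plain Python list holding the first lines of the output above; `without_header` is a hypothetical helper name, and in the notebook the same lambda would be passed to `tickets.filter`, as noted in the docstring.

```python
# First lines of the aircrafts_data.csv output shown above.
lines = [
    "aircraft_code,model,range",
    '773,"Boeing 777-300",11100',
    '763,"Boeing 767-300",7900',
]


def without_header(rows):
    """Drop every row equal to the first one (the CSV header).

    Mirrors the PySpark idiom:
        header = tickets.first()
        data = tickets.filter(lambda line: line != header)
    """
    header = rows[0]
    return list(filter(lambda line: line != header, rows))


print(without_header(lines))
```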

In [16]:
names = tickets.map(lambda x: tuple(x.split(",")[:2]))
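Splitting on `","` keeps the double quotes around the model name (hence the `'"B'` prefix in the filter below) and would cut a quoted field that itself contains a comma. A sketch using Python's standard `csv` module instead; `parse_line` is a hypothetical helper, and the row with a comma inside its model name is made up for illustration.

```python
import csv


def parse_line(line):
    """Return (aircraft_code, model) from one CSV line.

    csv.reader strips the surrounding double quotes and keeps commas that
    appear inside a quoted field, which a plain str.split(",") does not.
    """
    fields = next(csv.reader([line]))
    return fields[0], fields[1]


print(parse_line('773,"Boeing 777-300",11100'))  # → ('773', 'Boeing 777-300')
```

In the notebook this would become `names = tickets.map(parse_line)`; PySpark serializes the function and runs it on the executors. Because the quotes are gone, a matching filter would then test for the prefix `"B"` instead of `'"B'`.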

Next, keep only the models whose name starts with `"B`. The model field still includes its surrounding double quotes, so the quote character is part of the prefix being matched.

In [25]:
boeing = names.filter(lambda x: x[1].startswith('"B'))

In [26]:
boeing.collect()

[('773', '"Boeing 777-300"'),
 ('763', '"Boeing 767-300"'),
 ('733', '"Boeing 737-300"'),
 ('CR2', '"Bombardier CRJ-200"')]
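The `"B` prefix also matches `"Bombardier CRJ-200"`, so the `boeing` RDD above contains a non-Boeing aircraft. A stricter predicate compares against the full manufacturer name after removing the quotes. It is sketched here on the four pairs from that output; `is_boeing` is a hypothetical helper name.

```python
# (aircraft_code, model) pairs from the output above; the model still
# carries its surrounding double quotes because of the plain split(",").
pairs = [
    ("773", '"Boeing 777-300"'),
    ("763", '"Boeing 767-300"'),
    ("733", '"Boeing 737-300"'),
    ("CR2", '"Bombardier CRJ-200"'),
]


def is_boeing(pair):
    """True only if the unquoted model name starts with 'Boeing'."""
    _, model = pair
    return model.strip('"').startswith("Boeing")


print(list(filter(is_boeing, pairs)))
```

In the notebook the function can be passed directly, as in `boeing = names.filter(is_boeing)`. Since `strip('"')` leaves an unquoted name unchanged, the same predicate also works on pairs whose quotes were already removed during parsing.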