# Part 10: Spark

Sources:
- [From Pandas to PySpark with Koalas](https://towardsdatascience.com/from-pandas-to-pyspark-with-koalas-e40f293be7c8)
- [Spark tips. DataFrame API](https://luminousmen.com/post/spark-tips-dataframe-api)
- [Scaling relational databases with Apache Spark SQL and DataFrames](https://opensource.com/article/19/3/sql-scale-apache-spark-sql-and-dataframes)
- [The Hitchhikers guide to handle Big Data using Spark](https://towardsdatascience.com/the-hitchhikers-guide-to-handle-big-data-using-spark-90b9be0fe89a)
- [1/3 - Things you need to know about Hadoop and YARN being a Spark developer](https://luminousmen.com/post/hadoop-yarn-spark)
- [2/3 - Spark core concepts explained](https://luminousmen.com/post/spark-core-concepts-explained)
- [3/3 - Spark. Anatomy of Spark application](https://luminousmen.com/post/spark-anatomy-of-spark-application)
- [Create your first ETL Pipeline in Apache Spark and Python](https://towardsdatascience.com/create-your-first-etl-pipeline-in-apache-spark-and-python-ec3d12e2c169)
- [Apache Spark Tutorial – Learn Spark from Experts](https://intellipaat.com/blog/tutorial/spark-tutorial/#Five-Vs-of-Big-Data)
- [First Steps With PySpark and Big Data Processing](https://realpython.com/pyspark-intro/)
- [Apache Spark in Python: Beginner's Guide](https://www.datacamp.com/community/tutorials/apache-spark-python)

## Big data 

### What is Big Data?
- There are many different definitions
- Big data is a term used to refer to the study and applications of data sets that are too complex for traditional data-processing software

### The 3 V's of Big Data

Volume, Variety and Velocity
- Volume: Size of the data
- Variety: Different sources and formats
- Velocity: Speed of the data

### Big Data concepts and Terminology
- **Clustered computing**: Collection of resources of multiple machines
- **Parallel computing**: Simultaneous computation
- **Distributed computing**: Collection of nodes (networked computers) that run in parallel
- **Batch processing**: Breaking the job into small pieces and running them on individual machines
- **Real-time processing**: Immediate processing of data
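
To make the terms above concrete, here is a minimal single-machine sketch, not a real cluster: a job is broken into small pieces (batch processing) and the pieces are processed simultaneously by a pool of worker threads (parallel computing). The helper names `split_into_batches` and `process_batch` are hypothetical and exist only for this illustration.

```python
from concurrent.futures import ThreadPoolExecutor


def split_into_batches(data, batch_size):
    """Break one large job into small pieces of at most batch_size items."""
    return [data[i:i + batch_size] for i in range(0, len(data), batch_size)]


def process_batch(batch):
    """Stand-in for the work a single machine would do on its piece."""
    return sum(value * value for value in batch)


data = list(range(1, 11))                 # the whole "job"
batches = split_into_batches(data, 3)     # batch processing: small pieces

# Parallel computing: the pieces are handed to several workers at once.
with ThreadPoolExecutor(max_workers=4) as pool:
    partial_results = list(pool.map(process_batch, batches))

# Combine the partial results into the final answer.
total = sum(partial_results)
print(batches)
print(partial_results, total)
```

In a distributed system the workers would be separate networked machines rather than threads, but the split, process, and combine steps stay the same.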

### Big Data processing systems

#### Hadoop/MapReduce:

- Scalable and fault tolerant framework written in Java
- Open source
- Batch processing

#### Apache Spark

- General purpose and lightning fast cluster computing system
- Open source
- Both batch and real-time data processing

## Why Apache Spark over Hadoop?

Both Hadoop and Spark are open-source projects by the Apache Software Foundation, and they are the flagship products used for Big Data Analytics. The key difference between MapReduce and Spark is their approach toward data processing. Spark can perform in-memory processing, while Hadoop MapReduce has to read from and write to a disk. 

### Differences Between Hadoop and Spark

#### Speed

Spark is a general-purpose cluster computing tool. It is often cited as running applications up to 100 times faster than Hadoop MapReduce in memory and up to 10 times faster on disk. This is possible because Spark reduces the number of read/write cycles to disk and keeps intermediate data in memory.

#### Easy to Manage

Spark can perform batch processing, interactive data analytics, machine learning, and streaming, all in the same cluster. This makes Apache Spark a complete data analytics engine. With Spark, there is no need to manage a separate engine for each task.

Hadoop MapReduce provides only the batch-processing engine. So, in Hadoop, we need a different engine for each task. It is very difficult to manage many components.

#### Real-time Analysis

Spark can easily process real-time data, i.e., real-time event streaming at a rate of millions of events/second, e.g., the data that is streaming live from Twitter, Facebook, Instagram, etc. Spark efficiently processes live streams.

Here, MapReduce fails as it cannot handle real-time data processing since it is meant to perform only batch processing on huge volumes of data.

<img src="images/Spark-vs-Hadoop.jpg" >

These are the major differences between Apache Spark and Hadoop. But what if we use Apache Spark with Hadoop? When we use both technologies together, we get more powerful cluster computing with both batch processing and real-time processing.

## Introduction to Apache Spark

- Distributed cluster computing framework
- Efficient in-memory computations for large data sets
- Lightning fast data processing framework
- Provides support for Java, Scala, Python, R and SQL

Spark is a platform for cluster computing. Spark lets you spread data and computations over clusters with multiple nodes (think of each node as a separate computer). Splitting up your data makes it easier to work with very large datasets because each node only works with a small amount of data.

As each node works on its own subset of the total data, it also carries out a part of the total calculations required, so that both data processing and computation are performed in parallel over the nodes in the cluster. Parallel computation can make certain types of programming tasks much faster.

However, with greater computing power comes greater complexity.

Deciding whether or not Spark is the best solution for your problem takes some experience, but you can consider questions like:

- Is my data too big to work with on a single machine?
- Can my calculations be easily parallelized?


### What is Spark?

Apache Spark is an open-source cluster-computing framework for real-time processing developed by the Apache Software Foundation. Spark provides an interface for programming entire clusters with implicit data parallelism and fault-tolerance.

Below are some of the features of Apache Spark that give it an edge over other frameworks:

<img src="images/py1.png">

- **Speed:** It can be up to 100x faster than traditional large-scale data processing frameworks.
- **Powerful Caching:** A simple programming layer provides powerful caching and disk persistence capabilities.
- **Deployment:** It can be deployed through Mesos, Hadoop via YARN, or Spark's own cluster manager.
- **Real Time:** Real-time computation and low latency because of in-memory computation.
- **Polyglot:** One of the most important features of this framework is that it can be programmed in Scala, Java, Python and R.

Apache Spark is a general-purpose, lightning-fast cluster computing system. It provides high-level APIs in Java, Scala, Python, and R, together with an engine for running Spark applications. Spark is often described as up to 100 times faster than Hadoop MapReduce for in-memory workloads and up to 10 times faster when the data is read from disk.
Spark itself is written in Scala.

### History Of Apache Spark

Apache Spark was introduced in 2009 at UC Berkeley's R&D Lab, which later became AMPLab. It was open sourced in 2010 under a BSD license. In 2013, Spark was donated to the Apache Software Foundation, where it became a top-level Apache project in 2014.

###  Apache Spark Components

Apache Spark promises faster data processing and easier development. How does Spark achieve this? To answer this question, let's introduce the Apache Spark ecosystem, the set of components that makes Spark fast and reliable. These components resolve issues that cropped up while using Hadoop MapReduce.

<img src="images/Big-Data-processing-using-Apache-Spark-Introduction-Spark-components.png">

#### Spark Core

It is the kernel of Spark, which provides an execution platform for all the Spark applications. It is a generalized platform to support a wide array of applications.

#### Spark SQL

It enables users to run SQL/HQL queries on top of Spark. Using Apache Spark SQL, we can process structured as well as semi-structured data. It also provides an engine for Hive that can run unmodified queries up to 100 times faster on existing deployments.

#### Spark Streaming

Apache Spark Streaming enables powerful interactive and data analytics applications across live streaming data. The live streams are converted into micro-batches, which are executed on top of Spark Core.
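
The micro-batch idea can be sketched in plain Python. This is a simplification under stated assumptions: the hypothetical helper `micro_batches` groups events by count, whereas Spark Streaming groups them by time interval, and the event stream here is made up for illustration.

```python
from itertools import islice


def micro_batches(stream, batch_size):
    """Group an incoming stream of events into small fixed-size batches."""
    iterator = iter(stream)
    while True:
        batch = list(islice(iterator, batch_size))
        if not batch:
            return
        yield batch


# Hypothetical event stream: seven numbered events arriving one by one.
events = (f"event-{n}" for n in range(1, 8))

batch_sizes = []
for batch in micro_batches(events, batch_size=3):
    # Each micro-batch is handed to the batch engine as one small job.
    batch_sizes.append(len(batch))
    print(batch)
```

Because every micro-batch is an ordinary small batch job, the same engine can serve both batch and streaming workloads.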

#### Spark MLlib

It is a scalable machine learning library that delivers both efficiency and high-quality algorithms. Apache Spark MLlib is a popular choice among data scientists because of its in-memory data processing, which drastically improves the performance of iterative algorithms.

#### Spark GraphX

Apache Spark GraphX is the graph computation engine built on top of Spark that enables graph data to be processed at scale.

### Spark modes of deployment

- **Local mode**: Single machine such as your laptop
    - Local mode is convenient for testing, debugging and demonstration
- **Cluster mode**: Set of pre-defined machines
    - Good for production



- Workflow: Local -> clusters
- No code change necessary

## Apache Spark Architecture

Apache Spark has a well-defined layered architecture built on two main abstractions:

- **Resilient Distributed Dataset (RDD)**: An RDD is an immutable (read-only), fundamental collection of elements that can be operated on by many machines at the same time (parallel processing). Each RDD can be divided into logical partitions, which are then processed on different nodes of a cluster.
- **Directed Acyclic Graph (DAG)**: The DAG is the scheduling layer of the Apache Spark architecture that implements **stage-oriented scheduling**. Compared to MapReduce, which always creates a graph with two stages, Map and Reduce, Spark can create DAGs that contain many stages.
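
As a rough illustration of stage-oriented scheduling, the sketch below orders the stages of a hypothetical word-count job by their dependencies, using Python's standard `graphlib` module. It is not Spark's scheduler, and the stage names are invented; it only shows that a DAG can contain more than two stages and that each stage runs after the stages it depends on.

```python
from graphlib import TopologicalSorter  # standard library, Python 3.9+

# Each stage maps to the set of stages it depends on (hypothetical job).
stages = {
    "read_lines": set(),
    "read_stop_words": set(),
    "split_words": {"read_lines"},
    "remove_stop_words": {"split_words", "read_stop_words"},
    "count_per_word": {"remove_stop_words"},
    "sort_by_count": {"count_per_word"},
}

# A valid execution order runs every stage after all of its dependencies.
order = list(TopologicalSorter(stages).static_order())
print(order)
```

Stages without a dependency between them, such as `read_lines` and `read_stop_words`, could run in either order or at the same time.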

Apache Spark Framework uses a master–slave architecture which consists of a driver, which runs as a master node, and many executors which run across the worker nodes in the cluster. Apache Spark can be used for batch processing and real-time processing as well.

<img src="images/Spark-Arch.webp">

The driver program runs the main program of an application and creates the Spark Context, which exposes Spark's basic functionality. The Spark Driver also contains other components, such as the DAG Scheduler, Task Scheduler, Backend Scheduler, and Block Manager, which are responsible for translating user-written code into jobs that are executed on the cluster.
The Spark Driver and the Spark Context together watch over job execution within the cluster. The Spark Driver works with the Cluster Manager, which does all the resource allocation. The job is then split into multiple smaller tasks, which are distributed to the worker nodes.
Whenever an RDD is created in the Spark Context, it can be distributed across many worker nodes and can also be cached there.
Worker nodes execute the tasks assigned to them and return the results to the Spark Context.
Executors are responsible for executing these tasks. The lifetime of an executor is the same as that of the Spark application. To increase the performance of the system, you can increase the number of workers so that jobs can be divided into more logical portions.

## Spark: Python or Scala?

In a nutshell, both languages have their advantages and disadvantages when you're working with Spark. Which one to choose depends on your project's needs and on your own or your team's capabilities. The general advice is to use Scala unless you are already proficient in Python or do not have much programming experience. In the end, it is useful to know how to work with both.

## Big Data Concepts in Python

### Lambda Functions

In [1]:
x = ['Python', 'programming', 'is', 'awesome!']

In [1]:
y = [1,2,3,4,5,6,7]

def multiply(element):
    return element * 2

for n in y:
    print(multiply(n))

2
4
6
8
10
12
14


In [4]:
# Writing a whole named function for such a short operation is not worthwhile,
# so we use a lambda function (an anonymous function) instead

In [3]:
for n in y:
    print((lambda v: v * 2)(n))

2
4
6
8
10
12
14


In [5]:
x = ['Python', 'programming', 'is', 'awesome!']

In [6]:
sorted(x)

['Python', 'awesome!', 'is', 'programming']

In [9]:
# The sort above was case-sensitive, which we do not want,
# and we do not want to modify the dataset either

In [8]:
sorted(x, key = lambda arg: arg.lower())

['awesome!', 'is', 'programming', 'Python']

### filter(), map(), and reduce()

In [10]:
# These are functions for transforming data
# A list comprehension is often used instead

In [11]:
x = ['Python', 'programming', 'is', 'awesome!']

You can use filter() to replace a common for-loop pattern: collecting the items that satisfy a condition.

In [12]:
filter(lambda arg: len(arg) < 8, x)

<filter at 0x7f127272e5f8>

In [13]:
# filter() returns a lazy iterator, so wrap it in list() to see the values
list(filter(lambda arg: len(arg) < 8, x))

['Python', 'is']

In [14]:
# The same result written the classic way

def is_less_than_8_characters(item):
    return len(item) < 8

In [15]:
x = ['Python', 'programming', 'is', 'awesome!']
results = []

In [17]:
# Classic for loop
for item in x:
    if is_less_than_8_characters(item):
        results.append(item)

In [18]:
print(results)

['Python', 'is']


In [19]:
# map() returns as many values as the input dataset has, each one transformed

In [20]:
x = ['Python', 'programming', 'is', 'awesome!']

In [21]:
list(map(lambda arg: arg.upper(), x))

['PYTHON', 'PROGRAMMING', 'IS', 'AWESOME!']

In [22]:
# reduce() must be imported first. It always returns a single value, so it does not need list()

In [23]:
from functools import reduce

In [24]:
x = ['Python', 'programming', 'is', 'awesome!']

In [25]:
reduce(lambda val1, val2: val1+val2, x)

'Pythonprogrammingisawesome!'
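
Since list comprehensions are often used instead of these functions, here is a short side-by-side sketch on the same list. The variable names are chosen only for this illustration.

```python
from functools import reduce

x = ['Python', 'programming', 'is', 'awesome!']

# filter() and the equivalent list comprehension with a condition
short_filter = list(filter(lambda arg: len(arg) < 8, x))
short_comprehension = [arg for arg in x if len(arg) < 8]

# map() and the equivalent list comprehension with an expression
upper_map = list(map(lambda arg: arg.upper(), x))
upper_comprehension = [arg.upper() for arg in x]

# reduce() has no comprehension form; for strings, str.join does the same job
joined_reduce = reduce(lambda val1, val2: val1 + val2, x)
joined_join = ''.join(x)

print(short_comprehension)
print(upper_comprehension)
print(joined_join)
```

The functional forms matter for Spark because RDD methods such as `map()` and `filter()` take a function as their argument in the same way.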

## PySpark

- Apache Spark is written in Scala
- To support Python with Spark, the Apache Spark community released PySpark
- Similar computation speed and power as Scala
- PySpark APIs are similar to Pandas and Scikit-learn

### Spark shell

In [26]:
# A console in which commands are typed

#### PySpark shell

### Installing Spark and PySpark

- [Installing Spark in Standalone Mode](http://www.informit.com/articles/article.aspx?p=2755929&seqNum=3)
- [Creating a Spark Standalone Cluster with Docker and docker-compose](https://medium.com/@marcovillarreal_40011/creating-a-spark-standalone-cluster-with-docker-and-docker-compose-ba9d743a157f)
- [Get Started with PySpark and Jupyter Notebook](https://www.sicara.ai/blog/2017-05-02-get-started-pyspark-jupyter-notebook-3-minutes)
- Using cloud services

#### Setting up the environment

Register at [Databricks](https://community.cloud.databricks.com/)

1. Create new cluster
2. Create new notebook
3. Name it XXXXXX
4. Attach it to the created cluster (top left)

### Introduction to PySpark

#### Connecting to a cluster

- SparkContext is an entry point into the world of Spark
- An entry point is a way of connecting to Spark cluster
- An entry point is like a key to the house
- PySpark has a default SparkContext called sc

#### Examining The SparkContext

In [None]:
# Verify SparkContext
print(sc)

In [None]:
# Print the version of SparkContext
print("The version of Spark Context in the PySpark shell is", sc.version)

In [None]:
# Print the Python version of SparkContext
print("The Python version of Spark Context in the PySpark shell is", sc.pythonVer)

In [None]:
# Print the master of SparkContext
print("The master of Spark Context in the PySpark shell is", sc.master)

### Introduction to PySpark RDD

#### What is RDD?

- RDD = Resilient Distributed Dataset
    - Resilient: Ability to withstand failures
    - Distributed: Spanning across multiple machines
    - Dataset: Collection of partitioned data, e.g. arrays, tables, tuples

<img src="images/rdd.png">
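
The "resilient" part can be illustrated with a toy model. The class `TinyRDD` below is hypothetical and far simpler than a real RDD: it keeps the source partitions and the transformation (the lineage), so a computed partition that is lost can be rebuilt instead of being gone for good.

```python
class TinyRDD:
    """Toy model of an RDD: partitions plus the recipe (lineage) to rebuild them."""

    def __init__(self, source_partitions, transform=None):
        self.source_partitions = source_partitions    # input data, split into parts
        self.transform = transform or (lambda v: v)   # function applied to every element
        self.cache = {}                               # computed partitions, by index

    def compute(self, index):
        """Return one partition, recomputing it from the source if it is missing."""
        if index not in self.cache:
            self.cache[index] = [self.transform(v) for v in self.source_partitions[index]]
        return self.cache[index]

    def collect(self):
        """Gather all partitions into one list."""
        return [v for i in range(len(self.source_partitions)) for v in self.compute(i)]


rdd = TinyRDD([[1, 2], [3, 4], [5, 6]], transform=lambda v: v * 10)
before = rdd.collect()

# Simulate a failed worker: its computed partition is lost.
del rdd.cache[1]

# The lost partition is rebuilt from the lineage, so the result is unchanged.
after = rdd.collect()
print(before, after)
```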

#### Creating RDDs

- Parallelizing an existing collection of objects
- External datasets:
    - Files in HDFS
    - Objects in Amazon S3 bucket
    - lines in a text file
- From existing RDDs

- `Parallelized collection`
    - parallelize() for creating RDDs from Python lists

In [None]:
numRDD = sc.parallelize([1,2,3,4])

In [None]:
helloRDD = sc.parallelize("Hello world")

In [None]:
type(helloRDD)

- `From external datasets`
    - textFile() for creating RDDs from external datasets

In [None]:
fileRDD = sc.textFile("/databricks-datasets/samples/docs/README.md")

In [None]:
type(fileRDD)

In [None]:
# Create an RDD from a list of words
RDD = sc.parallelize(["Spark", "is", "a", "framework", "for", "Big Data processing"])

# Print out the type of the created object
print("The type of RDD is", type(RDD))

In [None]:
file_path = "/databricks-datasets/samples/docs/README.md"
# Print the file_path
print("The file_path is", file_path)

# Create a fileRDD from file_path
fileRDD = sc.textFile(file_path)

# Check the type of fileRDD
print("The file type of fileRDD is", type(fileRDD))

#### Basic RDD Transformations and Actions


- Transformations create new RDDs
- Actions perform computation on the RDDs
- Transformations follow lazy evaluation
- Basic RDD transformations
    - map(), filter(), flatMap(), and union()

<img src="images/trans.png">
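
Lazy evaluation can be observed without Spark, because Python's built-in `map()` and `filter()` are lazy as well. The sketch below is only an analogy: building the pipeline plays the role of transformations, and `list()` plays the role of an action such as `collect()`. The `calls` list is a hypothetical helper that records which elements were actually processed.

```python
calls = []  # records every element that is actually processed


def double(value):
    calls.append(value)
    return value * 2


numbers = [1, 2, 3, 4, 5]

# "Transformations": building the pipeline does not process any element yet.
doubled = map(double, numbers)
large = filter(lambda v: v > 4, doubled)
calls_before_action = len(calls)

# "Action": materializing the result forces the whole pipeline to run.
result = list(large)
calls_after_action = len(calls)

print(calls_before_action, calls_after_action, result)
```

No element is touched until the result is requested, which is what allows an engine to plan the whole chain of transformations before running it.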

- `map()` transformation applies a function to all elements in the RDD

<img src="images/map.png">

- `collect()` returns all the elements of the dataset as a list

In [None]:
numRDD = sc.parallelize(list(range(1,10)))

# Create map() transformation to cube numbers
cubedRDD = numRDD.map(lambda x: x**3)

# Collect the results
numbers_all = cubedRDD.collect()

# Print the numbers from numbers_all
for numb in numbers_all:
    print(numb)

In [None]:
file_path = "/databricks-datasets/samples/docs/README.md"
fileRDD = sc.textFile(file_path)

# Filter the fileRDD to select lines with Spark keyword
fileRDD_filter = fileRDD.filter(lambda line: 'Spark' in line)

# How many lines contain the keyword Spark?
print("The total number of lines with the keyword Spark is", fileRDD_filter.count())

# Print the first four lines of fileRDD_filter
for line in fileRDD_filter.take(4):
    print(line)

In [None]:
RDD = sc.parallelize(["hello world", "how are you"])
RDD_flatmap = RDD.flatMap(lambda x: x.split(" "))
RDD_flatmap.collect()
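
The difference between `map()` and `flatMap()` is easiest to see side by side. The following plain-Python sketch mimics both on the same two strings; it runs locally and does not use Spark.

```python
from itertools import chain

lines = ["hello world", "how are you"]

# map(): one output element per input element, here one list per line
mapped = [line.split(" ") for line in lines]

# flatMap(): the per-element results are flattened into a single sequence
flat_mapped = list(chain.from_iterable(line.split(" ") for line in lines))

print(mapped)
print(flat_mapped)
```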

#### Example: Counting words in a document

In [None]:
file_path = "/databricks-datasets/samples/docs/README.md"

# Create a baseRDD from the file path
baseRDD = sc.textFile(file_path)

# Split the lines of baseRDD into words
splitRDD = baseRDD.flatMap(lambda x: x.split())

# Count the total number of words
print("Total number of words in splitRDD:", splitRDD.count())

stop_words = ['i', 'me', 'my', 'myself', 'we', 'our', 'ours', 'ourselves', 'you', 'your', 'yours', 'yourself', 'yourselves', 'he', 'him', 'his', 'himself', 'she', 'her', 'hers', 'herself', 'it', 'its', 'itself', 'they', 'them', 'their', 'theirs', 'themselves', 'what', 'which', 'who', 'whom', 'this', 'that', 'these', 'those', 'am', 'is', 'are', 'was', 'were', 'be', 'been', 'being', 'have', 'has', 'had', 'having', 'do', 'does', 'did', 'doing', 'a', 'an', 'the', 'and', 'but', 'if', 'or', 'because', 'as', 'until', 'while', 'of', 'at', 'by', 'for', 'with', 'about', 'against', 'between', 'into', 'through', 'during', 'before', 'after', 'above', 'below', 'to', 'from', 'up', 'down', 'in', 'out', 'on', 'off', 'over', 'under', 'again', 'further', 'then', 'once', 'here', 'there', 'when', 'where', 'why', 'how', 'all', 'any', 'both', 'each', 'few', 'more', 'most', 'other', 'some', 'such', 'no', 'nor', 'not', 'only', 'own', 'same', 'so', 'than', 'too', 'very', 'can', 'will', 'just', 'don', 'should', 'now']

# Remove stop words, comparing each word in lower case against stop_words
splitRDD_no_stop = splitRDD.filter(lambda x: x.lower() not in stop_words)

# Create a tuple of the word and 1
splitRDD_no_stop_words = splitRDD_no_stop.map(lambda w: (w, 1))

# Count the number of occurrences of each word
resultRDD = splitRDD_no_stop_words.reduceByKey(lambda x, y: x + y)

# Display the first 10 words and their frequencies
for word in resultRDD.take(10):
    print(word)

# Swap the keys and values 
resultRDD_swap = resultRDD.map(lambda x: (x[1], x[0]))

# Sort the keys in descending order
resultRDD_swap_sort = resultRDD_swap.sortByKey(ascending=False)

# Show the top 10 most frequent words and their frequencies
for word in resultRDD_swap_sort.take(10):
    print("{} has {} counts".format(word[1], word[0]))
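
To see what each step of the pipeline contributes, the same word count can be traced on a tiny input in plain Python. This is a local sketch, not Spark code: `reduce_by_key` is a hypothetical stand-in for `reduceByKey()`, and the two-line document and the shortened stop-word list are made up.

```python
def reduce_by_key(pairs, func):
    """Merge the values of equal keys with func, like a local reduceByKey."""
    merged = {}
    for key, value in pairs:
        merged[key] = func(merged[key], value) if key in merged else value
    return list(merged.items())


# Hypothetical two-line document and a shortened stop-word list.
lines = ["Spark is fast", "Spark is a framework for big data"]
stop_words = {"is", "a", "for"}

words = [word for line in lines for word in line.split()]           # flatMap
kept = [word for word in words if word.lower() not in stop_words]   # filter
pairs = [(word, 1) for word in kept]                                # map
counts = reduce_by_key(pairs, lambda x, y: x + y)                   # reduceByKey

# Sort by count, highest first (the swap and sortByKey steps in the Spark version).
top = sorted(counts, key=lambda pair: pair[1], reverse=True)
print(top[:3])
```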

### Abstracting Data with DataFrames


- PySpark SQL is a Spark library for structured data. It provides more information about the structure of data and computation
- PySpark DataFrame is an immutable distributed collection of data with named columns
- Designed for processing both structured (e.g. relational database) and semi-structured data (e.g. JSON)
- The DataFrame API is available in Python, R, Scala, and Java
- DataFrames in PySpark support both SQL queries (`SELECT * FROM table`) and expression methods (`df.select()`)

#### Using DataFrames

#### Creating a SparkSession

- SparkContext is the main entry point for creating RDDs
- SparkSession provides a single point of entry to interact with Spark DataFrames
- SparkSession is used to create DataFrame, register DataFrames, execute SQL queries
- SparkSession is available in PySpark shell as spark

In [None]:
# Import SparkSession from pyspark.sql
from pyspark.sql import SparkSession

In [None]:
# Create the SparkSession (getOrCreate() returns the existing session if there is one)
spark = SparkSession.builder.getOrCreate()

In [None]:
# Print the SparkSession
print(spark)

#### Viewing tables

In [None]:
# Print the tables in the catalog
print(spark.catalog.listTables())

#### Run SQL queries on tables

In [None]:
# Don't change this query
query = "FROM flights SELECT * LIMIT 10"

In [None]:
# Get the first 10 rows of flights
flights10 = spark.sql(query)

In [None]:
# Show the results
flights10.show()

- `printSchema()` operation prints the types of columns in the DataFrame

In [None]:
flights = spark.table("flights_small_csv")
flights.printSchema()

#### Pandafy a Spark DataFrame

In [None]:
# Don't change this query
query = "SELECT origin, dest, COUNT(*) as N FROM flights GROUP BY origin, dest"

In [None]:
# Run the query
flight_counts = spark.sql(query)

In [None]:
# Convert the results to a pandas DataFrame
pd_counts = flight_counts.toPandas()

In [None]:
# Print the head of pd_counts
print(pd_counts.head())

In [None]:
type(pd_counts)

#### Put pandas DataFrame into a Spark cluster

<p><img src="images/spark_figure.png" alt=""></p>

In [None]:
import pandas as pd
import numpy as np
# Create pd_temp
pd_temp = pd.DataFrame(np.random.random(10))

In [None]:
# Create spark_temp from pd_temp
spark_temp = spark.createDataFrame(pd_temp)

In [None]:
# Examine the tables in the catalog
print(spark.catalog.listTables())

In [None]:
# Add spark_temp to the catalog
spark_temp.createOrReplaceTempView('temp')

In [None]:
# Construct a query to select the names of the people from the temporary table "people"
query = '''SELECT name FROM people'''

# Assign the result of Spark's query to people_df_names
people_df_names = spark.sql(query)

# Print the top 10 names of the people
people_df_names.show(10)

In [None]:
# Filter the people table to select female sex 
people_female_df = spark.sql('SELECT * FROM people WHERE sex=="female"')

# Filter the people table DataFrame to select male sex
people_male_df = spark.sql('SELECT * FROM people WHERE sex=="male"')

# Count the number of rows in both DataFrames
print("There are {} rows in the people_female_df and {} rows in the people_male_df DataFrames".format(people_female_df.count(), people_male_df.count()))

#### Import data into Spark

[Pyspark – Import any data](https://towardsdatascience.com/pyspark-import-any-data-f2856cda45fd)

In [None]:
# Don't change this file path
file_path = "/usr/local/share/datasets/airports.csv"

# Read in the airports data
airports = spark.read.csv(file_path, header=True)

# Show the data
airports.show()

In [None]:
path_people = '/databricks-datasets/samples/people/people.json'
people = spark.read.json(path_people)

# Show the data
people.show()

#### Creating a DataFrame from an RDD

In [None]:
iphones_RDD = sc.parallelize([
("XS", 2018, 5.65, 2.79, 6.24),
("XR", 2018, 5.94, 2.98, 6.84),
("X10", 2017, 5.65, 2.79, 6.13),
("8Plus", 2017, 6.23, 3.07, 7.12)
])

names = ['Model', 'Year', 'Height', 'Width', 'Weight']
iphones_df = spark.createDataFrame(iphones_RDD, schema=names)
type(iphones_df)
iphones_df.show()

### Manipulating data

- DataFrame operations: Transformations and Actions
- DataFrame Transformations:
    - select(), filter(), groupBy(), orderBy(), dropDuplicates() and withColumnRenamed()
- DataFrame Actions:
    - printSchema(), head(), show(), count(), columns and describe()

#### Creating columns

In [None]:
# Create the DataFrame flights
flights = spark.table("flights")

# Show the head
flights.show()

# Add duration_hrs
flights = flights.withColumn("duration_hrs", flights.air_time/60)

#### Filtering Data

In [None]:
# Filter flights by passing a string
long_flights1 = flights.filter("distance > 1000")

# Filter flights by passing a column of boolean values
long_flights2 = flights.filter(flights.distance > 1000)

# Print the data to check they're equal
long_flights1.show()
long_flights2.show()

#### Selecting

In [None]:
# Select the first set of columns
selected1 = flights.select("tailnum", "origin", "dest")

# Select the second set of columns
temp = flights.select(flights.origin, flights.dest, flights.carrier)

# Define first filter
filterA = flights.origin == "SEA"

# Define second filter
filterB = flights.dest == "PDX"

# Filter the data, first by filterA then by filterB
selected2 = temp.filter(filterA).filter(filterB)

#### Selecting II

In [None]:
# Define avg_speed
avg_speed = (flights.distance/(flights.air_time/60)).alias("avg_speed")

# Select the correct columns
speed1 = flights.select("origin", "dest", "tailnum", avg_speed)

# Create the same table using a SQL expression
speed2 = flights.selectExpr("origin", "dest", "tailnum", "distance/(air_time/60) as avg_speed")
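
The `avg_speed` expression divides the distance by the air time converted from minutes to hours. A small worked example makes the arithmetic explicit; the function below is a hypothetical plain-Python helper, and the numbers are made up rather than taken from the flights table.

```python
def avg_speed(distance, air_time_minutes):
    """Distance covered per hour, with the air time given in minutes."""
    hours = air_time_minutes / 60
    return distance / hours


# A hypothetical flight covering 600 distance units in 90 minutes:
# 90 minutes is 1.5 hours, so the average speed is 600 / 1.5 = 400 units per hour.
speed = avg_speed(600, 90)
print(speed)
```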

#### Aggregating

In [None]:
# Find the shortest flight from PDX in terms of distance
flights.filter(flights.origin == "PDX").groupBy().min("distance").show()

# Find the longest flight from SEA in terms of air time
flights.filter(flights.origin == "SEA").groupBy().max("air_time").show()

In [None]:
# Average duration of Delta flights
flights.filter(flights.carrier == "DL").filter(flights.origin == "SEA").groupBy().avg("air_time").show()

# Total hours in the air
flights.withColumn("duration_hrs", flights.air_time/60).groupBy().sum("duration_hrs").show()
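
To check what these aggregations compute, the same four questions can be answered on a handful of rows in plain Python. The rows below are hypothetical and only stand in for the flights table; the comments name the DataFrame calls each line mirrors.

```python
# Hypothetical rows standing in for the flights table.
flights = [
    {"origin": "PDX", "carrier": "DL", "distance": 500, "air_time": 90},
    {"origin": "PDX", "carrier": "AS", "distance": 130, "air_time": 30},
    {"origin": "SEA", "carrier": "DL", "distance": 900, "air_time": 120},
    {"origin": "SEA", "carrier": "DL", "distance": 2400, "air_time": 300},
]

# filter(origin == "PDX") followed by groupBy().min("distance")
shortest_from_pdx = min(f["distance"] for f in flights if f["origin"] == "PDX")

# filter(origin == "SEA") followed by groupBy().max("air_time")
longest_from_sea = max(f["air_time"] for f in flights if f["origin"] == "SEA")

# Two filters followed by groupBy().avg("air_time")
delta_from_sea = [f["air_time"] for f in flights
                  if f["carrier"] == "DL" and f["origin"] == "SEA"]
avg_delta_air_time = sum(delta_from_sea) / len(delta_from_sea)

# withColumn("duration_hrs", air_time / 60) followed by groupBy().sum("duration_hrs")
total_hours = sum(f["air_time"] / 60 for f in flights)

print(shortest_from_pdx, longest_from_sea, avg_delta_air_time, total_hours)
```

Calling `groupBy()` with no columns treats the whole DataFrame as a single group, which is why each Spark expression above yields one row.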

#### Example: PySpark DataFrame subsetting and cleaning

In [None]:
people_df = spark.table("people")

# Select name, sex and date of birth columns
people_df_sub = people_df.select('name', 'sex', 'date of birth')

# Print the first 10 observations from people_df_sub
people_df_sub.show(10)

# Remove duplicate entries from people_df_sub
people_df_sub_nodup = people_df_sub.dropDuplicates()

# Count the number of rows
print("There were {} rows before removing duplicates, and {} rows after removing duplicates".format(people_df_sub.count(), people_df_sub_nodup.count()))
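
The effect of `dropDuplicates()` without arguments, which removes rows that are identical in every column, can be mimicked locally. The rows below are hypothetical, and unlike Spark this sketch also preserves the original row order.

```python
# Hypothetical (name, sex, date of birth) rows with one exact duplicate.
rows = [
    ("Ana", "female", "1990-01-01"),
    ("Bor", "male", "1985-05-12"),
    ("Ana", "female", "1990-01-01"),
]

# dict.fromkeys keeps the first occurrence of each row and preserves order.
rows_nodup = list(dict.fromkeys(rows))

print(len(rows), len(rows_nodup))
```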

#### Example: Filtering your DataFrame

In [None]:
# Filter people_df to select females 
people_df_female = people_df.filter(people_df.sex == "female")

# Filter people_df to select males
people_df_male = people_df.filter(people_df.sex == "male")

# Count the number of rows 
print("There are {} rows in the people_df_female DataFrame and {} rows in the people_df_male DataFrame".format(people_df_female.count(), people_df_male.count()))

### Data Visualization in PySpark using DataFrames

- Data visualization is a way of representing your data in graphs or charts
- Open source plotting tools to aid visualization in Python:
    - Matplotlib, Seaborn, Bokeh, etc.
- Plotting graphs from PySpark DataFrames can be done using methods such as:
    - toPandas()
    - HandySpark library

#### Using Pandas for plotting DataFrames

In [None]:
import matplotlib.pyplot as plt
# Create a list of tuples
sample_list = [('Mona',20), ('Jennifer',34),('John',20), ('Jim',26)]

# Create a RDD from the list
rdd = sc.parallelize(sample_list)

# Create a PySpark DataFrame
names_df = spark.createDataFrame(rdd, schema=['Name', 'Age'])

# Check the column names of names_df
print("The column names of names_df are", names_df.columns)

# Convert to Pandas DataFrame  
df_pandas = names_df.toPandas()

# Create a horizontal bar plot
df_pandas.plot(kind='barh', x='Name', y='Age', colormap='winter_r')
display()