# PySpark Introduction

Upload this Jupyter notebook to Google Drive, then open it with Google Colab.

In [None]:
# install the dependencies:
%env spark_version=2.4.4
%env hadoop_version=2.7

!apt-get install openjdk-8-jdk-headless -qq > /dev/null
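# older releases such as 2.4.4 are served from the Apache archive (the www-us mirror no longer exists)
!wget -q https://archive.apache.org/dist/spark/spark-${spark_version}/spark-${spark_version}-bin-hadoop${hadoop_version}.tgz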
!tar xf spark-${spark_version}-bin-hadoop${hadoop_version}.tgz
!python --version

In [None]:
# set environment
import os

current_directory = os.getcwd()

os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "{}/spark-{}-bin-hadoop{}".format(current_directory, os.environ["spark_version"], os.environ["hadoop_version"])

In [None]:
# install findspark pyspark PyArrow
!pip install findspark
!pip install pyspark
#!pip install PyArrow

In [None]:
import findspark
findspark.init() # os.environ["SPARK_HOME"]

from pyspark import SparkConf
from pyspark.sql import SparkSession
# create SparkConf
conf = SparkConf().setAppName('pyspark-app').setMaster('local[*]')
# create SparkSession instance
spark = SparkSession.builder.config(conf=conf).getOrCreate()
spark

In [None]:
# get sparkContext from sparkSession
sc = spark.sparkContext
sc

# Part A: RDD

RDD stands for Resilient Distributed Dataset: an immutable collection of elements partitioned across the nodes of a cluster so that they can be processed in parallel.

* Distributed: an RDD is split into partitions that live on different nodes.
* Fault tolerant: if a partition is lost, Spark recomputes it automatically from the lineage of transformations that produced it.
* Immutable: once an RDD is created it cannot be changed; applying a transformation produces a new RDD.
* Lazy evaluation: transformations are not executed until an action requests a result.

There are two ways to create RDDs: parallelizing an existing collection in your driver program, or referencing a dataset in an external storage system, such as a shared filesystem, HDFS, HBase, or any data source offering a Hadoop InputFormat.

In [None]:
# download data

import requests

url = "https://github.com/liuhoward/teaching/raw/master/big_data/"
blackfriday_file = "BlackFriday_lite.csv"

r = requests.get(url + blackfriday_file)
r.raise_for_status()  # stop early if the download failed
with open(blackfriday_file, 'wb') as f:
    f.write(r.content)

In [None]:
# read from local file

# current_directory is an absolute path, so this yields a file:///... URI
RDDread = sc.textFile(f"file://{current_directory}/{blackfriday_file}")

# show 3 elements
RDDread.take(3)

In [None]:
# create RDD from list of strings
sentences = sc.parallelize(
   ["scala is easier than java", 
   "python is easier than java", 
   "hadoop is good", 
   "spark is easier than hadoop", 
   "spark vs hadoop", 
   "pyspark is python api for spark",
   "pyspark and spark"]
)

sentences.collect()

In [None]:
# create RDD from list of numbers

nums = sc.parallelize([1,2,3,4])

nums.collect()

RDDs support two kinds of parallel operations:

* Transformations: operations applied to an RDD that create a new RDD. They return a reference to the new RDD without computing it; the computation waits until an action is called. Examples:
  * map()
  * flatMap()
  * filter()
  * sample()
  * union()
  * intersection()
  * distinct()
  * join()
  * reduceByKey()

* Actions: operations that instruct Spark to perform the computation and send the result back to the driver. For example, collect() retrieves all elements of the distributed RDD to the driver. Examples:
  * reduce()
  * collect()
  * count()
  * first()

In [None]:
# count()
# Returns the number of elements in the RDD.
sentences.count()

In [None]:
# Action: max, min, sum, variance and stdev

nums.max(),nums.min(), nums.sum(),nums.variance(),nums.stdev() 

In [None]:
# filter(f)
# Returns a new RDD containing only the elements
# for which the function returns True.

spark_sentences = sentences.filter(lambda x: 'spark' in x)
spark_sentences.collect()

In [None]:
# map(f)
# A new RDD is returned by applying a function to each element in the RDD.

# map sentence -> length
sentence_length = sentences.map(lambda x: len(x))
sentence_length.collect()


In [None]:
# map sentence -> (sentence, length)
sentence_length = sentences.map(lambda x: (x, len(x)))
sentence_length.collect()

In [None]:
# split each sentence by space
word_list = sentences.map(lambda x: x.split(' '))

word_list.collect()

In [None]:
# flatMap()
# Like map(), flatMap() applies the function to every element, but the function
# must return an iterable and the results are flattened into a single RDD of words
# (instead of one list of words per sentence).

word_list1 = sentences.flatMap(lambda x: x.split(' '))
print(word_list1.collect())


In [None]:
# Union()
# union() returns a new RDD containing all elements of both RDDs; duplicates are kept.

A_marks = [("physics",85),("maths",75),("chemistry",95)]
B_marks = [("physics",65),("maths",45),("chemistry",85)]

A = sc.parallelize(A_marks)
B = sc.parallelize(B_marks)

A.union(B).collect()

In [None]:
# join()
# This transformation joins two RDDs based on a common key.

A_marks = [("physics",85),("maths",75),("chemistry",95)]
B_marks = [("physics",65),("maths",45),("chemistry",85)]

A = sc.parallelize(A_marks)
B = sc.parallelize(B_marks)

A.join(B).collect()

# what if we change "maths" in B_marks to "maths2"?

In [None]:
# intersection()
# Returns the elements that appear in both RDDs, without duplicates.

A_words = ['scala', 'is', 'easier', 'than', 'java']
B_words = ['python', 'is', 'easier', 'than', 'java']

A = sc.parallelize(A_words)
B = sc.parallelize(B_words)

AB_intersect = A.intersection(B)
AB_intersect.collect()

In [None]:
# distinct()
# Returns a new RDD with duplicate elements removed.

A_words = ['scala', 'is', 'easier', 'than', 'java', 'python', 'is', 'easier', 'than', 'java']

A = sc.parallelize(A_words)

distinct_A = A.distinct()
distinct_A.collect()

In [None]:
# reduce()
# reduce() is an action: it aggregates all elements of the RDD into a single
# value using a commutative and associative function.

nums = sc.parallelize([1,2,3,4])

adding = nums.reduce(lambda x,y: x+y)
print(adding)

In [None]:
# reduceByKey()
# reduceByKey() works like reduce(), but it is a transformation that performs
# the reduction separately for each key and returns an RDD of (key, result) pairs.

pairs = [('a', 3), ('d', 4), ('a', 6)]
pairs_rdd = sc.parallelize(pairs)

new_pairs_count = pairs_rdd.reduceByKey(lambda x,y: x+y)
new_pairs_count.collect()

### word count using RDD

In [None]:
sentences = sc.parallelize(
   ["scala is easier than java", 
   "python is easier than java", 
   "hadoop is good", 
   "spark is easier than hadoop", 
   "spark vs hadoop", 
   "pyspark is python api for spark",
   "pyspark and spark"]
)
counts = sentences.flatMap(lambda line: line.split(" ")) \
             .map(lambda word: (word, 1)) \
             .reduceByKey(lambda x, y: x + y)

counts.collect()

# Part B: SparkSQL (Dataframe)

Spark SQL is Spark's module for working with structured data. From Spark 2.0 onward, the DataFrame API is the preferred way to write Spark code, because queries written with it go through Spark SQL's optimizer and benefit from its latest optimisations.

A **Dataset** is a distributed collection of data that combines the benefits of RDDs (strong typing, the ability to use lambda functions) with the benefits of Spark SQL's optimized execution engine. The typed Dataset API is available only in Scala and Java; Python code works with DataFrames.

A **DataFrame** is a Dataset organized into named columns. It is conceptually equivalent to a table in a relational database, or a data frame in Python/R. Conceptually, a DataFrame is a Dataset of Rows.

With a SparkSession, applications can create DataFrames from an existing RDD, from a Hive table, or from Spark data sources.

In Apache Spark, a DataFrame is a distributed collection of rows under named columns. In simple terms, it is the same as a table in a relational database or an Excel sheet with column headers. It also shares some common characteristics with RDDs:

* Distributed: both RDDs and DataFrames are partitioned across the nodes of a cluster.
* Immutable: once a DataFrame or RDD is created it cannot be changed; applying a transformation produces a new one.
* Lazy evaluation: transformations are not executed until an action requests a result.

Advantages of DataFrames:

* DataFrames are designed for processing large collections of structured or semi-structured data.
* Observations in a Spark DataFrame are organised under named columns, which tells Apache Spark the schema of the DataFrame. Spark uses this schema to optimize the execution plan of queries.
* DataFrames scale to very large datasets, up to petabytes of data on a large cluster.
* DataFrames support a wide range of data formats and sources, such as CSV, JSON, Parquet, and Hive tables.
* The DataFrame API is available in Python, R, Scala, and Java.

The Spark Python API (PySpark) exposes the Spark programming model to Python. PySpark benefits a lot from Spark SQL: DataFrame queries written in Scala, Java, Python, or R are all compiled by the same optimizer, so they run with comparable performance (Python UDFs, which execute in a separate Python process, are an exception).

#### Create Dataframe 

In [None]:
# create Dataframe from RDD

from pyspark.sql import Row

data = [('Anna',25),('Jack',22),('Tom',20),('Andy',26)]

rdd = sc.parallelize(data)

people = spark.createDataFrame(rdd, ["name", "age"])

print(f"type: {type(people)}")
people.show()

In [None]:
# Q: what is the relationship between Dataframe and RDD?

people.collect()

In [None]:
from pyspark.sql.types import *

# define schema to restrict type
schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True)])

people = spark.createDataFrame(rdd, schema=schema)

people.show()

In [None]:
# create dataframe from Pandas dataframe
import numpy as np
import pandas as pd

# Enable Arrow-based columnar data transfers (requires PyArrow;
# in Spark 3.x the key is spark.sql.execution.arrow.pyspark.enabled)
# spark.conf.set("spark.sql.execution.arrow.enabled", "true")

# Generate a Pandas DataFrame
pdf = pd.DataFrame(np.random.rand(5, 3), columns=["A", "B", "C"])
pdf

In [None]:

# Create a Spark DataFrame from a Pandas DataFrame (Arrow is used only if enabled above)
df = spark.createDataFrame(pdf)
df.show()

In [None]:
# Convert the Spark DataFrame back to a Pandas DataFrame (Arrow is used only if enabled above)
result_pdf = df.toPandas()
result_pdf

In [None]:
# download the JSON data used in the next cell
import requests

url = "https://github.com/liuhoward/teaching/raw/master/big_data/"
json_file = "people.json"

r = requests.get(url + json_file)
r.raise_for_status()  # stop early if the download failed
with open(json_file, 'wb') as f:
    f.write(r.content)


In [None]:
# create dataframe from json file

# define schema
data_schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True)])  # age holds whole numbers, so store it as an integer

people = spark.read.json(json_file, schema=data_schema)

people.show()

In [None]:
# create dataframe from csv file
from pyspark.sql.types import *

transactions = spark.read.csv(blackfriday_file, sep=',', header=True, inferSchema=True)

transactions.show(n=5)

In [None]:
# show schema details, see datatype of columns
transactions.printSchema()

In [None]:
# Return the first n rows as a list of Row objects
transactions.head(3)

In [None]:
transactions.take(2)

In [None]:
# Count the number of rows

transactions.count()

In [None]:
# How many columns do we have

len(transactions.columns), transactions.columns

In [None]:
# get summary statistics (count, mean, standard deviation, min, max) for each column

people.describe().show()

In [None]:
# get the summary statistics for age column

people.describe('age').show()

In [None]:
# select column(s) from the DataFrame

transactions.select('User_ID','Age').show(5)

In [None]:
# find the number of distinct products

transactions.select('Product_ID').distinct().count()

In [None]:
# calculate the pairwise frequency (contingency table) of two categorical columns

transactions.crosstab('Age', 'Gender').show()

In [None]:
# return the distinct (Age, Gender) combinations, i.e. drop duplicate rows

transactions.select('Age','Gender').dropDuplicates().show()


In [None]:
# drop all rows that contain a null value, then count the remaining rows
transactions.dropna().count()


In [None]:
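# replace nulls with '-1'; fillna() returns a new DataFrame (nothing is changed in place)
# a string value only fills string columns

transactions.fillna("-1").show(5)

# a numeric value such as -1 only fills numeric columns
#transactions.fillna(-1).show(5)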

In [None]:
# filter the rows which have Purchase more than 15000

transactions.filter(transactions.Purchase > 15000).select("User_ID", "Product_ID", "Purchase").show(3)

In [None]:
# group by Age, count the transactions in each age group

transactions.groupby('Age').count().show()

In [None]:
# group by Age, compute the mean Purchase of each age group with agg()

# We can also apply sum, min, max, count with groupby

transactions.groupby('Age').agg({'Purchase': 'mean'}).show()

In [None]:
# sort the DataFrame based on column(s)
# transactions.sort("Purchase", ascending=False).show(5)
transactions.orderBy(transactions.Purchase.desc()).show(5)

In [None]:
# add a new column to the DataFrame with withColumn(), which takes:
#     the name of the column to add (or replace, if it already exists)
#     a Column expression that computes its values

# 'Purchase_new' is calculated by dividing the Purchase column by 2

transactions.withColumn('Purchase_new', transactions.Purchase /2.0).select('User_ID','Purchase','Purchase_new').show(5)

In [None]:
# change User_ID from IntegerType to StringType

from pyspark.sql.types import StringType

transactions.withColumn("User_ID_Str", transactions["User_ID"].cast(StringType())).printSchema()

In [None]:
# stop
spark.stop()


# End